Chapman: Building a High-Performance Distributed Task Service with MongoDB

Post on 25-Jul-2015

Chapman: Building a Distributed Job Queue in MongoDB

Rick Copeland @rick446 @synappio

rick@synapp.io

Getting to Know One Another

Rick

What You’ll Learn

How to…

• Build a task queue in MongoDB

• Bring consistency to distributed systems (without transactions)

• Build low-latency reactive systems

Why a Queue?

• Long-running tasks (or anything that takes longer than a web request can wait)

• Farm out chunks of work for performance

Things I Worry About

• Priority

• Latency

• Unreliable workers

Queue Options

• SQS? No priority

• Redis? Can’t overflow memory

• RabbitMQ? Lack of visibility

• ZeroMQ? Lack of persistence

• What about MongoDB?

Chapman

Graham Arthur Chapman 8 January 1941 – 4 October 1989

Roadmap

• Building a scheduled priority queue

• Handling unreliable workers

• Shared resources

• Managing latency

Building a Scheduled Priority Queue

Step 1: Simple Queue

    db.message.insert({
        "_id" : NumberLong("3784707300388732067"),
        "data" : BinData(...),
        "s" : {
            "status" : "ready",
            "ts_enqueue" : ISODate("2015-03-02T15:27:29.228Z")
        }
    });

    db.message.ensureIndex({'s.status': 1, 's.ts_enqueue': 1});

    // Get earliest message for processing
    db.runCommand(
        {
            findAndModify: "message",
            query: { 's.status': 'ready' },
            sort: {'s.ts_enqueue': 1},    // FIFO
            update: { '$set': {'s.status': 'reserved'} }
        }
    );

Good

• Guaranteed FIFO

Bad

• No priority (other than FIFO)

• No handling of worker problems

Step 2: Scheduled Messages

    db.message.insert({
        "_id" : NumberLong("3784707300388732067"),
        "data" : BinData(...),
        "s" : {
            "status" : "ready",
            "ts_after" : ISODate(…),    // min valid time
            "ts_enqueue" : ISODate("2015-03-02T15:27:29.228Z")
        }
    });

    db.message.ensureIndex(
        {'s.status': 1, 's.ts_enqueue': 1});

    // Get earliest message for processing
    var now = new Date();
    db.runCommand(
        {
            findAndModify: "message",
            query: { 's.status': 'ready', 's.ts_after': {$lt: now }},
            sort: {'s.ts_enqueue': 1},
            update: { '$set': {'s.status': 'reserved'} }
        }
    );

Good

• Easy to build periodic tasks

Bad

• Be careful with the word “now”
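One way to be careful with “now” (an assumption on my part, not something the slides spell out) is to read the clock once per reservation, insist that it is timezone-aware UTC, and pass it in explicitly. The sketch below only builds the findAndModify document as a plain dict; `build_reserve_command` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def build_reserve_command(now):
    """Build the findAndModify document for one reservation attempt.

    `now` is passed in (not read inside) so that a single clock reading
    is used for the whole command and tests can supply a fixed value.
    """
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware (UTC)")
    return {
        "findAndModify": "message",
        "query": {"s.status": "ready", "s.ts_after": {"$lt": now}},
        "sort": {"s.ts_enqueue": 1},
        "update": {"$set": {"s.status": "reserved"}},
    }


now = datetime(2015, 3, 2, 15, 27, 29, tzinfo=timezone.utc)
cmd = build_reserve_command(now)
```

Passing `now` as an argument also makes it obvious in code review which clock (worker or scheduler) decided that a message was due.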

Step 3: Priority

    db.message.insert({
        "_id" : NumberLong("3784707300388732067"),
        "data" : BinData(...),
        "s" : {
            "status" : "ready",
            "pri": 30128,    // add priority
            "ts_enqueue" : ISODate("2015-03-02T15:27:29.228Z")
        }
    });

    db.message.ensureIndex({'s.status': 1, 's.pri': -1, 's.ts_enqueue': 1});

    db.runCommand(
        {
            findAndModify: "message",
            query: { 's.status': 'ready' },
            sort: {'s.pri': -1, 's.ts_enqueue': 1},
            update: { '$set': {'s.status': 'reserved'} }
        }
    );

Good

• Priorities are handled

• Guaranteed FIFO within a priority

Bad

• No handling of worker problems
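To see what the sort {'s.pri': -1, 's.ts_enqueue': 1} selects, here is a small in-memory model of the selection rule. It is plain Python over a list, not MongoDB code, and `next_ready` is a hypothetical name used only for illustration.

```python
from datetime import datetime, timedelta


def next_ready(messages):
    """Return the message a worker would reserve next, or None.

    Mirrors sort {'s.pri': -1, 's.ts_enqueue': 1} over status == 'ready':
    highest priority first, oldest first within a priority.
    """
    ready = [m for m in messages if m["s"]["status"] == "ready"]
    if not ready:
        return None
    return min(ready, key=lambda m: (-m["s"]["pri"], m["s"]["ts_enqueue"]))


t0 = datetime(2015, 3, 2, 15, 27, 29)
messages = [
    {"_id": 1, "s": {"status": "ready", "pri": 10, "ts_enqueue": t0}},
    {"_id": 2, "s": {"status": "ready", "pri": 20,
                     "ts_enqueue": t0 + timedelta(seconds=2)}},
    {"_id": 3, "s": {"status": "ready", "pri": 20,
                     "ts_enqueue": t0 + timedelta(seconds=1)}},
    {"_id": 4, "s": {"status": "reserved", "pri": 99, "ts_enqueue": t0}},
]
print(next_ready(messages)["_id"])  # prints 3
```

Message 4 has the highest priority but is already reserved; messages 2 and 3 tie on priority, so the earlier enqueue time wins.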

Handling Unreliable Workers

Approach 1: Timeouts

    db.message.insert({
        "_id" : NumberLong("3784707300388732067"),
        "data" : BinData(...),
        "s" : {
            "status" : "ready",
            "pri": 30128,
            "ts_enqueue" : ISODate("2015-03-02T15:27:29.228Z"),
            "ts_timeout" : ISODate("2025-01-01T00:00:00.000Z")    // far-future placeholder
        }
    });

    db.message.ensureIndex({"s.status": 1, "s.ts_timeout": 1})

    // Reserve message (client sets timeout)
    db.runCommand(
        {
            findAndModify: "message",
            query: { 's.status': 'ready' },
            sort: {'s.pri': -1, 's.ts_enqueue': 1},
            update: { '$set': {
                's.status': 'reserved',
                's.ts_timeout': now + processing_time } }
        }
    );

    // Timeout message ("unlock")
    db.message.update(
        {'s.status': 'reserved', 's.ts_timeout': {'$lt': now}},
        {'$set': {'s.status': 'ready'}},
        {'multi': true});

Good

• Worker failure handled via timeout

Bad

• Requires periodic “unlock” task

• Slow (but “live”) workers can cause spurious timeouts

Approach 2: Worker Identity

    db.message.insert({
        "_id" : NumberLong("3784707300388732067"),
        "data" : BinData(...),
        "s" : {
            "status" : "ready",
            "pri": 30128,
            "cli": "--------------------------",    // client / worker placeholder
            "ts_enqueue" : ISODate("2015-03-02T..."),
            "ts_timeout" : ISODate("2025-...")
        }
    });

    // Reserve message
    db.runCommand(
        {
            findAndModify: "message",
            query: { 's.status': 'ready' },
            sort: {'s.pri': -1, 's.ts_enqueue': 1},
            update: { '$set': {
                's.status': 'reserved',
                's.cli': 'client_name:pid',    // mark the worker who reserved the message
                's.ts_timeout': now + processing_time } }
        }
    );

    // Unlock "dead" client messages:
    // messages reserved by dead workers are unlocked
    db.message.update(
        {'s.status': 'reserved',
         's.cli': {'$nin': active_clients} },
        {'$set': {'s.status': 'ready'}},
        {'multi': true});

Good

• Worker failure handled via out-of-band detection of live workers

• Handles slow workers

Bad

• Requires periodic “unlock” task

• Unlock updates can be slow
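The effect of the '$nin' unlock can be modelled in a few lines of plain Python. This is an in-memory stand-in for the multi-document update, not MongoDB code; `unlock_dead` and the worker names are hypothetical, and how the list of live workers is obtained is left out because the slides only call it "out-of-band".

```python
def unlock_dead(messages, active_clients):
    """Return reserved messages whose worker is not active to 'ready'.

    Models the update with {'s.cli': {'$nin': active_clients}}.
    Returns the number of messages unlocked.
    """
    active = set(active_clients)
    unlocked = 0
    for msg in messages:
        state = msg["s"]
        if state["status"] == "reserved" and state["cli"] not in active:
            state["status"] = "ready"
            unlocked += 1
    return unlocked


messages = [
    {"_id": 1, "s": {"status": "reserved", "cli": "host-a:101"}},
    {"_id": 2, "s": {"status": "reserved", "cli": "host-b:202"}},
    {"_id": 3, "s": {"status": "ready", "cli": "-" * 26}},
]
count = unlock_dead(messages, active_clients=["host-a:101"])
```

A slow but live worker ("host-a:101" here) keeps its reservation no matter how long it takes, which is the advantage over a pure timeout.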

Shared Resources

Complex Tasks

[Diagram labels: Pipeline, Group, check_smtp, Analyze Results, Update Reports]

Semaphores

• Some services perform connection-throttling (e.g. Mailchimp)

• Some services just have a hard time with 144 threads hitting them simultaneously

• Need a way to limit our concurrency

Semaphore

    Active: msg1, msg2, msg3, …
    Capacity: 16
    Queued: msg17, msg18, msg19, …

• Keep active and queued messages in arrays

• Releasing the semaphore makes queued messages available for dispatch

• Use $slice (MongoDB 2.6) to keep arrays the right size

Semaphores: Acquire

    db.semaphore.insert({
        '_id': 'semaphore-name',
        'value': 16,
        'active': [],
        'queued': []})

    def acquire(sem_id, msg_id, sem_size):
        # Pessimistic update: always join 'queued'; '$slice' lets the
        # message into 'active' only while there is room
        sem = db.semaphore.find_and_modify(
            {'_id': sem_id},
            update={'$push': {
                'active': {
                    '$each': [msg_id],
                    '$slice': sem_size},
                'queued': msg_id}},
            new=True)
        if msg_id in sem['active']:
            # Compensation: we got a slot, so leave the queue
            db.semaphore.update(
                {'_id': sem_id},
                {'$pull': {'queued': msg_id}})
            return True
        return False
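The pessimistic update and its compensation are easier to follow on a plain dict. The sketch below is a single-threaded, in-memory model of what '$push' with '$each' and a positive '$slice' does to the two arrays; it is not MongoDB code, and in the real system the two steps are separate database operations.

```python
def acquire(sem, msg_id):
    """Try to take a slot in `sem` for `msg_id`; return True on success.

    Step 1 models {'$push': {'active': {'$each': [msg_id], '$slice': size},
    'queued': msg_id}}: the id always joins `queued`, and stays in
    `active` only if the array had room. Step 2 models the '$pull'.
    """
    size = sem["value"]
    sem["active"] = (sem["active"] + [msg_id])[:size]  # push, then slice
    sem["queued"].append(msg_id)                       # pessimistic
    if msg_id in sem["active"]:
        sem["queued"].remove(msg_id)                   # compensation
        return True
    return False


sem = {"_id": "semaphore-name", "value": 2, "active": [], "queued": []}
results = [acquire(sem, m) for m in ("msg1", "msg2", "msg3")]
```

With a capacity of 2, the third message is sliced off `active` and remains in `queued`, where a later release can find it.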

Semaphores: Release

    def release(sem_id, msg_id, sem_size):
        # Actually release
        sem = db.semaphore.find_and_modify(
            {'_id': sem_id},
            update={'$pull': {
                'active': msg_id,
                'queued': msg_id}},
            new=True)

        # Awaken queued message(s)
        while len(sem['active']) < sem_size and sem['queued']:
            wake_msg_ids = sem['queued'][:sem_size]
            updated = db.semaphore.find_and_modify(
                {'_id': sem_id},
                update={'$pullAll': {'queued': wake_msg_ids}},
                new=True)
            for msgid in wake_msg_ids:
                make_dispatchable(msgid)  # some magic (covered later)
            sem = updated

Message States

[State diagram: ready, acquire, queued, busy]

• Reserve the message

• Acquire resources

• Process the message

• Release resources
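The four states can be written down as an explicit transition table. The table below is my reading of the diagram and of the acquire and release code elsewhere in the talk, so treat the exact set of edges as an assumption; `TRANSITIONS` and `advance` are hypothetical names.

```python
# Allowed (old_status, new_status) pairs; an assumed reading of the diagram.
TRANSITIONS = {
    ("ready", "acquire"),    # a worker reserves the message
    ("acquire", "busy"),     # all required semaphores were acquired
    ("acquire", "queued"),   # a semaphore was full; wait for a release
    ("queued", "ready"),     # a release made the message dispatchable
}


def advance(message, new_status):
    """Move `message` to `new_status`, rejecting transitions not listed."""
    old_status = message["s"]["status"]
    if (old_status, new_status) not in TRANSITIONS:
        raise ValueError("illegal transition %s -> %s" % (old_status, new_status))
    message["s"]["status"] = new_status
    return message


msg = {"_id": 1, "s": {"status": "ready"}}
advance(msg, "acquire")
advance(msg, "queued")
advance(msg, "ready")
```

Making every status change a guarded transition mirrors how the database updates include the expected current status in their query.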

Reserve a Message

    msg = db.message.find_and_modify(
        {'s.status': 'ready'},
        # Prefer partially-acquired messages
        sort=[('s.sub_status', -1), ('s.pri', -1), ('s.ts', 1)],
        update={'$set': {'s.w': worker, 's.status': 'acquire'}},
        new=True)

Before:

    message.s == {
        pri: 10,
        semaphores: ['foo'],    # required semaphores
        status: 'ready',
        sub_status: 0,          # number of semaphores acquired
        w: '----------',
        ...}

After:

    message.s == {
        pri: 10,
        semaphores: ['foo'],
        status: 'acquire',
        sub_status: 0,
        w: worker,
        ...}

Acquire Resources

    def acquire_resources(msg):
        for i, sem_id in enumerate(msg['s']['semaphores']):
            if i < msg['s']['sub_status']: # already acquired
                continue
            sem = db.semaphore.find_one({'_id': sem_id})
            if try_acquire_resource(sem_id, msg['_id'], sem['value']):
                # Save forward progress: sub_status counts the semaphores
                # acquired so far, so store i + 1
                db.message.update(
                    {'_id': msg['_id']}, {'$set': {'s.sub_status': i + 1}})
            else:
                # Failure to acquire (already queued)
                return False
        # Resources acquired, message ready to be processed
        db.message.update(
            {'_id': msg['_id']}, {'$set': {'s.status': 'busy'}})
        return True

Acquire Resources

    def try_acquire_resource(sem_id, msg_id, sem_size):
        '''Version 1 (race condition)'''
        # reserve() is the semaphore acquisition step; it takes the same
        # arguments as acquire() on the "Semaphores: Acquire" slide
        if reserve(sem_id, msg_id, sem_size):
            return True
        else:
            # Here be dragons!
            db.message.update(
                {'_id': msg_id},
                {'$set': {'s.status': 'queued'}})
            return False

Release Resources (v1) “magic”

    def make_dispatchable(msg_id):
        '''Version 1 (race condition)'''
        db.message.update(
            {'_id': msg_id, 's.status': 'queued'},
            {'$set': {'s.status': 'ready'}})

But what if s.status == 'acquire'?

That’s the dragon.

Release Resources (v2)

    def make_dispatchable(msg_id):
        # Hey, something happened!
        res = db.message.update(
            {'_id': msg_id, 's.status': 'acquire'},
            {'$set': {'s.event': True}})
        if not res['updatedExisting']:
            db.message.update(
                {'_id': msg_id, 's.status': 'queued'},
                {'$set': {'s.status': 'ready'}})

Acquire Resources (v2)

    def try_acquire_resource(sem_id, msg_id, sem_size):
        '''Version 2'''
        while True:
            # Nothing's happened yet!
            db.message.update(
                {'_id': msg_id}, {'$set': {'s.event': False}})
            if reserve(sem_id, msg_id, sem_size):
                return True
            else:
                # Check if something happened
                res = db.message.update(
                    {'_id': msg_id, 's.event': False},
                    {'$set': {'s.status': 'queued'}})
                if not res['updatedExisting']:
                    # Someone released this message; try again
                    continue
                return False

One More Race…

    def release(sem_id, msg_id, sem_size):
        sem = db.semaphore.find_and_modify(
            {'_id': sem_id},
            update={'$pull': {
                'active': msg_id,
                'queued': msg_id}},
            new=True)

        while len(sem['active']) < sem_size and sem['queued']:
            wake_msg_ids = sem['queued'][:sem_size]
            updated = db.semaphore.find_and_modify(
                {'_id': sem_id},
                update={'$pullAll': {'queued': wake_msg_ids}},
                new=True)
            # Window: these ids have left 'queued', but their messages
            # have not been made dispatchable yet
            for msgid in wake_msg_ids:
                make_dispatchable(msgid)
            sem = updated

Compensate!

    def fixup_queued_messages():
        for msg in db.message.find({'s.status': 'queued'}):
            # The semaphore this message is waiting on
            sem_id = msg['s']['semaphores'][msg['s']['sub_status']]
            sem = db.semaphore.find_one(
                {'_id': sem_id, 'queued': msg['_id']})
            if sem is None:
                # Not in the semaphore's queue, so nothing will wake it
                db.message.update(
                    {'_id': msg['_id'],
                     's.status': 'queued',
                     's.sub_status': msg['s']['sub_status']},
                    {'$set': {'s.status': 'ready'}})

Managing Latency

• Reserving messages is expensive

• Use a pub/sub system instead

• Publish to the channel whenever a message is ready to be handled

• Each worker subscribes to the channel

• Workers only “poll” when they have a chance of getting work

Capped Collections

• Fixed size

• Fast inserts

• “Tailable” cursors

[Diagram labels: Capped Collection, Tailable Cursor]

Getting a Tailable Cursor

def get_cursor(collection, topic_re, await_data=True):! options = { 'tailable': True }! if await_data:! options['await_data'] = True! cur = collection.find(! { 'k': topic_re },! **options)! cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes! return cur

@rick446 @synappio

Getting a Tailable Cursor

def get_cursor(collection, topic_re, await_data=True):! options = { 'tailable': True }! if await_data:! options['await_data'] = True! cur = collection.find(! { 'k': topic_re },! **options)! cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes! return cur

Make cursor tailable

@rick446 @synappio

Getting a Tailable Cursor

def get_cursor(collection, topic_re, await_data=True):! options = { 'tailable': True }! if await_data:! options['await_data'] = True! cur = collection.find(! { 'k': topic_re },! **options)! cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes! return cur

Holds open cursor for a while

Make cursor tailable

@rick446 @synappio

Getting a Tailable Cursor

def get_cursor(collection, topic_re, await_data=True):! options = { 'tailable': True }! if await_data:! options['await_data'] = True! cur = collection.find(! { 'k': topic_re },! **options)! cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes! return cur

Holds open cursor for a while

Make cursor tailable

Don’t use indexes

@rick446 @synappio

Getting a Tailable Cursor

def get_cursor(collection, topic_re, await_data=True):! options = { 'tailable': True }! if await_data:! options['await_data'] = True! cur = collection.find(! { 'k': topic_re },! **options)! cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes! return cur

import re, time!while True:! cur = get_cursor(! db.capped_collection, ! re.compile('^foo'), ! await_data=True)! for msg in cur:! do_something(msg)! time.sleep(0.1)

Holds open cursor for a while

Make cursor tailable

Don’t use indexes

@rick446 @synappio

Getting a Tailable Cursor

def get_cursor(collection, topic_re, await_data=True):! options = { 'tailable': True }! if await_data:! options['await_data'] = True! cur = collection.find(! { 'k': topic_re },! **options)! cur = cur.hint([('$natural', 1)]) # ensure we don't use any indexes! return cur

import re, time!while True:! cur = get_cursor(! db.capped_collection, ! re.compile('^foo'), ! await_data=True)! for msg in cur:! do_something(msg)! time.sleep(0.1)

• await_data holds the cursor open for a while

• tailable makes the cursor tailable

• The $natural hint means no indexes are used

• Still some polling when there is no producer, so don’t spin too fast

Building in retry...

    def get_cursor(collection, topic_re, last_id=-1, await_data=True):
        options = {'tailable': True}
        spec = {
            'id': {'$gt': last_id},  # only new messages
            'k': topic_re}
        if await_data:
            options['await_data'] = True
        cur = collection.find(spec, **options)
        cur = cur.hint([('$natural', 1)])  # ensure we don't use any indexes
        return cur

• Each message carries an integer autoincrement “id”, so a re-opened cursor can ask only for messages after last_id
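
The retry idea above can be sketched as a consumer loop that remembers the last “id” it handled and passes it back in whenever the cursor has to be re-opened. This is only a sketch under assumptions: consume, open_cursor, handle, max_reopens, and delay are hypothetical names, and open_cursor stands in for something like lambda last_id: get_cursor(collection, topic_re, last_id).

```python
import time


def consume(open_cursor, handle, last_id=-1, max_reopens=None, delay=0.1):
    """Drain messages, re-opening the cursor from last_id whenever it ends.

    open_cursor(last_id) must return an iterable of messages whose 'id'
    is greater than last_id (a hypothetical stand-in for get_cursor).
    """
    reopens = 0
    while max_reopens is None or reopens < max_reopens:
        for msg in open_cursor(last_id):
            handle(msg)
            last_id = msg['id']  # record progress after each handled message
        reopens += 1
        time.sleep(delay)  # don't spin too fast when nothing is arriving
    return last_id
```

Because last_id only advances after handle returns, a message whose handler raises is delivered again on the next attempt, so handlers in a design like this should tolerate duplicates.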

Ludicrous Speed

    from pymongo.cursor import _QUERY_OPTIONS

    def get_cursor(collection, topic_re, last_id=-1, await_data=True):
        options = {'tailable': True}
        spec = {
            'ts': {'$gt': last_id},  # only new messages
            'k': topic_re}
        if await_data:
            options['await_data'] = True
        cur = collection.find(spec, **options)
        cur = cur.hint([('$natural', 1)])  # ensure we don't use any indexes
        if await_data:
            cur = cur.add_option(_QUERY_OPTIONS['oplog_replay'])
        return cur

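• id ==> ts: query on a “ts” timestamp field instead of the integer “id”

• Co-opt the oplog_replay option, which lets the server seek quickly to the first entry matching the “ts” condition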

The Oplog

• Capped collection that records all operations for replication

• Includes a ‘ts’ field suitable for oplog_replay

• Does not require a separate publish operation (all changes are automatically “published”)
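
To make the “automatically published” point concrete, here is a small sketch of a query spec that picks out the oplog entries for one collection. The ns (namespace) and op (operation type) fields are part of the oplog entry format; the helper name oplog_spec and the chapman.message namespace are made up for illustration.

```python
def oplog_spec(database, collection, ops=('i', 'u')):
    """Build a query spec matching oplog entries for one collection.

    'ns' is the "<database>.<collection>" namespace and 'op' is the
    operation type: 'i' insert, 'u' update, 'd' delete.
    """
    return {
        'ns': '%s.%s' % (database, collection),
        'op': {'$in': list(ops)},
    }


# Hypothetical namespace, for illustration only.
spec = oplog_spec('chapman', 'message')
```

A spec like this could be handed to any function that tails the oplog, so consumers react to ordinary inserts and updates without the producer doing anything extra.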

Using the Oplog

    import bson
    from pymongo.cursor import _QUERY_OPTIONS

    def oplog_await(oplog, spec):
        '''Await the very next message on the oplog satisfying the spec'''
        last = oplog.find_one(spec, sort=[('$natural', -1)])
        if last is None:
            return  # Can't await unless there is an existing message satisfying spec
        await_spec = dict(spec)
        last_ts = last['ts']
        await_spec['ts'] = {'$gt': bson.Timestamp(last_ts.time, last_ts.inc - 1)}
        curs = oplog.find(await_spec, tailable=True, await_data=True)
        curs = curs.hint([('$natural', 1)])
        curs = curs.add_option(_QUERY_OPTIONS['oplog_replay'])
        curs.next()  # should always find 1 element
        try:
            return curs.next()
        except StopIteration:
            return None

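• find_one with a descending $natural sort fetches the most recent oplog entry

• The $gt query on ts finds the most recent entry plus the following entries

• The first curs.next() skips the most recent entry

• The second curs.next() returns on anything new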
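
The “most recent plus following” trick relies on timestamps ordering first by time and then by inc. The sketch below models that ordering with a plain namedtuple (not bson.Timestamp, so it runs without a database) and shows why a $gt comparison against (time, inc - 1) still matches the last known entry. The names Ts and entries_after are illustrative only.

```python
from collections import namedtuple

# Stand-in for a BSON timestamp: tuples compare by time, then by inc.
Ts = namedtuple('Ts', ['time', 'inc'])


def entries_after(entries, last_ts):
    """Return entries whose ts is greater than (last_ts.time, last_ts.inc - 1)."""
    floor = Ts(last_ts.time, last_ts.inc - 1)
    return [e for e in entries if e['ts'] > floor]


oplog = [
    {'ts': Ts(100, 1), 'op': 'i'},
    {'ts': Ts(100, 2), 'op': 'u'},  # most recent entry when we start waiting
    {'ts': Ts(101, 1), 'op': 'i'},  # arrives later
]
matched = entries_after(oplog, Ts(100, 2))
# matched[0] is the entry we already knew about (the one the first next() skips);
# matched[1] is the new entry that the second next() would return.
```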

What We’ve Learned

How to…

Build a task queue in MongoDB

Bring consistency to distributed systems (without transactions)

Build low-latency reactive systems

Tips

• findAndModify is ideal for queues

• Atomic update + compensation brings consistency to your distributed system

• Use the oplog to build reactive, low-latency systems
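
A minimal sketch of the first two tips, assuming PyMongo 3 or later, where findAndModify is exposed as find_one_and_update. The field names (status, worker, priority), the status values, and the helper names are assumptions for illustration, not Chapman's actual schema.

```python
def reserve_task(tasks, worker_id):
    """Atomically claim the highest-priority ready task, or return None.

    `tasks` is expected to be a PyMongo collection (or anything exposing
    find_one_and_update with the same arguments). By default the returned
    document is the task as it looked before the update.
    """
    return tasks.find_one_and_update(
        {'status': 'ready'},
        {'$set': {'status': 'busy', 'worker': worker_id}},
        sort=[('priority', -1)],
    )


def release_orphans(tasks, live_workers):
    """Compensation: hand tasks held by dead workers back to the queue."""
    return tasks.update_many(
        {'status': 'busy', 'worker': {'$nin': list(live_workers)}},
        {'$set': {'status': 'ready', 'worker': None}},
    )
```

Because the claim is a single atomic operation, two workers cannot reserve the same task; the compensation step covers the case where a worker dies after claiming one.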

Questions?

Rick Copeland rick@synapp.io

@rick446
