Building a High-Performance Distributed Task Queue on MongoDB
Getting to Know One Another
Rick
Roadmap
Define the problem
Schema design & operations
Types of tasks
Reducing Polling
Requirements
[Diagram: an AppServer and several SMTPServer hosts spread across Digital Ocean, Rackspace US, and Rackspace UK]
Requirements
[Diagram: a Pipeline combining a Group of check_smtp tasks with AnalyzeResults and UpdateReports]
Requirements
MongoDB (of course)
Basic Ideas
[Diagram: messages (msg) delivered through Chapman to Tasks, which run in a WorkerProcess]
Job Queue Schema: Message

    {
      _id: ObjectId(...),
      task_id: ObjectId(...),
      slot: 'run',
      s: {
        status: 'ready',
        ts: ISODate(...),
        q: 'chapman',
        pri: 10,
        w: '----------'
      },
      args: Binary(...),
      kwargs: Binary(...),
      send_args: Binary(...),
      send_kwargs: Binary(...)
    }
task_id: destination task
slot: task method to be run
s: scheduling / synchronization
args, kwargs: message arguments
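To make the Message shape concrete, here is a rough sketch that builds the same structure as a plain Python dict. It is not Chapman's code: `make_message` is a hypothetical helper, a `uuid4` hex string stands in for `ObjectId`, `pickle` is assumed as the serializer behind the `Binary(...)` fields, and `send_args`/`send_kwargs` are left empty because the slide does not say what fills them.

```python
import pickle
import uuid
from datetime import datetime, timezone


def make_message(task_id, slot, args=(), kwargs=None, queue='chapman', priority=10):
    """Hypothetical helper: build a dict shaped like the Message document."""
    return {
        '_id': uuid.uuid4().hex,       # stand-in for ObjectId(...)
        'task_id': task_id,            # destination task
        'slot': slot,                  # task method to be run, e.g. 'run'
        's': {                         # scheduling / synchronization
            'status': 'ready',
            'ts': datetime.now(timezone.utc),
            'q': queue,
            'pri': priority,
            'w': '----------',         # placeholder copied from the slide
        },
        # Assumption: pickle plays the role of the Binary(...) serializer.
        'args': pickle.dumps(tuple(args)),
        'kwargs': pickle.dumps(dict(kwargs or {})),
        'send_args': pickle.dumps(()),
        'send_kwargs': pickle.dumps({}),
    }
```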
Job Queue Schema: TaskState

    {
      _id: ObjectId(...),
      type: 'Group',
      parent_id: ObjectId(...),
      on_complete: ObjectId(...),
      mq: [ObjectId(...), ...],
      status: 'pending',
      options: {
        queue: 'chapman',
        priority: 10,
        immutable: false,
        ignore_result: true
      },
      result: Binary(...),
      data: {...}
    }
type: Python class registered for the task
parent_id: parent task (if any)
on_complete: message to be sent on completion
Message State: Naive Approach
[Diagram: the cycle Reserve Message → Try to lock task → Un-reserve Message, repeated over and over]
Message States: Reserve Message
findAndModify a message in 'ready':
s.state => 'q1'
s.w => worker_id
$push its _id onto the task's mq field
If the message is first in mq, s.state => 'busy'
Start processing
[Diagram: message states ready, q1, busy, q2, next, pending]
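The reserve step can be modeled in memory to make the ordering rule concrete. This is a sketch under stated assumptions: plain dicts stand in for the message and TaskState collections, the field is called `state` as on this slide, each commented step corresponds to one atomic `findAndModify` on the server, and the hypothetical `reserve_ready` takes the first ready message it finds instead of honoring `pri`.

```python
def reserve_ready(messages, tasks, worker_id):
    """Model of 'Reserve Message' starting from 'ready'.

    messages: {msg_id: {'task_id': ..., 'state': ..., 'w': ...}}
    tasks:    {task_id: {'mq': [msg_id, ...]}}
    Returns (msg_id, may_run), or (None, False) if nothing is ready.
    """
    # Step 1 (findAndModify on messages): claim a 'ready' message.
    msg_id = next((i for i, m in messages.items() if m['state'] == 'ready'), None)
    if msg_id is None:
        return None, False
    msg = messages[msg_id]
    msg['state'], msg['w'] = 'q1', worker_id
    # Step 2 (update on TaskState): $push the message id onto mq.
    mq = tasks[msg['task_id']]['mq']
    mq.append(msg_id)
    # Step 3: only the head of mq may run. Anything else stays in 'q1'
    # and the worker moves on instead of spinning on a lock.
    if mq[0] == msg_id:
        msg['state'] = 'busy'
        return msg_id, True
    return msg_id, False
```

Because a message that cannot run yet simply waits in `q1`, there is no un-reserve/retry cycle as in the naive approach.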
Message States: Reserve Message
findAndModify a message in 'next':
s.state => 'busy'
s.w => worker_id
Start processing
[Diagram: message states ready, q1, busy, q2, next, pending]
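Reserving a message in `next` is simpler, since such a message is already at the head of its task's `mq`. A minimal in-memory sketch, assuming the same dict layout as a stand-in for the message collection (`reserve_next` is a hypothetical name):

```python
def reserve_next(messages, worker_id):
    """Model of reserving a message that retirement promoted to 'next'."""
    # findAndModify on messages: 'next' => 'busy', stamp the worker id.
    msg_id = next((i for i, m in messages.items() if m['state'] == 'next'), None)
    if msg_id is not None:
        messages[msg_id]['state'] = 'busy'
        messages[msg_id]['w'] = worker_id
    # No $push onto mq: the message is already first in its task's mq.
    return msg_id
```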
Message States: Retire Message
findAndModify the TaskState:
$pull the message _id from 'mq'
findAndModify the new first message in 'mq' if its s.state is in ['q1', 'q2']:
s.state => 'next'
[Diagram: message states ready, q1, busy, q2, next, pending]
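Retirement is what hands a task over to its next queued message. A small in-memory sketch under the same assumptions (dicts standing in for the collections, hypothetical `retire` helper); what happens to the retired message document itself is not covered by the slide, so it is left untouched here.

```python
def retire(messages, tasks, msg_id):
    """Model of 'Retire Message'. Returns the id promoted to 'next', or None."""
    mq = tasks[messages[msg_id]['task_id']]['mq']
    # findAndModify on TaskState: $pull the finished message from mq.
    mq.remove(msg_id)
    # If the new head of mq is still queued ('q1' or 'q2'), flip it to
    # 'next' so that any worker can reserve it.
    if mq and messages[mq[0]]['state'] in ('q1', 'q2'):
        messages[mq[0]]['state'] = 'next'
        return mq[0]
    return None
```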
Task States
States are mainly advisory
Transitions to success or failure trigger the on_complete message
'chained' is a tail-call optimization
[Diagram: task states pending, active, chained, failure, success]
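Since the statuses are mainly advisory, the one rule with real consequences is that reaching `success` or `failure` fires `on_complete`. A minimal sketch of that rule (the `set_status` helper is hypothetical, and it deliberately does not validate transitions):

```python
TERMINAL = ('success', 'failure')


def set_status(task, new_status):
    """Record a status; return the on_complete message id to send, if any."""
    # Advisory: no transition checks, just record the new status.
    task['status'] = new_status
    # Only success/failure trigger the task's on_complete message.
    if new_status in TERMINAL:
        return task.get('on_complete')
    return None
```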
Basic Tasks: FunctionTask
Simplest task: run a function to completion and set the result to the return value
If a 'Suspend' exception is raised, move the task to 'chained' status
Other exceptions set the task to 'failure', save

    @task(ignore_result=True, priority=50)
    def track(event, user_id=None, **kwargs):
        log.info('track(%s, %s...)', event, user_id)
        # ...
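The three outcomes of a FunctionTask can be sketched with a plain try/except. This is an illustration, not Chapman's implementation: `Suspend` here is a local stand-in for the exception the slide names, `run_function_task` is hypothetical, and the failure details the slide saves are not modeled.

```python
class Suspend(Exception):
    """Local stand-in for the 'Suspend' exception named on the slide."""


def run_function_task(task, func, *args, **kwargs):
    """Run func to completion and record the outcome on the task dict."""
    try:
        result = func(*args, **kwargs)
    except Suspend:
        # Another task (e.g. a chained one) will finish this task later.
        task['status'] = 'chained'
    except Exception:
        # Failure details are not modeled in this sketch.
        task['status'] = 'failure'
    else:
        task['status'] = 'success'
        task['result'] = result  # the result is the function's return value
    return task
```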
Digression: Task Chaining
Task state set to 'chained'
New "Chain" task is created that will
Call the "next" task
When the "next" task completes, also complete the "current" task

    @task(ignore_result=True, priority=50)
    def function_task(*args, **kwargs):
        # ...
        Chain.call(some_other_task)
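Why call this a tail-call optimization? The current task keeps no stack frame while waiting; it is only marked `chained`, and the final result completes it. A trampoline-style sketch under assumptions: `TailCall` and `run_task` are hypothetical names, the slide's separate "Chain" task is folded into the next task's record, and failure propagation is not modeled.

```python
class TailCall(Exception):
    """Hypothetical signal raised by a task body to hand off to another call."""

    def __init__(self, func, *args, **kwargs):
        super().__init__(getattr(func, '__name__', repr(func)))
        self.func, self.args, self.kwargs = func, args, kwargs


def run_task(tasks, func, *args, **kwargs):
    """Run func as a task, following tail calls in a loop instead of recursing.

    tasks maps task id -> {'status': ..., 'result': ...}.
    """
    chained = []  # tasks waiting in 'chained' for the final result
    while True:
        task_id = len(tasks)  # assumption: ids are only allocated here
        tasks[task_id] = {'status': 'active', 'result': None}
        try:
            result = func(*args, **kwargs)
        except TailCall as call:
            # Mark the current task 'chained' and move on to the next call.
            tasks[task_id]['status'] = 'chained'
            chained.append(task_id)
            func, args, kwargs = call.func, call.args, call.kwargs
            continue
        # The last call finished: complete it and every chained task.
        for done_id in chained + [task_id]:
            tasks[done_id].update(status='success', result=result)
        return result
```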
Composite Tasks
Each subtask gets an on_complete message with slot=retire_subtask, carrying the subtask's position and its result
Different composite tasks implement 'run' and 'retire_subtask' differently

    task_state.update(
        {'_id': subtask_id},
        {$set: {
            'parent_id': composite_id,
            'data.composite_position': position,
            'options.ignore_result': false
        }})
Composite Task: Pipeline
run
Send a 'run' message to the subtask with position=0
retire_subtask(position, result)
Send a 'run' message with the previous result to the subtask with position=(position+1), OR retire the Pipeline if there are no more tasks
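The Pipeline's two slots can be sketched in memory, with an outbox list standing in for sending 'run' messages and a `deliver` loop standing in for workers. `Pipeline` and `deliver` here are illustrative names, not the library's classes.

```python
class Pipeline:
    """In-memory sketch of the Pipeline composite's run/retire_subtask slots."""

    def __init__(self, subtasks):
        self.subtasks = list(subtasks)  # position -> callable
        self.status, self.result = 'pending', None
        self.outbox = []                # 'run' messages we would send: (position, args)

    def run(self, *args):
        self.status = 'active'
        self.outbox.append((0, args))   # start the subtask at position 0

    def retire_subtask(self, position, result):
        nxt = position + 1
        if nxt < len(self.subtasks):
            # Feed the previous result into the next stage.
            self.outbox.append((nxt, (result,)))
        else:
            # No more stages: retire the Pipeline with the last result.
            self.status, self.result = 'success', result


def deliver(composite):
    """Stand-in for workers: run each queued subtask, then report back."""
    while composite.outbox:
        position, args = composite.outbox.pop(0)
        composite.retire_subtask(position, composite.subtasks[position](*args))
```

Each stage only starts once its predecessor has retired, which is what makes the composite sequential even though every hop is just another message.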
Composite Task: Group
run
Send a 'run' message to all subtasks
retire_subtask(position, result)
Decrement the num_waiting counter
If num_waiting is 0, retire the group:
Collect subtask results, complete the group, delete subtasks
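A Group differs from a Pipeline only in its two slots: everything starts at once, and a countdown decides when to finish. A minimal in-memory sketch, assuming each subtask already carries its own arguments and that, on MongoDB, the decrement would be an atomic `$inc`; `Group` here is an illustrative class.

```python
class Group:
    """In-memory sketch of the Group composite's run/retire_subtask slots."""

    def __init__(self, subtasks):
        self.subtasks = list(subtasks)
        self.results = [None] * len(self.subtasks)
        self.num_waiting = 0
        self.status, self.result = 'pending', None
        self.outbox = []  # 'run' messages we would send: (position, args)

    def run(self):
        self.status = 'active'
        self.num_waiting = len(self.subtasks)
        for position in range(len(self.subtasks)):
            # Assumption: subtasks carry their own args, so none are passed.
            self.outbox.append((position, ()))
        if self.num_waiting == 0:
            self._complete()

    def retire_subtask(self, position, result):
        self.results[position] = result
        self.num_waiting -= 1  # would presumably be an atomic $inc on MongoDB
        if self.num_waiting == 0:
            self._complete()

    def _complete(self):
        # Collect results in position order, complete, drop the subtasks.
        self.status, self.result = 'success', list(self.results)
        self.subtasks = []  # stands in for deleting the subtask documents
```

Results are kept by position, so the group's result order does not depend on the order in which subtasks finish.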
Reducing Polling
Reserving messages is expensive
Use Pub/Sub system instead
Publish to the channel whenever a message is ready to be handled
Each worker subscribes to the channel
Pub/Sub for MongoDB
Capped Collection
Fixed size
Fast inserts
"Tailable" cursors
[Diagram: a tailable cursor reading from the capped collection]
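The properties above can be mimicked in memory to show how subscribers read a capped collection. A rough sketch with hypothetical names (`CappedLog`, `publish`, `read_after`): a bounded `deque` plays the fixed-size collection, an increasing `id` lets a reader resume after the last document it saw, and `k` holds the topic as in the slides' queries. Real tailable cursors also die when they fall too far behind; that is not modeled.

```python
import itertools
import re
from collections import deque


class CappedLog:
    """In-memory stand-in for a capped collection used as a pub/sub channel."""

    def __init__(self, size):
        self._docs = deque(maxlen=size)  # fixed size: the oldest entries fall off
        self._ids = itertools.count(1)   # increasing id, like an autoincrement

    def publish(self, topic, payload):
        doc = {'id': next(self._ids), 'k': topic, 'data': payload}
        self._docs.append(doc)           # inserts only ever append (natural order)
        return doc['id']

    def read_after(self, last_id, topic_re):
        """Roughly what one pass of a tailable cursor would yield."""
        return [d for d in self._docs
                if d['id'] > last_id and re.match(topic_re, d['k'])]
```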
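Getting a Tailable Cursor

    from pymongo import CursorType

    def get_cursor(collection, topic_re, await_data=True):
        # TAILABLE keeps the cursor open after the last document;
        # TAILABLE_AWAIT also lets the server block briefly for new data.
        cursor_type = (CursorType.TAILABLE_AWAIT if await_data
                       else CursorType.TAILABLE)
        cur = collection.find({'k': topic_re}, cursor_type=cursor_type)
        cur = cur.hint([('$natural', 1)])  # ensure we don't use any indexes
        return cur

    import re
    import time

    while True:
        # db and do_something are placeholders for your database and handler
        cur = get_cursor(db.capped_collection, re.compile('^foo'), await_data=True)
        for msg in cur:
            do_something(msg)
        time.sleep(0.1)  # cursor died or timed out: pause before re-opening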
Holds the cursor open for a while (await_data)
Makes the cursor tailable
Doesn't use indexes (the $natural hint)
There is still some polling when there is no producer, so don't spin too fast
Building in retry...

    from pymongo import CursorType

    def get_cursor(collection, topic_re, last_id=-1, await_data=True):
        spec = {
            'id': {'$gt': last_id},  # only new messages
            'k': topic_re,
        }
        cursor_type = (CursorType.TAILABLE_AWAIT if await_data
                       else CursorType.TAILABLE)
        cur = collection.find(spec, cursor_type=cursor_type)
        cur = cur.hint([('$natural', 1)])  # ensure we don't use any indexes
        return cur
'id' is an integer autoincrement
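The point of `last_id` is that the consumer loop can re-open a dead cursor without replaying messages it already handled. A sketch of such a loop, written against any callable with the slide's `get_cursor` signature. `follow`, `handle`, and the `rounds` bound (handy for testing) are hypothetical additions, not part of the original.

```python
import time


def follow(get_cursor, collection, topic_re, handle, last_id=-1,
           rounds=None, pause=0.1):
    """Keep re-opening a tailable cursor, resuming after the last seen id.

    rounds bounds the number of cursor re-opens; None means run forever.
    """
    done = 0
    while rounds is None or done < rounds:
        for msg in get_cursor(collection, topic_re, last_id=last_id):
            handle(msg)
            last_id = msg['id']  # progress marker: a new cursor skips these
        time.sleep(pause)        # cursor died or timed out; don't spin
        done += 1
    return last_id
```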
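Building auto-increment

    from pymongo import ReturnDocument

    class Sequence(object):
        ...  # constructor elided on the slide; next() uses self._db and self._name

        def next(self, sname, inc=1):
            # Atomically $inc the counter document for this sequence name,
            # creating it on first use, and return the incremented value.
            doc = self._db[self._name].find_one_and_update(
                {'_id': sname},
                {'$inc': {'value': inc}},
                upsert=True,
                return_document=ReturnDocument.AFTER)
            return doc['value']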
Atomically $inc the dedicated document
Ludicrous Speed

    from pymongo import CursorType
    from pymongo.cursor import _QUERY_OPTIONS

    def get_cursor(collection, topic_re, last_id=-1, await_data=True):
        spec = {
            'ts': {'$gt': last_id},  # only new messages
            'k': topic_re,
        }
        cursor_type = (CursorType.TAILABLE_AWAIT if await_data
                       else CursorType.TAILABLE)
        cur = collection.find(spec, cursor_type=cursor_type)
        cur = cur.hint([('$natural', 1)])  # ensure we don't use any indexes
        if await_data:
            cur = cur.add_option(_QUERY_OPTIONS['oplog_replay'])
        return cur
id ==> ts
Co-opt the oplog_replay option (it keys on a 'ts' field, hence the rename)
Performance