Advanced task management with Celery

DESCRIPTION
Celery is an excellent framework for background task processing in Python (and other languages). While basic Celery usage is very easy, building complex task flows (task trees, graphs, dependencies, etc.) has been a challenge. This talk introduces the audience to these challenges and explains how they can be addressed, both programmatically and by using the latest features in Celery (3+).

TRANSCRIPT
![Page 1: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/1.jpg)
Advanced Task Management in Celery
Mahendra M
@mahendra
https://github.com/mahendra
![Page 2: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/2.jpg)
@mahendra
● Python developer for 6 years
● FOSS enthusiast/volunteer for 14 years
● Bangalore LUG and Infosys LUG
● FOSS.in and LinuxBangalore/200x
● Celery user for 3 years
● Contributions
● patches, testing new releases
● Zookeeper msg transport for kombu
● Kafka support (in-progress)
![Page 3: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/3.jpg)
Quick Intro to Celery
● Asynchronous task/job queue
● Uses distributed message passing
● Tasks are run asynchronously on worker nodes
● Results are passed back to the caller (if any)
![Page 4: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/4.jpg)
Overview

  Sender --> [ Msg Q ] --> Worker 1
                       --> Worker 2
                       ...
                       --> Worker N
![Page 5: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/5.jpg)
Sample Code

from celery.task import task

@task
def add(x, y):
    return x + y

result = add.delay(5, 6)
result.get()
![Page 6: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/6.jpg)
Uses of Celery
● Asynchronous task processing
● Handling long-running / heavy jobs
● Image resizing, video transcoding, PDF generation
● Offloading heavy web backend operations
● Scheduling tasks to be run at a particular time
● Cron for Python
![Page 7: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/7.jpg)
Advanced Uses
● Task routing
● Task retries, timeouts and revoking
● Task canvas – combining tasks
● Task co-ordination
● Dependencies
● Task trees or graphs
● Batch tasks
● Progress monitoring
● Tricks
● DB conflict management
![Page 8: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/8.jpg)
Sending tasks to a particular worker

  Sender --> [ Msg Q ] --"windows"--> Worker 1 (Windows)
                        --"windows"--> Worker 2 (Windows)
                        ...
                        --"linux"---> Worker N (Linux)
![Page 9: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/9.jpg)
Routing tasks – Use cases
● Priority execution
● Based on hardware capabilities
● Special cards available for video capture
● Making use of GPUs (CUDA)
● Based on OS (e.g. PlayReady encryption)
● Based on location
● Moving compute closer to data (Hadoop-ish)
● Sending tasks to different data centers
● Sequencing operations (CouchDB conflicts)
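A routing table for use cases like these can also be declared once in configuration instead of on every call. A minimal sketch in the Celery 3.x settings style; the task and queue names here are made up for illustration:

```python
# Hypothetical routing table (Celery 3.x CELERY_ROUTES setting).
# Task names and queue names are illustrative, not from the talk.
CELERY_ROUTES = {
    'tasks.drm_encrypt': {'queue': 'windows'},  # needs PlayReady on Windows
    'tasks.transcode':   {'queue': 'linux'},    # plain Linux workers
}
```

With this in place, `apply_async()` no longer needs an explicit `queue=` argument for these tasks.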
![Page 10: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/10.jpg)
Sample Code

from celery.task import task

@task(queue='windows')
def drm_encrypt(audio_file, key_phrase):
    ...

r = drm_encrypt.apply_async(args=[afile, key],
                            queue='windows')

# Start celery worker with the queue option
$ celery worker -Q windows
![Page 11: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/11.jpg)
Retrying tasks

@task(default_retry_delay=60,
      max_retries=3)
def drm_encrypt(audio_file, key_phrase):
    try:
        playready.encrypt(...)
    except Exception as exc:
        raise drm_encrypt.retry(exc=exc, countdown=5)
![Page 12: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/12.jpg)
Retrying tasks
● You can specify the number of times a task can be retried
● The cases for retrying a task must be handled within code. Celery will not do it automatically
● The tasks should be designed to be idempotent
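The retry semantics above can be mirrored in plain Python to make them concrete. This is only a sketch of the idea, not Celery's implementation; the `retry` decorator and `flaky` task here are hypothetical:

```python
import time

# Minimal sketch of Celery-style retry semantics in plain Python.
# This is NOT Celery's API -- `retry` and `flaky` are hypothetical,
# only illustrating max_retries / countdown behaviour.
def retry(max_retries=3, countdown=0.01):
    def deco(fn):
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise  # retries exhausted: propagate the error
                    time.sleep(countdown)  # the countdown= delay
        return wrapper
    return deco

calls = []

@retry(max_retries=3)
def flaky():
    # Fails twice, then succeeds. Re-running it is safe only
    # because the task is idempotent -- hence the advice above.
    calls.append(1)
    if len(calls) < 3:
        raise IOError("transient failure")
    return "ok"
```

Here `flaky()` returns "ok" on the third attempt; with a non-idempotent body, each retry would repeat its side effects.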
![Page 13: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/13.jpg)
Handling worker failures

@task(acks_late=True)
def drm_encrypt(audio_file, key_phrase):
    try:
        playready.encrypt(...)
    except Exception as exc:
        raise drm_encrypt.retry(exc=exc, countdown=5)

● This is used where the task must be resent in case of worker or node failure
● The ack message to the message queue is sent after the task finishes executing
![Page 14: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/14.jpg)
Worker processes

  Sender --> [ Msg Q ] --"windows"--> Worker 1 (Windows)
                        --"windows"--> Worker 2 (Windows)
                        ...
                        --"linux"---> Worker N (Linux)

  Each worker node runs a pool of processes: Process 1, Process 2, ..., Process N
![Page 15: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/15.jpg)
Worker processes (same diagram as the previous slide)
![Page 16: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/16.jpg)
Worker process
● On every worker node, Celery starts a pool of worker processes
● The number is determined by the concurrency setting (or autodetected, for full CPU usage)
● Each process can be configured to restart after running x number of tasks
● Disabled by default
● Alternately, eventlet can be used instead of processes (discussed later)
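The pool size and per-process recycling are set when the worker starts. A sketch of the relevant Celery 3.x flags (the queue name is illustrative, and the values are examples):

```shell
# Start a worker with 4 pool processes, recycling each process
# after it has executed 100 tasks (recycling is off by default).
celery worker -Q windows --concurrency=4 --maxtasksperchild=100

# Alternately, use the eventlet pool instead of processes:
celery worker -Q windows -P eventlet --concurrency=1000
```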
![Page 17: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/17.jpg)
Revoking tasks

celery.control.revoke(task_id,
                      terminate=False,
                      signal='SIGKILL')

● revoke() works by sending a broadcast message to all workers
● If a task has not yet run, workers will keep this task_id in memory and ensure that it does not run
● If a task is running, revoke() will not work unless terminate=True
![Page 18: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/18.jpg)
Task expiration

task.apply_async(expires=x)

x can be
* in seconds
* a specific datetime()

● Global time limits can be configured in settings
● Soft time limit – the task receives an exception which can be used to clean up
● Hard time limit – the worker running the task is killed and is replaced with another one
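The global limits mentioned above are plain settings. A minimal sketch using the Celery 3.x setting names (the values are examples, not recommendations):

```python
# Global time limits (Celery 3.x setting names; values are examples).
CELERYD_TASK_SOFT_TIME_LIMIT = 60   # task receives SoftTimeLimitExceeded
CELERYD_TASK_TIME_LIMIT = 120       # hard limit: the pool process is killed
```

The soft limit should be lower than the hard limit so the task gets a chance to clean up before it is killed.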
![Page 19: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/19.jpg)
Handling soft time limit

@task()
def drm_encrypt(audio_file, key_phrase):
    try:
        setup_tmp_files()
        playready.encrypt(...)
    except SoftTimeLimitExceeded:
        cleanup_tmp_files()
    except Exception as exc:
        raise drm_encrypt.retry(exc=exc, countdown=5)
![Page 20: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/20.jpg)
Task Canvas
● Chains – linking one task to another
● Groups – execute several tasks in parallel
● Chord – execute a task after a set of tasks has finished
● Map and starmap – similar to the map() function
● Chunks – divide an iterable of work into chunks
● Chunks + chord/chain can be used for map-reduce

Best shown in a demo
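The demo itself isn't reproduced in this transcript, but the chord semantics can at least be mirrored with plain functions. `add` and `tsum` stand in for hypothetical Celery tasks; the commented lines show what the real canvas call would look like (it needs a broker and a running worker):

```python
# Plain-Python mirror of a Celery chord: run a group of tasks
# in parallel, then feed all their results to one callback task.
def add(x, y):
    return x + y

def tsum(nums):
    return sum(nums)

# The real Celery version (requires a broker + worker) would be:
#   from celery import chord
#   result = chord(add.s(i, i) for i in range(10))(tsum.s())
#   result.get()
results = [add(i, i) for i in range(10)]  # the "group" part
total = tsum(results)                     # the chord callback
```

In the real canvas version the group members run concurrently on workers; the sequential list comprehension here only mirrors the data flow, not the parallelism.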
![Page 21: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/21.jpg)
Task trees

[ task 1 ] --- spawns ---> [ task 2 ] --- spawns ---> [ task 2_1 ]
    |                                                 [ task 2_3 ]
    +------- [ task 3 ] --- spawns ---> [ task 3_1 ]
    |                                   [ task 3_2 ]
    +------- [ task 4 ] --- links ----> [ task 5 ]
                                            | (spawns)
                                            v
             [ task 8 ] <--- links <--- [ task 6 ]
                                            | (spawns)
                                            v
                                        [ task 7 ]
![Page 22: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/22.jpg)
Task Trees
● Home-grown solution (our current approach)
● Use db models and keep track of trees
● Better approach
● Use celery-tasktree
● http://pypi.python.org/pypi/celery-tasktree
![Page 23: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/23.jpg)
Celery Batches
● Collect jobs and execute them in a batch
● Can be used for stats collection
● Batch execution is done once
● a configured timeout is reached, OR
● a configured number of tasks have been received
● Useful for reducing n/w and db loads
![Page 24: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/24.jpg)
Celery Batches

from celery.contrib.batches import Batches

@task(base=Batches, flush_every=50, flush_interval=10)
def collect_stats(requests):
    items = {}
    for request in requests:
        item_id = request.kwargs['item_id']
        items[item_id] = get_obj(item_id)
        items[item_id].count += 1
    # Sync to db

collect_stats.delay(item_id=45)
collect_stats.delay(item_id=57)
![Page 25: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/25.jpg)
Celery monitoring
● Celery Flower
  https://github.com/mher/flower
● Django admin monitor
● Celery jobtastic
  http://pypi.python.org/pypi/jobtastic
![Page 26: Advanced task management with Celery](https://reader034.vdocuments.us/reader034/viewer/2022052315/54922258b4795963488b5318/html5/thumbnails/26.jpg)
Celery deployment
● Cyme – celery instance manager
  https://github.com/celery/cyme
● Celery autoscaling
● Use celery eventlet where required