Practical Celery

DESCRIPTION

Learn the ins and outs of running background tasks with Celery, the popular Python task queue. We'll hit the ground running, covering everything you need to know to run your first task, all the way to scaling your stack to run millions each day.

TRANSCRIPT

PRACTICAL CELERY

CAMERON MASKE

twitter: @cameronmaske

email: cam@trackmaven.com

web: http://cameronmaske.com

WHAT WE'LL COVER...

WHAT IS CELERY? HOW DOES IT WORK?

USING CELERY, BEST PRACTICES AND SCALING.

SURVEY

CELERY: ASYNCHRONOUS, DISTRIBUTED TASK QUEUE.

OUT OF THE REQUEST/RESPONSE CYCLE.
Example: Sending emails asynchronously.

TASKS IN THE BACKGROUND.
Example: Computationally heavy jobs.
Example: Interacting with external APIs.

PERIODIC JOBS.
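To make the first use case concrete, here is a minimal sketch of moving email out of the request/response cycle, assuming Django's send_mail helper and the app object defined in the example later in the deck; the view returns immediately while a worker does the sending:

from django.core.mail import send_mail

@app.task
def send_welcome_email(address):
    # Runs on a worker process, not in the web request.
    send_mail('Welcome!', 'Thanks for signing up.',
              'noreply@example.com', [address])

# In the view:
send_welcome_email.delay(user.email)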

HISTORY

Python. Released (0.1) in 2009. Currently on 3.1, with 3.2 in alpha. Developed by Ask Solem (@asksol).

ARCHITECTURE

PRODUCER
Produces a task for the queue.

BROKER
Stores the task backlog. Answers: what work remains to be done?
RabbitMQ, Redis, SQLAlchemy, Django's ORM, MongoDB...

WORKER
Executes and consumes tasks. Distributed.

RESULTS BACKEND
Stores the results from our tasks.
Redis, SQLAlchemy, Django's ORM, MongoDB...
Optional!

EXAMPLE

from celery import Celery

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

>>> result = add.delay(4, 4)
>>> result.state
'SUCCESS'
>>> result.id
'4cc7438e-afd4-4f8f-a2f3-f46567e7ca77'
>>> result.get()
8

http://celery.readthedocs.org/en/latest/reference/celery.result.html

PICK YOUR FLAVOR.

@app.task
def add(x, y):
    return x + y

add(2, 4)

class AddTask(app.Task):
    def run(self, x, y):
        return x + y

AddTask().run(2, 4)

# Async
add.delay(2, 4)
add.apply_async(args=(2, 4), expires=30)

# Eager!
result = add.apply(args=(2, 4))  # Executes locally.

# Or...
add(2, 4)  # Does not return a celery result!

INTEGRATING WITH DJANGO.

BEWARE OF DJANGO-CELERY.

http://docs.celeryproject.org/en/master/django/first-steps-with-django.html

- project/
  - config/__init__.py
  - config/settings.py
  - config/urls.py
  - manage.py

# project/config/celery.py

from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

app = Celery('app')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

# project/config/__init__.py
from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

celery -A project worker -l info

TESTING

# settings.py
import sys

if 'test' in sys.argv:
    CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
    CELERY_ALWAYS_EAGER = True
    BROKER_BACKEND = 'memory'

PATTERNS AND BEST PRACTICES.

NEVER PASS OBJECTS AS ARGUMENTS.

# Bad
@app.task()
def send_reminder(reminder):
    reminder.send_email()

# Good
@app.task()
def send_reminder(pk):
    try:
        reminder = Reminder.objects.get(pk=pk)
    except Reminder.DoesNotExist:
        return
    reminder.send_email()

KEEP TASKS GRANULAR: YOU CAN PROCESS MORE IN PARALLEL.
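As an illustration (not from the original slides), here is a minimal sketch of fanning out granular tasks with Celery's group primitive; process_item is a hypothetical per-item task:

from celery import group

@app.task
def process_item(item_id):
    # Hypothetical small unit of work.
    return item_id * 2

# One small task per item; workers pick them up in parallel.
group(process_item.s(item_id) for item_id in range(100)).apply_async()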

AVOID LAUNCHING SYNCHRONOUS SUBTASKS.

# Bad
@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get()
    info = parse_page.delay(url, page).get()
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(url, page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)

# Good
def update_page_info(url):
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()

@app.task()
def fetch_page(url):
    return myhttplib.get(url)

@app.task()
def parse_page(page):
    return myparser.parse_document(page)

@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

http://celery.readthedocs.org/en/latest/userguide/canvas.html

PERIODIC TASKS.

http://celery.readthedocs.org/en/latest/userguide/periodic-tasks.html

from datetime import timedelta

@app.periodic_task(run_every=timedelta(minutes=5))
def run_every_five():
    pass

from datetime import timedelta

class RunEveryFive(app.PeriodicTask):
    run_every = timedelta(minutes=5)

    def run(self):
        pass

from datetime import timedelta

@app.task()
def run_every_five():
    pass

CELERYBEAT_SCHEDULE = {
    'run-every-five': {
        'task': 'tasks.run_every_five',
        'schedule': timedelta(minutes=5)
    },
}

CRON STYLE.

from celery.schedules import crontab

crontab(minute=0, hour='*/3')  # Every 3 hours.
crontab(day_of_week='sunday')  # Every minute on Sundays.
crontab(0, 0, day_of_month='1', month_of_year='*/3')  # First day of every quarter.

@app.periodic_task(run_every=crontab(minute=0, hour=1))
def schedule_emails():
    user_ids = User.objects.values_list('id', flat=True)
    for user_id in user_ids:
        send_daily_email.delay(user_id)

@app.task()
def send_daily_email(user_id):
    user = User.objects.get(id=user_id)
    try:
        today = datetime.now()
        Email.objects.get(
            user=user,
            date__year=today.year,
            date__month=today.month,
            date__day=today.day)
    except Email.DoesNotExist:
        email = Email(user=user, body="Hey, don't forget to LOGIN PLEASE!")
        email.send()
        email.save()

CELERY BEAT, A.K.A. THE SCHEDULER.

celery -A project beat

NEVER RUN A BEAT + WORKER ON A SINGLE CELERY PROCESS.

# Really bad idea...
celery -A project worker -B

FREQUENTLY RUNNING PERIODIC TASKS.

BEWARE OF "TASK STACKING"

The scheduled task runs every 5 minutes.
Each run takes 30 minutes.
Scheduled tasks stack up.
Bad stuff.

EXPIRES!

from time import sleep

@app.periodic_task(expires=5*60, run_every=timedelta(minutes=5))
def schedule_task():
    for _ in range(30):
        one_minute_task.delay()

@app.task(expires=5*60)
def one_minute_task():
    sleep(60)

THINGS GO WRONG IN TASKS!

RETRY

from celery.exceptions import Retry

@app.task(max_retries=10)
def gather_data():
    try:
        data = api.get_data()
        # etc, etc, ...
    except api.RateLimited as e:
        raise Retry(exc=e, when=e.cooldown)
    except api.IsDown:
        return
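A note beyond the slide: the more common idiom is a bound task that retries itself with self.retry, which raises Retry for you and re-queues the task after a delay. A minimal sketch, reusing the hypothetical api client above:

@app.task(bind=True, max_retries=10)
def gather_data(self):
    try:
        return api.get_data()
    except api.RateLimited as e:
        # Re-queue this task; run again after the cooldown.
        raise self.retry(exc=e, countdown=e.cooldown)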

ERROR INSIGHT.

SENTRY.
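Wiring Celery errors into Sentry takes only a few lines. A minimal sketch using today's sentry-sdk (which postdates this talk; the DSN is a placeholder):

import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration

# Task failures are reported to Sentry automatically.
sentry_sdk.init(dsn='https://examplekey@sentry.example.com/1',
                integrations=[CeleryIntegration()])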

STAGES

class DebugTask(app.Task):
    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        print("I'm done!")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("I failed :(")

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        print("I'll try again!")

    def on_success(self, retval, task_id, args, kwargs):
        print("I did it!")

ABSTRACT

class AbstractTask(app.Task):
    abstract = True

    def after_return(self, *args, **kwargs):
        print("All done!")

@app.task(base=AbstractTask)
def add(x, y):
    return x + y

INSTANTIATION

class DatabaseTask(app.Task):
    abstract = True
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = Database.connect()
        return self._db
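The point: a task class is instantiated once per worker process, so the connection is created lazily and then reused by every run. Usage might look like this (Database above and the query below are stand-ins for your client library):

@app.task(base=DatabaseTask, bind=True)
def process_rows(self):
    # self.db connects on first access, then is cached per process.
    return self.db.table.all()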

ENSURE A TASK IS EXECUTED ONE AT A TIME.

from celery import task
from celery.utils.log import get_task_logger
from django.core.cache import cache
from django.utils.hashcompat import md5_constructor as md5
from djangofeeds.models import Feed

logger = get_task_logger(__name__)

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes

@task(bind=True)
def import_feed(self, feed_url):
    # The cache key consists of the task name and the MD5 digest
    # of the feed URL.
    feed_url_hexdigest = md5(feed_url).hexdigest()
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest)

    # cache.add fails if the key already exists
    acquire_lock = lambda: cache.add(lock_id, 'true', LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    logger.debug('Importing feed: %s', feed_url)
    if acquire_lock():
        try:
            feed = Feed.objects.import_feed(feed_url)
        finally:
            release_lock()
        return feed.url

    logger.debug(
        'Feed %s is already being imported by another worker', feed_url)

IMPORTANT SETTINGS

# settings.py
CELERY_IGNORE_RESULT = True
CELERYD_TASK_SOFT_TIME_LIMIT = 500
CELERYD_TASK_TIME_LIMIT = 1000

# tasks.py
@app.task(ignore_result=True, soft_time_limit=60, time_limit=120)
def add(x, y):
    pass
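The soft limit is the one to lean on: Celery raises SoftTimeLimitExceeded inside the task, leaving a window to clean up before the hard limit kills the process. A minimal sketch (do_work and cleanup are hypothetical):

from celery.exceptions import SoftTimeLimitExceeded

@app.task(soft_time_limit=60, time_limit=120)
def crunch(data):
    try:
        return do_work(data)
    except SoftTimeLimitExceeded:
        # ~60s window before the hard limit kills the worker process.
        cleanup(data)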

# settings.py
CELERYD_MAX_TASKS_PER_CHILD = 500
CELERYD_PREFETCH_MULTIPLIER = 4

BROKER

SO MANY CHOICES!

RabbitMQ
Redis
SQLAlchemy
Django's ORM
MongoDB
Amazon SQS
CouchDB
Beanstalk
IronMQ

DJANGO ORM.

# settings.py
BROKER_URL = 'django://'
INSTALLED_APPS = (
    'kombu.transport.django',
)
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'

python manage.py syncdb

DON'T DO THIS FOR ANYTHING SERIOUS.

USE RABBITMQ

C OPTIMIZED LIBRARY

$ pip install librabbitmq
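Pointing Celery at RabbitMQ is a one-line setting; a minimal sketch, with placeholder credentials and vhost:

# settings.py
BROKER_URL = 'amqp://myuser:mypassword@localhost:5672/myvhost'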

WORKERS

CONCURRENCY

celery -A project worker -c 10
celery -A project worker --autoscale=10,1

INCREASED CONCURRENCY CAN QUICKLY DRAIN CONNECTIONS ON YOUR DATABASE.
Use a connection pooler (pgbouncer).

ROUTING

CELERY_ROUTES = {
    'email.tasks.send_mail': {
        'queue': 'priority',
    },
}

# or
send_mail.apply_async(queue="priority")

celery -A project worker -Q priority

DEDICATED WORKERS.

BOTTLENECKS

Identify. Fix. Repeat.

Make tasks faster. Reduce the volume of tasks.

NEW RELIC

MONITORING IS VITAL.

RABBITMQ MANAGEMENT PLUGIN

THE RABBITMQ MANAGEMENT PLUGIN HAS A GREAT HTTP API!
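For example, you can poll queue depths and alert on backlogs. A minimal sketch with the requests library, assuming the plugin's default port and guest credentials:

import requests

# Each entry includes the queue's current message count.
resp = requests.get('http://localhost:15672/api/queues',
                    auth=('guest', 'guest'))
for queue in resp.json():
    print(queue['name'], queue['messages'])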

CELERY FLOWER
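Flower is a real-time web dashboard for Celery (task history, rates, worker status). Assuming the same project app as the earlier commands, it installs and runs like this, serving on port 5555 by default:

$ pip install flower
$ celery -A project flower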

QUESTIONS?
