Django Celery

matclayton 12,822 views 25 slides Jul 27, 2010

About This Presentation

Quick overview of Celery for Django


Slide Content

celery
distributed task
@matclayton

warning

background

what is celery?
"celery is an open source asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well."
but what does that mean?
you can run tasks
•distributed
•concurrently
•in the background

use cases
•external api calls (twitter)
•long running tasks (transcoding)
•concurrent execution (batch image resize)
•load balancing (across servers)

[architecture diagram: django ↔ celery, messages carried by carrot over AMQP or STOMP, or through the ORM/SQL]

result
•database
•AMQP
•cache
•tokyo tyrant
•redis
•mongodb
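Choosing one of these stores is a one-line setting. As a sketch, assuming the django-celery settings names of this era, storing results in the Django database looks like:

```python
# settings.py – store task results in the Django database
CELERY_RESULT_BACKEND = "database"
```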

components
1. views.py / management command
2. broker – RabbitMQ
3. workers

workflow
http://robertpogorzelski.com/blog/2009/09/10/rabbitmq-celery-

rabbits and warrens

setting up the broker
$ sudo apt-get install rabbitmq-server
$ sudo pip install celery
$ rabbitmqctl add_user myuser mypassword
$ rabbitmqctl add_vhost myvhost
$ rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"

setup
INSTALLED_APPS += ("djcelery", )
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myuser"
BROKER_PASSWORD = "mypassword"
BROKER_VHOST = "myvhost"
CELERY_QUEUES = {
    "regular_tasks": {
        "binding_key": "task.#",
    },
    "twitter_tasks": {
        "binding_key": "twitter.#",
    },
    "feed_tasks": {
        "binding_key": "feed.#",
    },
}
settings.py

$ python manage.py celeryd -B
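The binding keys above use AMQP topic syntax: `#` matches zero or more dot-separated words, `*` matches exactly one, so a task routed with `twitter.updatestatus` lands in the `twitter_tasks` queue. A simplified, illustrative sketch of that matching rule (not Celery's or RabbitMQ's actual implementation; it ignores the edge case where `#` also swallows the preceding dot):

```python
import re

def topic_matches(binding_key, routing_key):
    """Simplified AMQP topic match: '#' = zero or more words, '*' = one word."""
    pattern = re.escape(binding_key)
    pattern = pattern.replace(r'\#', '.*')     # '#' may span several words
    pattern = pattern.replace(r'\*', '[^.]+')  # '*' matches a single word
    return re.fullmatch(pattern, routing_key) is not None

print(topic_matches("twitter.#", "twitter.updatestatus"))  # True
print(topic_matches("feed.#", "twitter.updatestatus"))     # False
```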

hello world
from celery.decorators import task

@task
def add(x, y):
    return x + y
tasks.py

>>> result = add.delay(4, 4)
>>> result.wait() # wait for the task to finish
8
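When playing with tasks in the shell or in tests, it can help to run them eagerly (inline, no broker or worker needed). Celery of this era supports that with a settings flag:

```python
# settings.py – run .delay() calls synchronously, in the calling process
CELERY_ALWAYS_EAGER = True
```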

post to twitter
from celery.task import Task

class UpdateStatus(Task):
    name = "twitter.updatestatus"
    routing_key = 'twitter.updatestatus'
    ignore_result = True

    def run(self, tweet, **kwargs):
        post_to_twitter(tweet)
tasks.py

from twitter.tasks import UpdateStatus
UpdateStatus.delay(tweet='hello world')
views.py

retry / rate limits
from celery.task import Task

class UpdateStatus(Task):
    name = "twitter.updatestatus"
    routing_key = 'twitter.updatestatus'
    ignore_result = True
    default_retry_delay = 5 * 60  # retry in 5 minutes
    max_retries = 12  # 1 hour of retries
    rate_limit = '10/s'

    def run(self, tweet, **kwargs):
        try:
            post_to_twitter(tweet)
        except Exception, exc:
            # If twitter crashes, retry the task later
            self.retry([tweet], kwargs, exc=exc)
tasks.py

from twitter.tasks import UpdateStatus
UpdateStatus.delay(tweet='hello world')
views.py
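The "1 hour of retries" comment is just arithmetic over the two class attributes: with a fixed delay between attempts, the total retry window is max_retries × default_retry_delay:

```python
default_retry_delay = 5 * 60   # seconds between attempts
max_retries = 12

total_window = default_retry_delay * max_retries
print(total_window / 3600.0)   # 1.0 hour
```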

podcast
from datetime import timedelta
from celery.task import PeriodicTask

class FeedImportPeriodicTask(PeriodicTask):
    run_every = timedelta(hours=1)
    routing_key = 'feed.periodic_import'

    def run(self, **kwargs):
        logger = self.get_logger(**kwargs)
        logger.info("Running periodic feed import task!")
        update_podcasts(silent=False)
tasks.py

class FeedImporter(Task):
    name = "feed.import"
    routing_key = 'feed.import'
    ignore_result = True
    default_retry_delay = 5 * 60  # retry in 5 minutes
    max_retries = 72  # 6 hours, to cover major outages

    def run(self, podcast_id, **kwargs):
        try:
            logger = self.get_logger(**kwargs)
            # The cache key consists of the task name and the feed id.
            lock_id = "%s-lock-%s" % (self.name, podcast_id)
            is_locked = lambda: str(cache.get(lock_id)) == "true"
            acquire_lock = lambda: cache.set(lock_id, "true", 300)
            # memcache delete is very slow, so we'd rather set a false value
            # with a very low expiry time.
            release_lock = lambda: cache.set(lock_id, "nil", 1)

            logger.debug("Trying to import feed: %s" % podcast_id)
            if is_locked():
                logger.debug("Feed %s is already being imported by another worker" % podcast_id)
                return
            acquire_lock()
            try:
                import_feed(logger, podcast_id)
            finally:
                release_lock()
        except Exception, exc:
            logger.error(exc)
tasks.py
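The lock in FeedImporter is nothing more than get/set calls against Django's cache. A self-contained sketch of the same pattern, with a hypothetical in-memory FakeCache standing in for the cache API (Python 3 here, unlike the slides):

```python
import time

class FakeCache:
    """Minimal stand-in for a get/set-with-timeout cache, like Django's."""
    def __init__(self):
        self._store = {}
    def set(self, key, value, timeout):
        self._store[key] = (value, time.time() + timeout)
    def get(self, key):
        value, expires = self._store.get(key, (None, 0))
        return value if time.time() < expires else None

cache = FakeCache()
lock_id = "feed.import-lock-42"

def acquire_lock():
    cache.set(lock_id, "true", 300)   # lock expires on its own after 5 minutes

def release_lock():
    # deleting from memcached is slow, so overwrite with a short-lived value
    cache.set(lock_id, "nil", 1)

def is_locked():
    return str(cache.get(lock_id)) == "true"
```

The timeout doubles as a safety valve: if a worker dies holding the lock, it evaporates after five minutes instead of blocking the feed forever.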

typical problems
•running out of disk space == rabbitmq fail
•queue priorities are difficult
•non-pickle-able errors
•crashing consumers
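The "non-pickle-able errors" gotcha: workers serialize exceptions to report failures, and an exception carrying unpicklable state (open files, sockets, locks) breaks that step. A minimal illustration with a hypothetical exception class (Python 3 here):

```python
import os
import pickle

class FeedError(Exception):
    def __init__(self, path):
        Exception.__init__(self, path)
        self.fh = open(path)  # an open file handle can't be pickled

try:
    pickle.dumps(FeedError(os.devnull))
except TypeError as exc:
    print("cannot pickle this error:", exc)
```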

other cool stuff
•tasksets / callbacks
•remote control tasks
•abortable tasks
•eta – run tasks at a set time
•HttpDispatchTask
•expiring tasks
•celerymon
•celeryev
•ajax views

finding out more
•http://github.com/ask/celery
•http://github.com/ask/django-celery
•irc.freenode.net #celery (asksol, the owner, is always helpful and about)

@matclayton
[email protected]