Step 4: Celery based background tasks

Flask-AppFactory includes optional support for Celery integration via the Flask-CeleryExt extension. If you wish to use it, be sure to install Flask-AppFactory like this:

pip install Flask-AppFactory[celery]

To enable Celery support we add one file to our application package, and one file to our reusable package:

myapp/celery.py
mymodule/tasks.py

Configuration

In our configuration we first have to add the Flask-CeleryExt extension (line 5), as well as define the Celery broker via the BROKER_URL variable (line 13). Note that in this example we use a local Redis installation. For other options, see the Celery documentation.

 1  # myapp/config.py
 2
 3  EXTENSIONS = [
 4      "flask_appfactory.ext.jinja2",
 5      "flask_celeryext:FlaskCeleryExt",
 6      "myexts.sqlalchemy",
 7  ]
 8
 9  PACKAGES = [
10      "mymodule",
11  ]
12
13  BROKER_URL = "redis://localhost:6379/0"
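In real deployments the broker URL is usually not hard-coded. A minimal sketch of reading it from the environment instead, with the local Redis broker as fallback (the MYAPP_BROKER_URL variable name is an assumption for illustration, not something Flask-AppFactory defines):

```python
import os

# Allow the deployment to override the broker; fall back to local Redis.
# MYAPP_BROKER_URL is a hypothetical variable name chosen for this sketch.
BROKER_URL = os.environ.get("MYAPP_BROKER_URL", "redis://localhost:6379/0")
```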

Flask-CeleryExt takes care of creating a minimal Celery application with the correct configuration, so Celery knows e.g. which broker to use. In addition, the minimal Celery application does not load any tasks, which ensures a faster startup time.

See the Celery documentation for all the possible configuration variables.

Celery application

Next, we create a small Celery application, which is later used to start the Celery worker.

# myapp/celery.py

from flask_appfactory.celery import celeryfactory
from .app import create_app

celery = celeryfactory(create_app())

Reusable tasks

The reusable packages can easily define Celery tasks by adding a tasks.py file like this:

 1  # mymodule/tasks.py
 2
 3  from celery import shared_task
 4  from flask import current_app, request
 5  from flask_celeryext import RequestContextTask
 6
 7
 8  @shared_task
 9  def appctx():
10      current_app.logger.info(current_app.config['BROKER_URL'])
11
12
13  @shared_task(base=RequestContextTask)
14  def reqctx():
15      current_app.logger.info(request.method)
Notice the use of the @shared_task decorator (line 8). It ensures that the task can be reused by many different Celery applications. The Celery application created above takes care of registering the tasks.

Each task is executed within a Flask application context (notice the use of e.g. current_app). If you need a task to be executed in a request context (e.g. if you need to ensure that before-first-request functions have been run), simply change the base class to RequestContextTask (line 13).
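When unit testing tasks it is often convenient not to run a broker at all; Celery can execute tasks eagerly in the calling process instead. A sketch of the relevant Celery 3.x-style settings, which would go in a test configuration (placing them in a separate test config is an assumption of this sketch):

```python
# Test settings -- run .delay()/.apply_async() synchronously in-process,
# so no broker or worker is needed during the test run.
CELERY_ALWAYS_EAGER = True
# Re-raise exceptions from eagerly executed tasks in the caller,
# so test failures surface immediately instead of being swallowed.
CELERY_EAGER_PROPAGATES_EXCEPTIONS = True
```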

Running the worker

Next, you can start a Celery worker by simply pointing Celery to your new Celery application in myapp.celery:

$ celery worker -A myapp.celery

Sending tasks

Executing tasks requires your Flask application to be initialized, so simply start a Python shell using your management script:

$ myapp shell
...
>>> from mymodule.tasks import appctx
>>> appctx.delay()