Table of Contents
- Introduction to Asynchronous Tasks
- What is Celery?
- Core Concepts in Celery
- Setting Up Celery
- Example: Basic Celery Task
- Running Celery Workers and Sending Tasks
- Celery with Flask Integration
- Celery with Django Integration
- Broker Choices: Redis vs RabbitMQ
- Best Practices for Production-Ready Celery Apps
- Conclusion
Introduction to Asynchronous Tasks
Many real-world applications need to perform time-consuming or resource-intensive operations without making the user wait. Examples include:
- Sending emails
- Generating reports
- Processing images
- Data backups
- Communicating with external APIs
If handled synchronously, these tasks would slow down your application, leading to a poor user experience.
The solution? Asynchronous task queues — where long-running tasks are delegated to a background worker system, allowing the main app to respond quickly.
What is Celery?
Celery is one of the most widely used task queue libraries for Python. It enables applications to asynchronously run tasks in the background by sending them to a distributed system of workers.
Celery works well with web frameworks such as Flask, Django, and FastAPI, and integrates with a variety of message brokers, such as RabbitMQ and Redis.
Why Use Celery?
- Asynchronous execution
- Scheduled tasks (like cron jobs; see the sketch after this list)
- Retries on failure
- Result backend storage
- High scalability with multiple workers
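For example, the scheduling feature is driven by Celery beat. A minimal sketch, assuming Redis running locally and the `add` task introduced later in this article (the 10-second interval is illustrative):

```python
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

# Run add(2, 2) every 10 seconds; start the scheduler with: celery -A tasks beat
app.conf.beat_schedule = {
    'add-every-10-seconds': {
        'task': 'tasks.add',
        'schedule': 10.0,
        'args': (2, 2),
    },
}
```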
Core Concepts in Celery
| Concept | Description |
|---|---|
| Task | A Python function that is run asynchronously. |
| Worker | A process that continuously listens for new tasks and executes them. |
| Broker | A message queue (e.g., Redis, RabbitMQ) that transports messages between clients and workers. |
| Result Backend | Stores the results of completed tasks (optional). |
Setting Up Celery
First, install Celery:

```bash
pip install celery
```

Optionally, if using Redis as a broker:

```bash
pip install redis
```
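Before wiring Celery up, it can help to confirm the broker is reachable. A quick sanity check using the redis client installed above (assuming Redis on its default localhost:6379):

```python
import redis

# ping() returns True if the server responds, or raises ConnectionError otherwise
client = redis.Redis(host='localhost', port=6379, db=0)
print(client.ping())
```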
Example: Basic Celery Task
Let’s create a simple project structure:

```bash
mkdir celery_example
cd celery_example
touch tasks.py
```
tasks.py
```python
from celery import Celery

# Create a Celery instance; the result backend is needed so that
# result.get() in the next section can fetch return values
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

@app.task
def add(x, y):
    return x + y
```
- We defined a simple task, `add(x, y)`.
- The broker and backend URLs point to a local Redis server.
Running Celery Workers and Sending Tasks
First, start your Redis server if it is not already running:

```bash
redis-server
```

Next, in a terminal window, start a Celery worker:

```bash
celery -A tasks worker --loglevel=info
```
Now, in another Python shell or script:

```python
from tasks import add

result = add.delay(4, 6)       # .delay() sends the task asynchronously
print(result.id)               # Unique task ID
print(result.get(timeout=10))  # Block until the result is ready
```

You will see the worker pick up the task, process it, and return the result: 10.
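If you would rather not block on `.get()`, you can check on a task later using its ID. A minimal sketch, assuming the same `tasks` module:

```python
from celery.result import AsyncResult
from tasks import add, app

task_id = add.delay(4, 6).id  # store this ID (e.g., in a database)

# Later, possibly in a different process, look the task up by ID
res = AsyncResult(task_id, app=app)
print(res.state)       # e.g., 'PENDING', 'SUCCESS', 'FAILURE'
if res.ready():
    print(res.result)  # 10, once the worker has finished
```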
Celery with Flask Integration
Integrating Celery with a Flask app involves a few steps. Let’s see a minimal example:
app.py
```python
from flask import Flask, request, jsonify
from celery import Celery

app = Flask(__name__)

# Configure Celery
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

def make_celery(app):
    celery = Celery(
        app.import_name,
        broker=app.config['CELERY_BROKER_URL'],
        backend=app.config['CELERY_RESULT_BACKEND']
    )
    celery.conf.update(app.config)
    return celery

celery = make_celery(app)

@celery.task
def add_numbers(x, y):
    return x + y

@app.route('/add', methods=['POST'])
def add_task():
    data = request.json
    task = add_numbers.delay(data['x'], data['y'])
    return jsonify({'task_id': task.id}), 202

if __name__ == '__main__':
    app.run(debug=True)
```
Now:
- Start Redis
- Start the Celery worker (e.g., `celery -A app.celery worker --loglevel=info`)
- Run Flask
- Send a POST request to `/add` with a JSON payload like `{"x": 10, "y": 5}`

You will receive a task ID, which you can later use to check the task status.
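A minimal sketch of such a status endpoint, added to the same app.py (the `/status` route name is an assumption, not part of the example above):

```python
@app.route('/status/<task_id>')
def task_status(task_id):
    # Look the task up on the Celery app by its ID
    result = celery.AsyncResult(task_id)
    response = {'task_id': task_id, 'state': result.state}
    if result.ready():
        # For a failed task, .result holds the exception instead of a value
        response['result'] = str(result.result)
    return jsonify(response)
```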
Celery with Django Integration
Celery easily integrates with Django projects as well.
Install Celery (django-celery-beat is an optional add-on for database-backed periodic schedules):

```bash
pip install celery django-celery-beat
```
settings.py
```python
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
```
celery.py (inside the Django project)
```python
import os
from celery import Celery

# Tell Celery where to find the Django settings module
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')
# Read all settings prefixed with CELERY_ from Django settings
app.config_from_object('django.conf:settings', namespace='CELERY')
# Find tasks.py modules in all installed Django apps
app.autodiscover_tasks()
```
__init__.py (inside the Django project)

```python
from .celery import app as celery_app

__all__ = ('celery_app',)
```
Define tasks inside app folders (`tasks.py`) and you are ready to queue them, as shown below.
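For example, a tasks.py inside a hypothetical app called myapp (the task name and body are illustrative):

```python
# myapp/tasks.py
from celery import shared_task

@shared_task
def send_welcome_email(user_id):
    # Placeholder logic; a real task would render and send an email here
    print(f"Sending welcome email to user {user_id}")
```

Queue it from a view with `send_welcome_email.delay(user.id)`.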
Broker Choices: Redis vs RabbitMQ
| Feature | Redis | RabbitMQ |
|---|---|---|
| Ease of Setup | Very easy | Slightly more complex |
| Performance | Extremely fast (in-memory) | High throughput with strong delivery guarantees |
| Use Case | Best for simple task queues | Best for complex workflows and message guarantees |
Both are excellent options. Redis is generally easier for beginners.
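Switching between them is usually just a connection-string change. A sketch showing both URL styles (hosts and credentials are local-default assumptions):

```python
from celery import Celery

# Redis as broker
app = Celery('tasks', broker='redis://localhost:6379/0')

# RabbitMQ as broker (guest/guest are the defaults on a local install)
# app = Celery('tasks', broker='amqp://guest:guest@localhost:5672//')
```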
Best Practices for Production-Ready Celery Apps
- Use Dedicated Queues: Separate critical tasks and low-priority tasks into different queues (see the routing sketch after this list).
- Monitor Celery Workers: Use Flower, Prometheus, or Grafana to monitor.
- Graceful Shutdowns: Properly manage worker shutdown to avoid losing tasks.
- Retry Failed Tasks: Implement retry mechanisms on transient errors.
- Use Result Expiration: Prevent memory bloat by expiring old results.
- Security: Avoid putting sensitive data directly into tasks.
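A sketch of dedicated queues and result expiration, assuming the `app` instance from the earlier tasks.py (the queue name is illustrative):

```python
# Route specific tasks to a named queue; unrouted tasks use the default 'celery' queue
app.conf.task_routes = {
    'tasks.add': {'queue': 'critical'},
}

# Drop stored results after one hour to keep the result backend from growing unbounded
app.conf.result_expires = 3600
```

A worker can then be dedicated to that queue:

```bash
celery -A tasks worker -Q critical --loglevel=info
```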
Example of auto-retrying a task:
```python
@app.task(bind=True, max_retries=3)
def unreliable_task(self):
    try:
        # Some logic that may fail transiently
        pass
    except Exception as exc:
        # Retry after 5 seconds, up to max_retries times
        raise self.retry(exc=exc, countdown=5)
```
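Celery can also retry declaratively. A sketch of the same pattern using the autoretry_for option (available in Celery 4.x and later; the backoff settings are illustrative):

```python
@app.task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def unreliable_task_auto():
    # Any exception raised here triggers an automatic retry
    # with exponential backoff, up to max_retries attempts
    ...
```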
Conclusion
Celery is a powerful tool that enables asynchronous, distributed task execution in Python applications. Whether you are sending emails, processing large files, or building complex workflows, Celery can handle it efficiently and reliably.
Understanding how to integrate it with Flask, Django, or standalone Python applications is essential for scaling real-world projects.