Getting started¶
Requirements¶
- Python 3.9+
- MongoDB (CI uses MongoDB 7)
Install¶
pip install mongo-taskqueue
Async support:
pip install "mongo-taskqueue[async]"
Create a queue¶
from mongotq import get_task_queue
queue = get_task_queue(
database_name="app",
collection_name="jobs",
host="mongodb://localhost:27017",
ttl=-1,
)
Enqueue tasks¶
queue.append({"job": "email", "to": "alice"})
queue.append({"job": "report", "id": 42}, priority=5)
Worker loop¶
import time
from mongotq import get_task_queue
queue = get_task_queue(
database_name="app",
collection_name="jobs",
host="mongodb://localhost:27017",
ttl=-1,
visibility_timeout=30,
)
while True:
task = queue.next()
if task is None:
time.sleep(0.5)
continue
try:
# do work using task.payload
queue.on_success(task)
except Exception as exc:
queue.on_failure(task, error_message=str(exc))
Common patterns¶
- Use
visibility_timeoutto prevent multiple workers from processing the same task at the same time. - Call
refresh()periodically to requeue expired leases and discard tasks over retry limits. - Use
dedupe_keyfor idempotent enqueue.