程序员文章、书籍推荐和程序员创业信息与资源分享平台

网站首页 > 技术文章 正文

RQ, 一个超酷的 Python 库

hfteth 2025-08-06 19:44:18 技术文章 2 ℃

RQ (Redis Queue) 是一个简单的 Python 库,用于将任务放入队列并通过工作进程( worker )在后台执行这些任务。它基于 Redis 构建,是一个轻量级的异步任务队列解决方案。

rq的安装及基本使用示例

RQ 核心概念

Queue队列

队列是一种先进先出( First In First Out ,简称 FIFO )的线性数据结构。它类似于现实生活中的排队场景,先到的人先接受服务。

基本操作

RQ 操作队列函数

入队操作

# low 为rq队列服务(worker)名称, 入队
q = Queue('low', connection=redis_conn)
q.enqueue(count_words_at_url, 'http://nvie.com')

队列其它操作

q = Queue(connection=redis_conn)

# 可以取到队列中的 任务(job)数量
print(len(q))

# 队列相关操作
queued_job_ids = q.job_ids # 从队列中获取任务(job)id的列表
queued_jobs = q.jobs # 从队列中获取队列任务(job)实例的列表
job = q.fetch_job('my_id') # 返回一个任务(job) id 为 'my_id'的实例

# 清空队列,这个将会删除所有的队列中的任务(job)
q.empty()

# 删除队列
q.delete(delete_jobs=True) # 传入 True 将会移除队列中所有的任务(job)

基于装饰器

定义 job 并使用装饰器 @job

#tasks.py
from rq import Queue
from rq.decorators import job
from redis import Redis

redis_conn = Redis()
queue = Queue('default', connection=redis_conn)

@job(queue=queue,connection=redis_conn)
def process_data(data):
"""实际的任务处理函数"""
print(f"Processing: {data}")
return data * 2

定义生产者

# producer.py
from tasks import process_data

# 提交任务
job = process_data.delay(10)
print(f"Job ID: {job.id}")

定义启动服务

from redis import Redis
from rq import Queue
from rq_win import WindowsWorker as Worker
redis_conn = Redis()
queue = Queue('default', connection=redis_conn)

import sys
from pathlib import Path
sys.path.append(str(Path(__file__).parent))

if __name__ == '__main__':
redis_conn = Redis()
worker = Worker([queue])
worker.work()

启动服务查看效果如下

python worker.py
python producer.py


队列高级应用

同步操作,不进入 Worker ,直接运行,测试使用

q = Queue('low', is_async=False, connection=my_redis_conn)
job = q.enqueue(fib, 8)

处理任务依赖性的能力可以将一个复杂的任务拆分为几个较小的任务。依赖于另一个任务的任务仅在其依赖关系成功完成时才入队列。

q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)

延迟队列及调度

当某个时间点执行队列

queue = Queue(name='default', connection=Redis())

# 调度任务会在当地时区的2025年07月29号9点15分运行
job = queue.enqueue_at(datetime(2025, 7, 29, 9, 15), say_hello)

延迟多久后执行

queue = Queue(name='default', connection=Redis())

# 调度任务会在10秒后运行
job = queue.enqueue_at(timedelta(seconds=10), say_hello)

其实,计划执行的任务不会放入队列中,而是存储在 ScheduledJobRegistry 中。

from rq import Queue
from rq.registry import ScheduledJobRegistry

redis = Redis()

queue = Queue(name='default', connection=redis)
job = queue.enqueue_in(timedelta(seconds=10), say_nothing)
print(job in queue) # 输出False因为job不在enqueue中

registry = ScheduledJobRegistry(queue=queue)
print(job in registry) # 输出True因为job在ScheduledJobRegistry中

运行任务调度运行 worker

rq worker --with-scheduler

获取定义启动服务

from rq import Worker, Queue
from redis import Redis

redis = Redis()

queue = Queue(connection=redis)
worker = Worker(queues=[queue], connection=redis)
worker.work(with_scheduler=True)

上一篇主要讲解了 rq 的使用,今天主要针对具体 Queue 做详细讲解。


Tags:

最近发表
标签列表