网站首页 > 技术文章 正文
RQ (Redis Queue)
是一个简单的 Python
库,用于将任务放入队列并通过工作进程( worker
)在后台执行这些任务。它基于 Redis
构建,是一个轻量级的异步任务队列解决方案。
RQ 核心概念
Queue队列
队列是一种先进先出( First In First Out
,简称 FIFO
)的线性数据结构。它类似于现实生活中的排队场景,先到的人先接受服务。
基本操作
RQ
操作队列函数
入队操作
# low 为rq队列服务(worker)名称, 入队
q = Queue('low', connection=redis_conn)
q.enqueue(count_words_at_url, 'http://nvie.com')
队列其它操作
q = Queue(connection=redis_conn)
# 可以取到队列中的 任务(job)数量
print(len(q))
# 队列相关操作
queued_job_ids = q.job_ids # 从队列中获取任务(job)id的列表
queued_jobs = q.jobs # 从队列中获取队列任务(job)实例的列表
job = q.fetch_job('my_id') # 返回一个任务(job) id 为 'my_id'的实例
# 清空队列,这个将会删除所有的队列中的任务(job)
q.empty()
# 删除队列
q.delete(delete_jobs=True) # 传入 True 将会移除队列中所有的任务(job)
基于装饰器
定义 job
并使用装饰器 @job
#tasks.py
from rq import Queue
from rq.decorators import job
from redis import Redis
redis_conn = Redis()
queue = Queue('default', connection=redis_conn)
@job(queue=queue,connection=redis_conn)
def process_data(data):
"""实际的任务处理函数"""
print(f"Processing: {data}")
return data * 2
定义生产者
# producer.py
from tasks import process_data
# 提交任务
job = process_data.delay(10)
print(f"Job ID: {job.id}")
定义启动服务
from redis import Redis
from rq import Queue
from rq_win import WindowsWorker as Worker
redis_conn = Redis()
queue = Queue('default', connection=redis_conn)
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).parent))
if __name__ == '__main__':
redis_conn = Redis()
worker = Worker([queue])
worker.work()
启动服务查看效果如下
python worker.py
python producer.py
队列高级应用
同步操作,不进入 Worker
,直接运行,测试使用
q = Queue('low', is_async=False, connection=my_redis_conn)
job = q.enqueue(fib, 8)
处理任务依赖性的能力可以将一个复杂的任务拆分为几个较小的任务。依赖于另一个任务的任务仅在其依赖关系成功完成时才入队列。
q = Queue('low', connection=my_redis_conn)
report_job = q.enqueue(generate_report)
q.enqueue(send_report, depends_on=report_job)
延迟队列及调度
当某个时间点执行队列
queue = Queue(name='default', connection=Redis())
# 调度任务会在当地时区的2025年07月29号9点15分运行
job = queue.enqueue_at(datetime(2025, 7, 29, 9, 15), say_hello)
延迟多久后执行
queue = Queue(name='default', connection=Redis())
# 调度任务会在10秒后运行
job = queue.enqueue_at(timedelta(seconds=10), say_hello)
其实,计划执行的任务不会放入队列中,而是存储在 ScheduledJobRegistry
中。
from rq import Queue
from rq.registry import ScheduledJobRegistry
redis = Redis()
queue = Queue(name='default', connection=redis)
job = queue.enqueue_in(timedelta(seconds=10), say_nothing)
print(job in queue) # 输出False因为job不在enqueue中
registry = ScheduledJobRegistry(queue=queue)
print(job in registry) # 输出True因为job在ScheduledJobRegistry中
运行任务调度运行 worker
rq worker --with-scheduler
获取定义启动服务
from rq import Worker, Queue
from redis import Redis
redis = Redis()
queue = Queue(connection=redis)
worker = Worker(queues=[queue], connection=redis)
worker.work(with_scheduler=True)
上一篇主要讲解了 rq
的使用,今天主要针对具体 Queue
做详细讲解。
猜你喜欢
- 2025-08-06 生产环境中使用的十大 Python 设计模式
- 2025-08-06 面试必备:Python内存管理机制(建议收藏)
- 2025-08-06 服务端开发面试必背——消息队列及它的主要用途和优点。附代码
- 2025-08-06 Python 栈:深度解析与应用
- 2025-08-06 Python中的多进程
- 2025-08-06 Python Logging 最佳实践
- 2025-08-06 Python并发数据结构实现原理
- 2025-08-06 用SendGrid和Redis队列用Python调度国际空间站的电子邮件
- 2025-08-06 Python教程(三十五):数据库操作进阶
- 2025-08-06 Python倒车请注意!负步长range的10个高能用法,让代码效率翻倍
- 08-06生产环境中使用的十大 Python 设计模式
- 08-06面试必备:Python内存管理机制(建议收藏)
- 08-06服务端开发面试必背——消息队列及它的主要用途和优点。附代码
- 08-06Python 栈:深度解析与应用
- 08-06Python中的多进程
- 08-06Python Logging 最佳实践
- 08-06Python并发数据结构实现原理
- 08-06用SendGrid和Redis队列用Python调度国际空间站的电子邮件
- 最近发表
- 标签列表
-
- python中类 (31)
- python 迭代 (34)
- python 小写 (35)
- python怎么输出 (33)
- python 日志 (35)
- python语音 (31)
- python 工程师 (34)
- python3 安装 (31)
- python音乐 (31)
- 安卓 python (32)
- python 小游戏 (32)
- python 安卓 (31)
- python聚类 (34)
- python向量 (31)
- python大全 (31)
- python次方 (33)
- python桌面 (32)
- python总结 (34)
- python浏览器 (32)
- python 请求 (32)
- python 前端 (32)
- python验证码 (33)
- python 题目 (32)
- python 文件写 (33)
- python中的用法 (32)