当前位置:首页 > 问答 > 正文

用Redis消息队列搞生产者消费者那事儿,消费和生产怎么配合起来玩

关于如何使用Redis实现生产者消费者模式,我们可以把它想象成一个现实中的场景:一个忙碌的快递分拣中心,生产者就像是从各地不断开来、卸下包裹的货车,而消费者就是分拣线上的工人,他们从传送带上取走包裹进行处理,Redis的消息队列,就是这个连接货车和工人的核心传送带。

这个“传送带”在Redis里最常用的数据结构就是列表(List),列表天生就支持从一头推进元素,从另一头弹出元素,这完美契合了“先进先出”的队列特性。

生产者:往传送带上放包裹

生产者的任务很简单,就是产生“消息”(也就是我们的包裹),然后把它放到队列里,在Redis中,我们使用 LPUSH 命令(Left Push),意思是从列表的左边插入元素,这样,最新的消息会放在队列的头部。

举个例子,假设我们的队列名字叫 my_queue。 生产者代码(以Python为例)的核心部分大概长这样:

import redis
# 连接到Redis服务器
r = redis.Redis(host='localhost', port=6379, db=0)
# 生产者不断产生任务
for i in range(10):
    task = f"这是第 {i} 个任务"
    # 将任务从列表左侧放入队列
    r.lpush('my_queue', task)
    print(f'生产者放入了: {task}')

这就好比货车司机把一个个包裹放到了传送带的起点,无论有多少辆货车(多个生产者),它们的行为都是一致的:不断地往传送带开头堆放包裹。

消费者:从传送带上取包裹

消费者的任务是时刻盯着传送带,一旦有包裹过来,就取走并处理,在Redis中,最直接的方式是使用 RPOP 命令(Right Pop),从列表的右边弹出元素,因为生产者从左边推入,所以最右边的是最早进入队列的消息,这样就保证了公平性,先来的任务先被处理。

一个简单的消费者代码可能是这样的:

import redis
r = redis.Redis(host='localhost', port=6379, db=0)
while True:
    # 从队列右侧弹出一个任务,如果队列为空则返回None
    task = r.rpop('my_queue')
    if task:
        # 处理任务
        print(f'消费者处理了: {task.decode()}')
    else:
        # 如果队列为空,休息一下再继续检查,避免无意义的循环消耗CPU
        time.sleep(1)

这个消费者就像一个不知疲倦的工人,一直守在传送带的尽头,有包裹就立刻处理,没有就等一秒再看,这种方式被称为“轮询”,虽然简单,但有个缺点:当队列为空时,消费者会陷入空转,频繁地询问Redis,造成不必要的网络开销和CPU资源浪费。

更高效的配合:阻塞式读取

为了解决轮询的低效问题,Redis提供了一个更强大的命令:BRPOP(Blocking Right Pop),这个命令是“阻塞”版本的RPOP,当消费者调用它时,如果队列是空的,它不会立刻返回,而是会一直连接在那里等待,直到有新的消息被放入队列,或者等待超时。

这就像是工人不再需要不停地低头看传送带有没有货,而是可以靠在旁边休息,传送带系统(Redis)会有一个铃铛,只有当新包裹真的来了的时候,铃铛才会响,通知工人来取货,这样工人(消费者)在等待期间可以完全休息,节省了精力。

使用 BRPOP 的消费者代码会高效很多:

import redis
r = redis.Redis(host='localhost', port=6379, db=0)
while True:
    # 使用brpop,设置超时时间为0表示无限等待直到有消息到来
    # 返回的是一个元组 (queue_name, message)
    result = r.brpop('my_queue', timeout=0)
    if result:
        queue_name, task = result
        print(f'消费者通过阻塞方式处理了: {task.decode()}')

这里的 timeout=0 意味着无限期阻塞等待,在实际应用中,可以设置一个超时时间(比如30秒),这样即使一直没消息,客户端也可以有机会做其他检查(比如检查自身是否应该退出),然后重新发起等待。

多消费者协作

一个工人处理速度可能跟不上货车送货的速度,这时,我们可以启动多个消费者(工人)同时守在传送带的尽头,Redis队列的一个巨大优势就在这里体现:多个消费者同时使用 BRPOP 命令从同一个队列取消息时,Redis会确保一条消息只会被其中一个消费者拿到

这是一种天然的负载均衡,比如有三个消费者C1、C2、C3,当生产者P1放入任务T1,P2放入任务T2时,Redis会公平地将T1分配给正在等待的某一个消费者(比如C2),将T2分配给另一个消费者(比如C1),这样,任务就被并行处理了,整个系统的吞吐量大大提升。

需要注意的问题

这种模式虽然简单强大,但也有一些需要注意的地方:

  1. 消息确认:在我们的简单模型里,工人一旦从传送带上拿起包裹,这个包裹就从传送带上消失了,但如果工人在处理包裹时突然生病了(消费者进程崩溃),这个包裹就永远丢失了,对于不允许丢失任务的应用,需要更复杂的“确认”机制,比如使用Redis的有序集合(Sorted Set)等结构来实现。
  2. 队列监控:需要留意队列的长度,如果生产者速度持续远大于消费者速度,队列会变得越来越长,最终占满Redis内存,可以通过 LLEN 命令监控队列长度。
  3. 生产速度过快:如果生产者太快,除了增加消费者,也可能需要在生产者端进行限流,或者使用多个队列进行分片。

用Redis列表实现生产者消费者模式,核心就是生产者 LPUSH,消费者 BRPOP,这种方式简单、高效、支持多消费者并行工作,是解决不同系统组件间速度不匹配、进行异步处理的经典方案,就像是一个设计良好的分拣中心,只要传送带(Redis)足够坚固,就能让货车(生产者)和工人(消费者)高效、协调地运转起来。

用Redis消息队列搞生产者消费者那事儿,消费和生产怎么配合起来玩