Apong's Blog

当你快坚持不住的时候,困难也快坚持不住了

0%

Redis的Stream消息队列应用

熟悉Stream

独立消费者

发布消息

XADD KEY [NOMKSTREAM] *|ID field value [field value ...]

添加一条消息到 key 的队列中去,如果不存在则创建队列

*:自动生成id

ID:自定义id,需遵循时间戳-序号的格式

field,value:消息键值对,允许一条消息有多个键值对,类似hash

NOMKSTREAM:如果队列不存在,不会创建

订阅消息

XREAD COUNT count BLOCK milliseconds STREAMS key [key ...] ID [ID ...]

可以从多个stream中读取从ID往后的count条消息,如果暂时没有则等待xx毫秒

COUNT:读取数量

BLOCK:等待时间

ID:消息起始ID,可以传特殊值 0:第一个,$:最新消息。

$:是最新消息,返回开启了阻塞等待后的第一条消息,存在漏读的问题,往后的二三条都读不到,如果COUNT为1。

特点

  1. 消息可以追溯,不是读完就消失了
  2. 一条消息可以被多个消费者读取
  3. 可以阻塞读取
  4. 有消息漏读的风险

消费者组

将消息和消费者分组,每一组的消息由组内的消费者消费

特点:

  1. 消息分流,同组的信息由多个消费者同时处理
  2. 消息标识,消费者组会维护一个标识,记录着最后一个被处理的消息,确保每次读取未处理的
  3. 消息确认,如果被读取的消息没有确认完成,会存在每个消费者的pending-list,供每个消费者继续消费。

创建消费者组

XGROUP CREATE key groupName MKSTREAM

给stream创建一个消费者组,如果Stream不存在则自动创建

key:队列的key

groupName:消费者组名

MKSTREAM:自动创建队列

消费者组读取消息

XREADGROUP GROUP group consumer COUNT count BLOCK mills STREAMS key ID

先写命令 XREADGROUP GROUP,再写 group consumer 组名和消费者名,再写读取数量和等待时间,最后写队列key和起始消息ID

ID有两类特殊值:

  1. “>”,从下一个未消费的消息开始
  2. 其他:从各自消费者的pending-list中取,例如0,表示list中的第一个消息。

确认消息

xack

异步秒杀应用Stream消息队列

准备工作:

  1. 创建消费者组,自动创建队列

书写lua代码:

  1. 判断是否有秒杀资格
  2. 如果有则加入消息队列

书写Java代码:

  1. 持续监听下单消息
  2. 取出下单消息 xreadgroup
  3. 如果不存在,则继续监听
  4. 如果处理失败,就转入pending-list处理
  5. 取出待处理消息 xreadgroup
  6. 如果处理失败,继续取出处理,直到处理完毕
  7. 最后确认消息 xack streamKey groupName messageId