熟悉Stream
独立消费者
发布消息
XADD KEY [NOMKSTREAM] *|ID field value [field value ...]
添加一条消息到 key 的队列中去,如果不存在则创建队列
*:自动生成id
ID:自定义id,需遵循时间戳-序号的格式
field,value:消息键值对,允许一条消息有多个键值对,类似hash
NOMKSTREAM:如果队列不存在,不会创建
订阅消息
XREAD COUNT count BLOCK milliseconds STREAMS key [key ...] ID [ID ...]
可以从多个stream中读取从ID往后的count条消息,如果暂时没有则等待xx毫秒
COUNT:读取数量
BLOCK:等待时间
ID:消息起始ID,可以传特殊值 0:第一个,$:最新消息。
$:是最新消息,返回开启了阻塞等待后的第一条消息,存在漏读的问题,往后的二三条都读不到,如果COUNT为1。
特点
- 消息可以追溯,不是读完就消失了
- 一条消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
消费者组
将消息和消费者分组,每一组的消息由组内的消费者消费
特点:
- 消息分流,同组的信息由多个消费者同时处理
- 消息标识,消费者组会维护一个标识,记录着最后一个被处理的消息,确保每次读取未处理的
- 消息确认,如果被读取的消息没有确认完成,会存在每个消费者的pending-list,供每个消费者继续消费。
创建消费者组
XGROUP CREATE key groupName MKSTREAM
给stream创建一个消费者组,如果Stream不存在则自动创建
key:队列的key
groupName:消费者组名
MKSTREAM:自动创建队列
消费者组读取消息
XREADGROUP GROUP group consumer COUNT count BLOCK mills STREAMS key ID
先写命令 XREADGROUP GROUP
,再写 group consumer
组名和消费者名,再写读取数量和等待时间,最后写队列key和起始消息ID
ID有两类特殊值:
- “>”,从下一个未消费的消息开始
- 其他:从各自消费者的pending-list中取,例如0,表示list中的第一个消息。
确认消息
xack
异步秒杀应用Stream消息队列
准备工作:
- 创建消费者组,自动创建队列
书写lua代码:
- 判断是否有秒杀资格
- 如果有则加入消息队列
书写Java代码:
- 持续监听下单消息
- 取出下单消息 xreadgroup
- 如果不存在,则继续监听
- 如果处理失败,就转入pending-list处理
- 取出待处理消息 xreadgroup
- 如果处理失败,继续取出处理,直到处理完毕
- 最后确认消息 xack streamKey groupName messageId