博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Redis 做消息队列
阅读量:3947 次
发布时间:2019-05-24

本文共 4355 字,大约阅读时间需要 14 分钟。

Redis 做消息队列

1. Redis 做消息队列

我们平时说到消息队列,一般都是指 RabbitMQ、ActiveMQ、RocketMQ 以及大数据里的 KafKa,这些都是我们比较常见的消息中间件,也是非常专业的消息中间件,作为专业的中间见,它也提供了很多功能。

但是,当我们需要使用消息中间件的时候,我们要根据实际情况来选择,例如我们现在要做的消息队列功能很简单,我们可以直接使用 Redis 来做消息队列

Redis 的消息队列不是特别专业,因为它没有很多高级特性,适用于简单的场景,如果对于消息队列可靠性有很大的要求,那么可以使用专业级别的消息中间件来做这些事。

1.1 消息队列

Redis 做消息队列,使用它里面的List 数据结构就可以实现,我们可以使用 lpush/rpush 操作来实现入队,然后使用 lpop/rpop 来实现出队

回顾一下之前的 Redis 命令:

在这里插入图片描述
在客户端(例如 Java 端),我们会维护一个死循环来不停的从队列中读取消息,并处理,如果队列中有消息,则直接获取到,如果没有消息,就会陷入死循环,直到下一次有消息进入,这种死循环会造成大量的资源浪费,这个时候,我们可以使用 blpop/brpop 这些命令

1.2 延迟消息队列

延迟消息队列可以通过zset来实现,因为 zset 中有一个 score,我们可以把时间作为 score将 value 存到 redis 中,然后通过轮询的方式,去不断的读取消息出来。

首先,如果消息是一个字符串,直接发送即可,如果是一个对象,则需要对对象进行序列化,这里我们 使用 JSON 来实现序列化和反序列化。

所以,首先在项目中,添加 JSON 依赖:

com.fasterxml.jackson.core
jackson-databind
2.10.3

接下来,构造一个消息对象:

public class JavaboyMessage {
private String id; private Object data; @Override public String toString() {
return "JavaboyMessage{" + "id='" + id + '\'' + ", data=" + data + '}'; } public String getId() {
return id; } public void setId(String id) {
this.id = id; } public Object getData() {
return data; } public void setData(Object data) {
this.data = data; }}

接下来封装一个消息队列:

public class DelayMsgQueue {
private Jedis jedis; private String queue; public DelayMsgQueue(Jedis jedis, String queue) {
this.jedis = jedis; this.queue = queue; } /** * 消息入队 * * @param data 要发送的消息 */ public void queue(Object data) {
// 构造一个 JavaboyMessage JavaboyMessage msg = new JavaboyMessage(); msg.setId(UUID.randomUUID().toString()); msg.setData(data); try {
// 序列化 String s = new ObjectMapper().writeValueAsString(msg); System.out.println("msg publish:" + new Date()); // 消息发送,score 延迟5秒 jedis.zadd(queue, System.currentTimeMillis() + 5000, s); } catch (JsonProcessingException e) {
e.printStackTrace(); } } /** * 消息消费 */ public void loop() {
while (!Thread.interrupted()) {
// 读取 score 在 0 到当前时间戳之间的消息 Set
zrange = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1); if (zrange.isEmpty()) {
// 如果消息是空的,则休息 500 毫秒然后继续 try {
Thread.sleep(500); } catch (InterruptedException e) {
break; } continue; } // 如果读取到消息,则直接读取消息出来 String next = zrange.iterator().next(); if (jedis.zrem(queue, next) > 0) {
// 抢到消息了,接下来处理业务 try {
JavaboyMessage msg = new ObjectMapper().readValue(next, JavaboyMessage.class); System.out.println("receive msg:" + msg); } catch (JsonProcessingException e) {
e.printStackTrace(); } } } }}

然后测试:

public class DelayMsgTest {
public static void main(String[] args) {
Redis redis = new Redis(); redis.execute(jedis -> {
// 构造一个消息队列 DelayMsgQueue queue = new DelayMsgQueue(jedis, "javaboy-delay-queue"); // 构造消息生产者 Thread producter = new Thread() {
@Override public void run() {
for (int i = 0; i < 5; i++) {
// 调用消息入队方法:5条消息入队 queue.queue("www.javaboy.org>>>" + i); } } }; // 构造一个消息消费者 Thread consumer = new Thread(){
@Override public void run() {
// 调用消息消费方法 queue.loop(); } }; // 启动线程 producter.start(); consumer.start(); // 休息 7 秒后,停止程序,简单测试一下 try {
Thread.sleep(7000); consumer.interrupt(); } catch (InterruptedException e) {
e.printStackTrace(); } }); }}

转载地址:http://opqwi.baihongyu.com/

你可能感兴趣的文章
簡單工廠模式
查看>>
SQL Server的數據類型
查看>>
php的正则表达式&nbsp;&#039;/\b\w…
查看>>
ThinkPHP的标签制作及标签调用解析…
查看>>
thrift的lua实现
查看>>
编写高性能的Lua代码
查看>>
Python正则表达式指南
查看>>
LUA--thrift--lib库的创建生成
查看>>
Shell开启扩展模式匹配shopt -s extglob
查看>>
浅谈 URI 及其转义
查看>>
nginx 优化
查看>>
openresty+lua在反向代理服务中的玩法
查看>>
ClickHouse集群搭建从0到1
查看>>
nginx实现请求的负载均衡 + keepalived实现nginx的高可用
查看>>
linux shell 中数组的定义和for循环遍历的方法
查看>>
求1!+2!+3!....+20!(java代码)
查看>>
VMware安装Ubuntu系统无法选择语言
查看>>
QT5.12安装
查看>>
Git/Github初步使用记录
查看>>
QT 开发问题合集
查看>>