uname -a
准备工作
dev-sidecar
的东西安装,这样以后进入github就很快了,还有另外的很多方式,不介绍了。
rpm -ivh erlang文件
命令
yum install rpm
指令即可安装rpm
yum install socat -y
6、启动RabbitMQ服务
启动服务 sbin/service rabbitmq-server start 停止服务 /sbin/service rabbitmq-server stop 查看启动状态 /sbin/service rabbitmq-server status 开启开机自动 chkconfig rabbitmq-server on
1、停止RabbitMQ服务 service rabbitmq-server stop // 使用上面的命令 /sbin/service rabbitmq-server stop也行 2、安装插件 rabbitmq-plugins enable rabbitmq_management 3、开启RabbitMQ服务 service rabbitmq-server start
# 查看防火墙状态 systemctl status firewalld # 关闭防火墙 systemctl stop firewalld # 一劳永逸 禁用防火墙 systemctl enable firewalld
需要保证自己的Linux中有Docker容器,教程链接:https://www.cnblogs.com/xiegongzi/p/15621992.html
使用下面的两种方式都不需要进行web管理插件的安装和erlang的安装
docker images
docker rmi 镜像ID // 如上例的 dockerrmi 16c 即可删除镜像
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management
docker ps
# 拉取镜像 docker pull 镜像名称 # 查看全部镜像 docker images # 删除镜像 docker rmi 镜像ID # 将本地的镜像导出 docker save -o 导出的路径 镜像id # 加载本地的镜像文件 docker load -i 镜像文件 # 修改镜像名称 docker tag 镜像id 新镜像名称:版本 # 简单运行操作 docker run 镜像ID | 镜像名称 # 跟参数的运行 docker run -d -p 宿主机端口:容器端口 --name 容器名称 镜像ID | 镜像名称 # 如:docker run -d -p 8081:8080 --name tomcat b8 # -d:代表后台运行容器 # -p 宿主机端口:容器端口:为了映射当前Linux的端口和容器的端口 # --name 容器名称:指定容器的名称 # 查看运行的容器 docker ps [-qa] # -a:查看全部的容器,包括没有运行 # -q:只查看容器的标识 # 查看日志 docker logs -f 容器id # -f:可以滚动查看日志的最后几行 # 进入容器内部 docker exec -it 容器id bash # 退出容器:exit # 将宿主机的文件复制到容器内部的指定目录 docker cp 文件名称 容器id:容器内部路径 docker cp index.html 982:/usr/local/tomcat/webapps/ROOT ===================================================================== # 重新启动容器 docker restart 容器id # 启动停止运行的容器 docker start 容器id # 停止指定的容器(删除容器前,需要先停止容器) docker stop 容器id # 停止全部容器 docker stop $(docker ps -qa) # 删除指定容器 docker rm 容器id # 删除全部容器 docker rm $(docker ps -qa)
# 创建文件夹 mkdir 文件夹名
# 创建文件 touch docker-compose.yml
# 编辑文件 vim docker-compose.yml
version: "3.1" services: rabbitmq: # 镜像 image: rabbitmq:3.9-management # 自启 restart: always # Docker容器名 container_name: rabbitmq # 端口号,docker容器内部端口 映射 外部端口 ports: - 5672:5672 - 15672:15672 # 数据卷映射 把容器里面的东西映射到容器外面去 容易操作,否则每次都要进入容器 volumes: - ./data:/opt/install/rabbitMQ-docker/
# 启动 docker-compose up -d # -d 后台启动 ========================================================= # 附加内容:docker-compose的一些命令操作 # 1. 基于docker-compose.yml启动管理的容器 docker-compose up -d # 2. 关闭并删除容器 docker-compose down # 3. 开启|关闭|重启已经存在的由docker-compose维护的容器 docker-compose start|stop|restart # 4. 查看由docker-compose管理的容器 docker-compose ps # 5. 查看日志 docker-compose logs -f # 有兴趣的也可以去了解docker-file自定义镜像
查看当前用户 / 角色有哪些 rabbitmqctl list_users 删除用户 rabbitmqctl delete_user 用户名 添加用户 rabbitmqctl add_user 用户名 密码 设置用户角色 rabbitmqctl set_user_tags 用户名 administrator 设置用户权限【 ps:guest角色就是没有这一步 】 rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*" # 设置用户权限指令解释 set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency> </dependencies>
生产者
package cn.zixieqing; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { private static final String HOST = "ip"; // 放RabbitMQ服务的服务器ip private static final int PORT = 5672; // 服务器中RabbitMQ的端口号,在浏览器用的15672是通过5672映射出来的15672 private static final String USER_NAME = "admin"; private static final String PASSWORD = "admin"; private static final String QUEUE_NAME = "hello word"; public static void main(String[] args) throws IOException, TimeoutException { // 1、获取链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置链接信息 factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); /* 当然:这里还可以设置vhost虚拟机 - 前提是自己在web管理界面中添加了vhost factory.setVirtualHost(); */ // 3、获取链接Connection Connection connection = factory.newConnection(); // 4、创建channel信道 - 它才是去和交换机 / 队列打交道的 Channel channel = connection.createChannel(); // 5、准备一个队列queue // 这里理论上是去和exchange打交道,但是:这里是hello word简单模式,所以直接使用默认的exchange即可 /* 下面这是参数的完整意思,源码中偷懒了,没有见名知意 queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties ) 参数1、队列名字 参数2、是否持久化( 保存到磁盘 ),默认是在内存中的 参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息 参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除 参数5、其他配置项,这涉及到后面的知识,目前选择null */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("正在发送信息!!!"); // 6、推送信息到队列中 // 准备发送的信息内容 String message = "it is hello word"; /* basicPublish( exchangeName,queueName,properties,message ) 参数1、交互机名字 - 目前使用了默认的 参数2、指定路由规则 - 目前使用队列名字 参数3、指定传递的消息所携带的properties 参数4、推送的具体消息 - byte类型的 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); // 7、释放资源 - 倒着关闭即可 if ( null != channel ) channel.close(); if ( null != connection ) connection.close(); System.out.println("消息发送完毕"); } }
消费者
public class Consumer { private static final String HOST = "ip"; // 自己的服务器ip private static final int PORT = 5672; private static final String USER_NAME = "admin"; private static final String PASSWORD = "admin"; private static final String QUEUE_NAME = "hello word"; public static void main(String[] args) throws IOException, TimeoutException { // 1、创建链接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置链接信息 factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); // 3、创建链接对象 Connection connection = factory.newConnection(); // 4、创建信道channel Channel channel = connection.createChannel(); // 5、从指定队列中获取消息 /* basicConsume( queueName,isAutoAnswer,deliverCallback,cancelCallback ) 参数1、队列名 参数2、是否自动应答,为true时,消费者接收到消息后,会立即告诉RabbitMQ 参数3、消费者如何消费消息的回调 参数4、消费者取消消费的回调 */ System.out.println("开始接收消息!!!"); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("接收到了消息:" + new String(message.getBody(), StandardCharsets.UTF_8) ); }; CancelCallback cancelCallback = consumerTag -> System.out.println("消费者取消了消费信息行为"); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); // 6、释放资源 - 但是这里不能直接关闭啊,否则:看不到接收的结果的,可以选择不关,也可以选择加一句代码System.in.read(); // channel.close(); // connection.close(); } }
抽取RabbitMQ链接的工具类
package cn.zixieqing.util; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class MQUtil { private static final String HOST = "自己的ip"; private static final int PORT = 5672; private static final String USER_NAME = "admin"; private static final String PASSWORD = "admin"; public static Channel getChannel(String vHost ) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); if ( !vHost.isEmpty() ) factory.setVirtualHost(vHost); return factory.newConnection().createChannel(); } }
生产者
package cn.zixieqing.workqueue; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.Channel; import java.io.IOException; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class WorkProducer { private static final String QUEUE_NAME = "work queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); // 1、声明队列 /* 下面这是参数的完整意思,源码中偷懒了,没有见名知意 queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties ) 参数1、队列名字 参数2、是否持久化( 保存到磁盘 ),默认是在内存中的 参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息 参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除 参数5、其他配置项,这涉及到后面的知识,目前选择null */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 2、准备消息 System.out.println("请输入要推送的信息,按回车确认:"); Scanner input = new Scanner(System.in); // 3、推送信息到队列中 while (input.hasNext()) { /* basicPublish( exchangeName,routing key,properties,message ) 参数1、交互机名字 - 目前是使用了默认的 参数2、指定路由规则 - 目前使用队列名字 参数3、指定传递的消息所携带的properties 参数4、推送的具体消息 - byte类型的 */ String message = input.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息====>" + message + "====>推送完毕!"); } } }
消费者
package cn.zixieqing.workqueue; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class WorkConsumer { private static final String QUEUE_NAME = "work queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("接收到了消息====>" + new String(message.getBody(), StandardCharsets.UTF_8)); }; CancelCallback cancelCallback = consumerTag -> { System.out.println( consumerTag + "消费者中断了接收消息====>" ); }; System.out.println("消费者01正在接收消息......"); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
package cn.zixieqing.workqueue; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class WorkConsumer { private static final String QUEUE_NAME = "work queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("接收到了消息====>" + new String(message.getBody(), StandardCharsets.UTF_8)); }; CancelCallback cancelCallback = consumerTag -> { System.out.println( consumerTag + "消费者中断了接收消息====>" ); }; System.out.println("消费者02正在接收消息......"); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
/* basicConsume( queueName,isAutoAnswer,deliverCallback,cancelCallback ) 参数1、队列名 参数2、是否自动应答,为true时,消费者接收到消息后,会立即告诉RabbitMQ 参数3、消费者如何消费消息的回调 参数4、消费者取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
就是我们自己去设定,好处是可以批量应答并且减少网络拥堵
调用的API如下:
Channel.basicACK( long, boolean ); // 用于肯定确认,即:MQ已知道该消息 并且 该消息已经成功被处理了,所以MQ可以将其丢弃了 Channel.basicNack( long, boolena, boolean ); // 用于否定确认 Channel.basicReject( long, boolea ); // 用于否定确认 与Channel.basicNack( long, boolena, boolean )相比,少了一个参数,这个参数名字叫做:multiple
multiple参数说明,它为true和false有着截然不同的意义【 ps:建议弄成false,虽然是挨个去处理,从而应答,效率慢,但是:数据安全,否则:很大可能造成数据丢失 】
生产者
package cn.zixieqing.ACK; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class AckProducer { private static final String QUEUE_NAME = "ack queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); // 声明队列 /* 下面这是参数的完整意思,源码中偷懒了,没有见名知意 queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties ) 参数1、队列名字 参数2、是否持久化( 保存到磁盘 ),默认是在内存中的 参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息 参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除 参数5、其他配置项,这涉及到后面的知识,目前选择null */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("请输入要推送的消息:"); Scanner input = new Scanner(System.in); while (input.hasNext()) { /* basicPublish( exchangeName,routing key,properties,message ) 参数1、交互机名字 - 使用了默认的 参数2、指定路由规则,使用队列名字 参数3、指定传递的消息所携带的properties 参数4、推送的具体消息 - byte类型的 */ String message = input.next(); channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息====>" + message + "推送完毕"); } } }
消费者01
package cn.zixieqing.ACK; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.concurrent.TimeoutException; public class AckConsumer { private static final String QUEUE_NAME = "ack queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); DeliverCallback deliverCallback = (consumerTag, message) -> { try { Thread.sleep(5*1000); System.out.println("接收到了消息=====>" + new String( message.getBody(), StandardCharsets.UTF_8 )); // 添加手动应答 /* basicAck( long, boolean ) 参数1、消息的标识tag,这个标识就相当于是消息的ID 参数2、是否批量应答multiple */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; System.out.println("消费者01正在接收消息,需要5秒处理完"); channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { System.out.println("触发消费者取消消费消息行为的回调"); System.out.println(Arrays.toString(consumerTag.getBytes(StandardCharsets.UTF_8))); }); } }
消费者02
package cn.zixieqing.ACK; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.concurrent.TimeoutException; public class AckConsumer { private static final String QUEUE_NAME = "ack queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); DeliverCallback deliverCallback = (consumerTag, message) -> { try { Thread.sleep(10*1000); System.out.println("接收到了消息=====>" + new String( message.getBody(), StandardCharsets.UTF_8 )); // 添加手动应答 /* basicAck( long, boolean ) 参数1、消息的标识tag,这个标识就相当于是消息的ID 参数2、是否批量应答multiple */ channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; System.out.println("消费者02正在接收消息,需要10秒处理完"); channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { System.out.println("触发消费者取消消费消息行为的回调"); System.out.println(Arrays.toString(consumerTag.getBytes(StandardCharsets.UTF_8))); }); } }
/* 下面这是参数的完整意思,源码中偷懒了,没有见名知意 queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties ) 参数1、队列名字 参数2、是否持久化( 保存到磁盘 ),默认是在内存中的 参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息 参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除 参数5、其他配置项,这涉及到后面的知识,目前选择null */ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
inequivalent arg 'durable' for queue 'queue durable' in vhost '/': received 'true' but current is 'false'
注意:这里说的消息持久化不是说配置之后消息就一定不会丢失,而是:把消息标记为持久化,然后RabbitMQ尽量让其持久化到磁盘
但是:也会有意外,比如:RabbitMQ在将消息持久化到磁盘时,这是有一个时间间隔的,数据还没完全刷写到磁盘呢,RabbitMQ万一出问题了,那么消息 / 数据还是会丢失的,所以:消息持久化配置是一个弱持久化,但是:对于简单队列模式完全足够了,强持久化的实现方式在后续的publisher / confirm发布确认模式中
至于配置极其地简单,在前面都已经见过这个配置项,就是生产者发消息时做文章,就是下面的第三个参数,把它改为MessageProperties.PERSISTENT_TEXT_PLAIN
即可
/* basicPublish( exchangeName,routing key,properties,message ) 参数1、交互机名字 - 使用了默认的 参数2、指定路由规则,使用队列名字 参数3、指定传递的消息所携带的properties 参数4、推送的具体消息 - byte类型的 */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); // 改成消息持久化 channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
public class MessageProperties { public static final BasicProperties MINIMAL_BASIC = new BasicProperties((String)null, (String)null, (Map)null, (Integer)null, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public static final BasicProperties MINIMAL_PERSISTENT_BASIC = new BasicProperties((String)null, (String)null, (Map)null, 2, (Integer)null, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public static final BasicProperties BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public static final BasicProperties PERSISTENT_BASIC = new BasicProperties("application/octet-stream", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public static final BasicProperties TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 1, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", (String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null); public MessageProperties() { } }
public static class BasicProperties extends AMQBasicProperties { // 消息内容的类型 private String contentType; // 消息内容的编码格式 private String contentEncoding; // 消息的header private Map<String, Object> headers; // 消息是否持久化,1:否,2:是 private Integer deliveryMode; // 消息的优先级 private Integer priority; // 关联ID private String correlationId; // :用于指定回复的队列的名称 private String replyTo; // 消息的失效时间 private String expiration; // 消息ID private String messageId; // 消息的发送时间 private Date timestamp; // 类型 private String type; // 用户ID private String userId; // 应用程序ID private String appId; // 集群ID private String clusterId; }
不公平分发
channel.basicQos( int prefetchCount )设置
// 不公平分发,就是在这里接收消息之前做处理 /* basicQos( int prefetchCount ) 为0、轮询分发 也是RabbitMQ的默认值 为1、不公平分发 */ channel.basicQos(1); channel.basicConsume("qos queue", true, deliverCallback, consumerTag -> { System.out.println("消费者中断了接收消息行为触发的回调"); });
预取值
// 预取值,也是在这里接收消息之前做处理,和不公平分发调的是同一个API /* basicQos( int prefetchCount ) 为0、轮询分发 也是RabbitMQ的默认值;为1、不公平分发 而当这里的数字变成其他的,如:上图中上面的那个消费者要消费20条消息,那么把下面的数字改成对应的即可 注意点:这是要设置哪个消费者的预取值,那就是在哪个消费者代码中进行设定啊 */ channel.basicQos(10); // 这样就表示这个代码所在的消费者需要消费10条消息了 channel.basicConsume("qos queue", true, deliverCallback, consumerTag -> { System.out.println("消费者中断了接收消息行为触发的回调"); });
开启发布确认
channel.confirmSelect(); // 没有参数
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { // 单个确认发布 singleConfirm(); // 单个确认发布发送这些消息花费4797ms } public static void singleConfirm() throws IOException, TimeoutException, InterruptedException { Channel channel = MQUtil.getChannel(""); // 开启确认发布 channel.confirmSelect(); // 声明队列 并 让队列持久化 channel.queueDeclare("singleConfirm", true, false, false, null); long begin = System.currentTimeMillis(); for (int i = 1; i <= 100; i++) { // 发送消息 并 让消息持久化 channel.basicPublish("","singleConfirm", MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes() ); // 发布一个 确认一个 channel.waitForConfirms() if ( channel.waitForConfirms() ) System.out.println("消息".concat( String.valueOf(i) ).concat( "发送成功") ); } long end = System.currentTimeMillis(); System.out.println("单个确认发布发送这些消息花费".concat( String.valueOf( end-begin ) ).concat("ms") ); }
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { // 单个确认发布 // singleConfirm(); // 单个确认发布发送这些消息花费4797ms // 批量发布 batchConfirm(); // 批量发布发送的消息共耗时:456ms } public static void batchConfirm() throws IOException, TimeoutException, InterruptedException { Channel channel = MQUtil.getChannel(""); // 开启确认发布 channel.confirmSelect(); // 声明队列 并 让队列持久化 channel.queueDeclare("batchConfirm", true, false, false, null); long begin = System.currentTimeMillis(); for (int i = 1; i <= 100; i++) { // 发送消息 并 让消息持久化 channel.basicPublish("","batchConfirm", MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes() ); // 批量发布 并 回复批量发布的结果 - 发了10条之后再确认 if (i % 10 == 0) { channel.waitForConfirms(); System.out.println("消息" + ( i-10 ) + "====>" + i + "的消息发布成功"); } } // 为了以防还有另外的消息未被确认,再次确认一下 channel.waitForConfirms(); long end = System.currentTimeMillis(); System.out.println("批量发布发送的消息共耗时:" + (end - begin) + "ms"); }
代码实现
public static void main(String[] args) throws InterruptedException, TimeoutException, IOException { // 单个确认发布 // singleConfirm(); // 单个确认发布发送这些消息花费4797ms // 批量发布 // batchConfirm(); // 批量发布发送的消息共耗时:456ms asyncConfirm(); // 异步发布确认耗时:10ms } // 异步发布确认 public static void asyncConfirm() throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); channel.confirmSelect(); channel.queueDeclare("async confirm", true, false, false, null); // 1、准备符合条件的map ConcurrentSkipListMap<Long, Object> messagePoolMap = new ConcurrentSkipListMap<>(); // 3、对信道channel进行监听 // 成功确认发布回调 ConfirmCallback ackCallback = (messageTag, multiple) -> { System.out.println("确认发布了消息=====>" + messagePoolMap.headMap(messageTag) ); // 4、把确认发布的消息删掉,减少内存开销 // 判断是否是批量删除 if ( multiple ){ // 通过消息标识tag 把 确认发布的消息取出 messagePoolMap.headMap(messageTag).clear(); /** * 上面这句代码拆分写法 * ConcurrentNavigableMap<Long, Object> confirmed = messagePoolMap.headMap(messageTag); * confirmed.clear(); */ }else { messagePoolMap.remove(messageTag); } }; // 没成功发布确认回调 ConfirmCallback nackCallback = (messageTag, multiple) -> { System.out.println("未确认的消息是:" + messagePoolMap.get(messageTag) ); }; // 进行channel监听 这是异步的 /** * channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2) * 参数1、消息成功发布的回调函数 ackCallback() * 参数2、消息未成功发布的回调函数 nackCallback() */ channel.addConfirmListener( ackCallback,nackCallback ); long begin = System.currentTimeMillis(); for (int i = 1; i <= 100; i++) { // 2、将要发布的全部信息保存到map中去 /* channel.getNextPublishSeqNo() 获取下一次将要发送的消息标识tag */ messagePoolMap.put(channel.getNextPublishSeqNo(),String.valueOf(i) ); // 生产者只管发布就行 channel.basicPublish("","async confirm",MessageProperties.PERSISTENT_TEXT_PLAIN,String.valueOf(i).getBytes()); System.out.println("消息=====>" + i + "发送完毕"); } long end = System.currentTimeMillis(); System.out.println("异步发布确认耗时:" + ( end-begin ) + "ms" ); }
临时队列
/* 下面这是参数的完整意思,源码中偷懒了,没有见名知意 queueDeclare( queueName,isPersist,isShare,isAutoDelete,properties ) 参数1、队列名字 参数2、是否持久化( 保存到磁盘 ),默认是在内存中的 参数3、是否共享,即:是否只供一个消费者消费,是否让多个消费者共享这个队列中的信息 参数4、是否自动删除,即:最后一个消费者获取信息之后,这个队列是否自动删除 参数5、其他配置项,这涉及到后面的知识,目前选择null */ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String queueName = channel.queueDeclare().getQueue();
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
是一样的了
生产者
package cn.zixieqing.fanout; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class FanoutProducer { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); /** * 定义交换机 * 参数1、交换机名字 * 参数2、交换机类型 */ channel.exchangeDeclare("fanoutExchange", BuiltinExchangeType.FANOUT); System.out.println("请输入要发送的内容:"); Scanner input = new Scanner(System.in); while (input.hasNext()){ String message = input.next(); channel.basicPublish("fanoutExchange","", null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息=====>" + message + "发送完毕"); } } }
消费者01
package cn.zixieqing.fanout; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class FanoutConsumer01 { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); // 绑定队列 /** * 参数1、队列名字 * 参数2、交换机名字 * 参数3、用于绑定的routing key / binding key */ String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "fanoutExchange", ""); System.out.println("01消费者正在接收消息........"); channel.basicConsume(queueName,true,(consumerTag,message)->{ // 这里面接收到消息之后就可以用来做其他事情了,如:存到磁盘 System.out.println("接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8)); },consumerTage->{}); } }
消费者02
package cn.zixieqing.fanout; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class FanoutConsumer02 { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); // 绑定队列 /** * 参数1、队列名字 * 参数2、交换机名字 * 参数3、用于绑定的routing key / binding key */ String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "fanoutExchange", ""); System.out.println("02消费者正在接收消息........"); channel.basicConsume(queueName,true,(consumerTag,message)->{ // 这里面接收到消息之后就可以用来做其他事情了,如:存到磁盘 System.out.println("接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8)); },consumerTage->{}); } }
生产者
package cn.zixieqing.direct; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Scanner; import java.util.concurrent.TimeoutException; public class DirectProducer { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); channel.exchangeDeclare("directExchange", BuiltinExchangeType.DIRECT); System.out.println("请输入要发送的消息:"); Scanner input = new Scanner(System.in); while (input.hasNext()){ String message = input.next(); /** * 对第二个参数routing key做文章 * 假如这里的routing key为zixieqing 那么:就意味着消费者只能是绑定了zixieqing的队列才可以进行接收这里发的消息内容 */ channel.basicPublish("directExchange","zixieqing",null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息=====>" + message + "====>发送完毕"); } } }
消费者01
package cn.zixieqing.direct; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class DirectConsumer01 { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); channel.queueDeclare("direct", false, false, false, null); /** * 队列绑定 * 参数1、队列名 * 参数2、交换机名字 * 参数3、routing key 这里的routing key 就需要和生产者中的一样了,这样才可以通过这个routing key去对应的队列中取消息 */ channel.queueBind("direct", "directExchange", "zixieqing"); System.out.println("01消费者正在接收消息......."); channel.basicConsume("direct",true,(consumerTag,message)->{ System.out.println("01消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8)); },consumerTag->{}); } }
package cn.zixieqing.direct; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class DirectConsumer02 { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); channel.queueDeclare("direct", false, false, false, null); /** * 队列绑定 * 参数1、队列名 * 参数2、交换机名字 * 参数3、routing key 这里的routing key 就需要和生产者中的一样了,这样才可以通过这个routing key去对应的队列中取消息 */ // 搞点事情:这里的routing key的值zixieqing和生产者的不同 channel.queueBind("direct", "directExchange", "xiegongzi"); System.out.println("02消费者正在接收消息......."); channel.basicConsume("direct",true,(consumerTag,message)->{ System.out.println("02消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8)); },consumerTag->{}); } }
topic中routing key的要求
*
代表一个单词#
代表零活无数个单词
quick.orange.rabbit 被队列 Q1Q2 接收到 lazy.orange.elephant 被队列 Q1Q2 接收到 quick.orange.fox 被队列 Q1 接收到 lazy.brown.fox 被队列 Q2 接收到 lazy.pink.rabbit 虽然满足两个绑定,但只被队列 Q2 接收一次 quick.brown.fox 不满足任何绑定关系,不会被任何队列接收到,会被丢弃 quick.orange.male.rabbit 是四个单词,不满足任何绑定关系,会被丢弃 lazy.orange.male.rabbit 虽是四个单词,但匹配 Q2,因:符合lazy.#这个规则
把上面的绑定关系和测试转换成代码玩一波
生产者
package cn.zixieqing.topic; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; public class TopicProducer { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC); /** * 准备大量的routing key 和 message */ HashMap<String, String> routesAndMessageMap = new HashMap<>(); routesAndMessageMap.put("quick.orange.rabbit", "被队列 Q1Q2 接收到"); routesAndMessageMap.put("lazy.orange.elephant", "被队列 Q1Q2 接收到"); routesAndMessageMap.put("quick.orange.fox", "被队列 Q1 接收到"); routesAndMessageMap.put("lazy.brown.fox", "被队列 Q2 接收到"); routesAndMessageMap.put("lazy.pink.rabbit", "虽然满足两个绑定,但只被队列 Q2 接收一次"); routesAndMessageMap.put("quick.brown.fox", "不满足任何绑定关系,不会被任何队列接收到,会被丢弃"); routesAndMessageMap.put("quick.orange.male.rabbit", "是四个单词,不满足任何绑定关系,会被丢弃"); routesAndMessageMap.put("lazy.orange.male.rabbit ", "虽是四个单词,但匹配 Q2,因:符合lazy.#这个规则"); System.out.println("生产者正在发送消息......."); for (Map.Entry<String, String> routesAndMessageEntry : routesAndMessageMap.entrySet()) { String routingKey = routesAndMessageEntry.getKey(); String message = routesAndMessageEntry.getValue(); channel.basicPublish("topicExchange",routingKey,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息====>" + message + "===>发送完毕"); } } }
消费者01
package cn.zixieqing.topic; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class TopicConsumer01 { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC); channel.queueDeclare("Q1", false, false, false, null); channel.queueBind("Q1", "topicExchange", "*.orange.*"); System.out.println("消费者01正在接收消息......"); channel.basicConsume("Q1",true,(consumerTage,message)->{ System.out.println("01消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8)); System.out.println("此条消息的交换机名为:" + message.getEnvelope().getExchange() + ",路由键为:" + message.getEnvelope().getRoutingKey()); },consumerTag->{}); } }
消费者02
package cn.zixieqing.topic; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class TopicConsumer02 { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); channel.exchangeDeclare("topicExchange", BuiltinExchangeType.TOPIC); channel.queueDeclare("Q2", false, false, false, null); channel.queueBind("Q2", "topicExchange", "*.*.rabbit"); channel.queueBind("Q2", "topicExchange", "lazy.#"); System.out.println("消费者02正在接收消息......"); channel.basicConsume("Q2",true,(consumerTage,message)->{ System.out.println("02消费者接收到了消息====>" + new String( message.getBody(), StandardCharsets.UTF_8)); System.out.println("此条消息的交换机名为:" + message.getEnvelope().getExchange() + ",路由键为:" + message.getEnvelope().getRoutingKey()); },consumerTag->{}); } }
生产者
package cn.zixieqing.dead_letter_queue.queuelength.queuenumber; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); channel.exchangeDeclare("messageNumber_normal_exchange", BuiltinExchangeType.DIRECT); for (int i = 1; i < 11; i++) { String message = "生产者发送了消息" + i; channel.basicPublish("messageNumber_normal_exchange","zi",null, message.getBytes(StandardCharsets.UTF_8) ); System.out.println("消息====>" + message + "====>发送完毕"); } } }
01消费者
package cn.zixieqing.dead_letter_queue.queuelength.queuenumber; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.concurrent.TimeoutException; public class Consumer01 { /** * 正常交换机名称 */ public static final String NORMAL_EXCHANGE = "messageNumber_normal_exchange"; /** * 正常队列名称 */ public static final String NORMAL_QUEUE = "messageNumber_queue"; /** * 死信交换机名称 */ public static final String DEAD_EXCHANGE = "messageNumber_dead_exchange"; /** * 死信队列名称 */ public static final String DEAD_QUEUE = "messageNumber_dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); // 声明正常交换机、死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); // 死信交换机和死信队列进行绑定 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "xie"); // 声明正常队列 并 考虑达到条件时和死信交换机进行联系 HashMap<String, Object> params = new HashMap<>(); // 死信交换机 params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 死信路由键 params.put("x-dead-letter-routing-key", "xie"); // 达到队列能接受的最大个数限制就多了如下的配置 params.put("x-max-length", 6); channel.queueDeclare(NORMAL_QUEUE, false, false, false, params); // 正常队列和正常交换机进行绑定 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zi"); System.out.println("01消费者正在接收消息......"); channel.basicConsume(NORMAL_QUEUE,true,(consumeTag,message)->{ System.out.println("01消费者接收到了消息:" + new String( message.getBody(), StandardCharsets.UTF_8)); },consumeTag->{}); } }
params.put("x-max-length-bytes", 255);
注意:关于两种情况同时使用的问题
params.put("x-max-length", 6); params.put("x-max-length-bytes", 255);
// 第二个参数改成false channel.basicConsume(NORMAL_QUEUE,false,(consumeTag,message)->{},consumeTag->{});
生产者
package cn.zixieqing.dead_letter_queue.reack; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); channel.exchangeDeclare("reack_normal_exchange", BuiltinExchangeType.DIRECT); for (int i = 1; i < 11; i++) { String message = "生产者发送的消息" + i; channel.basicPublish("reack_normal_exchange","zixieqing",null,message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息===>" + message + "===>发送完毕"); } } }
消费者
package cn.zixieqing.dead_letter_queue.reack; import cn.zixieqing.util.MQUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.concurrent.TimeoutException; public class Consumer01 { public static final String NORMAL_EXCHANGE = "reack_normal_exchange"; public static final String DEAD_EXCHANGE = "reack_dead_exchange"; public static final String DEAD_QUEUE = "reack_dead_queue"; public static final String NORMAL_QUEUE = "reack_normal_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = MQUtil.getChannel(""); // 声明正常交换机、死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); // 声明死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); // 死信队列绑定死信交换机 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "xie"); // 声明正常队列 HashMap<String, Object> params = new HashMap<>(); params.put("x-dead-letter-exchange", DEAD_EXCHANGE); params.put("x-dead-letter-routing-key", "xie"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, params); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zixieqing"); System.out.println("01消费者正在接收消息....."); // 1、注意:需要开启手动应答( 第二个参数为false ) channel.basicConsume(NORMAL_QUEUE,false,(consumeTag,message)->{ String msg = new String(message.getBody(), StandardCharsets.UTF_8); // 如果发送的消息为:生产者发送的消息5 则:拒收 if ( "生产者发送的消息5".equals( msg ) ) { System.out.println("此消息====>" + msg + "===>是拒收的"); // 2、做拒收处理 - 注意:第二个参数设为false,表示不再重新入正常队列的队,这样消息才可以进入死信队列 channel.basicReject( message.getEnvelope().getDeliveryTag(),false); }else { System.out.println("01消费者接收到了消息=====>" + msg); } },consumeTag->{}); } }