本文共 5577 字,大约阅读时间需要 18 分钟。
P:生产者,也就是要发送消息的程序
C:消费者,消息的接受者,会一直等待消息的到来。
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息。生产者向其中投递消息,消费者从其中读取消息。
注:在rabbitMQ中消费者是一定要到某个消息队列中去获取消息的。
第1步、创建maven项目rabbitmq-demo,在pom.xml中添加如下依赖:
WebProject com.wzy 1.0-SNAPSHOT 4.0.0 rabbitMQ-demo com.rabbitmq amqp-client 5.6.0
第2步、编写消息生产者,代码如下:
package com.wzy.product;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 生产者 * */public class Producer { static final String QUEUE_NAME="simple_queue"; public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2.主机地址,默认为localhost connectionFactory.setHost("192.168.137.101"); //3.连接端口,默认为5672 connectionFactory.setPort(5672); //4.虚拟主机名称,默认为/ connectionFactory.setVirtualHost("/wzy"); //5.连接用户名,默认为guest connectionFactory.setUsername("guest"); //6.连接密码,默认为guest connectionFactory.setPassword("guest"); //7.创建连接 Connection connection=connectionFactory.newConnection(); //8.创建频道 Channel channel=connection.createChannel(); /** * 9.创建队列 * 参数1:队列名称 * 参数2: 是否定义为持久化队列 * 参数3:是否独占本次连接 * 参数4:是否在不使用的时候自动删除队列 * 参数5:队列其它参数 * */ channel.queueDeclare(QUEUE_NAME,true,false,false,null); //10.要发送的信息 String message="你好,RabbitMQ"; /** * 11 * 参数1:交换机名称,如果没有指定则使用默认Default Exchage * 参数2:路由key,简单模式可以传递队列名称 * 参数3:消息其它属性 * 参数4:消息内容 * */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("已发送消息:"+message); //12.关闭资源 channel.close(); connection.close(); }}
执行后,控制台打印结果如下:
我们也可以登录rabbitMQ的管理控制台,可以发现队列和其消息:
static final String QUEUE_NAME="simple_queue";也就是这段代码命名的队列名称
点击simple_queue链接,进入如下界面。然后点击Get Message(s)就可以得到我们生产者生发送的消息了。
第3步、编写消费者消费消息
package com.wzy.consumer;import com.rabbitmq.client.*;import com.wzy.product.Simple_Producer;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 简单队列:消费者 * */public class Simple_Consumer { public static void main(String[] args) throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2.主机地址,默认为localhost connectionFactory.setHost("192.168.137.101"); //3.连接端口,默认为5672 connectionFactory.setPort(5672); //4.虚拟主机名称,默认为/ connectionFactory.setVirtualHost("/wzy"); //5.连接用户名,默认为guest connectionFactory.setUsername("guest"); //6.连接密码,默认为guest connectionFactory.setPassword("guest"); //7.创建连接 Connection connection=connectionFactory.newConnection(); //8.创建频道 Channel channel=connection.createChannel(); /** * 9、创建队列 * 参数1:队列名称;参数2:是否定义为持久化队列 * 参数3:是否独占本次连接;参数4:是否在不使用的时候自动删除队列 * 参数5:队列的其它参数 * */ channel.queueDeclare("simple_queue",true,false,false,null); /** * 10.创建消费者,并设置消息处理 * */ DefaultConsumer consumer=new DefaultConsumer(channel){ /** * consumerTag:消费者标签,在channel.basicConsumer时可以指定 * envelope:消息包的内容,可从中获取消息id,消息routingkey、交换机、 * 消息和重传标志(收到消息失败后是否需要重新发送) * properties:属性信息 * body:消息 * */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //1)、得到路由key System.out.println("路由key为:"+envelope.getRoutingKey()); //2)、交换机 System.out.println("交换机为:"+envelope.getExchange()); //3)、消息id System.out.println("消息id为:"+envelope.getDeliveryTag()); //4)、收到的消息 System.out.println("收到的消息为:"+new String(body,"utf-8")); } }; /** * 11.监听消息 * 参数1:队列名称 * 参数2:是否自动确认,设置为false则需要手动确认 * 设置为true表示接收到消息后,自动向mq回复接收到了,mq接收到回复后会删除消息 * 参数3:消息接收到后回调 * */ channel.basicConsume("simple_queue",true,consumer); //不关闭资源,应该一直会监听消息 //channel.close(); //connection.close(); }}
控制台打印结果如下:
第4步、创建连接公共类ConnectionUtil
因为消费者和生产者,创建连接的代码重复为了提高代码的复用性,可以将这段代码抽取出来
package com.wzy.com.wzy.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConnectionUtil { public static Connection getConn() throws IOException, TimeoutException { //1.创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2.主机地址,默认为localhost connectionFactory.setHost("192.168.137.101"); //3.连接端口,默认为5672 connectionFactory.setPort(5672); //4.虚拟主机名称,默认为/ connectionFactory.setVirtualHost("/wzy"); //5.连接用户名,默认为guest connectionFactory.setUsername("guest"); //6.连接密码,默认为guest connectionFactory.setPassword("guest"); //7.创建连接 return connectionFactory.newConnection(); }}
转载地址:http://nmuii.baihongyu.com/