博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
rabbitmq(3)---简单模式
阅读量:4087 次
发布时间:2019-05-25

本文共 5577 字,大约阅读时间需要 18 分钟。

P:生产者,也就是要发送消息的程序

C:消费者,消息的接受者,会一直等待消息的到来。

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息。生产者向其中投递消息,消费者从其中读取消息。

注:在rabbitMQ中消费者是一定要到某个消息队列中去获取消息的。

第1步、创建maven项目rabbitmq-demo,在pom.xml中添加如下依赖:

WebProject
com.wzy
1.0-SNAPSHOT
4.0.0
rabbitMQ-demo
com.rabbitmq
amqp-client
5.6.0

第2步、编写消息生产者,代码如下:

package com.wzy.product;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 生产者 * */public class Producer {    static final String QUEUE_NAME="simple_queue";    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接工厂        ConnectionFactory connectionFactory=new ConnectionFactory();        //2.主机地址,默认为localhost        connectionFactory.setHost("192.168.137.101");        //3.连接端口,默认为5672        connectionFactory.setPort(5672);        //4.虚拟主机名称,默认为/        connectionFactory.setVirtualHost("/wzy");        //5.连接用户名,默认为guest        connectionFactory.setUsername("guest");        //6.连接密码,默认为guest        connectionFactory.setPassword("guest");        //7.创建连接        Connection connection=connectionFactory.newConnection();        //8.创建频道        Channel channel=connection.createChannel();        /**         * 9.创建队列         * 参数1:队列名称         * 参数2: 是否定义为持久化队列         * 参数3:是否独占本次连接         * 参数4:是否在不使用的时候自动删除队列         * 参数5:队列其它参数         * */        channel.queueDeclare(QUEUE_NAME,true,false,false,null);        //10.要发送的信息        String message="你好,RabbitMQ";        /**         * 11         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage         * 参数2:路由key,简单模式可以传递队列名称         * 参数3:消息其它属性         * 参数4:消息内容         * */        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());        System.out.println("已发送消息:"+message);        //12.关闭资源        channel.close();        connection.close();    }}

执行后,控制台打印结果如下:

我们也可以登录rabbitMQ的管理控制台,可以发现队列和其消息:

static final String QUEUE_NAME="simple_queue";也就是这段代码命名的队列名称

点击simple_queue链接,进入如下界面。然后点击Get Message(s)就可以得到我们生产者生发送的消息了。

第3步、编写消费者消费消息

package com.wzy.consumer;import com.rabbitmq.client.*;import com.wzy.product.Simple_Producer;import java.io.IOException;import java.util.concurrent.TimeoutException;/** * 简单队列:消费者 * */public class Simple_Consumer {    public static void main(String[] args) throws IOException, TimeoutException {        //1.创建连接工厂        ConnectionFactory connectionFactory=new ConnectionFactory();        //2.主机地址,默认为localhost        connectionFactory.setHost("192.168.137.101");        //3.连接端口,默认为5672        connectionFactory.setPort(5672);        //4.虚拟主机名称,默认为/        connectionFactory.setVirtualHost("/wzy");        //5.连接用户名,默认为guest        connectionFactory.setUsername("guest");        //6.连接密码,默认为guest        connectionFactory.setPassword("guest");        //7.创建连接        Connection connection=connectionFactory.newConnection();        //8.创建频道        Channel channel=connection.createChannel();        /**         * 9、创建队列         * 参数1:队列名称;参数2:是否定义为持久化队列         * 参数3:是否独占本次连接;参数4:是否在不使用的时候自动删除队列         * 参数5:队列的其它参数         * */        channel.queueDeclare("simple_queue",true,false,false,null);        /**         * 10.创建消费者,并设置消息处理         * */        DefaultConsumer consumer=new DefaultConsumer(channel){            /**             * consumerTag:消费者标签,在channel.basicConsumer时可以指定             * envelope:消息包的内容,可从中获取消息id,消息routingkey、交换机、             *          消息和重传标志(收到消息失败后是否需要重新发送)             * properties:属性信息             * body:消息             * */            @Override            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {                //1)、得到路由key                System.out.println("路由key为:"+envelope.getRoutingKey());                //2)、交换机                System.out.println("交换机为:"+envelope.getExchange());                //3)、消息id                System.out.println("消息id为:"+envelope.getDeliveryTag());                //4)、收到的消息                System.out.println("收到的消息为:"+new String(body,"utf-8"));            }        };        /**         * 11.监听消息         * 参数1:队列名称         * 参数2:是否自动确认,设置为false则需要手动确认         *      设置为true表示接收到消息后,自动向mq回复接收到了,mq接收到回复后会删除消息         * 参数3:消息接收到后回调         * */        channel.basicConsume("simple_queue",true,consumer);        //不关闭资源,应该一直会监听消息        //channel.close();        //connection.close();    }}

控制台打印结果如下:

第4步、创建连接公共类ConnectionUtil

因为消费者和生产者,创建连接的代码重复为了提高代码的复用性,可以将这段代码抽取出来

package com.wzy.com.wzy.utils;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class ConnectionUtil {    public static Connection getConn() throws IOException, TimeoutException {        //1.创建连接工厂        ConnectionFactory connectionFactory=new ConnectionFactory();        //2.主机地址,默认为localhost        connectionFactory.setHost("192.168.137.101");        //3.连接端口,默认为5672        connectionFactory.setPort(5672);        //4.虚拟主机名称,默认为/        connectionFactory.setVirtualHost("/wzy");        //5.连接用户名,默认为guest        connectionFactory.setUsername("guest");        //6.连接密码,默认为guest        connectionFactory.setPassword("guest");        //7.创建连接        return connectionFactory.newConnection();    }}

转载地址:http://nmuii.baihongyu.com/

你可能感兴趣的文章
Express: Can’t set headers after they are sent.
查看>>
2017年,这一次我们不聊技术
查看>>
实现接口创建线程
查看>>
Java对象序列化与反序列化(1)
查看>>
HTML5的表单验证实例
查看>>
JavaScript入门笔记:全选功能的实现
查看>>
程序设计方法概述:从面相对象到面向功能到面向对象
查看>>
数据库事务
查看>>
JavaScript基础1:JavaScript 错误 - Throw、Try 和 Catch
查看>>
SQL基础总结——20150730
查看>>
SQL join
查看>>
JavaScript实现页面无刷新让时间走动
查看>>
CSS实例:Tab选项卡效果
查看>>
前端设计之特效表单
查看>>
前端设计之CSS布局:上中下三栏自适应高度CSS布局
查看>>
Java的时间操作玩法实例若干
查看>>
JavaScript:时间日期格式验证大全
查看>>
pinyin4j:拼音与汉字的转换实例
查看>>
XML工具代码:SAX从String字符串XML内获取指定节点或属性的值
查看>>
时间日期:获取两个日期相差几天
查看>>