rabbitmq的七种工作模式之五种
创始人
2024-06-03 07:55:16
0

第一种:简单模式

结构:1个生产者+1个队列+1个消费者
图解:
在这里插入图片描述
pom配置文件代码:


4.0.0com.itheimarabbitmq-producer1.0-SNAPSHOTcom.rabbitmqamqp-client5.6.0org.apache.maven.pluginsmaven-compiler-plugin3.8.01.81.8

生产者代码:

package com.itheima.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_HelloWorld {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.创建队列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments* queue: 队列名称* durable:是否持久化,重启之后还在* exclusive:是否独占,只能有一个消费这监听这个队列;当连接Connection关闭时,是否删除队列* autoDelete 是否自动删除* arguments 参数*/// 如果不存在自动创建,如果存在使用原先的队列channel.queueDeclare("hello", true, false, false, null);// 6.发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body* exchange: 交换机名称,简单模式下使用默认的* routingKey:路由名称* props:配置信息* body:发送消息数据*/String body = "Hello,World ~~~~~~~~~~~~~~~~~~~~";channel.basicPublish("", "hello",null, body.getBytes());// 7.释放资源channel.close();connection.close();}
}

重要信息:
注意:channel通道在发布消息时的routingKey是队列名称匹配正则,也就是说routingKey决定向哪个队列发送消息。
当资源不关闭时,在rabbitmq管理界面,可以看到连接在线、通道信息、队列中的信息数。
连接在线:
在这里插入图片描述
通道信息
在这里插入图片描述
队列中5条消息未消费
在这里插入图片描述

消费者代码

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Hello {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+ consumerTag);System.out.println("Exchange:"+ envelope.getExchange());System.out.println("RoutingKey:"+ envelope.getRoutingKey());System.out.println("properties:"+ properties);System.out.println("body:"+ new String(body));}};channel.basicConsume("hello", true, consumer);// 关闭资源?不要}
}

第二种:工作队列

结构:1个生产者+1个队列+多个消费者
在工人之间分配任务(竞争消费者模式)
类比:在简单模式的基础上,增加了消费者的数量
图解:
在这里插入图片描述
应用场景:任务过多提高处理任务的速度

pom依然拥有简单模式的配置代码。

生产者代码

package com.itheima.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_WorkQueues {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.创建队列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments* queue: 队列名称* durable:是否持久化,重启之后还在* exclusive:是否独占,只能有一个消费这监听这个队列;当连接Connection关闭时,是否删除队列* autoDelete 是否自动删除* arguments 参数*/// 如果不存在自动创建,如果存在使用原先的队列channel.queueDeclare("work_queues", true, false, false, null);// 6.发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body* exchange: 交换机名称,简单模式下使用默认的* routingKey:路由名称* props:配置信息* body:发送消息数据*/// 发10遍消息for (int i=1; i<=10; i++){String body = i+"Hello,workqueues ~~~~~~~~~~~~~~~~~~~~";channel.basicPublish("", "work_queues",null, body.getBytes());}// 7.释放资源channel.close();connection.close();}
}

消费者代码

第一位:

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_WorkQueues1 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列channel.queueDeclare("work_queues", true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+ new String(body));}};channel.basicConsume("work_queues", true, consumer);// 关闭资源?不要}
}

第二位:

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_WorkQueues2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列channel.queueDeclare("work_queues", true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+ new String(body));}};channel.basicConsume("work_queues", true, consumer);// 关闭资源?不要}
}

重要信息:必须先启动消费者。申明消费者时,在创建通道后,申明队列。消费者申明的队列要和生产者一样。

第三种:发布订阅模式

结构:1个生产者+1个交换机+多个队列+多个消费者
类比:比工作队列增加了交换机、队列。每个消费者消费相同的消息,没有竞争关系。
图解:
在这里插入图片描述
重要信息:
Exchange:交换机(X)。一方面,接收生产者发送的消息,一方面路由处理消息。例如,递交给某个特别队列、递交给所有队列、或是消息丢弃。
交换机常见3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key的队列
Topic通配符,把消息交割符合routing pattern(路由模式)的队列
Exchange交换机只负责转发消息,不具备存储消息的能力,因此如何没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
在这里插入图片描述
在这里插入图片描述

生产者代码

package com.itheima.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_PubSub {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.创建交换机/*** String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments* exchange: 交换机名称* type: 枚举或字符串,交换机类型*     DIRECT("direct"), 定向*     FANOUT("fanout"), 广播(扇形),发送给每个队列*     TOPIC("topic"), 通配符*     HEADERS("headers"); 参数匹配* durable:是否持久化* autoDelete:是否自动删除* internal:内部使用。一般false* arguments:参数*/String exchangeName = "test_fanout";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);// 6.创建队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 7.绑定队列和交换机/*** String queue, String exchange, String routingKey* queue: 队列名称* exchange: 交换机名称* routingKey: 路由键,绑定规则*  如果交换机类型为fanout,routingKey设置为""*/channel.queueBind(queue1Name,exchangeName,"");channel.queueBind(queue2Name,exchangeName,"");// 8.发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body*/String body = "日志消息:张三调用了findAll方法~~~~~~~~~~";channel.basicPublish(exchangeName,"",null,body.getBytes());// 9.释放资源channel.close();connection.close();}
}

消费者代码

消费者1:

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_PubSub1 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台~~~~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue1Name, true, consumer);// 关闭资源?不要}
}

消费者2:

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_PubSub2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue2Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息保存到数据库~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue2Name, true, consumer);// 关闭资源?不要}
}

第四种:路由模式

结构:1个生产者+1个交换机+多个队列+多个消费者
类比:发布订阅模式下,交换机模式是广播,改成直接,增加routingkey匹配
图解:
在这里插入图片描述

生产者代码:

package com.itheima.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_Routing {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.创建交换机/*** String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments* exchange: 交换机名称* type: 枚举或字符串,交换机类型*     DIRECT("direct"), 定向*     FANOUT("fanout"), 广播(扇形),发送给每个队列*     TOPIC("topic"), 通配符*     HEADERS("headers"); 参数匹配* durable:是否持久化* autoDelete:是否自动删除* internal:内部使用。一般false* arguments:参数*/String exchangeName = "test_routing";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);// 6.创建队列String queue1Name = "test_routing_queue1";String queue2Name = "test_routing_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 7.绑定队列和交换机/*** String queue, String exchange, String routingKey* queue: 队列名称* exchange: 交换机名称* routingKey: 路由键,绑定规则*  如果交换机类型为fanout,routingKey设置为""*/// 第一个绑定通道和routingkeychannel.queueBind(queue1Name,exchangeName,"error");// 第二个绑定通道和routingkeychannel.queueBind(queue2Name,exchangeName,"warning");channel.queueBind(queue2Name,exchangeName,"info");channel.queueBind(queue2Name,exchangeName,"error");// 8.发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body*/String body = "日志消息:info级别消息只有队列二打印~~~~~~~~~~";channel.basicPublish(exchangeName,"info",null,body.getBytes());// 9.释放资源channel.close();connection.close();}
}

消费者1代码:

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Routing1 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_routing_queue1";String queue2Name = "test_routing_queue2";channel.queueDeclare(queue1Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台~~~~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue1Name, true, consumer);// 关闭资源?不要}
}

消费者2代码:

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Routing2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_routing_queue1";String queue2Name = "test_routing_queue2";channel.queueDeclare(queue2Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台~~~~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue2Name, true, consumer);// 关闭资源?不要}
}

第五种通配符模式

结构:1个生产者+1个交换机+多个队列+多个消费者
类比:路由模式下,交换机类型是direct,通配符模式的交换机是topic。而且routingKey可以使用星号和#。星号描述1个单词,#表示任意个单词
图解:
在这里插入图片描述
生产者代码:

package com.itheima.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_Topic {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.创建交换机/*** String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments* exchange: 交换机名称* type: 枚举或字符串,交换机类型*     DIRECT("direct"), 定向*     FANOUT("fanout"), 广播(扇形),发送给每个队列*     TOPIC("topic"), 通配符*     HEADERS("headers"); 参数匹配* durable:是否持久化* autoDelete:是否自动删除* internal:内部使用。一般false* arguments:参数*/String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);// 6.创建队列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 7.绑定队列和交换机/*** String queue, String exchange, String routingKey* queue: 队列名称* exchange: 交换机名称* routingKey: 路由键,绑定规则*  如果交换机类型为fanout,routingKey设置为""*/// 第一个绑定通道和routingkeychannel.queueBind(queue1Name,exchangeName,"*.orange.*");// 第二个绑定通道和routingkeychannel.queueBind(queue2Name,exchangeName,"*.*.rabbit");channel.queueBind(queue2Name,exchangeName,"Lazy.#");// 8.发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body*/String body = "apple系统日志存入数据库";channel.basicPublish(exchangeName,"apple.good.rabbit",null,body.getBytes());String body2 = "orange系统日志打印控制台";channel.basicPublish(exchangeName,"gooad.orange.good",null,body2.getBytes());// 9.释放资源channel.close();connection.close();}
}

消费者1代码:

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Topic1 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台~~~~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue1Name, true, consumer);// 关闭资源?不要}
}

消费者2代码:

package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Topic2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue2Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息存入数据库~~~~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue2Name, true, consumer);// 关闭资源?不要}
}

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...
【Ctfer训练计划】——(三... 作者名:Demo不是emo  主页面链接:主页传送门 创作初心ÿ...