结构:1个生产者+1个队列+1个消费者
图解:
pom配置文件代码:
4.0.0 com.itheima rabbitmq-producer 1.0-SNAPSHOT com.rabbitmq amqp-client 5.6.0 org.apache.maven.plugins maven-compiler-plugin 3.8.0 1.8 1.8
package com.itheima.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_HelloWorld {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.创建队列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments* queue: 队列名称* durable:是否持久化,重启之后还在* exclusive:是否独占,只能有一个消费这监听这个队列;当连接Connection关闭时,是否删除队列* autoDelete 是否自动删除* arguments 参数*/// 如果不存在自动创建,如果存在使用原先的队列channel.queueDeclare("hello", true, false, false, null);// 6.发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body* exchange: 交换机名称,简单模式下使用默认的* routingKey:路由名称* props:配置信息* body:发送消息数据*/String body = "Hello,World ~~~~~~~~~~~~~~~~~~~~";channel.basicPublish("", "hello",null, body.getBytes());// 7.释放资源channel.close();connection.close();}
}
重要信息:
注意:channel通道在发布消息时的routingKey是队列名称匹配正则,也就是说routingKey决定向哪个队列发送消息。
当资源不关闭时,在rabbitmq管理界面,可以看到连接在线、通道信息、队列中的信息数。
连接在线:
通道信息
队列中5条消息未消费
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Hello {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:"+ consumerTag);System.out.println("Exchange:"+ envelope.getExchange());System.out.println("RoutingKey:"+ envelope.getRoutingKey());System.out.println("properties:"+ properties);System.out.println("body:"+ new String(body));}};channel.basicConsume("hello", true, consumer);// 关闭资源?不要}
}
结构:1个生产者+1个队列+多个消费者
在工人之间分配任务(竞争消费者模式)
类比:在简单模式的基础上,增加了消费者的数量
图解:
应用场景:任务过多提高处理任务的速度
pom依然拥有简单模式的配置代码。
package com.itheima.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_WorkQueues {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.创建队列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments* queue: 队列名称* durable:是否持久化,重启之后还在* exclusive:是否独占,只能有一个消费这监听这个队列;当连接Connection关闭时,是否删除队列* autoDelete 是否自动删除* arguments 参数*/// 如果不存在自动创建,如果存在使用原先的队列channel.queueDeclare("work_queues", true, false, false, null);// 6.发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body* exchange: 交换机名称,简单模式下使用默认的* routingKey:路由名称* props:配置信息* body:发送消息数据*/// 发10遍消息for (int i=1; i<=10; i++){String body = i+"Hello,workqueues ~~~~~~~~~~~~~~~~~~~~";channel.basicPublish("", "work_queues",null, body.getBytes());}// 7.释放资源channel.close();connection.close();}
}
第一位:
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_WorkQueues1 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列channel.queueDeclare("work_queues", true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+ new String(body));}};channel.basicConsume("work_queues", true, consumer);// 关闭资源?不要}
}
第二位:
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_WorkQueues2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列channel.queueDeclare("work_queues", true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+ new String(body));}};channel.basicConsume("work_queues", true, consumer);// 关闭资源?不要}
}
重要信息:必须先启动消费者。申明消费者时,在创建通道后,申明队列。消费者申明的队列要和生产者一样。
结构:1个生产者+1个交换机+多个队列+多个消费者
类比:比工作队列增加了交换机、队列。每个消费者消费相同的消息,没有竞争关系。
图解:
重要信息:
Exchange:交换机(X)。一方面,接收生产者发送的消息,一方面路由处理消息。例如,递交给某个特别队列、递交给所有队列、或是消息丢弃。
交换机常见3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key的队列
Topic通配符,把消息交割符合routing pattern(路由模式)的队列
Exchange交换机只负责转发消息,不具备存储消息的能力,因此如何没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
package com.itheima.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_PubSub {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.创建交换机/*** String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments* exchange: 交换机名称* type: 枚举或字符串,交换机类型* DIRECT("direct"), 定向* FANOUT("fanout"), 广播(扇形),发送给每个队列* TOPIC("topic"), 通配符* HEADERS("headers"); 参数匹配* durable:是否持久化* autoDelete:是否自动删除* internal:内部使用。一般false* arguments:参数*/String exchangeName = "test_fanout";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);// 6.创建队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 7.绑定队列和交换机/*** String queue, String exchange, String routingKey* queue: 队列名称* exchange: 交换机名称* routingKey: 路由键,绑定规则* 如果交换机类型为fanout,routingKey设置为""*/channel.queueBind(queue1Name,exchangeName,"");channel.queueBind(queue2Name,exchangeName,"");// 8.发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body*/String body = "日志消息:张三调用了findAll方法~~~~~~~~~~";channel.basicPublish(exchangeName,"",null,body.getBytes());// 9.释放资源channel.close();connection.close();}
}
消费者1:
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_PubSub1 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台~~~~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue1Name, true, consumer);// 关闭资源?不要}
}
消费者2:
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_PubSub2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.64.129");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue2Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息保存到数据库~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue2Name, true, consumer);// 关闭资源?不要}
}
结构:1个生产者+1个交换机+多个队列+多个消费者
类比:发布订阅模式下,交换机模式是广播,改成直接,增加routingkey匹配
图解:
package com.itheima.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_Routing {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.创建交换机/*** String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments* exchange: 交换机名称* type: 枚举或字符串,交换机类型* DIRECT("direct"), 定向* FANOUT("fanout"), 广播(扇形),发送给每个队列* TOPIC("topic"), 通配符* HEADERS("headers"); 参数匹配* durable:是否持久化* autoDelete:是否自动删除* internal:内部使用。一般false* arguments:参数*/String exchangeName = "test_routing";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);// 6.创建队列String queue1Name = "test_routing_queue1";String queue2Name = "test_routing_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 7.绑定队列和交换机/*** String queue, String exchange, String routingKey* queue: 队列名称* exchange: 交换机名称* routingKey: 路由键,绑定规则* 如果交换机类型为fanout,routingKey设置为""*/// 第一个绑定通道和routingkeychannel.queueBind(queue1Name,exchangeName,"error");// 第二个绑定通道和routingkeychannel.queueBind(queue2Name,exchangeName,"warning");channel.queueBind(queue2Name,exchangeName,"info");channel.queueBind(queue2Name,exchangeName,"error");// 8.发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body*/String body = "日志消息:info级别消息只有队列二打印~~~~~~~~~~";channel.basicPublish(exchangeName,"info",null,body.getBytes());// 9.释放资源channel.close();connection.close();}
}
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Routing1 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_routing_queue1";String queue2Name = "test_routing_queue2";channel.queueDeclare(queue1Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台~~~~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue1Name, true, consumer);// 关闭资源?不要}
}
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Routing2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_routing_queue1";String queue2Name = "test_routing_queue2";channel.queueDeclare(queue2Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台~~~~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue2Name, true, consumer);// 关闭资源?不要}
}
结构:1个生产者+1个交换机+多个队列+多个消费者
类比:路由模式下,交换机类型是direct,通配符模式的交换机是topic。而且routingKey可以使用星号和#。星号描述1个单词,#表示任意个单词
图解:
生产者代码:
package com.itheima.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Producer_Topic {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.创建交换机/*** String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map arguments* exchange: 交换机名称* type: 枚举或字符串,交换机类型* DIRECT("direct"), 定向* FANOUT("fanout"), 广播(扇形),发送给每个队列* TOPIC("topic"), 通配符* HEADERS("headers"); 参数匹配* durable:是否持久化* autoDelete:是否自动删除* internal:内部使用。一般false* arguments:参数*/String exchangeName = "test_topic";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);// 6.创建队列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 7.绑定队列和交换机/*** String queue, String exchange, String routingKey* queue: 队列名称* exchange: 交换机名称* routingKey: 路由键,绑定规则* 如果交换机类型为fanout,routingKey设置为""*/// 第一个绑定通道和routingkeychannel.queueBind(queue1Name,exchangeName,"*.orange.*");// 第二个绑定通道和routingkeychannel.queueBind(queue2Name,exchangeName,"*.*.rabbit");channel.queueBind(queue2Name,exchangeName,"Lazy.#");// 8.发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body*/String body = "apple系统日志存入数据库";channel.basicPublish(exchangeName,"apple.good.rabbit",null,body.getBytes());String body2 = "orange系统日志打印控制台";channel.basicPublish(exchangeName,"gooad.orange.good",null,body2.getBytes());// 9.释放资源channel.close();connection.close();}
}
消费者1代码:
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Topic1 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息打印到控制台~~~~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue1Name, true, consumer);// 关闭资源?不要}
}
消费者2代码:
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Consumer_Topic2 {public static void main(String[] args) throws IOException, TimeoutException {// 1.新建连接工厂ConnectionFactory connectionFactory = new ConnectionFactory();// 2.设置参数connectionFactory.setHost("192.168.163.128");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");// 3.获取连接 ConnectionConnection connection = connectionFactory.newConnection();// 4.创建channelChannel channel = connection.createChannel();// 5.如果不存在自动创建,如果存在使用原先的队列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue2Name, true, false, false, null);// 6.接收消息/*** String queue, boolean autoAck, Consumer callback* queue: 队列名称* autoAck: 是否自动确认* callback: 回调对象*/Consumer consumer = new DefaultConsumer(channel){// 当收到消息后,会自动执行该方法/**** @param consumerTag 标识* @param envelope 获取对应信息,比如交换机、路由key。。* @param properties 配置* @param body 内容* @throws IOException 异常*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("将日志信息存入数据库~~~~~~~~~~~");System.out.println("body:"+ new String(body));}};channel.basicConsume(queue2Name, true, consumer);// 关闭资源?不要}
}