springboot集成mqtt
创始人
2024-05-24 13:01:44
0

引入jar包

org.springframework.integrationspring-integration-mqtt

com.alibabafastjson

配置

mqtt:username: admin  #用户名password: admin  #密码address: tcp://192.168.236.136:1883   #地址通过TCP访问,realtime:topicprefix: /realtime/    #实时数据消息topic前缀

mqtt生产者配置类

@Configuration
@IntegrationComponentScan
public class MqttConfig {@Value("${mqtt.username}")private String userName;@Value("${mqtt.password}")private String passWord;@Value("${mqtt.address}")private String address;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setConnectionTimeout(10);options.setKeepAliveInterval(90);options.setAutomaticReconnect(true);options.setKeepAliveInterval(2);options.setServerURIs(new String[]{address});options.setUserName(userName);options.setPassword(passWord.toCharArray());factory.setConnectionOptions(options);return factory;}/*** @return 创建推送通道。*/@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}/*** @return 默认输出。*/@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler outbound() {// 在这里进行mqttOutboundChannel的相关设置MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("publishClient", mqttClientFactory());messageHandler.setAsync(true); //如果设置成true,发送消息时将不会阻塞。messageHandler.setDefaultTopic("mqttTopic");return messageHandler;}/*** 推送MQTT的网关。*/@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttSender {/*** 发送信息到MQTT服务器** @param data 发送的文本*/void sendToMqtt(String data);/*** 发送信息到MQTT服务器** @param topic   主题* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,String payload);/*** 发送信息到MQTT服务器** @param topic   主题* @param qos     对消息处理的几种机制。
0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。* @param payload 消息主体*/void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,@Header(MqttHeaders.QOS) int qos,String payload);} }

mqtt生产者测试

@RestController
@RequestMapping("/v1/mqtt")
public class MqttController {@Autowiredprivate MqttConfig.MqttSender mqttSender;@PostMapping("/v1/create")public HttpResult create(@RequestBody final RemoteControlRequestBean requestBean) {mqttSender.sendToMqtt(requestBean.getTopic(), 0, requestBean.getMsg());return DefaultHttpResultFactory.success("发送成功",Boolean.TRUE);}
}

mqtt消费者配置

@Slf4j
@Configuration
@IntegrationComponentScan
public class MqttConfig {@Value("${mqtt.username}")private String userName;@Value("${mqtt.password}")private String passWord;@Value("${mqtt.address}")private String address;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setConnectionTimeout(10);options.setKeepAliveInterval(90);options.setAutomaticReconnect(true);options.setKeepAliveInterval(2);options.setServerURIs(new String[]{address});options.setUserName(userName);options.setPassword(passWord.toCharArray());factory.setConnectionOptions(options);return factory;}@Beanpublic MessageProducer inbound() {String[] topics = new String[]{"test","topicTest","123"};MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter( "123_consumer", mqttClientFactory(), topics);adapter.setCompletionTimeout(30000);DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();converter.setPayloadAsBytes(true);adapter.setConverter(converter);adapter.setQos(2);adapter.setOutputChannel(mqttInputChannel());return adapter;}/*** MQTT信息通道(消费者)**/@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}/*** MQTT消息处理器(消费者)**/@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return new MessageHandler() {@Overridepublic void handleMessage(Message message) throws MessagingException {log.info("收到订阅消息: {}", message);String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();log.info("消息主题:{}", topic);Object payLoad = message.getPayload();log.info("发送的Packet数据{}", JSON.toJSONString(payLoad));}};}
}

topic定义的核心是区分业务场景。

mqtt中topic定义和规范

所有的主题名和主题过滤器必须至少包含一个字符。
主题名和主题过滤器是大小写敏感的。如:ACCOUNTS 和 Accounts 是不同的主题名。
主题名和主题过滤器可以包含空格字符。如:Accounts payable 是合法的主题名
主题名或主题过滤器以前置或后置斜杠 / 区分。如:/finance 和 finance 是不同的。
只包含斜杠 / 的主题名或主题过滤器是合法的。
主题名和主题过滤器不能包含 null 字符(Unicode U+0000)。
主题名和主题过滤器是 UTF-8 编码字符串,除了不能超过 UTF-8 编码字符串的长度限制之外,主题名或主题过滤器的层级数量没有其它限制。

mqtt中topic层级

MQTT 协议主题可以通过斜杠(“/” U+002F)将主题分割成多个层级;作为消息通道,客户端可以通过定义主题层级来实现对消息类型的细分;
例如:一个主机厂有多个车型,每个车型下面有多个车联网业务,我们在定义车机向对某个车型业务系统发消息时可以向<车型A>/ <车辆唯一标识>/<业务X>主题发消息;
当然在MQTT世界中主题可以有很多层(MQTT 协议中没有限制层级数量),比如:<车型A>/<车辆唯一标识(车架号)>/<业务X>/<子业务1>
这样,我们在定义车联网分层级的业务通道的时候可以按主题层级来设计。

topic中通配符

多层通配符#

#字符号(“#” U+0023)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级。如:订阅者可以通过订阅<车型A>/# 接收到:
<车型A>
<车型A>/<车架号1>
<车型A>/<车架号1>/<业务X>
这几类主题的消息。

单层通配符+

加号 (“+” U+002B) 用于单个主题层级匹配的通配符。如:订阅者可以通过订阅<车型A>/+ 来接收
<车型A>/<车架号1>
<车型A>/<车架号2>
不同于多层通配符,使用单层通配符的时候无法匹配子层级的主题,比如:<车型A>/<车架号1>/<业务X>的主题消息就无法接收到。

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...