rocketmq源码-consumer启动
创始人
2024-04-24 01:39:13
0

前言

这篇笔记记录consumer启动的逻辑
consumer主要是负责去broker中拉取消息,然后将拉取到的消息,交给消费者去处理
consumer本质上也是一个netty客户端,所以,在启动的时候,和producer有很多相似的点,但是有1个区别点:
1、producer的负载均衡是在真正去发送消息的时候,通过轮询的方式,选择其中的一个messageQueue;但是consumer是在启动的时候,通过负载均衡策略去分配consumerQueue

相同点:
1、producer和consumer对于broker来说,都说客户端,所以在启动时,都会启动nettyClient,nettyClient负责向broker发送消息、发送拉取消息的请求

源码

consumer的启动,入口层代码也是比较简单的,如图1所示

  1. 需要初始化一个consumer对象,这里的consumer对象和consumerGroup是存在一个映射关系的
  2. 然后订阅对应的topic即可,这里在订阅topic的时候,同时会把subExpression的信息存入到内存中,方便在后面接收到消息之后进行过滤
  3. 接着注册回调方法,这里的回调方法是真正处理消息的代码
  4. 然后调用start()方法启动即可

图1:图1

在其start()方法中,我们只关心消费者启动的代码,消息轨迹相关的,暂时先不关注,如图2所示
图2:
图2

在start()方法中,会根据当前serviceState的状态,分别进行处理
在create_just的逻辑中,我目前已经看过,看懂的,都备注在注释中了,可以参考看下,如图3所示

  1. 就是会根据消费者注册的回调方法,生成不同的service,就是并行消费和顺序消费两个service
  2. 这里还会调用mQClientFactory.registerConsumer,去注册信息,这里是把group信息和consumer信息放到了内存中的一个map集合,在后面进行负载均衡和拉取消息的时候,会用到

需要核心关注的,还是mqClientFactory.start()方法
图3:
图3

这里的mqClientFactory,之前在记录producer的笔记时,有说过,这里是共用的一个方法,但是在,消费者启动的时候,有两个方法是比较关键的,已经圈了出来;这里我们只需要知道,这两个service分别是用来去拉取消息、负载均衡的,后面单独起博客去学习
简单的记录下,pullMessageService中会启动一个线程,在run方法中,通过while判断,只有consumer没有stop,就一直去broke拉取消息

RebalanceService中:会根据负载均衡策略,给当前消费者分配messageQueue,因为一个topic有可能会有多个消费者,此时,多个消费者构成了消费者组,此时,多个消费者会分摊所有的messageQueue
在这里插入图片描述

总结

consumer端启动的时候,逻辑相对而言,比较简单,并且大部分是和producer共用的,这里需要着重关注的是,负载均衡和拉取消息的service,具体这两个service,后面会拆开详细记录,这篇就不再累述了
在这里插入图片描述

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...
有效的括号 一、题目 给定一个只包括 '(',')','{','}'...