Disruptor实战和笔记之二:Disruptor类分析
创始人
2024-05-24 21:54:18
0

1 本篇概览

  • 通过前文的实战,咱们对Disruptor有了初步认识,借助com.lmax.disruptor.dsl.Disruptor类可以轻松完成以下操作:

  1. 环形队列初始化

  1. 指定事件消费者

  1. 启动消费者线程

  • 接下来要面对两个问题:

  1. 深入了解Disruptor类是如何完成上述操作的;

  1. 对Disruptor类有了足够了解时,尝试不用Disruptor,自己动手操作环形队列,实现消息的生产和消费,这样做的目的是加深对Disruptor内部的认识,做到知其所以然;

  • 接下来咱们先解决第一个问题吧,结合Disruptor对象的源码来看看上述三个操作到底做了什么;

2 环形队列初始化

  • 环形队列初始化发生在实例化Disruptor对象的时候,即Disruptor的构造方法:

public Disruptor(final EventFactory eventFactory, final int ringBufferSize, final ThreadFactory threadFactory){this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));}
  • RingBuffer.createMultiProducer方法内部实例化了RingBuffer,如下图红框:

  • 记下第一个重要知识点:创建RingBuffer对象;

3 指定事件消费者

  • 在前文中,下面这行代码指定了事件由StringEventHandler消费:

disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));
  • 查看handleEventsWith方法的内部:

public final EventHandlerGroup handleEventsWith(final EventHandler... handlers)
{return createEventProcessors(new Sequence[0], handlers);
}
  • 展开createEventProcessors方法,如下图,请重点关注创建SequenceBarrier和BatchEventProcessor等操作:

  • 展开上图红框四中的updateGatingSequencesForNextInChain方法,如下图,红框中的ringBuffer.addGatingSequences需要重点关注:

  • 小结一下,disruptor.handleEventsWith方法涉及到四个重要知识点:

  1. 创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件

  1. 创建BatchEventProcessor,负责消费事件

  1. 绑定BatchEventProcessor对象的异常处理类

  1. 调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer

4 启动消费者线程

  • 前文已通过日志确定了消费事件的逻辑是在一个独立的线程中执行的,启动消费者线程的代码如下:

disruptor.start();
  • 展开start方法,如下可见,关键代码是consumerInfo.start(executor):

    public RingBuffer start(){checkOnlyStartedOnce();for (final ConsumerInfo consumerInfo : consumerRepository){consumerInfo.start(executor);}return ringBuffer;}
  • ConsumerInfo是接口,对应的实现类有EventProcessorInfo和WorkerPoolInfo两种,这里应该是哪种呢?既然来源是consumerRepository,这就要看当初是怎么存入consumerRepository的,前面在分析createEventProcessors方法时,下图红框中的consumerRepository.add被忽略了,现在需要进去看看:

  • 进去后一目了然,可见ConsumerInfo的实现是EventProcessorInfo:

  • 所以,回到前面对consumerInfo.start(executor)方法的分析,这里要看的就是EventProcessorInfo的start方法了,如下图,非常简单,就是启动一个线程执行eventprocessor(这个eventprocessor是BatchEventProcessor对象):

  • 小结一下,disruptor.start方法涉及到一个重要知识点:

  1. 启动独立线程,用来执行消费事件的业务逻辑;

5 消费事件的逻辑

  • 为了理解消息处理逻辑,还要重点关注BatchEventProcessor.processEvents方法,如下图所示,其实也很简单,就是不停的从环形队列取出可用的事件,然后再更新自己的Sequence,相当于标记已经消费到哪里了:

6 总结

最后总结Disruptor类的重要功能:

  1. 创建环形队列(RingBuffer对象)

  1. 创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件

  1. 创建BatchEventProcessor,负责消费事件

  1. 绑定BatchEventProcessor对象的异常处理类

  1. 调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer

  1. 启动独立线程,用来执行消费事件的业务逻辑

  • 聪明的您一定会发现,本文并没有全面分析Disruptor类的源码,例如after、shutdown等方法都没有提到,确实如此,欣宸在此给您道歉了,本篇的重点是找出那些与基本功能有关代码,为后面的实战提供理论指导(不用Disruptor类实现消息生产消费的实战),因此很多高级功能都跳过了;

理解官方流程图

  • 此时再看官方流程图,聪明的您应该很快就能理解此图表达的意思:每个消费者都有自己的Sequence,通过此Sequence取得自己在环形队列中消费的位置,再通过SequenceBarrier来等待可用事件的出现,等到事件出现了就用get方法取出具体的事件,给EventHandler来处理:

7 后续预告

  • 此时,咱们对Disruptor类已经有了比较深入的理解,接下来的文章,咱们会尝试不用Disruptor类,仅凭着对RingBuffer对象的操作来实现以下三种功能:

  1. 100个事件,单个消费者消费;

  1. 100个事件,三个消费者,每个都独自消费这个100个事件;

  1. 100个事件,三个消费者共同消费这个100个事件;

8 说明

需要源码的同志们可以私聊我

9 相关文章

一、Disruptor实战和笔记之一:快速入门

二、Disruptor实战和笔记之二:Disruptor类分析

上一篇:洛谷——P1077 摆花

下一篇:MFC常用技巧

相关内容

热门资讯

监控摄像头接入GB28181平... 流程简介将监控摄像头的视频在网站和APP中直播,要解决的几个问题是:1&...
Windows10添加群晖磁盘... 在使用群晖NAS时,我们需要通过本地映射的方式把NAS映射成本地的一块磁盘使用。 通过...
protocol buffer... 目录 目录 什么是protocol buffer 1.protobuf 1.1安装  1.2使用...
在Word、WPS中插入AxM... 引言 我最近需要写一些文章,在排版时发现AxMath插入的公式竟然会导致行间距异常&#...
【PdgCntEditor】解... 一、问题背景 大部分的图书对应的PDF,目录中的页码并非PDF中直接索引的页码...
Fluent中创建监测点 1 概述某些仿真问题,需要创建监测点,用于获取空间定点的数据࿰...
educoder数据结构与算法...                                                   ...
MySQL下载和安装(Wind... 前言:刚换了一台电脑,里面所有东西都需要重新配置,习惯了所...
修复 爱普生 EPSON L4... L4151 L4153 L4156 L4158 L4163 L4165 L4166 L4168 L4...
MFC文件操作  MFC提供了一个文件操作的基类CFile,这个类提供了一个没有缓存的二进制格式的磁盘...