-
disruptor笔记之二:Disruptor类分析
本篇概览
- 通过前文的实战,咱们对Disruptor有了初步认识,借助com.lmax.disruptor.dsl.Disruptor类可以轻松完成以下操作:
- 环形队列初始化
- 指定事件消费者
- 启动消费者线程
- 接下来要面对两个问题:
- 深入了解Disruptor类是如何完成上述操作的;
- 对Disruptor类有了足够了解时,尝试不用Disruptor,自己动手操作环形队列,实现消息的生产和消费,这样做的目的是加深对Disruptor内部的认识,做到知其所以然;
- 接下来咱们先解决第一个问题吧,结合Disruptor对象的源码来看看上述三个操作到底做了什么;
环形队列初始化
- 环形队列初始化发生在实例化Disruptor对象的时候,即Disruptor的构造方法:
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}
- RingBuffer.createMultiProducer方法内部实例化了RingBuffer,如下图红框:
- 记下第一个重要知识点:创建RingBuffer对象;
指定事件消费者
- 在前文中,下面这行代码指定了事件由StringEventHandler消费:
disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter));
- 查看handleEventsWith方法的内部:
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers)
{
return createEventProcessors(new Sequence[0], handlers);
}
- 展开createEventProcessors方法,如下图,请重点关注创建SequenceBarrier和BatchEventProcessor等操作:
- 展开上图红框四中的updateGatingSequencesForNextInChain方法,如下图,红框中的ringBuffer.addGatingSequences需要重点关注:
- 小结一下,disruptor.handleEventsWith方法涉及到四个重要知识点:
- 创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件
- 创建BatchEventProcessor,负责消费事件
- 绑定BatchEventProcessor对象的异常处理类
- 调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer
启动消费者线程
- 前文已通过日志确定了消费事件的逻辑是在一个独立的线程中执行的,启动消费者线程的代码如下:
disruptor.start();
- 展开start方法,如下可见,关键代码是consumerInfo.start(executor):
public RingBuffer<T> start()
{
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository)
{
consumerInfo.start(executor);
}
return ringBuffer;
}
- ConsumerInfo是接口,对应的实现类有EventProcessorInfo和WorkerPoolInfo两种,这里应该是哪种呢?既然来源是consumerRepository,这就要看当初是怎么存入consumerRepository的,前面在分析createEventProcessors方法时,下图红框中的consumerRepository.add被忽略了,现在需要进去看看:
- 进去后一目了然,可见ConsumerInfo的实现是EventProcessorInfo:
- 所以,回到前面对consumerInfo.start(executor)方法的分析,这里要看的就是EventProcessorInfo的start方法了,如下图,非常简单,就是启动一个线程执行eventprocessor(这个eventprocessor是BatchEventProcessor对象):
- 小结一下,disruptor.start方法涉及到一个重要知识点:
- 启动独立线程,用来执行消费事件的业务逻辑;
消费事件的逻辑
- 为了理解消息处理逻辑,还要重点关注BatchEventProcessor.processEvents方法,如下图所示,其实也很简单,就是不停的从环形队列取出可用的事件,然后再更新自己的Sequence,相当于标记已经消费到哪里了:
总结
最后总结Disruptor类的重要功能:
- 创建环形队列(RingBuffer对象)
- 创建SequenceBarrier对象,用于接收ringBuffer中的可消费事件
- 创建BatchEventProcessor,负责消费事件
- 绑定BatchEventProcessor对象的异常处理类
- 调用ringBuffer.addGatingSequences,将消费者的Sequence传给ringBuffer
- 启动独立线程,用来执行消费事件的业务逻辑
- 聪明的您一定会发现,本文并没有全面分析Disruptor类的源码,例如after、shutdown等方法都没有提到,确实如此,欣宸在此给您道歉了,本篇的重点是找出那些与基本功能有关代码,为后面的实战提供理论指导(不用Disruptor类实现消息生产消费的实战),因此很多高级功能都跳过了;
理解官方流程图
- 此时再看官方流程图,聪明的您应该很快就能理解此图表达的意思:每个消费者都有自己的Sequence,通过此Sequence取得自己在环形队列中消费的位置,再通过SequenceBarrier来等待可用事件的出现,等到事件出现了就用get方法取出具体的事件,给EventHandler来处理:
后续预告
- 此时,咱们对Disruptor类已经有了比较深入的理解,接下来的文章,咱们会尝试不用Disruptor类,仅凭着对RingBuffer对象的操作来实现以下三种功能:
- 100个事件,单个消费者消费;
- 100个事件,三个消费者,每个都独自消费这个100个事件;
- 100个事件,三个消费者共同消费这个100个事件;
来源:
https://www.cnblogs.com/bolingcavalry/p/15333794.html
最新更新
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
Python初学者友好丨详解参数传递类型
如何有效管理爬虫流量?
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比
一款纯 JS 实现的轻量化图片编辑器
关于开发 VS Code 插件遇到的 workbench.scm.
前端设计模式——观察者模式
前端设计模式——中介者模式
创建型-原型模式