Java集合之Disruptor操作示例
作者:上善若泪
Disruptor简介
Disruptor
是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 选择大奖)。
Disruptor
提供的功能类似于 Kafka
、RocketMQ
这类分布式队列,不过,其作为范围是 JVM
(内存),Disruptor
解决了 JDK
内置线程安全队列的性能和内存安全问题,Disruptor
有个最大的优点就是快
Disruptor
被设计用于在生产者
—消费者
(producer-consumer problem
,简称PCP
)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟Disruptor
是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS
,除金融领域之外,其他一般的应用中都可以用到 Disruptor
,它可以带来显著的性能提升。其实 Disruptor
与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在并发、缓冲区、生产者—消费者模型、事务处理
这些元素的程序来说,Disruptor
提出了一种大幅提升性能(TPS)的方案。
github 地址
Github 地址:https://github.com/LMAX-Exchange/disruptor
官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html
Java中线程安全队列
JDK 中常见的线程安全的队列如下:
队列名字 | 锁 | 是否有界 |
---|---|---|
ArrayBlockingQueue | 加锁(ReentrantLock) | 有界 |
LinkedBlockingQueue | 加锁(ReentrantLock) | 有界 |
LinkedTransferQueue | 无锁(CAS) | 无界 |
ConcurrentLinkedQueue | 无锁(CAS) | 无界 |
从上表中可以看出:这些队列要不就是加锁有界,要不就是无锁无界。而加锁的的队列势必会影响性能,无界的队列又存在内存溢出的风险。
因此,一般情况下,我们都是不建议使用 JDK 内置线程安全队列。Disruptor
就不一样了!它在无锁的情况下还能保证队列有界,并且还是线程安全的。
Disruptor 核心概念
Disruptor
核心概念:
Event
:可以把Event
理解为存放在队列中等待消费的消息对象。
在Disruptor
的语义中,生产者和消费者之间进行交换的数据被称为事件(Event
)。它不是一个被Disruptor
定义的特定类型,而是由Disruptor
的使用者定义并指定。EventFactory
:事件工厂用于生产事件,我们在初始化Disruptor
类的时候需要用到。EventHandler
:Event
在对应的Handler
中被处理,你可以将其理解为生产消费者模型中的消费者。Disruptor
定义的事件处理接口,由用户实现,用于处理事件,是Consumer
的真正实现EventProcessor
:EventProcessor
持有特定消费者(Consumer
)的Sequence
,并提供用于调用事件处理实现的事件循环(Event Loop
)Disruptor
:事件的生产和消费需要用到Disruptor
对象。RingBuffer
:RingBuffer
(环形数组)用于保存事件
。
如其名,环形的缓冲区。曾经RingBuffer
是Disruptor
中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过Disruptor
进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer
可以由用户的自定义实现来完全替代。WaitStrategy
:等待策略。决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。定义Consumer
如何进行等待下一个事件的策略。(注:Disruptor
定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)Producer
:生产者,只是泛指调用Disruptor
发布事件的用户代码,Disruptor
没有定义特定接口或类型ProducerType
:指定是单个事件发布者模式
还是多个事件发布者模式
(发布者和生产者的意思类似)。Sequencer
:Sequencer
是Disruptor
的真正核心。此接口有两个实现类 -SingleProducerSequencer
、MultiProducerSequencer
,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。Sequence Disruptor
:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个Sequence
用于跟踪标识某个特定的事件处理者(RingBuffer/Consumer
)的处理进度。
虽然一个AtomicLong
也可以用于标识进度,但定义Sequence
来负责该问题还有另一个目的,那就是防止不同的Sequence
之间的CPU
缓存伪共享(Flase Sharing
)问题。(注:这是Disruptor
实现高性能的关键点之一)Sequence Barrier
:用于保持对RingBuffer
的main published Sequence
和Consumer
依赖的其它Consumer
的Sequence
的引用。Sequence Barrier
还定义了决定Consumer
是否还有可处理的事件的逻辑。
操作
坐标依赖
pom.xml
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.4</version> </dependency>
Gradle:
implementation 'com.lmax:disruptor:3.4.4'
创建事件
我们先来定义一个代表日志事件的类:LogEvent 。
事件中包含了一些和事件相关的属性,比如我们这里定义的 LogEvent 对象中就有一个用来表示日志消息内容的属性:message。
@Data public class LogEvent { private String message; }
我们这里只是为了演示,实际项目中,一个标准日志事件对象所包含的属性肯定不是只有一个 message
创建事件工厂
创建一个工厂类 LogEventFactory 用来创建 LogEvent 对象。
LogEventFactory 继承 EventFactory
接口并实现了 newInstance()
方法 。
public class LogEventFactory implements EventFactory<LogEvent> { @Override public LogEvent newInstance() { return new LogEvent(); } }
创建处理事件Handler--消费者
创建一个用于处理后续发布的事件的类:LogEventHandler 。
LogEventHandler 继承 EventHandler
接口并实现了 onEvent()
方法 。
public class LogEventHandler implements EventHandler<LogEvent> { @Override public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception { System.out.println(logEvent.getMessage()); } }
EventHandler
接口的 onEvent()
方法共有 3 个参数:
event
:待消费/处理
的事件sequence
:正在处理的事件在环形数组(RingBuffer)中的位置endOfBatch
:表示这是否是来自环形数组(RingBuffer)中一个批次的最后一个事件(批量处理事件)
初始化 Disruptor
静态类
我们这里定义一个方法用于获取 Disruptor 对象
private static Disruptor<LogEvent> getLogEventDisruptor() { // 创建 LogEvent 的工厂 LogEventFactory logEventFactory = new LogEventFactory(); // Disruptor 的 RingBuffer 缓存大小 int bufferSize = 1024 * 1024; // 生产者的线程工厂 ThreadFactory threadFactory = new ThreadFactory() { final AtomicInteger threadNum = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]"); } }; //实例化 Disruptor return new Disruptor<>( logEventFactory, bufferSize, threadFactory, // 单生产者 ProducerType.SINGLE, // 阻塞等待策略 new BlockingWaitStrategy()); }
配置类
使用配置类的方式
@Configuration public class MQManager { @Bean("messageModel") public RingBuffer<LogEvent> messageModelRingBuffer() { //定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理 // 生产者的线程工厂 ThreadFactory threadFactory = new ThreadFactory() { final AtomicInteger threadNum = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]"); } }; //指定事件工厂 LogEventFactory factory = new LogEventFactory(); //指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率 int bufferSize = 1024 * 256; //单线程模式,获取额外的性能 Disruptor<LogEvent> disruptor = new Disruptor<>(factory, bufferSize, threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy()); //设置事件业务处理器---消费者 //Disruptor 的 handleEventsWith 方法来绑定处理事件的 Handler 对象。 disruptor.handleEventsWith(new LogEventHandler ()); // Disruptor 可以设置多个处理事件的 Handler,并且可以灵活的设置消费者的处理顺序,串行,并行都是可以的。 //就比如下面的代码表示 Handler1 和 Handler2 是并行执行,最后再执行 Handler3 。 //disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3()); // 启动disruptor线程 disruptor.start(); //获取ringbuffer环,用于接取生产者生产的事件 RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); return ringBuffer; }
Disruptor 构造函数讲解
Disruptor 的推荐使用的构造函数如下:
public class Disruptor<T> { public Disruptor( final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory, final ProducerType producerType, final WaitStrategy waitStrategy) { this( RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), new BasicExecutor(threadFactory)); } ...... }
我们需要传递 5 个参数:
eventFactory
:我们自定义的事件工厂。ringBufferSize
:指定RingBuffer
的容量大小。threadFactory
:自定义的线程工厂。Disruptor 的默认线程池是自定义的,我们只需要传入线程工厂即可。producerType
:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。waitStrategy
:等待策略,决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。
ProducerType
的源码如下,它是一个包含两个变量的枚举类型
SINGLE
:单个事件发布者模式,不需要保证线程安全。MULTI
:多个事件发布者模式,基于 CAS 来保证线程安全。
WaitStrategy
(等待策略)接口的实现类中只有两个方法:
waitFor()
:等待新事件的到来。signalAllWhenBlocking()
:唤醒所有等待的消费者。
public interface WaitStrategy { long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier) throws AlertException, InterruptedException, TimeoutException; void signalAllWhenBlocking(); }
WaitStrategy 的实现类共有 8 个,也就是说共有 8 种等待策略可供选择。
除了上面介绍的这个构造函数之外,Disruptor 还有一个只有 3 个参数构造函数。
使用这个构造函数创建的 Disruptor
对象会默认使用 ProducerType.MULTI
(多个事件发布者模式)和 BlockingWaitStrategy
(阻塞等待策略) 。
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory) { this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory)); }
发布事件main方法测试
//获取 Disruptor 对象 Disruptor<LogEvent> disruptor = getLogEventDisruptor(); //绑定处理事件的Handler对象 disruptor.handleEventsWith(new LogEventHandler()); //启动 Disruptor disruptor.start(); //获取保存事件的环形数组(RingBuffer) RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer(); //发布 10w 个事件 for (int i = 1; i <= 100000; i++) { // 通过调用 RingBuffer 的 next() 方法获取下一个空闲事件槽的序号 long sequence = ringBuffer.next(); try { LogEvent logEvent = ringBuffer.get(sequence); // 初始化 Event,对其赋值 logEvent.setMessage("这是第%d条日志消息".formatted(i)); } finally { // 发布事件 ringBuffer.publish(sequence); } } // 关闭 Disruptor disruptor.shutdown();
使用配置方式
public interface DisruptorMqService { /** * 消息 * @param message */ void sayHelloMq(String message); } @Slf4j @Component @Service public class DisruptorMqServiceImpl implements DisruptorMqService { @Autowired private RingBuffer<LogEvent> messageModelRingBuffer; @Override public void sayHelloMq(String message) { log.info("record the message: {}",message); //获取下一个Event槽的下标 long sequence = messageModelRingBuffer.next(); try { //给Event填充数据 MessageModel event = messageModelRingBuffer.get(sequence); event.setMessage(message); log.info("往消息队列中添加消息:{}", event); } catch (Exception e) { log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage()); } finally { //发布Event,激活观察者去消费,将sequence传递给改消费者 //注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer messageModelRingBuffer.publish(sequence); } } }
以上就是Java集合之Disruptor操作示例的详细内容,更多关于Java集合Disruptor的资料请关注脚本之家其它相关文章!