Java集合之Disruptor操作示例
作者:上善若泪
Disruptor简介
Disruptor 是一个开源的高性能内存队列,由英国外汇交易公司 LMAX 开发的,获得了 2011 年的 Oracle 官方的 Duke's Choice Awards(Duke 选择大奖)。
Disruptor 提供的功能类似于 Kafka、RocketMQ 这类分布式队列,不过,其作为范围是 JVM(内存),Disruptor 解决了 JDK 内置线程安全队列的性能和内存安全问题,Disruptor 有个最大的优点就是快
Disruptor被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到 Disruptor,它可以带来显著的性能提升。其实 Disruptor 与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在并发、缓冲区、生产者—消费者模型、事务处理这些元素的程序来说,Disruptor 提出了一种大幅提升性能(TPS)的方案。
github 地址
Github 地址:https://github.com/LMAX-Exchange/disruptor
官方教程:https://lmax-exchange.github.io/disruptor/user-guide/index.html
Java中线程安全队列
JDK 中常见的线程安全的队列如下:
| 队列名字 | 锁 | 是否有界 |
|---|---|---|
| ArrayBlockingQueue | 加锁(ReentrantLock) | 有界 |
| LinkedBlockingQueue | 加锁(ReentrantLock) | 有界 |
| LinkedTransferQueue | 无锁(CAS) | 无界 |
| ConcurrentLinkedQueue | 无锁(CAS) | 无界 |
从上表中可以看出:这些队列要不就是加锁有界,要不就是无锁无界。而加锁的的队列势必会影响性能,无界的队列又存在内存溢出的风险。
因此,一般情况下,我们都是不建议使用 JDK 内置线程安全队列。Disruptor 就不一样了!它在无锁的情况下还能保证队列有界,并且还是线程安全的。
Disruptor 核心概念
Disruptor 核心概念:
Event:可以把Event理解为存放在队列中等待消费的消息对象。
在Disruptor的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被Disruptor定义的特定类型,而是由Disruptor的使用者定义并指定。EventFactory:事件工厂用于生产事件,我们在初始化Disruptor类的时候需要用到。EventHandler:Event在对应的Handler中被处理,你可以将其理解为生产消费者模型中的消费者。Disruptor定义的事件处理接口,由用户实现,用于处理事件,是Consumer的真正实现EventProcessor:EventProcessor持有特定消费者(Consumer)的Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)Disruptor:事件的生产和消费需要用到Disruptor对象。RingBuffer:RingBuffer(环形数组)用于保存事件。
如其名,环形的缓冲区。曾经RingBuffer是Disruptor中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过Disruptor进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer可以由用户的自定义实现来完全替代。WaitStrategy:等待策略。决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。定义Consumer如何进行等待下一个事件的策略。(注:Disruptor定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)Producer:生产者,只是泛指调用Disruptor发布事件的用户代码,Disruptor没有定义特定接口或类型ProducerType:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似)。Sequencer:Sequencer是Disruptor的真正核心。此接口有两个实现类 -SingleProducerSequencer、MultiProducerSequencer,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。Sequence Disruptor:通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个Sequence用于跟踪标识某个特定的事件处理者(RingBuffer/Consumer)的处理进度。
虽然一个AtomicLong也可以用于标识进度,但定义Sequence来负责该问题还有另一个目的,那就是防止不同的Sequence之间的CPU缓存伪共享(Flase Sharing)问题。(注:这是Disruptor实现高性能的关键点之一)Sequence Barrier:用于保持对RingBuffer的main published Sequence和Consumer依赖的其它Consumer的Sequence的引用。Sequence Barrier还定义了决定Consumer是否还有可处理的事件的逻辑。

操作
坐标依赖
pom.xml
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.4</version>
</dependency>Gradle:
implementation 'com.lmax:disruptor:3.4.4'
创建事件
我们先来定义一个代表日志事件的类:LogEvent 。
事件中包含了一些和事件相关的属性,比如我们这里定义的 LogEvent 对象中就有一个用来表示日志消息内容的属性:message。
@Data
public class LogEvent {
private String message;
}我们这里只是为了演示,实际项目中,一个标准日志事件对象所包含的属性肯定不是只有一个 message
创建事件工厂
创建一个工厂类 LogEventFactory 用来创建 LogEvent 对象。
LogEventFactory 继承 EventFactory 接口并实现了 newInstance() 方法 。
public class LogEventFactory implements EventFactory<LogEvent> {
@Override
public LogEvent newInstance() {
return new LogEvent();
}
}创建处理事件Handler--消费者
创建一个用于处理后续发布的事件的类:LogEventHandler 。
LogEventHandler 继承 EventHandler 接口并实现了 onEvent() 方法 。
public class LogEventHandler implements EventHandler<LogEvent> {
@Override
public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) throws Exception {
System.out.println(logEvent.getMessage());
}
}EventHandler 接口的 onEvent() 方法共有 3 个参数:
event:待消费/处理的事件sequence:正在处理的事件在环形数组(RingBuffer)中的位置endOfBatch:表示这是否是来自环形数组(RingBuffer)中一个批次的最后一个事件(批量处理事件)
初始化 Disruptor
静态类
我们这里定义一个方法用于获取 Disruptor 对象
private static Disruptor<LogEvent> getLogEventDisruptor() {
// 创建 LogEvent 的工厂
LogEventFactory logEventFactory = new LogEventFactory();
// Disruptor 的 RingBuffer 缓存大小
int bufferSize = 1024 * 1024;
// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger threadNum = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
}
};
//实例化 Disruptor
return new Disruptor<>(
logEventFactory,
bufferSize,
threadFactory,
// 单生产者
ProducerType.SINGLE,
// 阻塞等待策略
new BlockingWaitStrategy());
}配置类
使用配置类的方式
@Configuration
public class MQManager {
@Bean("messageModel")
public RingBuffer<LogEvent> messageModelRingBuffer() {
//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理
// 生产者的线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
final AtomicInteger threadNum = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "LogEventThread" + " [#" + threadNum.incrementAndGet() + "]");
}
};
//指定事件工厂
LogEventFactory factory = new LogEventFactory();
//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率
int bufferSize = 1024 * 256;
//单线程模式,获取额外的性能
Disruptor<LogEvent> disruptor = new Disruptor<>(factory,
bufferSize,
threadFactory,
ProducerType.SINGLE,
new BlockingWaitStrategy());
//设置事件业务处理器---消费者
//Disruptor 的 handleEventsWith 方法来绑定处理事件的 Handler 对象。
disruptor.handleEventsWith(new LogEventHandler ());
// Disruptor 可以设置多个处理事件的 Handler,并且可以灵活的设置消费者的处理顺序,串行,并行都是可以的。
//就比如下面的代码表示 Handler1 和 Handler2 是并行执行,最后再执行 Handler3 。
//disruptor.handleEventsWith(new Handler1(), new Handler2()).handleEventsWith(new Handler3());
// 启动disruptor线程
disruptor.start();
//获取ringbuffer环,用于接取生产者生产的事件
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
return ringBuffer;
}Disruptor 构造函数讲解
Disruptor 的推荐使用的构造函数如下:
public class Disruptor<T> {
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
......
}我们需要传递 5 个参数:
eventFactory:我们自定义的事件工厂。ringBufferSize:指定RingBuffer的容量大小。threadFactory:自定义的线程工厂。Disruptor 的默认线程池是自定义的,我们只需要传入线程工厂即可。producerType:指定是单个事件发布者模式还是多个事件发布者模式(发布者和生产者的意思类似,我个人比较喜欢用发布者)。waitStrategy:等待策略,决定了没有事件可以消费的时候,事件消费者如何等待新事件的到来。
ProducerType 的源码如下,它是一个包含两个变量的枚举类型
SINGLE:单个事件发布者模式,不需要保证线程安全。MULTI:多个事件发布者模式,基于 CAS 来保证线程安全。
WaitStrategy (等待策略)接口的实现类中只有两个方法:
waitFor():等待新事件的到来。signalAllWhenBlocking():唤醒所有等待的消费者。
public interface WaitStrategy
{
long waitFor(long sequence, Sequence cursor, Sequence dependentSequence, SequenceBarrier barrier)
throws AlertException, InterruptedException, TimeoutException;
void signalAllWhenBlocking();
}
WaitStrategy 的实现类共有 8 个,也就是说共有 8 种等待策略可供选择。

除了上面介绍的这个构造函数之外,Disruptor 还有一个只有 3 个参数构造函数。
使用这个构造函数创建的 Disruptor 对象会默认使用 ProducerType.MULTI(多个事件发布者模式)和 BlockingWaitStrategy(阻塞等待策略) 。
public Disruptor(final EventFactory<T> eventFactory, final int ringBufferSize, final ThreadFactory threadFactory)
{
this(RingBuffer.createMultiProducer(eventFactory, ringBufferSize), new BasicExecutor(threadFactory));
}发布事件main方法测试
//获取 Disruptor 对象
Disruptor<LogEvent> disruptor = getLogEventDisruptor();
//绑定处理事件的Handler对象
disruptor.handleEventsWith(new LogEventHandler());
//启动 Disruptor
disruptor.start();
//获取保存事件的环形数组(RingBuffer)
RingBuffer<LogEvent> ringBuffer = disruptor.getRingBuffer();
//发布 10w 个事件
for (int i = 1; i <= 100000; i++) {
// 通过调用 RingBuffer 的 next() 方法获取下一个空闲事件槽的序号
long sequence = ringBuffer.next();
try {
LogEvent logEvent = ringBuffer.get(sequence);
// 初始化 Event,对其赋值
logEvent.setMessage("这是第%d条日志消息".formatted(i));
} finally {
// 发布事件
ringBuffer.publish(sequence);
}
}
// 关闭 Disruptor
disruptor.shutdown();使用配置方式
public interface DisruptorMqService {
/**
* 消息
* @param message
*/
void sayHelloMq(String message);
}
@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {
@Autowired
private RingBuffer<LogEvent> messageModelRingBuffer;
@Override
public void sayHelloMq(String message) {
log.info("record the message: {}",message);
//获取下一个Event槽的下标
long sequence = messageModelRingBuffer.next();
try {
//给Event填充数据
MessageModel event = messageModelRingBuffer.get(sequence);
event.setMessage(message);
log.info("往消息队列中添加消息:{}", event);
} catch (Exception e) {
log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());
} finally {
//发布Event,激活观察者去消费,将sequence传递给改消费者
//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producer
messageModelRingBuffer.publish(sequence);
}
}
}以上就是Java集合之Disruptor操作示例的详细内容,更多关于Java集合Disruptor的资料请关注脚本之家其它相关文章!
