Disruptor-快速开始
创建一个事件实体
public class LongEvent{
private Long id;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
}
创建一个事件实体工厂
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
创建两个事件处理类
/**
* EventHandler<LongEvent> 是没有池化的实现方式,每个消费者中只有一个示例
* WorkHandler<LongEvent> 是池化的实现方式,每个消费者中可以以类似线程池的方式去执行这个事件
* 实际根据业务场景 实现其中一个接口就可以
*/
public class A1Handler implements EventHandler<LongEvent> , WorkHandler<LongEvent> {
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
long id = longEvent.getId();
id+=1;
System.out.println("**************************");
System.out.println(Thread.currentThread().getName());
System.out.println(System.currentTimeMillis()+"+A1Handler:"+id);
System.out.println("**************************");
}
/**
* 池化执行
* */
@Override
public void onEvent(LongEvent longEvent) throws Exception {
long id = longEvent.getId();
id+=10;
System.out.println("**************************");
System.out.println(Thread.currentThread().getName());
System.out.println(System.currentTimeMillis()+"+A1Handler:"+id);
System.out.println("**************************");
}
}
public class A2Handler implements EventHandler<LongEvent> , WorkHandler<LongEvent> {
public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
long id = longEvent.getId();
id+=3;
System.out.println("**************************");
System.out.println(Thread.currentThread().getName());
System.out.println(System.currentTimeMillis()+"+A2Handler:"+id);
System.out.println("**************************");
}
/**
* 池化执行
* */
@Override
public void onEvent(LongEvent longEvent) throws Exception {
long id = longEvent.getId();
id+=30;
System.out.println("**************************");
System.out.println(Thread.currentThread().getName());
System.out.println(System.currentTimeMillis()+"+A2Handler:"+id);
System.out.println("**************************");
}
}
实例化Disruptor
// 事件工厂
LongEventFactory factory = new LongEventFactory();
// 定义RingBuff大小 注意:数值一定是2的幂次方
int bufferSize = 1024;
// 创建ThreadFactory
ThreadFactory threadFactory = new ThreadFactory() {
private int counter = 0;
private String prefix = "DisruptorWorker";
@Override
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
};
/* 实例化Disruptor (官方有4个实例化方法)
* factory 是事件工厂
* bufferSize 是上面定义的RingBuffer的大小
* ProducerType.MULTI 是生产者模式 有SINGL和MULTI两种
* new BlockingWaitStrategy() 是等待阻塞策略
*/
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
factory,
bufferSize,
threadFactory,
ProducerType.MULTI,
new BlockingWaitStrategy());
创建消费者
单个模式
disruptor会创建2和线程,一个线程用来执行A1Handler
,一个线程用来执行A2Handler
.每个线程互不影响。
disruptor.handleEventsWith(new A1Handler(),new A2Handler());
池化模式
disruptor会创建4个线程,其中两个线程组成线程池来执行A1Handler
,另外两个线程也组成线程池用来执行A2Handler
disruptor.handleEventsWithWorkerPool(new A1Handler(),new A1Handler())
disruptor.handleEventsWithWorkerPool(new A2Handler(),new A2Handler())
获取RingBuffer
RingBuffer<LongEvent> ringBuffer = disruptor.start();
生产者发布消息
这边直接使用Java8的发布方式来发布消息
// 创建ByteBuffer缓冲区
ByteBuffer bb = ByteBuffer.allocate(8);
// 生产者发布5条消息
for (long l = 0; l<5; l++) {
// 写入消息到Bufer缓冲区
bb.putLong(0, l);
// Java8 lambda发布方式
ringBuffer.publishEvent((event,sequece,buffer) -> event.setId(buffer.getLong(0)),bb);
}
System.out.println("发布完成");
完整的Main方法
public class DisruptorMain {
public static void main(String[] args) {
// 实例化事件工厂
LongEventFactory factory = new LongEventFactory();
// 定义RingBuffer大小
int bufferSize = 1024;
// 创建线程工厂
ThreadFactory threadFactory = new ThreadFactory() {
private int counter = 0;
private String prefix = "DisruptorWorker";
@Override
public Thread newThread(Runnable r) {
return new Thread(r, prefix + "-" + counter++);
}
};
// 实例化Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
factory,
bufferSize,
threadFactory,
ProducerType.MULTI,
new BlockingWaitStrategy());
// 并行模式 两个Handler互不影响
disruptor.handleEventsWith(new A1Handler(),new A2Handler());
// 并行 每个消费端有两个线程实例 (池化模式)
// disruptor.handleEventsWithWorkerPool(new A1Handler(),new A1Handler());
// disruptor.handleEventsWithWorkerPool(new A2Handler(),new A2Handler());
// 获取RingBuffer
RingBuffer<LongEvent> ringBuffer = disruptor.start();
// 创建ByteBuffer缓冲区
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; l<5; l++) {
// 写入数据到缓冲区
bb.putLong(0, l);
// Java8的发布方式
ringBuffer.publishEvent((event,sequece,buffer) -> event.setId(buffer.getLong(0)),bb);
}
System.out.println("发布完成");
}
}