Disruptor-实例化方法

创建一个事件实体

public class LongEvent{
    private Long id;
    
    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }
}

创建一个事件实体工厂

public class LongEventFactory implements EventFactory<LongEvent> {

    public LongEvent newInstance() {
        return new LongEvent();
    }
}

创建两个事件处理类


/**
* EventHandler<LongEvent> 是没有池化的实现方式,每个消费者中只有一个示例
* WorkHandler<LongEvent> 是池化的实现方式,每个消费者中可以以类似线程池的方式去执行这个事件
* 实际根据业务场景 实现其中一个接口就可以
*/
public class A1Handler implements EventHandler<LongEvent> , WorkHandler<LongEvent> {

    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        long id = longEvent.getId();
        id+=1;
        System.out.println("**************************");
        System.out.println(Thread.currentThread().getName());
        System.out.println(System.currentTimeMillis()+"+A1Handler:"+id);
        System.out.println("**************************");
    }

    /**
     * 池化执行
     * */
    @Override
    public void onEvent(LongEvent longEvent) throws Exception {
        long id = longEvent.getId();
        id+=10;
        System.out.println("**************************");
        System.out.println(Thread.currentThread().getName());
        System.out.println(System.currentTimeMillis()+"+A1Handler:"+id);
        System.out.println("**************************");
    }
}
public class A2Handler implements EventHandler<LongEvent> , WorkHandler<LongEvent> {
    public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception {
        long id = longEvent.getId();
        id+=3;
        System.out.println("**************************");
        System.out.println(Thread.currentThread().getName());
        System.out.println(System.currentTimeMillis()+"+A2Handler:"+id);
        System.out.println("**************************");
    }

    /**
     * 池化执行
     * */
    @Override
    public void onEvent(LongEvent longEvent) throws Exception {
        long id = longEvent.getId();
        id+=30;
        System.out.println("**************************");
        System.out.println(Thread.currentThread().getName());
        System.out.println(System.currentTimeMillis()+"+A2Handler:"+id);
        System.out.println("**************************");
    }
}

实例化Disruptor

// 事件工厂
LongEventFactory factory = new LongEventFactory();

// 定义RingBuff大小  注意:数值一定是2的幂次方
int bufferSize = 1024;

// 创建ThreadFactory
ThreadFactory threadFactory = new ThreadFactory() {
    private int counter = 0;
    private String prefix = "DisruptorWorker";
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + "-" + counter++);
    }
};

/* 实例化Disruptor (官方有4个实例化方法)
 * factory 是事件工厂
 * bufferSize 是上面定义的RingBuffer的大小
 * ProducerType.MULTI 是生产者模式 有SINGL和MULTI两种
 * new BlockingWaitStrategy() 是等待阻塞策略
 */
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
    factory, 
    bufferSize, 
    threadFactory,
    ProducerType.MULTI,
    new BlockingWaitStrategy());

创建消费者

单个模式

disruptor会创建2和线程,一个线程用来执行A1Handler,一个线程用来执行A2Handler.每个线程互不影响。

disruptor.handleEventsWith(new A1Handler(),new A2Handler());

池化模式

disruptor会创建4个线程,其中两个线程组成线程池来执行A1Handler,另外两个线程也组成线程池用来执行A2Handler

disruptor.handleEventsWithWorkerPool(new A1Handler(),new A1Handler())
disruptor.handleEventsWithWorkerPool(new A2Handler(),new A2Handler())

获取RingBuffer

RingBuffer<LongEvent> ringBuffer = disruptor.start();

生产者发布消息

这边直接使用Java8的发布方式来发布消息

// 创建ByteBuffer缓冲区 
ByteBuffer bb = ByteBuffer.allocate(8);
// 生产者发布5条消息
for (long l = 0; l<5; l++) {
    // 写入消息到Bufer缓冲区
    bb.putLong(0, l);
    // Java8 lambda发布方式
    ringBuffer.publishEvent((event,sequece,buffer) -> event.setId(buffer.getLong(0)),bb);
}
System.out.println("发布完成");

完整的Main方法

public class DisruptorMain {
    public static void main(String[] args) {
        // 实例化事件工厂
        LongEventFactory factory = new LongEventFactory();
        // 定义RingBuffer大小
        int bufferSize = 1024;
        // 创建线程工厂
        ThreadFactory threadFactory = new ThreadFactory() {
            private int counter = 0;
            private String prefix = "DisruptorWorker";
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, prefix + "-" + counter++);
            }
        };
        // 实例化Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(
                factory,
                bufferSize,
                threadFactory,
                ProducerType.MULTI,
                new BlockingWaitStrategy());

        // 并行模式 两个Handler互不影响
        disruptor.handleEventsWith(new A1Handler(),new A2Handler());

        // 并行 每个消费端有两个线程实例 (池化模式)
//        disruptor.handleEventsWithWorkerPool(new A1Handler(),new A1Handler());
//        disruptor.handleEventsWithWorkerPool(new A2Handler(),new A2Handler());

        // 获取RingBuffer
        RingBuffer<LongEvent> ringBuffer = disruptor.start();
        
        // 创建ByteBuffer缓冲区
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; l<5; l++) {
            // 写入数据到缓冲区
            bb.putLong(0, l);
            // Java8的发布方式
            ringBuffer.publishEvent((event,sequece,buffer) -> event.setId(buffer.getLong(0)),bb);
        }
        System.out.println("发布完成");
    }
}