首页 Java正文

Disruptor并发框架

webpro Java 2019-05-26 213 0

一、Disruptor框架介绍


极高性能、并发、无锁的编程框架 - Disruptor

  • 建立在JVM平台上

  • 每秒可处理6百万订单[官方自述]

  • 运行在内存中

  • 采用事件源驱动方式

  • 无锁的Queue(高并发无锁队列事件)


[RingBuffer]是Disruptor的核心

RingBuffer采用数组实现,无首尾指针

Quene:

image.png

RingBuffer是一个环形队列,起到缓存的效果

image.png=>image.png

  • 随着不停的填充RingBuffer,序号会一直增长,直到超过这个环的最大长度(会覆盖旧的序号)

  • 如何计算序号指向的元素?采用mod运算,序号%长度=索引,例如计算上图10的索引,10%8=2,在第2的位置

  • 关于设置环的最大长度。上图的环长度只有8,在实际情况>>8。长度尽量设置成2n,比如1024、2048

  • 为什么长度尽量设置成2n?如此可采用"与"运算计算索引号,即序号&(长度-1)=索引号,其效率要高于mod计算效率


通过"生产者->RingBuffer->消费者"运作,下图是一个最简单的处理链

image.png

实际情况中,生产者和消费者必定是多个线程执行。


二、开发模型


1、定义 Event ,代表Disruptor所处理的数据单元。

2、定义 EventFactory 即生产者,用来填充RingBuffer容器。 EventFactory<?> 接口。

3、定义 EventHandler 即消费者,用来从RingBuffer取出数据。 EventHandler<?> 接口。

4、组合1~3步。


三、Disruptor Demo1


配置项目运行环境

环境:win10 + IDEA + JDK8

采用maven项目结构

根据开发模型,首先建立一个maven项目。

image.png

建立com.webpro.disruptor.demo1文件夹,demo1将全部写在这里。

打开pom.xml文件,向其中添加LAMX官方的disruptor依赖

我们使用的版本是3.3.6,如下:

<dependencies>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.6</version>
    </dependency>
</dependencies>

另外,设置编译环境为JDK8,因为JDK8支持一些新的写法,比如lambda表达式等等,在下面会用到lambda表达式简化代码。

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>utf-8</encoding>
            </configuration>
        </plugin>
    </plugins>
</build>

ctrl+s更新maven依赖,等待其更新完毕。


1、定义 Event ,代表Disruptor所处理的数据单元。

新建LongEvent类,顾名思义,对Long类型的数据进行一些操作,在其中定义set方法,override toString方法。

package com.webpro.disruptor.demo1;

public class LongEvent {

    private long value;

    public void set(long value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "LongEvent [value=" + value + "]";
    }
}


2、定义 EventFactory 即生产者,用来填充RingBuffer容器。 EventFactory<?> 接口。

新建LongEventFactory类,作为生产者。注意此处需要引用 com.lmax.disruptor.EventFactory

package com.webpro.disruptor.demo1;

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent>{

    public LongEvent newInstance(){
        LongEvent longEvent = new LongEvent();
        return longEvent;
    }

}


3、定义 EventHandler 即消费者,用来从RingBuffer取出数据。 EventHandler<?> 接口。

新建LongEventHandler类,作为消费者,处理LongEventFactory类产生的数据。

注意此处需要引用 com.lmax.disruptor.EventHandler

package com.webpro.disruptor.demo1;

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>{

    /**
     * @param event 发布到 RingBuffer中的事件
     * @param sequence 当前正在处理事件的序号
     * @param endOfBatch 是否为RingBuffer最后一个
     * @throws Exception
     */
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception{
        System.out.println("[" + Thread.currentThread().getName() + "]LongEventHandler consum -> " + event + ",耗时 -> "+(System.currentTimeMillis()-LongEventMain3.startTime) + ",sleep=0s");
    }

}


4、组合1~3步。

直接给出代码,有注释

注意需要引用

com.lmax.disruptor.RingBuffer

com.lmax.disruptor.dsl.Disruptor

java.util.concurrent.Executor

java.util.concurrent.Executors

package com.webpro.disruptor.demo1;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;


public class LongEventMain0 {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        // 初始化线程池-用户执行Consumer
        // 没有容量限制的线程池
        Executor executor = Executors.newCachedThreadPool();

        // 初始化EventFactory
        // 生产者
        LongEventFactory factory = new LongEventFactory();

        // 初始化RingBuffer的大小,必须是2的指数
        // 初始化环形队列
        int bufferSize = 1024;

        // 初始化RingBuffer
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor);

        // 指定事件处理器
        disruptor.handleEventsWith(new LongEventHandler());

        // 开启Disruptor,开启所有线程,(此方法只能调用一次,并且所有的EventHandler必须在start之前添加,包括ExceptionHandler)
        disruptor.start();

        // 获取RingBuffer
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        // 以下代码为模板化代码,怎样发布事件到RingBuffer
        // 获取下一个序号
        long sequence = ringBuffer.next();
        try {
            // 根据下一个序号获取Event
            LongEvent event = ringBuffer.get(sequence);
            // Do some work with the event.
            event.set(10000L);
            System.out.println("Main product -> " + event);
        } finally {
            // 发布序号(发布后可以被消费)
            ringBuffer.publish(sequence);
        }
    }

}

运行一下,如下图:

image.png

1生产者带着1个数据跑通了整个流程。


===还没写完


版权声明

本文仅代表作者观点,不代表本站立场。
本文系作者授权发表,未经许可,不得转载。

评论

运行

«   2019年10月   »
123456
78910111213
14151617181920
21222324252627
28293031

WebPro统计

Top