springboot~disruptor异步队列
Disruptor
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。
Java内置队列的问题
介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。
队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。
从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。
但是对 volatile类型的变量进行 CAS 操作,存在伪共享问题,下面介绍一下
伪共享
CPU的缓存系统是以缓存行(cache line)为单位存储的,一般的大小为64bytes。在多线程程序的执行过程中,存在着一种情况,多个需要频繁修改的变量存在同一个缓存行当中。
假设:有两个线程分别访问并修改X和Y这两个变量,X和Y恰好在同一个缓存行上,这两个线程分别在不同的CPU上执行。那么每个CPU分别更新好X和Y时将缓存行刷入内存时,发现有别的修改了各自缓存行内的数据,这时缓存行会失效,从L3中重新获取。这样的话,程序执行效率明显下降。为了减少这种情况的发生,其实就是避免X和Y在同一个缓存行中,可以主动添加一些无关变量将缓存行填充满,比如在X对象中添加一些变量,让它有64 Byte那么大,正好占满一个缓存行。
伪共享问题 的解决方案
简单的说,就是 以空间换时间: 使用占位字节,将变量的所在的 缓冲行 塞满。
disruptor 无锁框架就是这么干的。
Disruptor框架是如何解决伪共享问题的?
在Disruptor中有一个重要的类Sequence,该类包装了一个volatile修饰的long类型数据value,无论是Disruptor中的基于数组实现的缓冲区RingBuffer,还是生产者,消费者,都有各自独立的Sequence,RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来获得下一个可使用的相对位置。对于生产者和消费者来说,Sequence标示着它们的事件序号。
例子
/** * 停车场问题. * 1) 事件对象Event * 2)三个消费者Handler * 3)一个生产者Processer * 4)执行Main方法 */ public class DisruptorCar { private static final Integer NUM = 1; // 1,10,100,1000 /** * 测试入口 , * 一个生产者(汽车进入停车场); * 三个消费者(一个记录汽车信息,一个发送消息给系统,一个发送消息告知司机) * 前两个消费者同步执行,都有结果了再执行第三个消费者 */ @Test public void main() throws InterruptedException { long beginTime = System.currentTimeMillis(); int bufferSize = 2048; // 2的N次方 try { // 创建线程池,负责处理Disruptor的四个消费者 ExecutorService executor = Executors.newFixedThreadPool(4); // 初始化一个 Disruptor Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() { @Override public MyInParkingDataEvent newInstance() { return new MyInParkingDataEvent(); // Event 初始化工厂 } }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy()); // 使用disruptor创建消费者组 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith( new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler()); // 当上面两个消费者处理结束后在消耗 smsHandler MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler(); handlerGroup.then(myParkingDataSmsHandler); // 启动Disruptor disruptor.start(); CountDownLatch countDownLatch = new CountDownLatch(1); // 一个生产者线程准备好了就可以通知主线程继续工作了 // 生产者生成数据 executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor)); countDownLatch.await(); // 等待生产者结束 disruptor.shutdown(); executor.shutdown(); } catch (Exception e) { e.printStackTrace(); } System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime)); } public class MyInParkingDataEvent { private String carLicense; // 车牌号 public String getCarLicense() { return carLicense; } public void setCarLicense(String carLicense) { this.carLicense = carLicense; } } /** * Handler 第一个消费者,负责保存进场汽车的信息 */ public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent>, WorkHandler<MyInParkingDataEvent> { @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception { long threadId = Thread.currentThread().getId(); // 获取当前线程id String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号 System.out.println(String.format("Thread Id %s 保存 %s 到数据库中 ....", threadId, carLicense)); } @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch) throws Exception { this.onEvent(myInParkingDataEvent); } } /** * 第二个消费者,负责发送通知告知工作人员(Kafka是一种高吞吐量的分布式发布订阅消息系统) */ public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent> { @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch) throws Exception { long threadId = Thread.currentThread().getId(); // 获取当前线程id String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号 System.out.println(String.format("Thread Id %s 发送 %s 进入停车场信息给 kafka系统...", threadId, carLicense)); } } /** * 第三个消费者,sms短信服务,告知司机你已经进入停车场,计费开始。 */ public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent> { @Override public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch) throws Exception { long threadId = Thread.currentThread().getId(); // 获取当前线程id String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号 System.out.println(String.format("Thread Id %s 给 %s 的车主发送一条短信,并告知他计费开始了 ....", threadId, carLicense)); } } /** * 生产者,进入停车场的车辆 */ public class MyInParkingDataEventPublisher implements Runnable { private CountDownLatch countDownLatch; // 用于监听初始化操作,等初始化执行完毕后,通知主线程继续工作 private Disruptor<MyInParkingDataEvent> disruptor; public MyInParkingDataEventPublisher(CountDownLatch countDownLatch, Disruptor<MyInParkingDataEvent> disruptor) { this.countDownLatch = countDownLatch; this.disruptor = disruptor; } @Override public void run() { MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator(); try { for (int i = 0; i < NUM; i++) { disruptor.publishEvent(eventTranslator); Thread.sleep(1000); // 假设一秒钟进一辆车 } } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); // 执行完毕后通知 await()方法 System.out.println(NUM + "辆车已经全部进入进入停车场!"); } } } class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> { @Override public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) { this.generateData(myInParkingDataEvent); } private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) { myInParkingDataEvent.setCarLicense("车牌号: 鄂A-" + (int) (Math.random() * 100000)); // 随机生成一个车牌号 System.out.println("Thread Id " + Thread.currentThread().getId() + " 写完一个event"); return myInParkingDataEvent; } } }
- springboot~ApplicationContextAware与@Autowired注解
- java~Map集合整理
- java~Optional语法糖
- Java~公用包中如何加载资源文件
- [email protected]一切为了可扩展性
- rsa~对接第三方rsa问题排查
- keycloak~自定义directgrant直接认证
- java~RMI引起的log4j漏洞
- k8s~Endpoints的使用之负载均衡
- keycloak~uma远程资源授权对接asp.net core
- es~存储部分字段
- maven编译后复制到目标位置
- keycloak~缓存的使用
- keycloak~授权功能的使用
- java~并行计算~大集合的并行处理
- keycloak~使用JDBC_PING实现k8s里的高可用
- keycloak~为认证提供者添加配置项
- skywalking的介绍
- springboot~disruptor异步队列
- keycloak~自定义rest接口