聊透Spring事件機制
事件機制是Spring為企業級開發提供的神兵利器之一,它提供了一種低耦合、無侵入的解決方式,是我們行走江湖必備保命技能。但其實Spring事件的設計其實並不複雜,它由三部分組成:事件、釋出器、監聽器。事件是主體,釋出器負責釋出事件,監聽器負責處理事件。
在簡單瞭解Spring事件的機制之後,本文將從原始碼的角度出發,和大家一起探討:Spring事件的核心工作機制,並看一下作為企業級開發工具,Spring事件是如何支援全域性異常處理和非同步執行的。最後會和大家討論目前Spring事件機制的一些缺陷和問題,話不多說,我們開始吧。
1. Spring事件如何使用
所謂千里之行始於足下,在研究Spring的事件的機制之前,我們先來看一下Spring事件是如何使用的。通常情況下,我們使用自定義事件和內建事件,自定義事件主要是配合業務使用,自定義事件則多是做系統啟動時的初始化工作或者收尾工作。
1.1 自定義事件的使用
-
定義自定義事件
自定義一個事件在使用上很簡單,繼承ApplicationEvent即可: ```java // 事件需要繼承ApplicationEvent public class MyApplicationEvent extends ApplicationEvent { private Long id; public MyApplicationEvent(Long id) { super(id); this.id = id; }public Long getId() { return id; } }
- **釋出自定義事件**<br>  現在自定義事件已經有了,該如何進行釋出呢?Spring提供了`ApplicationEventPublisher`進行事件的釋出,我們平常使用最多的`ApplicationContext`也繼承了該釋出器,所以我們可以直接使用applicationContext進行事件的釋出。
java // 釋出MyApplicationEvent型別事件 applicationContext.publishEvent(new MyApplicationEvent(1L));- **處理自定義事件**<br>  現在事件已經發布了,誰負責處理事件呢?當然是監聽器了,Spring要求監聽器需要實現`ApplicationListener`介面,同時需要`通過泛型引數指定處理的事件型別`。有了監聽器需要處理的事件型別資訊,Spring在進行事件廣播的時候,就能找到需要廣播的監聽器了,從而準確傳遞事件了。
java // 需要繼承ApplicationListener,並指定事件型別 public class MyEventListener implements ApplicationListener{ // 處理指定型別的事件 @Override public void onApplicationEvent(MyApplicationEvent event) { System.out.println(Thread.currentThread().getName() + "接受到事件:"+event.getSource()); } } ```
1.2 Spring內建事件
1.2.1 ContextRefreshedEvent
在ConfigurableApplicationContext
的refresh()
執行完成時,會發出ContextRefreshedEvent
事件。refresh()是Spring最核心的方法,該方法內部完成的Spring容器的啟動,是研究Spring的重中之重。在該方法內部,當Spring容器啟動完成,會在finishRefresh()發出ContextRefreshedEvent事件,通知容器重新整理完成。我們一起來看一下原始碼:
```java // ConfigurableApplicationContext.java public void refresh() throws BeansException, IllegalStateException { try { // ...省略部分非關鍵程式碼 //完成普通單例Bean的例項化(非延遲的) this.finishBeanFactoryInitialization(beanFactory);
// 初始化宣告週期處理器,併發出對應的時間通知
this.finishRefresh();
}
}
protected void finishRefresh() { // ...省略部分非核心程式碼 // 釋出上下文已經重新整理完成的事件 this.publishEvent(new ContextRefreshedEvent(this)); } ```
其實這是Spring提供給我們的拓展點,此時容器已經啟動完成,容器中的bean也已經建立完成,對應的屬性、init()、Aware回撥等,也全部執行。很適合我們做一些系統啟動後的準備工作,此時我們就可以監聽該事件,作為系統啟動後初始預熱的契機。其實Spring內部也是這樣使用ContextRefreshedEvent的, 比如我們常用的Spring內建的排程器,就是在接收到該事件後,才進行排程器的執行的。
java
public class ScheduledAnnotationBeanPostProcessor implements ApplicationListener<ContextRefreshedEvent> {
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
finishRegistration();
}
}
}
1.2.2 ContextStartedEvent
在ConfigurableApplicationContext
的start()
執行完成時,會發出ContextStartedEvent事件。
java
@Override
public void start() {
this.getLifecycleProcessor().start();
this.publishEvent(new ContextStartedEvent(this));
}
ContextRefreshedEvent
事件的觸發是所有的單例bean建立完成後釋出,此時實現了Lifecycle
介面的bean還沒有回撥start(),當這些start()
被呼叫後,才會釋出ContextStartedEvent
事件。
1.2.3 ContextClosedEvent
在ConfigurableApplicationContext
的close()
執行完成時,會發出ContextStartedEvent事件。此時IOC容器已經關閉,但尚未銷燬所有的bean。
```java
@Override
public void close() {
synchronized (this.startupShutdownMonitor) {
this.doClose();
}
}
protected void doClose() { // 釋出ContextClosedEvent事件 this.publishEvent(new ContextClosedEvent(this)); } ```
1.2.4 ContextStoppedEvent
在ConfigurableApplicationContext
的stop()
執行完成時,會發出ContextStartedEvent事件。
java
@Override
public void stop() {
this.getLifecycleProcessor().stop();
this.publishEvent(new ContextStoppedEvent(this));
}
該事件在ContextClosedEvent事件觸發之後才會觸發,此時單例bean還沒有被銷燬,要先把他們都停掉才可以釋放資源,銷燬bean。
2. Spring事件是如何運轉的
經過第一章節的探討,我們已經清楚Spring事件是如何使用的,然而這只是皮毛而已,我們的目標是把Spring事件機制脫光扒淨的展示給大家看。所以這一章節我們深入探討一下,Spring事件的執行機制,重點我們看一下:
- 事件是怎麼廣播給監聽器的?會不會發送阻塞?
- 系統中bean那麼多,ApplicationListener
是被如何識別為監聽器的?
- 監聽器處理事件的時候,是同步處理還是非同步處理的?
- 處理的時候發生異常怎麼辦,後面的監聽器還能執行嗎?
乍一看是不是問題還挺多,沒事,不要著急,讓我們一起來開啟愉快的探索路程,看看Spring是怎麼玩轉事件的吧。
2.1 事件釋出
在第一章節,我們直接通過applicationContext
釋出了事件,同時也提到了,它之所以能釋出事件,是因為它是ApplicationEventPublisher
的子類,因此是具備事件釋出能力的。但按照介面隔離原則,如果我們只需要進行事件釋出,applicationContext
提供的能力太多,還是推薦直接使用ApplicationEventPublisher
進行操作。
2.1.1 獲取事件釋出器的方式
我們先來ApplicationEventPublisher
的提供的能力,它是一個介面,結構如下:
```java
@FunctionalInterface
public interface ApplicationEventPublisher {
//釋出ApplicationEvent事件
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
//釋出PayloadApplicationEvent事件
void publishEvent(Object event);
}
``
 通過原始碼我們發現
ApplicationEventPublisher僅僅提供了事件釋出的能力,支援自定義型別和
PayloadApplicationEvent型別(如果沒有定義事件型別,預設包裝為該型別)。那我們如何獲取該釋出器呢,我們最常使用的
@Autowired`注入是否可以呢,試一下唄。
-
通過@Autowired 注入 ApplicationEventPublisher
通過debug,我們可以直觀的看到:是可以的,而且注入的就是ApplicationContext例項。也就是說注入ApplicationContext
和注入ApplicationEventPublisher
是等價的,都是一個ApplicationContext例項。 -
通過ApplicationEventPublisherAware獲取 ApplicationEventPublisher
除了@Autowired
注入,Spring還提供了使用ApplicationEventPublisherAware
獲取ApplicationEventPublisher
的方式,如果實現了這個感知介面,Spring會在合適的時機,回撥setApplicationEventPublisher()
,將applicationEventPublisher
傳遞給我們。使用起來也很方便。程式碼所示: ```java public class UserService implements ApplicationEventPublisherAware { private ApplicationEventPublisher applicationEventPublisher;public void login(String username, String password){ // 1: 進行登入處理 ... // 2: 傳送登入事件,用於記錄操作 applicationEventPublisher.publishEvent(new UserLoginEvent(userId)); }
// Aware介面回撥注入applicationEventPublisher @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; } }
``  現在我們已經知道通過
@Autowired和
ApplicationEventPublisherAware`回撥都能獲取到事件釋出器,兩種有什麼區別嗎? 其實區別不大,主要是呼叫時機的細小差別,另外就是默寫特殊場景下,@Autowired注入可能無法正常注入,實際開發中完成可以忽略不計。所以優先推薦小夥伴們使用ApplicationEventPublisherAware,如果覺得麻煩,使用@Autowired也未嘗不可。
如果使是自動注入模型,是無法通過setter()注入ApplicationEventPublisher的,因為在prepareBeanFactory時已經指定忽略此介面的注入了(
beanFactory.ignoreDependencyInterface(ApplicationEventPublisherAware.class)
)。順便說一句,@Autowired
不算自動注入哦。
2.1.2 事件的廣播方式
現在我們已經知道,可以通過ApplicationEventPublisher
傳送事件了,那麼這個事件傳送後肯定是要分發給對應的監聽器處理啊,誰處理這個分發邏輯呢?又是怎麼匹配對應的監聽器的呢?我們帶著這兩個問題來看ApplicationEventMulticaster
。
- 事件是如何廣播的
要探查事件是如何廣播的,需要跟隨事件釋出後的邏輯一起看一下:
```java @Override public void publishEvent(ApplicationEvent event) { this.publishEvent(event, null); }
protected void publishEvent(Object event, @Nullable ResolvableType eventType) { // ...省略部分程式碼 if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { // 將事件廣播給Listener this.getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); } }
// 獲取事件廣播器 ApplicationEventMulticaster getApplicationEventMulticaster() throws IllegalStateException { if (this.applicationEventMulticaster == null) { throw new IllegalStateException("ApplicationEventMulticaster not initialized - " + "call 'refresh' before multicasting events via the context: " + this); } return this.applicationEventMulticaster; } ``` 通過上面原始碼,我們發現釋出器直接把事件轉交給applicationEventMulticaster了,我們再去裡面看一下廣播器裡面做了什麼。
```java // SimpleApplicationEventMulticaster.java public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { // ...省略部分程式碼 // getApplicationListeners 獲取符合的監聽器 for (ApplicationListener<?> listener : getApplicationListeners(event, type)) { // 執行每個監聽器的邏輯 invokeListener(listener, event); } }
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) { try { // 呼叫監聽器的onApplicationEvent方法進行處理 listener.onApplicationEvent(event); } } ``` 看到這裡,我們發現事件的分發邏輯:先找到匹配的監聽器,然後逐個呼叫onApplicationEvent()進行事件處理。
- 事件和監聽器是如何匹配的
通過上述原始碼,我們發現通過getApplicationListeners(event, type)找到了所有匹配的監聽器,我們繼續跟蹤看一下是如何匹配的。
```java
protected Collection<ApplicationListener<?>> getApplicationListeners( ApplicationEvent event, ResolvableType eventType) { // 省略快取相關程式碼 return retrieveApplicationListeners(eventType, sourceType, newRetriever); }
private Collection<ApplicationListener<?>> retrieveApplicationListeners(
ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable CachedListenerRetriever retriever) {
// 1: 獲取所有的ApplicationListener
Set<ApplicationListener<?>> listeners;
Set
for (ApplicationListener<?> listener : listeners) {
// 2: 遍歷判斷是否匹配
if (supportsEvent(listener, eventType, sourceType)) {
if (retriever != null) {
filteredListeners.add(listener);
}
allListeners.add(listener);
}
}
}
protected boolean supportsEvent(
ApplicationListener<?> listener, ResolvableType eventType, @Nullable Class<?> sourceType) {
GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ?
(GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener));
// supportsEventType 根據ApplicationListener的泛型, 和事件型別,看是否匹配
// supportsSourceType 根據事件源型別,判斷是否匹配
return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType));
}
```
通過原始碼跟蹤,我們發現監聽器匹配是根據事件型別匹配的,先獲取容器中所有的監聽器,在用supportsEvent()去判斷對應的監聽器是否匹配事件。這裡匹配主要看兩點:
1. 判斷事件型別和監聽器上的泛型型別,是否匹配(子類也能匹配)。
2. 監聽器是否支援事件源型別,預設情況下,都是支援的。
如果兩者都匹配,就轉發給處理器處理。
- ApplicationEventMulticaster是如何獲取的(選讀)
在事件廣播時,Spring直接呼叫getApplicationEventMulticaster()去獲取屬性applicationEventMulticaster,並且當applicationEventMulticaster為空時,直接異常終止了。那麼就要求該成員變數提早初始化,那麼它是何時初始化的呢。
```java public void refresh() throws BeansException, IllegalStateException { // ...省略無關程式碼 // 初始化事件廣播器(轉發ApplicationEvent給對應的ApplicationListener處理) this.initApplicationEventMulticaster(); }
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();
// spring容器中存在,直接返回
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (this.logger.isTraceEnabled()) {
this.logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
// 容器中不存在,建立SimpleApplicationEventMulticaster,放入容器
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (this.logger.isTraceEnabled()) {
this.logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
``
 看到這裡,是不是豁然開朗,原來在容器啟動的時候,專門呼叫了
initApplicationEventMulticaster()對
applicationEventMulticaster`進行了初始化,並放到了spring容器中。
其實這裡還有個問題,就是事件整體的初始化流程在
BeanFactoryPostProcessor
之後,如果在自定義的BeanFactoryPostProcessor
釋出事件,此時applicationEventMulticaster還沒有初始化,監聽器也沒有註冊,是無法進行事件的廣播的。該問題在Spring3之前普遍存在,在最近的版本已經解決,其思路是:先將早期事件放入集合中,待廣播器、監聽器註冊後,再從集合中取出進行廣播。
2.2 事件監聽器
監聽器是負責處理事件的,在廣播器將對應的事件廣播給它之後,它正式上崗開始處理事件。Spring預設的監聽器是同步執行的,並且支援一個事件由多個監聽器處理,並可通過@Order
指定監聽器處理順序。
2.2.1 定義監聽器的方式
- 實現ApplicationListener定義監聽器
第一種方式定義的方式當然是通過直接繼承ApplicationListener
,同時不要忘記通過泛型指定事件型別,它可是將事件廣播給監聽器的核心匹配標誌。
java
public class MyEventListener implements ApplicationListener<MyApplicationEvent> {
@Override
public void onApplicationEvent(MyApplicationEvent event) {
System.out.println(Thread.currentThread().getName() + "接受到事件:"+event.getSource());
}
}
通過ApplicationListener定義的監聽器,本質上是一個單事件監聽器,也就是隻能處理一種型別的事件。
- 使用@EventListener定義監聽器
第二種方式我們還可以使用@EventListener
標註方法為監聽器,該註解標註的方法上,方法引數為事件型別,標註該監聽器要處理的事件型別
。 ```java public class AnnotationEventListener { // 使用@EventListener標註方法為監聽器,引數型別為事件型別 @EventListener public void onApplicationEvent(MyApplicationEvent event) { System.out.println(Thread.currentThread().getName() + "接受到事件:"+event.getSource()); }
@EventListener
public void onApplicationEvent(PayloadApplicationEvent payloadApplicationEvent) {
System.out.println(Thread.currentThread().getName() + "接受到事件:"+payloadApplicationEvent.getPayload());
}
} ```
通過廣播器分發事件的邏輯,我們知道事件只能分發給ApplicationListener型別的監聽器例項處理,這裡僅僅是標註了@EventListener的方法,也能被是識別成ApplicationListener型別的監聽器嗎?答案是肯定的,只是Spring在底層進行了包裝,偷偷把@EventListener標註的方法包裝成了
ApplicationListenerMethodAdapter
,它也是ApplicationListener的子類,這樣就成功的把方法轉換成ApplicationListener例項了,後續章節我們會詳細揭露Spring偷樑換柱的小把戲,小夥伴們稍安勿躁。
2.2.2 ApplicationListener監聽器是如何被識別的
本小節我們一起看一下監聽器是如何被是識別的,畢竟大多數情況下,我們只是直接加了@Component註解,然後實現了一下ApplicationListener介面,並沒有特殊指定為監聽器。那有沒有可能就是基於這個繼承關係,Spring自己在容器中進行型別查詢呢? ```java public void refresh() throws BeansException, IllegalStateException { try { // ...省略部分程式碼 // 初始化各種監聽器 this.registerListeners(); } }
// 註冊監聽器 protected void registerListeners() { // 1: 處理context.addApplicationListener(new MyEventListener()) 方式註冊的監聽器,並將監聽器註冊到廣播器中, for (ApplicationListener<?> listener : this.getApplicationListeners()) { this.getApplicationEventMulticaster().addApplicationListener(listener); }
// 2: 去Spring容器中獲取監聽器(處理掃描的或者register方式註冊的),同樣也是新增到廣播器中
String[] listenerBeanNames = this.getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
this.getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
}
``
 通過上述原始碼跟蹤,我們發現原來在容器refresh()的時候,專門有個步驟是用來初始化各種監聽器的。它的具體實現是:先把通過addApplicationListener()直接指定的註冊為監聽器 -> 再通過型別查詢,把當做普通bean註冊到容器中,類似是
ApplicationListener`的找了出來 -> 快取到ApplicationEventMulticaster中的監聽器集合中了。一路跟蹤下來,確實是根據型別查詢的,和我們的猜想完全一致。
2.2.3 @EventListener標註的處理器是如何識別註冊的
本小節我們探究一下,標註了@EventListener
的方法是如何被包裝成ApplicationListener
例項的。我們直接從原始碼入手,Spring在例項化bean後,呼叫了afterSingletonsInstantiated()
對@EventListener
的方法進行了保證,我們一起看一下。
```java public void refresh() throws BeansException, IllegalStateException { try { // ...省略部分程式碼 //完成普通單例Bean的例項化(非延遲的) this.finishBeanFactoryInitialization(beanFactory); // ...省略部分程式碼 } }
protected void finishBeanFactoryInitialization(ConfigurableListableBeanFactory beanFactory) { // ...省略部分程式碼 // 初始化非延遲載入的單例bean beanFactory.preInstantiateSingletons(); }
@Override
public void preInstantiateSingletons() throws BeansException {
List
// 2: 呼叫bean的後置處理方法
for (String beanName : beanNames) {
// ...省略部分程式碼
// 呼叫到EventListenerMethodProcessor的afterSingletonsInstantiated(),完成@EventListener的方法的轉換註冊
smartSingleton.afterSingletonsInstantiated();
}
}
// EventListenerMethodProcessor.java public void afterSingletonsInstantiated() { // ...省略部分程式碼 for (String beanName : beanNames) { // ...省略部分程式碼 processBean(beanName, type); } }
private void processBean(final String beanName, final Class<?> targetType) {
// 1: 解析bean上加了@EventListener的方法
Map
// ...省略部分程式碼
// 2: 遍歷加了@EventListener的方法,註冊為事件監聽器
for (Method method : annotatedMethods.keySet()) {
for (EventListenerFactory factory : factories) {
if (factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
// 2.1 通過EventListenerFactory,將方法建立為監聽器例項(ApplicationListenerMethodAdapter)
ApplicationListener<?> applicationListener = factory.createApplicationListener(beanName, targetType, methodToUse);
if (applicationListener instanceof ApplicationListenerMethodAdapter) {
((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
}
// 2.2 註冊為ApplicationListener
context.addApplicationListener(applicationListener);
break;
}
}
}
// ...省略部分程式碼
}
``
 我們整理一下呼叫關係:
refresh()->
finishBeanFactoryInitialization(beanFactory) ->
beanFactory.preInstantiateSingletons() ->
eventListenerMethodProcessor.afterSingletonsInstantiated()->
eventListenerMethodProcessor.processBean()`;在容器中所有的bean例項化後,會再次遍歷遍歷所有bean,呼叫SmartInitializingSingleton型別的bean的afterSingletonsInstantiated()的方法,此時符合條件的EventListenerMethodProcessor就會被呼叫,進而通過processBean(),先找出標註了@EventListener的方法,然後遍歷這些方法,通過EventListenerFactory工廠,包裝方法為EventListener例項,最後在註冊到容器中。至此,完成了查詢,轉換的過程。
關於@EventListener標註方法的解析時機,筆者首先想到的應該和
@Bean
的處理時機一致,在掃描類的時候,就解析出來加了@EventListener
的方法,抽象為BeanDefinition放到容器中,後面例項化時候,和正常掃描出來的bean是一樣的例項化流程。但是查詢下來發現Spring並沒有這樣處理,而是在bean初始化後回撥階段處理的。究其原因,大概是@Bean真的是需要託付給Spring管理,而@EventListener
只是一個標識,無需放入放入容器,防止對完暴露所致吧。
- EventListenerMethodProcessors是如何註冊的
通過上述的原始碼分析,我們清楚對於@EventListener的方法的處理,EventListenerMethodProcessor
可謂是至關重要,那麼他是怎麼註冊到Spring中的。而且我們也沒有通過@EnableXXX
進行開啟啊。其實Spring除了管理我們定義的bean,還會有一些內建的bean,來承接一些Spring核心工作,這些內建的bean一般在application容器建立的時候,就放入到Spring容器中了。下面我們來看一下是不是這樣: ```java // 構造方法 public AnnotationConfigApplicationContext() { // 1: 初始化BeanDefinition渲染器,註冊一下Spring內建的BeanDefinition this.reader = new AnnotatedBeanDefinitionReader(this); this.scanner = new ClassPathBeanDefinitionScanner(this); }
// AnnotatedBeanDefinitionReader.java public class AnnotatedBeanDefinitionReader { public AnnotatedBeanDefinitionReader(BeanDefinitionRegistry registry, Environment environment) { // ...省略部分程式碼 // 註冊一些內建後置處理器的BeanDefinition,是spring這兩個最核心的功能類 AnnotationConfigUtils.registerAnnotationConfigProcessors(this.registry); } }
// AnnotationConfigUtils.java
public static Set
2.3 非同步處理事件
通過上面的分析,我們知道事件在廣播時是同步執行的,廣播流程為:先找到匹配的監聽器 -> 逐個呼叫onApplicationEvent()進行事件處理,整個過程是同步處理的。下面我們做一個測試驗證一下: ```java public void applicationListenerTest(){ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.register(AnnotationEventListener.class); context.refresh(); System.out.printf("執行緒:[%s],時間:[%s],開始釋出事件\n", new Date(), Thread.currentThread().getName()); context.publishEvent(new MyApplicationEvent(1L)); System.out.printf("執行緒:[%s],時間:[%s],釋出事件完成\n", new Date(), Thread.currentThread().getName()); context.stop(); }
public class AnnotationEventListener { @EventListener @Order(1) public void onApplicationEvent(MyApplicationEvent event) { Date start = new Date(); Thread.sleep(3000); System.out.printf("執行緒:[%s],監聽器1,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource()); }
@EventListener
@Order(2)
public void onApplicationEvent2(MyApplicationEvent event) {
Date start = new Date();
System.out.printf("執行緒:[%s],監聽器2,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}
}
// 輸出資訊: // 執行緒:[main],時間[22:59:24],開始釋出事件 // 執行緒:[main],監聽器1,接收時間:[22:59:24],處理完成時間:[22:59:27],接收到事件:1 // 執行緒:[main],監聽器1,接收時間:[22:59:27],處理完成時間:[22:59:27],接收到事件:1 // 執行緒:[main],時間[22:59:27],,釋出事件完成 ``` 通過上述示例程式碼,發現確實是同步呼叫的,處理執行緒都是main,監聽器1處理緩慢,監聽器2只能默默等待監聽器1處理後才能接收到事件。這能滿足我們的需求嗎,畢竟現在系統動輒就要求毫秒計返回,QPS沒有1000+你都不好意思出門,哪怕只有十個使用者😂。
除了效能問題,我們基於真實業務場景出發,考慮一下什麼場景下,我們使用事件比較合適。個人使用最多的場景是:在執行某個業務時,需要通知別的業務方,該業務的執行情況時,會使用事件機制進行通知。就拿這個場景來說,我們考慮幾個問題: 1. 我們是否關心監聽者的執行時機? 2. 我們是否關心監聽者的執行結果?
大多數情況下,其實我們並不關心的監聽者什麼時候執行,執行結果如何的。如果對執行結果有依賴,通常直接呼叫了,如果有可能,還能享受事務的便利,還藉助事件幹什麼呢。所以這裡其實有個需求,希望Spring事件的處理是非同步的,那如何實現呢?
2.3.1 通過注入taskExecutor,非同步處理事件
通過前文的分析,我們知道事件的廣播是由ApplicationEventMulticaster進行處理的,那我們去看看,是否支援非同步處理呢。 ```java @Override public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { // 獲取執行執行緒池 Executor executor = getTaskExecutor(); for (ApplicationListener<?> listener : getApplicationListeners(event, type)) { // 如果存線上程池,使用執行緒池非同步執行 if (executor != null) { executor.execute(() -> invokeListener(listener, event)); } // 如果不存線上程池,同步執行 else { invokeListener(listener, event); } } }
// 獲取執行緒池 protected Executor getTaskExecutor() { return this.taskExecutor; }
// 設定執行緒池 public void setTaskExecutor(@Nullable Executor taskExecutor) { this.taskExecutor = taskExecutor; } ```
通過原始碼我們發現,其實Spring提供了使用執行緒池非同步執行的邏輯,前提是需要先設定執行緒池,只是這裡設定執行緒池的方式稍微麻煩些,需要通過applicationEventMulticaster例項的setTaskExecutor()設定,下面我們試一下是否可行。
```java public void applicationListenerTest(){ AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.register(AnnotationEventListener.class); context.refresh(); ApplicationEventMulticaster multicaster = context.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); if (multicaster instanceof SimpleApplicationEventMulticaster) { ((SimpleApplicationEventMulticaster) multicaster).setTaskExecutor(Executors.newFixedThreadPool(10)); } System.out.printf("執行緒:[%s],時間:[%s],開始釋出事件\n", new Date(), Thread.currentThread().getName()); context.publishEvent(new MyApplicationEvent(1L)); System.out.printf("執行緒:[%s],時間:[%s],釋出事件完成\n", new Date(), Thread.currentThread().getName()); context.stop(); }
public class AnnotationEventListener { @EventListener @Order(1) public void onApplicationEvent(MyApplicationEvent event) { Date start = new Date(); Thread.sleep(3000); System.out.printf("執行緒:[%s],監聽器1,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource()); }
@EventListener
@Order(2)
public void onApplicationEvent2(MyApplicationEvent event) {
Date start = new Date();
System.out.printf("執行緒:[%s],監聽器2,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}
}
// 輸出資訊: // 執行緒:[main],時間[22:57:13],開始釋出事件 // 執行緒:[main],時間[22:57:13],,釋出事件完成 // 執行緒:[pool-2-thread-1],監聽器2,接收時間:[22:57:13],處理完成時間:[22:57:13],接收到事件:1 // 執行緒:[pool-2-thread-2],監聽器1,接收時間:[22:57:13],處理完成時間:[22:57:16],接收到事件:1 ``` 經過測試發現:設定了執行緒池之後,監聽器確實是非同步執行的,並且是全域性生效,對所有型別的監聽器都適用。只是這裡的設定稍顯不便,需要先獲取到applicationEventMulticaster這個bean之後,再使用內建方法設定。
2.3.2 使用@Async,非同步處理事件
通過注入執行緒池,是全域性生效的。如果我們專案中有些事件需要非同步處理,又有些事件需要同步執行的,怎麼辦,總不能告訴你的leader做不了吧。NO,那不是顯得我很沒有用。面對這種情況,我們可以藉助@Async註解,使單個監聽器非同步執行。我們測試一下:
```java // 使用@EnableAsync開啟非同步 @EnableAsync public class AnnotationEventListener {
@EventListener
@Order(1)
public void onApplicationEvent(MyApplicationEvent event) {
Date start = new Date();
Thread.sleep(3000);
System.out.printf("執行緒:[%s],監聽器1,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}
@EventListener
@Order(2)
public void onApplicationEvent2(MyApplicationEvent event) {
Date start = new Date();
Thread.sleep(1000);
System.out.printf("執行緒:[%s],監聽器2,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}
}
// 輸出資訊:
// 執行緒:[main],時間[23:18:32],開始釋出事件
// 執行緒:[main],監聽器1,接收時間:[23:18:32],處理完成時間:[23:18:35],接收到事件:1
// 執行緒:[main],時間[23:18:35],,釋出事件完成
// 執行緒:[SimpleAsyncTaskExecutor-1],監聽器2,接收時間:[23:18:35],處理完成時間:[23:18:36],接收到事件:1
```
經過測試發現:在@Async的加持下,確實可以控制某個監聽器非同步執行。其實@Async也是使用了執行緒池執行的,對@Async感興趣的同學可以自行查閱資料,這裡我們不做展開了。
2.4 全域性異常處理
通過我們長時間的囉嗦,聰明的你肯定清楚:Spring事件的處理,預設是同步依次執行。那如果前面的監聽器出現了異常,並且沒有處理異常,會對後續的監聽器還能順利接收該事件嗎?其實不能的,因為異常中斷了事件的傳送了,這裡我們不做演示了,有興趣的同學們可以自行驗證一下。
那如果我們設定了非同步執行呢,還會有影響嗎,對執行緒池有所瞭解的同學肯定可以給出答案:不會,因為不是一個執行緒執行,是不會互相影響的。
難道同步執行我們就要在每個監聽器都try catch一下,避免相互影響嗎,不能全域性處理嗎?當前可以了,貼心的Spring為了簡化我們的開發邏輯,特意提供了ErrorHandler來統一處理,話不多說,我們趕緊來試一下吧。
```java public class AnnotationEventListener {
@EventListener
@Order(1)
public void onApplicationEvent(MyApplicationEvent event) {
Date start = new Date();
// 製造異常
int i = 1/0;
System.out.printf("執行緒:[%s],監聽器1,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}
@EventListener
@Order(2)
public void onApplicationEvent2(MyApplicationEvent event) {
Date start = new Date();
System.out.printf("執行緒:[%s],監聽器2,接收時間:[%s],處理完成時間:[%s],接收到事件:%s\n", Thread.currentThread().getName(), start, new Date(), event.getSource());
}
}
// 測試方法 public void applicationListenerTest() throws InterruptedException { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); context.register(AnnotationEventListener.class); context.refresh(); ApplicationEventMulticaster multicaster = context.getBean(AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); if (multicaster instanceof SimpleApplicationEventMulticaster) { // 簡單列印異常資訊 ((SimpleApplicationEventMulticaster) multicaster).setErrorHandler(t -> System.out.println(t)); } System.out.printf("執行緒:[%s],時間:[%s],開始釋出事件\n", new Date(), Thread.currentThread().getName()); context.publishEvent(new MyApplicationEvent(1L)); System.out.printf("執行緒:[%s],時間:[%s],釋出事件完成\n", new Date(), Thread.currentThread().getName()); context.stop(); }
// 輸出資訊: // 執行緒:[main],時間[23:35:15],開始釋出事件 // java.lang.ArithmeticException: / by zero // 執行緒:[main],監聽器2,接收時間:[23:35:15],處理完成時間:[23:35:15],接收到事件:1 // 執行緒:[main],時間[23:35:15],,釋出事件完成 ``` 經過測試發現:設定了ErrorHandler之後,確實可以對異常進行統一的管理了,再也不用繁瑣的try catch了,今天又多了快樂划水五分鐘的理由呢。老規矩,我們不光要做到知其然,還要做到知其所以然,我們探究一下為什麼加了ErrorHandler之後,就可以全域性處理呢?
java
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
// 獲取ErrorHandler
ErrorHandler errorHandler = getErrorHandler();
// 如果ErrorHandler存在,監聽器執行出現異常,交給errorHandler處理,不會傳遞向上丟擲異常。
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
// 呼叫監聽器處理
doInvokeListener(listener, event);
}
}
經過閱讀原始碼,我們發現:Sring先查詢是否配置了ErrorHandler,如果配置了,在發生異常的時候,把異常資訊轉交給errorHandler處理,並且不會在向上傳遞異常了。這樣可以達到異常全域性處理的效果了。
3. Spring事件機制存在什麼問題
3.1 釋出阻塞
Spring釋出事件的時候,由applicationEventMulticaster來處理分發邏輯,這是單執行緒處理,處理邏輯我們分析過,就是:找到事件對應的監聽器(有快取) -> 逐個分發給監聽器處理(默認同步,可非同步)
。我們考慮一下這種設計會不會有效能問題了?同步執行的情況我們就不討論了,對應的場景一定是事件發生頻率較低,這種場景討論效能沒有意義。
我們主要討論非同步模式,無論是@Async還是注入執行緒池,本質都是:通過執行緒池執行
,並且執行緒池的執行緒是所有監聽器共同使用的(@Async對應的執行緒池供所有加了@Async的方法使用)。我們都清楚執行緒池的執行流程:先建立執行緒執行任務,之後會放到緩衝佇列,最後可能直接拒絕。
基於共享執行緒池執行的監聽器的模式,有什麼問題呢?最嚴重的問題就是:監聽器的執行速度會互相影響、甚至會發生阻塞
。假如某一個監聽器執行的很慢,把執行緒池中執行緒都佔用了,此時其他的事件雖然釋出但沒有資源執行,只能在快取佇列等待執行緒釋放,哪怕該事件的處理很快、很重要,也不行。
其實這裡可以參考Netty的boss-work工作模型,廣播器只負責分發事件,排程執行監聽器的邏輯交給由具體的work執行緒負責會更合適。
3.2 無法訂製監聽器執行執行緒數
正是由於每種事件產生的數量、處理邏輯、處理速度差異化可能很大,所以每個監聽器都有適合自己場景的執行緒數,所以為每個監聽器配置執行緒池就顯得尤為重要。Spring事件機制,無法單獨為事件(或者監聽器)設定執行緒池,只能共用執行緒池,無法做到精準控制,執行緒擁堵或者執行緒浪費出現的機率極大。當然,我們也可以在監聽器內部,接收到事件後使用自定義的執行緒池處理,但是我們更希望簡單化配置就能支援
關於Spring事件機制存在的問題,筆者在專案中藉助記憶體佇列Disruptor儲存事件,採用雙匯流排的思想實現自研專案event-bus,解決了Spring事件機制不完美的部分。後續有機會再和大家分享該專案的詳細情況。