Spring 中的事件机制

  • Post author:
  • Post category:其他

说到事件机制,可能脑海中最先浮现的就是日常使用的各种 listener,listener去监听事件源,如果被监听的事件有变化就会通知listener,从而针对变化做相应的动作。这些listener是怎么实现的呢?说listener之前,我们先从设计模式开始讲起。

观察者模式

观察者模式一般包含以下几个对象:

  • Subject:被观察的对象。它提供一系列方法来增加和删除观察者对象,同时它定义了通知方法notify()。目标类可以是接口,也可以是抽象类或具体类。
  • ConcreteSubject:具体的观察对象。Subject的具体实现类,在这里实现通知事件。
  • Observer:观察者。这里是抽象的观察者,观察者有一个或者多个。
  • ConcreteObserver:具体的观察者。在这里维护观察对象的具体操作。

按照观察者对象,我们来写一个简单的观察者示例,定义一个气象中心,发布气象信息,观察者是各个电视台,订阅气象中心的信息,有新增的气象信息发布的时候,及时播报。

定义气象中心:

public interface WeatherCenter {

    void publishWeatherInfo();

}

定义观察者对象:

public interface Observer {

    void sendWeatherWarning();
}

定义具体的观察者:

public class BeijingTvObserver implements Observer {
    
    @Override
    public void sendWeatherWarning(){
        System.out.println("北京卫视天气预报开始了");
    }

}

中央电视台:

public class CCTVObserver implements Observer {

    @Override
    public void sendWeatherWarning(){
        System.out.println("中央电视台天气预报开始了");
    }

}

现在发布北京的气象信息:

public class BeijingWeather implements WeatherCenter {

    private List<Observer> observerArrayList = new ArrayList<>();

    @Override
    public void publishWeatherInfo() {
        for(Observer observer : observerArrayList) {
            observer.sendWeatherWarning();
        }
    }
}

这时候给所有的订阅者推送一条气象发布消息,那么他们就收到最新的气象预报。

总结一下观察者模式的核心就是:事件中心持有所有的订阅者,每当事件发生时循环通知所有的订阅者。

当然上面我写的比较简单,你也可以在事件中心写一个注册订阅者的方法,每当有新的订阅者加入就调用该方法注册。

Java 中的事件机制

Java中提供了基本的事件处理基类:

  1. EventObject:所有事件状态对象都将从其派生的根类;
  2. EventListener:所有事件侦听器接口必须扩展的标记接口;

具体使用方式可以用一个非常经典的开门案例来讲解:

首先创建一个开/关门事件:

import java.util.EventObject;

/**
 * @author rickiyang
 * @date 2019-12-05
 * @Desc TODO
 */
public class DoorEvent extends EventObject {

    private Integer doorStatus;

    public DoorEvent(Object source) {
        super(source);
    }

    public DoorEvent(Object source, Integer status) {
        super(source);
        this.doorStatus = status;
    }

    public void setStatus(Integer status) {
        this.doorStatus = status;
    }

    public Integer getStatus() {
        return doorStatus;
    }

}

所有的事件都继承 EventObject。

然后创建监听器:

public interface DoorListener extends EventListener {

    void DoorEvent(DoorEvent doorEvent);
}

所有的监听器都要实现 EventListener。

继续创建具体的开门/关门的监听器:

public class CloseDoorListener implements DoorListener {

    @Override
    public void DoorEvent(DoorEvent doorEvent) {
        Integer openStatus =  doorEvent.getStatus();
        if(0 == openStatus) {
            System.out.println("the door is close");
        }
    }
}

开门:

public class OpenDoorListener implements DoorListener {
    @Override
    public void DoorEvent(DoorEvent doorEvent) {
        Integer openStatus = doorEvent.getStatus();
        if(1 == openStatus) {
            System.out.println("the door is open");
        }
    }
}

有了监听器和事件之后,下一步就是用上他们。还记得上面的观察者模式嘛,同样的使用方式:

		/**
     * 将所有的listener保存起来
     *
     * @return
     */
public static List<DoorListener> getAllListener() {
  List<DoorListener> list = Lists.newArrayList();
  list.add(new OpenDoorListener());
  list.add(new CloseDoorListener());
  return list;
}

public static void main(String[] args) {
  DoorEvent open = new DoorEvent("open", 1);
  List<DoorListener> listeners = getAllListener();
  for (DoorListener listener : listeners) {
    listener.DoorEvent(open);
  }
}

Spring 中的事件机制

在 Spring 容器中通过 ApplicationEvent 类和 ApplicationListener 接口来处理事件,如果某个 bean 实现 ApplicationListener 接口并被部署到容器中,那么每次对应的 ApplicationEvent 被发布到容器中都会通知该 bean ,这是典型的观察者模式。

Spring 的事件默认是同步的,即调用 publishEvent 方法发布事件后,它会处于阻塞状态,直到 onApplicationEvent 接收到事件并处理返回之后才继续执行下去,这种单线程同步的好处是可以进行事务管理。

先展示一下使用方式,我们拿用户登录来举例。首先来创建一个事件:


import org.springframework.context.ApplicationEvent;

/**
 * @author rickiyang
 * @date 2019-12-04
 * @Desc TODO
 */
public class UserRegisterEvent extends ApplicationEvent {

    public UserRegisterEvent(Object source) {
        super(source);
    }
}

然后创建监听器去监听这个事件:


import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @author rickiyang
 * @date 2019-12-05
 * @Desc  插入用户信息
 */
@Component
public class UserInsertListener implements ApplicationListener<UserRegisterEvent> {


    @Override
    public void onApplicationEvent(UserRegisterEvent userRegisterEvent) {
        String source = (String)userRegisterEvent.getSource();
        User user = JSON.parseObject(source, User.class);
        //insert db
    }
}

创建一个用户注册成功之后插入用户信息的监听器。


import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @author rickiyang
 * @date 2019-12-05
 * @Desc  用户注册成功发送短信
 */
@Component
public class NotifyUserListener implements ApplicationListener<UserRegisterEvent> {


    @Override
    public void onApplicationEvent(UserRegisterEvent userRegisterEvent) {
        String source = (String)userRegisterEvent.getSource();
        User user = JSON.parseObject(source, User.class);
        //send sms
    }
}

创建注册成功发送通知短信的监听器。


import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @author rickiyang
 * @date 2019-12-05
 * @Desc  用户注册成功给用户生成推荐商品
 */
@Component
public class RecommendListener implements ApplicationListener<UserRegisterEvent> {


    @Override
    public void onApplicationEvent(UserRegisterEvent userRegisterEvent) {
        String source = (String)userRegisterEvent.getSource();
        User user = JSON.parseObject(source, User.class);
        // generate recommend commodity
    }
}

创建用户注册成功之后给用户推荐商品的事件。

用户注册事件的监听器创建完毕,那么接下来就发布事件等待监听器监听就行。在Spring中提供了 ApplicationEventPublisherAware 接口,从名称上看就知道是 ApplicationEventPublisher 的适配器类,用法就是你在业务类中实现该接口,然后使用 ApplicationEventPublisher#publishEvent发布你的事件即可。

package com.rickiyang.learn.controller.test;

import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;

/**
 * @author rickiyang
 * @date 2019-12-04
 * @Desc TODO
 */
@Service
public class UserRegisterPublisherService implements ApplicationEventPublisherAware {

    private ApplicationEventPublisher applicationEventPublisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    public void insert(User user){
        UserRegisterEvent event = new UserRegisterEvent(JSON.toJSONString(user));
        applicationEventPublisher.publishEvent(event);
    }
}

调用insert方法就可以发布事件,写一个test测试一下:


import com.rickiyang.learn.entity.User;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

@RunWith(SpringRunner.class)
@SpringBootTest
public class UserRegisterPublisherServiceTest {

    @Resource
    private UserRegisterPublisherService userRegisterPublisherService;


    @Test
    public void test1() {
        User build = User.builder().name("1").sex(1).phone("123456789").build();
        userRegisterPublisherService.insert(build);
    }

}

可以看到3个监听器都打印出来了:

发送短信
商品推荐
插入用户

有个问题不知道大家发现没,监听器的发布顺序是按照 bean 自然装载的顺序执行的,如果我们的bean是有序的应该怎么办呢?别怕,Spring自然考虑到这个问题。

SmartApplicationListener实现有序的监听

SmartApplicationListener 接口继承了 ApplicationListener,使用全局的 ApplicationEvent 作为监听的事件对象。之所以 能提供顺序性,是因为继承了 Ordered 类,实现了排序的逻辑。另外添加了两个方法#supportsEventType、#supportsSourceType 来作为区分是否是我们监听的事件,只有这两个方法同时返回true时才会执行onApplicationEvent方法。

package com.rickiyang.learn.controller.test;

import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.stereotype.Component;

/**
 * @author rickiyang
 * @date 2019-12-05
 * @Desc TODO
 */
@Component
public class UserInsert1Listener implements SmartApplicationListener {

    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> aClass) {
        return aClass == UserRegisterEvent.class;
    }

    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        return sourceType == User.class;
    }

    /**
     * 数字越小优先级越高
     * 默认值为 2147483647
     * @return
     */
    @Override
    public int getOrder() {
        return 8;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        UserRegisterEvent event = (UserRegisterEvent)applicationEvent;
        // insert to db
    }
}

如果你有对多个监听器做排序的需求,那么你只用在 getOrder 方法中指定当前的排序级别即可。数字越大优先级越低,默认的排序级别是2147483647,你可以自己调整。

Spring 对事件监听机制的注解支持

Spring4.2之后,ApplicationEventPublisher 自动被注入到容器中,不再需要显示实现Aware接口。


import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @author rickiyang
 * @date 2019-12-04
 * @Desc TODO
 */
@Service
public class UserRegisterPublisher1Service  {

    @Resource
    private ApplicationEventPublisher applicationEventPublisher;

    public void insert(User user){
        UserRegisterEvent event = new UserRegisterEvent(JSON.toJSONString(user));
        applicationEventPublisher.publishEvent(event);
    }
}

创建listener也就不需要显式的继承 ApplicationListener 或者 SmartApplicationListener,使用 @EventListener 注解即可:

import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Service;

/**
 * @author rickiyang
 * @date 2019-12-07
 * @Desc TODO
 */
@Service
public class UserInfoCheckListener {

    @Order(8)
    @EventListener(classes = UserRegisterEvent.class)
    public void checkUserInfo(UserRegisterEvent event) {
        String source = (String) event.getSource();
        User user = JSON.parseObject(source, User.class);
        //todo check user info
    }
}

如果你想使用顺序性的listener,那么只需要使用 @Order注解就可以了。

异步事件的支持

上面说过 Spring 事件机制默认是同步阻塞的,如果 ApplicationEventPublisher 发布事件之后他会一直阻塞等待listener 响应,多个 listener 的情况下前面的没有执行完后面的一直被阻塞。如果我们的应用场景是:用户订单完成之后异步发货,检查快递信息,这些操作是没有必要返回结果给用户的。

这种情况下,我们是不是想到可以使用异步线程的方式来处理。你可以把listener中的处理流程做一个异步线程,或者利用 Spring 提供的线程池注解 @Async 来实现异步线程。

要使用 @Async 之前需要先开启线程池,在 启动类上添加 @EnableAsync 注解即可。线程池支持配置模式,如果你不想使用默认的线程池配置,可以手动指定:

package com.rickiyang.learn.controller.test;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.*;

/**
 * @author rickiyang
 * @date 2019-12-07
 * @Desc TODO
 */
@Configuration
@EnableAsync
public class AsyncConfig {

    @Bean("userInfoPool")
    public Executor getExecutor() {
        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("consumer-queue-thread-%d").build();
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 线程池维护线程的最少数量
        executor.setCorePoolSize(5);
        // 线程池维护线程的最大数量
        executor.setMaxPoolSize(10);
        // 缓存队列
        executor.setQueueCapacity(25);
        //线程名
        executor.setThreadFactory(namedThreadFactory);
        // 线程池初始化
        executor.initialize();
        return executor;
    }
}

手动配置一个 bean name 为 userInfoPool 的线程池,接下来使用@Async注解使用线程池:

package com.rickiyang.learn.controller.test;

import com.alibaba.fastjson.JSON;
import com.rickiyang.learn.entity.User;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * @author rickiyang
 * @date 2019-12-07
 * @Desc TODO
 */
@Service
public class UserInfoCheckListener {

    @Async("userInfoPool")
    @Order(8)
    @EventListener(classes = UserRegisterEvent.class)
    public void checkUserInfo(UserRegisterEvent event) {
        String source = (String) event.getSource();
        User user = JSON.parseObject(source, User.class);
        System.out.println("async deel");
        //todo check user info
    }
}

这样我们就把 UserInfoCheckListener 变成了异步任务。

Spring中的事件机制分析

上面从基本的发布订阅设计模式到 Java 提供的基本的事件处理基类,再拓展到 Spring 中如何使用事件机制来拓展代码,整条线还是很清晰。讲完了我们应该如何在业务代码中使用发布订阅模式,我们也来分析一下Spring是如何实现发布订阅模式的,看看人家的代码功底。

在Spring 中提供了Event 的基类:ApplicationEvent,如果事件要想被Spring监听那么就必须继承该类,同样该类也继承了 Java 中的事件基类:EventObject。

有了事件源,我们要定义事件监听者用于处理事件,所有的事件监听者都要继承 org.springframework.context.ApplicationListener 接口:

/**
 * Interface to be implemented by application event listeners.
 * Based on the standard {@code java.util.EventListener} interface
 * for the Observer design pattern.
 *
 * <p>As of Spring 3.0, an ApplicationListener can generically declare the event type
 * that it is interested in. When registered with a Spring ApplicationContext, events
 * will be filtered accordingly, with the listener getting invoked for matching event
 * objects only.
 *
 * @author Rod Johnson
 * @author Juergen Hoeller
 * @param <E> the specific ApplicationEvent subclass to listen to
 * @see org.springframework.context.event.ApplicationEventMulticaster
 */
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {

}

ApplicationListener 提供了 一个基于 ApplicationEvent 的泛型,所以你指定了某个类的监听者只会处理该类型的event。

上面我们说了 Spring 是基于 ApplicationEventPublisher 来发布事件,那么监听器是如何获取到事件呢?

注意到 ApplicationListener 上面的注释写到:@param <E> the specific ApplicationEvent subclass to listen to ApplicationEventMulticaster,从名称上看这个类的作用应该是用于事件广播。

ApplicationEventMulticaster是一个接口,提供了如下方法:

  • addApplicationListener(ApplicationListener<?> listener) :新增一个listener;
  • addApplicationListenerBean(String listenerBeanName):新增一个listener,参数为bean name;
  • removeApplicationListener(ApplicationListener<?> listener):删除listener;
  • removeApplicationListenerBean(String listenerBeanName):根据bean name 删除listener;
  • multicastEvent(ApplicationEvent event):广播事件;
  • multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType):广播事件,指定事件的source类型。

从接口的方法看,该类的作用就是添加监听器然后对所有监听器或者指定监听器发送事件进行处理。

ApplicationEventMulticaster 有两个实现类:

  • SimpleApplicationEventMulticaster
  • AbstractApplicationEventMulticaster

因为 AbstractApplicationEventMulticaster 是一个抽象类,并且 SimpleApplicationEventMulticaster 也继承了了 SimpleApplicationEventMulticaster ,所以我们直接看 SimpleApplicationEventMulticaster:

public abstract class AbstractApplicationEventMulticaster
		implements ApplicationEventMulticaster, BeanClassLoaderAware, BeanFactoryAware {

	private final ListenerRetriever defaultRetriever = new ListenerRetriever(false);

	final Map<ListenerCacheKey, ListenerRetriever> retrieverCache = new ConcurrentHashMap<>(64);

  
  @Override
	public void addApplicationListener(ApplicationListener<?> listener) {
		synchronized (this.retrievalMutex) {
			// Explicitly remove target for a proxy, if registered already,
			// in order to avoid double invocations of the same listener.
			Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
			if (singletonTarget instanceof ApplicationListener) {
				this.defaultRetriever.applicationListeners.remove(singletonTarget);
			}
			this.defaultRetriever.applicationListeners.add(listener);
			this.retrieverCache.clear();
		}
	}
  
  ......
  ......


}

#addApplicationListener 方法用于新增监听器,新增的逻辑主要在这一句:

defaultRetriever.applicationListeners.add(listener);

继续看 ListenerRetriever 的实现:

private class ListenerRetriever {

  public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet<>();

  public final Set<String> applicationListenerBeans = new LinkedHashSet<>();

  private final boolean preFiltered;

  public ListenerRetriever(boolean preFiltered) {
    this.preFiltered = preFiltered;
  }

  public Collection<ApplicationListener<?>> getApplicationListeners() {
    List<ApplicationListener<?>> allListeners = new ArrayList<>(
      this.applicationListeners.size() + this.applicationListenerBeans.size());
    allListeners.addAll(this.applicationListeners);
    if (!this.applicationListenerBeans.isEmpty()) {
      BeanFactory beanFactory = getBeanFactory();
      for (String listenerBeanName : this.applicationListenerBeans) {
        try {
          ApplicationListener<?> listener = beanFactory.getBean(listenerBeanName, ApplicationListener.class);
          if (this.preFiltered || !allListeners.contains(listener)) {
            allListeners.add(listener);
          }
        }
        catch (NoSuchBeanDefinitionException ex) {
          // Singleton listener instance (without backing bean definition) disappeared -
          // probably in the middle of the destruction phase
        }
      }
    }
    if (!this.preFiltered || !this.applicationListenerBeans.isEmpty()) {
      AnnotationAwareOrderComparator.sort(allListeners);
    }
    return allListeners;
  }
}

看到没,最终还是 持有了一个 applicationListeners 的集合,跟我们的发布订阅设计模式一样。

剩下的逻辑就好去解释,顺着咱们前面讲过的发布订阅模式的使用套路撸下去就行,事件广播的方法#multicastEvent不外乎就是遍历所有的监听器进行匹配。

总结

这一篇讲的发布订阅模式以及在Spring中的使用在日常开发中只要稍加注意你就会发现对改善代码流程的影响还是挺大。写代码有90%的时间我们都是在写同步代码,因为不用动脑子,顺着该有的流程撸就完事。这样带来的后果就是你真的只是在搬砖!

有的时候停下来,从业务逻辑跳出来拿半个小时想想你应该如何让这这一次搬砖有点技术含量。或许从此刻开始,搬砖也会与众不同。