The @EnableAsync Annotation Explained


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
  Class<? extends Annotation> annotation() default Annotation.class;
  boolean proxyTargetClass() default false;
  AdviceMode mode() default AdviceMode.PROXY;
  int order() default Ordered.LOWEST_PRECEDENCE;
}
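  • In Spring Boot, an @EnableXXX annotation usually switches a feature on. It is normally paired with @Import: @Import brings in a configuration class (or a selector that picks one), and that configuration class registers the beans that enable the feature.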



@Import(AsyncConfigurationSelector.class)

This imports AsyncConfigurationSelector, which decides which configuration class to load.

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


	/**
	 * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration}
	 * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()},
	 * respectively.
	 */
	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}

}



AdviceModeImportSelector

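AdviceModeImportSelector is the parent class of AsyncConfigurationSelector. It implements the ImportSelector interface and overrides selectImports, whose job is to choose the configuration classes to import based on a condition, here the annotation attribute values read from AnnotationMetadata.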

public abstract class AdviceModeImportSelector<A extends Annotation> implements ImportSelector {
	/**
	 * The default advice mode attribute name.
	 */
	public static final String DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME = "mode";

	/**
	 * The name of the {@link AdviceMode} attribute for the annotation specified by the
	 * generic type {@code A}. The default is {@value #DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME},
	 * but subclasses may override in order to customize.
	 */
	protected String getAdviceModeAttributeName() {
		return DEFAULT_ADVICE_MODE_ATTRIBUTE_NAME;
	}

	public final String[] selectImports(AnnotationMetadata importingClassMetadata) {
		Class<?> annType = GenericTypeResolver.resolveTypeArgument(getClass(), AdviceModeImportSelector.class);
		Assert.state(annType != null, "Unresolvable type argument for AdviceModeImportSelector");

		AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
		if (attributes == null) {
			throw new IllegalArgumentException(String.format(
					"@%s is not present on importing class '%s' as expected",
					annType.getSimpleName(), importingClassMetadata.getClassName()));
		}

		AdviceMode adviceMode = attributes.getEnum(getAdviceModeAttributeName());
		String[] imports = selectImports(adviceMode);
		if (imports == null) {
			throw new IllegalArgumentException("Unknown AdviceMode: " + adviceMode);
		}
		return imports;
	}
  
}
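  • The selector reads the value of the "mode" attribute of @EnableAsync and, depending on that value, imports either the ProxyAsyncConfiguration or the AspectJAsyncConfiguration configuration class.

  • The default is mode = AdviceMode.PROXY, so ProxyAsyncConfiguration is imported unless configured otherwise.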
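The selection step above can be sketched in plain Java, without Spring on the classpath. Everything here is a hypothetical stand-in: `DemoMode` plays the role of AdviceMode, `@EnableDemo` the role of @EnableAsync, and the returned class names are made up. The sketch only shows the idea of reading a "mode" attribute from the importing class and mapping it to a configuration class name; the real AdviceModeImportSelector works on AnnotationMetadata rather than on reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class ModeSelectorSketch {

    // Reads the "mode" attribute from the importing class and maps it
    // to the name of the configuration class that should be imported.
    static String[] selectImports(Class<?> importingClass) {
        EnableDemo ann = importingClass.getAnnotation(EnableDemo.class);
        if (ann == null) {
            throw new IllegalArgumentException(
                    "@EnableDemo is not present on " + importingClass.getName());
        }
        switch (ann.mode()) {
            case PROXY:
                return new String[] {"demo.ProxyDemoConfiguration"};
            case ASPECTJ:
                return new String[] {"demo.AspectJDemoConfiguration"};
            default:
                throw new IllegalArgumentException("Unknown mode: " + ann.mode());
        }
    }

    public static void main(String[] args) {
        System.out.println(selectImports(DefaultModeApp.class)[0]);
        System.out.println(selectImports(AspectJModeApp.class)[0]);
    }
}

// Hypothetical stand-in for Spring's AdviceMode.
enum DemoMode { PROXY, ASPECTJ }

// Hypothetical stand-in for @EnableAsync, reduced to the "mode" attribute.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface EnableDemo {
    DemoMode mode() default DemoMode.PROXY;
}

// Uses the default mode (PROXY).
@EnableDemo
class DefaultModeApp { }

// Explicitly asks for the ASPECTJ mode.
@EnableDemo(mode = DemoMode.ASPECTJ)
class AspectJModeApp { }
```

Because the attribute has a default value, a class that only writes `@EnableDemo` ends up on the PROXY branch, which mirrors why ProxyAsyncConfiguration is the usual import.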



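The ProxyAsyncConfiguration configuration class

  • As the source below shows, this configuration class mainly does two things:

    • it extends the abstract class AbstractAsyncConfiguration
    • it instantiates the AsyncAnnotationBeanPostProcessor bean
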
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		// Instantiate an AsyncAnnotationBeanPostProcessor
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		// Set the executor and the exception handler
		bpp.configure(this.executor, this.exceptionHandler);
		// Check whether @EnableAsync's "annotation" attribute names a custom annotation.
		// If not, @Async and @javax.ejb.Asynchronous apply by default; a custom one must be set on bpp.
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		// Pass the remaining @EnableAsync attribute values (proxyTargetClass, order) on to bpp
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}

}
  • The whole method simply registers an AsyncAnnotationBeanPostProcessor in the Spring context. This bean is what makes methods or classes annotated with @Async run asynchronously.



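Instantiating AsyncAnnotationBeanPostProcessor

  • As the analysis above shows, everything @EnableAsync does up to this point, including importing the configuration class, serves to initialize this one bean. It is the core of @Async support.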



setBeanFactory: creating the advisor

@Override
public void setBeanFactory(BeanFactory beanFactory) {
		super.setBeanFactory(beanFactory);

		// Create the advisor (advice plus pointcut)
		AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
		if (this.asyncAnnotationType != null) {
			advisor.setAsyncAnnotationType(this.asyncAnnotationType);
		}
		advisor.setBeanFactory(beanFactory);
		this.advisor = advisor;
}



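AsyncAnnotationAdvisor: building the advice and the pointcut

// Constructor
public AsyncAnnotationAdvisor(@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
		// Async annotation types that mark the pointcut locations: @Async, plus @javax.ejb.Asynchronous
		// when the EJB API is on the classpath. A custom type from @EnableAsync is applied later
		// through setAsyncAnnotationType.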
		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
		asyncAnnotationTypes.add(Async.class);
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
		} catch (ClassNotFoundException ex) {
			// If EJB 3.1 API not present, simply ignore.
		}
		// Build the AOP advice
		this.advice = buildAdvice(executor, exceptionHandler);
		// Build the AOP pointcut
		this.pointcut = buildPointcut(asyncAnnotationTypes);
}

// Build the advice
protected Advice buildAdvice(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {
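		// Create the interceptor that handles intercepted calls
		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
		// Configure the custom executor and exception handler (defaults are used when they are null)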
		interceptor.configure(executor, exceptionHandler);
		return interceptor;
}

// Build the pointcut
protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
		ComposablePointcut result = null;
		// For each async annotation type, build a class-level pointcut (cpc) and a method-level pointcut (mpc)
		for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
			Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
			Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
			if (result == null) {
				result = new ComposablePointcut(cpc);
			}
			else {
				result.union(cpc);
			}
			result = result.union(mpc);
		}
		return (result != null ? result : Pointcut.TRUE);
}
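The effect of the union built in buildPointcut is that a call qualifies when the annotation sits either on the class or on the method. The following plain-Java sketch illustrates only that rule. `@DemoAsync` and the sample classes are hypothetical, and the check is deliberately simpler than AnnotationMatchingPointcut, which can also look at superclasses, interfaces and meta-annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class PointcutSketch {

    // Class-level match OR method-level match, like the union of cpc and mpc.
    static boolean matches(Class<?> targetClass, Method method) {
        return targetClass.isAnnotationPresent(DemoAsync.class)
                || method.isAnnotationPresent(DemoAsync.class);
    }

    // Convenience overload that looks the method up by name.
    static boolean matches(Class<?> targetClass, String methodName) {
        for (Method method : targetClass.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                return matches(targetClass, method);
            }
        }
        throw new IllegalArgumentException("No such method: " + methodName);
    }

    public static void main(String[] args) {
        System.out.println(matches(MethodLevelService.class, "send"));
        System.out.println(matches(MethodLevelService.class, "plain"));
        System.out.println(matches(ClassLevelService.class, "anything"));
    }
}

// Hypothetical stand-in for @Async.
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@interface DemoAsync { }

// Only send() is annotated.
class MethodLevelService {
    @DemoAsync
    void send() { }

    void plain() { }
}

// The annotation on the class covers every method.
@DemoAsync
class ClassLevelService {
    void anything() { }
}

// No annotation anywhere, so nothing matches.
class UnannotatedService {
    void plain() { }
}
```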



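  • AnnotationAsyncExecutionInterceptor is the class that handles the around advice. The Advice in its type hierarchy marks it as an AOP advice, and Interceptor means the handling happens in the interceptor's invoke method. When a call hits the pointcut, the corresponding logic runs inside invoke.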



Creating the proxy

  • AsyncAnnotationBeanPostProcessor implements the BeanPostProcessor interface, which plays an important role in the bean lifecycle.


// Called for every bean after it has been initialized; creates a proxy for each bean that qualifies
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (this.advisor == null || bean instanceof AopInfrastructureBean) {
			// Ignore AOP infrastructure such as scoped proxies.
			return bean;
		}

		if (bean instanceof Advised) {
			Advised advised = (Advised) bean;
			if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
				// Add our local Advisor to the existing proxy's Advisor chain...
				if (this.beforeExistingAdvisors) {
					advised.addAdvisor(0, this.advisor);
				}
				else {
					advised.addAdvisor(this.advisor);
				}
				return bean;
			}
		}

		if (isEligible(bean, beanName)) {
			ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
			if (!proxyFactory.isProxyTargetClass()) {
				// Register the interfaces that should be proxied
				evaluateProxyInterfaces(bean.getClass(), proxyFactory);
			}
			// Add the advisor that was prepared in setBeanFactory
			proxyFactory.addAdvisor(this.advisor);
			customizeProxyFactory(proxyFactory);
			// Let Spring AOP create the proxy
			return proxyFactory.getProxy(getProxyClassLoader());
		}

		// No proxy needed.
		return bean;
}

@Override
protected boolean isEligible(Object bean, String beanName) {
		return (!AutoProxyUtils.isOriginalInstance(beanName, bean.getClass()) &&
				super.isEligible(bean, beanName));
}

// super.isEligible
protected boolean isEligible(Object bean, String beanName) {
		return isEligible(bean.getClass());
}

protected boolean isEligible(Class<?> targetClass) {
		Boolean eligible = this.eligibleBeans.get(targetClass);
		if (eligible != null) {
			return eligible;
		}
		if (this.advisor == null) {
			return false;
		}
		// Check whether the advisor can apply to the given class, i.e. whether a proxy should be created
		eligible = AopUtils.canApply(this.advisor, targetClass);
		this.eligibleBeans.put(targetClass, eligible);
		return eligible;
}

protected void evaluateProxyInterfaces(Class<?> beanClass, ProxyFactory proxyFactory) {
		// Collect all interfaces implemented by the bean class
		Class<?>[] targetInterfaces = ClassUtils.getAllInterfacesForClass(beanClass, getProxyClassLoader());
		boolean hasReasonableProxyInterface = false;
		for (Class<?> ifc : targetInterfaces) {
			if (!isConfigurationCallbackInterface(ifc) && !isInternalLanguageInterface(ifc) &&
					ifc.getMethods().length > 0) {
				hasReasonableProxyInterface = true;
				break;
			}
		}
		if (hasReasonableProxyInterface) {
			// Must allow for introductions; can't just set interfaces to the target's interfaces only.
			for (Class<?> ifc : targetInterfaces) {
				// Add the interface to be proxied
				proxyFactory.addInterface(ifc);
			}
		}
		else {
			proxyFactory.setProxyTargetClass(true);
		}
}
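To make the decision in evaluateProxyInterfaces concrete, here is a rough JDK-only sketch. It assumes a much simpler rule than Spring's: an interface is "reasonable" if it declares at least one method, and the callback and language-internal interface filters are left out. All names (`ProxyChoiceSketch`, `Greeter`, `PlainGreeter`, `MarkerOnly`) are hypothetical. Where Spring would switch to a target-class (CGLIB) proxy, the sketch just returns the target unchanged.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.LinkedHashSet;
import java.util.Set;

class ProxyChoiceSketch {

    // Collects every interface implemented by the class and its superclasses.
    static Set<Class<?>> allInterfaces(Class<?> type) {
        Set<Class<?>> result = new LinkedHashSet<>();
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            for (Class<?> ifc : c.getInterfaces()) {
                collect(ifc, result);
            }
        }
        return result;
    }

    private static void collect(Class<?> ifc, Set<Class<?>> result) {
        if (result.add(ifc)) {
            for (Class<?> parent : ifc.getInterfaces()) {
                collect(parent, result);
            }
        }
    }

    // Simplified rule: at least one interface that declares a method.
    static boolean hasReasonableProxyInterface(Class<?> type) {
        for (Class<?> ifc : allInterfaces(type)) {
            if (ifc.getMethods().length > 0) {
                return true;
            }
        }
        return false;
    }

    // Interface-based JDK proxy when possible; otherwise the target is returned as-is
    // (Spring would create a subclass proxy at this point instead).
    static Object proxyIfPossible(Object target, InvocationHandler handler) {
        Class<?> type = target.getClass();
        if (!hasReasonableProxyInterface(type)) {
            return target;
        }
        Class<?>[] interfaces = allInterfaces(type).toArray(new Class<?>[0]);
        return Proxy.newProxyInstance(type.getClassLoader(), interfaces, handler);
    }

    // Demo handler: delegates to the target and prefixes String results.
    static Object tracingProxy(Object target) {
        InvocationHandler handler = (proxy, method, args) -> {
            Object result = method.invoke(target, args);
            return (result instanceof String ? "[proxied] " + result : result);
        };
        return proxyIfPossible(target, handler);
    }

    public static void main(String[] args) {
        Greeter greeter = (Greeter) tracingProxy(new PlainGreeter());
        System.out.println(greeter.greet("Spring"));
    }
}

interface Greeter {
    String greet(String name);
}

class PlainGreeter implements Greeter {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}

// Implements only a marker interface, so there is nothing useful to proxy by interface.
class MarkerOnly implements java.io.Serializable { }
```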



What happens after an @Async call is intercepted

  • AnnotationAsyncExecutionInterceptor#invoke
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

		// Resolve the executor for this method
		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}
		// Wrap the invocation in a Callable task
		Callable<Object> task = () -> {
			try {
				// Run the intercepted method
				Object result = invocation.proceed();
				if (result instanceof Future) {
					return ((Future<?>) result).get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};
		// Submit the task for asynchronous execution and return the resulting handle
		return doSubmit(task, executor, invocation.getMethod().getReturnType());
}


@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
		// The async method declares a CompletableFuture return type
		if (CompletableFuture.class.isAssignableFrom(returnType)) {
			// Run asynchronously and return a CompletableFuture; the caller reads the result via future.get()
			return CompletableFuture.supplyAsync(() -> {
				try {
					return task.call();
				}
				catch (Throwable ex) {
					throw new CompletionException(ex);
				}
			}, executor);
		}
		else if (ListenableFuture.class.isAssignableFrom(returnType)) {
			return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
		}
		else if (Future.class.isAssignableFrom(returnType)) {
			return executor.submit(task);
		}
		else { // The async method returns no Future type, typically void
			executor.submit(task);
			return null;
		}
}
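The dispatch in doSubmit can be tried out with JDK classes alone. The sketch below keeps the same branching on the declared return type but drops the ListenableFuture branch, which needs Spring. `SubmitSketch` and its `describe` helper are hypothetical names used only for this illustration, and a single-thread ExecutorService stands in for Spring's AsyncTaskExecutor.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitSketch {

    // Chooses how to hand the task to the executor based on the declared return type.
    // CompletableFuture is checked first because it is also a Future.
    static Object doSubmit(Callable<Object> task, ExecutorService executor, Class<?> returnType) {
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Exception ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
            // Fire and forget: the caller gets nothing back.
            executor.submit(task);
            return null;
        }
    }

    // Submits a trivial task for the given return type and reports what came back.
    static String describe(Class<?> returnType) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Object handle = doSubmit(() -> "done", executor, returnType);
            if (handle instanceof CompletableFuture) {
                return "CompletableFuture:" + ((CompletableFuture<?>) handle).get();
            }
            if (handle instanceof Future) {
                return "Future:" + ((Future<?>) handle).get();
            }
            return "null";
        }
        catch (InterruptedException | ExecutionException ex) {
            throw new IllegalStateException(ex);
        }
        finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.class));
        System.out.println(describe(Future.class));
        System.out.println(describe(void.class));
    }
}
```

One practical consequence of the last branch: a method that declares any other return type still runs asynchronously, but its caller receives null, so a return value only makes sense when the method is declared to return a Future type.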

private final Map<Method, AsyncTaskExecutor> executors = new ConcurrentHashMap<>(16);

// Resolve the async executor for a method (cached per method)
@Nullable
protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
		AsyncTaskExecutor executor = this.executors.get(method);
		if (executor == null) {
			Executor targetExecutor;
			String qualifier = getExecutorQualifier(method);
			if (StringUtils.hasLength(qualifier)) {
				targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
			}
			else {
				targetExecutor = this.defaultExecutor.get();
			}
			if (targetExecutor == null) {
				return null;
			}
			executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
					(AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
			this.executors.put(method, executor);
		}
		return executor;
}

// Returns the bean name of the executor to use for the given @Async method (the name configured in the @Async annotation's value)
@Override
@Nullable
protected String getExecutorQualifier(Method method) {
		// Maintainer's note: changes made here should also be made in
		// AnnotationAsyncExecutionAspect#getExecutorQualifier
		// Look up the @Async annotation on the method, then on the declaring class
		Async async = AnnotatedElementUtils.findMergedAnnotation(method, Async.class);
		if (async == null) {
			async = AnnotatedElementUtils.findMergedAnnotation(method.getDeclaringClass(), Async.class);
		}
		return (async != null ? async.value() : null);
}

// Resolve the default executor; falls back to a SimpleAsyncTaskExecutor when none is found
@Override
@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
		Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
		return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
}

// super.getDefaultExecutor resolves to AsyncExecutionAspectSupport#getDefaultExecutor

// Bean name of the default async executor
public static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";

@Nullable
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
		if (beanFactory != null) {
			try {
				// Search for TaskExecutor bean... not plain Executor since that would
				// match with ScheduledExecutorService as well, which is unusable for
				// our purposes here. TaskExecutor is more clearly designed for it.
				return beanFactory.getBean(TaskExecutor.class);
			}
			catch (NoUniqueBeanDefinitionException ex) {
				logger.debug("Could not find unique TaskExecutor bean", ex);
				try {
					return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
				}
				catch (NoSuchBeanDefinitionException ex2) {
					if (logger.isInfoEnabled()) {
						logger.info("More than one TaskExecutor bean found within the context, and none is named " +
								"'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " +
								"as an alias) in order to use it for async processing: " + ex.getBeanNamesFound());
					}
				}
			}
			catch (NoSuchBeanDefinitionException ex) {
				logger.debug("Could not find default TaskExecutor bean", ex);
				try {
					return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class);
				}
				catch (NoSuchBeanDefinitionException ex2) {
					logger.info("No task executor bean found for async processing: " +
							"no bean of type TaskExecutor and no bean named 'taskExecutor' either");
				}
				// Giving up -> either using local default executor or none at all...
			}
		}
		return null;
}
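The lookup order in the two getDefaultExecutor methods boils down to: use the single executor bean if there is exactly one, otherwise the bean named "taskExecutor", otherwise a local fallback. A minimal JDK-only sketch of that order follows, assuming a plain Map as a hypothetical stand-in for the bean factory. It ignores the distinction Spring makes between TaskExecutor and plain Executor beans, and `FALLBACK` only imitates the thread-per-task behavior of SimpleAsyncTaskExecutor.

```java
import java.util.Map;
import java.util.concurrent.Executor;

class DefaultExecutorSketch {

    static final String DEFAULT_TASK_EXECUTOR_BEAN_NAME = "taskExecutor";

    // Stand-in for SimpleAsyncTaskExecutor: starts a new thread for every task.
    static final Executor FALLBACK = task -> new Thread(task).start();

    // beans maps bean names to executor instances (a hypothetical mini "bean factory").
    static Executor resolve(Map<String, Executor> beans) {
        if (beans.size() == 1) {
            // Exactly one candidate: use it regardless of its name.
            return beans.values().iterator().next();
        }
        // None or several candidates: try the well-known bean name.
        Executor named = beans.get(DEFAULT_TASK_EXECUTOR_BEAN_NAME);
        return (named != null ? named : FALLBACK);
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run;
        System.out.println(resolve(Map.of("pool", direct)) == direct);
        System.out.println(resolve(Map.of()) == FALLBACK);
    }
}
```

The fallback is the reason applications usually define their own executor bean: a thread per task gives no pooling and no upper bound on the number of threads.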




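Copyright notice: this is an original article by qingqingxiangyang, released under the CC 4.0 BY-SA license. When reposting, include a link to the original source and this notice.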