Spring Cloud CircuitBreaker 源码随便看看

  • Post author:
  • Post category:其他



Spring Cloud CircuitBreaker github

Spring Cloud CircuitBreaker 已经是独立项目了,不像上次介绍的Spring Cloud LoadBalancer 还在 common 项目里呆着。这次也是从 Spring Cloud OpenFeign 的 CircuitBreaker 实现开始看起。

从一个配置项开始

Spring Cloud OpenFeign 配置之一

feign.circuitbreaker.enabled=true

开启 CircuitBreaker。直接看 feign 的配置类 FeignClientsConfiguration。

public class FeignClientsConfiguration {

    ......

    @Configuration(proxyBeanMethods = false)
	@ConditionalOnClass(CircuitBreaker.class)
	@ConditionalOnProperty("feign.circuitbreaker.enabled")
	protected static class CircuitBreakerPresentFeignBuilderConfiguration {

		@Bean
		@Scope("prototype")
		@ConditionalOnMissingBean({ Feign.Builder.class, CircuitBreakerFactory.class })
		public Feign.Builder defaultFeignBuilder(Retryer retryer) {
			return Feign.builder().retryer(retryer);
		}

		@Bean
		@Scope("prototype")
		@ConditionalOnMissingBean
		@ConditionalOnBean(CircuitBreakerFactory.class)
		public Feign.Builder circuitBreakerFeignBuilder() {
			return FeignCircuitBreaker.builder();
		}

	}

    ......

}

可以看到注入了一个 Feign.Builder ,默认的为 default 的bulider,开启了 CircuitBreaker 之后注入了一个新的 bulider 用于构建 CircuitBreaker 相关的 feign。主要看 FeignCircuitBreaker.Builder#target()。

public final class FeignCircuitBreaker {

    ......

	public static final class Builder extends Feign.Builder {
    
    ......

        @Override
		public <T> T target(Target<T> target) {
			return build(null).newInstance(target);
		}

		public Feign build(final FallbackFactory<?> nullableFallbackFactory) {
			// 设置 invocationHandlerFactory 为 FeignCircuitBreakerInvocationHandler
			super.invocationHandlerFactory((target, dispatch) -> new FeignCircuitBreakerInvocationHandler(
					circuitBreakerFactory, feignClientName, target, dispatch, nullableFallbackFactory,
					circuitBreakerGroupEnabled, circuitBreakerNameResolver));
			return super.build();
		}
   
    }
}

这里替换了原来的 ReflectiveFeign.FeignInvocationHandler 的 invocationHandlerFactory 为 FeignCircuitBreakerInvocationHandler。

class FeignCircuitBreakerInvocationHandler implements InvocationHandler {
    ......

    @Override
	public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
        ......

		// 解析 feign 的key feign.Feign#configKey()
		String circuitName = circuitBreakerNameResolver.resolveCircuitBreakerName(feignClientName, target, method);
		// 获取CircuitBreaker实例
		CircuitBreaker circuitBreaker = circuitBreakerGroupEnabled ? factory.create(circuitName, feignClientName)
				: factory.create(circuitName);
		// 构建 circuitBreaker 执行的 supplier
		Supplier<Object> supplier = asSupplier(method, args);
		// 设置 fallback
		if (this.nullableFallbackFactory != null) {
			Function<Throwable, Object> fallbackFunction = throwable -> {
				Object fallback = this.nullableFallbackFactory.create(throwable);
				try {
					return this.fallbackMethodMap.get(method).invoke(fallback, args);
				}
				catch (Exception e) {
					throw new IllegalStateException(e);
				}
			};

			// 执行带有 fallback 的 supplier
			return circuitBreaker.run(supplier, fallbackFunction);
		}

		// 执行 supplier
		return circuitBreaker.run(supplier);
	}

	private Supplier<Object> asSupplier(final Method method, final Object[] args) {
		// circuitBreaker 运行的时候会在另一个线程中,所以切换线程的时候同时切换上下文信息
		final RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
		return () -> {
			try {
				RequestContextHolder.setRequestAttributes(requestAttributes);
				return dispatch.get(method).invoke(args);
			}
			catch (RuntimeException throwable) {
				throw throwable;
			}
			catch (Throwable throwable) {
				throw new RuntimeException(throwable);
			}
		};
	}
    ......

}

直接使用 Spring Cloud Common 中的 CircuitBreaker 抽象去执行方法,并且还设置了 fallback。

关于 asSupplier() 中的上下文信息切换这点很重要,因为在编写 skywalking 插件去做链路追踪的时候直接使用这里的 RequestAttributes 保存 context 快照很简单的做到对 CircuitBreaker 的链路追踪。

CircuitBreaker 核心类

代码是在 spring-cloud-commons 项目中的还是要区分 reactive 的和阻塞的2情况。spring-cloud-circuitbreaker 是对 CircuitBreaker 抽象的实现。这里要说下版本,我看的是 springcloud 2020.0.3,感觉后期官方还会有大的改动。

CircuitBreakerFactory

// 阻塞的
public abstract class CircuitBreakerFactory<CONF, CONFB extends ConfigBuilder<CONF>>
		extends AbstractCircuitBreakerFactory<CONF, CONFB> {

	public abstract CircuitBreaker create(String id);

	public CircuitBreaker create(String id, String groupName) {
		return create(id);
	}

}

// reactive 的
public abstract class ReactiveCircuitBreakerFactory<CONF, CONFB extends ConfigBuilder<CONF>>
		extends AbstractCircuitBreakerFactory<CONF, CONFB> {

	public abstract ReactiveCircuitBreaker create(String id);

	public ReactiveCircuitBreaker create(String id, String groupName) {
		return create(id);
	}

}

根据不同的 id 创建 CircuitBreaker。在 feign 中这个 id 为 Feign#configKey() 生成的字符串,但是开启了

feign.circuitbreaker.group.enabled=true

就会使用开发者自定义的 clientName 作为查询 CircuitBreaker 配置的类。

CircuitBreaker

// 阻塞的
public interface CircuitBreaker {

	default <T> T run(Supplier<T> toRun) {
		return run(toRun, throwable -> {
			throw new NoFallbackAvailableException("No fallback available.", throwable);
		});
	};

	<T> T run(Supplier<T> toRun, Function<Throwable, T> fallback);

}

// reactive 的
public interface ReactiveCircuitBreaker {

	default <T> Mono<T> run(Mono<T> toRun) {
		return run(toRun, throwable -> {
			throw new NoFallbackAvailableException("No fallback available.", throwable);
		});
	}

	<T> Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback);

	default <T> Flux<T> run(Flux<T> toRun) {
		return run(toRun, throwable -> {
			throw new NoFallbackAvailableException("No fallback available.", throwable);
		});
	}

	<T> Flux<T> run(Flux<T> toRun, Function<Throwable, Flux<T>> fallback);

}

CircuitBreaker 作用就是直接去运行指定的方法然后返回结果。

Resilience4J CircuitBreaker

spring cloud 之前的版本使用的是 hystrix 作为 CircuitBreaker 的实现。之后的版本使用 Resilience4J 替代了 hystrix。

CircuitBreakerFactory 的实现类为 Resilience4JCircuitBreakerFactory,ReactiveCircuitBreakerFactory 的实现类为 ReactiveResilience4JCircuitBreakerFactory。

CircuitBreaker 的实现类为 Resilience4JCircuitBreaker。ReactiveCircuitBreaker 的实现类为 ReactiveResilience4JCircuitBreaker。

主要看看 Resilience4JCircuitBreaker#run() 做了什么事情。

public class Resilience4JCircuitBreaker implements CircuitBreaker {

    ......

    @Override
	public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
		// 获取执行的超时时间
		final io.vavr.collection.Map<String, String> tags = io.vavr.collection.HashMap.of(CIRCUIT_BREAKER_GROUP_TAG,
				this.groupName);
		// 感觉这个地方传递id有点问题不应该是 groupName 吗?
		// 如果没有配置默认1s超时
		TimeLimiter timeLimiter = timeLimiterRegistry.timeLimiter(id, timeLimiterConfig, tags);
		// 使用线程池执行 Supplier
		// 如果没有配置那就使用 Executors.newCachedThreadPool()
		Supplier<Future<T>> futureSupplier = () -> executorService.submit(toRun::get);

		Callable restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
		io.github.resilience4j.circuitbreaker.CircuitBreaker defaultCircuitBreaker = registry.circuitBreaker(this.id,
				this.circuitBreakerConfig, tags);
		circuitBreakerCustomizer.ifPresent(customizer -> customizer.customize(defaultCircuitBreaker));

		// 是否需要 bulkhead
		// 分别为信号量隔离和线程池隔离
		if (bulkheadProvider != null) {
			return bulkheadProvider.run(this.groupName, toRun, fallback, defaultCircuitBreaker, timeLimiter, tags);
		}
		else {
			Callable<T> callable = io.github.resilience4j.circuitbreaker.CircuitBreaker
					.decorateCallable(defaultCircuitBreaker, restrictedCall);
			// 真正发起请求
			return Try.of(callable::call).recover(fallback).get();
		}
	}

    ......
}

在测试

feign.circuitbreaker.group.enabled=true

的时候发现 TimeLimiter 是不会根据 groupName 获取对应的配置,但是写成 id 的配置却可以的,感觉这里有点小问题,不知道后期会不会改。

这里可以看到 CircuitBreaker 的功能,超时、隔离和降级。其中隔离分为线程池隔离和信号量隔离。简单说下当服务调用下游应用的扇出很大的时候最好用信号量隔离,使用线程池隔离的话会占用很多线程资源,比如网关这种场景。

根据之前的文章可以发现 Spring Cloud OpenFeign 的配置是通过 NamedContextFactory 获取的,loadbalancer 的配置也是根据 NamedContextFactory 获取的,然而 Resilience4J 版的 CircuitBreaker 配置却是使用 hashmap 实现的。感觉很不统一,个人猜测有历史原因,原版的 feign 是不提供根据 clientName 获取不同配置的功能,spring cloud添加了这个功能。loadbalancer 是 spring cloud 团队自己开发的,然而 Resilience4J 的核心包却是自己提供了根据 id 获取不同配置的功能,就是使用 InMemoryRegistryStore 中的 hashmap 实现的。所以才会整体看起来很不统一,有兴趣的可以自己开发一套基于NamedContextFactory 的配置。毕竟配置文件的方式也是最近才加上的,难道官方后期会完善?

接下来看看 ReactiveResilience4JCircuitBreaker 的是如何实现的。

public class ReactiveResilience4JCircuitBreaker implements ReactiveCircuitBreaker {

    ......

    	@Override
	public <T> Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback) {
		io.github.resilience4j.circuitbreaker.CircuitBreaker defaultCircuitBreaker = circuitBreakerRegistry
				.circuitBreaker(id, circuitBreakerConfig);
		circuitBreakerCustomizer.ifPresent(customizer -> customizer.customize(defaultCircuitBreaker));
		TimeLimiter timeLimiter = timeLimiterRegistry.timeLimiter(id, timeLimiterConfig);
		Mono<T> toReturn = toRun.transform(CircuitBreakerOperator.of(defaultCircuitBreaker))
				.timeout(timeLimiter.getTimeLimiterConfig().getTimeoutDuration())
				// Since we are using the Mono timeout we need to tell the circuit breaker
				// about the error
				.doOnError(TimeoutException.class,
						t -> defaultCircuitBreaker.onError(
								timeLimiter.getTimeLimiterConfig().getTimeoutDuration().toMillis(),
								TimeUnit.MILLISECONDS, t));
		if (fallback != null) {
			toReturn = toReturn.onErrorResume(fallback);
		}
		return toReturn;
	}

    ......
}

reactive 的版本就有点不一样了,因为没有 bulkhead 实现。官方已经有添加计划了,只不过是没有线程池隔离的实现,可能是因为 reactive 本身就是异步使用线程池去执行任务,所以当发生下游服务响应慢导致线程阻塞的时候使用信号量就足够了。因为spring cloud gateway 是 webflux 的,所以目前是没有 bulkhead 可以用的。

看看阻塞的 bulkhead 的 实现。

public class Resilience4jBulkheadProvider {

    ......

    /**
	 * 判断需要那种隔离方式,优先使用线程池隔离
	 */
	private <T> Supplier<CompletionStage<T>> decorateBulkhead(final String id,
			final io.vavr.collection.Map<String, String> tags, final Supplier<T> supplier) {
		Resilience4jBulkheadConfigurationBuilder.BulkheadConfiguration configuration = configurations
				.computeIfAbsent(id, defaultConfiguration);
		// 信号量隔离
		if (bulkheadRegistry.find(id).isPresent() && !threadPoolBulkheadRegistry.find(id).isPresent()) {
			Bulkhead bulkhead = bulkheadRegistry.bulkhead(id, configuration.getBulkheadConfig(), tags);
			CompletableFuture<T> asyncCall = CompletableFuture.supplyAsync(supplier);
			return Bulkhead.decorateCompletionStage(bulkhead, () -> asyncCall);
		}
		// 线程池隔离
		else {
			ThreadPoolBulkhead threadPoolBulkhead = threadPoolBulkheadRegistry.bulkhead(id,
					configuration.getThreadPoolBulkheadConfig(), tags);
			return threadPoolBulkhead.decorateSupplier(supplier);
		}
	}
    ......
}

如果开启了 bulkhead, CircuitBreaker 就会把 run 方法委托给 Resilience4jBulkheadProvider 去执行,然后选择隔离方式优先使用线程池隔离。

简单的总结

通过重写 feign 的相关组件来扩展 CircuitBreaker 的功能。抽象和实现也是分开的。最乱的可能是相关的配置那块,因为不同的配置决定不同的功能。只能去看

官方文档

了,这些东西也许用不上,但是可以更好的去认识分布式系统,相对于单体项目来说分布式系统会出现更多的挑战,然而这些框架从一定程度上已经有了解决思路,所以学习就好了。同时借鉴这些在设计上的技巧,提高抽象能力。



版权声明:本文为qq_36968688原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。