一、客户端弹性模式
4种客户端弹性模式:
客户端负载均衡(client load balance )模式;
断路器(circuit breaker)模式;
后备(fallback)模式;
舱壁(bulkhead)模式.
1、客户端负载均衡(client load balance )模式
客户端负载均衡涉及让客户端从服务发现代理(如 Netflix Eureka )查找服务的所有实例 ,然后缓存服务实例的物理位置。
每当服务消费者需要调用该服务实例时,客户端负载均衡器将从它维护的服务位置池返回一个位置。
2、断路器模式
当远程服务被调用时,断路器将监视这个调用 。如果调用时间太长,断路器将会介入井中断调用 此外,断路器将监视所有对远程资源的调用,如果对某一个远程资源的调用失败次数足够多,那么断路器实现就会出现并采取快速失败,阻止将来调用失败的远程资源。
3、后备模式
当远程服务调用失败时,服务消费者将执行替代代码路径, 并尝试通过其他方式执行操作 ,而不是生成 一个异常 。这通常涉及从另一数据源查找数据或将用户的请求进行排队以供将来处理。
4、舱壁模式
通过使用舱壁模式,可以把远程资源的调用分到线程池中,并降低一个缓慢的远程资源调用拖垮整个应用程序的风险。
二、使用Spring Cloud和Hystrix
1、一个简单的示例:
1)pom.xml 引入Hystrix相关依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
<version>1.5.9</version>
</dependency>
2)在引导类上加上注解:
@EnableCircuitBreaker
// 告诉Spring Cloud将要为服务使用Hystrix。
3)使用 Hystrix实现断路器
@HystrixCommand
注解。 spring 框架看到@HystrixCommand注解时,它将动态生成一个代理,该代理将包装该方法,并通过专门用于处理远程调用的线程池来管理对该方法的所有调用。
在getLicensesByOrg()方法上增加@HystrixCommand注解:
@HystrixCommand
public List<License> getLicensesByOrg(String organizationId){
logger.debug("LicenseService.getLicensesByOrg Correlation id: {}", UserContextHolder.getContext().getCorrelationId());
randomlyRunLong();
return licenseRepository.findByOrganizationId(organizationId);
}
randomlyRunLong():模拟超时调用
private void randomlyRunLong(){
Random rand = new Random();
int randomNum = rand.nextInt((3 - 1) + 1) + 1;
if (randomNum==3) sleep();
}
sleep()方法:
private void sleep() {
try {
Thread.sleep(11000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
有了
@HystrixCommand
注解,如果查询花费的时间过长,许可证服务将中断其对数据库的调用。
2、自定义断路器的超时时间
Hystrix通过
commandProperties
属性来定制断路器的行为。如:
cornrnandProperties属性接受一个 HystrixProperty 对象数组。它可以传入自定义属性来配置 Hystrix 断路器。
@HystrixCommand(
commandProperties={
@HystrixProperty("execution.isolation.thread.timeoutInMilliseconds" value="12000")})
public List<License> getLicensesByOrg(String organizationId){
logger.debug("LicenseService.getLicensesByOrg Correlation id: {}", UserContextHolder.getContext().getCorrelationId());
randomlyRunLong();
return licenseRepository.findByOrganizationId(organizationId);
}
服务超时:时间设置。
如果一些比其他服务调用需要更长时间的服务调用,需将这些服务调用隔离到单独的线程池中。
三、后备处理
1、Hystrix 中实现一个后备
buildFallbackLicenseList() 后备方法:
private List<License> buildFallbackLicenseList(String organizationId){
List<License> fallbackList = new ArrayList<>();
License license = new License()
.withId("0000000-00-00000")
.withOrganizationId( organizationId )
.withProductName("Sorry no licensing information currently available");
fallbackList.add(license);
return fallbackList;
}
后备方法必须与由@HystrixCommand保护的原始方法位于同一个类中。
并且必须具有与原始方法完全相同的方法签名,因为传递给由@HystrixCommand保护的原始方法的所有参数都将传递给后备方法。
在getLicensesByOrg,添加后备方法buildFallbackLicenseList():
// 当Hystrix 因为调用耗费时间太长而不得不中断该调用时,该buildFallbackLicenseList方法将会被调用。
@HystrixCommand(fallbackMethod = "buildFallbackLicenseList")
public List<License> getLicensesByOrg(String organizationId){
logger.debug("LicenseService.getLicensesByOrg Correlation id: {}", UserContextHolder.getContext().getCorrelationId());
randomlyRunLong();
return licenseRepository.findByOrganizationId(organizationId);
}
private void randomlyRunLong(){
Random rand = new Random();
int randomNum = rand.nextInt((3 - 1) + 1) + 1;
if (randomNum==3) sleep();
}
private void sleep(){
try {
Thread.sleep(11000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
后备:
1.后备是一种在资源超时或失败时提供行动方案的机制。使用后备来捕获超时异常,然后只做日志记录错误,
就应该在服务调用周围使用标准的 try .. catch 块,捕获 HystrixRuntimeException 异常,并将日志记录逻辑放在 try .. catch 块中。
2.注意使用后备方法所执行的操作。
四、舱壁模式
调用多个微服务来完成特定的任务,在不使用舱壁模式的情况下,这些调用默认是使用同一批线程来执行调用的。
在大量请求的情况下,一个服务出现性能问题会导致 Java容器的所有线程被刷爆并等待处理工作,同时堵塞新请求,最终导致 Java 容器崩溃。舱壁模式将远程资源调用隔离在它们自己的线程池中,以便可以控制单个表现不佳的服务,而不会使该容器崩溃。
Hystrix 使用线程池来委派所有对远程服务的请求。在默认情况下,所有的 Hystrix 命令都将共享同一个线程池来处理请求。在应用程序中访问少量的远程资源时,这种模型运行良好,并且各个服务的调用量分布相对均匀。 问题是,如果某些服务具有比其他服务高得多的请求量或更长的完成时间,那么最终可能会导致 Hystrix 线程池中的线程耗尽,因为一个服务最终会占据默认线程池中的所有线程。
Hystrix 提供了 种易于使用的机制,在不同的远程资源调用之间创建舱壁。即每个远程资源调用都放置在自己的线程池中,每个线程池都有可用于处理请求的最大线程数。所以一个性能低下的服务只会影响到同一线程池中的其他服务调用。
1、实现隔离的线程池,需要做以下几步:
1))为 方法(如:getLicensesByOrg())调用建立一个单独的线程池
@HystrixCommand(fallbackMethod = "buildFallbackLicenseList",
threadPoolKey = "licenseByOrgThreadPool",
threadPoolProperties =
{
// coreSize 属性可以设置线程池的大小
@HystrixProperty(name = "coreSize",value="30"),
// maxQueueSize 将控制在线程池中线程繁忙时允许堵塞的请求数。一旦请求数超过队列大小,对线程池的任何其他请求都将失败,直到队列中有空间
@HystrixProperty(name="maxQueueSize", value="10")}
)
public List<License> getLicensesByOrg(String organizationId){
logger.debug("LicenseService.getLicensesByOrg Correlation id: {}",
UserContextHolder.getContext().getCorrelationId());
randomlyRunLong();
return licenseRepository.findByOrganizationId(organizationId);
}
threadPoolkey属性:这向 Hystrix 发出信号,我们想要建立一个新的线程池,如果线程池中没有设置任何进一步的值,Hystrix会使用threadPoolKey 属性中的名称搭建一个线程池,并使用所有的默认值来对线程池进行配置。
要定制线程池,应该使用@HystrixCommand 上的 threadPoolProperties 属性, 此属性使用 HystrixProperty 对象的数组。
2)设置单个线程繁忙时可排队的请求数的队列大小
注:maxQueueSize
1.如果将maxQueueSize 其值设置为-1,则将使用 Java SynchronousQueue 来保存所有传入的请求, 同步队列本质上会强制要求正在处理中的请求数量永远不能超过线程池中可用线程的数量。
2.如果将 maxQueueSize 设置为大于 的值将导致 Hystrix使用 Java LinkedBlockingQueue。 LinkedBlock ngQueue 的允许即使所有线程都在忙于处理请求,也能对请求进行排队。
3.maxQueueSize 属性只能在线程池首次初始化时设置(例如,在应用程序启动时) Hystrix 允许通过使用 qu eueSizeRejectionThreshold 属性来动态更改队列的大小,但只有在 maxQueueSize 属性的值大于0 时,才能设置此属性。
3)自定义线程池的适当大小,Netflix 推荐以下公式:
服务在健康状态时每秒支撑的最大请求数×第 99 百分位延迟时间(以秒为单位)+用于缓冲的少量额外线程
线程池属性调整的关键指标就是,即使目标远程资源是健康的,服务调用仍然超时。
五、微调Hystrix
1、配置断路器的行为
@HystrixCommand(//fallbackMethod = "buildFallbackLicenseList",
threadPoolKey = "licenseByOrgThreadPool",
threadPoolProperties =
{@HystrixProperty(name = "coreSize",value="30"),
@HystrixProperty(name="maxQueueSize", value="10")},
commandProperties={
// 用于控制 Hystrix 考虑将该断路器跳闸之前,在 10秒 之内必须发生的连续调用数量。如:10
@HystrixProperty(name="circuitBreaker.requestVolumeThreshold", value="10"),
// 在超过 circuitBreaker.requestVolumeThreshold 值之后,在断路器跳闸之前必须达到的调用失败((由于超时 抛出异常或返回 HTTP 500)的百分比。如75%
@HystrixProperty(name="circuitBreaker.errorThresholdPercentage", value="75"),
// 在断路器跳闸之后,Hystrix 允许另一个调用通过以便查看服务是否恢复健康之前 Hystrix 的休眠时间。这里设置了7秒
@HystrixProperty(name="circuitBreaker.sleepWindowInMilliseconds", value="7000"),
// 用于于控制 Hystr 用来监视服务调用问题的窗口大小,其默认值为 10000 ms (即10秒),这里设置了15秒。
// 给 metrics rollingStats.timeinMilliseconds置的值必须能被定义的桶(metrics.rollingStats.numBuckets)的数量值整除。
// 如:将使用 15s 的窗口,并将统计数据收集到长度为3s 的5个桶中。
@HystrixProperty(name="metrics.rollingStats.timeInMilliseconds", value="15000"),
// 在定义的滚动窗口中收集统计信息的次数。 Hystrix在桶( bucket )中收集度量数据,并检查这些桶中的统计信息。
@HystrixProperty(name="metrics.rollingStats.numBuckets", value="5")}
)
public List<License> getLicensesByOrg(String organizationId){
logger.debug("LicenseService.getLicensesByOrg Correlation id: {}", UserContextHolder.getContext().getCorrelationId());
randomlyRunLong();
return licenseRepository.findByOrganizationId(organizationId);
}
定制断路器的行为 @HystrixCommand 注解通过
commandPoolProperties
属性公开了这5个属性.。其中, threadPoolProperties 属性用于设置 Hystrix 中使用的底层线程池的行为,而 commandPoolProperties 属性用于定制Hystrix 命令关联的断路器的行为。
检查的统计窗口越小且在窗口中保留的桶的数量越多,就越会加剧高请求服务的CPU利用率和内存利用率。所以要避免将度量收集窗口和桶设置为太细的粒度,除非需要这种可见性级别。
2、重新审视 Hystrix 配置
可以使用 Hystrix 的3个配置级别:
整个应用程序级别的默认值。
类级别的默认值。
在类中定义的线程池级别。
每个 Hystrix 属性都有默认设置的值,这些值将被应用程序中的每个@HystrixCommand注解所使用。除非这些属性值在 Java 类级别被设置,或者被类中单个 Hystrix 线程池级别的值覆盖。
Hystrix 允许开发人员在类级别设置默认参数,以便特定类中的所有 Hystrix 命令共享相同的配置 。类级属性是通 个名为
@DefaultProperties
的类级注解设置的。如:
@DefaultProperties(
commandProperties = {
@HystrixProperty(name="execution.solation.thread.timeoutlnMillseconds", value="10000" )}
class MyService {
// ...
}
除非在线程池级别上显式地覆盖,否则所线程池都将继承应用程序级别的默认属性或类中定义的默认属性。
注:
在生产环境中,需要调整的 Hystrix 数据(超时参数 线程池计数)将被外部化到 Spring Cloud Config。
通过这种方式,如果需要更改参数值,就可以在更改完参数值之后重新启动服务实例,而无需重新编译和重新。
创建和配置@HystrixCommand 注解的所有配置值:
属性名称 | 默认值 | 描述 |
---|---|---|
fallbackMethod | None | 标识类中的方法,如果远程调用超时,将调用该方法。回调方法必须与@HystrixCommand 注解在同一个类中,并必须具有与调用类相同的方法签名。如果值不存在,Hystrix 抛出异常 |
threadPoolKey | None | 给予@HystrixCommand 个唯一名称,并创建一个独立于默认线程池的线程池。 如果没有定义任何值,则将使用默认 Hystrix 线程池 |
threadPoolProperties | None | 核心的 Hystrix 注解属性,用于配置线程池的行为 |
coreSize | 10 | 设置线程池的大小 |
maxQueueSize | -1 | 设置线程池前面的最大队列大小。 如果设置为-1, 则不使用队列, Hystrix 将阻塞请求,直到有一个线程可用来处理 |
circuitBreaker.requestVolumeThreshold | 20 | 设置 Hystrix 开始检查断路器是否跳闸之前滚动窗口中必须处理的最小请求数。注意 :此值只能使用 commandPoolProperties 属性设置 |
circuitBreaker.errorThresholdPercentage | 50 | 在断路器跳闸之前, 滚动窗口 内必须达到的故障百分比。注意:此值只能使用 commandPoolProperties 属性设置 |
circuitBreaker.sleepWindowinMilliseconds | 5000 | 在断路器跳闸之后, Hystrix 尝试进行服务调用之前将要等待的时间(以运秒为单位)。注意:此值只能使用commandPoolProperties 属性 |
metricsRollingStats.timeinMilliseconds | 10000 | Hystrix 集和监控服务调用的统计信息的滚动窗口(以毫秒为单位) |
metricsRollingStats.numBuckets | 10 | Hystrix 中一个监控窗口中维护的度量的桶数。监视窗口内的桶数越多, Hystrix 在窗口内监控故障的时间越低 |
六、线程上下文和Hystrix
当一个@HystrixCommand 被执行时,它可以使用两种不同的隔离策略——THREAD (线程)和SEMAPHORE (信号量)来运行。
THREAD (线程)隔离:
在默认情况下, Hystrix 用THREAD 隔离策略运行。用于保护调用的每个 Hystrix 命令都在
一个单独的线程池
中运行,该线程池不与父线程共享它的上下文。Hystrix 可以在它的控制下中断线程的执行,而不必担心中断与执行原始调用的父线程相关的其他活动。
SEMAPHORE 的隔离:
通过基于 SEMAPHORE 的隔离, Hystrix 管理由@HystrixCommand 注解保护的分布式调用,而不需要启动一个新线程,并且如果调用超时,就会中断父线程 。在同步容器服务器环境( Tomcat)中,中断父线程将导致抛出开发人员无法捕获的异常。
@HystrixCommand(
commandProperties = {
@HystrixProperty(name="execution isolation strategy", value="SEMAPHORE")})
1、ThreadLocal 和Hystrix
THREAD (线程)隔离,在默认情况下,对被父线程调用并由@HystrixCommand 保护的方法而言,在父线程中设置为ThreadLocal 是不可用的。
如:通常在基于 REST 的环境中,希望将上文信息传递给服务调用, 这将助于在运维上管理该服务。
可以在REST 调用的 HTTP 首部中传递 ID ( correlation ID )或验证令牌, 然后将其传播到任何下游服务调用。关联ID是唯一标识符, 标识符可用于在单个服务中跨多个服务调用进跟踪。
如:UserContextFilter 解析 HTTP 首部并检索数据
UserContextHolder 类用于将 UserContext 存储在 ThreadLocal 类中。
UserContextInterceptor:
package demo.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import java.io.IOException;
public class UserContextInterceptor implements ClientHttpRequestInterceptor {
private static final Logger logger = LoggerFactory.getLogger(UserContextInterceptor.class);
@Override
public ClientHttpResponse intercept(
HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
throws IOException {
HttpHeaders headers = request.getHeaders();
headers.add(UserContext.CORRELATION_ID, UserContextHolder.getContext().getCorrelationId());
headers.add(UserContext.AUTH_TOKEN, UserContextHolder.getContext().getAuthToken());
return execution.execute(request, body);
}
}
UserContextFilter 类:
package demo.utils;
@Component
public class UserContextFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(UserContextFilter.class);
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain)
throws IOException, ServletException {
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
// 检索调用的HTTP首部,设置的值将存储在UserContextHolder中的UserContext。
UserContextHolder.getContext().setCorrelationId( httpServletRequest.getHeader(UserContext.CORRELATION_ID) );
UserContextHolder.getContext().setUserId(httpServletRequest.getHeader(UserContext.USER_ID));
UserContextHolder.getContext().setAuthToken(httpServletRequest.getHeader(UserContext.AUTH_TOKEN));
UserContextHolder.getContext().setOrgId(httpServletRequest.getHeader(UserContext.ORG_ID));
logger.debug("UserContextFilter Correlation id: {}", UserContextHolder.getContext().getCorrelationId());
filterChain.doFilter(httpServletRequest, servletResponse);
}
@Override
public void init(FilterConfig filterConfig) throws ServletException {}
@Override
public void destroy() {}
}
UserContextHolder类 :
package demo.utils;
import org.springframework.util.Assert;
public class UserContextHolder {
// UserContext存储在一个静态ThreadLocal变量中。
private static final ThreadLocal<UserContext> userContext = new ThreadLocal<UserContext>();
// 检索UserContext 以供使用
public static final UserContext getContext(){
UserContext context = userContext.get();
if (context == null) {
context = createEmptyContext();
userContext.set(context);
}
return userContext.get();
}
public static final void setContext(UserContext context) {
Assert.notNull(context, "Only non-null UserContext instances are permitted");
userContext.set(context);
}
public static final UserContext createEmptyContext(){
return new UserContext();
}
}
UserContext 类:
package demo.utils;
import org.springframework.stereotype.Component;
@Component
public class UserContext {
public static final String CORRELATION_ID = "my-correlation-id";
public static final String AUTH_TOKEN = "my-auth-token";
public static final String USER_ID = "my-user-id";
public static final String ORG_ID = "my-org-id";
private String correlationId= new String();
private String authToken= new String();
private String userId = new String();
private String orgId = new String();
public String getCorrelationId() { return correlationId;}
public void setCorrelationId(String correlationId) {
this.correlationId = correlationId;
}
public String getAuthToken() {
return authToken;
}
public void setAuthToken(String authToken) {
this.authToken = authToken;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getOrgId() {
return orgId;
}
public void setOrgId(String orgId) {
this.orgId = orgId;
}
}
LicenseServiceController :
package com.example.licenses.controllers;
@RestController
@RequestMapping(value="v1/organizations/{organizationId}/licenses")
public class LicenseServiceController {
// 省略其他
private static final Logger logger = LoggerFactory.getLogger(LicenseServiceController.class);
@Autowired
private LicenseService licenseService;
@RequestMapping(value="/",method = RequestMethod.GET)
public List<License> getLicenses( @PathVariable("organizationId") String organizationId) {
logger.debug("LicenseServiceController Correlation id: {}", UserContextHolder.getContext().getCorrelationId());
return licenseService.getLicensesByOrg(organizationId);
}
}
LicenseService :
package com.example.licenses.controllers;
@Service
public class LicenseService {
private static final Logger logger = LoggerFactory.getLogger(LicenseService.class);
@HystrixCommand(//fallbackMethod = "buildFallbackLicenseList",
threadPoolKey = "licenseByOrgThreadPool",
threadPoolProperties =
{@HystrixProperty(name = "coreSize",value="30"),
@HystrixProperty(name="maxQueueSize", value="10")},
commandProperties={
@HystrixProperty(name="circuitBreaker.requestVolumeThreshold", value="10"),
@HystrixProperty(name="circuitBreaker.errorThresholdPercentage", value="75"),
@HystrixProperty(name="circuitBreaker.sleepWindowInMilliseconds", value="7000"),
@HystrixProperty(name="metrics.rollingStats.timeInMilliseconds", value="15000"),
@HystrixProperty(name="metrics.rollingStats.numBuckets", value="5")}
)
public List<License> getLicensesByOrg(String organizationId){
logger.debug("LicenseService.getLicensesByOrg Correlation id: {}", UserContextHolder.getContext().getCorrelationId());
randomlyRunLong();
return licenseRepository.findByOrganizationId(organizationId);
}
}
在Postman中使用Http Get 访问,在请求头Headers 添加名为my-correlation-id 值为TEST_CORRELATION_ID 。
http://8080/v1/organizations/xxxx-xxx/licenses
一旦提交了这个调用,当它流经 UserContext 、LicenseServiceController和LicenseServer 类时,我们将看到3条日志消息记录了传入的关联 ID。
UserContext Correlation id: TEST-CORRELATION-ID
LicenseServiceController Correlation id: TEST-CORRELATION-ID
LicenseService.getLicenseByOrg Correlation :
正如预期的那样, 一旦这个调用使用了由 Hystrix 保护的 LicenseService getLicensesByOrg ()方法,就无法得到关联ID的值 。
Hystrix 和Spring Cloud 提供了一种机制,可以将父线程的上下文传播到由 Hystrix 线程池管理的线程 。
这种机制被称为
Hystrix ConcurrencyStrategy
。
2、HystrixConcurrencyStrategy
定义一种自定义的并发策略,它将包装 Hystrix 调用,并允许将附加的父线程上下文注入由 Hystrix命令管理的线程中。 实现自定义 HystrixConcurrencyStrategy。
以下3个步骤:
1.定义自定义的 Hystrix 井发策略类
2.定义 Callable 类,将 UserContext 注入 Hystrix 命令中
3.配置 Spring Cloud 以使用自定义 Hystrix 并发策略
1)定义自定义的 Hystrix 井发策略类
定义自己的 HystrixConcurrencyStrategy 。在默认情况下, Hystrix 只允许为应用程序定义一个 HystrixConcurrencyStrategy 。
ThreadLocalAwareStrategy :
package demo.hystrix;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import demo.utils.UserContextHolder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadLocalAwareStrategy extends HystrixConcurrencyStrategy {
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
public ThreadLocalAwareStrategy(
HystrixConcurrencyStrategy existingConcurrencyStrategy) {
this.existingConcurrencyStrategy = existingConcurrencyStrategy;
}
@Override
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getBlockingQueue(maxQueueSize)
: super.getBlockingQueue(maxQueueSize);
}
@Override
public <T> HystrixRequestVariable<T> getRequestVariable(
HystrixRequestVariableLifecycle<T> rv) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getRequestVariable(rv)
: super.getRequestVariable(rv);
}
@Override
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
HystrixProperty<Integer> corePoolSize,
HystrixProperty<Integer> maximumPoolSize,
HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize,
maximumPoolSize, keepAliveTime, unit, workQueue)
: super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
keepAliveTime, unit, workQueue);
}
@Override
public <T> Callable<T> wrapCallable(Callable<T> callable) {
return existingConcurrencyStrategy != null
? existingConcurrencyStrategy
.wrapCallable(new DelegatingUserContextCallable<T>(callable, UserContextHolder.getContext()))
: super.wrapCallable(new DelegatingUserContextCallable<T>(callable, UserContextHolder.getContext()));
}
}
2)定义 Callable 类,将 UserContext 注入 Hystrix 命令中
使用 DelegatingUserContextCallable 传播 UserCotext,如:
DelegatingUserContextCallable :
package demo.hystrix;
import demo.utils.UserContext;
import demo.utils.UserContextHolder;
import java.util.concurrent.Callable;
public final class DelegatingUserContextCallable<V> implements Callable<V> {
private final Callable<V> delegate;
private UserContext originalUserContext;
// 原始 Callable 类将被传递到,自定义的Callable 类,自定Callable 将调用 Hystrix护的代码和来自父线程的UserContext
public DelegatingUserContextCallable(Callable<V> delegate,
UserContext userContext) {
this.delegate = delegate;
this.originalUserContext = userContext;
}
// call 方法在被@HystrixCommand 注解保护的方法之前调用
public V call() throws Exception {
UserContextHolder.setContext( originalUserContext );
try {
// UseContext设置之后,在 Hystrix保护的方法上调用call()方法。如LicenseServer.getLicenseByOrg()方法。
return delegate.call();
}
finally {
this.originalUserContext = null;
}
}
public static <V> Callable<V> create(Callable<V> delegate,
UserContext userContext) {
return new DelegatingUserContextCallable<V>(delegate, userContext);
}
}
3)配置 Spring Cloud 以使用自定义 Hystrix 并发策略
ThreadLocalConfiguration :
package demo.hystrix;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class ThreadLocalConfiguration {
//构造配置对象时,它将自动装配在现有的 HystrixConcurrencyStrategy中。
@Autowired(required = false)
private HystrixConcurrencyStrategy existingConcurrencyStrategy;
@PostConstruct
public void init() {
// 保留现有Hystrix的插件引用
// 因为要注册一个新的并发策略,所以要获取其他的Hystrix组件,然后重新设置Hystrix插件。
HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
.getEventNotifier();
HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
.getMetricsPublisher();
HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
.getPropertiesStrategy();
HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance()
.getCommandExecutionHook();
HystrixPlugins.reset();
// 使用 Hystrix 插件注册自定义的 Hystrix并发策略(ThreadConcurrency Strategy )
HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalAwareStrategy(existingConcurrencyStrategy));
// 然后重新注册 Hystrix 插件使用的所有Hystrix组件
HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
}
}
完成以上配置后,重新构建并重新启动
在Postman中使用Http Get 访问,在请求头Headers 添加名为my-correlation-id 值为TEST_CORRELATION_ID 。
http://8080/v1/organizations/xxxx-xxx/licenses
一旦提交了这个调用,当它流经 UserContext 、LicenseServiceController和LicenseServer 类时,我们将看到3条日志消息记录了传入的关联 ID。
UserContext Correlation id: TEST-CORRELATION-ID
LicenseServiceController Correlation id: TEST-CORRELATION-ID
LicenseService.getLicenseByOrg Correlation : TEST_CORRELATION_ID