一、Hystrix
Hystrix是Netflix开源的一款容错系统,能帮助使用者码出具备强大的容错能力和鲁棒性的程序。提供降级,熔断等功能,并且熔断开关打开之后,会在服务可用之后,自动关闭。spring cloud中有用到。
如果你的服务依赖于多个服务,并且不想因为某个服务挂掉,而影响你服务。比如hbase挂掉了,你可以通过降级策略返回默认值,或者直接熔断。
Hystrix提供了服务隔离,每个服务使用独立的线程池实现。
二、 工作流程
下面将更详细的解析每一个步骤都发生哪些动作:
构建一个HystrixCommand或者HystrixObservableCommand对象。
第一步就是构建一个HystrixCommand或者HystrixObservableCommand对象,该对象将代表你的一个依赖请求,向构造函数中传入请求依赖所需要的参数。
如果构建HystrixCommand中的依赖返回单个响应,例如:
public class HelloWorldHystrixCommand extends HystrixCommand {
private final String name;
public HelloWorldHystrixCommand(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
// 如果继承的是HystrixObservableCommand,要重写Observable construct()
@Override
protected String run() throws InterruptedException {
return "Hello " + name + "! thread:" + Thread.currentThread().getName();
}
@Override
protected String getFallback() {
System.out.println("触发了降级!");
return "exeucute fallback";
}
public static void main(String[] args) {
/* 调用程序对HelloWorldHystrixCommand实例化,执行execute()即触发HelloWorldHystrixCommand.run()的执行 */
Object result = new HelloWorldHystrixCommand("HLX").execute();
System.out.println(result); // 打印出Hello HLX
}
}
有4种方式可以执行一个Hystrix命令。
execute()—该方法是阻塞的,从依赖请求中接收到单个响应(或者出错时抛出异常)。
queue()—从依赖请求中返回一个包含单个响应的Future对象。
observe()—订阅一个从依赖请求中返回的代表响应的Observable对象。
toObservable()—返回一个Observable对象,只有当你订阅它时,它才会执行Hystrix命令并发射响应。
K value = command.execute();
Future<K> fValue = command.queue();
Observable<K> ohValue = command.observe(); //hot observable
Observable<K> ocValue = command.toObservable(); //cold observable
三、降级(fallback)
使用fallback机制很简单,继承HystrixCommand只需重写getFallback(),继承HystrixObservableCommand只需重写resumeWithFallback()。
根据上面的工作流程图所示,当出现以下几种情况时会触发降级:
名字 | 描述 |
---|---|
FAILURE | 执行抛出异常 |
TIMEOUT | 执行开始,但没有在允许的时间内完成 |
SHORT_CIRCUITED | 断路器打开,不尝试执行 |
THREAD_POOL_REJECTED | 线程池拒绝,不尝试执行 |
SEMAPHORE_REJECTED | 信号量拒绝,不尝试执行 |
如果没有实现降级方法,默认会抛出异常。降级方法也可能会超时,或者抛出异常
。
四、隔离
hystrix提供了两种隔离策略:线程池隔离和信号量隔离。hystrix默认采用线程池隔离。
我认为采用信号量隔离并不是真正的隔离,因为还是在调用者的线程中执行,如果服务挂了,还是会拖垮调用线程。使用信号量的超时机制,是在结果返回之后才计算是否超时的,服务挂了之后,肯定很长时间才返回,这时候再采取降级已经没有意义了。
线程池隔离:不同服务通过使用不同线程池,彼此间将不受影响,达到隔离效果。**代码是在hystrix的线程中执行的,如果超时,可以中断。**以demo为例,我们通过andThreadPoolKey配置使用命名为ThreadPoolTest的线程池,实现与其他命名的线程池天然隔离,如果不配置andThreadPoolKey则使用withGroupKey配置来命名线程池。
public class HystrixCommand4ThreadPoolTest extends HystrixCommand<String> {
private final String name;
public HystrixCommand4ThreadPoolTest(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolTest"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(1500)
.withExecutionIsolationThreadInterruptOnTimeout(true)
.withCircuitBreakerErrorThresholdPercentage(50)
.withCircuitBreakerRequestVolumeThreshold(5)
)
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(3)// 配置线程池里的线程数
.withMaxQueueSize(20)
.withQueueSizeRejectionThreshold(20)
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
System.out.println("====command run : " + Thread.currentThread().getName() + " at: " + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(3000);
// throw new RuntimeException("=== timeout ...");
return name;
}
@Override
protected String getFallback() {
return "fallback: " + name + Thread.currentThread().getName() + " at : " + System.currentTimeMillis();
}
public static class UnitTest {
@Test
public void testSynchronous() throws IOException {
for (int i = 0; i < 10; i++) {
try {
new Thread(() -> System.out.println("===========" + new HystrixCommand4ThreadPoolTest("Hlx").execute())).start();
Thread.sleep(500);
} catch (Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
try {
TimeUnit.MILLISECONDS.sleep(5000);
} catch (Exception e) {
}
for (int i = 0; i < 10; i++) {
try {
new Thread(() -> System.out.println("===========" + new HystrixCommand4ThreadPoolTest("Hlx").execute())).start();
// futures.add(new HystrixCommand4ThreadPoolTest("Hlx"+i).queue());
} catch (Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
信号量隔离:线程隔离会带来线程开销,有些场景(比如无网络请求场景)可能会因为用开销换隔离得不偿失,为此hystrix提供了信号量隔离,当服务的并发数大于信号量阈值时将进入fallback。
package com.step.jliang.hystrix;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import java.util.concurrent.TimeUnit;
/**
* @author jliang
*/
public class HystrixCommand4Semphore extends HystrixCommand<String> {
private final String name;
public HystrixCommand4Semphore(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SemphoreTestGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(1500)
.withCircuitBreakerErrorThresholdPercentage(50)
.withCircuitBreakerRequestVolumeThreshold(5)
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
.withExecutionIsolationSemaphoreMaxConcurrentRequests(5)
)
);
this.name = name;
}
@Override
protected String run() throws Exception {
System.out.println("====command run : " + Thread.currentThread().getName() + " at: " + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(3000);
return name;
}
// @Override
// protected String getFallback() {
// return "semphore fallback: " + name + Thread.currentThread().getName() + " at : " + System.currentTimeMillis();
// }
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
try {
new Thread(() -> System.out.println("===========" + new HystrixCommand4Semphore("Hlx").execute())).start();
Thread.sleep(500);
} catch (Exception e) {
System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
}
}
}
}
五、如何使用熔断器(Circuit Breaker)
熔断器,现实生活中有一个很好的类比,就是家庭电路中都会安装一个保险盒,当电流过大的时候保险盒里面的保险丝会自动断掉,来保护家里的各种电器及电路。Hystrix中的熔断器(Circuit Breaker)也是起到这样的作用,Hystrix在运行过程中会向每个commandKey对应的熔断器报告成功、失败、超时和拒绝的状态,熔断器维护计算统计的数据,根据这些统计的信息来确定熔断器是否打开。如果打开,后续的请求都会被截断。然后会隔一段时间默认是5s,尝试半开,放入一部分流量请求进来,相当于对依赖服务进行一次健康检查,如果恢复,熔断器关闭,随后完全恢复调用。
熔断器的数据通路如下图所示:
每个熔断器默认维护10个bucket,每秒一个bucket,每个blucket记录成功,失败,超时,拒绝的状态,默认错误超过50%且10秒内超过20个请求进行中断拦截。
由于Hystrix是一个容错框架,因此我们在使用的时候,要达到熔断的目的只需配置一些参数就可以了。但我们要达到真正的效果,就必须要了解这些参数。Circuit Breaker一共包括如下6个参数。
1、circuitBreaker.enabled
是否启用熔断器,默认是TURE。
2、circuitBreaker.forceOpen
熔断器强制打开,始终保持打开状态。默认值FLASE。
3、circuitBreaker.forceClosed
熔断器强制关闭,始终保持关闭状态。默认值FLASE。
4、circuitBreaker.errorThresholdPercentage
设定错误百分比,默认值50%,例如一段时间(10s)内有100个请求,其中有55个超时或者异常返回了,那么这段时间内的错误百分比是55%,大于了默认值50%,这种情况下触发熔断器-打开。
5、circuitBreaker.requestVolumeThreshold
默认值20.意思是至少有20个请求才进行errorThresholdPercentage错误百分比计算。比如一段时间(10s)内有19个请求全部失败了。错误百分比是100%,但熔断器不会打开,因为requestVolumeThreshold的值是20. 这个参数非常重要,熔断器是否打开首先要满足这个条件。
在自己测试的时候,如果熔断器不起作用,要看下是否满足这个条件。
6、circuitBreaker.sleepWindowInMilliseconds
半开试探休眠时间,默认值5000ms。当熔断器开启一段时间之后比如5000ms,会尝试放过去一部分流量进行试探,确定依赖服务是否恢复。
具体的例子可以参考上面使用线程池隔离的代码。
六、常见的参数及默认值
可以参考这篇
博客
如果还有不明白的,可以参考
官方文档
参考链接:
https://www.jianshu.com/p/14958039fd15
https://github.com/Netflix/Hystrix/wiki/How-it-Works
https://segmentfault.com/a/1190000012439580
https://blog.csdn.net/tongtong_use/article/details/78611225
https://juejin.im/entry/59dbe2cc51882578d152bdf8
https://www.jianshu.com/p/b9af028efebb