Hystrix介绍和使用指南

  • Post author:
  • Post category:其他


一、Hystrix

Hystrix是Netflix开源的一款容错系统,能帮助使用者码出具备强大的容错能力和鲁棒性的程序。提供降级,熔断等功能,并且熔断开关打开之后,会在服务可用之后,自动关闭。spring cloud中有用到。

如果你的服务依赖于多个服务,并且不想因为某个服务挂掉,而影响你服务。比如hbase挂掉了,你可以通过降级策略返回默认值,或者直接熔断。

Hystrix提供了服务隔离,每个服务使用独立的线程池实现。

二、 工作流程

流程

下面将更详细的解析每一个步骤都发生哪些动作:

构建一个HystrixCommand或者HystrixObservableCommand对象。

第一步就是构建一个HystrixCommand或者HystrixObservableCommand对象,该对象将代表你的一个依赖请求,向构造函数中传入请求依赖所需要的参数。

如果构建HystrixCommand中的依赖返回单个响应,例如:

public class HelloWorldHystrixCommand extends HystrixCommand {
    private final String name;

    public HelloWorldHystrixCommand(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }

    // 如果继承的是HystrixObservableCommand,要重写Observable construct()
    @Override
    protected String run() throws InterruptedException {
        return "Hello " + name + "! thread:" + Thread.currentThread().getName();
    }

    @Override
    protected String getFallback() {
        System.out.println("触发了降级!");
        return "exeucute fallback";
    }

    public static void main(String[] args) {
        /* 调用程序对HelloWorldHystrixCommand实例化,执行execute()即触发HelloWorldHystrixCommand.run()的执行 */
        Object result = new HelloWorldHystrixCommand("HLX").execute();
        System.out.println(result);  // 打印出Hello HLX
    }
}

有4种方式可以执行一个Hystrix命令。

execute()—该方法是阻塞的,从依赖请求中接收到单个响应(或者出错时抛出异常)。

queue()—从依赖请求中返回一个包含单个响应的Future对象。

observe()—订阅一个从依赖请求中返回的代表响应的Observable对象。

toObservable()—返回一个Observable对象,只有当你订阅它时,它才会执行Hystrix命令并发射响应。

K             value   = command.execute();
Future<K>     fValue  = command.queue();
Observable<K> ohValue = command.observe();         //hot observable
Observable<K> ocValue = command.toObservable();    //cold observable

在这里插入图片描述

三、降级(fallback)

使用fallback机制很简单,继承HystrixCommand只需重写getFallback(),继承HystrixObservableCommand只需重写resumeWithFallback()。

根据上面的工作流程图所示,当出现以下几种情况时会触发降级:

名字 描述
FAILURE 执行抛出异常
TIMEOUT 执行开始,但没有在允许的时间内完成
SHORT_CIRCUITED 断路器打开,不尝试执行
THREAD_POOL_REJECTED 线程池拒绝,不尝试执行
SEMAPHORE_REJECTED 信号量拒绝,不尝试执行


如果没有实现降级方法,默认会抛出异常。降级方法也可能会超时,或者抛出异常

四、隔离

hystrix提供了两种隔离策略:线程池隔离和信号量隔离。hystrix默认采用线程池隔离。


我认为采用信号量隔离并不是真正的隔离,因为还是在调用者的线程中执行,如果服务挂了,还是会拖垮调用线程。使用信号量的超时机制,是在结果返回之后才计算是否超时的,服务挂了之后,肯定很长时间才返回,这时候再采取降级已经没有意义了。

在这里插入图片描述

线程池隔离:不同服务通过使用不同线程池,彼此间将不受影响,达到隔离效果。**代码是在hystrix的线程中执行的,如果超时,可以中断。**以demo为例,我们通过andThreadPoolKey配置使用命名为ThreadPoolTest的线程池,实现与其他命名的线程池天然隔离,如果不配置andThreadPoolKey则使用withGroupKey配置来命名线程池。

public class HystrixCommand4ThreadPoolTest extends HystrixCommand<String> {

    private final String name;

    public HystrixCommand4ThreadPoolTest(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolTest"))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(1500)
                                .withExecutionIsolationThreadInterruptOnTimeout(true)
                                .withCircuitBreakerErrorThresholdPercentage(50)
                                .withCircuitBreakerRequestVolumeThreshold(5)
                )
                .andThreadPoolPropertiesDefaults(
                        HystrixThreadPoolProperties.Setter()
                                .withCoreSize(3)// 配置线程池里的线程数
                                .withMaxQueueSize(20)
                                .withQueueSizeRejectionThreshold(20)
                )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        System.out.println("====command run : " + Thread.currentThread().getName() + " at: " + System.currentTimeMillis());
        TimeUnit.MILLISECONDS.sleep(3000);
//        throw  new RuntimeException("=== timeout ...");
        return name;

    }

    @Override
    protected String getFallback() {
        return "fallback: " + name + Thread.currentThread().getName() + " at : " + System.currentTimeMillis();
    }

    public static class UnitTest {

        @Test
        public void testSynchronous() throws IOException {
            for (int i = 0; i < 10; i++) {
                try {
                    new Thread(() -> System.out.println("===========" + new HystrixCommand4ThreadPoolTest("Hlx").execute())).start();
                    Thread.sleep(500);
                } catch (Exception e) {
                    System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
                }
            }

            try {
                TimeUnit.MILLISECONDS.sleep(5000);
            } catch (Exception e) {
            }

            for (int i = 0; i < 10; i++) {
                try {
                    new Thread(() -> System.out.println("===========" + new HystrixCommand4ThreadPoolTest("Hlx").execute())).start();
//                    futures.add(new HystrixCommand4ThreadPoolTest("Hlx"+i).queue());
                } catch (Exception e) {
                    System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
                }
            }
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

信号量隔离:线程隔离会带来线程开销,有些场景(比如无网络请求场景)可能会因为用开销换隔离得不偿失,为此hystrix提供了信号量隔离,当服务的并发数大于信号量阈值时将进入fallback。

package com.step.jliang.hystrix;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;

import java.util.concurrent.TimeUnit;

/**
 * @author jliang
 */
public class HystrixCommand4Semphore extends HystrixCommand<String> {
    private final String name;

    public HystrixCommand4Semphore(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("SemphoreTestGroup"))
                .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey"))
                .andCommandPropertiesDefaults(
                        HystrixCommandProperties.Setter()
                                .withExecutionTimeoutInMilliseconds(1500)
                                .withCircuitBreakerErrorThresholdPercentage(50)
                                .withCircuitBreakerRequestVolumeThreshold(5)
                                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
                                .withExecutionIsolationSemaphoreMaxConcurrentRequests(5)
                )
        );
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        System.out.println("====command run : " + Thread.currentThread().getName() + " at: " + System.currentTimeMillis());
        TimeUnit.MILLISECONDS.sleep(3000);
        return name;

    }

//    @Override
//    protected String getFallback() {
//        return "semphore fallback: " + name + Thread.currentThread().getName() + " at : " + System.currentTimeMillis();
//    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            try {
                new Thread(() -> System.out.println("===========" + new HystrixCommand4Semphore("Hlx").execute())).start();
                Thread.sleep(500);
            } catch (Exception e) {
                System.out.println("run()抛出HystrixBadRequestException时,被捕获到这里" + e.getCause());
            }
        }
    }
}

五、如何使用熔断器(Circuit Breaker)

熔断器,现实生活中有一个很好的类比,就是家庭电路中都会安装一个保险盒,当电流过大的时候保险盒里面的保险丝会自动断掉,来保护家里的各种电器及电路。Hystrix中的熔断器(Circuit Breaker)也是起到这样的作用,Hystrix在运行过程中会向每个commandKey对应的熔断器报告成功、失败、超时和拒绝的状态,熔断器维护计算统计的数据,根据这些统计的信息来确定熔断器是否打开。如果打开,后续的请求都会被截断。然后会隔一段时间默认是5s,尝试半开,放入一部分流量请求进来,相当于对依赖服务进行一次健康检查,如果恢复,熔断器关闭,随后完全恢复调用。

熔断器的数据通路如下图所示:

在这里插入图片描述

每个熔断器默认维护10个bucket,每秒一个bucket,每个blucket记录成功,失败,超时,拒绝的状态,默认错误超过50%且10秒内超过20个请求进行中断拦截。

由于Hystrix是一个容错框架,因此我们在使用的时候,要达到熔断的目的只需配置一些参数就可以了。但我们要达到真正的效果,就必须要了解这些参数。Circuit Breaker一共包括如下6个参数。

1、circuitBreaker.enabled

是否启用熔断器,默认是TURE。

2、circuitBreaker.forceOpen

熔断器强制打开,始终保持打开状态。默认值FLASE。

3、circuitBreaker.forceClosed

熔断器强制关闭,始终保持关闭状态。默认值FLASE。

4、circuitBreaker.errorThresholdPercentage

设定错误百分比,默认值50%,例如一段时间(10s)内有100个请求,其中有55个超时或者异常返回了,那么这段时间内的错误百分比是55%,大于了默认值50%,这种情况下触发熔断器-打开。

5、circuitBreaker.requestVolumeThreshold

默认值20.意思是至少有20个请求才进行errorThresholdPercentage错误百分比计算。比如一段时间(10s)内有19个请求全部失败了。错误百分比是100%,但熔断器不会打开,因为requestVolumeThreshold的值是20. 这个参数非常重要,熔断器是否打开首先要满足这个条件。

在自己测试的时候,如果熔断器不起作用,要看下是否满足这个条件。


6、circuitBreaker.sleepWindowInMilliseconds

半开试探休眠时间,默认值5000ms。当熔断器开启一段时间之后比如5000ms,会尝试放过去一部分流量进行试探,确定依赖服务是否恢复。

具体的例子可以参考上面使用线程池隔离的代码。

六、常见的参数及默认值

可以参考这篇

博客

如果还有不明白的,可以参考

官方文档

参考链接:


https://www.jianshu.com/p/14958039fd15



https://github.com/Netflix/Hystrix/wiki/How-it-Works



https://segmentfault.com/a/1190000012439580



https://blog.csdn.net/tongtong_use/article/details/78611225



https://juejin.im/entry/59dbe2cc51882578d152bdf8



https://www.jianshu.com/p/b9af028efebb



版权声明:本文为liang0000zai原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。