Spring Cloud-Hystrix断路器

简介

项目使用的Spring Cloud为2021.0.5,Spring Boot为2.7.6版本。
Spring Cloud Netflix Hystrix是Spring Cloud Netflix子项目的核心组件之一,具有服务容错及线程隔离等一系列服务保护功能。
下面将对其用法进行详细介绍。

Hystrix简介

在微服务架构中,服务与服务之间通过远程调用的方式进行通信,一旦某个被调用的服务发生了故障,其依赖服务也会发生故障,此时就会发生故障的蔓延,最终导致系统瘫痪。

Hystrix实现了断路器模式,当某个服务发生故障时,通过断路器的监控,给调用方返回一个错误响应,而不是长时间的等待,这样就不会使得调用方由于长时间得不到响应而占用线程,从而防止故障的蔓延。Hystrix具备服务降级、服务熔断、线程隔离、请求缓存、请求合并及服务监控等强大功能。

创建hystrix-service模块

pom.xml添加相关依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.1.0</version>
</dependency>

在application.yml进行配置

# 指定服务端口
server:
  port: 8401
# 指定服务名称
spring:
  application:
    name: hystrix-service
# 指定主机地址
eureka:
  client:
    # 指定是否从注册中心获取服务
    fetch-registry: true
    # 指定是否将服务注册到注册中心
    register-with-eureka: true
    service-url:
      # 配置注册中心地址
      defaultZone: http://localhost:8001/eureka/

service-url:
  user-service: http://user-service

开启Hystrix断路器功能

在启动类上添加@EnableCircuitBreaker来开启Hystrix的断路器。
HystrixServiceApplication启动类:

@EnableCircuitBreaker
@EnableDiscoveryClient
@SpringBootApplication
public class HystrixServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(HystrixServiceApplication.class, args);
    }

}

创建UserHystrixController接口类用于调用user-service服务。
UserHystrixController.java类:

@RestController
@RequestMapping("/user/hystrix")
public class UserHystrixController {

    @Autowired
    private UserService userService;

    @GetMapping("/testFallback/{id}")
    public Result testFallback(@PathVariable Long id) {
        return userService.getUser(id);
    }

    @GetMapping("/testException/{id}")
    public Result testException(@PathVariable Long id) {
        return userService.getUserException(id);
    }

    @GetMapping("/testCommand/{id}")
    public Result testCommand(@PathVariable Long id) {
        return userService.getUserCommand(id);
    }

    @GetMapping("/testCache/{id}")
    public Result testCache(@PathVariable Long id) {
        userService.getUserCache(id);
        userService.getUserCache(id);
        userService.getUserCache(id);
        return Result.isSuccess();
    }

    @GetMapping("/testRemoveCache/{id}")
    public Result testRemoveCache(@PathVariable Long id) {
        userService.getUserCache(id);
        userService.removeCache(id);
        userService.getUserCache(id);
        return Result.isSuccess();
    }

    @GetMapping("/testCollapser")
    public Result testCollapser() throws ExecutionException, InterruptedException {
        Future<User> future1 = userService.getUserFuture(1L);
        Future<User> future2 = userService.getUserFuture(2L);
        future1.get();
        future2.get();
        ThreadUtil.safeSleep(200);
        Future<User> future3 = userService.getUserFuture(3L);
        future3.get();
        return Result.isSuccess();
    }
}

服务降级测试

测试服务降级接口:

@GetMapping("/testFallback/{id}")
public Result testFallback(@PathVariable Long id) {
    return userService.getUser(id);
}

UserService中添加调用方法与服务降级方法,方法上需要添加@HystrixCommand注解。

@HystrixCommand(fallbackMethod = "fallbackMethod1")
@Override
public Result getUser(Long id) {
    logger.info("getUser {}", id);
    return restTemplate.getForObject(userServiceUrl + "/user/{1}", Result.class, id);
}

public Result fallbackMethod1(@PathVariable Long id) {
    logger.info("fallbackMethod1 {}", id);
    return new Result("服务调用失败", 500);
}

启动eureka-service、user-service、hystrix-service服务

访问地址:
http://localhost:8401/user/hystrix/testFallback/100

正常响应:
{"message":"操作成功","code":200,"data":{"id":100,"username":"admin","password":"123456"}}

关闭user-service服务后响应:
{"message":"服务调用失败","code":500,"data":null}

发现服务已发生了降级;

@HystrixCommand详解

@HystrixCommand中的常用参数

  • fallbackMethod:指定服务降级处理方法;
  • ignoreExceptions:忽略某些异常,不发生服务降级;
  • commandKey:命令名称,用于区分不同的命令;
  • groupKey:分组名称,Hystrix会根据不同的分组来统计命令的告警及仪表盘信息;
  • threadPoolKey:线程池名称,用于划分线程池;

设置命令、分组及线程池名称

测试接口:

@GetMapping("/testCommand/{id}")
public Result testCommand(@PathVariable Long id) {
    return userService.getUserCommand(id);
}

服务降级设置命令、分组及线程池名称如下所示:

@HystrixCommand(fallbackMethod = "fallbackMethod1", commandKey = "getUserCommand",
        groupKey = "getUserGroup", threadPoolKey = "getUserThreadPool")
@Override
public Result getUserCommand(Long id) {
    logger.info("getUserCommand {}", id);
    return restTemplate.getForObject(userServiceUrl + "/user/{1}", Result.class, id);
}

使用ignoreExceptions忽略某些异常

测试接口:

@GetMapping("/testException/{id}")
public Result testException(@PathVariable Long id) {
    return userService.getUserException(id);
}

服务降级方法忽略NullPointerException异常(即抛出此异常时不发生降级)

@HystrixCommand(fallbackMethod = "fallbackMethod2", ignoreExceptions = {NullPointerException.class})
@Override
public Result getUserException(Long id) {
    logger.info("getUserException {}", id);
    if (id == 1) {
        throw new IndexOutOfBoundsException();
    } else if (id == 2) {
        throw new NullPointerException();
    }
    return restTemplate.getForObject(userServiceUrl + "/user/{1}", Result.class, id);
}

(1)测试地址:
http://localhost:8401/user/hystrix/testException/1

响应:{"message":"服务调用失败","code":500,"data":null}

(2)测试地址:
http://localhost:8401/user/hystrix/testException/2

响应如下图示:

springcloud-hystrix-02.pngspringcloud-hystrix-02.png

(3)测试地址:
http://localhost:8401/user/hystrix/testException/100

响应:
{"message":"操作成功","code":200,"data":{"id":100,"username":"admin","password":"123456"}}

可以发现,当抛出NullPointerException异常时,不会做降级处理;其他情况做降级处理。

Hystrix的请求缓存

当系统并发量越来越大时,我们需要使用缓存来优化系统,达到减轻并发请求线程数,提高响应速度。

相关注解:

  • @CacheResult:开启缓存,默认所有参数作为缓存的key,cacheKeyMethod可以通过返回String类型的方法指定key;
  • @CacheKey:指定缓存key,可以指定参数或指定参数中的属性值为缓存key;
  • @CacheRemove:移出缓存,需要指定commandKey;

测试接口(同样的id连续调用三次getUserCache方法):

@GetMapping("/testCache/{id}")
public Result testCache(@PathVariable Long id) {
    userService.getUserCache(id);
    userService.getUserCache(id);
    userService.getUserCache(id);
    return Result.isSuccess();
}


@CacheResult(cacheKeyMethod = "getCacheKey")
@HystrixCommand(fallbackMethod = "fallbackMethod1", commandKey = "getUserCache")
@Override
public Result getUserCache(Long id) {
    logger.info("getUserCache {}", id);
    return restTemplate.getForObject(userServiceUrl + "/user/{1}", Result.class, id);
}

请求地址:
http://localhost:8401/user/hystrix/testCache/100

接口响应:
{"message":"操作成功","code":200,"data":null}

从user-service控制台日志可以发现,只请求了一次user-service服务。
控制台日志如下图示:

springcloud-hystrix-03.pngspringcloud-hystrix-03.png

说明:
在缓存使用过程中,我们需要在每次使用缓存的请求前后对HystrixRequestContext进行初始化和关闭,否则会出现如下异常:

java.lang.IllegalStateException: Request caching is not available. Maybe you need to initialize the HystrixRequestContext?
    at com.netflix.hystrix.HystrixRequestCache.get(HystrixRequestCache.java:104) ~[hystrix-core-1.5.18.jar:1.5.18]
    at com.netflix.hystrix.AbstractCommand$7.call(AbstractCommand.java:478) ~[hystrix-core-1.5.18.jar:1.5.18]
    at com.netflix.hystrix.AbstractCommand$7.call(AbstractCommand.java:454) ~[hystrix-core-1.5.18.jar:1.5.18]
    

这里我们通过使用过滤器,在每个请求前后初始化和关闭HystrixRequestContext来解决该问题

@Component
@WebFilter(urlPatterns = "/*", asyncSupported = true)
public class HystrixRequestContextFilter implements Filter {

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } catch (Exception e) {
            context.close();
        }
    }
}

删除缓存功能removeCache方法

测试删除接口:

@GetMapping("/testRemoveCache/{id}")
public Result testRemoveCache(@PathVariable Long id) {
    userService.getUserCache(id);
    userService.removeCache(id);
    userService.getUserCache(id);
    return Result.isSuccess();
}


@CacheResult(cacheKeyMethod = "getCacheKey")
@HystrixCommand(fallbackMethod = "fallbackMethod1", commandKey = "getUserCache")
@Override
public Result getUserCache(Long id) {
    logger.info("getUserCache {}", id);
    return restTemplate.getForObject(userServiceUrl + "/user/{1}", Result.class, id);
}

public String getCacheKey(Long id) {
    return String.valueOf(id);
}

@CacheRemove(commandKey = "getUserCache", cacheKeyMethod = "getCacheKey")
@HystrixCommand
@Override
public Result removeCache(Long id) {
    logger.info("removeCache {}", id);
    return null;
}

测试地址:
http://localhost:8401/user/hystrix/testRemoveCache/100

通过user-service控制台日志可以看到,有两次接口请求。
如下图示:

springcloud-hystrix-04.pngspringcloud-hystrix-04.png

Hystrix请求合并

微服务系统中服务间通信,需要通过远程调用来实现,随着调用次数越来越多,占用线程资源也会越来越多。Hystrix中提供了@HystrixCollapser用于合并请求,从而达到减少通信消耗及线程数量的效果。

@HystrixCollapser的常用属性

  • batchMethod:用于设置请求合并的方法;
  • collapserProperties:请求合并属性,用于控制实例属性,有很多;
  • timerDelayInMilliseconds:collapserProperties中的属性,用于控制每隔多少时间合并一次请求;

请求合并方法

在UserHystrixController中添加testCollapser方法,这里我们先进行两次服务调用,再间隔200ms以后进行第三次服务调用:

方法如下:

使用@HystrixCollapser实现请求合并,所有对getUserFuture的多次调用都会转换为对getUserByIds的单次调用:

请求合并测试

测试接口:
http://localhost:8401/user/hystrix/testCollapser

Hystrix常用配置

全局配置:

hystrix:
  command: #用于控制HystrixCommand的行为
    default:
      execution:
        isolation:
          strategy: THREAD #控制HystrixCommand的隔离策略,THREAD->线程池隔离策略(默认),SEMAPHORE->信号量隔离策略
          thread:
            timeoutInMilliseconds: 1000 #配置HystrixCommand执行的超时时间,执行超过该时间会进行服务降级处理
            interruptOnTimeout: true #配置HystrixCommand执行超时的时候是否要中断
            interruptOnCancel: true #配置HystrixCommand执行被取消的时候是否要中断
          timeout:
            enabled: true #配置HystrixCommand的执行是否启用超时时间
          semaphore:
            maxConcurrentRequests: 10 #当使用信号量隔离策略时,用来控制并发量的大小,超过该并发量的请求会被拒绝
      fallback:
        enabled: true #用于控制是否启用服务降级
      circuitBreaker: #用于控制HystrixCircuitBreaker的行为
        enabled: true #用于控制断路器是否跟踪健康状况以及熔断请求
        requestVolumeThreshold: 20 #超过该请求数的请求会被拒绝
        forceOpen: false #强制打开断路器,拒绝所有请求
        forceClosed: false #强制关闭断路器,接收所有请求
      requestCache:
        enabled: true #用于控制是否开启请求缓存
  collapser: #用于控制HystrixCollapser的执行行为
    default:
      maxRequestsInBatch: 100 #控制一次合并请求合并的最大请求数
      timerDelayinMilliseconds: 10 #控制多少毫秒内的请求会被合并成一个
      requestCache:
        enabled: true #控制合并请求是否开启缓存
  threadpool: #用于控制HystrixCommand执行所在线程池的行为
    default:
      coreSize: 10 #线程池的核心线程数
      maximumSize: 10 #线程池的最大线程数,超过该线程数的请求会被拒绝
      maxQueueSize: -1 #用于设置线程池的最大队列大小,-1采用SynchronousQueue,其他正数采用LinkedBlockingQueue
      queueSizeRejectionThreshold: 5 #用于设置线程池队列的拒绝阀值,由于LinkedBlockingQueue不能动态改版大小,使用时需要用该参数来控制线程数

实例配置:
实例配置只需要将全局配置中的default换成对应的key即可。

hystrix:
  command:
    HystrixComandKey: #将default换成HystrixCommandKey
      execution:
        isolation:
          strategy: THREAD
  collapser:
    HystrixCollapserKey: #将default换成HystrixCollapserKey
      maxRequestsInBatch: 100
  threadpool:
    HystrixThreadPoolKey: #将default换成HystrixThreadPoolKey
      coreSize: 10

配置中的key说明:

  • HystrixComandKey对应@HystrixCommand中的commandKey属性;
  • HystrixCollapserKey对应@HystrixCollapser中的collapserKey属性;
  • HystrixThreadPoolKey对应@HystrixCommand中的threadPoolKey属性;

(完)

最后修改于:2022年12月11日 00:53

添加新评论