-
Spring Cloud专题之三:Hystrix
在微服务架构中,我们将系统拆分成很多个服务单元,各单位的应用间通过服务注册与订阅的方式相互依赖。由于每个单元都在不同的进程中运行,依赖通过远程调用的方式执行,这样就有可能因为网络原因或是依赖服务自身问题出现调用故障或延迟,而这些问题会直接导致调用方的对外服务也出现延迟,若此时调用方的请求不断增加,最后就会因等待出现故障的依赖方响应形成任务积压,最终导致自身服务的不可用。这样的架构相对于传统架构更加的不稳定,为了解决这样的问题,就产生了断路器等一系列的服务保护机制。而Spring Cloud Hystrix就是这样的实现了服务降级、服务熔断、线程和信号量隔离、请求缓存、请求合并以及服务监控等一系列服务保护功能的组件。
本篇文章还是在前两篇文章的基础上所作的:
SpringCloud专题之一:Eureka
Spring Cloud专题之二:OpenFeign
欢迎大家查看!!!
先启动需要的服务工程:
- EUREKA-SERVER:注册中心,端口为9001
- HELLO-SERVER:提供服务的客户端,端口为9002和9003
- EUREKA-CUSTOMER:消费服务的消费者端,端口为9004
在未加入Hystrix(断路器)之前,如果我关闭掉一个客户端,那么使用消费者访问的时候可以获得如下的输出:
因为feign的默认连接时间是1s,所以超过1s后就会报连接不上的错。
Hystrix代码
由于 openfeign 包 默认集成了 hystrix,所以只需要开启开关即可
#开启Hystrix降级处理
feign.hystrix.enabled=true
引入jar包
<!--hystrix-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!--hystrix-javanica-->
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
</dependency>
1.服务降级
降级是指当请求超时,资源不足等情况发生时,进行的服务降级处理,不调用真实服务逻辑,而是使用快速失败的方式直接返回一个托底数据,保证服务链路的完整,避免服务雪崩。
降级的实现是指在调用远程服务的方法上增加@HystrixCommand的注解,通过指定fallbackMethod的值设置失败的回调方法,也可以使用@FeignClient的方式指定fallback类。
1.在Customer1Feign类的@FeignClient注解上添加fallback的类
/**
* @className: Customer1Feign
* @description: 测试多个feign使用相同的name的问题
* @author: charon
* @create: 2021-06-06 09:42
*/
@FeignClient(value = "HELLO-SERVER",fallback = EurekaClientFallBack.class)
public interface Customer1Feign {
/**
* 要求:
* 必须要指定RequestParam属性的value值,同时RequestMethod的method也需要指定
* 方法上添加SpringMVC的注解
* @return
*/
@RequestMapping(value = "/sayHello1",method = RequestMethod.GET)
String sayHello1(@RequestParam("name") String name);
}
3.编写fallback使用的类:EurekaClientFallBack
/**
* @className: EurekaClientFallBack
* @description: 客户端的降级实现类
* @author: charon
* @create: 2021-06-20 22:06
*/
@Component
public class EurekaClientFallBack implements Customer1Feign {
/**
* 日志记录类
*/
private final Logger logger = LoggerFactory.getLogger(getClass());
/**
* sayHello1接口的服务降级类
* @param name 参数
* @return
*/
@Override
public String sayHello1(String name) {
logger.error("您访问了EurekaClientFallBack#sayHello1(),传入的参数为:{}" , name);
return "您访问了EurekaClientFallBack#sayHello1(),传入的参数为:" + name;
}
}
然后消费者端再次调用接口,会发现页面展示为如下图,而不是之前的Whitelabel Error Page了。
到这里,我们就实现了一个最简单的断路器功能了。
4.模拟实现提供服务的客户端代码执行超时的情况:
@RestController
public class Hello1Controller {
/**
* 日志记录类
*/
private final Logger logger = LoggerFactory.getLogger(getClass());
@Value("${server.port}")
private String host;
@Value("${spring.application.name}")
private String instanceName;
@RequestMapping("/sayHello1")
public String sayHello1(@RequestParam("name") String name){
try {
int sleepTime = new Random().nextInt(3000);
logger.error("让线程阻塞 {} 毫秒",sleepTime);
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("你好,服务名:{},端口为:{},接收到的参数为:{}",instanceName,host,name);
return "你好,服务名:"+instanceName+",端口为:"+host+",接收到的参数为:"+name;
}
}
在HELLO-SERVER的工程中让线程随机sleep几秒,然后消费者端调用,可以发现,当调用HELLO-SERVER超过1000毫秒时就会因为服务超时从而触发熔断请求,并调用回调逻辑返回结果。
除了上面的方式外,还可以使用@HystrixCommand的注解来配置fallbackMethod方法(更灵活)。
@HystrixCommand(fallbackMethod = "sayHello1Fallback")
@Override
public String invokeSayHello1(String name) {
long startTime = System.currentTimeMillis();
String result = feign1.sayHello1(name);
logger.error("您访问了CustomerServiceImpl#sayHello1(),执行时间为:{} 毫秒",System.currentTimeMillis() - startTime );
return result;
}
public String sayHello1Fallback(String name){
logger.error("出错了,您访问了CustomerServiceImpl#sayHello1Fallback()" );
return "出错了,您访问了CustomerServiceImpl#sayHello1Fallback()";
}
2.服务熔断
熔断就是当一定时间内,异常请求的比例(请求超时,网络故障,服务异常等)达到阈值,启动熔断器,熔断器一旦启动,则会停止调用具体的服务逻辑,通过fallback快速返回托底数据,保证服务链的完整。熔断又自动恢复的机制,如:当熔断器启动后,每隔5秒尝试将新的请求发送给服务端,如果服务可正常执行并返回结果,则关闭熔断器,服务恢复。如果仍然调用失败,则继续返回托底数据,熔断器处于开启状态。
服务降级是指调用服务出错了返回托底数据,而熔断则是出错后如果开启了熔断器将在一定的时间内不调用服务端。
熔断的实现是指在调用远程服务的方法上增加@HystrixCommand的注解。通过@HystrixProperty的name属性指定需要配置的属性(可以是字符串,也可以使用HystrixPropertiesManager常量类的常量),通过value设置属性的值,也可以通过setter方式设置属性值。
/**
* 注解的配置意思为:当时间在20s内的10个请求中,当出现了30%(3个)的失败,则触发熔断
* @param name 参数
* @return 结果
*/
@HystrixCommand(fallbackMethod = "sayHello1Fallback",commandProperties = {
@HystrixProperty(name="circuitBreaker.requestVolumeThreshold",value = "10"),
@HystrixProperty(name= HystrixPropertiesManager.EXECUTION_ISOLATION_THREAD_TIMEOUT_IN_MILLISECONDS,value = "20000"),
@HystrixProperty(name= HystrixPropertiesManager.CIRCUIT_BREAKER_ERROR_THRESHOLD_PERCENTAGE,value = "30"),
})
@Override
public String invokeSayHello1(String name) {
long startTime = System.currentTimeMillis();
String result = feign1.sayHello1(name);
logger.error("您访问了CustomerServiceImpl#sayHello1(),执行时间为:{} 毫秒",System.currentTimeMillis() - startTime );
return result;
}
Hystrix 常用属性配置
3.请求缓存
请求缓存是保证在一次请求中多次调用同一个服务提供者接口,在cacheKey不变的情况下,后续调用结果都是第一次的缓存结果,而不是多次请求服务提供者,从而降低服务提供者处理重复请求的压力。
设置请求缓存:
@Override
@CacheResult//用来标记请求命令返回的结果应该被缓存
@HystrixCommand
public User getUserById( String id) {
return feign1.getUserById(id);
}
定义缓存key:
当使用注解来定义请求缓存时,若要为请求命令指定具体的缓存key的生成规则,可以使用@CacheResult和@CacheRemove注解的cacheKeyMethod方法指定具体的生成函数,也可以通过@CacheKey注解在方法参数中指定用于组装缓存key的元素。
@Override
@CacheResult(cacheKeyMethod = "getUserByIdCacheKey")//用来标记请求命令返回的结果应该被缓存
@HystrixCommand
public User getUserById( String id) {
return feign1.getUserById(id);
}
public String getUserByIdCacheKey(String id){
return id;
}
/**
* 第二种使用@CacheKey的方式,@CacheKey用来在请求命令的参数上标记,使其作为缓存的key值,如果没有标记则会使用所有参数,如果
* 同事还使用了@CacheResult和@CacheRemove注解的cacheKeyMethod方法指定缓存Key的生成,那么该注解将不会起作用
* 有些教程中说使用这个可以指定参数,比如:@CacheKey("id"),在我这边会报错:
* java.beans.IntrospectionException: Method not found: isId
*/
@Override
@CacheResult
@HystrixCommand
public User getUserById(@CacheKey String id) {
return feign1.getUserById(id);
}
清理缓存:
@CacheRemove注解的commandKey属性是必须要指定的,它用来指明需要使用请求缓存的请求命令,因为只有通过该属性的配置,Hystrix才能找到正确的请求命令缓存位置。
@Override
@CacheRemove(commandKey = "getUserByIdCacheKey")//用来让请求命令的缓存失效,失效的缓存根据定义的key决定
@HystrixCommand
public User removeUserById( String id) {
return feign1.getUserById(id);
}
controller调用,注意定义是要在同一个请求中,如果是不同的请求,则没有效果。
@RequestMapping("/getUserById")
public List<User> getUserById(String id){
User user1 = serivce.getUserById(id);
User user2 = serivce.getUserById(id);
List<User> lsrUser = new ArrayList<>(2);
lsrUser.add(user1);
lsrUser.add(user2);
return lsrUser;
}
4.请求合并
请求合并是指在一段时间内将所有请求合并为一个请求,以减少通信的消耗和线程数的占用,从而大大降低服务端的负载。
请求合并的缺点:
在设置请求合并之后,本来一个请求可能5ms就搞定了,但是现在必须再等10ms等待其他的请求一起,这样一个请求的耗时就从5ms增加到了15ms了。不过如果我们要发起的命令本身就是一个高延迟的命令,那么这个时候就可以使用请求合并了,因为这个时候,等待的时间消耗就显得微不足道了。所以如果需要设置请求合并,千万不能将等待时间设置的过大。
服务提供者的控制类:
@RestController
public class UserBatchController {
/**
* 请求合并的方法
* @param ids
* @return
*/
@RequestMapping(value = "/getUserList", method = RequestMethod.GET)
public List<User> getUserList(String ids) {
System.out.println("ids===:" + ids);
String[] split = ids.split(",");
return Arrays.asList(split)
.stream()
.map(id -> new User(Integer.valueOf(id),"charon"+id,Integer.valueOf(id)*5))
.collect(Collectors.toList());
}
/**
* 请求单个user的方法
* @param id
* @return
*/
@RequestMapping(value = "/getUser/{id}", method = RequestMethod.GET)
public User getUser(@PathVariable("id") String id) {
User user = new User(1, "Charon",15);
return user;
}
}
消费者feign的调用接口:
@RequestMapping(value = "/getUser",method = RequestMethod.GET)
Future<User> getUser(@RequestParam("id")Integer id);
@RequestMapping(value = "/getUserList",method = RequestMethod.GET)
List<User> getUserList (@RequestParam("ids") String ids);
消费者的service及实现类:
Future<User> getUser(Integer i);
/**
* 表示在10s内的getUser请求将会合并到getUserList请求上,合并发出,最大的合并请求数为200
* @param userId 用户id
* @return
*/
@HystrixCollapser(batchMethod = "getUserList",scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties = {
@HystrixProperty(name="timerDelayInMilliseconds",value="10"),
@HystrixProperty(name="maxRequestsInBatch",value="200")
}
)
@Override
public Future<User> getUser(Integer userId){
Future<User> user = feign1.getUser(userId);
return user;
}
@HystrixCommand
public List<User> getUserList(List<Integer> userIdList) {
List<User> lstUser = feign1.getUserList(StringUtils.join(userIdList,","));
return lstUser;
}
消费者控制类:
/**
* 获取单个用户
* @return User
*/
@RequestMapping("/getUser")
public User getUser() throws ExecutionException, InterruptedException {
Future<User> user = serivce.getUser(1);
System.out.println("返回的结果:"+user);
return user.get();
}
/**
* 获取用户list
* @return list
*/
@RequestMapping("/getUserList")
public List<User> getUserList() throws ExecutionException, InterruptedException {
Future<User> user1 = serivce.getUser(1);
Future<User> user2= serivce.getUser(2);
Future<User> user3= serivce.getUser(3);
List<User> users = new ArrayList<>();
users.add(user1.get());
users.add(user2.get());
users.add(user3.get());
System.out.println("返回的结果:" + users);
return users;
}
标注了HystrixCollapser这个注解的,这个方法永远不会执行,当有请求来的时候,直接请求batchMethod所指定的方法。batchMethod的方法在指定延迟时间内会将所有的请求合并一起执行
5.线程池隔离
Hystrix使用舱壁模式实现线程池的隔离,它会为每一个依赖服务创建一个独立的线程池,这样就算某个依赖服务出现延迟过高的情况,也只是对该依赖服务的调用产生影响,而不会拖慢其他的依赖服务。
使用线程池隔离的优点:
- 应用自身得到完全保护,不会受不可控的依赖服务影响,即便给依赖服务分配的线程池被填满,也不会影响到其他的服务
- 可以有效降低接入新服务的风险,如果新服务接入后运行不稳定或存在问题,完全不会影响原来的请求
- 每个服务都是独立的线程池,在一定程度上解决了高并发的问题
- 由于线程池有个数限制,所以也解决了限流的问题
使用线程池隔离的缺点:
- 增加了CPU的开销,因为不仅有tomcat的线程池,还需要有Hystrix的线程池
- 每个操作都是独立的线程,就有排队、调度和上下文切换等问题
不配置线程隔离:
@RequestMapping("/useThread")
public String useThread(){
return serivce.useThread1() + " " + serivce.useThread2();
}
@Override
public String useThread1() {
String threadName = Thread.currentThread().getName();
logger.error("使用的线程名称为:{}",threadName);
return "使用的线程名称为:" + threadName;
}
@Override
public String useThread2() {
String threadName = Thread.currentThread().getName();
logger.error("使用的线程名称为:{}",threadName);
return "使用的线程名称为:" + threadName;
}
如果不配置线程隔离,则使用的是同一个线程
如果我们给useThread1方法设置线程隔离:
@HystrixCommand(groupKey = "useThread1",//分组,设置服务名,一个group使用一个线程
commandKey = "useThread1",//命令名称,默认值为当前执行的方法名称
threadPoolKey = "useThread1",//是配置线程池名称,配置全局唯一标识接口线程池的名称,相同名称的线程池是同一个。默认值是分组名groupKey
threadPoolProperties = {
@HystrixProperty(name = "coreSize", value = "30"),//线程池大小
@HystrixProperty(name = "maxQueueSize", value = "100"),//最大队列长度
@HystrixProperty(name = "keepAliveTimeMinutes", value = "2"),//线程存活时间
@HystrixProperty(name = "queueSizeRejectionThreshold", value = "15")//拒绝请求
})
使用了线程池隔离之后,可以看到两个请求使用的不通的线程池。
6.信号量隔离
信号量隔离是指在规定时间内只允许指定数量的信号量进行服务访问,其他得不到信号量的线程进入fallback,访问结束后,归还信号量。说白了就是做了一个限流。
@RequestMapping("/semaphore")
public String semaphore(){
for (int i = 0; i < 15; i++) {
new Thread(new Runnable() {
@Override
public void run() {
serivce.semaphore();
}
}).start();
}
return "OK";
}
@HystrixCommand(fallbackMethod = "semaphoreFallback",commandProperties = {
@HystrixProperty(name="execution.isolation.strategy",value="SEMAPHORE"), //使用信号量隔离,默认为THREAD
@HystrixProperty(name="execution.isolation.semaphore.maxConcurrentRequests",value="10"), // 信号量最大并发度
})
@Override
public void semaphore() {
try {
Thread.sleep(900);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.error("正常执行方法");
}
public void semaphoreFallback(){
logger.error("执行了fallback方法");
}
如下图所示,有10个线程拿到了信号量,执行了正常的方法,有5个线程没有拿到信号量,直接调用fallback方法。
原理分析
上一篇文章说过openFeign主要是通过jdk的动态代理构建对象,所以Hystrix集成到feign当中也是使用的jdk动态代理的invocationHandler上,那么来看看Hystrix实现的jdk的动态代理类--HystrixInvocationHandler吧!
invoke方法:
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
// 如果调用的方法来自 java.lang.Object 则提前退出代码与 ReflectiveFeign.FeignInvocationHandler 相同
// ...
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
// 获取并调用MethodHandler,MethodHandler封装了Http请求,ribbon也在这里被集成
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
// fallback的降级方法
@Override
protected Object getFallback() {
if (fallbackFactory == null) {
return super.getFallback();
}
try {
Object fallback = fallbackFactory.create(getExecutionException());
Object result = fallbackMethodMap.get(method).invoke(fallback, args);
if (isReturnsHystrixCommand(method)) {
return ((HystrixCommand) result).execute();
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return ((Observable) result).toBlocking().first();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return ((Single) result).toObservable().toBlocking().first();
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else if (isReturnsCompletableFuture(method)) {
return ((Future) result).get();
} else {
return result;
}
} catch (IllegalAccessException e) {
// shouldn't happen as method is public due to being an interface
throw new AssertionError(e);
} catch (InvocationTargetException | ExecutionException e) {
// Exceptions on fallback are tossed by Hystrix
throw new AssertionError(e.getCause());
} catch (InterruptedException e) {
// Exceptions on fallback are tossed by Hystrix
Thread.currentThread().interrupt();
throw new AssertionError(e.getCause());
}
}
};
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
} else if (isReturnsCompletableFuture(method)) {
return new ObservableCompletableFuture<>(hystrixCommand);
}
return hystrixCommand.execute();
}
首先创建一个HystrixCommand,用来表示对依赖服务的操作请求,同时传递所有需要的参数,从命名中可以知道才用了“命令模式”来实现对服务调用操作的封装。
命令模式:是指将来自客户端的请求封装成一个对象,从而让调用者使用不同的请求对服务提供者进行参数化。
上面的两种命令模式一共有4种命令的执行方式,Hystrix在执行的时候会根据创建的Command对象以及具体的情况来选择一个执行。
- execute() 方法 :同步执行,从依赖的服务返回一个单一的结果对象,或是在发生错误时抛出异常
- queue() 方法 :异步执行,直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象
- observe()方法:返回Observable对象,它代表了操作的多个结果,是一个Hot observable
- toObservable()方法:同样返回一个Observable对象,也表示了操作的多个结果,但它返回的是一个Cold Observable
接下来首先来看看HystrixCommand#execute()方法:
public R execute() {
try {
// queue()返回一个Future,get()同步等待执行结束,然后获取异步的结果。
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}
跟进queue()方法:
public Future<R> queue() {
// 通过toObservable()获得一个Cold Observable,
// 并且通过toBlocking()将该Observable转换成BlockingObservable,可以把数据以阻塞的方式发射出来
// toFuture()则是把BlockingObservable转换成一个Future
final Future<R> delegate = toObservable().toBlocking().toFuture();
final Future<R> f = new Future<R>() {
// future实现,调用delegate的对应实现
}
return f;
}
在queue()中调用了核心方法--toObservable()方法,
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
// ...
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
@Override
public R call(R r) {
R afterFirstApplication = r;
try {
afterFirstApplication = executionHook.onComplete(_cmd, r);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
}
try {
return executionHook.onEmit(_cmd, afterFirstApplication);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
return afterFirstApplication;
}
}
};
final Action0 fireOnCompletedHook = new Action0() {
@Override
public void call() {
try {
executionHook.onSuccess(_cmd);
} catch (Throwable hookEx) {
logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
}
}
};
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
/* this is a stateful object so can only be used once */
if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
//TODO make a new error type for this
throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
}
commandStartTimestamp = System.currentTimeMillis();
if (properties.requestLogEnabled().get()) {
// log this command execution regardless of what happened
if (currentRequestLog != null) {
currentRequestLog.addExecutedCommand(_cmd);
}
}
final boolean requestCacheEnabled = isRequestCachingEnabled();
final String cacheKey = getCacheKey();
// 先从缓存中获取如果有的话直接返回
if (requestCacheEnabled) {
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
if (fromCache != null) {
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
}
}
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache;
// put in cache
if (requestCacheEnabled && cacheKey != null) {
// 里面订阅了,所以开始执行hystrixObservable
HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache != null) {
// another thread beat us so we'll use the cached value instead
toCache.unsubscribe();
isResponseFromCache = true;
return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
} else {
// we just created an ObservableCommand so we cast and return it
afterCache = toCache.toObservable();
}
} else {
afterCache = hystrixObservable;
}
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
这个方法非常长,首先看看applyHystrixSemantics()方法:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);
// 判断是否开启断路器
if (circuitBreaker.allowRequest()) {
// 断路器是关闭的,则检查识都有可用的资源来执行命令
// 获取信号量实例
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
// 释放信号量
executionSemaphore.release();
}
}
};
final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
// 尝试获取信号量
if (executionSemaphore.tryAcquire()) {
try {
// 执行业务
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// 信号量获取失败,走fallback
return handleSemaphoreRejectionViaFallback();
}
} else {
// 断路器是打开的,快速熔断,走fallback
return handleShortCircuitViaFallback();
}
}
applyHystrixSemantics()通过熔断器的allowRequest()方法判断是否需要快速失败走fallback,如果允许执行那么又会经过一层信号量的控制,都通过才会走execute。
所以,核心逻辑就落到了HystrixCircuitBreaker#allowRequest()方法上:
public boolean allowRequest() {
// 强制开启熔断
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
// 强制关闭熔断
if (properties.circuitBreakerForceClosed().get()) {
isOpen();
return true;
}
// 判断和计算当前断路器是否打开 或者 允许单个测试 ,通过这两个方法的配合,实现了断路器的打开和关闭状态的切换
return !isOpen() || allowSingleTest();
}
Hystrix允许强制开启或者关闭熔断,如果不想有请求执行就开启,如果觉得可以忽略所有错误就关闭。在没有强制开关的情况下,主要就是判断当前熔断是否开启。另外,在熔断器开启的情况下,会在一定时间后允许发出一个测试的请求,来判断是否开启熔断器。
首先来看看isOpen()方法:
public boolean isOpen() {
if (circuitOpen.get()) {
// 开关是开启的,直接返回
return true;
}
// 开关未开启,获取健康统计
HealthCounts health = metrics.getHealthCounts();
// 总请求数太小的情况,不开启熔断
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
return false;
}
// 总请求数够了,失败率比较小的情况,不开启熔断
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
// 总请求数和失败率都比较大的时候,设置开关为开启,进行熔断
if (circuitOpen.compareAndSet(false, true)) {
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
return true;
}
}
}
总体逻辑就是判断一个失败次数是否达到开启熔断的条件,如果达到那么设置开启的开关。在熔断一直开启的情况下,偶尔会放过一个测试请求来判断是否关闭。
下面看看allowSingleTest()方法:
public boolean allowSingleTest() {
// 获取熔断开启时间,或者上一次的测试时间
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 如果熔断处于开启状态,且当前时间距离熔断开启时间或者上一次执行测试请求时间已经到了
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// 使用cas机制控制熔断的开启
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
return true;
}
}
return false;
}
回到applyHystrixSemantics()这个方法中,获取到信号量之后,执行业务的方法,在executeCommandAndObserve()中进行了一些超时及失败的逻辑处理之后,进入HystrixCommand#executeCommandWithSpecifiedIsolation()中:
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
// ...
Observable<R> execution;
// 判断是否开启超时设置
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback)
.doOnEach(setRequestContext);
}
在executeCommandWithSpecifiedIsolation(),先判断是否进行线程隔离,及一些状态变化之后,进入getUserExecutionObservable():
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
// 线程隔离
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
// 状态校验
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
// 同级标记命令
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// 该命令在包装线程中超时,立即返回
return Observable.error(new RuntimeException("timed out before executing run()"));
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
//if it was never started and received terminal, then no need to clean up (I don't think this is possible)
}
//if it was unsubscribed, then other cleanup handled it
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
handleThreadEnd(_cmd);
}
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
//if it was never started and was cancelled, then no need to clean up
}
//if it was terminal, then other cleanup handled it
}
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
}
}));
} else {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
}
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
// semaphore isolated
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
try {
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
//If the above hooks throw, then use that as the result of the run method
return Observable.error(ex);
}
}
});
}
}
在getUserExecutionObservable()和getExecutionObservable()中,主要是封装用户定义的run方法:
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
Observable<R> userObservable;
try {
// 获取用户定义逻辑的Observable
userObservable = getExecutionObservable();
} catch (Throwable ex) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
userObservable = Observable.error(ex);
}
return userObservable
.lift(new ExecutionHookApplication(_cmd))
.lift(new DeprecatedOnRunHookApplication(_cmd));
}
HystrixCommand#getExecutionObservable():
@Override
final protected Observable<R> getExecutionObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
// 包装定义的run方法
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}
到这里,hystris分析就结束了。hystrix其实就是在feign的调用过程插了一脚,通过对请求的成功失败的统计数据来开关是否进行熔断。又在每个时间窗口内发送一个测试请求出去,来判断是否关闭熔断。总得来说还是很清晰实用的。
出处:https://www.cnblogs.com/pluto-charon/p/14942768.html