-
线程池怎么做到主动回调
某些业务场景需要在一个异步操作完成后做一个回调,如果使用Future有个问题,需要主动调用get然后再执行逻辑。但是异步逻辑不知道什么时候结束,所以推荐回调的方式处理比较优雅,本文总结了三种方式可以实现此类需求。
1、CompletableFuture
Future
可以明确地完成(设定其值和状态),并且可以被用作CompletionStage
,支持有关的功能和它的完成时触发动作。
当两个或多个线程试图complete
, completeExceptionally
,或cancel
一个CompletableFuture,只有一个成功。
除了直接操作状态和结果的这些和相关方法外,CompletableFuture还实现了接口CompletionStage
,具有以下策略:
- 为异步方法的依赖完成提供的操作可以由完成当前CompletableFuture的线程或完成方法的任何其他调用者执行。
-
所有不使用显式Executor参数的异步方法都使用
ForkJoinPool.commonPool()
执行 (除非它不支持至少两个并行级别,在这种情况下,使用新的线程)。 为了简化监视,调试和跟踪,所有生成的异步任务都是标记接口CompletableFuture.AsynchronousCompletionTask
的实例 。 - 所有CompletionStage方法都是独立于其他公共方法实现的,因此一个方法的行为不会受到子类中其他方法的覆盖的影响。
CompletableFuture还实施Future
,具有以下政策:
-
由于(不同于
FutureTask
),这个类不能直接控制导致其完成的计算,所以取消被视为另一种形式的异常完成。 方法cancel
具有相同的效果completeExceptionally(new CancellationException())
。 方法isCompletedExceptionally()
可用于确定CompletableFuture是否以任何特殊方式完成。 -
如果使用CompletionException异常完成,方法
get()
和get(long, TimeUnit)
将抛出与对应的CompletionException中保持的相同原因的ExecutionException
。 为了简化大多数情况下的使用,此类还定义了方法join()
和getNow(T)
,而是在这些情况下直接抛出CompletionException。
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
2、Guava 的 ListenableFuture
ListenableFuture 是对原有 Future 的增强,可以用于监听 Future 任务的执行状况,是执行成功还是执行失败,并提供响应的接口用于对不同结果的处理。简单来说就是他提供了一个回调接口,在任务结束时可以主动去做通知调用,有点主动推的感觉。
public class ListenFutureTest {
public static void main(String[] args) {
testListenFuture();
}
public static void testListenFuture() {
System.out.println("主任务执行完,开始异步执行副任务1.....");
ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5));
ListenableFuture<String> future = pool.submit(new Task());
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
System.out.println("成功,结果是:" + result);
}
@Override
public void onFailure(Throwable t) {
System.out.println("出错,业务回滚或补偿");
}
});
System.out.println("副本任务启动,回归主任务线,主业务正常返回2.....");
}
}
class Task implements Callable<String> {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(1);
// int a =1/0;
return "task done";
}
}
3、RxJava2
推荐使用RxJava2,具有背压和默认线程调度功能。
ExecutorService executor = Executors.newFixedThreadPool(2);
System.out.println("MAIN: " + Thread.currentThread().getId());
Callable<String> callable = () -> {
System.out.println("callable [" + Thread.currentThread().getId() + "]: ");
Path filePath = Paths.get("build.gradle");
return Files.readAllLines(filePath).stream().flatMap(s -> Arrays.stream(s.split
(""))).count() + "";
};
Future<String> future = executor.submit(callable);
Consumer<String> onNext = v -> System.out
.println("consumer[" + Thread.currentThread().getId() + "]:" + v);
Flowable.fromCallable(callable).subscribe(onNext);
Flowable.fromFuture(future).subscribe(onNext);
System.out.println("END");
参考:
[RxJava2 响应式编程介绍] https://zouzhberk.github.io/rxjava-study/
出处:https://www.cnblogs.com/javago/p/14466698.html
栏目列表
最新更新
MyBatis从入门到入土——缓存的使用
线程池怎么做到主动回调
【福尔摩斯探案集】数据库和应用程序时
设计模式之简单工厂模式(Simple Factory Pa
springmvc核心流程及配置
如何发起并运营一个开源项目
Spring 中的重试机制,简单、实用!
java 数组详解
什么是SpringCloud?个人理解
Java异常处理(捕获异常)
MongoDB常用命令(2)
MongoDB基本介绍与安装(1)
SQLServer触发器调用JavaWeb接口
SQL Server索引的原理深入解析
SqlServer2016模糊匹配的三种方式及效率问题
SQL中Truncate的用法
sqlserver 多表关联时在where语句中慎用tri
链接服务器读取Mysql---出现消息 7347,级别
SQL Server解惑——为什么你拼接的SQL语句换
MySQL视图了解一下
748. 最短补全词 循环遍历每一个单词,比
1078. Bigram 分词
567. 字符串的排列
794. 有效的井字游戏 找出判断条件
1446. 连续字符 滑动窗口
1518. 换酒问题 不断更新当前瓶子的个数
118. 杨辉三角 循环遍历,一层一层计算
136. 只出现一次的数字 循环利用indexOf求解
1122. 数组的相对排序 利用Map集合和数组排
1005. K 次取反后最大化的数组和 找到负数