-
spring cloud stream 3.1.2 源码搭配rocketmq学习 (三)
前言#
这个 functionInitializer 其实是 channel 和 function bean的绑定
响应式的doOn#
同步钩子方法,在subscriber触发一系列事件的时候触发
先来熟悉一下doOn系列的方法. 这个方法在subscriber的时候如果没触发对应的钩子, 是不会执行的.
doOn资料传送门
热身#
public Function<Flux<Message<String>>, Mono<Void>> demo() { return flux -> flux.map(message -> { System.out.println("接收到了: " + message); return message; }).then(); } static class DemoRunner implements CommandLineRunner { Wrapper wrapper; public void run(String... args) throws Exception { InputChannel inputChannel = new InputChannel(); Flux<Message<String>> input = Flux.defer(() -> { Sinks.Many<Message<String>> sink = Sinks.many().unicast().onBackpressureError(); System.out.println("初始化了inputChannel"); MessageHandler messageHandler = message -> { System.out.println("处理信息"); sink.tryEmitNext((Message<String>) message); }; inputChannel.subscribe(messageHandler); return sink.asFlux().doOnCancel(() -> { // ... }); }); Mono<Void> result = wrapper.apply(input); // 上面这一段操作等同于 操作 flux 合并成了一个大的响应式 // Mono<Void> result = Flux.defer(() -> { // Sinks.Many<Message<String>> sink = Sinks.many().unicast().onBackpressureError(); // System.out.println("初始化了inputChannel"); // MessageHandler messageHandler = message -> { // System.out.println("处理信息"); // sink.tryEmitNext((Message<String>) message); // }; // inputChannel.subscribe(messageHandler); // return sink.asFlux().doOnCancel(() -> { // // ... // }); // }).map(message -> { // System.out.println("接收到了: " + message); // return message; // }).then() // .doOnSubscribe(message -> { // System.out.println("在Wrapper.apply我加入了"); // }); result.subscribe(); inputChannel.handle(MessageBuilder.withPayload("aaaa").build()); } } static class InputChannel { final List<MessageHandler> messageHandlers = new ArrayList<>(); public void subscribe(MessageHandler messageHandler) { messageHandlers.add(messageHandler); } public void handle(Message<String> message) { messageHandlers.get(0).handleMessage(message); } } static class Wrapper { Function<Flux<Message<String>>, Mono<Void>> demo; public Mono<Void> apply(Flux<Message<String>> input) { System.out.println("---------"); return demo.apply(input).doOnSubscribe(message -> { System.out.println("在Wrapper.apply我加入了"); }); } }
这一段简单的响应式, 是functionInitializer核心的部分.
先组装flux然后调用我们注册的Bean把初始化的东西传入并生成一个总的响应式, 类似于合体一样. 上面注释部分的result就是最终生成的响应式.
functionInitializer就是把注册的Function Bean的调用某些注册方法加入到channel中和增加一些响应式的钩子达到统一处理某些信息的注册.
下面我们一起来看看源码
functionInitializer#
初始化了一个这样的Bean--new FunctionConfiguration.FunctionToDestinationBinder
public void afterPropertiesSet() throws Exception { Map<String, BindableProxyFactory> beansOfType = this.applicationContext.getBeansOfType(BindableProxyFactory.class); }
首先把BindableProxyFactory.class的Bean都取出来了.
看到BindableProxyFactory是不是很熟悉, 点进去发现, 他是BindableFunctionProxyFactory的父类.
BindableFunctionProxyFactory是不是(二)中用definition注册的Bean.
接着我们看到下面的这个bindFunctionToDestinations函数
只有这个函数不是提供者的时候才能绑定函数到目的地
if (function != null && !function.isSupplier()) { this.bindFunctionToDestinations(bindableProxyFactory, functionDefinition); }
从下述代码发现inputs/outputs 就是(二)中注册的Input/Output
Set<String> inputBindingNames = bindableProxyFactory.getInputs(); Set<String> outputBindingNames = bindableProxyFactory.getOutputs(); public Set<String> getInputs() { return this.inputHolders.keySet(); }
我们看到其中有一段关键的代码
SubscribableChannel, 是不是在(二)中注册的DirectWithAttributesChannel的Bean. 把对应inputBindingName的取了出来并做了对应的封装. 组合成一个Publisher
SubscribableChannel inputChannel = (SubscribableChannel)this.applicationContext.getBean(inputBindingName, SubscribableChannel.class); IntegrationReactiveUtils.messageChannelToFlux(inputChannel);
进入messageChannelToFlux方法我们发现会调用adaptSubscribableChannelToPublisher
private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) { return Flux.defer(() -> { Many<Message<T>> sink = Sinks.many().unicast().onBackpressureError(); MessageHandler messageHandler = (message) -> { while(true) { switch(sink.tryEmitNext(message)) { case FAIL_NON_SERIALIZED: case FAIL_OVERFLOW: LockSupport.parkNanos(1000L); break; case FAIL_ZERO_SUBSCRIBER: throw new IllegalStateException("The [" + sink + "] doesn't have subscribers to accept messages"); case FAIL_TERMINATED: case FAIL_CANCELLED: throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink for message channel: " + inputChannel); default: return; } } }; inputChannel.subscribe(messageHandler); return sink.asFlux().doOnCancel(() -> { inputChannel.unsubscribe(messageHandler); }); }); }
会发现有一行
inputChannel.subscribe(messageHandler);
把处理message的处理器注册进了inputChannel中
因为这个inputChannel就是DirectWithAttributesChannel, 所以我们直接关注到DirectWithAttributesChannel的subscibe方法.
MessageDispatcher dispatcher = this.getRequiredDispatcher(); boolean added = dispatcher.addHandler(handler);
把这个handler加进了dispatcher中, 那这个dispatcher是一个什么呢?
我们查阅继承关系发现DirectChannel这个类初始化的时候初始化了一个dispathcher
public DirectChannel( LoadBalancingStrategy loadBalancingStrategy){ this.dispatcher = new UnicastingDispatcher(); ... }
这样 messageHandler 就注册进了DirectWithAttributesChannel的dispatcher中.
我们回到bindFunctionToDestinations中, 然后我们关注到这一行代码
Object resultPublishers = ((Function)functionToInvoke).apply(inputPublishers.length == 1 ? inputPublishers[0] : Tuples.fromArray(inputPublishers));
functionToInvoke 就是FunctionWrapper, 所以我们看看FunctionInvocationWrapper的apply方法 点进去看看
public Object apply(Object input) { // ... Object result = this.doApply(input); // ... return result; }
看到doApply中, 因为我们注册的Bean是Function类型的, 所以我们直接看到 invokeFunction 发现有关键的一行 invokeFunctionAndEnrichResultIfNecessary
result = this.invokeFunctionAndEnrichResultIfNecessary(convertedInput); private Object invokeFunctionAndEnrichResultIfNecessary(Object value) { //... // target就是注册的Function Bean的函数. // 在此处我们对他进行调用并把输入传入. // intputValue是对inputChannel内的信息进行了处理并封装成了Message // 想知道怎么处理的朋友可以看看源码, 就在这个函数里 Object result = ((Function)this.target).apply(inputValue); //... }
那这个target是什么呢, 这个是时候我们可以打个断点看看, 发现他就是我们注册的Function.
然后他调用了apply, 证明调用了这个方法, 并且传入了inputValue
然后我们发现functionToInvoke.apply这个函数将上述封装的inputChannel响应式进行传入, 并调用对应的function Bean, 得到完整的响应式函数. 合并了两段响应式函数.
这里的resultPublishers实际上就是我们配置的Function调用后的返回的值.
接着对resultPublishers进行判断, 是否有输出需要处理, 有的话做个doOnNext的钩子, 并封装对应的发送和错误处理逻辑. 没有则进行subscribe, 让之前的inputChannel的调用进行消费注册.
((Iterable)resultPublishers).forEach((publisher) -> { Flux flux = Flux.from((Publisher)publisher); if (!CollectionUtils.isEmpty(outputBindingNames)) { // ...发送逻辑 } // 如果不是消费者 则消费. // 这会subscribe上面配置的Flux, 进行对应的初始化. // 但是doOn的方法是钩子, 这边只是简单的subscribe所以不会被触发 if (!function.isConsumer()) { flux.subscribe(); } });
至此, 我们才完整的注册了一个Function Bean.
总结#
- 找到(二)中注册的Bean
- 找到(二)中注册的对应的Input/Output的Bean
- 将channel和这个Function bean绑定到一起, 并加入统一的处理方法