-
Pulsar整合SpringCloud 让Pulsar的配置可以热更新的方法
完整代码git地址 https://gitee.com/zhaoyuxuan66/pulsar-springcloud_boot-demo/tree/master/
代码,包括Pulsar的参数类, Pulsar Client, Producer和Consumer
================Pulsar参数类===================== @Data @RefreshScope @Component @ConfigurationProperties(prefix = "tdmq.pulsar") public class PulsarProperties { /** * 接入地址 */ private String serviceurl; /** * 命名空间tdc */ private String tdcNamespace; /** * 角色tdc的token */ private String tdcToken; /** * 集群name */ private String cluster;
/** * topicMap */ private Map<String, String> topicMap; /** * 订阅 */ private Map<String, String> subMap; /** * 开关 on:Consumer可用 ||||| off:Consumer断路 */ private String onOff;
} ==================PulsarClient=======================
@Slf4j @Configuration @EnableConfigurationProperties(PulsarProperties.class) public class PulsarConfig { @Autowired PulsarProperties pulsarProperties; @RefreshScope @Bean public PulsarClient getPulsarClient() { try { return PulsarClient.builder() .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken())) .serviceUrl(pulsarProperties.getServiceurl()) .build(); } catch (PulsarClientException e) { log.error("初始化Pulsar Client失败", e); } throw new RuntimeException("初始化Pulsar Client失败"); } } ===========Producer&Consumer&发送消息的工具类=================
@Slf4j @Component public class PulsarUtils { @Autowired PulsarProperties pulsarProperties; @Autowired PulsarClient client; @Autowired AuditCommentResultListener<String> auditCommentResultListener; @Autowired AuditReplyResultListener<String> auditReplyResultListener; @Autowired AuditResourceResultListener<String> auditResourceResultListener; /** * 创建一个生产者 * * @param topic topic name * @return Producer生产者 */ public Producer<byte[]> createProducer(String topic) { try { return client.newProducer() .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic) .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) .sendTimeout(10, TimeUnit.SECONDS) .blockIfQueueFull(true) .create(); } catch (PulsarClientException e) { log.error("初始化Pulsar Producer失败", e); } throw new RuntimeException("初始化Pulsar Producer失败"); } /** * 创建一个消费者 * * @param topic topic name * @param subscription sub name * @param messageListener MessageListener的自定义实现类 * @return Consumer消费者 */ public Consumer createConsumer(String topic, String subscription, MessageListener messageListener) { try { return client.newConsumer() .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic) .subscriptionName(subscription) .ackTimeout(10, TimeUnit.SECONDS) .subscriptionType(SubscriptionType.Shared) .messageListener(messageListener) .subscribe() ; } catch (PulsarClientException e) { log.error("初始化Pulsar Consumer失败", e); } throw new RuntimeException("初始化Pulsar Consumer失败"); } /** * 异步send一条msg * * @param message 消息体 */ public void sendMessage(String message, Producer<byte[]> producer) { producer.sendAsync(message.getBytes()).thenAccept(msgId -> { log.info("消息发送成功, MessageID为{}", msgId); }); } /** * 同步发送一条msg * * @param message 消息体 * @param producer 生产者实例 */ public void sendOnce(String message, Producer<byte[]> producer) throws PulsarClientException { MessageId send = producer.send(message.getBytes()); log.info("消息成功发送, MessageId {},message {}", send, message); } //-----------consumer----------- @RefreshScope @Bean(name = "audit-resource-result-topic") public Consumer getAuditResourceResultTopicConsumer() { return this.createConsumer(pulsarProperties.getTopicMap().get("audit-resource-result-topic"), pulsarProperties.getSubMap().get("resource-sub-audit-resource-result"), auditResourceResultListener); } //-----------producer----------- @RefreshScope @Bean(name = "resource-publish-topic") public Producer<byte[]> getResourcePublishTopicProducer() { return this.createProducer(pulsarProperties.getTopicMap().get("resource-publish-topic")); } } =====================AbstractListener===============================
@Slf4j @Component public abstract class AbstractListener<String> implements MessageListener<String> { @Autowired PulsarProperties pulsarProperties; @Override public void received(Consumer<String> consumer, Message<String> message) { } /** * 判断开关 * * @return is equals off */ public boolean judgeIsOff() { return pulsarProperties.getOnOff().equals("off"); } }
=================Listener自定义实现类====================
@Slf4j @Component public class AuditCommentResultListener<String> extends AbstractListener<String> { @Autowired CommentService commentService; @Override public void received(Consumer consumer, Message msg) { try { java.lang.String data = new java.lang.String(msg.getData()); log.info("接受到消息, MessageId {} data {}", msg.getMessageId(), data); // 添加开关 if (super.judgeIsOff()) { consumer.negativeAcknowledge(msg); log.error("当前开关为off 拒绝消费消息, MessageId {} data {}", msg.getMessageId(), data); } // 处理业务逻辑 consumer.acknowledge(msg); } catch (Exception e) { consumer.negativeAcknowledge(msg); log.error("拒绝消费消息, MessageId {} data {}", msg.getMessageId(), new java.lang.String(msg.getData()), e); } } }
========================================================================= 后来发现 如上代码会导致BUG-> 在更新Nacos配置之后 Consumer会挂掉 经排查发现结果 是由于@RefreshScope注解导致, 此注解将摧毁Bean, PulsarConsumer和Producer都将被摧毁,只是说Producer将在下一次调用中完成重启,Consumer则不能重启,因为没有调用. 那么怎么解决呢? 我通过日志打印的信息
发现这行日志 打印在Nacos 更新完配置之后 跟进这个类
==============NacosContextRefresher 81行====================
private void registerNacosListener(final String groupKey, final String dataKey) { String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey); Listener listener = (Listener)this.listenerMap.computeIfAbsent(key, (lst) -> { return new AbstractSharedListener() { public void innerReceive(String dataId, String group, String configInfo) { NacosContextRefresher.refreshCountIncrement(); NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo); NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config")); if (NacosContextRefresher.log.isDebugEnabled()) { NacosContextRefresher.log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo)); } } }; });
============================
关键就在这里 我发现Nacos更新在更新了历史记录表之后 走了这个方法 publishEvent(),我曾经尝试去监听RefreshEvent 但是这个事件 仍然执行在@RefreshScope注解刷新容器事件之后, 我需要以一个延时任务的形式, 在监听到RefreshEvent之后, 延时两秒执行唤醒Consumer的操作 这样的做法不太优雅, 那么继续寻找解决方案 跟入这个publishEvent方法 ============================
===========不用多说 相信看过源码的朋友都知道 该跟进哪一个================
来到了如图的这个地方
这就是Spring发布事件的方法, 打断点 找更新Nacos配置后,将发布什么事件
根据日志的信息
找到这个类
跟进refresh方法
发现 这就是SpringCloud的@RefreshScope刷新容器的方法!!!
于是打了个断点 以寻找RefereshScope发布了事件做了什么
就是发布一系列事件 以刷新容器
=============圆满解决===================
@Slf4j @Component public class RefreshPulsarListener implements ApplicationListener { @Autowired ApplicationContext applicationContext; @Override public void onApplicationEvent(ApplicationEvent event) { if (event.getSource().equals("__refreshAll__")) { log.info("Nacos配置中心配置修改 重启Pulsar===================================="); log.info("重启PulsarClient,{}", applicationContext.getBean("getPulsarClient")); log.info("重启PulsarConsumer,{}", applicationContext.getBean("audit-resource-result-topic")); log.info("重启PulsarConsumer,{}", applicationContext.getBean("audit-comment-result-topic")); log.info("重启PulsarConsumer,{}", applicationContext.getBean("audit-reply-result-topic")); } } }
来源:https://www.cnblogs.com/zhaoyuxuan66/p/15578718.html
最新更新
python爬虫及其可视化
使用python爬取豆瓣电影短评评论内容
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
uniapp/H5 获取手机桌面壁纸 (静态壁纸)
[前端] DNS解析与优化
为什么在js中需要添加addEventListener()?
JS模块化系统
js通过Object.defineProperty() 定义和控制对象
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比