VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Java教程 >
  • Pulsar整合SpringCloud 让Pulsar的配置可以热更新的方法

完整代码git地址 https://gitee.com/zhaoyuxuan66/pulsar-springcloud_boot-demo/tree/master/

代码,包括Pulsar的参数类, Pulsar Client, Producer和Consumer

================Pulsar参数类=====================
@Data
@RefreshScope
@Component
@ConfigurationProperties(prefix = "tdmq.pulsar")
public class PulsarProperties {

    /**
     * 接入地址
     */
    private String serviceurl;

    /**
     * 命名空间tdc
     */
    private String tdcNamespace;

    /**
     * 角色tdc的token
     */
    private String tdcToken;

    /**
     * 集群name
     */
    private String cluster;

/**
 * topicMap
 */
private Map<String, String> topicMap;

/**
 * 订阅
 */
private Map<String, String> subMap;

/**
 * 开关 on:Consumer可用 ||||| off:Consumer断路
 */
private String onOff;
}
==================PulsarClient=======================
@Slf4j
@Configuration
@EnableConfigurationProperties(PulsarProperties.class)
public class PulsarConfig {

    @Autowired
    PulsarProperties pulsarProperties;

    @RefreshScope
    @Bean
    public PulsarClient getPulsarClient() {

        try {
            return PulsarClient.builder()
                    .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))
                    .serviceUrl(pulsarProperties.getServiceurl())
                    .build();
        } catch (PulsarClientException e) {
            log.error("初始化Pulsar Client失败", e);
        }

        throw new RuntimeException("初始化Pulsar Client失败");
    }

}
===========Producer&Consumer&发送消息的工具类=================
@Slf4j
@Component
public class PulsarUtils {

    @Autowired
    PulsarProperties pulsarProperties;

    @Autowired
    PulsarClient client;

    @Autowired
    AuditCommentResultListener<String> auditCommentResultListener;

    @Autowired
    AuditReplyResultListener<String> auditReplyResultListener;

    @Autowired
    AuditResourceResultListener<String> auditResourceResultListener;

    /**
     * 创建一个生产者
     *
     * @param topic topic name
     * @return Producer生产者
     */
    public Producer<byte[]> createProducer(String topic) {

        try {
            return client.newProducer()
                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
                    .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                    .sendTimeout(10, TimeUnit.SECONDS)
                    .blockIfQueueFull(true)
                    .create();
        } catch (PulsarClientException e) {
            log.error("初始化Pulsar Producer失败", e);
        }

        throw new RuntimeException("初始化Pulsar Producer失败");
    }

    /**
     * 创建一个消费者
     *
     * @param topic           topic name
     * @param subscription    sub name
     * @param messageListener MessageListener的自定义实现类
     * @return Consumer消费者
     */
    public Consumer createConsumer(String topic, String subscription,
                                   MessageListener messageListener) {
        try {
            return client.newConsumer()
                    .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
                    .subscriptionName(subscription)
                    .ackTimeout(10, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Shared)
                    .messageListener(messageListener)
                    .subscribe()
                    ;
        } catch (PulsarClientException e) {
            log.error("初始化Pulsar Consumer失败", e);
        }

        throw new RuntimeException("初始化Pulsar Consumer失败");
    }

    /**
     * 异步send一条msg
     *
     * @param message 消息体
     */
    public void sendMessage(String message, Producer<byte[]> producer) {
        producer.sendAsync(message.getBytes()).thenAccept(msgId -> {
            log.info("消息发送成功, MessageID为{}", msgId);
        });
    }

    /**
     * 同步发送一条msg
     *
     * @param message  消息体
     * @param producer 生产者实例
     */
    public void sendOnce(String message, Producer<byte[]> producer) throws PulsarClientException {
        MessageId send = producer.send(message.getBytes());
        log.info("消息成功发送, MessageId {},message {}", send, message);
    }

    //-----------consumer-----------
    @RefreshScope
    @Bean(name = "audit-resource-result-topic")
    public Consumer getAuditResourceResultTopicConsumer() {
        return this.createConsumer(pulsarProperties.getTopicMap().get("audit-resource-result-topic"),
                pulsarProperties.getSubMap().get("resource-sub-audit-resource-result"),
                auditResourceResultListener);
    }

    //-----------producer-----------
    @RefreshScope
    @Bean(name = "resource-publish-topic")
    public Producer<byte[]> getResourcePublishTopicProducer() {
        return this.createProducer(pulsarProperties.getTopicMap().get("resource-publish-topic"));
    }
}
=====================AbstractListener===============================
@Slf4j
@Component
public abstract class AbstractListener<String> implements MessageListener<String> {

    @Autowired
    PulsarProperties pulsarProperties;

    @Override
    public void received(Consumer<String> consumer, Message<String> message) {

    }

    /**
     * 判断开关
     *
     * @return is equals off
     */
    public boolean judgeIsOff() {
        return pulsarProperties.getOnOff().equals("off");
    }
}
=================Listener自定义实现类====================
@Slf4j
@Component
public class AuditCommentResultListener<String> extends AbstractListener<String> {

    @Autowired
    CommentService commentService;

    @Override
    public void received(Consumer consumer, Message msg) {
        try {
            java.lang.String data = new java.lang.String(msg.getData());
            log.info("接受到消息, MessageId {} data {}", msg.getMessageId(), data);
            // 添加开关
            if (super.judgeIsOff()) {
                consumer.negativeAcknowledge(msg);
                log.error("当前开关为off 拒绝消费消息, MessageId {} data {}", msg.getMessageId(), data);
            }
            // 处理业务逻辑

            consumer.acknowledge(msg);
        } catch (Exception e) {
            consumer.negativeAcknowledge(msg);
            log.error("拒绝消费消息, MessageId {} data {}", msg.getMessageId(), new java.lang.String(msg.getData()), e);
        }
    }
}
=========================================================================
后来发现 如上代码会导致BUG-> 在更新Nacos配置之后 Consumer会挂掉
经排查发现结果 是由于@RefreshScope注解导致, 此注解将摧毁Bean, PulsarConsumer和Producer都将被摧毁,只是说Producer将在下一次调用中完成重启,Consumer则不能重启,因为没有调用. 那么怎么解决呢?
我通过日志打印的信息

 

 

 发现这行日志 打印在Nacos 更新完配置之后 跟进这个类

==============NacosContextRefresher 81行====================

private void registerNacosListener(final String groupKey, final String dataKey) {
    String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    Listener listener = (Listener)this.listenerMap.computeIfAbsent(key, (lst) -> {
        return new AbstractSharedListener() {
            public void innerReceive(String dataId, String group, String configInfo) {
                NacosContextRefresher.refreshCountIncrement();
                NacosContextRefresher.this.nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                NacosContextRefresher.this.applicationContext.publishEvent(new RefreshEvent(this, (Object)null, "Refresh Nacos config"));
                if (NacosContextRefresher.log.isDebugEnabled()) {
                    NacosContextRefresher.log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s", group, dataId, configInfo));
                }

            }
        };
    });

 

 

 ============================

关键就在这里 我发现Nacos更新在更新了历史记录表之后 走了这个方法 publishEvent(),我曾经尝试去监听RefreshEvent 但是这个事件 仍然执行在@RefreshScope注解刷新容器事件之后, 我需要以一个延时任务的形式, 在监听到RefreshEvent之后, 延时两秒执行唤醒Consumer的操作
这样的做法不太优雅, 那么继续寻找解决方案 跟入这个publishEvent方法
============================

 

 ===========不用多说 相信看过源码的朋友都知道 该跟进哪一个================

 

 来到了如图的这个地方 

这就是Spring发布事件的方法, 打断点 找更新Nacos配置后,将发布什么事件

根据日志的信息

找到这个类

 

跟进refresh方法

 

 发现 这就是SpringCloud的@RefreshScope刷新容器的方法!!!

 

 

 

 

 

 

于是打了个断点 以寻找RefereshScope发布了事件做了什么

 

 就是发布一系列事件 以刷新容器

=============圆满解决===================

@Slf4j
@Component
public class RefreshPulsarListener implements ApplicationListener {

    @Autowired
    ApplicationContext applicationContext;

    @Override
    public void onApplicationEvent(ApplicationEvent event) {

        if (event.getSource().equals("__refreshAll__")) {
            log.info("Nacos配置中心配置修改 重启Pulsar====================================");
            log.info("重启PulsarClient,{}", applicationContext.getBean("getPulsarClient"));
            log.info("重启PulsarConsumer,{}", applicationContext.getBean("audit-resource-result-topic"));
            log.info("重启PulsarConsumer,{}", applicationContext.getBean("audit-comment-result-topic"));
            log.info("重启PulsarConsumer,{}", applicationContext.getBean("audit-reply-result-topic"));
        }
    }

}

来源:https://www.cnblogs.com/zhaoyuxuan66/p/15578718.html


      



  

相关教程