VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > 编程开发 > Java教程 >
  • Spring Boot 3 开发上线一个前后端分离的生产级系统(九) - Spring AMQP 集成与配置

Spring AMQP 介绍

AMQP(高级消息队列协议)是一个异步消息传递所使用的应用层协议规范,为面向消息的中间件设计,不受产品和开发语言的限制. Spring AMQP 将核心 Spring 概念应用于基于 AMQP 消息传递解决方案的开发。

RabbitMQ 是基于 AMQP 协议的轻量级、可靠、可扩展、可移植的消息中间件,Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括 spring-boot-starter-amqp “Starter”。

Spring AMQP 集成与配置

  1. 可通过如下 Docker 命令 安装 RabbiMQ:

 
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management
  1. 登录 RabbiMQ 的 web 管理界面,创建虚拟主机novel:

RabbitMQ

  1. 项目中加入如下的 maven 依赖:

 
<dependency>
 
<groupId>org.springframework.boot</groupId>
 
<artifactId>spring-boot-starter-amqp</artifactId>
 
</dependency>
  1. 在 application.yml 配置文件中加入 RabbitMQ 的连接配置:

 
spring:
 
rabbitmq:
 
addresses: "amqp://guest:guest@47.106.243.172"
 
virtual-host: novel
 
template:
 
retry:
 
# 开启重试
 
enabled: true
 
# 最大重试次数
 
max-attempts: 3
 
# 第一次和第二次重试之间的持续时间
 
initial-interval: "3s"
  1. 此时已经可以在 Spring Beans 中注入 AmqpTemplate 发送消息了。

Spring AMQP 使用示例

  1. io.github.xxyopen.novel.core.constant包下创建 AMQP 相关常量类:

 
/**
 
* AMQP 相关常量
 
*
 
* @author xiongxiaoyang
 
* @date 2022/5/25
 
*/
 
public class AmqpConsts {
 
 
 
/**
 
* 小说信息改变 MQ
 
* */
 
public static class BookChangeMq{
 
 
 
/**
 
* 小说信息改变交换机
 
* */
 
public static final String EXCHANGE_NAME = "EXCHANGE-BOOK-CHANGE";
 
 
 
/**
 
* Elasticsearch book 索引更新的队列
 
* */
 
public static final String QUEUE_ES_UPDATE = "QUEUE-ES-BOOK-UPDATE";
 
 
 
/**
 
* Redis book 缓存更新的队列
 
* */
 
public static final String QUEUE_REDIS_UPDATE = "QUEUE-REDIS-BOOK-UPDATE";
 
 
 
// ... 其它的更新队列
 
 
 
}
 
 
 
}
  1. io.github.xxyopen.novel.core.config包下创建 AMQP 配置类,配置各个交换机、队列以及绑定关系:

 
/**
 
* AMQP 配置类
 
*
 
* @author xiongxiaoyang
 
* @date 2022/5/25
 
*/
 
@Configuration
 
public class AmqpConfig {
 
 
 
/**
 
* 小说信息改变交换机
 
*/
 
@Bean
 
public FanoutExchange bookChangeExchange() {
 
return new FanoutExchange(AmqpConsts.BookChangeMq.EXCHANGE_NAME);
 
}
 
 
 
/**
 
* Elasticsearch book 索引更新队列
 
*/
 
@Bean
 
public Queue esBookUpdateQueue() {
 
return new Queue(AmqpConsts.BookChangeMq.QUEUE_ES_UPDATE);
 
}
 
 
 
/**
 
* Elasticsearch book 索引更新队列绑定到小说信息改变交换机
 
*/
 
@Bean
 
public Binding esBookUpdateQueueBinding() {
 
return BindingBuilder.bind(esBookUpdateQueue()).to(bookChangeExchange());
 
}
 
 
 
// ... 其它的更新队列以及绑定关系
 
 
 
}
  1. io.github.xxyopen.novel.manager.mq包下创建 AMQP 消息管理类,用来发送各种 AMQP 消息:

 
/**
 
* AMQP 消息管理类
 
*
 
* @author xiongxiaoyang
 
* @date 2022/5/25
 
*/
 
@Component
 
@RequiredArgsConstructor
 
public class AmqpMsgManager {
 
 
 
private final AmqpTemplate amqpTemplate;
 
 
 
@Value("${spring.amqp.enable}")
 
private String enableAmqp;
 
 
 
/**
 
* 发送小说信息改变消息
 
*/
 
public void sendBookChangeMsg(Long bookId) {
 
if (Objects.equals(enableAmqp, CommonConsts.TRUE)) {
 
sendAmqpMessage(amqpTemplate, AmqpConsts.BookChangeMq.EXCHANGE_NAME, null, bookId);
 
}
 
}
 
 
 
private void sendAmqpMessage(AmqpTemplate amqpTemplate, String exchange, String routingKey, Object message) {
 
// 如果在事务中则在事务执行完成后再发送,否则可以直接发送
 
if (TransactionSynchronizationManager.isActualTransactionActive()) {
 
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
 
@Override
 
public void afterCommit() {
 
amqpTemplate.convertAndSend(exchange, routingKey, message);
 
}
 
});
 
return;
 
}
 
amqpTemplate.convertAndSend(exchange, routingKey, message);
 
}
 
 
 
}
  1. 在小说信息更新后,发送 AMQP 消息:

 
@Transactional(rollbackFor = Exception.class)
 
@Override
 
public RestResp<Void> saveBookChapter(ChapterAddReqDto dto) {
 
// 1) 保存章节相关信息到小说章节表
 
// a) 查询最新章节号
 
 
 
// b) 设置章节相关信息并保存
 
 
 
// 2) 保存章节内容到小说内容表
 
 
 
// 3) 更新小说表最新章节信息和小说总字数信息
 
// a) 更新小说表关于最新章节的信息
 
 
 
// b) 发送小说信息更新的 MQ 消息
 
amqpMsgManager.sendBookChangeMsg(dto.getBookId());
 
return RestResp.ok();
 
}
  1. io.github.xxyopen.novel.core.listener包下创建 Rabbit 队列监听器,监听各个 RabbitMQ 队列的消息并处理:

 
/**
 
* Rabbit 队列监听器
 
*
 
* @author xiongxiaoyang
 
* @date 2022/5/25
 
*/
 
@Component
 
@RequiredArgsConstructor
 
@Slf4j
 
public class RabbitQueueListener {
 
 
 
private final BookInfoMapper bookInfoMapper;
 
 
 
private final ElasticsearchClient esClient;
 
 
 
/**
 
* 监听小说信息改变的 ES 更新队列,更新最新小说信息到 ES
 
* */
 
@RabbitListener(queues = AmqpConsts.BookChangeMq.QUEUE_ES_UPDATE)
 
@SneakyThrows
 
public void updateEsBook(Long bookId) {
 
BookInfo bookInfo = bookInfoMapper.selectById(bookId);
 
IndexResponse response = esClient.index(i -> i
 
.index(EsConsts.BookIndex.INDEX_NAME)
 
.id(bookInfo.getId().toString())
 
.document(EsBookDto.build(bookInfo))
 
);
 
log.info("Indexed with version " + response.version());
 
}
 
 
 
// ... 监听其它队列,刷新其它副本数据
 
 
 
}

此时,如果需要更新其它小说副本数据,只需要配置更新队列和增加监听器,不需要在小说信息变更的地方增加任何业务代码,而且任意小说副本的数据刷新之间互不影响,真正实现了模块间的解耦。

原文链接:https://www.cnblogs.com/xxyopen/p/16350743.html


相关教程