-
Kafka 入门实战(3)--SpringBoot 整合 Kafka
spring-kafka 使得在 Spring 环境中使用 Kafka 变的很简单,只需少量的配置和少量的代码就可以发送和接受消息了。本文主要介绍在 SpringBoot 中用 spring-kafka 操作 Kafka,文中使用到的软件版本:Kafka 2.8.0、SpringBoot 2.4.6、Java 1.8.0_191。
1、参数说明
spring-kafka 中参数是以 spring.kafka 开头的,后面的参数名称和 Kafka 的原始参数很类似,只不过 spring-kafka 会把一些参数中的 "." 改为 "-",如 auto.offset.reset 改为 spring.kafka.consumer.auto-offset-reset。
前缀 | 描述 |
spring.kafka | Spring 中 Kafka 相关配置总的前缀 |
spring.kafka.consumer | 消费者相关参数 |
spring.kafka.producer | 生产者相关参数 |
spring.kafka.admin | Kafka 管理相关参数 |
kafka 的原始参数说明可参考:Kafka入门实战(1)-概念、安装及简单使用;或参考官方文档。
2、SpringBoot 整合 Kafka
2.1、引入依赖
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <!--流处理需要用到--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> </dependency>
2.2、增加 Kafka 配置
spring: kafka: bootstrap-servers: 10.40.100.69:9092 producer: acks: all transaction-id-prefix: tx. #开启事务,发送消息的方法需增加@Transactional注解 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: groupA auto-offset-reset: streams: application-id: streams-test properties: "[default.key.serde]": org.apache.kafka.common.serialization.Serdes$StringSerde "[default.value.serde]": org.apache.kafka.common.serialization.Serdes$StringSerde
2.3、发送消息
package com.abc.demo.kafka; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @Component public class Producer { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Transactional @Scheduled(cron = "0/10 * * * * ?") public void sendMessage() { for (int i = 0; i < 10; i++) { kafkaTemplate.send("test", "消息" + i); } } // @Scheduled(cron = "0/10 * * * * ?") // public void sendMessage2() { // kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback(){ // @Override // public Object doInOperations(KafkaOperations kafkaOperations) { // for (int i = 0; i < 10; i++) { // kafkaTemplate.send("test", "消息" + i); // } // return null; // } // }); // } }
2.4、接受消息
package com.abc.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class Consumer { private static Logger logger = LoggerFactory.getLogger(Consumer.class); @KafkaListener(topics = "test") public void recevieMessage(ConsumerRecord<String, String> record) { logger.info("offset={}, key={}, value={}", record.offset(), record.key(), record.value()); } }
2.5、流处理
package com.abc.demo.kafka; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Produced; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.support.serializer.JsonSerde; import java.util.HashMap; import java.util.Map; @Configuration(proxyBeanMethods = false) @EnableKafkaStreams public class StreamConfig { @Bean public KStream<String, String> kStream(StreamsBuilder streamsBuilder) { KStream<String, String> stream = streamsBuilder.stream("stream-in"); //从 stream-in 队列中读取数据,处理后发送给 stream-out 队列 //发送数据 key,value 分别使用的序列化类为Serdes.String(),JsonSerde stream.map(this::uppercaseValue).to("stream-out", Produced.with(Serdes.String(), new JsonSerde<>())); return stream; } /** * 消息转换,新的消息:key-原来的value值,value-一个map */ private KeyValue<String, Map> uppercaseValue(String key, String value) { Map<String, String> map = new HashMap<>(); map.put("message", value.toUpperCase()); map.put("timestamp", System.currentTimeMillis() + ""); return new KeyValue(value, map); } }
程序从 stream-in 中读取消息,对消息加工后再发送给 stream-out;打开两个终端,一个往 stream-in 发送消息,一个接受 stream-out 的消息。
./kafka-console-producer.sh --broker-list 10.40.100.69:9092 --topic stream-in #发送消息 ./kafka-console-consumer.sh --bootstrap-server 10.40.100.69:9092 --topic stream-out --property print.key=true #接受消息
stream-in 的输入:
stream-out 的输出:
出处:https://www.cnblogs.com/wuyongyin/p/14900587.html
最新更新
python爬虫及其可视化
使用python爬取豆瓣电影短评评论内容
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
uniapp/H5 获取手机桌面壁纸 (静态壁纸)
[前端] DNS解析与优化
为什么在js中需要添加addEventListener()?
JS模块化系统
js通过Object.defineProperty() 定义和控制对象
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比