java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring boot  kafka Stream app 开发

Spring boot 项目中如何进行kafka Stream app 开发

作者:小落的编程笔记

文章介绍了Kafka Streams的配置要点与核心方法,强调应用ID唯一性、正确设置Bootstrap服务器,区分KStream与Java Stream的不可变性和多消费特性,本文给大家介绍Springboot项目中如何进行kafka Stream app开发,感兴趣的朋友一起看看吧

Kafka Stream

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点

下面介绍Spring boot 项目中进行kafka Stream app 开发的详细过程。

1. 导入依赖

      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.6.2</version>
      </dependency>

2. 示例代码(伪代码)

这段伪代码只是为了举例设置的场景,业务场景并不一定合适

@Slf4j
@Component
public class MyKafkaStreamProcessor {
	@Value("${spring.kafka.bootstrap-servers}")
	private String bootstrapServers;
	@PostConstruct
	private void init () {
		String appId = "my-kafka-streams-app";
		myKafkaStreams(appId);
		log.info("✅ Kafka Streams:{} 初始化完成,开始监听 topic: {}", appId, "source-topic");
	}
	public void myKafkaStreams(String appId) {
		/*=======配置=======*/
		Properties config  = new Properties();
		config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
		config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		StreamsBuilder builder = new StreamsBuilder();
		/*=======构建拓扑结构=======*/
		// 数据清洗
		KStream<String, Order> stream = builder
			.stream("source-topic", Consumed.with(Serdes.String(), new JsonSerde<>(Order.class)))
			.mapValues(value -> {
				// do something
				return value;
			});
		// 过滤出从app创建的订单 并进行处理
		stream.filter((k, v) -> Order.getSource.equals("app"))
			.foreach((k, v) -> {
				// do something
			});
		// 发送到第一个topic
		stream.mapValues(value -> JSON.toJSONString(value), Named.as("to-the-first-target-topic-processor"))
			.to("the-first-target-topic", Produced.with(Serdes.String(), Serdes.String()));
		// 发送到第二个topic
		stream.filter((k, v) -> {
				// filter something
			})
			.mapValues(value -> {
				// map to another object
			}, Named.as("to-the-second-target-topic-processor"))
			.to("the-second-target-topic", Produced.with(Serdes.String(), Serdes.String()));
		/*=======创建KafkaStreams=======*/
		KafkaStreams streams = new KafkaStreams(builder.build(), config);
		/*=======设置异常处理器=======*/
		streams.setUncaughtExceptionHandler(new CustomStreamsUncaughtExceptionHandler());
		/*=======启动streams=======*/
		streams.start();
		/*=======添加jvm hook 确保streams安全退出=======*/
		Runtime.getRuntime().addShutdownHook(new Thread(() -> {
			log.info("关闭 Kafka Streams:{}...", appId);
			streams.close();
			log.info("Kafka Streams:{}已经关闭!", appId);
		}));
	}
}
@Slf4j
public class CustomStreamsUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
	/**
	 * Inspect the exception received in a stream thread and respond with an action.
	 *
	 */
	@Override
	public StreamThreadExceptionResponse handle(Throwable throwable) {
		log.error("Kafka Streams 线程发生未捕获异常: {}", ExceptionUtil.stacktraceToString(throwable));
		// 选择处理策略(以下三选一):
		// 1. 替换线程(继续运行)
		return StreamThreadExceptionResponse.REPLACE_THREAD;
		// 2. 关闭整个 Streams 应用
		// return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
		// 3. 关闭整个 JVM
		// return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
	}
}

3. 一些注意事项和说明

4. kafka Stream 的一些方法说明

到此这篇关于Spring boot 项目中如何进行kafka Stream app 开发的文章就介绍到这了,更多相关Spring boot kafka Stream app 开发内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文