Kafka的安装及接入SpringBoot的详细过程
作者:LB_bei
环境:windows、jdk1.8、springboot2
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
https://kafka.apache.org/
1.概述
Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目。它设计用于处理大规模的实时数据流,具有高吞吐量、低延迟、持久性等特点,被广泛应用于构建实时数据管道、日志收集、事件驱动架构等场景。
详细概述见Kafka概述:
1.1 Kafka的作用
- 发布和订阅记录流
- 持久存储记录流,Kafka中的数据即使消费后也不会消失
- 在系统或应用之间构建可靠获取数据的实时流数据管道
- 构建转换或响应数据流的实时流应用程序
- Kafka可以处理源源不断产生的数据
1.2 Kafka的一些概念
- topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic 就是Rabbitmq中的queue)
- producer:发布消息的对象称之为主题生产者(Kafka topic producer)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
2.Kafka下载安装
Apache Kafka
Apache Kafka: A Distributed Streaming Platform.
https://kafka.apache.org/downloads
选择最新版就可以
2.1 配置kafka
解压下载的文件,修改 config 文件夹下的 zookeeper.properties
修改 config 文件夹下的server.properties
当需要外网访问时要配置advertised.listeners(比如连云服务器的kafka)
advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092
2.2 启动 zookeeper
Zookeeper 在 Kafka 中充当了分布式协调服务的角色,帮助 Kafka 实现了集群管理、元数据存储、故障恢复、领导者选举等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撑。
kafka_2.13-3.7.0\bin\windows文件夹中输入命令:
zookeeper-server-start.bat ../../config/zookeeper.properties
可以本地访问看一下:http://localhost:2181/
2.3 启动Kafka
kafka_2.13-3.7.0\bin\windows文件夹中输入命令:
kafka-server-start.sh ../../config/server.properties
访问路径:http://localhost:9092/
2.4 便捷启动脚本
两个脚本放到Kafka的目录(kafka_2.13-3.7.0)中
cd bin\windows
zookeeper-server-start.bat ../../config/zookeeper.properties
cd bin\windows
kafka-server-start.bat ../../config/server.properties
3.springboot集成Kafka
3.1 环境搭建
(1)添加pom依赖
<!-- 继承Spring boot工程 --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.8.RELEASE</version> </parent> <properties> <fastjson.version>1.2.58</fastjson.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- kafkfa --> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson.version}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> </dependency> </dependencies>
(2)配置类application.yml
生产者:
spring: kafka: bootstrap-servers: xxx.xxx.xxx.xxx:9092 producer: retries: 0 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
消费者:
spring: kafka: bootstrap-servers: xxx.xxx.xxx.xxx:9092 consumer: group-id: kafka-demo-kafka-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
(3)启动类
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaApp { public static void main(String[] args) { SpringApplication.run(KafkaApp.class, args); } }
3.2 消息生产者
junit测试,新建消息发送方
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class KafkaSendTest { @Autowired private KafkaTemplate<String,String> kafkaTemplate; //如果这里有红色波浪线,那是假错误 @Test public void sendMsg(){ String topic = "spring_test"; kafkaTemplate.send(topic,"hello spring boot kafka!"); System.out.println("发送成功."); while (true){ //保存加载ioc容器 } } }
3.3 消息消费者
新建监听类:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class MyKafkaListener { // 以下两种方法都行 // 指定监听的主题 // @KafkaListener(topics = "spring_test") // public void receiveMsg(String message){ // System.out.println("接收到的消息:"+message); // } @KafkaListener(topics = "spring_test") public void handleMessage(ConsumerRecord<String, String> record) { System.out.println("接收到消息,偏移量为: " + record.offset() + " 消息为: " + record.value()); } }
到此这篇关于Kafka的安装及接入SpringBoot的文章就介绍到这了,更多相关Kafka的安装及接入SpringBoot内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!