Golang

关注公众号 jb51net

关闭
首页 > 脚本专栏 > Golang > docker和k3s实现go语言发送和接收kafka消息

Go库实现Kafka消息的发送与接收(docker和k3s安装kafka)

作者:福大大架构师每日一题

文章介绍使用docker在宿主机映射容器9092端口部署k3s,并使用Go库实现Kafka消息的发送与接收,涉及segmentio/kafka-gogo、saramago和sarama等客户端库的应用场景

kafka是什么

Kafka传统定义 :Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。 发布/订阅 :消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息 分为不同的类别,订阅者只接收感兴趣的消息。

Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

消息队列的应用场景无外乎是:削峰填谷、应用解耦、异步处理等等,具体使用案例我们在之前讲rabbitmq基础篇已经详述过,这里不在做讲述,这里说一下消息队列的两种模型:

kafka基础架构和核心概念

在 Kafka 中,发布订阅的对象是 主题(Topic ),你可以为每个业务、每个应用甚至是每类数据都创建专属的主题。

生产者(Producer) :消息生产者,就是向 kafka broker 发消息的客户端,生产者程序通常持续不断地向一个或多个主题发送消息。

消费者(Consumer) :消息消费者,向 kafka broker 取消息的客户端,消费者就是订阅这些主题消息的客户端应用程序。

和生产者类似,消费者也能够同时订阅多个主题的消息。我们把生产者和消费者统称为客户端(Clients)。你可以同时运行多个生产者和消费者实例,这些实例会不断地向 Kafka 集群中的多个主题生产和消费消息。

消费者组Consumer Group** (CG)**:由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。

主题(topic) :可以理解为一个队列,生产者和消费者面向的都是一个 topic;

分区(Partition) :为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;

副本(Replica) :副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本, 一个 leader 和若干个 follower 。

leader :每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对 象都是 leader。

follower :每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据 的同步。leader 发生故障时,某个 follower 会成为新的 follower

副本的工作机制也很简单:生产者总是向领导者副本写消息;而消费者总是从领导者副本读消息。至于追随者副本,它只做一件事:向领导者副本发送请求,请求领导者把最新生产的消息发给它,这样它能保持与领导者的同步。

Kafka 使用消息日志(Log)来保存数据,一个日志就是磁盘上一个只能追加写(Append-only)消息的物理文件。因为只能追加写入,故 避免了缓慢的随机 I/O 操作,改为性能较好的顺序 I/O 写操作,这也是实现 Kafka 高吞吐量特性的一个重要手段 。不过如果你不停地向一个日志写入消息,最终也会耗尽所有的磁盘空间,因此 Kafka 必然要定期地删除消息以回收磁盘。怎么删除呢?简单来说就是通过日志段(Log Segment)机制。在 Kafka 底层,一个日志又近一步细分成多个日志段,消息被追加写到当前最新的日志段中,当写满了一个日志段后,Kafka 会自动切分出一个新的日志段,并将老的日志段封存起来。Kafka 在后台还有定时任务会定期地检查老的日志段是否能够被删除,从而实现回收磁盘空间的目的

Kafka优缺点

优点

高吞吐量:Kafka的顺序日志机制和高可用性设计使其在高并发场景下表现出色。

扩展性强:通过分区和复制机制,Kafka能够轻松扩展到多个节点。 ** easy to use**:Kafka提供了丰富的 API 和工具支持,简化了集成和管理。

缺点

学习曲线:Kafka的发布-订阅模型和分布式架构对初次接触者来说可能较为复杂。

配置敏感:Kafka的性能和稳定性高度依赖于正确的配置和维护。

合规性与安全性在金融、医疗等高敏感领域,Kafka需要满足严格的合规要求。可以通过配置安全机制(如认证、授权)来确保数据的完整性和安全性。

Kafka注意事项

高并发与分区的管理在高并发场景下,合理的分区划分和负载均衡是关键。如果分区数量过多或负载不平衡,可能导致节点资源浪费或消息延迟。

配置参数的优化Kafka的性能参数(如生产速率、消费速率、分区数等)需要根据实际应用场景进行调整。过高的生产速率可能导致消息堆积,而过低的消费速率则会增加客户端的负载。

网络稳定性Kafka对网络性能有较高的要求。在实际部署中,需要确保集群内各节点之间的网络带宽足够高,避免因网络延迟或分区不一致导致的消息丢失或延迟处理。

集群的高可用性Kafka的高可用性依赖于集群的配置和管理。在部署时,需要确保节点的硬件配置一致,定期监控集群状态,并及时处理节点故障。

监控与运维Kafka的监控是保障系统稳定运行的关键。可以通过工具(如Prometheus、Grafana)实时监控集群的性能、消息队列的健康状况以及消费者组的负载情况。

docker安装命令,其中172.16.11.111是宿主机ip,14818是宿主机端口,对应容器端口9092:

docker run -d \
  --name kafka \
  -p 14818:9092 \
  -p 9093:9093 \
  -v /tmp/kraft-combined-logs:/tmp/kraft-combined-logs \
  -e TZ=Asia/Shanghai \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.11.111:14818 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
  -e KAFKA_NUM_PARTITIONS=3 \
  -e KAFKA_LOG_DIRS=/tmp/kraft-combined-logs \
  -e CLUSTER_ID=5L6g3nShT-eMCtK--X86sw \
  apache/kafka-native:4.1.0

k3s的yaml,其中172.16.11.111是宿主机ip,14818是宿主机端口,对应容器端口9092:

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: moonfdd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      initContainers:
        - name: kafka-fix-data-volume-permissions
          image: alpine
          imagePullPolicy: IfNotPresent
          command:
          - sh
          - -c
          - "chown -R 1000:1000 /tmp/kraft-combined-logs"
          volumeMounts:
            - mountPath: /tmp/kraft-combined-logs
              name: volv
      containers:
        - env:
            - name: TZ
              value: Asia/Shanghai
            - name: KAFKA_NODE_ID
              value: "1"
            - name: KAFKA_PROCESS_ROLES
              value: broker,controller
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://:9092,CONTROLLER://:9093
            - name: KAFKA_ADVERTISED_LISTENERS
              value: PLAINTEXT://172.16.11.111:14818
            - name: KAFKA_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
            - name: KAFKA_CONTROLLER_QUORUM_VOTERS
              value: 1@localhost:9093
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_TRANSACTION_STATE_LOG_MIN_ISR
              value: "1"
            - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
              value: "0"
            - name: KAFKA_NUM_PARTITIONS
              value: "3"
            - name: KAFKA_LOG_DIRS
              value: /tmp/kraft-combined-logs
            - name: CLUSTER_ID
              value: "5L6g3nShT-eMCtK--X86sw"  # 固定集群ID,仅首次启动格式化使用
          image: 'apache/kafka-native:4.1.0'
          imagePullPolicy: IfNotPresent
          name: kafka
          volumeMounts:
            - mountPath: /tmp/kraft-combined-logs
              name: volv
      volumes:
        - hostPath:
            path: /root/k8s/moonfdd/kafka/tmp/kraft-combined-logs
            type: DirectoryOrCreate
          name: volv
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka
  name: kafka
  namespace: moonfdd
spec:
  ports:
    - port: 9092
      protocol: TCP
      targetPort: 9092
      name: 9092-9092
    - port: 9093
      protocol: TCP
      targetPort: 9093
      name: 9093-9093
  selector:
    app: kafka
  type: NodePort

go发送kafka消息:github.com/segmentio/kafka-go

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// 创建一个Kafka writer(Producer)
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"172.16.11.111:14818"}, // Kafka broker 地址
		Topic:    "test-topic",                    // 发送的 topic
		Balancer: &kafka.LeastBytes{},             // 负载均衡策略
	})

	// 写入消息
	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello Kafka from Go!"),
		},
	)

	if err != nil {
		log.Fatalf("could not write message: %v", err)
	}

	log.Println("Message sent successfully!")

	// 关闭 writer
	w.Close()
}

go接收kafka消息:github.com/segmentio/kafka-go

package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// 创建 Kafka reader(Consumer)
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"172.16.11.111:14818"}, // Kafka broker 地址
		Topic:    "test-topic",                    // 订阅的 topic
		GroupID:  "my-consumer-group",             // 消费者组,确保相同组会读取上一 offset
		MinBytes: 10e3,                            // 最小fetch字节数
		MaxBytes: 10e6,                            // 最大fetch字节数
	})

	for {
		// 读取消息(会自动从上次的 offset 开始)
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			log.Fatalf("could not read message: %v", err)
		}
		log.Printf("offset:%d | key:%s | value:%s\n", m.Offset, string(m.Key), string(m.Value))
	}

	// r.Close() // 如果你打算退出循环时关闭
}

go发送kafka消息:github.com/IBM/sarama

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/IBM/sarama"
)

func main() {
	// 配置生产者
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true          // 确保消息发送成功
	config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本确认
	config.Producer.Retry.Max = 3                    // 重试次数

	// 重要:配置客户端使用正确的主机
	config.Net.SASL.Enable = false
	config.Net.TLS.Enable = false
	config.Version = sarama.MaxVersion

	// 创建同步生产者
	producer, err := sarama.NewSyncProducer([]string{"172.16.11.111:14818"}, config)
	if err != nil {
		log.Fatalf("创建生产者失败: %v", err)
	}
	defer producer.Close()

	// 构造消息
	message := &sarama.ProducerMessage{
		Topic: "test-topic",
		Key:   sarama.StringEncoder("message-key"),
		Value: sarama.StringEncoder(fmt.Sprintf("Hello Kafka! %v", time.Now())),
	}

	// 发送消息
	partition, offset, err := producer.SendMessage(message)
	if err != nil {
		log.Fatalf("发送消息失败: %v", err)
	}

	fmt.Printf("消息发送成功! 分区: %d, 偏移量: %d\n", partition, offset)
}

go接收kafka消息:github.com/IBM/sarama

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"

	"github.com/IBM/sarama"
)

type Consumer struct{}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	// 会话初始化,可以在这里做一些准备工作
	return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	// 会话结束时的清理操作
	return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// claim.Messages() 会不断返回新消息
	for msg := range claim.Messages() {
		fmt.Printf("Topic:%s Partition:%d Offset:%d Value:%s\n",
			msg.Topic, msg.Partition, msg.Offset, string(msg.Value))

		// 标记该消息已被处理,Kafka会自动保存offset
		session.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	// Kafka集群地址
	brokers := []string{"172.16.11.111:14818"}
	groupID := "my-group" // 消费者组ID,保持不变才能从上次offset消费
	topics := []string{"test-topic"}

	// 配置
	config := sarama.NewConfig()
	config.Version = sarama.MaxVersion // Kafka版本
	config.Consumer.Return.Errors = true

	// 非首次启动时自动从上次位置开始
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	// OffsetNewest: 如果没有历史offset,从最新开始;
	// OffsetOldest: 如果没有历史offset,从最旧开始。

	// 创建消费者组
	consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		log.Fatalf("Error creating consumer group: %v", err)
	}
	defer consumerGroup.Close()

	consumer := &Consumer{}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for err := range consumerGroup.Errors() {
			log.Printf("Error: %v", err)
		}
	}()

	log.Println("Kafka consumer started...")
	// 优雅退出
	go func() {
		sigchan := make(chan os.Signal, 1)
		signal.Notify(sigchan, os.Interrupt)
		<-sigchan
		cancel()
	}()

	// 循环消费
	for {
		if err := consumerGroup.Consume(ctx, topics, consumer); err != nil {
			log.Printf("Error from consumer: %v", err)
		}

		// 检查退出
		if ctx.Err() != nil {
			return
		}
	}
}

到此这篇关于Go库实现Kafka消息的发送与接收(docker和k3s安装kafka)的文章就介绍到这了,更多相关docker和k3s实现go语言发送和接收kafka消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文