SpringCloudStream中的消息分区数详解
作者:DayDayUp丶
一、前言
本文仅针对 Kafka 来聊消息分区数相关的话题。
SpringCloudStream 中的消息分区数如何配置?
或者说消息分区数会受到哪些配置的影响。
- SpringCloudStream:Greenwich.SR2
- Kafka:kafka_2.12-2.3.0
二、影响因素
2.1 Kafka服务端
首先应该想到的,Kafka 配置文件 server.properties 中默认每一个 topic 的分区数 num.partitions=1
# The default number of log partitions per topic. More partitions allow greater num.partitions=1
2.2 生产者端
从SpringCloudStream的配置中可以看到,生产者可以指定分区数,默认1:
spring.cloud.stream.bindings.<channelName>.partitionCount.producer=n
【说明】:当分区功能开启时,使用该参数来配置消息数据的分区数。
如果消息生产者已经配置了分区键的生成策略,那么它的值必须大于1。
2.3 消费者端
SpringCloudStream 允许通过配置,使得消费者能够自动创建分区。
#输入通道消费者的并发数,默认1 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2
若想以上配置生效,还需添加如下通用配置:
#Kafka绑定器允许在需要的时候自动创建分区。默认false spring.cloud.stream.kafka.binder.autoAddPartitions=true
消费者端如此配置以后,将表现为一个消费者服务或进程中,会有2个线程各自消费1个分区,即2个消费者线程同时消费。
以下是该配置的效果验证步骤:
消费者代码:
1个 @StreamListener 消费自己的 topic 或自己的输出channel:
@EnableBinding(SpiderSink.class) @Slf4j public class SpiderSinkReceiver { @Autowired private SpiderMessageService spiderMessageService; @StreamListener(SpiderSink.INPUT) public void receive(Object payload) { log.info("SPIDER-SINK received: {}", payload); } }
方式一:通过日志验证:
通过在 log4j 日志中,打印线程名称的方式,验证 spring.cloud.stream.bindings.<channelName>.consumer.concurrency 的配置确确实实会新增1个消费者线程。
[INFO ] 2020-05-09 01:19:34,700 [thread: [Ljava.lang.String;@5b40de43.container-1-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50) [INFO ] 2020-05-09 01:19:35,888 [thread: [Ljava.lang.String;@5b40de43.container-0-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)
方式二:直接查看分区数来验证:
另外,也可在启动一个生产者服务时,等待自动创建一个新 topic 后(此时默认分区数为1),比如我们创建的 topic 为“topic-spider-dev”,此时通过kafka命令查看分区数,此时分区数为1:
[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev Topic:topic-spider-dev PartitionCount:1 ReplicationFactor:1 Configs: Topic: topic-spider-dev Partition: 0 Leader: 1 Replicas: 1 Isr: 1
然后,配置消费者服务的 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2,启动一个消费者服务,再次查看分区数,已经变为2了:
[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev Topic:topic-spider-dev PartitionCount:2 ReplicationFactor:1 Configs: Topic: topic-spider-dev Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: topic-spider-dev Partition: 1 Leader: 2 Replicas: 2 Isr: 2
同时查看消费者端的应用日志,看到2个消费者线程各自分配了一个分区:
[INFO ] 2020-05-12 17:22:43,940 [thread: [Ljava.lang.String;@299dd381.container-0-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363) partitions assigned: [topic-spider-dev-0] [INFO ] 2020-05-12 17:22:44,004 [thread: [Ljava.lang.String;@299dd381.container-1-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363) partitions assigned: [topic-spider-dev-1]
最终,确确实实地验证了 concurrency 配置对消费者线程数和分区数的影响。
2.4 其他因素
比如,SpringCloudStream 中 Kafka 绑定器的配置中,也有一个相关的影响因素:
#最小分区数,默认1 spring.cloud.stream.kafka.binder.minPartitionCount=n
【说明】:该参数仅在设置了 autoCreateTopics 和 autoAddPartitions 时生效,用来设置该绑定器所使用主题的全局分区最小数量。
如果当生产者的 partitionCount 参数或 instanceCount * concurrency 设置大于该参数配置时,该参数值将被覆盖。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。