java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > SpringCloudStream中的消息分区数

SpringCloudStream中的消息分区数详解

作者:DayDayUp丶

这篇文章主要介绍了SpringCloudStream中的消息分区数,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教

一、前言

本文仅针对 Kafka 来聊消息分区数相关的话题。

SpringCloudStream 中的消息分区数如何配置?

或者说消息分区数会受到哪些配置的影响。

二、影响因素

2.1 Kafka服务端

首先应该想到的,Kafka 配置文件 server.properties 中默认每一个 topic 的分区数 num.partitions=1

# The default number of log partitions per topic. More partitions allow greater
num.partitions=1

2.2 生产者端

SpringCloudStream的配置中可以看到,生产者可以指定分区数,默认1:

spring.cloud.stream.bindings.<channelName>.partitionCount.producer=n

【说明】:当分区功能开启时,使用该参数来配置消息数据的分区数。

如果消息生产者已经配置了分区键的生成策略,那么它的值必须大于1。

2.3 消费者端

SpringCloudStream 允许通过配置,使得消费者能够自动创建分区。

#输入通道消费者的并发数,默认1
spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2

若想以上配置生效,还需添加如下通用配置:

#Kafka绑定器允许在需要的时候自动创建分区。默认false
spring.cloud.stream.kafka.binder.autoAddPartitions=true

消费者端如此配置以后,将表现为一个消费者服务或进程中,会有2个线程各自消费1个分区,即2个消费者线程同时消费。

以下是该配置的效果验证步骤:

消费者代码:

1个 @StreamListener 消费自己的 topic 或自己的输出channel:

@EnableBinding(SpiderSink.class)
@Slf4j
public class SpiderSinkReceiver {
 
    @Autowired
    private SpiderMessageService spiderMessageService;
 
    @StreamListener(SpiderSink.INPUT)
    public void receive(Object payload) {
        log.info("SPIDER-SINK received: {}", payload);
    }
}

方式一:通过日志验证:

通过在 log4j 日志中,打印线程名称的方式,验证 spring.cloud.stream.bindings.<channelName>.consumer.concurrency 的配置确确实实会新增1个消费者线程。

[INFO ] 2020-05-09 01:19:34,700 [thread: [Ljava.lang.String;@5b40de43.container-1-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)
[INFO ] 2020-05-09 01:19:35,888 [thread: [Ljava.lang.String;@5b40de43.container-0-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)

方式二:直接查看分区数来验证:

另外,也可在启动一个生产者服务时,等待自动创建一个新 topic 后(此时默认分区数为1),比如我们创建的 topic 为“topic-spider-dev”,此时通过kafka命令查看分区数,此时分区数为1:

[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev
Topic:topic-spider-dev  PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: topic-spider-dev Partition: 0    Leader: 1       Replicas: 1     Isr: 1

然后,配置消费者服务的 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2,启动一个消费者服务,再次查看分区数,已经变为2了:

[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev
Topic:topic-spider-dev  PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: topic-spider-dev Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: topic-spider-dev Partition: 1    Leader: 2       Replicas: 2     Isr: 2

同时查看消费者端的应用日志,看到2个消费者线程各自分配了一个分区:

[INFO ] 2020-05-12 17:22:43,940 [thread: [Ljava.lang.String;@299dd381.container-0-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363)
partitions assigned: [topic-spider-dev-0]
[INFO ] 2020-05-12 17:22:44,004 [thread: [Ljava.lang.String;@299dd381.container-1-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363)
partitions assigned: [topic-spider-dev-1]

最终,确确实实地验证了 concurrency 配置对消费者线程数和分区数的影响。

2.4 其他因素

比如,SpringCloudStream 中 Kafka 绑定器的配置中,也有一个相关的影响因素:

#最小分区数,默认1
spring.cloud.stream.kafka.binder.minPartitionCount=n

【说明】:该参数仅在设置了 autoCreateTopics 和 autoAddPartitions 时生效,用来设置该绑定器所使用主题的全局分区最小数量。

如果当生产者的 partitionCount 参数或 instanceCount * concurrency 设置大于该参数配置时,该参数值将被覆盖。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文