详解Spring Boot对 Apache Pulsar的支持
作者:听海边涛声
https://docs.spring.io/spring-boot/docs/3.2.0/reference/htmlsingle/#messaging.pulsar
Apache Pulsar 通过提供 Spring for Apache Pulsar 项目的自动配置而受到支持。
当类路径中存在 org.springframework.pulsar:spring-pulsar
时,Spring Boot 将自动配置并注册经典的(命令式)Spring for Apache Pulsar 组件。当类路径中存在 org.springframework.pulsar:spring-pulsar-reactive
时,Spring Boot 也会对反应式组件执行相同的操作。
分别有适用于命令式和反应式使用的 spring-boot-starter-pulsar
和 spring-boot-starter-pulsar-reactive
“Starters”,可方便地收集依赖项。
连接到Pulsar
当使用 Pulsar 启动器时,Spring Boot 将自动配置并注册一个 PulsarClient
bean。
默认情况下,应用程序尝试连接到位于 pulsar://localhost:6650
的本地 Pulsar 实例。这可以通过将 spring.pulsar.client.service-url
属性设置为不同的值来进行调整。
注意:该值必须是有效的 Pulsar 协议 URL。
可以通过指定任何以 spring.pulsar.client.*
开头的应用程序属性来配置客户端。
如果需要更多控制权来配置 PulsarClient,请考虑注册一个或多个 PulsarClientBuilderCustomizer
bean。
认证(Authentication)
要连接到需要认证的 Pulsar 集群,需要指定要使用哪个认证插件,通过设置 pluginClassName
和插件所需的任何参数。可以将参数设置为参数名称到参数值的映射。以下示例显示了如何配置 AuthenticationOAuth2
插件。
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=https://auth.server.cloud/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
注意:
需要确保在 spring.pulsar.client.authentication.param.*
下定义的名称与认证插件所期望的名称完全匹配(通常是驼峰命名法)。Spring Boot 不会尝试对这些条目进行任何形式的宽松绑定。
例如,如果想为 AuthenticationOAuth2
认证插件配置issuer URL,则必须使用 spring.pulsar.client.authentication.param.issuerUrl
。如果使用其他形式,如 issuerurl
或 issuer-url
,则设置将不会应用于插件。
SSL
默认情况下,Pulsar客户端以明文形式与Pulsar服务进行通信。以下部分描述了如何配置Pulsar客户端以使用TLS加密(SSL)。一个先决条件是Broker也已经配置为使用TLS加密。
Spring Boot自动配置目前不支持任何TLS/SSL配置属性。相反,你可以提供一个PulsarClientBuilderCustomizer
,该定制器会在Pulsar客户端构建器上设置必要的属性。Pulsar支持Privacy Enhanced Mail(PEM)和Java KeyStore(JKS)两种证书格式。
按照以下步骤配置TLS:
- 调整Pulsar客户端服务URL以使用
pulsar+ssl://
scheme 和TLS端口(通常为6651
)。 - 调整管理客户端服务URL以使用
https://
scheme 和TLS Web端口(通常为8443
)。 - 提供客户端构建器定制器,该定制器会在构建器上设置相关属性。
以响应式方式连接到Pulsar
当Reactive自动配置被激活时,Spring Boot将自动配置并注册一个ReactivePulsarClient
bean。
连接到Pulsar管理界面
Spring for Apache Pulsar的PulsarAdministration
客户端也实现了自动配置。
默认情况下,应用程序尝试连接到位于http://localhost:8080
的本地Pulsar实例。可以通过将spring.pulsar.admin.service-url
属性设置为(http|https)://<host>:<port>
的不同值来调整此设置。
如果需要更多控制权来配置PulsarAdmin
,请考虑注册一个或多个PulsarAdminBuilderCustomizer
bean。
认证
当访问需要身份验证的Pulsar集群时,管理客户端需要与普通Pulsar客户端相同的安全配置。可以通过将spring.pulsar.client.authentication
替换为spring.pulsar.admin.authentication
来使用上述身份验证配置。
提示:在启动时创建主题,请添加一个类型为PulsarTopic
的bean。如果主题已经存在,则该bean将被忽略。
发送消息
Spring的PulsarTemplate
实现了自动配置,可以使用它来发送消息,如下所示:
import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final PulsarTemplate<String> pulsarTemplate; public MyBean(PulsarTemplate<String> pulsarTemplate) { this.pulsarTemplate = pulsarTemplate; } public void someMethod() throws PulsarClientException { this.pulsarTemplate.send("someTopic", "Hello"); } }
PulsarTemplate
依赖于PulsarProducerFactory
来创建底层的Pulsar生产者。Spring Boot的自动配置也提供了这个生产者工厂,默认情况下,它会缓存所创建的生产者。你可以通过指定任何以spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
为前缀的应用属性来配置生产者工厂和缓存设置。
如果你需要对生产者工厂的配置进行更多的控制,考虑注册一个或多个ProducerBuilderCustomizer
bean。这些定制器会应用于所有创建的生产者。你也可以在发送消息时传入一个ProducerBuilderCustomizer
,只影响当前的生产者。
如果你需要对正在发送的消息进行更多的控制,你可以在发送消息时传入一个TypedMessageBuilderCustomizer
。
以响应式方式发送消息
当Reactive自动配置被激活时,Spring的ReactivePulsarTemplate
也会实现自动配置,可以使用它来发送消息,如下所示:
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final ReactivePulsarTemplate<String> pulsarTemplate; public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) { this.pulsarTemplate = pulsarTemplate; } public void someMethod() { this.pulsarTemplate.send("someTopic", "Hello").subscribe(); } }
ReactivePulsarTemplate
依赖于ReactivePulsarSenderFactory
来实际创建底层的发送器。Spring Boot的自动配置也提供了这个发送器工厂,默认情况下,它会缓存所创建的发送器。你可以通过指定任何以spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
为前缀的应用属性来配置发送器工厂和缓存设置。
如果你需要对发送器工厂的配置进行更多的控制,考虑注册一个或多个ReactiveMessageSenderBuilderCustomizer
bean。这些定制器会应用于所有创建的发送器。你也可以在发送消息时传入一个ReactiveMessageSenderBuilderCustomizer
,只影响当前的发送器。
如果你需要对正在发送的消息进行更多的控制,你可以在发送消息时传入一个MessageSpecBuilderCustomizer
。
接收消息
当存在Apache Pulsar基础设施时,任何bean都可以通过添加@PulsarListener
注解来创建监听器端点。以下组件在someTopic
主题上创建了一个监听器端点:
import org.springframework.pulsar.annotation.PulsarListener; import org.springframework.stereotype.Component; @Component public class MyBean { @PulsarListener(topics = "someTopic") public void processMessage(String content) { // ... } }
Spring Boot的自动配置为PulsarListener
提供了所有必要的组件,如PulsarListenerContainerFactory
和用于构建底层Pulsar消费者的消费者工厂。你可以通过指定任何以spring.pulsar.listener.*
和spring.pulsar.consumer.*
为前缀的应用属性来配置这些组件。
如果你需要对消费者工厂的配置进行更多的控制,考虑注册一个或多个ConsumerBuilderCustomizer
bean。这些定制器会应用于工厂创建的所有消费者,因此适用于所有@PulsarListener
实例。你还可以通过设置@PulsarListener
注解的consumerCustomizer
属性来定制单个监听器。
以响应式方式接收消息
当存在Apache Pulsar基础设施且Reactive自动配置被激活时,任何bean都可以通过添加@ReactivePulsarListener
注解来创建响应式监听器端点。以下组件在someTopic
主题上创建了一个响应式监听器端点:
import reactor.core.publisher.Mono; import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; import org.springframework.stereotype.Component; @Component public class MyBean { @ReactivePulsarListener(topics = "someTopic") public Mono<Void> processMessage(String content) { // ... return Mono.empty(); } }
Spring Boot的自动配置为ReactivePulsarListener
提供了所有必要的组件,如ReactivePulsarListenerContainerFactory
和用于构建底层响应式Pulsar消费者的消费者工厂。你可以通过指定任何以spring.pulsar.listener.
和spring.pulsar.consumer.
为前缀的应用属性来配置这些组件。
如果你需要对消费者工厂的配置进行更多的控制,考虑注册一个或多个ReactiveMessageConsumerBuilderCustomizer
bean。这些定制器会应用于工厂创建的所有消费者,因此适用于所有@ReactivePulsarListener
实例。你还可以通过设置@ReactivePulsarListener
注解的consumerCustomizer
属性来定制单个监听器。
读取消息
Pulsar的读取器接口使应用程序能够手动管理游标。当你使用读取器连接到主题时,你需要指定当读取器连接到主题时从哪个消息开始读取。
当存在Apache Pulsar基础设施时,任何bean都可以通过添加@PulsarReader
注解来使用读取器消费消息。以下组件创建了一个读取器端点,该端点从someTopic
主题的开头开始读取消息:
import org.springframework.pulsar.annotation.PulsarReader; import org.springframework.stereotype.Component; @Component public class MyBean { @PulsarReader(topics = "someTopic", startMessageId = "earliest") public void processMessage(String content) { // ... } }
@PulsarReader
依赖于PulsarReaderFactory
来创建底层的Pulsar读取器。Spring Boot的自动配置提供了这个读取器工厂,可以通过设置任何以spring.pulsar.reader.*
为前缀的应用属性来定制它。
如果你需要对读取器工厂的配置进行更多的控制,考虑注册一个或多个ReaderBuilderCustomizer
bean。这些定制器会应用于工厂创建的所有读取器,因此适用于所有@PulsarReader
实例。你还可以通过设置@PulsarReader
注解的readerCustomizer
属性来定制单个监听器。
以响应式方式读取消息
当存在Apache Pulsar基础设施且Reactive自动配置被激活时,Spring会提供ReactivePulsarReaderFactory
,你可以使用它来创建读取器,以响应式的方式读取消息。以下组件使用提供的工厂创建一个读取器,并从someTopic
主题中读取5秒钟前的一条消息:
import java.time.Instant; import java.util.List; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.api.StartAtSpec; import reactor.core.publisher.Mono; import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer; import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory; import org.springframework.stereotype.Component; @Component public class MyBean { private final ReactivePulsarReaderFactory<String> pulsarReaderFactory; public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) { this.pulsarReaderFactory = pulsarReaderFactory; } public void someMethod() { ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder .topic("someTopic") .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5))); Mono<Message<String>> message = this.pulsarReaderFactory .createReader(Schema.STRING, List.of(readerBuilderCustomizer)) .readOne(); // ... } }
Spring Boot的自动配置提供了这个读取器工厂,可以通过设置任何以spring.pulsar.reader.*
为前缀的应用属性来定制它。
如果你需要对读取器工厂的配置进行更多的控制,当使用工厂创建读取器时,考虑传入一个或多个ReactiveMessageReaderBuilderCustomizer
实例。
如果你需要对读取器工厂的配置进行更多的控制,考虑注册一个或多个ReactiveMessageReaderBuilderCustomizer
bean。这些定制器会应用于所有创建的读取器。你也可以在创建读取器时传入一个或多个ReactiveMessageReaderBuilderCustomizer
,只将定制应用于创建的读取器。
额外的Pulsar属性
只有Pulsar支持的属性子集才能直接通过PulsarProperties
类使用。如果你希望使用额外的属性来调整自动配置的组件,而这些属性不被直接支持,你可以使用前面提到的每个组件支持的定制器。
到此这篇关于详解Spring Boot对 Apache Pulsar的支持的文章就介绍到这了,更多相关Spring Boot Apache Pulsar内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!