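Integrating Kafka with WebFlux in Practice
Author: 西红柿系番茄
This article shows how to integrate Kafka with WebFlux. It covers the required dependencies and a code example, and closes with a short summary.
Integrating Kafka with WebFlux
1. Add the dependencies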
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>
2. Code example
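import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Component
public class KafkaService {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private KafkaSender<String, String> kafkaSender;
    private KafkaReceiver<String, String> kafkaReceiver;

    @PostConstruct
    public void init() {
        // Producer: String keys and values, matching KafkaSender<String, String>.
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final SenderOptions<String, String> producerOptions = SenderOptions.create(producerProps);
        this.kafkaSender = KafkaSender.create(producerOptions);

        // Consumer: String keys and values, matching KafkaReceiver<String, String>.
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final ReceiverOptions<String, String> consumerOptions = ReceiverOptions.<String, String>create(consumerProps)
                .subscription(Collections.singleton("demo"))
                .addAssignListener(partitions -> System.out.println("onPartitionsAssigned " + partitions))
                .addRevokeListener(partitions -> System.out.println("onPartitionsRevoked " + partitions));
        this.kafkaReceiver = KafkaReceiver.create(consumerOptions);

        // Print each record from the "demo" topic, then acknowledge its offset.
        this.kafkaReceiver.receive().doOnNext(r -> {
            System.out.println(r.value());
            r.receiverOffset().acknowledge();
        }).subscribe();
    }

    // Sends one JSON message to the "demo" topic; 1 is the correlation metadata returned with the result.
    public Mono<SenderResult<Integer>> send() {
        SenderRecord<String, String, Integer> senderRecord =
                SenderRecord.create(new ProducerRecord<>("demo", value()), 1);
        return kafkaSender.send(Mono.just(senderRecord)).next();
    }

    // Builds a small JSON payload with a random name; falls back to "{}" if serialization fails.
    private String value() {
        Map<String, String> map = new HashMap<>();
        map.put("name", UUID.randomUUID().toString());
        try {
            return OBJECT_MAPPER.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            return "{}";
        }
    }
}
3. Miscellaneous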
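The service above hard-codes the broker address, group id, and client id inside init(). As one possible refinement, the following is a minimal sketch of building the same property maps from parameters. It is not part of the original article: the class name KafkaProps and the environment variable KAFKA_BOOTSTRAP_SERVERS are hypothetical. It uses the plain string keys that the ProducerConfig and ConsumerConfig constants resolve to, and String (de)serializers for both key and value to match the KafkaSender<String, String> and KafkaReceiver<String, String> types, so it compiles with the JDK alone.

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaProps {

    // Hypothetical environment variable name; an assumption, not from the article.
    public static final String BOOTSTRAP_ENV = "KAFKA_BOOTSTRAP_SERVERS";

    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    /** Returns the broker list from the given environment, or the article's default. */
    public static String bootstrapServers(Map<String, String> env) {
        String value = env.get(BOOTSTRAP_ENV);
        return (value == null || value.trim().isEmpty()) ? "localhost:9092" : value;
    }

    /** Producer settings with String key and value serializers. */
    public static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", STRING_SERIALIZER);
        props.put("value.serializer", STRING_SERIALIZER);
        return props;
    }

    /** Consumer settings with String key and value deserializers. */
    public static Map<String, Object> consumerProps(String bootstrapServers,
                                                    String groupId,
                                                    String clientId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("client.id", clientId);
        props.put("key.deserializer", STRING_DESERIALIZER);
        props.put("value.deserializer", STRING_DESERIALIZER);
        return props;
    }

    public static void main(String[] args) {
        String servers = bootstrapServers(System.getenv());
        System.out.println(producerProps(servers));
        System.out.println(consumerProps(servers, "payment-validator", "payment-validator-1"));
    }
}
```

Under these assumptions, init() could pass KafkaProps.producerProps(...) to SenderOptions.create and KafkaProps.consumerProps(...) to ReceiverOptions.create in place of the inline maps.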
Application configuration (YAML):
server:
  port: 8888
spring:
  jackson:
    serialization:
      FAIL_ON_EMPTY_BEANS: false
Summary
The above reflects my personal experience; I hope it serves as a useful reference.
