Integrating Kafka with WebFlux in Practice

Author: 西红柿系番茄

This article shows how to integrate Kafka with WebFlux: the required dependencies, a code example, and a short summary at the end.

Integrating Kafka with WebFlux

1. Add the dependencies

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>

2. Code example

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Component;

import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

@Component
public class KafkaService {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private KafkaSender<String, String> kafkaSender;
    private KafkaReceiver<String, String> kafkaReceiver;

    @PostConstruct
    public void init() {
        // Producer: the (de)serializers must match the <String, String> type
        // parameters of KafkaSender, so both key and value use StringSerializer.
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final SenderOptions<String, String> producerOptions = SenderOptions.create(producerProps);
        this.kafkaSender = KafkaSender.create(producerOptions);

        // Consumer: likewise, key and value deserializers match KafkaReceiver<String, String>.
        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ReceiverOptions<String, String> consumerOptions = ReceiverOptions.<String, String>create(consumerProps)
                .subscription(Collections.singleton("demo"))
                .addAssignListener(partitions -> System.out.println("onPartitionsAssigned " + partitions))
                .addRevokeListener(partitions -> System.out.println("onPartitionsRevoked " + partitions));
        KafkaReceiver<String, String> kafkaReceiver = KafkaReceiver.create(consumerOptions);
        // Start consuming immediately: print each record value and acknowledge its offset.
        kafkaReceiver.receive().doOnNext(r -> {
            System.out.println(r.value());
            r.receiverOffset().acknowledge();
        }).subscribe();
        this.kafkaReceiver = kafkaReceiver;
    }

    @PreDestroy
    public void destroy() {
        // Release producer resources on shutdown.
        if (kafkaSender != null) {
            kafkaSender.close();
        }
    }

    // Send one record to the "demo" topic. The second argument to
    // SenderRecord.create is correlation metadata that is returned with the
    // send result; here it is just a constant.
    public Mono<?> send() {
        SenderRecord<String, String, Object> senderRecord =
                SenderRecord.create(new ProducerRecord<>("demo", value()), 1);
        return kafkaSender.send(Mono.just(senderRecord)).next();
    }

    // Build a small JSON payload of the form {"name":"<uuid>"}.
    private String value() {
        Map<String, String> map = new HashMap<>();
        map.put("name", UUID.randomUUID().toString());
        try {
            return OBJECT_MAPPER.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            return "{}";
        }
    }
}
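The article's title promises WebFlux integration, but the sample never exposes the sender over HTTP. A minimal sketch of how the service above could be wired to a WebFlux endpoint (the controller name and the /send path are illustrative assumptions, not part of the original article):

```java
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Mono;

// Hypothetical WebFlux endpoint (not in the original article): triggers
// KafkaService.send() and completes once the broker acknowledges the record.
@RestController
public class KafkaController {

    private final KafkaService kafkaService;

    public KafkaController(KafkaService kafkaService) {
        this.kafkaService = kafkaService;
    }

    @GetMapping("/send")
    public Mono<String> send() {
        // Map the send result to a simple response body.
        return kafkaService.send().map(result -> "sent");
    }
}
```

Because `send()` returns a `Mono`, nothing is produced until WebFlux subscribes to the returned publisher when handling the request, so the whole path stays non-blocking.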

3. Other configuration

The application.yml below sets the server port and disables Jackson's FAIL_ON_EMPTY_BEANS, so serializing an object with no readable properties returns an empty JSON object instead of throwing:

server:
  port: 8888

spring:
  jackson:
    serialization:
      FAIL_ON_EMPTY_BEANS: false
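With the application running on the configured port, and assuming a WebFlux endpoint mapped at /send (such an endpoint is not shown in the original article), the flow could be exercised from the command line; the consumer started in init() would then print the JSON payload:

```shell
# Hypothetical endpoint path; port 8888 comes from the application.yml above.
curl http://localhost:8888/send
```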

Summary

The above reflects my personal experience; I hope it serves as a useful reference, and I hope you will continue to support 脚本之家 (jb51).
