java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Kafka java使用

Kafka 在 java 中的基本使用步骤

作者:张小虎在学习

本文给大家介绍Kafka在java中的基本使用,本文分步骤结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧

1. 使用 kafka 原生客户端

现在基本都直接使用 springboot 版本,但了解原生客户端,能更好的理解 springboot 版的 kafka 客户端原理。

步骤1:pom 引入核心依赖:

引入依赖时,尽量选择和 kafka 版本对应的依赖版本。

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.13</artifactId>
    <version>4.0.0</version>
</dependency>

步骤2:提供者客户端代码:

提供者客户端要做三件事:

  1. 设置提供者客户端属性(可选属性都被定义在 ProducerConfig 类中)
  2. 设置要发送的消息
  3. 发送(有三种发送方式,下面代码中都有)
public class MyProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 第一步:设置提供者属性
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // 第二步:设置要发送的消息
            ProducerRecord<String, String> record = new ProducerRecord<>("testTopic", "testKey", "testValue");
            // 第三部:发送消息
            // send(producer, record);
            // sendSync(producer, record);
            sendASync(producer, record);
        }
    }
    /**
     * 发送方式1:单向推送,不关心服务器的应答
     */
    private static void send(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record);
    }
    /**
     * 发送方式2:同步推送,得到服务器的应答前会阻塞当前线程
     */
    private static void sendSync(Producer<String, String> producer, ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
        RecordMetadata metadata = producer.send(record).get();
        System.out.println(metadata.topic());
        System.out.println(metadata.partition());
        System.out.println(metadata.offset());
    }
    /**
     * 发送方式3:异步推送,不需等待服务器应答,当服务器有应答后会触发函数回调
     */
    private static void sendASync(Producer<String, String> producer, ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                throw new RuntimeException("向 kafka 推送失败", exception);
            }
            System.out.println(metadata.topic());
            System.out.println(metadata.partition());
            System.out.println(metadata.offset());
        });
    }
}

步骤3:消费者客户端代码:

消费者客户端要做三件事:

  1. 设置消费者客户端属性(可选属性都被定义在 ConsumerConfig 类中)
  2. 设置消费者订阅的主题
  3. 拉取消息
  4. 提交 offset(有两种提交方式,下面代码中都有)
public class MyConsumer {
    public static void main(String[] args) {
        // 第一步:设置消费者属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.28:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // 第二步:设置要订阅的主题
            consumer.subscribe(Collections.singletonList("testTopic"));
            while (true) {
                // 第三步:拉取消息,100 代表最大等待时间,如果时间到了还没有拉取到消息就不阻塞了继续往后执行
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
                // 第四步:提交 offset
                // consumer.commitSync(); // 同步提交,表示必须等到 offset 提交完毕,再去消费下⼀批数据
                consumer.commitSync(); // 异步提交,表示发送完提交 offset 请求后,就开始消费下⼀批数据了。不⽤等到Broker的确认。
            }
        }
    }
}

2. Kafka 集成 springboot

springboot 版本是最常用的,比原生客户端使用方便。但是道理是一样的,底层也是原生客户端。

pom 引入核心依赖:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.0</version>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

yaml 配置文件:

这一步无非就是把原生客户端中的属性配置,写在 yaml 中

spring:
  kafka:
    bootstrap-servers: 192.168.2.28:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: testGroup
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

提供者客户端代码:

只需要两步:

  1. 注入 KafkaTemplate
  2. 发送
@RestController
public class ProducerController {
    /**
     * kafka
     */
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired
    public void setKafkaTemplate(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }
    @GetMapping("/test")
    public void send() {
        // 发送 kafka 消息
        kafkaTemplate.send("testTopic", "testKey", "testValue");
    }
}

消费者客户端代码:

只需要监听主题就可以

@RestController
public class ConsumerController {
    // 监听 kafka 消息
    @KafkaListener(topics = {"testTopic"})
    public void test(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}

到此这篇关于Kafka 在 java 中的基本使用的文章就介绍到这了,更多相关Kafka java使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文