java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Kafka消息发送与读取

Kafka在客户端实现消息的发送与读取

作者:warybee

这篇文章主要介绍了Kafka在客户端实现消息的发送与读取,KafkaProducer是用于发送消息的类,ProducerRecord类用于封装Kafka的消息,KafkaProducer的实例化需要指定的参数,Producer的参数定义在 org.apache.kafka.clients.producer.ProducerConfig类中,需要的朋友可以参考下

1.创建Maven工程

引入kafka相关依赖,POM文件如下:

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.7</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.生产者API(Producer API)

2.1 生成者处理流程

在这里插入图片描述

  1. Producer创建时,会创建一个Sender线程并设置为守护线程。
  2. 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->**分区器,**然后将消 息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
  3. 批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算 哪个。
  4. 批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失 败原因允许重试,那么客户端内部会对该消息进行重试。
  5. 落盘到broker成功,返回生产元数据给生产者。
  6. 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。

2.2 常用参数介绍

生产者主要的对象有: KafkaProducer , ProducerRecord 。

KafkaProducer 的实例化需要指定的参数,Producer的参数定义在 org.apache.kafka.clients.producer.ProducerConfig类中。

常用参数说明如下:

在这里插入图片描述

acks: 默认值:all。

acks=0: 生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息 已经发送完成。 该情形不能保证broker是否真的收到了消息,retries配置也不会生效。发 送的消息的返回的消息偏移量永远是-1

acks=1 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的 确认。 在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得 及同步该消息,则该消息丢失。

acks=all 首领分区会等待所有的ISR副本分区确认记录。 该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。 这是Kafka最强的可靠性保证,等效于 acks=-1

2.3 生产者代码

package com.warybee;
import org.apache.kafka.clients.producer.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
 * @author joy
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 设置key的序列化类
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 设置value的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs);
        //发送100条消息
        for (int i = 0; i < 100; i++) {
            ProducerRecord<Integer,String> producerRecord=new ProducerRecord<>
                    (       "test_topic_1",
                            0,
                            i,
                            "test topic msg "+i);
            //消息的异步确认
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                    if (exception==null){
                        System.out.println("消息的主题:"+recordMetadata.topic());
                        System.out.println("消息的分区:"+recordMetadata.partition());
                        System.out.println("消息的偏移量:"+recordMetadata.offset());
                    }else {
                        System.out.println("发送消息异常");
                    }
                }
            });
        }
        // 关闭生产者
        kafkaProducer.close();
    }
}

3.消费者API(Consumer API)

3.1 常用参数介绍

KafkaConsumer 的实例化需要指定的参数,Consumer的参数定义在 org.apache.kafka.clients.consumer.ConsumerConfig类中。

常用参数说明如下:

3.2 消费者与消费组概念介绍

每一个Consumer属于一个特定的Consumer Group,消费者可以通过指定group.id,来确定其所在消费组。

group_id一般设置为应用的逻辑名称。比如多个订单处理程序组成一个消费组,可以设置group_id 为"order_process"。

消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费

一个拥有四个分区的主题,包含一个消费者的消费组。此时,消费组中的消费者消费主题中的所有分区。

在这里插入图片描述

如果在消费组中添加一个消费者2,则每个消费者分别从两个分区接收消息。

在这里插入图片描述

如果消费组有四个消费者,则每个消费者可以分配到一个分区

在这里插入图片描述

如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任 何消息

在这里插入图片描述

向消费组添加消费者是横向扩展消费能力的主要方式。

3.3 消费者代码

package com.warybee;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
/**
 * @author joy
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化类
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //value反序列化类
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //创建消费者对象
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
        List<String> topics = new ArrayList<>();
        topics.add("test_topic_1");
        //消费者订阅主题
        consumer.subscribe(topics);
        while (true){
            //批量拉取主题消息,每3秒拉取一次
            ConsumerRecords<Integer, String> records = consumer.poll(3000);
            //变量消息
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("主题:"+record.topic() + "\t"
                       +"分区:" + record.partition() + "\t"
                        +"偏移量:" +  + record.offset() + "\t"
                        +"Key:"+ record.key() + "\t"
                        +"Value:"+ record.value());
            }
        }
    }
}

依次运行生产者和消费者。控制台可以看到消费者接收到的消息

4. 客户端链接异常信息处理

如果运行代码过程中,java客户端连接出现Connection refused: no further information错误:

java.net.ConnectException: Connection refused: no further information
   ............................省略其他错误信息...................

修改 ${KAFKA_HOME}/config/server.properties

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners = PLAINTEXT://localhost:9092

将localhost修改为kafka所在服务器的IP地址即可。如果java程序和kafka在同一个服务器上,则不需要修改。

到此这篇关于Kafka在客户端实现消息的发送与读取的文章就介绍到这了,更多相关Kafka消息发送与读取内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文