Kafka在客户端实现消息的发送与读取
作者:warybee
1.创建Maven工程
引入kafka相关依赖,POM文件如下:
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.0.0</version> </dependency> <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build>
2.生产者API(Producer API)
2.1 生成者处理流程
- Producer创建时,会创建一个Sender线程并设置为守护线程。
- 生产消息时,内部其实是异步流程;生产的消息先经过拦截器->序列化器->**分区器,**然后将消 息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
- 批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算 哪个。
- 批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失 败原因允许重试,那么客户端内部会对该消息进行重试。
- 落盘到broker成功,返回生产元数据给生产者。
- 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。
2.2 常用参数介绍
生产者主要的对象有: KafkaProducer , ProducerRecord 。
- KafkaProducer 是用于发送消息的类;
- ProducerRecord 类用于封装Kafka的消息。
KafkaProducer 的实例化需要指定的参数,Producer的参数定义在 org.apache.kafka.clients.producer.ProducerConfig类中。
常用参数说明如下:
- bootstrap.servers: 配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生 产者需要连接的是Kafka集群,则这里配置集群中几个broker的地址,而不 是全部,当生产者连接上此处指定的broker之后,在通过该连接发现集群 中的其他节点。
- key.serializer: 要发送信息的key数据的序列化类。kafka-clients提供了常用类型的序列化类,序列化类都实现了org.apache.kafka.common.serialization.Serializer 接口。
- **value.serializer:**要发送消息的alue数据的序列化类。kafka-clients提供了常用类型的序列化类,序列化类都实现了org.apache.kafka.common.serialization.Serializer 接口。
acks: 默认值:all。
acks=0: 生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息 已经发送完成。 该情形不能保证broker是否真的收到了消息,retries配置也不会生效。发 送的消息的返回的消息偏移量永远是-1
acks=1 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的 确认。 在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得 及同步该消息,则该消息丢失。
acks=all 首领分区会等待所有的ISR副本分区确认记录。 该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。 这是Kafka最强的可靠性保证,等效于 acks=-1
- retries: 设置该属性为一个大于1的值,将在消息发送失败的时候重新发送消 息。该重试与客户端收到异常重新发送并无二至。允许重试但是不设 置参数max.in.flight.requests.per.connection为1,存在消息乱序 的可能,因为如果两个批次发送到同一个分区,第一个失败了重试, 第二个成功了,则第一个消息批在第二个消息批后。int类型的值,默 认:0,可选值:[0,…,2147483647
- compression.type: 生产者生成数据的压缩格式。默认是none(没有压缩)。允许的 值:none,gzip,snappy和lz4。压缩是对整个消息批次来讲 的。消息批的效率也影响压缩的比例。消息批越大,压缩效率越好。默认是none。
2.3 生产者代码
package com.warybee; import org.apache.kafka.clients.producer.*; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; /** * @author joy */ public class KafkaProducerDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { Map<String, Object> configs = new HashMap<>(); // 设置连接Kafka的初始连接用到的服务器地址 configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); // 设置key的序列化类 configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer"); // 设置value的序列化类 configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configs.put(ProducerConfig.ACKS_CONFIG,"all"); KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs); //发送100条消息 for (int i = 0; i < 100; i++) { ProducerRecord<Integer,String> producerRecord=new ProducerRecord<> ( "test_topic_1", 0, i, "test topic msg "+i); //消息的异步确认 kafkaProducer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception exception) { if (exception==null){ System.out.println("消息的主题:"+recordMetadata.topic()); System.out.println("消息的分区:"+recordMetadata.partition()); System.out.println("消息的偏移量:"+recordMetadata.offset()); }else { System.out.println("发送消息异常"); } } }); } // 关闭生产者 kafkaProducer.close(); } }
3.消费者API(Consumer API)
3.1 常用参数介绍
KafkaConsumer 的实例化需要指定的参数,Consumer的参数定义在 org.apache.kafka.clients.consumer.ConsumerConfig类中。
常用参数说明如下:
- bootstrap.servers: 配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生 产者需要连接的是Kafka集群,则这里配置集群中几个broker的地址,而不 是全部,当生产者连接上此处指定的broker之后,在通过该连接发现集群 中的其他节点。
- key.deserializer: key数据的反序列化类。kafka-clients提供了常用类型的反序列化类,反序列化类都实现了org.apache.kafka.common.serialization.Deserializer 接口。
- value.deserializer: Value数据的反序列化类。kafka-clients提供了常用类型的反序列化类,反序列化类都实现了org.apache.kafka.common.serialization.Deserializer 接口。
- group.id: 消费组ID,用于指定当前消费者属于哪个消费组。
- auto.offset.reset: 当kafka中没有偏移量或者当前偏移量在服务器中不存在时,kafka该如何处理?参数值如下:
- earliest: automatically reset the offset to the earliest offset(自动重置偏移量到最早的偏移量)
- latest: automatically reset the offset to the latest offset(自动重置偏移量为最新的)
- none: throw exception to the consumer if no previous offset is found for the consumer’s group(如果消费组上一个偏移量不存在,向consumer 抛出异常)
- anything: throw exception to the consumer.(向consumer 抛出异常)
- client.id : 消费消息的时候向服务器发送的id字符串。在ip/port基础上 提供应用的逻辑名称,记录在服务端的请求日志中,用于追踪请求的源。
- enable.auto.commit : 如果设置为true,消费者会自动周期性地向服务器提交偏移量。
3.2 消费者与消费组概念介绍
每一个Consumer属于一个特定的Consumer Group,消费者可以通过指定group.id,来确定其所在消费组。
group_id一般设置为应用的逻辑名称。比如多个订单处理程序组成一个消费组,可以设置group_id 为"order_process"。
消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费
一个拥有四个分区的主题,包含一个消费者的消费组。此时,消费组中的消费者消费主题中的所有分区。
如果在消费组中添加一个消费者2,则每个消费者分别从两个分区接收消息。
如果消费组有四个消费者,则每个消费者可以分配到一个分区
如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任 何消息
向消费组添加消费者是横向扩展消费能力的主要方式。
3.3 消费者代码
package com.warybee; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.*; /** * @author joy */ public class KafkaConsumerDemo { public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); // 设置连接Kafka的初始连接用到的服务器地址 // 如果是集群,则可以通过此初始连接发现集群中的其他broker configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092"); //KEY反序列化类 configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); //value反序列化类 configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo"); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); //创建消费者对象 KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs); List<String> topics = new ArrayList<>(); topics.add("test_topic_1"); //消费者订阅主题 consumer.subscribe(topics); while (true){ //批量拉取主题消息,每3秒拉取一次 ConsumerRecords<Integer, String> records = consumer.poll(3000); //变量消息 for (ConsumerRecord<Integer, String> record : records) { System.out.println("主题:"+record.topic() + "\t" +"分区:" + record.partition() + "\t" +"偏移量:" + + record.offset() + "\t" +"Key:"+ record.key() + "\t" +"Value:"+ record.value()); } } } }
依次运行生产者和消费者。控制台可以看到消费者接收到的消息
4. 客户端链接异常信息处理
如果运行代码过程中,java客户端连接出现Connection refused: no further information错误:
java.net.ConnectException: Connection refused: no further information ............................省略其他错误信息...................
修改 ${KAFKA_HOME}/config/server.properties
# The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 listeners = PLAINTEXT://localhost:9092
将localhost修改为kafka所在服务器的IP地址即可。如果java程序和kafka在同一个服务器上,则不需要修改。
到此这篇关于Kafka在客户端实现消息的发送与读取的文章就介绍到这了,更多相关Kafka消息发送与读取内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!