Kafka中的producer拦截器与consumer拦截器详解
作者:warybee
1. producer 拦截器(interceptor)
1.1 介绍
Producer 的Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做 一些定制化需求,比如修改消息等。Producer允许指定多个Interceptor按照指定顺序作用于一条消 息从而形成一个拦截链(interceptor chain)。
自定义的拦截器(interceptor)需要实现org.apache.kafka.clients.producer.ProducerInterceptor接口,接口定义如下:
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
public interface ProducerInterceptor<K, V> extends Configurable {
/**
该方法封装进KafkaProducer.send()方法中,方法会在消息发送之前被调用,用户可以在该方法中对消息做任
何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算
*/
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
/**
该方法会在消息成功提交或发送失败之后被调用,
KafkaProducer.send()异步发送有回调通知 callback, onAcknowledgement 的调用要早于 callback 的调用。
*/
void onAcknowledgement(RecordMetadata metadata, Exception exception);
/**
关闭Interceptor,主要用于执行一些资源清理工作。
*/
void close();
}
- onSend() : 方法封装进KafkaProducer.send()方法中,方法会在消息发送之前被调用,用户可以在该方法中对消息做任
- 何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算;
- onAcknowledgement(): 当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时调用此方法。KafkaProducer.send()异步发送有回调通知 callback, onAcknowledgement 的调用要早于 callback 的调用。
- close(): 关闭Interceptor,主要用于执行一些资源清理工作。
如果指定了多个Interceptor,则Producer将按照指定顺序调用它们,如果interceptor出现异常Producer仅仅是捕获每个 Interceptor抛出的异常记录到错误日志中而非在向上传递。
1.2 案例
实现两个拦截器(interceptor),组成拦截链。第一个拦截器在消息发送前,给消息添加header。第二个拦截器统计消息的发送成功数和失败数。
PS:其实这两个功能可以放在一个interceptor中,这里仅仅是为了演示多个interceptor。
定义拦截器
拦截器1:
public class MyInterceptor implements ProducerInterceptor<Integer,String> {
/**
* 消息发送之前调用
* @param record
* @return
*/
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
//消息的主题
String topic = record.topic();
Integer partition = record.partition();
Integer key = record.key();
String value = record.value();
Headers headers = record.headers();
//给header添加时间戳
String stamp = System.currentTimeMillis()+"";
headers.add("timestamp",stamp.getBytes(StandardCharsets.UTF_8));
ProducerRecord<Integer,String> resultRecord=
new ProducerRecord<>(topic,partition,key,value,headers);
return resultRecord;
}
/**
* 当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时调用此方法
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}拦截器2:
public class MyInterceptor02 implements ProducerInterceptor<Integer,String> {
private int errorNum=0;
private int successNum=0;
/**
* 消息发送之前调用
* @param record
* @return
*/
@Override
public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
return record;
}
/**
* 当发送到服务器的记录已被确认时,或者当发送记录在发送到服务器之前失败时调用此方法
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (exception==null){
successNum++;
}else {
errorNum++;
}
}
@Override
public void close() {
System.out.println("消息发送成功数: " + successNum);
System.out.println("消息发送失败数: " + errorNum);
}
@Override
public void configure(Map<String, ?> configs) {
}
}生产者
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"拦截器类全路径");,多个拦截器使用,分割
public class KafkaProducerDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
Map<String, Object> configs = new HashMap<>();
// 设置连接Kafka的初始连接用到的服务器地址
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
// 设置key的序列化类
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
// 设置value的序列化类
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configs.put(ProducerConfig.ACKS_CONFIG,"all");
//添加拦截器
configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyInterceptor,com.warybee.interceptor.MyInterceptor02");
KafkaProducer<Integer,String> kafkaProducer=new KafkaProducer<Integer, String>(configs);
//发送100条消息
for (int i = 0; i < 100; i++) {
ProducerRecord<Integer,String> producerRecord=new ProducerRecord<>
( "test_topic_1",
0,
i,
"test topic msg "+i);
//消息的异步确认
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
if (exception==null){
System.out.println("消息的主题:"+recordMetadata.topic());
System.out.println("消息的分区:"+recordMetadata.partition());
System.out.println("消息的偏移量:"+recordMetadata.offset());
}else {
System.out.println("发送消息异常");
}
}
});
}
// 关闭生产者
kafkaProducer.close();
}
}2 Consumer拦截器(interceptor)
2.1.介绍
消费者(Consumer)在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理,处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。
- ConsumerInterceptor允许拦截甚至更改消费者接收到的消息。
- 常用在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等。
- ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了 错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
- 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。
- 从调用 KafkaConsumer.poll(long) 的同一线程调用 ConsumerInterceptor 回调。
自定义的拦截器(interceptor)需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口,接口定义如下:
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
/**
该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。
该方法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或生成新的消息。
*/
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
/**
当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
调用者将忽略此方法抛出的任何异常。
*/
void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
/**
* 关闭Interceptor之前调用
*/
void close();
}方法说明:
- onConsume 该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。
- onCommit 当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。
2.2 案例
定义拦截器
public class MyConsumerInterceptor implements ConsumerInterceptor<Integer,String> {
@Override
public ConsumerRecords<Integer, String> onConsume(ConsumerRecords<Integer, String> records) {
//在这里可以对接收到的消息进行修改
//如不做处理,直接返回即可
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp,offsetAndMetadata) -> {
System.out.println(tp+" : "+offsetAndMetadata.offset());
});
}
@Override
public void close() {
}
/**
* 用于获取消费者的设置参数
* @param configs
*/
@Override
public void configure(Map<String, ?> configs) {
configs.forEach((k, v) -> {
System.out.println(k + "\t" + v);
});
}
}消费者
在消费者客户端配置中增加如下配置
如果有多个拦截器,用,分割即可
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
public class KafkaConsumerDemo {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
// 设置连接Kafka的初始连接用到的服务器地址
// 如果是集群,则可以通过此初始连接发现集群中的其他broker
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
//KEY反序列化类
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
//value反序列化类
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
//创建消费者对象
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);
List<String> topics = new ArrayList<>();
topics.add("test_topic_1");
//消费者订阅主题
consumer.subscribe(topics);
while (true){
//批量拉取主题消息,每3秒拉取一次
ConsumerRecords<Integer, String> records = consumer.poll(3000);
//变量消息
for (ConsumerRecord<Integer, String> record : records) {
System.out.println(record.topic() + "\t"
+ record.partition() + "\t"
+ record.offset() + "\t"
+ record.key() + "\t"
+ record.value());
}
}
}
}到此这篇关于Kafka中的producer拦截器与consumer拦截器详解的文章就介绍到这了,更多相关producer拦截器与consumer拦截器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
