java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Avro序列化和反序列化

Kafka中使用Avro序列化和反序列化详解

作者:warybee

这篇文章主要介绍了Kafka中使用Avro序列化和反序列化详解,由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组, 序列化器的作用就是用于序列化要发送的消息的,需要的朋友可以参考下

1.自定义序列化器与反序列化器

1.1 定义Order实体类

public class Order implements Serializable {
      private Integer OrderId;
      private String title;
    public Order() {
    }
    public Order(Integer orderId, String title) {
        OrderId = orderId;
        this.title = title;
    }
   // 省略必要的get与set方法
}

1.2 定义Order序列化类

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。 序列化器的作用就是用于序列化要发送的消息的。

Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将 泛型指定类型的数据转换为字节数组。

Kafka提供了如下常用类型的序列化类:

在这里插入图片描述

自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer接口,并实现其中 的 serialize 方法。

public class OrderSerializer implements Serializer<Order> {
    @Override
    public byte[] serialize(String topic, Order data) {
        try {
            if (data == null) 
                return null;
            Integer orderId = data.getOrderId();
            String title = data.getTitle();
            int length = 0;
            byte[] bytes = null;
            if (null != title) {
                bytes = title.getBytes("utf-8");
                length = bytes.length;
            }
            //前4个字节保存orderId,
            //第二个4个字节保存title字段的长度
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
            buffer.putInt(orderId);
            buffer.putInt(length);
            buffer.put(bytes);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("序列化数据异常");
        }
    }
}

1.3 生产者代码

package com.warybee.c1;
import com.warybee.model.Order;
import com.warybee.serializer.OrderSerializer;
import org.apache.kafka.clients.producer.*;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 设置key的序列化类
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 设置自定义的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, OrderSerializer.class);
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        KafkaProducer<Integer, Order> kafkaProducer=new KafkaProducer<Integer, Order>(configs);
        //定义order
        Order order=new Order();
        order.setOrderId(1);
        order.setTitle("iphone13 pro 256G");
        ProducerRecord<Integer,Order> producerRecord=new ProducerRecord<Integer,Order>
        (       "test_order_topic",
                0,
                order.getOrderId(),
                order);
        //消息的异步确认
        kafkaProducer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                if (exception==null){
                    System.out.println("消息的主题:"+recordMetadata.topic());
                    System.out.println("消息的分区:"+recordMetadata.partition());
                    System.out.println("消息的偏移量:"+recordMetadata.offset());
                }else {
                    System.out.println("发送消息异常");
                }
            }
        });
        // 关闭生产者
        kafkaProducer.close();
    }
}

1.4. 定义Order反序列化器

自定义反序列化类,需要实现 org.apache.kafka.common.serialization.Deserializer 接 口,并且实现其中的deserialize方法。

package com.warybee.deserializer;
import com.warybee.model.Order;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
/**
 * @author joy
 * @description Order反序列化类
 */
public class OrderDeserializer implements Deserializer<Order> {
    @Override
    public Order deserialize(String topic, byte[] data) {
        ByteBuffer allocate = ByteBuffer.allocate(data.length);
        allocate.put(data);
        allocate.flip();
        Integer orderId = allocate.getInt();
        int length = allocate.getInt();
        String title = new String(data, 8, length);
        return new Order(orderId, title);
    }
}

1.5 消费者代码

package com.warybee.c1;
import com.warybee.deserializer.OrderDeserializer;
import com.warybee.model.Order;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @author joy
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化类
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //自定义value反序列化类
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, OrderDeserializer.class);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo-3");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //创建消费者对象
        KafkaConsumer<Integer, Order> consumer = new KafkaConsumer<Integer, Order>(configs);
        List<String> topics = new ArrayList<>();
        topics.add("test_order_topic");
        //消费者订阅主题
        consumer.subscribe(topics);
        while (true){
            //批量拉取主题消息,每3秒拉取一次
            ConsumerRecords<Integer, Order> records = consumer.poll(3000);
            //变量消息
            for (ConsumerRecord<Integer, Order> record : records) {
                System.out.println(record.topic() + "\t"
                        + record.partition() + "\t"
                        + record.offset() + "\t"
                        + record.key() + "\t"
                        + record.value().getTitle());
            }
        }
    }
}

上面实现的序列化与反序列化类,定义繁琐,不具有通用性,一不小心就会BUG满天飞,不适合在实际项目中使用,只做了解原理即可。接下来使用Apache Avro来实现序列化和反序列化

2. 使用Avro序列化和反序列化

2.1 Apache Avro介绍

Apache Avro是一种与编程语言无关的序列化格式。提供了一种共享数据文件的方式。Avro 数据通过与语言无关的 schema 来定义。schema 通过 JSON 来描述,数据被序列化成二进制文件或 JSON 文件,不过一般会使用二进制文件。Avro 在读写文件时需要用到 schema,schema 一般会被内嵌在数据文件里。

Apache Avro官网文档

2.2 创建Maven项目

引入依赖

    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.7</version>
        </dependency>
       <!--Apache Avro 依赖-->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.0</version>
        </dependency>

Avro 插件

<build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.11.0</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/java/com/warybee/avro/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

Avro插件参数说明:

2.3 创建schema 文件

Apache Avro schema 是使用 JSON 定义的。详细的介绍,可以参考官网文档

定义一个order.avsc文件(文件所在目录要与上一步配置的sourceDirectory保持一致),内容如下:

{"namespace": "com.warybee.avro",
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "int"},
    {"name": "title",  "type": "string"},
    {"name": "num", "type": "int"}
  ]
}

IDEA话,可以安装一个Apache Avro IDL Schema Support插件,安装插件后编写schema 有智能提示。

2.4.Avro生成entity

上面配置了Avro 插件,通过maven命令,生成即可。

mvn install

或者IDEA右键->RUN Maven->install

在这里插入图片描述

2.5 生产者代码

package com.warybee.avro;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.*;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
 * @author joy
 */
public class KafkaProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        // 设置key的序列化类
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        // 设置value的序列化类
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        configs.put(ProducerConfig.ACKS_CONFIG,"all");
        KafkaProducer<Integer,byte[]> producer=new KafkaProducer<Integer, byte[]>(configs);
        //发送100条消息
        for (int i = 0; i < 100; i++) {
             Order order=Order.newBuilder()
                     .setOrderId(i+1)
                     .setTitle("订单: "+(i+1)+" iphone 13 pro 256G")
                     .setNum(1)
                     .build();
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, (BinaryEncoder)null);
            SpecificDatumWriter writer = new SpecificDatumWriter(order.getSchema());
            try {
                writer.write(order, encoder);
                encoder.flush();
                out.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
           ProducerRecord<Integer,byte[]> record=new ProducerRecord<>(
                   "test_avro_topic",
                   0,
                   order.getOrderId(),
                   out.toByteArray());
            //发送消息
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                      if (exception==null){
                          System.out.println("消息的主题:"+metadata.topic());
                          System.out.println("消息的分区:"+metadata.partition());
                          System.out.println("消息的偏移量:"+metadata.offset());
                      }else {
                          System.out.println("发送消息异常");
                      }
                }
            });
        }
        // 关闭生产者
        producer.close();
    }
}

2.6 消费者代码

package com.warybee.avro;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * @author joy
 */
public class KafkaConsumerDemo {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://192.168.235.132:9092");
        //KEY反序列化类
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        //value反序列化类
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        //消费者所在的组ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer.demo.avro");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //创建消费者对象
        KafkaConsumer<Integer,  byte[]> consumer = new KafkaConsumer<Integer,  byte[]>(configs);
        List<String> topics = new ArrayList<>();
        topics.add("test_avro_topic");
        //消费者订阅主题
        consumer.subscribe(topics);
        SpecificDatumReader<Order> reader = new SpecificDatumReader<>(Order.getClassSchema());
        try {
            while (true){
                //批量拉取主题消息,每3秒拉取一次
                ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
                for (ConsumerRecord<Integer, byte[]> record : records) {
                    Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
                    Order order=null;
                    try {
                        order=reader.read(null,decoder);
                        System.out.println("订单ID:"+order.getOrderId()+"\t"
                                +"订单标题:"+order.getTitle()+"\t"
                                +"数量:"+order.getNum());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }finally {
            consumer.close();
        }
    }
}

到此这篇关于Kafka中使用Avro序列化和反序列化详解的文章就介绍到这了,更多相关Avro序列化和反序列化内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文