Springboot系列之kafka操作使用详解
作者:wotrd
kafka简介
ApacheKafka®是一个分布式流媒体平台。有三个关键功能:
- 发布和订阅记录流,类似于消息队列或企业消息传递系统。
- 以容错的持久方式存储记录流。
- 记录发生时处理流。
Kafka通常用于两大类应用:
- 构建可在系统或应用程序之间可靠获取数据的实时流数据管道
- 构建转换或响应数据流的实时流应用程序
kafka概念
(1)什么是流处理?
所谓流处理,我的理解是流水线处理。例如,电子厂每个人负责一个功能,来了就处
理,不来就等着。
(2)partition和replication和broker有关吗?
partition和replication是分区和备份的概念。即使是单机一个broker也一样支持。
(3)consumer如何设置和存储partition的offset偏移量,有哪几种消费模式,怎么确定消息是否被消费,将偏移量移到前面会立即消费到最后吗?
使用KafkaConsumer设置partition和offset。有自动提交和手动ack模式提交偏移量两种消费方式。将偏移量移到前面需要设置成为消费状态会立即被消费(设置新消费组)。
(4)AckMode模式有哪几种?
RECORD:处理记录后,侦听器返回时提交偏移量
BATCH:在处理poll()返回的所有记录时提交偏移量
TIME:只要已超过自上次提交以来的ackTime,就会在处理poll()返回的所有记录时提交偏移量
COUNT:只要自上次提交以来已收到ackCount记录,就会在处理poll()返回的所有记录时提交偏移量
COUNT_TIME:与TIME和COUNT类似,但如果任一条件为真,则执行提交
MANUAL:消息监听器负责确认()确认。 之后,应用与BATCH相同的语义
MANUAL_IMMEDIATE:当侦听器调用Acknowledgment.acknowledge()方法时,立即提交偏移量
Springboot使用kafka
(1)注入NewTopic自动在broker中添加topic
@Bean public NewTopic topic() { return new NewTopic("topic1", 2, (short) 1); }
(2)使用KafkaTemplate发送消息时,topic自动创建,自动创建的partition是0,长度为1
(3)使用KafkaTemplate发送消息
@RequestMapping("sendMsgWithTopic") public String sendMsgWithTopic(@RequestParam String topic, @RequestParam int partition, @RequestParam String key, @RequestParam String value) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, partition, key, value); return "success"; }
(4)异步发送消息
public void sendToKafka(final MyOutputData data) { final ProducerRecord<String, String> record = createRecord(data); ListenableFuture<SendResult<Integer, String>> future = template.send(record); future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() { @Override public void onSuccess(SendResult<Integer, String> result) { handleSuccess(data); } @Override public void onFailure(Throwable ex) { handleFailure(data, record, ex); } }); }
(5)同步发送消息
public void sendToKafka(final MyOutputData data) { final ProducerRecord<String, String> record = createRecord(data); try { template.send(record).get(10, TimeUnit.SECONDS); handleSuccess(data); }catch (ExecutionException e) { handleFailure(data, record, e.getCause()); }catch (TimeoutException | InterruptedException e) { handleFailure(data, record, e); } }
(6)事务
(1)Spring事务支持一起使用(@Transactional,TransactionTemplate等) (2)使用template执行事务 boolean result = template.executeInTransaction(t -> { t.sendDefault("thing1", "thing2"); t.sendDefault("cat", "hat"); return true; });
(7)消费者
(1)简单使用 @KafkaListener(id = "myListener", topics = "myTopic", autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}") public void listen(String data) { ... } (2)配置多个topic和partition,TopicPartition中partitions和PartitionOffset不能同时使用 @KafkaListener(id = "thing2", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... } (3)使用ack手动确认模式 @KafkaListener(id = "cat", topics = "myTopic", containerFactory = "kafkaManualAckListenerContainerFactory") public void listen(String data, Acknowledgment ack) { ... ack.acknowledge(); } (4)获取消息的header信息 @KafkaListener(id = "qux", topicPattern = "myTopic1") public void listen(@Payload String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts ) { ... } (5)批处理 @KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory") public void listen(List<String> list, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions, @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics, @Header(KafkaHeaders.OFFSET) List<Long> offsets) { ... } (6)使用@Valid校验数据 @KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler", containerFactory = "kafkaJsonListenerContainerFactory") public void validatedListener(@Payload @Valid ValidatedClass val) { ... } @Bean public KafkaListenerErrorHandler validationErrorHandler() { return (m, e) -> { ... }; } (7)topic根据参数类型映射不同方法 @KafkaListener(id = "multi", topics = "myTopic") static class MultiListenerBean { @KafkaHandler public void listen(String cat) { ... } @KafkaHandler public void listen(Integer hat) { ... } @KafkaHandler public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) { ... } }
Springboot使用kafka踩坑
(1)需要修改server.properties的listener主机地址不然Java获取不到消息。
(2)不同服务配置相同groupId只有一个监听者可以收到消息
kafka图形化工具 kafka tool
下载地址 http://www.kafkatool.com/down...
以上就是Springboot系列之kafka操作使用详解的详细内容,更多关于Springboot kafka操作的资料请关注脚本之家其它相关文章!