普通java项目集成kafka方式
作者:西柚感觉日了狗
文章介绍了如何在非Spring Cloud或Spring Boot项目中配置和使用Kafka,提供了一个简单的Kafka配置读取类,可以灵活地从不同配置中读取属性,并提供默认值
现在假设有这样一种需求:我方业务系统要通过Kafka与某服务平台交互,异步获取服务;而系统架构可能比较老旧,既不是Spring Cloud全家桶,也不是Spring Boot,只是普通的Java项目或者Java Web项目。
依赖
<!-- NOTE(review): kafka-clients is declared as 3.1.0 while kafka_2.11 is 2.4.1; confirm these two versions work together, or align them. kafka_2.11 is presumably what supplies kafka.utils.ShutdownableThread imported by the consumer class. -->
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>2.4.1</version> </dependency>
Kafka配置读取类
本文后面的示例没有用到这个类,而是直接把配置写在代码里,这样更简单一些。
但如果生产需要,还是有这个类比较好,可以从不同配置中读取,同时给个默认值
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

/**
 * Reads Kafka settings from optional property files located in
 * {@code <user.dir>/kafkaConf/}: {@code producer.properties},
 * {@code consumer.properties}, {@code server.properties} and
 * {@code client.properties}. Files that do not exist are skipped.
 *
 * <p>Access goes through the lazily created singleton returned by
 * {@link #getInstance()}.
 *
 * @author zzy
 */
public class KafkaProperties {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProperties.class);

    private static final Properties serverProps = new Properties();
    private static final Properties clientProps = new Properties();
    private static final Properties producerProps = new Properties();
    private static final Properties consumerProps = new Properties();

    private static KafkaProperties instance = null;

    /**
     * Loads every property file that is present. A file that cannot be read is
     * logged and skipped so that the remaining files are still loaded.
     */
    private KafkaProperties() {
        String filePath = System.getProperty("user.dir") + File.separator + "kafkaConf" + File.separator;
        loadIfPresent(filePath + "producer.properties", producerProps);
        loadIfPresent(filePath + "consumer.properties", consumerProps);
        loadIfPresent(filePath + "server.properties", serverProps);
        loadIfPresent(filePath + "client.properties", clientProps);
    }

    /**
     * Loads one property file into {@code target} when the file exists.
     *
     * @param path   absolute path of the property file
     * @param target properties object that receives the entries
     */
    private static void loadIfPresent(String path, Properties target) {
        try {
            File file = new File(path);
            if (!file.exists()) {
                return;
            }
            // try-with-resources closes each stream; the previous code closed only the last one opened.
            try (FileInputStream fis = new FileInputStream(file)) {
                target.load(fis);
            }
        } catch (IOException | RuntimeException e) {
            // Start-up boundary: a broken optional file must not make the constructor throw.
            LOG.error("init kafka props error, file: {}", path, e);
        }
    }

    /**
     * Returns the lazily initialised singleton.
     *
     * @return the shared {@code KafkaProperties} instance
     */
    public static synchronized KafkaProperties getInstance() {
        if (instance == null) {
            instance = new KafkaProperties();
        }
        return instance;
    }

    /**
     * Looks up a setting, falling back to {@code defaultValue} when the key is
     * null, empty, or not present in any loaded file.
     *
     * @param key          property name
     * @param defaultValue value returned when no file defines {@code key}
     * @return the configured value, or {@code defaultValue}
     */
    public String getValue(String key, String defaultValue) {
        if (StringUtils.isEmpty(key)) {
            LOG.error("key is null or empty");
            // Properties.getProperty(null) throws, so answer with the default instead.
            return defaultValue;
        }
        String value = getPropsValue(key);
        if (value == null) {
            LOG.warn("kafka property getValue return null, the key is {}", key);
            value = defaultValue;
        }
        LOG.info("kafka property getValue, key:{}, value:{}", key, value);
        return value;
    }

    /**
     * Searches the loaded files in the order server, producer, consumer, client
     * and returns the first value found.
     *
     * @param key property name, must not be null
     * @return the first matching value, or {@code null} when none is defined
     */
    private String getPropsValue(String key) {
        String value = serverProps.getProperty(key);
        if (value == null) {
            value = producerProps.getProperty(key);
        }
        if (value == null) {
            value = consumerProps.getProperty(key);
        }
        if (value == null) {
            value = clientProps.getProperty(key);
        }
        return value;
    }
}
producer
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; import java.util.concurrent.ExecutionException; /** * kafka producer * @author zzy */ public class KafkaProFormal { public static final Logger LOG = LoggerFactory.getLogger(KafkaProFormal.class); private Properties properties = new Properties(); private final String bootstrapServers = "bootstrap.servers"; private final String clientId = "client.id"; private final String keySerializer = "key.serializer"; private final String valueSerializer = "value.serializer"; //private final String securityProtocol = "security.protocol"; //private final String saslKerberosServiceName = "sasl.kerberos.service.name"; //private final String kerberosDomainName = "kerberos.domain.name"; private final String maxRequestSize = "max.request.size"; private KafkaProducer<String, String> producer; private volatile static KafkaProFormal kafkaProFormal; private KafkaProFormal(String servers) { properties.put(bootstrapServers, servers); properties.put(keySerializer, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(valueSerializer, "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(properties); } public static KafkaProFormal getInstance(String servers) { if(kafkaProFormal == null) { synchronized(KafkaProFormal.class) { if(kafkaProFormal == null) { kafkaProFormal = new KafkaProFormal(servers); } } } return kafkaProFormal; } public void sendStringWithCallBack(String topic, String message, boolean asyncFlag) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message); long startTime = System.currentTimeMillis(); if(asyncFlag) { //异步发送 producer.send(record, new KafkaCallBack(startTime, 
message)); } else { //同步发送 try { producer.send(record, new KafkaCallBack(startTime, message)).get(); } catch (InterruptedException e) { LOG.error("InterruptedException occured : {0}", e); } catch (ExecutionException e) { LOG.error("ExecutionException occured : {0}", e); } } } } class KafkaCallBack implements Callback { private static Logger LOG = LoggerFactory.getLogger(KafkaCallBack.class); private String key; private long startTime; private String message; KafkaCallBack(long startTime, String message) { this.startTime = startTime; this.message = message; } @Override public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if(metadata != null) { LOG.info("Record(" + key + "," + message + ") sent to partition(" + metadata.partition() + "), offset(" + metadata.offset() + ") in " + elapsedTime + " ms."); } else { LOG.error("metadata is null." + "Record(" + key + "," + message + ")", exception); } } }
consumer
import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.Properties; import java.util.Set; /** * kafka consumer * @author zzy */ public abstract class KafkaConFormal extends ShutdownableThread { private static final Logger LOG = LoggerFactory.getLogger(KafkaConFormal.class); private Set<String> topics; private final String bootstrapServers = "bootstrap.servers"; private final String groupId = "group.id"; private final String keyDeserializer = "key.deserializer"; private final String valueDeserializer = "value.deserializer"; private final String enableAutoCommit = "enable.auto.commit"; private final String autoCommitIntervalMs = "auto.commit.interval.ms"; private final String sessionTimeoutMs = "session.timeout.ms"; private KafkaConsumer<String, String> consumer; public KafkaConFormal(String topic) { super("KafkaConsumerExample", false); topics.add(topic); Properties props = new Properties(); props.put(bootstrapServers, "your servers"); props.put(groupId, "TestGroup"); props.put(enableAutoCommit, "true"); props.put(autoCommitIntervalMs, "1000"); props.put(sessionTimeoutMs, "30000"); props.put(keyDeserializer, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(valueDeserializer, "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<>(props); } /** * subscribe and handle the msg */ @Override public void doWork() { consumer.subscribe(topics); ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); dealRecords(records); } /** * 实例化consumer时,进行对消费信息的处理 * @param records records */ public abstract void dealRecords(ConsumerRecords<String, String> records); public void setTopics(Set<String> topics) { this.topics = topics; } }
使用
// Requires org.apache.kafka.clients.consumer.ConsumerRecord and ConsumerRecords to be imported.
// bootstrap.servers takes a comma separated host:port list; the stray "kafka server" label was removed.
KafkaProFormal producer = KafkaProFormal.getInstance("1.1.1.1:9092,2.2.2.2:9092");

// Forward every record consumed from consume_topic to target_topic asynchronously.
KafkaConFormal consumer = new KafkaConFormal("consume_topic") {
    @Override
    public void dealRecords(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            producer.sendStringWithCallBack("target_topic", record.value(), true);
        }
    }
};

// Start the polling thread.
consumer.start();
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。