SpringBoot实现Logback输出日志到Kafka方式
作者:不会画画的画师
本文介绍了如何在SpringBoot应用中通过自定义Appender实现Logback输出日志到Kafka,包括配置maven依赖、Kafka工具类和logback.xml配置
SpringBoot Logback输出日志到Kafka
本文通过在SpringBoot应用中创建一个自定义的Appender从而实现Logback输出日志到Kafka。
pom.xml
pom.xml中配置相关的maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the log2kafka demo: a Spring Boot web app whose Logback output is shipped to Kafka. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.demo</groupId>
    <artifactId>log2kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>log2kafka</name>
    <description>Demo project for send log to kafka</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>

        <!-- Test support; the JUnit vintage engine is excluded. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Logback -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>

        <!-- Kafka. The slf4j-log4j12 artifact is excluded here;
             presumably so that Logback stays the only SLF4J backend (confirm). -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
不建议修改pom文件中的kafka版本,容易造成各种各样的错误。
KafkaUtil.java
创建一个Kafka工具类,用于根据配置项创建Producer。
package com.demo.log2kafka.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * Builds the Kafka {@link Producer} that carries formatted log lines.
 *
 * <p>All settings arrive as strings. A setting counts as "not configured" when it is
 * {@code null}, blank, or contains the text {@code IS_UNDEFINED}; in that case the
 * built-in default listed on {@link #createProducer} is used instead.
 */
public class KafkaUtil {

    /** Text that marks a configuration value as never having been resolved. */
    private static final String UNDEFINED_MARKER = "IS_UNDEFINED";

    /**
     * Creates a String/String Kafka producer.
     *
     * @param bootstrapServers broker list, comma separated for a cluster; default {@code localhost:9092}
     * @param batchSize        value for {@code batch.size}; default {@code 50000}
     * @param lingerMs         value for {@code linger.ms}; default {@code 60000}
     * @param compressionType  value for {@code compression.type}; left unset when not configured
     * @param retries          value for {@code retries}; default {@code 3}
     * @param maxRequestSize   value for {@code max.request.size}; default {@code 5242880} (5 MiB)
     * @return a newly constructed producer
     * @throws NumberFormatException if a configured numeric setting is not a valid integer
     */
    public static Producer<String, String> createProducer(
            String bootstrapServers, String batchSize, String lingerMs,
            String compressionType, String retries, String maxRequestSize) {
        // Every setting goes through the same null/blank/marker-safe fallback, so a missing
        // value can neither throw a NullPointerException nor reach Kafka as a literal
        // "..._IS_UNDEFINED" string.
        bootstrapServers = orDefault(bootstrapServers, "localhost:9092");
        batchSize = orDefault(batchSize, "50000");
        lingerMs = orDefault(lingerMs, "60000");
        retries = orDefault(retries, "3");
        maxRequestSize = orDefault(maxRequestSize, "5242880");

        Properties properties = new Properties();
        // Broker addresses; separate cluster members with commas.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // acks:
        //   0   - no acknowledgement from the broker, so delivery is not guaranteed and
        //         retries never take effect
        //   1   - the partition leader acknowledges once the record is in its log, without
        //         waiting for replication (the value used here)
        //   all - the leader waits for the full set of in-sync replicas before acknowledging
        properties.put(ProducerConfig.ACKS_CONFIG, "1");
        properties.put(ProducerConfig.RETRIES_CONFIG, Integer.valueOf(retries));
        // Upper bound of one batch; a full batch triggers a send.
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.valueOf(batchSize));
        // A batch that has not filled up is still sent once this much time has passed.
        properties.put(ProducerConfig.LINGER_MS_CONFIG, Integer.valueOf(lingerMs));
        // Producer buffer memory: 32 MiB.
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        if (!isUndefined(compressionType)) {
            // Compression algorithm, only when one was configured.
            properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType.trim());
        }
        // Maximum size of a single request.
        properties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Integer.valueOf(maxRequestSize));
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(properties);
    }

    /** Returns true when the value is null, blank, or carries the undefined marker. */
    private static boolean isUndefined(String value) {
        return value == null || value.trim().isEmpty() || value.contains(UNDEFINED_MARKER);
    }

    /** Returns the trimmed value, or the fallback when the value is not configured. */
    private static String orDefault(String value, String fallback) {
        return isUndefined(value) ? fallback : value.trim();
    }
}
KafkaAppender.java
package com.demo.log2kafka.appender; import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.ConsoleAppender; import com.demo.log2kafka.util.KafkaUtil; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class KafkaAppender extends ConsoleAppender<ILoggingEvent> { public static final Logger LOGGER = LoggerFactory.getLogger(KafkaAppender.class); private String bootstrapServers; private String topic; private String batchSize; private String lingerMs; private String compressionType; private String retries; private String maxRequestSize; private String isSend; private Producer<String, String> producer; @Override public String toString() { return "KafkaAppender{" + "bootstrapServers='" + bootstrapServers + '\'' + ", topic='" + topic + '\'' + ", batchSize='" + batchSize + '\'' + ", lingerMs='" + lingerMs + '\'' + ", compressionType='" + compressionType + '\'' + ", retries='" + retries + '\'' + ", maxRequestSize='" + maxRequestSize + '\'' + ", isSend='" + isSend + '\'' + ", producer=" + producer + '}'; } @Override public void start() { super.start(); if ("true".equals(this.isSend)) { if (producer == null) { producer = KafkaUtil.createProducer(this.bootstrapServers, this.batchSize, this.lingerMs, this.compressionType, this.retries, this.maxRequestSize); } } } @Override public void stop() { super.stop(); if ("true".equals(this.isSend)) { this.producer.close(); } LOGGER.info(Markers.KAFKA, "Stopping kafkaAppender..."); } @Override protected void append(ILoggingEvent eventObject) { byte[] byteArray; String log; // 对日志格式进行解码 byteArray = this.encoder.encode(eventObject); log = new String(byteArray); ProducerRecord<String, String> record = new ProducerRecord<>(this.topic, log); if (eventObject.getMarker() == null && 
"true".equals(this.isSend)) { producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { LOGGER.error(Markers.KAFKA, "Send log to kafka failed: [{}]", log); } } }); } } public String getBootstrapServers() { return bootstrapServers; } public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public String getBatchSize() { return batchSize; } public void setBatchSize(String batchSize) { this.batchSize = batchSize; } public String getLingerMs() { return lingerMs; } public void setLingerMs(String lingerMs) { this.lingerMs = lingerMs; } public String getCompressionType() { return compressionType; } public void setCompressionType(String compressionType) { this.compressionType = compressionType; } public String getRetries() { return retries; } public void setRetries(String retries) { this.retries = retries; } public String getMaxRequestSize() { return maxRequestSize; } public void setMaxRequestSize(String maxRequestSize) { this.maxRequestSize = maxRequestSize; } public Producer<String, String> getProducer() { return producer; } public void setProducer(Producer<String, String> producer) { this.producer = producer; } public String getIsSend() { return isSend; } public void setIsSend(String isSend) { this.isSend = isSend; } }
为了能够按照指定的日志格式将日志发送到Kafka,这里直接继承了ConsoleAppender。
logback.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: JSON-style console output plus a Kafka appender for application logs. -->
<configuration>
    <property name="LOG_HOME" value="./logs"/>
    <springProperty scope="context" name="springAppName" source="spring.application.name"/>

    <!-- Kafka settings read from the Spring environment (log.config.kafka.*).
         The attribute name must be spelled "defaultValue"; a misspelled attribute is not
         applied, which leaves the variable undefined when the property is absent. -->
    <springProperty scope="context" name="isSend" source="log.config.kafka.isSend" defaultValue="false"/>
    <!-- Default broker port is 9092, matching the fallback in KafkaUtil. -->
    <springProperty scope="context" name="bootstrapServers" source="log.config.kafka.bootstrapServers" defaultValue="localhost:9092"/>
    <springProperty scope="context" name="topic" source="log.config.kafka.topic" defaultValue="test-topic"/>
    <!-- NOTE(review): this value becomes Kafka's batch.size, which the Kafka documentation
         defines in bytes; a default of 1 effectively disables batching. Confirm this is intended. -->
    <springProperty scope="context" name="batchSize" source="log.config.kafka.batchSize" defaultValue="1"/>
    <springProperty scope="context" name="lingerMs" source="log.config.kafka.lingerMs" defaultValue="1000"/>
    <springProperty scope="context" name="compressionType" source="log.config.kafka.compressionType" defaultValue="gzip"/>
    <springProperty scope="context" name="retries" source="log.config.kafka.retries" defaultValue="3"/>
    <springProperty scope="context" name="maxRequestSize" source="log.config.kafka.maxRequestSize" defaultValue="5242880"/>

    <!-- Adjust as needed. -->
    <property name="APP_NAME" value="${springAppName}"/>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern> { "timestamp":"%date{yyyy-MM-dd HH:mm:ss.SSS}", "app": "${APP_NAME}", "logLevel": "%level", "message": "%message" }\n </pattern>
        </encoder>
    </appender>

    <appender name="KAFKA" class="com.demo.log2kafka.appender.KafkaAppender">
        <!-- The encoder is required; it defines the format of the text sent to Kafka. -->
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern> { "timestamp":"%date{yyyy-MM-dd HH:mm:ss.SSS}", "app": "${APP_NAME}", "logLevel": "%level", "message": "%message" }\n </pattern>
        </encoder>
        <bootstrapServers>${bootstrapServers}</bootstrapServers>
        <topic>${topic}</topic>
        <batchSize>${batchSize}</batchSize>
        <lingerMs>${lingerMs}</lingerMs>
        <compressionType>${compressionType}</compressionType>
        <retries>${retries}</retries>
        <maxRequestSize>${maxRequestSize}</maxRequestSize>
        <isSend>${isSend}</isSend>
    </appender>

    <!-- When DEBUG level is needed, attach the Kafka appender to a specific logger like
         this one; do not attach it to root. -->
    <logger name="com.demo.log2kafka" level="DEBUG">
        <appender-ref ref="KAFKA"/>
    </logger>

    <!-- Root log level. -->
    <root level="INFO">
        <!-- Console output. -->
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
application.yml
spring:
  application:
    name: log2kafka

# The whole "log" section may be omitted when Kafka logging is not used.
log:
  config:
    kafka:
      # Whether to send logs to Kafka: true or false. Required when Kafka logging is used.
      isSend: true
      # Kafka broker addresses. Required when Kafka logging is used.
      bootstrapServers: 192.168.254.152:9092,192.168.254.156:9092
      # Topic the logs are sent to. Required when Kafka logging is used.
      topic: test-topic
      # Batch threshold; a send is triggered once it is reached.
      # NOTE(review): this value is passed to Kafka's batch.size, which the Kafka
      # documentation defines in bytes rather than as a record count - confirm 5 is intended.
      batchSize: 5
      # Send after this interval (milliseconds) even if the batch threshold was not reached.
      lingerMs: 1000
      # Compression type
      # compressionType: gzip
      # Number of retries
      # retries: 3
      # Maximum message size, set to 5 MB here
      # maxRequestSize: 5242880

server:
  port: 9090
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。