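Source Code Analysis of logback's AsyncAppender for Efficient Log Handling
Author: codecraft
This article walks through the source code of logback's AsyncAppender to show how it handles log events asynchronously.
Preface
This article looks at logback's AsyncAppender.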
AsyncAppender
ch/qos/logback/classic/AsyncAppender.java
public class AsyncAppender extends AsyncAppenderBase<ILoggingEvent> {

    boolean includeCallerData = false;

    /**
     * Events of level TRACE, DEBUG and INFO are deemed to be discardable.
     * @param event
     * @return true if the event is of level TRACE, DEBUG or INFO false otherwise.
     */
    protected boolean isDiscardable(ILoggingEvent event) {
        Level level = event.getLevel();
        return level.toInt() <= Level.INFO_INT;
    }

    protected void preprocess(ILoggingEvent eventObject) {
        eventObject.prepareForDeferredProcessing();
        if (includeCallerData)
            eventObject.getCallerData();
    }

    public boolean isIncludeCallerData() {
        return includeCallerData;
    }

    public void setIncludeCallerData(boolean includeCallerData) {
        this.includeCallerData = includeCallerData;
    }
}
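AsyncAppender extends AsyncAppenderBase. It adds the includeCallerData setting and overrides the isDiscardable and preprocess methods. isDiscardable returns true for events at level TRACE, DEBUG, or INFO (any level whose integer value is at most Level.INFO_INT) and false for WARN and ERROR. preprocess first calls eventObject.prepareForDeferredProcessing() and then, if includeCallerData is enabled, calls eventObject.getCallerData() so that the caller data is captured on the logging thread before the event is queued.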
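To make the level comparison concrete, here is a minimal standalone sketch of the same check. It does not depend on logback: the integer constants mirror the values of the corresponding Level.*_INT constants in logback, and the class name DiscardableDemo is made up for this illustration.

```java
class DiscardableDemo {
    // Values mirroring the *_INT constants of ch.qos.logback.classic.Level.
    static final int TRACE_INT = 5000;
    static final int DEBUG_INT = 10000;
    static final int INFO_INT = 20000;
    static final int WARN_INT = 30000;
    static final int ERROR_INT = 40000;

    // Same comparison as AsyncAppender.isDiscardable: INFO and below may be dropped.
    static boolean isDiscardable(int levelInt) {
        return levelInt <= INFO_INT;
    }

    public static void main(String[] args) {
        System.out.println("TRACE discardable: " + isDiscardable(TRACE_INT));
        System.out.println("DEBUG discardable: " + isDiscardable(DEBUG_INT));
        System.out.println("INFO  discardable: " + isDiscardable(INFO_INT));
        System.out.println("WARN  discardable: " + isDiscardable(WARN_INT));
        System.out.println("ERROR discardable: " + isDiscardable(ERROR_INT));
    }
}
```

Only WARN and ERROR report false here, which is why those two levels are never dropped by the threshold check, although they can still be lost when neverBlock is enabled and the queue is full.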
AsyncAppenderBase
ch/qos/logback/core/AsyncAppenderBase.java
public class AsyncAppenderBase<E> extends UnsynchronizedAppenderBase<E> implements AppenderAttachable<E> {

    AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>();
    BlockingQueue<E> blockingQueue;

    /**
     * The default buffer size.
     */
    public static final int DEFAULT_QUEUE_SIZE = 256;
    int queueSize = DEFAULT_QUEUE_SIZE;

    int appenderCount = 0;

    static final int UNDEFINED = -1;
    int discardingThreshold = UNDEFINED;
    boolean neverBlock = false;

    Worker worker = new Worker();

    /**
     * The default maximum queue flush time allowed during appender stop. If the
     * worker takes longer than this time it will exit, discarding any remaining
     * items in the queue
     */
    public static final int DEFAULT_MAX_FLUSH_TIME = 1000;
    int maxFlushTime = DEFAULT_MAX_FLUSH_TIME;

    /**
     * Is the eventObject passed as parameter discardable? The base class's implementation of this method always returns
     * 'false' but sub-classes may (and do) override this method.
     * <p/>
     * <p>Note that only if the buffer is nearly full are events discarded. Otherwise, when the buffer is "not full"
     * all events are logged.
     *
     * @param eventObject
     * @return - true if the event can be discarded, false otherwise
     */
    protected boolean isDiscardable(E eventObject) {
        return false;
    }

    /**
     * Pre-process the event prior to queueing. The base class does no pre-processing but sub-classes can
     * override this behavior.
     *
     * @param eventObject
     */
    protected void preprocess(E eventObject) {
    }

    @Override
    public void start() {
        if (isStarted())
            return;
        if (appenderCount == 0) {
            addError("No attached appenders found.");
            return;
        }
        if (queueSize < 1) {
            addError("Invalid queue size [" + queueSize + "]");
            return;
        }
        blockingQueue = new ArrayBlockingQueue<E>(queueSize);

        if (discardingThreshold == UNDEFINED)
            discardingThreshold = queueSize / 5;
        addInfo("Setting discardingThreshold to " + discardingThreshold);
        worker.setDaemon(true);
        worker.setName("AsyncAppender-Worker-" + getName());
        // make sure this instance is marked as "started" before staring the worker Thread
        super.start();
        worker.start();
    }

    @Override
    public void stop() {
        if (!isStarted())
            return;

        // mark this appender as stopped so that Worker can also processPriorToRemoval if it is invoking
        // aii.appendLoopOnAppenders
        // and sub-appenders consume the interruption
        super.stop();

        // interrupt the worker thread so that it can terminate. Note that the interruption can be consumed
        // by sub-appenders
        worker.interrupt();

        InterruptUtil interruptUtil = new InterruptUtil(context);

        try {
            interruptUtil.maskInterruptFlag();

            worker.join(maxFlushTime);

            // check to see if the thread ended and if not add a warning message
            if (worker.isAlive()) {
                addWarn("Max queue flush timeout (" + maxFlushTime + " ms) exceeded. Approximately " + blockingQueue.size()
                        + " queued events were possibly discarded.");
            } else {
                addInfo("Queue flush finished successfully within timeout.");
            }

        } catch (InterruptedException e) {
            int remaining = blockingQueue.size();
            addError("Failed to join worker thread. " + remaining + " queued events may be discarded.", e);
        } finally {
            interruptUtil.unmaskInterruptFlag();
        }
    }

    @Override
    protected void append(E eventObject) {
        if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
            return;
        }
        preprocess(eventObject);
        put(eventObject);
    }

    private boolean isQueueBelowDiscardingThreshold() {
        return (blockingQueue.remainingCapacity() < discardingThreshold);
    }

    private void put(E eventObject) {
        if (neverBlock) {
            blockingQueue.offer(eventObject);
        } else {
            putUninterruptibly(eventObject);
        }
    }

    private void putUninterruptibly(E eventObject) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    blockingQueue.put(eventObject);
                    break;
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    //......
}
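AsyncAppenderBase extends UnsynchronizedAppenderBase and implements the AppenderAttachable interface. It defines properties such as queueSize, discardingThreshold, and neverBlock. Its start method creates an ArrayBlockingQueue sized by queueSize, sets discardingThreshold to queueSize / 5 if it was not configured, and then starts the Worker. Its stop method calls super.stop() to mark the appender as stopped, calls worker.interrupt(), and then waits up to maxFlushTime (1000 ms by default) for the worker to flush the remaining events. Its append method first checks isQueueBelowDiscardingThreshold (whether the queue's remaining capacity is below discardingThreshold) and isDiscardable; if both are true it returns immediately and the event is dropped, otherwise it calls preprocess and then put.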
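The append path can be sketched without logback as follows. This is a simplified illustration, not the library's code: the class AppendPathSketch, its String events, and the boolean discardable parameter are all assumptions made for the example, and the blocking branch gives up on interruption instead of retrying the way putUninterruptibly does.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class AppendPathSketch {
    private final BlockingQueue<String> queue;
    private final int discardingThreshold;
    private final boolean neverBlock;

    AppendPathSketch(int queueSize, boolean neverBlock) {
        this.queue = new ArrayBlockingQueue<>(queueSize);
        this.discardingThreshold = queueSize / 5; // same default as in start()
        this.neverBlock = neverBlock;
    }

    /** Returns true if the event ended up in the queue, false if it was dropped. */
    boolean append(String event, boolean discardable) {
        // Drop low-priority events once the free space falls below the threshold.
        if (queue.remainingCapacity() < discardingThreshold && discardable) {
            return false;
        }
        if (neverBlock) {
            return queue.offer(event); // returns false immediately when the queue is full
        }
        try {
            queue.put(event); // blocks the caller until space is available
            return true;
        } catch (InterruptedException e) {
            // Simplification: the real putUninterruptibly keeps retrying instead.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        AppendPathSketch sketch = new AppendPathSketch(10, true); // threshold = 10 / 5 = 2
        int accepted = 0;
        for (int i = 0; i < 10; i++) {
            if (sketch.append("info-" + i, true)) {
                accepted++;
            }
        }
        System.out.println("discardable events accepted: " + accepted);
        System.out.println("first WARN accepted: " + sketch.append("warn-1", false));
        System.out.println("second WARN accepted: " + sketch.append("warn-2", false));
    }
}
```

With a queue size of 10 the threshold is 2, so nine discardable events are queued and the tenth is dropped because only one slot remains. The first non-discardable event takes that last slot, and the second is rejected by offer because neverBlock is true and the queue is full.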
Worker
ch/qos/logback/core/AsyncAppenderBase.java
class Worker extends Thread {

    public void run() {
        AsyncAppenderBase<E> parent = AsyncAppenderBase.this;
        AppenderAttachableImpl<E> aai = parent.aai;

        // loop while the parent is started
        while (parent.isStarted()) {
            try {
                E e = parent.blockingQueue.take();
                aai.appendLoopOnAppenders(e);
            } catch (InterruptedException ie) {
                break;
            }
        }

        addInfo("Worker thread will flush remaining events before exiting. ");

        for (E e : parent.blockingQueue) {
            aai.appendLoopOnAppenders(e);
            parent.blockingQueue.remove(e);
        }

        aai.detachAndStopAllAppenders();
    }
}
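The Worker's run method loops while the parent is started, blocking on blockingQueue.take() to fetch an element and passing it to AppenderAttachableImpl.appendLoopOnAppenders. The loop exits when started becomes false or when the thread is interrupted. The worker then iterates over whatever is left in blockingQueue, passes each element to AppenderAttachableImpl, and removes it from the queue. Finally it calls detachAndStopAllAppenders.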
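The take-then-drain pattern used by the worker can be tried in isolation with the sketch below. It is an illustration under stated assumptions rather than logback's implementation: WorkerSketch and its sink list (standing in for the attached appenders) are hypothetical, and the final flush uses poll() instead of iterating over the queue and removing each element.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;

class WorkerSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(16);
    // Stands in for the attached appenders; thread-safe so callers can read it after stop().
    private final List<String> sink = new CopyOnWriteArrayList<>();
    private volatile boolean started = true;
    private final Thread worker = new Thread(this::runLoop, "worker-sketch");

    private void runLoop() {
        // Loop while started, blocking on take() until an event arrives.
        while (started) {
            try {
                sink.add(queue.take());
            } catch (InterruptedException ie) {
                break; // stop() interrupted us: leave the loop and flush
            }
        }
        // Flush whatever is still queued before exiting.
        String rest;
        while ((rest = queue.poll()) != null) {
            sink.add(rest);
        }
    }

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    void append(String event) {
        try {
            queue.put(event);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Marks the sketch as stopped, interrupts the worker, and waits up to maxFlushMillis. */
    List<String> stop(long maxFlushMillis) {
        started = false;
        worker.interrupt();
        try {
            worker.join(maxFlushMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sink;
    }

    public static void main(String[] args) {
        WorkerSketch sketch = new WorkerSketch();
        sketch.start();
        for (int i = 0; i < 100; i++) {
            sketch.append("event-" + i);
        }
        List<String> written = sketch.stop(1000);
        System.out.println("events written: " + written.size());
    }
}
```

Because the flush runs after the loop exits, events that were still queued when stop() was called are written as long as the worker finishes within the join timeout. This mirrors the role maxFlushTime plays in AsyncAppenderBase.stop().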
AppenderAttachableImpl
ch/qos/logback/core/spi/AppenderAttachableImpl.java
public int appendLoopOnAppenders(E e) {
    int size = 0;
    final Appender<E>[] appenderArray = appenderList.asTypedArray();
    final int len = appenderArray.length;
    for (int i = 0; i < len; i++) {
        appenderArray[i].doAppend(e);
        size++;
    }
    return size;
}

/**
 * Remove and processPriorToRemoval all previously attached appenders.
 */
public void detachAndStopAllAppenders() {
    for (Appender<E> a : appenderList) {
        a.stop();
    }
    appenderList.clear();
}
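AppenderAttachableImpl's appendLoopOnAppenders method iterates over every appender in appenderList, calls doAppend on each, and returns the number of appenders that were invoked. Its detachAndStopAllAppenders method iterates over appenderList, calls stop on each appender, and finally clears the whole appenderList.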
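The fan-out itself is a small pattern that can be sketched in a few lines. In this hypothetical FanOutSketch, a java.util.function.Consumer stands in for logback's Appender interface and a CopyOnWriteArrayList stands in for appenderList; both choices are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class FanOutSketch<E> {
    // Copy-on-write keeps iteration safe while appenders are added or removed.
    private final List<Consumer<E>> appenders = new CopyOnWriteArrayList<>();

    void addAppender(Consumer<E> appender) {
        appenders.add(appender);
    }

    /** Delivers the event to every attached appender and returns how many received it. */
    int appendLoopOnAppenders(E event) {
        int size = 0;
        for (Consumer<E> appender : appenders) {
            appender.accept(event);
            size++;
        }
        return size;
    }

    /** Drops all attached appenders (the real method also stops each one first). */
    void detachAll() {
        appenders.clear();
    }

    public static void main(String[] args) {
        FanOutSketch<String> fanOut = new FanOutSketch<>();
        List<String> file = new ArrayList<>();
        List<String> console = new ArrayList<>();
        fanOut.addAppender(file::add);
        fanOut.addAppender(console::add);
        System.out.println("appenders reached: " + fanOut.appendLoopOnAppenders("hello"));
    }
}
```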
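Summary
logback's AsyncAppender buffers events in an ArrayBlockingQueue (default size 256). On each append it first checks isQueueBelowDiscardingThreshold and isDiscardable; if both are true, the event is dropped and the method returns. Otherwise it calls preprocess and then put. put is controlled by the neverBlock parameter: when it is true, put uses offer, so the event is dropped if the queue is full; when it is false, put blocks until the event has been enqueued. A worker thread keeps taking elements from blockingQueue and writes them to the attached appenders in appenderList. On shutdown the worker also iterates over the remaining elements in the queue, writes each to appenderList and removes it from the queue, and finally detaches and stops all attached appenders.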