# 项目说明
本项目以 slf4j
作为日志门面,对其进行提供日志实现,以 SPI 机制为基础,提供各种日志服务。本项目主要作为学习使用,了解现代主流的日志框架提供的功能以及如何开发一个简化的日志框架。因为鄙人对并发很感兴趣,所以 cyan-log
项目也主要是在并发方面花功夫,考虑将性能优化到极致,汇聚一点 登峰造极。
# 开源仓库
暂时还没 push
# cyan-log
# log4j2
版本:2.19.0
# Logger
# AsyncLogger
参考链接:https://bryantchang.github.io/2019/01/15/log4j2-asyncLogger/
AsyncLogger 将日志事件传给了 disruptor 队列后,事件中封装了对应的 logger 和 config,消费者就会根据事件中的 config 来打印 logger,消费者的逻辑就和同步打印一样的。
为什么有了
AsyncAppender
,还要AsyncLogger
,这两个结合会如何,性能如何?请看该篇文章:https://stackoverflow.com/questions/24177601/difference-between-asynclogger-and-asyncappender-in-log4j2
# Appender
Appender
是一个路由 LogEvent 的管道,他决定了日志要去哪以及怎么去。主要分为两种 Appender
:同步 Appender
和异步 Appender
。
# AsyncAppender
建议看一下源码,该类本身代码量不多,给一个参考链接:https://segmentfault.com/a/1190000042853430
日志记录需要进行磁盘 IO 操作,非常耗时,同时为了保证日志记录的并发安全,在写入日志时还经常要加锁,这些导致性能严重下降。
先给出一个配置 demo 来演示异步 Appender
:
<?xml version="1.0" encoding="UTF-8"?> | |
<Configuration status="INFO"> | |
<Appenders> | |
<!-- 配置两个正常的 Appender --> | |
<Console name="MyConsole" target="SYSTEM_OUT"> | |
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY"/> | |
<PatternLayout pattern="%msg%n"/> | |
</Console> | |
<RollingFile name="MyFile" fileName="mylog.log" | |
filePattern="mylog.log.%i"> | |
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY" /> | |
<PatternLayout pattern="%msg%n"/> | |
<SizeBasedTriggeringPolicy size="20M"/> | |
</RollingFile> | |
<!-- 让异步 Appender 引用正常 Appender --> | |
<Async name="MyAsync"> | |
<AppenderRef ref="MyConsole"/> | |
<AppenderRef ref="MyFile"/> | |
</Async> | |
</Appenders> | |
<Loggers> | |
<!-- 让根日志打印器引用异步 Appender --> | |
<Root level="INFO"> | |
<Appender-ref ref="MyAsync"/> | |
</Root> | |
</Loggers> | |
</Configuration> |
AsyncAppender
类作为生产者,构建 LogEvent
放到阻塞队列中,同时内部有个属性为 AsyncAppenderEventDispatcher dispatcher
,该类继承了 Log4jThread
,其实就是消费者,该消费者最重要就是 dispatch()
方法,该方法就是将日志事件通过循环交给各个 AppenderRef
。另外两个方法就是:
dispatchAll()
:循环消费阻塞队列中的日志并打印dispatchRemaining()
:线程如果被停止,则调用 dispatchRemaining () 保证阻塞队列中的日志全部被打印
AsyncAppender
的 start()
方法初始化 AsyncAppenderEventDispatcher
,并在最后就会调用 dispatcher.start()
。
dispatchAll()
方法会把 logEvent
交给 AsyncAppender
关联的 Appender
。上述例子中也就是将 LogEvent
发送到 MyConsole
和 MyFile
,然后进行日志打印(都在同一个线程,而不会分别让 MyConsole
和 MyFIle
各自一个线程)。
下面就是整个异步打印的流程(图片来自上文引言中的链接):
现在流程中的参与成员都知道了,并且我们也知道生产者何时将 logEvent
加入到队列中(当 Logger 产生日志时)消费者如何进行消费,下面列出几个重要的点:
Appender
使用WriterManager
进行日志写入,后者内部使用的是Writer
,为了保证并发安全,WriterManager
在写入方法上使用了synchronized
关键字。(并发问题:多个 Logger 都使用了同一个 Appender,为了防止写入混乱,这是必要的上锁)。- 消费线程
AsyncAppenderEventDispatcher
何时启动又如何工作的?以 xml 配置为例,Log4j2
框架在首次获取Logger
时,会初始化LoggerContext
,而初始化LoggerContext
时有一个步骤就是将Log4j2
配置对象XmlConfiguration
设置给LoggerContext
并启动XmlConfiguration
,这里看一下XmlConfiguration
的start()
方法
public void start() { | |
// ... | |
// 遍历配置文件中配置的 Appender 并启动 | |
for (final Appender appender : appenders.values()) { | |
appender.start(); | |
} | |
if (!alreadyStarted.contains(root)) { | |
root.start(); | |
} | |
super.start(); | |
LOGGER.debug("Started configuration {} OK.", this); | |
} |
start()
方法的最上层定义是 LifeCycle
接口,如此定义是因为不止 Appender
存在异步组件, Logger
、 Filter
等组件也存在异步,所以大家的异步组件本质上都实现了该接口。然后在 config
中启动。
此时并发性能已经比较优秀了,但是我们可以看到有两个并发性能的瓶颈:
logEvent
的传递还是会在阻塞队列处停住,起码我们明显可以看出,一个并发安全的队列的并发性能是限制会限制日志写入- 一个队列分配一个线程来消费,本质上没有问题,但是后台线程在消费日志事件时还是会遇到锁竞争,因为后台线程
dispatch
会挨个将事件传给Appender
,再抢锁(synchronized)进行日志打印。造成这种原因的其实是一个队列中的事件种类不一样,所以也就需要分发器(dispatch
),而这个分发器又不是那么优秀(因为它不止需要分发,还需要打印)。
dispatch 承担的功能:取出、分发、打印,取出和打印都会阻塞(取出是否阻塞要看选用哪种队列),如果打印阻塞时间太长,反而会导致队列中事件堆积。
问题一:log4j2 的异步日志是通过队列来处理的,关于队列,Log4j2 支持生成以下四种队列:
ArrayBlockingQueue
:默认的队列,通过 java 原生的ArrayBlockingQueue
实现。DisruptorBlockingQueue
:disruptor
包实现的高性能队列,开源地址。JCToolsBlockingQueue
:JCTools
实现的无锁队列。LinkedTransferQueue
:通过 java7 以上原生支持的LinkedTransferQueue
实现。
DisruptorBlockingQueue
队列,它的特点为:无锁、环形空间。无锁就是 CAS
、环形队列使得空间循环利用,但是可能会覆盖,所以此时需要执行对应策略。
关于该队列的细节介绍,之后会在数据结构与算法的 tag
中开一篇,现在我们只需要知道,这个队列确实能够高效的、并发安全的传输 LogEvent,也不会导致 dispatch
取出日志变得很慢。
问题二:这正是 cyan-log 需要解决的问题:
如何减少写入同一文件时锁的竞争:那就是只让一个线程写入文件,从开始到结束,都是它来写入
如何只让一个线程来写入文件:多个 Logger 对于同一个文件的
LogEvent
,都放入通过一个队列中,然后写入线程只需要从队列中获取即可。如果某个文件的日志已经很久都没产生了,那么其写入线程就会一直阻塞,有点浪费,如何优化:不如使用线程池,阻塞队列提供了超时获取的方法,如果超时了就不再获取,那么线程可以复用,或者连续超时了很多次,这取决于你希望的策略。
我们应该如何初始化这样一个线程:这里有个子问题,也就是我如何判断线程已经初始化了?
正常思维是为每个文件(也就是日志去向)关联一个变量 A,当线程没有初始化(或者已经销毁复用)时,A 为 0,反之 A 为 1。A 涉及到原子修改,需要加锁或者 CAS,但是这并不会限制系统并发,因为这只是初始化阶段,初始化完毕就不会再执行(当然,复用还是会涉及到 A 的原子修改,还是那句话,这取决于你的策略,也就是是否要线程复用)。很遗憾,还是有点漏洞:
可以看出这种使用一个变量 A 来保证并发安全,是有问题的,它会使得:队列中还有消息没有消费,但是线程池中已经没有线程来消费了。当然,如果后续又来了一个日志进入队列中,那么它又会初始化一个线程,队列中的日志自然可以消费。
但是这是可以允许的吗?肯定不行,写一个程序,尤其是写一个健壮的程序,这才是关键。
对于这种情况,我们可以后台单独开一个线程来检测是否有队列处于这样的情况,但是这无疑使得问题变得复杂,我们甚至无法知道线程该何时运行,运行多久。
想一想,现在的情况是多个生产者,一个消费者,当消费者走到步骤 1 时,可能会有很多个生产者走到步骤 3,这是允许的,但不允许的是所有生产者都走到了步骤 3,不知道你们是否想到了解决办法,我的思路是使用读写锁来避免这种情况的出现。
很遗憾,我们又无可避免的使用了锁,但是要保证并发安全,我们又无法不这么做,为了提升并发,我们采用的是读写锁,思考一下,这很合适,某种诡异的契合,不是吗?
现‘在引入了读写锁,我们重新来看一下:
解释几点:
- 生产者使用
tryAcquire
是因为如果没有获取到锁,就说明已经有线程在初始化线程池的线程了 - 消费者使用
tryAcquire
是因为没拿到锁,说明存在读锁,也就说明有日志加入到队列中了,那么自己也就不需要销毁,重新消费即可。 - 将日志加入队列的时机,是初始化前还是初始化后,其实没有太多影响,起码对于这个问题而言。注意,如果涉及到队列的抛弃策略,也就是队列可能会满,那么加入队列就应该在初始化前(因为没必要抢锁)。
- 这里要考虑的问题就是线程初始化和线程复用这段时间是否会冲突,假设线程获取事件触发了超时(或许很多次),然后准备复用了,此时另外的线程恰好有对应的日志打过来
# 拒绝策略
如果队列满了,会先判断 logger 是否在递归调用中,如果在,就在当前线程直接调用 appender 打印日志。不再,就会根据 AsyncQueueFullPolicy
来决定:
DefaultAsyncQueueFullPolicy
:如果该线程是后台线程,就在当前线程打印;反之就调用阻塞队列的 put 来阻塞当前线程加入日志。DiscardingAsyncQueueFullPolicy
:继承了DefaultAsyncQueueFullPolicy
,会先比较一下日志等级,如果在指定等级下的日志就会丢弃,高于指定等级的日志比较重要,就会调用DefaultAsyncQueueFullPolicy
来处理日志。
# disruptor 队列
高性能体现:CAS、解决伪共享。
生产者和消费者共享一个环形内存,也就是 RingBuffer
:
Disruptor
没有对生产者和消费者做定义,只是暴露了提交和取出的接口:
RingBuffer<LogEvent> ringBuffer; | |
// 生产者 api | |
long sequence = ringBuffer.next(); // 无锁获取 | |
LogEvent logEvent = ringBuffer.get(sequence); | |
// 然后根据业务对 log 进行操作 | |
ringBuffer.publish(sequence); | |
// 消费者 api,需要实现 EventHandler<T > 接口 | |
@Override | |
public void onEvent(LogEvent logEvent, long sequence, boolean endOfBatch) { | |
System.out.println("获取到日志并消费"); | |
} |
# 核心组件
Disruptor
对象:持有一个RingBuffer
对象,一个线程池Executor
对象以及一个ConsumerRepository
对象,生产者生产的数据会存放在RingBuffer
中的元素中,同时当向Disruptor
注册事件处理器时Disruptor
会基于注册的事件处理器创建消费者并添加到ConsumerRepository
中。RingBuffer
对象:Disruptor
框架中的核心对象,其持有一个Object
数组用于存放元素以及一个Sequencer
对象实现对生产者的同步控制。Sequencer
对象:接口,有两个实现类分别为SingleProducerSequencer
和MultiProducerSequencer
(默认),代表对单生产者和多生产者的同步控制(可以这么理解,定义的生产者TestEventProducer
如果向RingBuffer
生产元素,那么就会和其他生产者以及消费者产生并发冲突,Sequencer
就是用于控制并解决这个并发冲突的)。SequenceBarrier
对象:其由Sequencer
创建,并且会由消费者持有,主要用于消费者获取当前可以消费的元素的序号。EventProcessor
对象:接口,表示消费者,当向Disruptor
对象注册EventHandler
对象时,Disruptor
会基于EventHandler
创建一个BatchEventProcessor
对象作为消费者,当向Disruptor
注册WorkHandler
对象时,Disruptor
会基于WorkHandler
创建一个WorkProcessor
对象作为消费者,本篇文章提及的消费者全部指BatchEventProcessor
。Sequence
对象:每个EventProcessor
消费者会持有一个Sequence
,同时SingleProducerSequencer
持有一个Sequence
,MultiProducerSequencer
持有两个Sequence
。Sequence
的使用者都是使用Sequence
来维护自己的读 **/** 写序号。
# 消费者
在消费时,会一直维护一个 nextSequence
,然后向 sequenceBarrier
获取当前实际可以消费到的元素的最大序号( waitFor
方法)。
这种机制可能导致消费者拿到的序号可能是别人已经消费了的事件,那么自己就会拿到一个还未填充数据的事件或者一个被生产者填充数据的事件。
现在分析一下 waitFor(nextSequence)
方法:
public long waitFor(final long sequence) | |
throws AlertException, InterruptedException, TimeoutException | |
{ | |
checkAlert(); | |
// 调用等待策略的 waitFor () 方法得到一个可用序号 | |
long availableSequence = waitStrategy.waitFor(sequence, | |
cursorSequence, | |
dependentSequence, | |
this); | |
// 如果可用序号小于目标序号,则直接返回可用序号 | |
// 说明当前最多只能消费到可用序号的元素 | |
if (availableSequence < sequence) { | |
return availableSequence; | |
} | |
// 如果可用序号大于等于目标序号,则调用 Sequencer 来得到当前最大的已发布的序号 | |
// 本示例中这里的 Sequencer 实际为 SingleProducerSequencer | |
// SingleProducerSequencer 的策略就是直接返回可用序号 | |
return sequencer.getHighestPublishedSequence(sequence, availableSequence); | |
} | |
// SingleProducerSequencer#getHighestPublishedSequence | |
public long getHighestPublishedSequence(long lowerBound, long availableSequence) { | |
return availableSequence; | |
} |
等待策略中的 waitFor
方法, WaitStrategy
方法来决定它如何等待这个序号,以 YieldingWaitStrategy
为例,会循环 100 次获取可用序号,超过后每次循环未拿到就会调用 Thread.yield
。
# 生产者
Disruptor
的生产者生产元素的步骤可以概括如下。
- 从
RingBuffer
中申请空间,即获取到一个可以设置数据的元素的序号; - 根据步骤 1 中获取到的索引将可以设置数据的元素从数组中获取出来;
- 为步骤 2 中获取出来的元素设置数据;
- 发布步骤 3 中的元素,即这个元素允许被消费者消费了。(两阶段提交)
因为是环形结构,所以生产者也要防止追了(最慢)消费者的尾,如果检测到会发生追尾,那么就要自旋等待。
生产者自然也可以批处理,因为它知道最慢的消费者走到哪了,也知道 RingBuffer 的大小。
事实上,只有当 RingBuffer 的游标到达要提交的节点的前一个节点才会完成提交,假设 A 在 14 号位置已经进入提交函数,但是 B 在 13 号只是申请了序号,写入了数据,还没有提交,A 就需要自旋等待 B 提交了,游标走到 13 号,才会真正提交。
# 提高并发效率
直接使用 CAS,是完全无锁的。我们自然知道,效率上:单线程操作 > 单线程 CAS > 单线程加锁。
所以 disruptor 封装时还加上了策略配置,如果是单生产者(多)单消费者策略,生产者自然不需要 CAS,直接获得序号更快。根据生产者和消费者的数量,即有四种策略。默认是多生产者多消费者策略。
更为重要的是,disruptor 是颁布序号给生产者和消费者,然后写入,消费元素时就能保证绝对线程安全,因为没有对应序号的线程不可能在这个位置竞争。那么就会有 RingBuffer 一次性有很多个线程在同时生产、消费,自然是很快的。
而作者也提到,为什么队列无法胜任这个工作:因为队列只有两个指针。
文档作者也提到,一个队列常常是单生产者和单消费者(至少我们应该尽力去做到),但是生产者和消费者的步调很难一致,所以缓冲常常就是满的队列(生产者比消费者快),或者是空的队列(消费者比生产者快)。
# 解决伪共享
现代 64 位 CPU 的缓存行都是 64 字节,以 ArrayBlockingQueue 为例,他有头尾两个指针,int 情况下也就 8 字节,所以大概率是头尾指针在同一个缓存行,所以哪怕只有消费者消费了一个元素,导致尾指针更新,也会导致头指针失效,不得不从内存中重新获取。
这就是伪共享 --false sharing。
所以为了避免伪共享,就是用了字段填充:
public long p1, p2, p3, p4, p5, p6, p7; // cache line padding | |
private volatile long cursor = INITIAL_CURSOR_VALUE; | |
public long p8, p9, p10, p11, p12, p13, p14; // cache line padding |
# 为什么使用环形缓冲
RingBuffer 只维护了一个指向下一个可用位置的序号,这是为了提供可靠的消息传递。RingBuffer 需要将发送过的消息保存,如果外部服务发送 NAK 表示没有成功可以收到,就可以重新发送那一点到当前序号之间的所有消息。