在数据量大的时候,flume日志中出这么个错误:
Caused by: org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:130)
at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
at org.apache.flume.channel.ChannelProcessor.executeChannelTransaction(ChannelProcessor.java:245)
A。简单了解一点点flume的原理:
flume中三大件,source、channel、sink,串起来
0,维持一个队列,队列的两端分别是source和sink。
1,source使用doPut方法往putList插入Event
2,sink使用doTake方法从queue中获取event放入takeList,并且提供rollback方法,用于回滚。
3,commit方法作用是把putList中的event一次性写到queue;
下面表示了Event在一个使用了MemoryChannel的agent中数据流向:
source ---> putList ---> queue ---> takeList ---> sink
中间的三个阶段,是在channel里面的。
报这个错的原因:就是source在拿到数据,往channel里面写的时候,没地方了,中间是个queue,比作池子,流入太快,流出慢,然后,慢慢的,池子就满了,再流就溢出了。
现在要解决问题:
1,是流入和流出的速度差;
2,增大池子以延迟池子流满的时间!
先检查一下flume的配置,是不是source的batchSize比sink的batchSize大,改改让流入慢点,给流出争取时间,一般sink都是要把数据写出去,比如写es,写文件,写kafka,写Hadoop等,这个操作比较耗时。
B。MemoryChannel的几个参数
//定义队列中一次允许的事件总数
private static final Integer defaultCapacity = 100;
//定义一个事务中允许的事件总数
private static final Integer defaultTransCapacity = 100;
//将物理内存转换成槽(slot)数,默认是100
private static final double byteCapacitySlotSize = 100;
//定义队列中事件所使用空间的最大字节数(默认是JVM最大可用内存的0.8)
private static final Long defaultByteCapacity = (long)(Runtime.getRuntime().maxMemory() * .80);
//定义byteCapacity和预估Event大小之间的缓冲区百分比:
private static final Integer defaultByteCapacityBufferPercentage = 20;
//添加或者删除一个event的超时时间,单位秒:
private static final Integer defaultKeepAlive = 3;
// maximum items in a transaction queue
private volatile Integer transCapacity;
private volatile int keepAlive;
private volatile int byteCapacity;
private volatile int lastByteCapacity;
private volatile int byteCapacityBufferPercentage;
private ChannelCounter channelCounter;
capacity | 在 channel 中 最多能保存多少个 event。默认是100 |
transCapacity | 在每次从source中获取数据或者将数据sink出去的一次事务操作中,最多处理的 event 数。默认是100 |
byteCapacity | 在 channel 中最多能容纳 所有event body的总字节数。默认是 JVM最大可用内存(-Xmx )的80% 。(大师兄不建议改这个参数) |
byteCapacityBufferPercentage | 计算event header跟最大可用内存的字节占比。默认是20(大师兄不建议改这个参数) |
keep-alive | 尝试添加或者删除一个event的超时时间,单位为秒。默认是3 |
能改的参数就capacity和transCapacity了,增加capacity的大小,就是调整channel缓存数据的能力,在增大这个配置的值的时候,flume的jvm参数,必须得跟着一起加大,不然,稍微积压一下,数据都积压到channel里面,channel full 之前,已经把heap占满了,然后就又会出OOM的异常了。还有就是增加channel sink的对儿,这个也需要同时扩大flume的jvm的内存大小,不然也会有OOM的风险。
上面的招数,不管是增大capacity的大小,耗时增加channel sink对儿,都是增加了缓存池子的大小,池子越大,缓存的数据就多,jvm占的内存就大,也只是暂缓channel full exception的到来,治标不治本。
关键还是得解决sink端,数据写出去的能力,增加数据的写出速度。比如,加ES的data节点,换固态硬盘。