安装部署使用
ack机制
ack机制原理
这里不讲什么是ack机制,可以参考官网的文档Ack 机制
我们只要知道它是使用异或xor的原理即可:
A xor A = 0
A xor B xor B xor A = 0
使用ack机制
要想使用ack机制,需要做以下工作:
Topology的处理
构建topology时设置acker不为0,方法如下:
config.setNumAckers(1);
该方法实际是设置以Config.TOPOLOGY_ACKER_EXECUTORS为key的value,说明如下:
/**
* How many executors to spawn for ackers.
*
*
* If this is set to 0, then Storm will immediately ack tuples as soon as they come off the spout, effectively disabling reliability.
*
*/
public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
Spout的处理
使用spout发送数据时,带上msgid,接口说明如下:
/**
* Emits a new tuple to the default output stream with the given message ID.
* When Storm detects that this tuple has been fully processed, or has
* failed to be fully processed, the spout will receive an ack or fail
* callback respectively with the messageId as long as the messageId was not
* null. If the messageId was null, Storm will not track the tuple and no
* callback will be received. The emitted values must be immutable.
*
* @return the list of task ids that this tuple was sent to
*/
public List emit(List tuple, Object messageId) {
return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
}
我们看下KafkaSpout是怎么做的:
@Override
public void nextTuple() {
List managers = _coordinator.getMyManagedPartitions();
for (int i = 0; i < managers.size(); i++) {
try {
// in case the number of managers decreased
_currPartitionIndex = _currPartitionIndex % managers.size();
EmitState state = managers.get(_currPartitionIndex).next(_collector);
if (state != EmitState.EMITTED_MORE_LEFT) {
_currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
}
if (state != EmitState.NO_EMITTED) {
break;
}
} catch (FailedFetchException e) {
LOG.warn("Fetch failed", e);
_coordinator.refresh();
}
}
long now = System.currentTimeMillis();
if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
commit();
}
}
注意上面的EmitState state = managers.get(_currPartitionIndex).next(_collector);我们进去看看 :
public EmitState next(SpoutOutputCollector collector) {
if (_waitingToEmit.isEmpty()) {
fill();
}
while (true) {
MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
if (toEmit == null) {
return EmitState.NO_EMITTED;
}
Iterable> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
if ((tups != null) && tups.iterator().hasNext()) {
for (List tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
}
break;
} else {
ack(toEmit.offset);
}
}
if (!_waitingToEmit.isEmpty()) {
return EmitState.EMITTED_MORE_LEFT;
} else {
return EmitState.EMITTED_END;
}
}
看到了吧,collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));emit的时候指定了messageId,而这个KafkaMessageId是一个静态内部类,包括分区和偏移量2个属性
static class KafkaMessageId {
public Partition partition;
public long offset;
public KafkaMessageId(Partition partition, long offset) {
this.partition = partition;
this.offset = offset;
}
}
Bolt的处理
一般我们写bolt的时候有两种方式,一种使用IRichBolt接口或者它的抽象实现类BaseRichBolt,一种使用IBasicBolt或者它的抽象实现类BaseBasicBolt,这2种是有区别的,主要在于影响ack机制
使用IRichBolt
使用IRichBolt意味着你要实现的接口如下:
void execute(Tuple input);
也意味着你要操作的类为OutputCollector
使用OutputCollector来emit tuple给下个bolt的时候必须要用anchored的方式,接口如下:
/**
* Emits a new tuple to the default stream anchored on a single tuple. The
* emitted values must be immutable.
*
* @param anchor the tuple to anchor to
* @param tuple the new output tuple from this bolt
* @return the list of task ids that this new tuple was sent to
*/
public List emit(Tuple anchor, List tuple) {
return emit(Utils.DEFAULT_STREAM_ID, anchor, tuple);
/**
* Emits a new tuple to the default stream anchored on a group of input
* tuples. The emitted values must be immutable.
*
* @param anchors the tuples to anchor to
* @param tuple the new output tuple from this bolt
* @return the list of task ids that this new tuple was sent to
*/
public List emit(Collection anchors, List tuple) {
return emit(Utils.DEFAULT_STREAM_ID, anchors, tuple);
}
所谓的anchor即为Bolt的execute方法里面的tuple,也即上游发给你的tuple
注意不能使用unanchored 的方式,说明如下:
/**
* Emits a new unanchored tuple to the default stream. Beacuse it's
* unanchored, if a failure happens downstream, this new tuple won't affect
* whether any spout tuples are considered failed or not. The emitted values
* must be immutable.
*
* @param tuple the new output tuple from this bolt
* @return the list of task ids that this new tuple was sent to
*/
public List emit(List tuple) {
return emit(Utils.DEFAULT_STREAM_ID, tuple);
}
同时在emit后要手动执行collector.ack(tuple);方法
使用IbasicBolt
使用IbasicBolt则编程会简单的多,因为它会帮我做很多事情,我们要做的仅仅是调用emit方法即可,先看要实现的接口:
/**
* Process the input tuple and optionally emit new tuples based on the input tuple.
*
* All acking is managed for you. Throw a FailedException if you want to fail the tuple.
*/
void execute(Tuple input, BasicOutputCollector collector);
这个execute方法和上述不一样了,他给我们注入了BasicOutputCollector类,我们操作它即可,其实这个类里面有一个上述OutputCollector out属性,并且,自动注入了inputTuple,使用它来emit tuple即可,暴露的emit的方法只有2个:
public List emit(String streamId, List tuple) {
return out.emit(streamId, inputTuple, tuple);
}
public List emit(List tuple) {
return emit(Utils.DEFAULT_STREAM_ID, tuple);
}
正如上述代码所示,它实际调用的是OutputCollector的emit方法,并且自动帮我们使用anchor的方式,这里用到了我们熟悉的设计模式中的代理的模式
大家可能有注意到了,这里并没有显示的调用collector.ack(tuple);方法,这里猜猜也会知道,应该是用到了模板模式,在调用该方法的调用者那里,调用了execute方法后,调用ack方法,查下代码,果然没错,在BasicBoltExecutor类里面,方法如下:
public void execute(Tuple input) {
_collector.setContext(input);
try {
_bolt.execute(input, _collector);
_collector.getOutputter().ack(input);
} catch (FailedException e) {
if (e instanceof ReportedFailedException) {
_collector.reportError(e);
}
_collector.getOutputter().fail(input);
}
}
仔细看看,发现它还帮我们处理了异常,只要我们抛出FailedException,它就会自动执行fail方法
关闭ack
ack机制并不是必须的,并且会消耗一部分性能,如果可以容忍部分数据丢失,想要更高的性能则可以关闭ack机制
方法
spout 在发送数据的时候不带上msgid
设置acker数等于0
使用 unanchored的方式
以上方法任一种都可以,推荐使用第二种方式
性能和事务
事务
jstorm支持事务操作,这里所谓的事务即是,顺序处理tuple,如果这次的tuple没有被完整的处理完,就不会处理下一个tuple,可以看到这样大大降低了并发性,性能不会太好。所以可以采用批量的思想个时候,一个batch为一个transaction处理单元,当一个batch处理完毕,才能处理下一个batch。还可以采用分阶段处理的方式,在processing阶段并发,实际commit的时候按顺序
Trident
Trident是Storm之上的高级抽象,提供了joins,grouping,aggregations,fuctions和filters等接口。Trident将stream中的tuples分成batches进行处理,API封装了对这些batches的处理过程,保证tuple只被处理一次。处理batches中间结果存储在TridentState对象中。
性能
很明显的,按照性能来说, trident < transaction < 使用ack机制普通接口 < 关掉ack机制的普通接口
我们也可以通过增加ack的并发数来提高线程
ack和fail
ack方法和fail方法只有在Spout中才有
ack, 当spout收到一条ack消息时,触发的动作
fail, 当spout收到一条fail消息时,触发的动作
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
参数为msgId,即为前面说的Spout里面发送数据的msgId,失败了是否需要重发tuple完全取决于你的实现,比如KafkaSpout就有自己的实现,代码这里就不贴了
需要注意的是,一般我们会有多个Bolt,在Topology处理流程上的任意Bolt处理失败都会触发Spout执行fail方法,如果你的程序在fail方法里面会重发tuple的话,那么这个tuple仍将会被所有的Bolt执行一遍,举例如下:
假设topology的流程为:SpoutA->BoltB->BoltC->BoltD 如果BoltC处理失败,则SpoutA将重发tuple,并且将再次按照topology的流程走一遍。可以看到,BoltB处理了2遍Bolt,如果在BoltB里有插入数据库的操作则会出现问题。
好在一般情况下,我们也只是在最末尾的Bolt中执行入库的操作,前面执行的Bolt基本都是内存计算,不落地,所以执行多遍也就不会有问题了
多线程
在jstorm中, spout中nextTuple和ack/fail运行在不同的线程中, 从而鼓励用户在nextTuple里面执行block的操作, 原生的storm,nextTuple和ack/fail在同一个线程,不允许nextTuple/ack/fail执行任何block的操作,否则就会出现数据超时,但带来的问题是,当没有数据时, 整个spout就不停的在空跑,极大的浪费了cpu, 因此,jstorm更改了storm的spout设计,鼓励用户block操作(比如从队列中take消息),从而节省cpu。
进一步说明如下:
当topology.max.spout.pending 设置不为1时(包括topology.max.spout.pending设置为null),spout内部将额外启动一个线程单独执行ack或fail操作, 从而nextTuple在单独一个线程中执行,因此允许在nextTuple中执行block动作,而原生的storm,nextTuple/ack/fail 都在一个线程中执行,当数据量不大时,nextTuple立即返回,而ack、fail同样也容易没有数据,进而导致CPU 大量空转,白白浪费CPU, 而在JStorm中, nextTuple可以以block方式获取数据,比如从disruptor中或BlockingQueue中获取数据,当没有数据时,直接block住,节省了大量CPU。
但因此带来一个问题, 处理ack/fail 和nextTuple时,必须小心线程安全性。
当topology.max.spout.pending为1时, 恢复为spout一个线程,即nextTuple/ack/fail 运行在一个线程中。
其他
重启
建议不超过1个月,强制重启一下supervisor, 因为supervisor是一个daemon进程, 不停的创建子进程,当使用时间过长时, 文件打开的句柄会非常多,导致启动worker的时间会变慢,因此,建议每隔一周,强制重启一次supervisor
输出到kafka
写入数据到kafka可以使用KafkaBolt这个类,它已经帮我做好了,我们只需要提供一些参数即可
上面讲的KafkaSpout和KafkaBolt都在storm-kafka这个框架里面,maven配置如下:
org.apache.storm
storm-kafka
0.10.2
org.apache.zookeeper
zookeeper
注意版本不为最新,为0.10.2,1.0.0版本后的包结构变了,和jstorm不兼容,不能使用
该项目为storm的官方插件项目,项目地址为:Storm Kafka
--------------------------------------我是分割线,2017年5月10日16:29:33加--------------------------------------
KafkaSpout的nextTuple方法里,每次都是调用
List managers = _coordinator.getMyManagedPartitions();
来获得分区的信息,而这个方法如下:
@Override
public List getMyManagedPartitions() {
if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
refresh();
_lastRefreshTime = System.currentTimeMillis();
}
return _cachedList;
}
调用的时候判断是否超过了一定的时间,如果超过则重新获取分区的消息,这个时间默认为60s,为ZkHosts里的refreshFreqSecs属性
那么分区增加了或者减少了会不会有问题呢,答案是不会有问题,KafkaSpout已经帮我们做了很多了
如果分区数增加,在这60s内,我获取的是原来的分区进行消费,到60s后,刷新分区数,对新增加的分区进行消费,完全没有任何问题
如果分区数减少,比如之前有5个分区:0,1,2,3,4,现在减少为:0,1,2,当要消费分区3时会抛出异常并且在异常里面会重新刷新分区,这是分区数就会变为3,则直接跳出这个循环了,也不会有机会去消费分区4,所以也不会有任何问题
--------------------------------------我是分割线,2017年5月22日15:48:31加--------------------------------------
bolt中不要有静态变量和static{}方法 bolt中不要有静态变量和static{}方法 bolt中不要有静态变量和static{}方法 重要的事情说三遍