我已经开始使用storm,所以我使用本教程创建简单的拓扑
public StormTopology build() {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(HelloWorldSpout.class.getSimpleName(),
helloWorldSpout, spoutParallelism);
HelloWorldBolt bolt = new HelloWorldBolt();
builder.setBolt(HelloWorldBolt.class.getSimpleName(),
bolt, boltParallelism)
.shuffleGrouping(HelloWorldSpout.class.getSimpleName());
}
我的嘴是这样的
public class HelloWorldSpout extends BaseRichSpout implements ISpout {
private SpoutOutputCollector collector;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("int"));
}
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}
private static Boolean flag = false;
public void nextTuple() {
Utils.sleep(5000);
//emit only 1 tuple - for testing
if (!flag){
this.collector.emit(new Values(6));
flag = true;
}
}
@Override
public void ack(Object msgId) {
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
public void fail(Object msgId){
System.out.println("[HelloWorldSpout] fail on msgId" + msgId);
}
}
我的螺栓是这样的
@SuppressWarnings("serial")
public class HelloWorldBolt extends BaseRichBolt{
private OutputCollector collector;
public void prepare(Map conf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;
logger.info("preparing HelloWorldBolt");
}
public void execute(Tuple tuple) {
System.out.println("[HelloWorldBolt] got" + tuple.getInteger(0));
this.collector.ack(tuple);
}
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
}
spout中的emit()方法只有一个参数,因此不会锚定元组。这就是为什么即使在bolt中处理元组,也不会得到对spout中的ack()方法的回调。
要使其正常工作,您需要修改spout以发出第二个参数,即消息ID。正是这个id被传递回spout中的ack()方法:
public void nextTuple() {
Utils.sleep(5000);
//emit only 1 tuple - for testing
if (!flag){
Object msgId = "ID 6"; // this can be any object
this.collector.emit(new Values(6), msgId);
flag = true;
}
}
@Override
public void ack(Object msgId) {
// msgId should be "ID 6"
System.out.println("[HelloWorldSpout] ack on msgId" + msgId);
}
谢谢你抽出时间。 通过在每个分区上使用数据进行泛洪测试,完成读取需要。 再次使用parallelism_hint=1的代码 即 其中, parallelism_hint-是应该分配给执行此spout的任务数。每个任务将在集群周围某个进程的线程上运行。
利用 cocos2d 制作的一款休闲游戏。游戏中有九种不同的卡通水果,您可以拖动整行或整列上的水果,或交换屏幕中两个相邻水果的位置,一条直线上的三个或三个以上的相同水果将会消失,而您将得到分数。当您的分数逐步提高后,将会进入更高难度的关卡,接受更高难度的挑战。 游戏中允许使用一些道具。 [Code4App.com]
当运行两个线程时,对于spout,文件的每一行都读取两次。 我是新手,我想知道处理这件事的最好方法?我可以将线程的数量减少到1个,或者修改spout,使每个线程读取不同的行--或者(如何)我需要使用TopologyContext参数?我不确定我是否错过了一个“Storm”的方式来解决这个问题?
看下这个段伪代码: local value = get_from_cache(key) if not value then value = query_db(sql) set_to_cache(value, timeout = 100) end return value 看上去没有问题,在单元测试情况下,也不会有异常。 但是,进行压力测试的时候,你会发现,每隔 100 秒,数据库的
我试图把Kafka数据通过Storm在hdfs和Hive。我在和HortonWorks合作。因此,我有以下结构,正如在许多教程(http://henning.kropponline.de/2015/01/24/hive-streaming-with-storm/)中看到的那样(稍加修改): 我将kafka-spout数据直接发送到hdfs-bolt,当我只使用hdfs-bolt时,它是工作的。当我
在我的storm拓扑(有2个喷口和1个bolt)中,其中一个kafka喷口使用者的偏移量正在前进,但MSG没有通过kafka喷口发送到bolt。我可以在storm ui中看到,对于那个特定的喷口,发出和传送的消息是0。所以,我的问题是为什么消费者在前进,我可以看到消费者从zookeeper客户端的抵消逐渐增加。