Can someone guide me on how to use Trident in this situation? Or any other applicable way of using Storm's features?
Good question! Sadly, this kind of micro-batching is not supported by Trident out of the box.
But you can try implementing your own frequency-driven micro-batching, along the lines of this skeleton example:
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MicroBatchingBolt extends BaseRichBolt {

  private static final long serialVersionUID = 8500984730263268589L;
  private static final Logger LOG = LoggerFactory.getLogger(MicroBatchingBolt.class);

  protected LinkedBlockingQueue<Tuple> queue = new LinkedBlockingQueue<Tuple>();

  /** The threshold after which the batch should be flushed out. */
  int batchSize = 100;

  /**
   * The batch interval in sec. Minimum time between flushes if the batch sizes
   * are not met. This should typically be equal to
   * topology.tick.tuple.freq.secs and half of topology.message.timeout.secs
   */
  int batchIntervalInSec = 45;

  /** The last batch process time in seconds. Used for tracking purposes. */
  long lastBatchProcessTimeSeconds = 0;

  private OutputCollector collector;

  @Override
  @SuppressWarnings("rawtypes")
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  /** Ask Storm to send this bolt a tick tuple every batchIntervalInSec seconds. */
  @Override
  public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new java.util.HashMap<String, Object>();
    conf.put(org.apache.storm.Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, batchIntervalInSec);
    return conf;
  }
  @Override
  public void execute(Tuple tuple) {
    // Check if the tuple is of type Tick Tuple
    if (isTickTuple(tuple)) {
      // If so, it is an indication to flush the batch. But don't flush if the
      // previous flush was done very recently (either because the batch size
      // threshold was crossed or because of another tick tuple).
      if ((System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds) >= batchIntervalInSec) {
        LOG.debug("Current queue size is " + this.queue.size()
            + ". But received tick tuple so executing the batch");
        finishBatch();
      } else {
        LOG.debug("Current queue size is " + this.queue.size()
            + ". Received tick tuple but last batch was executed "
            + (System.currentTimeMillis() / 1000 - lastBatchProcessTimeSeconds)
            + " seconds back that is less than " + batchIntervalInSec
            + " so ignoring the tick tuple");
      }
    } else {
      // Add the tuple to the queue. But don't ack it yet.
      this.queue.add(tuple);
      int queueSize = this.queue.size();
      LOG.debug("current queue size is " + queueSize);
      if (queueSize >= batchSize) {
        LOG.debug("Current queue size is >= " + batchSize
            + " executing the batch");
        finishBatch();
      }
    }
  }
  private boolean isTickTuple(Tuple tuple) {
    // Tick tuples are emitted by Storm's internal system component on the system tick stream
    return org.apache.storm.Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
        && org.apache.storm.Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
  }
  /**
   * Finish batch.
   */
  public void finishBatch() {
    LOG.debug("Finishing batch of size " + queue.size());
    lastBatchProcessTimeSeconds = System.currentTimeMillis() / 1000;
    List<Tuple> tuples = new ArrayList<Tuple>();
    queue.drainTo(tuples);
    for (Tuple tuple : tuples) {
      // Prepare your batch here (be it JDBC, HBase, ElasticSearch, Solr or
      // anything else).
      // List<Response> responses = externalApi.get("...");
    }
    try {
      // Execute your batch here and ack or fail the tuples
      LOG.debug("Executed the batch. Processing responses.");
      // for (int counter = 0; counter < responses.length; counter++) {
      //   if (response.isFailed()) {
      //     LOG.error("Failed to process tuple # " + counter);
      //     this.collector.fail(tuples.get(counter));
      //   } else {
      //     LOG.debug("Successfully processed tuple # " + counter);
      //     this.collector.ack(tuples.get(counter));
      //   }
      // }
    } catch (Exception e) {
      LOG.error("Unable to process " + tuples.size() + " tuples", e);
      // Fail the entire batch
      for (Tuple tuple : tuples) {
        this.collector.fail(tuple);
      }
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // ...
  }
}
Source: http://hortonworks.com/blog/apache-storm-design-patter-micro-batching/ and "Using tick tuples with Trident in Storm"
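The size-or-interval flush policy that the bolt above implements can also be exercised outside Storm. The following is a minimal, hypothetical sketch (the MicroBatcher class is not part of the original code); the clock is passed in explicitly so the two flush triggers are easy to see:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the same size-or-interval flush policy, without Storm.
class MicroBatcher {
    private final int batchSize;
    private final long intervalMillis;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlushMillis;

    MicroBatcher(int batchSize, long intervalMillis, long nowMillis) {
        this.batchSize = batchSize;
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = nowMillis;
    }

    /** Add an item; returns the flushed batch if the size threshold was crossed, else null. */
    List<String> add(String item, long nowMillis) {
        buffer.add(item);
        return buffer.size() >= batchSize ? flush(nowMillis) : null;
    }

    /** Called on a timer "tick"; flushes only if the interval has elapsed since the last flush. */
    List<String> tick(long nowMillis) {
        return (nowMillis - lastFlushMillis >= intervalMillis) ? flush(nowMillis) : null;
    }

    private List<String> flush(long nowMillis) {
        lastFlushMillis = nowMillis;
        List<String> out = new ArrayList<>(buffer);
        buffer.clear();
        return out;
    }
}
```

In the Storm bolt, `add` corresponds to the non-tick branch of `execute` and `tick` to the tick-tuple branch.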
Since I asked this question, my company has decided to go with Trident first. We will only use plain Storm if there are performance problems. Sadly this wasn't an active decision, it simply became the default behavior (I wasn't around at the time). Their assumption was that in most use cases we need stateful or exactly-once processing, or that we will need it in the near future. I understand their reasoning, because moving from Storm to Trident or back is not an easy migration, but in my personal opinion the concept of stream processing without state is not understood by everyone, and that was the reason for using …
I cannot find good documentation on properly integrating Kafka with Apache Storm Trident. I tried looking at related questions posted here before, but there was no sufficient information. With the code below I can generate a stream for my topology. However, although I supply the parallelism and my partitions, only one executor of the Kafka spout is running, so I cannot scale it well. Can anyone guide me on how to better integrate Apache Storm Trident (2.0.0) with Apache Kafka (1.0
When I write a Storm topology, I find that the order of the tuples is not always the same as the order in which the spout emitted them (the spout reads a file line by line and sends the lines to a do-nothing bolt, so the process is very fast). Can anyone tell me how I can guarantee the order of the tuples emitted by a spout or bolt? Thanks!
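One common technique (independent of Storm's own ordering guarantees) is to have the spout tag each tuple with a sequence number and let the consuming bolt buffer out-of-order arrivals until every earlier sequence number has been seen. A minimal sketch of that resequencing logic, with a hypothetical Resequencer class:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical resequencer: the spout tags each tuple with a sequence number,
// and the bolt holds items until all earlier sequence numbers have arrived.
class Resequencer<T> {
    private long nextSeq = 0;
    private final Map<Long, T> pending = new HashMap<>();

    /** Accept an item with its sequence number; return all items now releasable in order. */
    List<T> accept(long seq, T item) {
        pending.put(seq, item);
        List<T> ready = new ArrayList<>();
        while (pending.containsKey(nextSeq)) {
            ready.add(pending.remove(nextSeq));
            nextSeq++;
        }
        return ready;
    }
}
```

Note that this only restores order within a single consumer task; with multiple bolt executors you would also need fields grouping so that all tuples of one sequence land on the same task.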
First, sincere apologies if my question is a duplicate; I tried searching but could not find a relevant answer. Apologies also if I am asking something very basic, since I am a Storm beginner. Please advise on my use case below: all tuples with a frequency of 25 seconds should be grouped together and emitted by the bolt every 25 seconds (if duplicate tuples are received within the 25 seconds, only the latest one is considered). Similarly, all tuples with a 10-minute
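The 25-second case describes a keyed deduplicating window: keep only the latest value per key, and emit the whole set when the interval elapses. A minimal sketch of that buffer logic (the DedupWindow class is hypothetical; in Storm the tick would come from a tick tuple, here it is an explicit method call):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical keyed deduplicating window: keeps only the latest value per key
// and emits all of them when the interval elapses (mirroring the 25-second case).
class DedupWindow {
    private final long intervalMillis;
    private long lastEmitMillis;
    private final Map<String, String> latestByKey = new LinkedHashMap<>();

    DedupWindow(long intervalMillis, long nowMillis) {
        this.intervalMillis = intervalMillis;
        this.lastEmitMillis = nowMillis;
    }

    /** Record a value; a later value for the same key replaces the earlier one. */
    void record(String key, String value) {
        latestByKey.put(key, value);
    }

    /** On a timer tick: if the interval has elapsed, return the deduplicated batch, else null. */
    List<String> tick(long nowMillis) {
        if (nowMillis - lastEmitMillis < intervalMillis) {
            return null;
        }
        lastEmitMillis = nowMillis;
        List<String> out = new ArrayList<>(latestByKey.values());
        latestByKey.clear();
        return out;
    }
}
```

For the 10-minute tuples the same structure applies with a larger interval; one bolt instance per frequency (or one map per frequency) keeps the windows independent.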