谢谢
实际上,微批量是三叉戟的一个内置功能。你不需要任何刻度元组。当您的代码中包含以下内容时:
topology
.newStream("myStream", spout)
.partitionPersist(
ElasticSearchEventState.getFactoryFor(connectionProvider),
new Fields("field1", "field2"),
new ElasticSearchEventUpdater()
)
(我在这里使用自定义的ElasticSearch状态/更新程序,您可以使用其他的)
因此,当您有这样的东西时,在hood Trident下,将您的流分组为批处理,并对这些批处理而不是单个元组执行partitionPersist操作。
public class TickSpout implements IBatchSpout {
public static final String TIMESTAMP_FIELD = "timestamp";
private final long delay;
public TickSpout(long delay) {
this.delay = delay;
}
@Override
public void open(Map conf, TopologyContext context) {
}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
Utils.sleep(delay);
collector.emit(new Values(System.currentTimeMillis()));
}
@Override
public void ack(long batchId) {
}
@Override
public void close() {
}
@Override
public Map getComponentConfiguration() {
return null;
}
@Override
public Fields getOutputFields() {
return new Fields(TIMESTAMP_FIELD);
}
}
Trident 是在 Storm 的基础上执行实时计算的高级抽象。这跟 hadoop 的 Pig 的概念类似.
我有一个拓扑并在本地模式下运行它,比如-->-->。而且,我定义了一个对象,它是从B到C发出的值。我有两个问题: 我在中设置并在创建时设置新的。当为时,它将发送到。我认为Storm会克隆并在中生成一个新的,然而,当我试图更新的内容时,它仍然在中发生变化。为什么?这两个bolt可能位于不同的服务器中,并且不可能在多个bolt之间共享内存中的相同变量。
我正在使用OpaqueTridentKafkaSpout来消费来自Kafka的消息。下面是代码。我忽略了配置,因为这会导致同一kafka消息在多个批处理中到达。 当Kafka喷口开始时,我得到以下错误一次,但之后运行平稳。
我正在尝试开始使用Storm Trident,并使用IOpaquePartitionedTridentSpout和OpaqueMap进行拓扑设置和运行。 然而,我很难找到让spout/函数知道事务是否成功提交的方法。我没有看到任何ack或fail方法,如在常规Storm喷口/螺栓接口。 谢谢