当前位置: 首页 > 知识库问答 >
问题:

在storm中使用trident的tick元组

韩季
2023-03-14

谢谢

共有1个答案

梁丘波
2023-03-14

实际上,微批量是三叉戟的一个内置功能。你不需要任何刻度元组。当您的代码中包含以下内容时:

topology
    .newStream("myStream", spout)
    .partitionPersist(
        ElasticSearchEventState.getFactoryFor(connectionProvider),
        new Fields("field1", "field2"),
        new ElasticSearchEventUpdater()
    )

(我在这里使用自定义的ElasticSearch状态/更新程序,您可以使用其他的)

因此,当您有这样的东西时,在hood Trident下,将您的流分组为批处理,并对这些批处理而不是单个元组执行partitionPersist操作。

public class TickSpout implements IBatchSpout {

    public static final String TIMESTAMP_FIELD = "timestamp";
    private final long delay;

    public TickSpout(long delay) {
        this.delay = delay;
    }

    @Override
    public void open(Map conf, TopologyContext context) {
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        Utils.sleep(delay);
        collector.emit(new Values(System.currentTimeMillis()));
    }

    @Override
    public void ack(long batchId) {
    }

    @Override
    public void close() {
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(TIMESTAMP_FIELD);
    }
}
 类似资料:
  • Trident 是在 Storm 的基础上执行实时计算的高级抽象。这跟 hadoop 的 Pig 的概念类似.

  • 我有一个拓扑并在本地模式下运行它,比如-->-->。而且,我定义了一个对象,它是从B到C发出的值。我有两个问题: 我在中设置并在创建时设置新的。当为时,它将发送到。我认为Storm会克隆并在中生成一个新的,然而,当我试图更新的内容时,它仍然在中发生变化。为什么?这两个bolt可能位于不同的服务器中,并且不可能在多个bolt之间共享内存中的相同变量。

  • 我正在使用OpaqueTridentKafkaSpout来消费来自Kafka的消息。下面是代码。我忽略了配置,因为这会导致同一kafka消息在多个批处理中到达。 当Kafka喷口开始时,我得到以下错误一次,但之后运行平稳。

  • 我正在尝试开始使用Storm Trident,并使用IOpaquePartitionedTridentSpout和OpaqueMap进行拓扑设置和运行。 然而,我很难找到让spout/函数知道事务是否成功提交的方法。我没有看到任何ack或fail方法,如在常规Storm喷口/螺栓接口。 谢谢