当前位置: 首页 > 知识库问答 >
问题:

Storm+Kafka并行化不及预期

司空玮
2023-03-14

负载的消息数量很少,每天大约2000条,但是每个任务都需要相当长的时间。特别是一个拓扑处理每个任务所需的时间是可变的,通常在1到20分钟之间。如果按顺序处理,吞吐量不足以处理所有传入消息。所有的拓扑和Kafka系统都安装在一台单机中(16个核心,16 GB的RAM)。

由于消息是独立的,并且可以并行处理,我们正在尝试使用Storm并发能力来提高吞吐量。

为此,拓扑配置如下:

    null
    null
private static StormTopology buildTopologyOD() {
    //This is the marker interface BrokerHosts.
    BrokerHosts hosts = new ZkHosts(configuration.getProperty(ZKHOSTS));
    TridentKafkaConfig tridentConfigCorrelation = new TridentKafkaConfig(hosts, configuration.getProperty(TOPIC_FROM_CORRELATOR_NAME));

    tridentConfigCorrelation.scheme = new RawMultiScheme();
    tridentConfigCorrelation.fetchSizeBytes = Integer.parseInt(configuration.getProperty(MAX_SIZE_BYTES_CORRELATED_STREAM));

    OpaqueTridentKafkaSpout spoutCorrelator = new OpaqueTridentKafkaSpout(tridentConfigCorrelation);

    TridentTopology topology = new TridentTopology();

    Stream existingObject = topology.newStream("kafka_spout_od1", spoutCorrelator)
            .shuffle()
            .each(new Fields("bytes"), new ProcessTask(), new Fields(RESULT_FIELD, OBJECT_FIELD))
            .parallelismHint(Integer.parseInt(configuration.getProperty(PARALLELISM_HINT)));

    //Create a state Factory to produce outputs to kafka topics.
    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
            .withProducerProperties(kafkaProperties)
            .withKafkaTopicSelector(new ODTopicSelector())
            .withTridentTupleToKafkaMapper(new ODTupleToKafkaMapper());

    existingObject.partitionPersist(stateFactory, new Fields(RESULT_FIELD, OBJECT_FIELD), new TridentKafkaUpdater(), new Fields(OBJECT_FIELD));

    return topology.build();
}

和配置创建为:

private static Config createConfig(boolean local) {
    Config conf = new Config();
    conf.setMaxSpoutPending(1); // Also tried 2..6
    conf.setNumWorkers(4);

    return conf;
}

我们能做些什么来提高性能,或者通过增加并行任务的数量,或者在完成一批处理时避免饥饿?

共有1个答案

鲁俊友
2023-03-14

我在storm-users上找到了一篇Nathan Marz关于为Trident设置并行度的老帖子:

我建议使用“name”函数来命名流的各个部分,这样UI就可以显示什么螺栓对应什么部分。

“三叉戟”将操作打包成尽可能少的螺栓。此外,它从不重新分区您的流,除非您已经执行了明确涉及重新分区的操作(例如,shuffle、groupBy、partitionBy、全局聚合等)。Trident的这个属性确保了您可以控制事情如何处理的排序/半排序。因此在这种情况下,groupBy之前的所有内容都必须具有相同的并行性,否则Trident将不得不重新划分流。因为您没有说要重新分区流,所以它不能这样做。通过引入一个重新分区操作,您可以为spout获得不同于每个spout的并行性,如下所示:

 类似资料:
  • 一、整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持; Storm Kafka Integration (0.10.x+) : 包含 Kafka 新版本的 consumer API,主要对 Kafka 0.10.x + 提供整合支持。 这里我服务端安装的

  • 我是Apache Storm的新手,正在尝试为我的用例设计一个简单的拓扑。Storm中对并行性的解释(理解一个Storm拓扑的并行性)给我留下了两个疑问: 请回答以上疑问,如有不正确之处,请更正我的理解。

  • 我理解了Kafka分区和Spark RDD分区之间的自动映射,并最终理解了Spark任务。然而,为了正确地调整我的执行器(核心数量)的大小,并最终确定节点和集群的大小,我需要理解文档中似乎掩盖了的一些内容。 null 例如,关于如何使用 --master local启动spark-streaming的建议。每个人都会说,在spark streaming的情况下,应该把local[2]最小化,因为其

  • 主要内容:Storm是什么?,与Storm整合,提交到拓扑在本章中,我们将学习如何将Kafka与Apache Storm集成。 Storm是什么? Storm最初是由Nathan Marz和BackType团队创建的。 在很短的时间内,Apache Storm成为分布式实时处理系统的标准,用于处理大数据。 Storm速度非常快,每个节点每秒处理超过一百万个元组的基准时钟。 Apache Storm持续运行,从配置的源(Spouts)中消耗数据并将数据传递

  • 我有3个测试类,由我想要并行运行的多个测试方法组成。我使用ThreadLocal来隔离每个线程的webdriver实例。当我按顺序运行测试时,一切看起来都很好,但是当我并行运行它们时,问题就出现了。下面是我的套件文件 我在@Beforeclass中初始化Web驱动程序BrowserClient.java如下。 这里使用的类即BrowserFactory.java 驱动程序工厂.java 我的测试类

  • 如何在storm集群中处理这种情况(最好不创建外部服务): 我只需要一个由所有拓扑实例使用的spout,例如,如果输入数据通过网络文件夹被推送到群集,该网络文件夹将扫描新文件。 类似的问题与混凝土类型的螺栓。例如,当数据由被锁定到具体物理机器的许可的第三方库处理时。