负载的消息数量很少,每天大约2000条,但是每个任务都需要相当长的时间。特别是一个拓扑处理每个任务所需的时间是可变的,通常在1到20分钟之间。如果按顺序处理,吞吐量不足以处理所有传入消息。所有的拓扑和Kafka系统都安装在一台单机中(16个核心,16 GB的RAM)。
由于消息是独立的,并且可以并行处理,我们正在尝试使用Storm并发能力来提高吞吐量。
为此,拓扑配置如下:
private static StormTopology buildTopologyOD() {
//This is the marker interface BrokerHosts.
BrokerHosts hosts = new ZkHosts(configuration.getProperty(ZKHOSTS));
TridentKafkaConfig tridentConfigCorrelation = new TridentKafkaConfig(hosts, configuration.getProperty(TOPIC_FROM_CORRELATOR_NAME));
tridentConfigCorrelation.scheme = new RawMultiScheme();
tridentConfigCorrelation.fetchSizeBytes = Integer.parseInt(configuration.getProperty(MAX_SIZE_BYTES_CORRELATED_STREAM));
OpaqueTridentKafkaSpout spoutCorrelator = new OpaqueTridentKafkaSpout(tridentConfigCorrelation);
TridentTopology topology = new TridentTopology();
Stream existingObject = topology.newStream("kafka_spout_od1", spoutCorrelator)
.shuffle()
.each(new Fields("bytes"), new ProcessTask(), new Fields(RESULT_FIELD, OBJECT_FIELD))
.parallelismHint(Integer.parseInt(configuration.getProperty(PARALLELISM_HINT)));
//Create a state Factory to produce outputs to kafka topics.
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(kafkaProperties)
.withKafkaTopicSelector(new ODTopicSelector())
.withTridentTupleToKafkaMapper(new ODTupleToKafkaMapper());
existingObject.partitionPersist(stateFactory, new Fields(RESULT_FIELD, OBJECT_FIELD), new TridentKafkaUpdater(), new Fields(OBJECT_FIELD));
return topology.build();
}
和配置创建为:
private static Config createConfig(boolean local) {
Config conf = new Config();
conf.setMaxSpoutPending(1); // Also tried 2..6
conf.setNumWorkers(4);
return conf;
}
我们能做些什么来提高性能,或者通过增加并行任务的数量,或者在完成一批处理时避免饥饿?
我在storm-users上找到了一篇Nathan Marz关于为Trident设置并行度的老帖子:
我建议使用“name”函数来命名流的各个部分,这样UI就可以显示什么螺栓对应什么部分。
“三叉戟”将操作打包成尽可能少的螺栓。此外,它从不重新分区您的流,除非您已经执行了明确涉及重新分区的操作(例如,shuffle、groupBy、partitionBy、全局聚合等)。Trident的这个属性确保了您可以控制事情如何处理的排序/半排序。因此在这种情况下,groupBy之前的所有内容都必须具有相同的并行性,否则Trident将不得不重新划分流。因为您没有说要重新分区流,所以它不能这样做。通过引入一个重新分区操作,您可以为spout获得不同于每个spout的并行性,如下所示:
一、整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持; Storm Kafka Integration (0.10.x+) : 包含 Kafka 新版本的 consumer API,主要对 Kafka 0.10.x + 提供整合支持。 这里我服务端安装的
我是Apache Storm的新手,正在尝试为我的用例设计一个简单的拓扑。Storm中对并行性的解释(理解一个Storm拓扑的并行性)给我留下了两个疑问: 请回答以上疑问,如有不正确之处,请更正我的理解。
我理解了Kafka分区和Spark RDD分区之间的自动映射,并最终理解了Spark任务。然而,为了正确地调整我的执行器(核心数量)的大小,并最终确定节点和集群的大小,我需要理解文档中似乎掩盖了的一些内容。 null 例如,关于如何使用 --master local启动spark-streaming的建议。每个人都会说,在spark streaming的情况下,应该把local[2]最小化,因为其
主要内容:Storm是什么?,与Storm整合,提交到拓扑在本章中,我们将学习如何将Kafka与Apache Storm集成。 Storm是什么? Storm最初是由Nathan Marz和BackType团队创建的。 在很短的时间内,Apache Storm成为分布式实时处理系统的标准,用于处理大数据。 Storm速度非常快,每个节点每秒处理超过一百万个元组的基准时钟。 Apache Storm持续运行,从配置的源(Spouts)中消耗数据并将数据传递
我有3个测试类,由我想要并行运行的多个测试方法组成。我使用ThreadLocal来隔离每个线程的webdriver实例。当我按顺序运行测试时,一切看起来都很好,但是当我并行运行它们时,问题就出现了。下面是我的套件文件 我在@Beforeclass中初始化Web驱动程序BrowserClient.java如下。 这里使用的类即BrowserFactory.java 驱动程序工厂.java 我的测试类
如何在storm集群中处理这种情况(最好不创建外部服务): 我只需要一个由所有拓扑实例使用的spout,例如,如果输入数据通过网络文件夹被推送到群集,该网络文件夹将扫描新文件。 类似的问题与混凝土类型的螺栓。例如,当数据由被锁定到具体物理机器的许可的第三方库处理时。