val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()
val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
cluster.shutdown()
您可以做几件事:
您可以定义RecordTranslator
。该接口允许您根据从Kafka读取的ConsumerRecord
定义spout将如何构造元组。
默认实现如下所示:
public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
@Override
public List<Object> apply(ConsumerRecord<K, V> record) {
return new Values(record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
}
@Override
public Fields getFieldsFor(String stream) {
return FIELDS;
}
@Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
我在Apache Storm拓扑上运行了一个性能测试,并注意到kinesis-spout中的“failed”计数非常高(几乎占了元组的1/3)。这个数值是多少?
Apache Kafka:分布式消息传递系统 Apache Storm:实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据? 在实时数据管道方面,我觉得两者做的工作是一样的。如何在数据管道上同时使用这两种技术?
我正在尝试用我的Storm设置实现最大的性能。我正在通过Kafka发送数以万计的消息,这些消息将被Storm拓扑接收。 当我在Storm UI中查看时,我注意到所有消息都流向一个执行器,而不是在所有执行器之间进行负载平衡。(见附件截图)。
我无法找到正确集成Kafka和Apache Storm Trident的好文档。我试图查看相关的问题之前张贴在这里,但没有充分的信息。 这样,我就可以为我的拓扑生成流,如下面的代码所示 虽然我提供了并行性和我的分区,但是只有一个Kafka Spout的执行器在运行,因此我无法很好地扩展它。 有谁能指导我更好地将Apache Storm Trident(2.0.0)与Apache Kafka(1.0
我正在本地开发一个Storm拓扑。我正在使用Storm 0.9.2孵化,并开发了一个简单的拓扑。当我使用LocalCluster()选项部署它时,它工作得很好,但它不会显示在我的Storm UI中,它只是执行而已。 当我定期部署它时,它会在我的Storm UI中显示拓扑结构,但当我单击它时,不会看到喷口或螺栓。 我还尝试了许多Storm启动项目中的示例WordCountTopology。同样的行为
我正在做一个学术项目,涉及传感器的流数据。我已经包围了苍鹭(Storm的接班人)和尼菲。两者都支持内置背压,这对我的项目至关重要。Apache Nifi和Heron之间的主要区别是什么? 哪款更适合物联网应用?