当前位置: 首页 > 知识库问答 >
问题:

阿帕奇Storm中的JSON Kafka喷口

越嘉石
2023-03-14
val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()

val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

cluster.shutdown()

共有1个答案

逑景铄
2023-03-14

您可以做几件事:

您可以定义RecordTranslator。该接口允许您根据从Kafka读取的ConsumerRecord定义spout将如何构造元组。

默认实现如下所示:

public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        return new Values(record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return FIELDS;
    }
    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }
 类似资料:
  • 我在Apache Storm拓扑上运行了一个性能测试,并注意到kinesis-spout中的“failed”计数非常高(几乎占了元组的1/3)。这个数值是多少?

  • Apache Kafka:分布式消息传递系统 Apache Storm:实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据? 在实时数据管道方面,我觉得两者做的工作是一样的。如何在数据管道上同时使用这两种技术?

  • 我正在尝试用我的Storm设置实现最大的性能。我正在通过Kafka发送数以万计的消息,这些消息将被Storm拓扑接收。 当我在Storm UI中查看时,我注意到所有消息都流向一个执行器,而不是在所有执行器之间进行负载平衡。(见附件截图)。

  • 我无法找到正确集成Kafka和Apache Storm Trident的好文档。我试图查看相关的问题之前张贴在这里,但没有充分的信息。 这样,我就可以为我的拓扑生成流,如下面的代码所示 虽然我提供了并行性和我的分区,但是只有一个Kafka Spout的执行器在运行,因此我无法很好地扩展它。 有谁能指导我更好地将Apache Storm Trident(2.0.0)与Apache Kafka(1.0

  • 我正在本地开发一个Storm拓扑。我正在使用Storm 0.9.2孵化,并开发了一个简单的拓扑。当我使用LocalCluster()选项部署它时,它工作得很好,但它不会显示在我的Storm UI中,它只是执行而已。 当我定期部署它时,它会在我的Storm UI中显示拓扑结构,但当我单击它时,不会看到喷口或螺栓。 我还尝试了许多Storm启动项目中的示例WordCountTopology。同样的行为

  • 我正在做一个学术项目,涉及传感器的流数据。我已经包围了苍鹭(Storm的接班人)和尼菲。两者都支持内置背压,这对我的项目至关重要。Apache Nifi和Heron之间的主要区别是什么? 哪款更适合物联网应用?