当前位置: 首页 > 知识库问答 >
问题:

我怎么能发出Stormbotl的结果Kafka经纪人使用kafkbolt?

池永长
2023-03-14

我正在尝试更改storm中的wordcount示例:我不想显示结果,而是想将其发送到kafka集群。以下是构建拓扑的代码:

 Config config = new Config();
    config.setDebug(true);
    config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
//set producer properties.
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "1");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
config.put("kafka.broker.config", props);
KafkaBolt bolt = new KafkaBolt()
            .withTopicSelector(new DefaultTopicSelector("tt"))
            .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper())
                        .withProducerProperties(props);

    BrokerHosts hosts = new ZkHosts("localhost:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "", "id1");
    spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaBoltKeyValueScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);     
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("line-reader-spout", kafkaSpout);
    builder.setBolt("word-spitter", new WordSpitterBolt()).shuffleGrouping("line-reader-spout");
    builder.setBolt("word-counter", new WordCounterBolt()).shuffleGrouping("word-spitter");
    builder.setBolt("forwardToKafka", bolt,1).shuffleGrouping("word-counter");
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("HelloStorm", config, builder.createTopology());

正如你所看到的,我想把wordCountbolt的结果发送到Kafka主题。然而,我得到了这个例外:

java.lang.IllegalArgumentException: message does not exist
at org.apache.storm.tuple.Fields.fieldIndex(Fields.java:95) ~[storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:100) ~[storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:149) ~[storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper.getMessageFromTuple(FieldNameBasedTupleToKafkaMapper.java:46) ~[storm-kafka-1.0.5.jar:1.0.5]
at org.apache.storm.kafka.bolt.KafkaBolt.process(KafkaBolt.java:120) [storm-kafka-1.0.5.jar:1.0.5]
at org.apache.storm.topology.base.BaseTickTupleAwareRichBolt.execute(BaseTickTupleAwareRichBolt.java:38) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.daemon.executor$fn__4963$tuple_action_fn__4965.invoke(executor.clj:731) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__4884.invoke(executor.clj:461) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.disruptor$clojure_handler$reify__4398.onEvent(disruptor.clj:40) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:453) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:432) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.daemon.executor$fn__4963$fn__4976$fn__5029.invoke(executor.clj:850) [storm-core-1.0.5.jar:1.0.5]
at org.apache.storm.util$async_loop$fn__557.invoke(util.clj:484) [storm-core-1.0.5.jar:1.0.5]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]

7793[Thread-18-forwardToKafka-executor[2 2]]信息o.a.s.d.executor-BOLT失败任务:2次:-1元组:源:字计数器:4,流:默认值,id:{},[“发布”}]

在字数螺栓,我已经这样做了:

String str = input.getString(0);
collector.emit(new Values(str));
    collector.ack(input);

共有1个答案

钦永贞
2023-03-14

尝试添加庄家。声明(新字段(“消息”))在您的bolt的public void declareOutputFields(outputfields声明器declarer)函数中

public class MessageBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValue(0);
        String out = word ;
        collector.emit(new Values(out));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

 类似资料:
  • 我在Windows子系统Linux上安装了kafka,并开始使用命令服务启动,所有服务都已启动。现在,当我尝试从Windows运行我的kafka-spring应用程序时,它显示以下错误:- 无法建立与节点-1(localhost/127.0.0.1:9092)的连接。经纪人可能不可用。 我的服务器属性是:- 我哪里出错了???

  • 我们正在使用带有 5 个代理的 Apache Kafka 2.2 版本。我们每天收到 50 数百万个事件,但我们达到了高 kafka CPU 使用率。我们使用默认的生产者/消费者/代理设置。 我对表演有一些疑问; 我们有不同的kafka流应用程序,它们进行聚合或连接操作以携带丰富的消息。我们所有的kafka-流应用程序都包含以下设置: < li >恰好一次:true < li >最小同步副本:3

  • 我正在尝试仅为代理间kerberos配置Kafka代理。然而,由于它似乎也想通过Kerberos连接到Zookeeper,所以我似乎总是遇到错误。我目前还没有设置任何Zookeeper键。 我的Kafka代理 JAAS 配置如下: 服务器属性 我用上述配置得到的错误如下: 换句话说,我只想要经纪人到经纪人的 kerberos 和经纪人 - 动物园管理员的普通SASL_SSL。这可能吗?

  • 在Flink中,我执行以下代码: 我推出3次同样的工作。 如果我用一个代理执行这段代码,它工作得很好,但是用3个broke(在3个不同的机器上)只读取一个分区。 null

  • 我们有一个带有三个代理(节点ID 0、1、2)的kafka集群和一个带有三个节点的zookeeper设置。

  • 我知道生产者/消费者需要与经纪人交谈以了解分区的领导者。经纪人与zk交谈以告诉他们加入了集群。 是真的吗 经纪人从zk知道谁是给定分区的负责人 zk发现经纪人离开/死亡。然后重新选举领导人,并向所有经纪人发送新的领导人信息 问题: 为什么我们需要经纪人相互沟通?这只是为了让tehy可以移动分区,或者他们也可以互相查询元数据。如果是这样,元数据交换的例子是什么