当前位置: 首页 > 知识库问答 >
问题:

如何在暴风雨中制作同步Kafka

班安平
2023-03-14

我试图使Kafka消费者同步消费Kafka的消息。

我遇到的实际问题是消息队列存储在Storm Spout中。

我想做的是让暴风雪等待Kafka的回复,然后让暴风雪消耗下一条信息。

我正在使用Storm KafkaSpout:

/**
     * Creates a configured kafka spout.
     * @param topicName Topic where the kafka spout subscribes
     * @return An instance of configured KafkaSpout
     */
    public KafkaSpout getkafkaSpout(String topicName){
        return new KafkaSpout(this.getSpoutConfig(topicName));
    }

    /**
     * Create the necessary configuration to create a new kafka spout.
     * @param topicName Topic where the kafka spout subscribes
     * @return Spout configuration
     */
    public SpoutConfig getSpoutConfig(String topicName) {
        SpoutConfig spoutConfig=new SpoutConfig(this.getZkHosts(),topicName, "", String.join("-",topicName,RandomStringUtils.randomAlphanumeric(20)));
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
        return spoutConfig;
    }



builder.setSpout("kafkaPackedData", stormConfig.getkafkaSpout("topic_kafka"), 2);

我已经更新到Storm 2.0.0,我使用Storm kafka客户端。但是如果我将Storm队列配置为50:setMaxSpoutPending(50) 当我向Kafka发送许多数据时,它会停止使用这些数据。

我已经配置了Kafka消费者与下一个配置:

KafkaSpoutConfig spoutConf =  KafkaSpoutConfig.builder("stream1:9092", "kafkaToStormAlarms")
                    .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000") //Set session timeout
                    .setProp(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000") //Set request timeout
                    .setOffsetCommitPeriodMs(10000)    //Set automatic confirmation time (in ms)
                    .setFirstPollOffsetStrategy(LATEST)    //Set to pull the latest messages
                    .setRetry(kafkaSpoutRetryService)
                    .build();

当Storm使用与MaxSpoutPending配置相同的50条消息时,它将停止使用更多消息。也许下一个螺栓没有正确发送ACK?我使用Kafkaconsumerspoot之后的下一个螺栓:

public class testBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("MQTTmessage"));
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector boc) {
        System.out.println("\n\n\n\nLLEGA BIENN AL SPLIT TEXT BOLT\n\n");
        System.out.println("TUPLE "+tuple);
        String text = tuple.getString(4);
        List<String> lines = Arrays.asList(text.split("\\r?\\n"));

        lines.forEach(line -> {
            boc.emit(new Values(line));
        });
    }
}

共有1个答案

鲜于喜
2023-03-14

关于节流喷嘴:是的,可以通过设置拓扑来实现。马克斯,喷口。将拓扑配置中的挂起的选项设置为1。如果您想获得良好的吞吐量,我不推荐使用它,但我假设您已经仔细考虑了为什么需要拓扑以这种方式运行。

关于新的喷口:stream1:9092是Kafka正在运行的服务器吗,以及kafkaToStormAlarms是您要发送到的主题吗?如果不是,那可能是你的问题。否则,请在storm/logs/workers artifacts中检查工作日志,它可能会告诉您喷口没有排放任何东西的原因。

最后是的,您绝对应该使用Storm-kafka-Client而不是Storm-kafka,否则您将无法升级到Storm 2.0.0或最新的Kafka版本。

 类似资料:
  • 我有一个EvaluationBolt(用于内存监视),我希望确保每个工作进程上运行一个执行器(在我的例子中,每个物理节点运行一个执行器,即supervisor.slots.ports只配置为端口6700)。在题目上我发现了这个问题: 干杯,孙铁麟

  • 默认情况下,当Storm喷口或螺栓遇到异常时,它会重新启动喷口或螺栓,然后再试一次。是否有任何配置选项使它停止拓扑,也许在N次重复尝试之后?(例如,Hadoop尝试了4次才放弃。) 我有一个Storm拓扑运行了77天,一个螺栓在每个元组上引发一个异常。在这种情况下,我宁愿它失败,这样我就会注意到有问题。

  • 我在Apache/Storm/external/storm-kafka-client中使用Storm Kafka Spout的storm-kafka-client和新的Kafka Consumer API。我的拓扑如下所示: 当我将kafkaspout.java更改为打印consumerRecords的偏移量时,我发现跳过了一些偏移量。跳过http://7xtjbx.com1.z0.glb.clo

  • 问题内容: 我对Java同步有疑问。我想知道我的类中是否有三个同步方法,并且一个线程在一个同步方法中获得了锁,另外两个将被锁定吗?我问这个问题是因为我对以下陈述感到困惑。 当线程处于对象的同步方法内部时,希望执行该同步方法或对象的任何其他同步方法的所有其他线程将必须等待。此限制不适用于已经具有锁并正在执行对象的同步方法的线程。这样的方法可以调用对象的其他同步方法而不会被阻塞。当然,任何线程都可以随

  • 我对Java同步有一个疑问。我想知道如果我的类中有三个同步方法,一个线程在一个同步方法中获取锁,其他两个会被锁定吗?我问这个问题是因为我与以下语句混淆了。 当一个线程在一个对象的同步方法内部时,所有希望执行这个同步方法或该对象的任何其他同步方法的其他线程都必须等待。这个限制不适用于已经有锁并正在执行该对象的同步方法的线程。这样的方法可以调用该对象的其他同步方法而不会被阻塞。该对象的非同步方法当然可

  • 利用 cocos2d 制作的一款休闲游戏。游戏中有九种不同的卡通水果,您可以拖动整行或整列上的水果,或交换屏幕中两个相邻水果的位置,一条直线上的三个或三个以上的相同水果将会消失,而您将得到分数。当您的分数逐步提高后,将会进入更高难度的关卡,接受更高难度的挑战。 游戏中允许使用一些道具。 [Code4App.com]