Question:

How do I fix a Storm Kafka spout that only consumes half of the data from Kafka?

赫连黎昕
2023-03-14
<dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.2</version>
        <scope>provided</scope>
</dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
        <scope>compile</scope>
</dependency>

I am using the Storm Kafka Spout from apache/storm/external/storm-kafka-client (storm-kafka-client) together with the new Kafka Consumer API. My topology looks like this:

public class AnalyseTopo {
private static final Logger LOG = LoggerFactory.getLogger(AnalyseTopo.class);


private static final String[] STREAMS = new String[]{"test_stream"};
private static final String[] TOPICS = new String[]{"online"};

public static void main(String[] args) throws Exception {
    new AnalyseTopo().runMain(args);
}

protected void runMain(String[] args) throws Exception {
    if (args.length == 0) {
        submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
    } else {
        submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig());
    }
}

protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("KafkaTest", config, topology);
    stopWaitingForInput();
}

protected void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception {
    StormSubmitter.submitTopology(arg, config, topology);
}

protected void stopWaitingForInput() {
    try {
        System.out.println("PRESS ENTER TO STOP Now");
        new BufferedReader(new InputStreamReader(System.in)).readLine();
        System.exit(0);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

protected StormTopology getTopologyKafkaSpout() {
    final TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);

    // 1. First parse each log record with fastjson
    builder.setBolt("json_parse", new JsonParseBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);

    // 2. Compute the per-app/per-channel statistics every 60s, using a fixed (tumbling) window that advances by 60s
    Duration oneMinute = new Duration(60, TimeUnit.SECONDS);// 60 -> 2
    IWindowedBolt appChannelBolt = new AppChannelStatBolt()
            .withTimestampField("timestamp")
            .withLag(oneMinute)
            .withWatermarkInterval(oneMinute)
            .withTumblingWindow(oneMinute);
    builder.setBolt("app_channel", appChannelBolt, 3)
            .fieldsGrouping("json_parse", new Fields("timestamp"));   //from app_channel change to timestamp

    // 3. Feed these statistics into the overall app statistics and the overall channel statistics
    IWindowedBolt appStatBolt = new AppStatBolt()
            .withTimestampField("timestamp")
            .withLag(oneMinute)
            .withWatermarkInterval(oneMinute)
            .withTumblingWindow(oneMinute);
    builder.setBolt("app_stat", appStatBolt, 1)
            .fieldsGrouping("app_channel", "stat", new Fields("appid"));

    IWindowedBolt channelStatBolt = new ChannelStatBolt()
            .withTimestampField("timestamp")
            .withLag(oneMinute)
            .withWatermarkInterval(oneMinute)
            .withTumblingWindow(oneMinute);
    builder.setBolt("channel_stat", channelStatBolt, 1)
            .fieldsGrouping("app_channel", "stat", new Fields("channel"));

    // 4. Write the results to MySQL for persistence
    IWindowedBolt batchWriteBolt = new BatchWriteBolt()
            .withTumblingWindow(new BaseWindowedBolt.Count(10));
    builder.setBolt("batch_write", batchWriteBolt, 1)
            .shuffleGrouping("app_channel", "sql")
            .shuffleGrouping("app_stat", "sql")
            .shuffleGrouping("channel_stat", "sql");

    return builder.createTopology();
}


protected Config getConfig() {
    Config config = new Config();
    config.setDebug(true);
    config.put("topology.message.timeout.secs", 1000);
    return config;
}

protected KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
    return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
            .setOffsetCommitPeriodMs(2000)
            .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
            .setMaxUncommittedOffsets(50000)
            .setPollTimeoutMs(2000)
            .build();
}

protected KafkaSpoutRetryService getRetryService() {
    return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500),
            TimeInterval.microSeconds(2), 35, TimeInterval.seconds(10));//change Integer.MAXVALUE to 3->50
}

protected Map<String, Object> getKafkaConsumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "cstr-01:9092,cstr-02:9092,cstr-03:9092");
    props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "storm2");
    props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
    //props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
    //props.put(KafkaSpoutConfig.Consumer.AUTO_COMMIT_INTERVAL_MS, "5000");

    // add resolve commit failure param
    //props.put("session.timeout.ms", "50000");   //increase
    //props.put("max.poll.records", "50000");     //reduce
    return props;
}

protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
    return new KafkaSpoutTuplesBuilderNamedTopics.Builder<String, String>(
            new OnlineTupleBuilder<>(TOPICS[0]))
            .build();
}

protected KafkaSpoutStreams getKafkaSpoutStreams() {
    final Fields outputFields = new Fields("topic", "partition", "offset", "value");
    return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0]})
            .build();
}
}

When I changed KafkaSpout.java to print the offsets of the consumerRecords, I found that some offsets were skipped. Image of the skipped offsets: http://7xtjbx.com1.z0.glb.clouddn.com/stack.png
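For reference, a minimal sketch of how that offset logging might look while iterating the polled batch (an assumption about my debug code; only the consumerRecords name and the ConsumerRecord accessors come from the Kafka client API):

for (ConsumerRecord<String, String> record : consumerRecords) {
    // Log where each record comes from so gaps in the offsets become visible
    LOG.info("topic={} partition={} offset={}", record.topic(), record.partition(), record.offset());
}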

What can I do about this problem? Is there an issue with storm-kafka-client and the new consumer? Thanks!

1 Answer

凤高翰
2023-03-14

I solved this problem by using auto commit.

props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
props.put(KafkaSpoutConfig.Consumer.AUTO_COMMIT_INTERVAL_MS, "5000");

You may hit a NullPointerException; wrapping it in a try/catch is fine, and you also need to remove the numUncommittedOffsets++ at line 297 of KafkaSpout.java.
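With those two properties added, the getKafkaConsumerProps() from the question ends up roughly like the sketch below (broker addresses, group id and deserializers are copied unchanged from the question):

protected Map<String, Object> getKafkaConsumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "cstr-01:9092,cstr-02:9092,cstr-03:9092");
    props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "storm2");
    props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
    // Let the Kafka consumer commit offsets itself instead of relying on the spout's commit logic
    props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
    props.put(KafkaSpoutConfig.Consumer.AUTO_COMMIT_INTERVAL_MS, "5000");
    return props;
}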
