当前位置: 首页 > 知识库问答 >
问题:

Kafka 0.9.0新的Java使用者API获取重复记录

赵超
2023-03-14

我创建了3个具有相同组id的使用者线程,订阅了相同的主题。已启用自动提交。由于所有3个使用者线程都订阅了相同的主题,我假设每个使用者都将获得一个要使用的分区,并将提交每个分区的偏移量日志。

但我在这里面临着一个奇怪的问题。我所有的留言都是重复的。我从我的每个线程在消费者端获得x倍多的记录。由于我的每个使用者线程都进行无限循环来轮询主题,所以我必须终止该进程。

我甚至尝试了单线程,但我仍然得到重复记录x次,仍然继续。

我张贴我的消费者代码供您参考。

public class ConsumerDemo {

public static void main(String[] args) {

    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Consumer-%d").build();
    ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory);

    executor.submit(new ConsumerThread("topic1", "myThread-1"));
    executor.submit(new ConsumerThread("topic1", "myThread-2"));
    executor.submit(new ConsumerThread("topic1", "myThread-3"));

    //executor shutdown logic is skipped
}
}

使用者线程:

public class ConsumerThread  implements Runnable {

private static final String KAFKA_BROKER = "<<IP:port>>";

private final KafkaConsumer<String, String> consumer;

    public ConsumerThread(String topic, String name) {
        Properties props = new Properties();
        props.put("bootstrap.servers", ConsumerThread.KAFKA_BROKER);
        props.put("group.id", "DemoConsumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "6000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer(props);
        this.consumer.subscribe(Collections.singletonList(topic));
    }


    public void run() {
        try {
            boolean isRunning = true;
            while (isRunning) {
                ConsumerRecords<String,String> records= consumer.poll(10L);
                System.out.println("Partition Assignment to this Consumer: "+consumer.assignment());
                Iterator it = records.iterator();
                while(it.hasNext()) {
                    ConsumerRecord record = (ConsumerRecord)it.next();
                    System.out.println("Received message from thread : "+Thread.currentThread().getName()+"(" + record.key() + ", " + (String)record.value() + ") at offset " + record.offset());
                }
            }
            consumer.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

同样非常重要的是,我的目标是精确地一次性语义。我知道在1000英里之外。任何帮助都是非常感谢的。

分配给这个使用者的分区:[topic1-1,topic1-0,topic1-2]

Kafka专家,除了上面的问题,我还在寻找另外两个输入。

  1. 请帮助我理解上面代码中的错误。
  2. 一般来说,如何准确地实现一次示意图。如果可能的话,请举例说明。
  3. 异常场景,如消费者关闭。如何在不丢失html" target="_blank">消息的情况下处理。

提前谢了。

共有1个答案

钱凌
2023-03-14

好吧,我发现了我的代码/底板有什么问题。

在我开始做原型之前,我应该完全阅读Kafka文档。

这是我发现的。

因此,我手动为每个使用者分配了一个分区,以确保我的使用者拥有该分区并控制偏移量,如下所示

consumer = new KafkaConsumer(props)    
TopicPartition partition = new TopicPartition(topic, partitionNum);
consumer.assign(Collections.singletonList(partition));

精确一次场景:为了确保我们准确地消耗一次消息,我们需要控制偏移量。虽然到目前为止我还没有尝试过,但根据我从大量谷歌搜索中学到的,这是一种更好的方法,可以将偏移与数据一起保存。最好是相同的交易。数据和偏移量肯定会保存或回滚以便重试。

任何其他解决方案都是赞赏的。

 类似资料:
  • 问题内容: 我正在尝试使用Java API从Elasticsearch获取所有记录。但我收到以下错误 n [[Wild Thing] [localhost:9300] [indices:data / read / search [phase / dfs]]]; 嵌套:QueryPhaseExecutionException [结果窗口太大,从+大小必须小于或等于:[10000],但为[10101]

  • 我正在尝试使用Java API从Elasticsearch中获取所有记录。但我收到以下错误 N[[Wild Thing][localhost:9300][索引:数据/读取/搜索[Phase/DFS]]];嵌套:QueryPhaseExecutionException[结果窗口太大,from+size必须小于或等于:[10000]但为[10101]。 我的代码如下所示 当前存在的记录总数为13188

  • 问题内容: 这个问题已经在这里有了答案 : 从时间戳列中选择当前月份记录mysql (9个答案) 如何在MySQL的当月第一天和当日之间进行选择? (15个答案) 6年前关闭。 如何从MySql数据库表中选择“当前月”记录? 像现在这样,当前月份是一月。我想获取一月月份的记录,其中我的表列的数据类型是。我想知道sql查询。 谢谢 问题答案: 该查询将为您工作:

  • 问题内容: 所以我有两个表,一个是RAWtable,另一个是MAINtable,如果存在多个记录(比较相同的名称,代码),我必须获取最新的groupID。例如,我在RAWtable上有这个: 这两个记录应视为一个,并且应仅返回此值: 该行是应插入主表中的唯一行。提供返回的最新GroupID(groupid是日期和时间的组合) 我已经尝试过了,但是没有用: 我怎样才能做到这一点?多谢。 问题答案:

  • 问题内容: 这个问题已经在这里有了答案 : SQL只选择列上具有最大值的行[重复] (27个答案) 去年关闭。 假设我有一个带有列的表: 我只想从每个用户那里获取最新消息,就像您在深入实际线程之前在FaceBook收件箱中看到的那样。 此查询似乎使我接近所需的结果: 但是查询给我的是来自每个用户的最旧消息,而不是最新消息。 我不知道这一点。 问题答案: 您应该找出每个组(子查询)中的最后一个值,然

  • 获取使用者列表 请求方式: GET 请求地址: https://open.qingtui.cn/v1/app/followers?access_token=ACCESS_TOKEN&page_size=PAGE_SIZE&request_page=REQUEST_PAGE 参数说明: 参数 必须 说明 access_token 是 接口调用凭证 page_size 是 请求页面数量 request