当前位置: 首页 > 知识库问答 >
问题:

具有相同组id的Kafka用户线程使用相同记录

龚勇锐
2023-03-14

我需要在多个线程中使用来自Kafka分区的记录,每个线程上有唯一的记录要处理。我有以下代码,我不知道是什么错误

public class ConsumerThread implements Runnable {
    public String name;
    public ConsumerThread(String name){
        this.name = name;
    }
    public Properties getDefaultProperty(){
        Properties prop = new Properties();
        prop.setProperty("group.id", "4");
        prop.put("enable.auto.commit", "false");
        prop.put("auto.offset.reset", "earliest");
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("max.poll.records","150");
        return prop;
    }
    public void run() {
        TopicPartition tp = new TopicPartition("my.topic", 0);
        KafkaConsumer consumer = new KafkaConsumer(getDefaultProperty());
        ArrayList tpList = new ArrayList<TopicPartition>();
        tpList.add(tp);
        consumer.assign(tpList);
        ConsumerRecords poll = consumer.poll(1000);
        Iterator it = poll.iterator();
        consumer.commitAsync();
        while(it.hasNext()){
            ConsumerRecord cr = (ConsumerRecord) it.next();
            System.out.println("From "+this.name+" : "+cr.value());
        }
        consumer.close();
        System.out.println("Thread Exiting "+this.name);
    }
}

结果

From Thread1 : produced_0
From Thread1 : produced_1
From Thread1 : produced_2
From Thread1 : produced_3
.
.
.
From Thread1 : produced_136
From Thread2 : produced_0
From Thread2 : produced_1
From Thread2 : produced_2
From Thread2 : produced_3
.
.
.


应为:

From Thread1 : produced_0
From Thread1 : produced_1
From Thread1 : produced_2
From Thread1 : produced_3
.
.
.
From Thread1 : produced_136
From Thread2 : produced_4
From Thread2 : produced_5
From Thread2 : produced_6
From Thread2 : produced_137

共有1个答案

顾穆冉
2023-03-14

只有使用kafka使用者的subscribe方法才能自动将分区分配给使用者组。但是,您使用assign和特定的主题分区,因此您负责将特定的分区分配给不同的使用者(但是您总是使用同一个分区0,因此所有使用者都在使用同一个主题分区)。

 类似资料:
  • 我对Kafka是陌生的。我用spring boot创建了一个kafka消费者(spring-kafka dependency)。在我的应用程序中,我使用了consumerFactory和producerfactory beans进行配置。所以在我的应用程序中,我创建了如下的kafka消费者。 我的配置如下 所以我想并行消费,因为我可能会收到更多的消息。关于使用并行主题,我发现我需要为一个主题创建多

  • 问题内容: 很抱歉,我只学习了PHP和MySQL,已经搜索了一个多星期,但是没有找到任何答案。 我创建了一个简单的财务脚本,该表如下所示: 我想要这样的结果 有人可以帮助我解决我的问题吗? 谢谢 问题答案: 正如@Quassnoi指出的那样,这对于MySQL来说不是很有效。我尝试使用怪胎连接而不是子查询,因为内部查询本身就可以使用。 编辑 对此引起了一些兴趣,发现连接版本的速度是@Quassnoi

  • 目前,我有一个类分数,它允许我用三种不同的方式创建分数 对于一个整数,在这种情况下,给定的整数将是分子,分母将设置为1 有2个整数,分子和分母 最后一种方法是解析一个字符串,该字符串必须与REGEX-?\d/[1-9]\d* gcd将尽可能减少生成的分数。 我现在想实现的是,具有相同分子和分母的分数实例具有相同的引用例如。 应该返回true。 我研究了一些关于泛型和边界的章节,但我不确定这是否是我

  • 我是Kafka的初学者。我知道具有相同组id的多个消费者不能在一个主题中使用来自同一个分区的消息。我想知道如果来自一个消费组的多个Kafka消费者从一个分区读取相同的消息会发生什么,为什么这是一件坏事。 。

  • 我在List上使用并行流。我想它会使用WorkJoinPool生成多个工作线程,根据日志,不知何故一直只有一个线程: 以下是我的代码的简短版本: [编辑]我在本地机器上尝试了一些非常简单的代码,它确实产生了多个线程。不幸的是,我无法在本地或其他环境中运行生产代码。。 为什么我的并行流不使用WorkJoinPool生成多个线程? 非常感谢。

  • 问题内容: 在以下代码中,我不明白为什么当它属于两个不同的对象时具有相同的ID? 问题答案: 我认为这是正在发生的事情: 取消引用时,将在内存中创建其副本。该存储位置由以下位置返回 由于没有引用到刚刚创建的方法的副本,因此GC将其回收,并且该内存地址再次可用 取消引用时,将在相同的内存地址(可用)中创建它的副本,您可以再次使用该地址。 第二个副本是GCd 如果您要运行一堆其他代码并再次检查实例方法