我需要在多个线程中使用来自Kafka分区的记录,每个线程上有唯一的记录要处理。我有以下代码,我不知道是什么错误
public class ConsumerThread implements Runnable {
public String name;
public ConsumerThread(String name){
this.name = name;
}
public Properties getDefaultProperty(){
Properties prop = new Properties();
prop.setProperty("group.id", "4");
prop.put("enable.auto.commit", "false");
prop.put("auto.offset.reset", "earliest");
prop.setProperty("bootstrap.servers", "localhost:9092");
prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.setProperty("max.poll.records","150");
return prop;
}
public void run() {
TopicPartition tp = new TopicPartition("my.topic", 0);
KafkaConsumer consumer = new KafkaConsumer(getDefaultProperty());
ArrayList tpList = new ArrayList<TopicPartition>();
tpList.add(tp);
consumer.assign(tpList);
ConsumerRecords poll = consumer.poll(1000);
Iterator it = poll.iterator();
consumer.commitAsync();
while(it.hasNext()){
ConsumerRecord cr = (ConsumerRecord) it.next();
System.out.println("From "+this.name+" : "+cr.value());
}
consumer.close();
System.out.println("Thread Exiting "+this.name);
}
}
结果
From Thread1 : produced_0
From Thread1 : produced_1
From Thread1 : produced_2
From Thread1 : produced_3
.
.
.
From Thread1 : produced_136
From Thread2 : produced_0
From Thread2 : produced_1
From Thread2 : produced_2
From Thread2 : produced_3
.
.
.
应为:
From Thread1 : produced_0
From Thread1 : produced_1
From Thread1 : produced_2
From Thread1 : produced_3
.
.
.
From Thread1 : produced_136
From Thread2 : produced_4
From Thread2 : produced_5
From Thread2 : produced_6
From Thread2 : produced_137
只有使用kafka使用者的subscribe方法才能自动将分区分配给使用者组。但是,您使用assign
和特定的主题分区,因此您负责将特定的分区分配给不同的使用者(但是您总是使用同一个分区0
,因此所有使用者都在使用同一个主题分区)。
我对Kafka是陌生的。我用spring boot创建了一个kafka消费者(spring-kafka dependency)。在我的应用程序中,我使用了consumerFactory和producerfactory beans进行配置。所以在我的应用程序中,我创建了如下的kafka消费者。 我的配置如下 所以我想并行消费,因为我可能会收到更多的消息。关于使用并行主题,我发现我需要为一个主题创建多
问题内容: 很抱歉,我只学习了PHP和MySQL,已经搜索了一个多星期,但是没有找到任何答案。 我创建了一个简单的财务脚本,该表如下所示: 我想要这样的结果 有人可以帮助我解决我的问题吗? 谢谢 问题答案: 正如@Quassnoi指出的那样,这对于MySQL来说不是很有效。我尝试使用怪胎连接而不是子查询,因为内部查询本身就可以使用。 编辑 对此引起了一些兴趣,发现连接版本的速度是@Quassnoi
目前,我有一个类分数,它允许我用三种不同的方式创建分数 对于一个整数,在这种情况下,给定的整数将是分子,分母将设置为1 有2个整数,分子和分母 最后一种方法是解析一个字符串,该字符串必须与REGEX-?\d/[1-9]\d* gcd将尽可能减少生成的分数。 我现在想实现的是,具有相同分子和分母的分数实例具有相同的引用例如。 应该返回true。 我研究了一些关于泛型和边界的章节,但我不确定这是否是我
我是Kafka的初学者。我知道具有相同组id的多个消费者不能在一个主题中使用来自同一个分区的消息。我想知道如果来自一个消费组的多个Kafka消费者从一个分区读取相同的消息会发生什么,为什么这是一件坏事。 。
我在List上使用并行流。我想它会使用WorkJoinPool生成多个工作线程,根据日志,不知何故一直只有一个线程: 以下是我的代码的简短版本: [编辑]我在本地机器上尝试了一些非常简单的代码,它确实产生了多个线程。不幸的是,我无法在本地或其他环境中运行生产代码。。 为什么我的并行流不使用WorkJoinPool生成多个线程? 非常感谢。
问题内容: 在以下代码中,我不明白为什么当它属于两个不同的对象时具有相同的ID? 问题答案: 我认为这是正在发生的事情: 取消引用时,将在内存中创建其副本。该存储位置由以下位置返回 由于没有引用到刚刚创建的方法的副本,因此GC将其回收,并且该内存地址再次可用 取消引用时,将在相同的内存地址(可用)中创建它的副本,您可以再次使用该地址。 第二个副本是GCd 如果您要运行一堆其他代码并再次检查实例方法