我创建了3个具有相同组id的使用者线程,订阅了相同的主题。已启用自动提交。由于所有3个使用者线程都订阅了相同的主题,我假设每个使用者都将获得一个要使用的分区,并将提交每个分区的偏移量日志。
但我在这里面临着一个奇怪的问题。我所有的留言都是重复的。我从我的每个线程在消费者端获得x倍多的记录。由于我的每个使用者线程都进行无限循环来轮询主题,所以我必须终止该进程。
我甚至尝试了单线程,但我仍然得到重复记录x次,仍然继续。
我张贴我的消费者代码供您参考。
public class ConsumerDemo {
public static void main(String[] args) {
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Consumer-%d").build();
ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory);
executor.submit(new ConsumerThread("topic1", "myThread-1"));
executor.submit(new ConsumerThread("topic1", "myThread-2"));
executor.submit(new ConsumerThread("topic1", "myThread-3"));
//executor shutdown logic is skipped
}
}
使用者线程:
public class ConsumerThread implements Runnable {
private static final String KAFKA_BROKER = "<<IP:port>>";
private final KafkaConsumer<String, String> consumer;
public ConsumerThread(String topic, String name) {
Properties props = new Properties();
props.put("bootstrap.servers", ConsumerThread.KAFKA_BROKER);
props.put("group.id", "DemoConsumer");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "6000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer(props);
this.consumer.subscribe(Collections.singletonList(topic));
}
public void run() {
try {
boolean isRunning = true;
while (isRunning) {
ConsumerRecords<String,String> records= consumer.poll(10L);
System.out.println("Partition Assignment to this Consumer: "+consumer.assignment());
Iterator it = records.iterator();
while(it.hasNext()) {
ConsumerRecord record = (ConsumerRecord)it.next();
System.out.println("Received message from thread : "+Thread.currentThread().getName()+"(" + record.key() + ", " + (String)record.value() + ") at offset " + record.offset());
}
}
consumer.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
同样非常重要的是,我的目标是精确地一次性语义。我知道在1000英里之外。任何帮助都是非常感谢的。
分配给这个使用者的分区:[topic1-1,topic1-0,topic1-2]
Kafka专家,除了上面的问题,我还在寻找另外两个输入。
提前谢了。
好吧,我发现了我的代码/底板有什么问题。
在我开始做原型之前,我应该完全阅读Kafka文档。
这是我发现的。
因此,我手动为每个使用者分配了一个分区,以确保我的使用者拥有该分区并控制偏移量,如下所示
consumer = new KafkaConsumer(props)
TopicPartition partition = new TopicPartition(topic, partitionNum);
consumer.assign(Collections.singletonList(partition));
精确一次场景:为了确保我们准确地消耗一次消息,我们需要控制偏移量。虽然到目前为止我还没有尝试过,但根据我从大量谷歌搜索中学到的,这是一种更好的方法,可以将偏移与数据一起保存。最好是相同的交易。数据和偏移量肯定会保存或回滚以便重试。
任何其他解决方案都是赞赏的。
问题内容: 我正在尝试使用Java API从Elasticsearch获取所有记录。但我收到以下错误 n [[Wild Thing] [localhost:9300] [indices:data / read / search [phase / dfs]]]; 嵌套:QueryPhaseExecutionException [结果窗口太大,从+大小必须小于或等于:[10000],但为[10101]
我正在尝试使用Java API从Elasticsearch中获取所有记录。但我收到以下错误 N[[Wild Thing][localhost:9300][索引:数据/读取/搜索[Phase/DFS]]];嵌套:QueryPhaseExecutionException[结果窗口太大,from+size必须小于或等于:[10000]但为[10101]。 我的代码如下所示 当前存在的记录总数为13188
问题内容: 这个问题已经在这里有了答案 : 从时间戳列中选择当前月份记录mysql (9个答案) 如何在MySQL的当月第一天和当日之间进行选择? (15个答案) 6年前关闭。 如何从MySql数据库表中选择“当前月”记录? 像现在这样,当前月份是一月。我想获取一月月份的记录,其中我的表列的数据类型是。我想知道sql查询。 谢谢 问题答案: 该查询将为您工作:
问题内容: 所以我有两个表,一个是RAWtable,另一个是MAINtable,如果存在多个记录(比较相同的名称,代码),我必须获取最新的groupID。例如,我在RAWtable上有这个: 这两个记录应视为一个,并且应仅返回此值: 该行是应插入主表中的唯一行。提供返回的最新GroupID(groupid是日期和时间的组合) 我已经尝试过了,但是没有用: 我怎样才能做到这一点?多谢。 问题答案:
问题内容: 这个问题已经在这里有了答案 : SQL只选择列上具有最大值的行[重复] (27个答案) 去年关闭。 假设我有一个带有列的表: 我只想从每个用户那里获取最新消息,就像您在深入实际线程之前在FaceBook收件箱中看到的那样。 此查询似乎使我接近所需的结果: 但是查询给我的是来自每个用户的最旧消息,而不是最新消息。 我不知道这一点。 问题答案: 您应该找出每个组(子查询)中的最后一个值,然
获取使用者列表 请求方式: GET 请求地址: https://open.qingtui.cn/v1/app/followers?access_token=ACCESS_TOKEN&page_size=PAGE_SIZE&request_page=REQUEST_PAGE 参数说明: 参数 必须 说明 access_token 是 接口调用凭证 page_size 是 请求页面数量 request