我有一个主题列表(目前为10个),其规模将来可能会增加。我知道我们可以在每个主题中产生多个线程(每个主题)使用,但是就我而言,如果主题数量增加,那么从主题中使用的线程数量就会增加,这是我不希望的,因为主题不是太频繁地获取数据,因此线程将处于理想状态。
有没有办法让一个消费者从所有主题中消费?如果是,那我们如何实现呢?另外,Kafka将如何维护偏移量?请提出答案。
我们可以使用以下API订阅多个主题:Consumer.subscribe(Arrays.asList(topic1,topic2),ConsumerRebalanceListener
obj)
消费者拥有主题信息,我们可以通过如下创建OffsetAndMetadata对象来使用Consumer.commitAsync或Consumer.commitSync()。
ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
问题内容: 我对Kafka比较陌生。我已经做了一些实验,但是对于消费者补偿我有些不清楚。根据到目前为止的了解,使用方启动时,将从其读取的偏移量由配置设置确定(如果我输入错了,请更正我)。 现在说,例如,该主题中有10条消息(偏移量0到9),一个消费者在崩溃之前(或我杀死该消费者之前)碰巧消耗了其中的5条消息。然后说我重新启动该使用者进程。我的问题是: 如果将设置为,它是否总是从偏移量0开始消耗?
设置 多个独立的源系统将AVRO事件推送到Kafka主题中。KafkaS3接收器连接器从本主题读取AVRO事件,并写入S3拼花格式。 问题 我们的架构注册表中的 AVRO 架构不符合标准。例如,源系统中的十进制字段在架构注册表中具有基类型字符串和逻辑类型十进制。AVRO 中不允许这些类型的组合(十进制逻辑类型必须始终具有基本类型修复/字节。 这些不正确的AVRO模式导致不正确的拼花文件模式。E、
我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。
但是,consumer只从主题中第一个未提交的消息开始轮询。我希望总是从偏移量0开始,不管提交的消息是什么。使用Alpakka消费者,如何手动指定偏移量?
我有一个主题列表(目前是10个),其大小可以在未来增加。我知道我们可以产生多个线程(每个主题)来消耗每个主题,但在我的例子中,如果主题的数量增加,那么消耗主题的线程数量也会增加,这是我不希望的,因为主题不会太频繁地获取数据,所以线程将是理想的。 有没有办法让单个消费者从所有话题中消费?如果是的话,我们怎样才能做到呢?另外,Kafka将如何维护抵消?请建议答案。
问题内容: 我有一个JMS客户端,它正在生成消息并通过JMS队列发送到其唯一的使用者。 我想要的是不止一个消费者收到这些消息。我想到的第一件事是将队列转换为主题,以便现有用户和新用户都可以订阅并将相同的消息传递给他们。 显然,这将涉及在生产者和消费者方面修改当前的客户代码。 我还要查看其他选项,例如创建第二个队列,这样就不必修改现有的使用者。我相信这种方法有很多优点,例如(如果我错了,请纠正我)在