当前位置: 首页 > 知识库问答 >
问题:

Kafka --from-begin CLI vs Kafka Java API

司宏伯
2023-03-14

最近在使用 Kafka 时,我的应用程序需要从头开始访问主题中的所有消息。因此,在编写Kafka Consumer(使用Java API)时,我可以从头开始读取消息,但它只返回主题中的前500条消息。试图增加

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,整数.MAX_VALUE);props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,Long.MAX_VALUE);

但它仍然不会返回所有消息

kafka-console-consumer --bootstrap-server localhost:9092 --topic --from from

它会返回我所有的5000条记录。

伙计们缺少任何配置吗?任何帮助都将是可观的。

消费者代码。

public ConsumerRecords<byte[], byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
    consumer.subscribe(new ArrayList<String>(Collections.singletonList(topicname)));
    consumer.poll(0);
    // Reading topic offset from beginning
    consumer.seekToBeginning(consumer.assignment());
    // poll and time-out if no replies
    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
    consumer.close();
    return records;
}

然而,我改变了消费者:

public Map<String, byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
    Map<String, byte[]> entityMap = new HashMap<String, byte[]>();
    boolean stop = false;
    consumer.subscribe(new ArrayList<String>(Collections.singletonList(topicname)));
    consumer.poll(0);
    // Reading topic offset from beginning
    consumer.seekToBeginning(consumer.assignment());
    while (!stop) {
        // Request unread messages from the topic.
        ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(1000);
        Iterator<ConsumerRecord<byte[], byte[]>> iterator = consumerRecords.iterator();
        if (iterator.hasNext()) {
            while (iterator.hasNext()) {
                ConsumerRecord<byte[], byte[]> record = iterator.next();
                // Iterate through returned records, extract the value
                // of each message, and print the value to standard output.
                entityMap.put(new String(record.key()), record.value());
            }
        } else {
            stop = true;
        }
    }
    return entityMap;
}

虽然现在它正在获取所有记录,但我想知道是否有更好的方法。

共有1个答案

幸阳波
2023-03-14

使用< code>seekToBeginning()来消费所有消息没有任何问题。

然而,有一种稍微灵活一点的方法可以达到同样的效果。你可以通过配置做到这一点,这允许你从开始到结束都使用相同的代码。这也是< code > Kafka-console-consumer . sh 工具使用的方法:

> < li>

将< code>auto.offset.reset设置为< code >最早

group.id 设置为新/随机值。如果您对跟踪此消费者位置不感兴趣,但总是想从头开始,您还可以将 enable.auto.commit 设置为 false 以避免污染偏移量主题。

从您的逻辑中删除search kTo的开始()

关于你的逻辑,有几件事你应该考虑:

> < li>

有时< code>poll()会返回一个空集合,即使它尚未到达末尾。也题是一个流(无界),端可动。无论哪种方式,您都可以使用< code>endOffsets()来查找当前的结束偏移量,并将其与返回的消息的偏移量进行比较

你可能不想投票,直到你到达终点。一个主题的大小可以是几GB,包含数百万条记录。将所有内容存储在地图中很容易导致内存不足问题。

 类似资料:
  • 嗨,我是新来的斯托姆和Kafka。我使用的是storm 1.0.1和kafka 0.10.0,我们有一个kafkaspout可以接收来自kafka主题的java bean。我花了几个小时来寻找正确的方法。发现很少文章是有用的,但没有一个方法为我工作到目前为止。 KafKaProducer: } Kyro串行器:

  • .from( target:Object, duration:Number, vars:Object, position:* ) : * 添加一个TweenLite.from()动画到时间轴,相当于add(TweenLite.from(...)),以下两行产生相同的结果: myTimeline.add( TweenLite.from(element, 1, {left:100, opacity:

  • from 将其他类型或者数据结构转换为 Observable 当你在使用 Observable 时,如果能够直接将其他类型转换为 Observable,这将是非常省事的。from 操作符就提供了这种功能。 演示 将一个数组转换为 Observable: let numbers = Observable.from([0, 1, 2]) 它相当于: let numbers = Observable<

  • From请求头中包含的 Internet 电子邮件地址谁控制了请求的用户代理的人类用户。 如果您正在运行机器人用户代理(例如搜寻器),From则应发送标题,以便在服务器出现问题(例如机器人发送过多,不需要或无效的请求)时联系您。 您不应该使用From标题进行访问控制或身份验证。 Header type Request header Forbidden header name no 语法 From:

  • from 函数签名: from(ish: ObservableInput, mapFn: function, thisArg: any, scheduler: Scheduler): Observable 将数组、promise 或迭代器转换成 observable 。 对于数组和迭代器,所有包含的值都会被作为序列发出! 此操作符也可以用来将字符串作为字符的序列发出! 示例 示例 1: 数组转换而

  • 描述 (Description) 可以使用链接上的特殊类和数据属性打开和关闭所需的弹出框。 以下陈述简要描述了如何打开/关闭弹出窗口 - 要打开popover,请将open-popover类添加到任何HTML元素,并且可以使用add close-popover类来关闭popover。 当你在app中有很多popover时,你需要将data-popover=".mypopover"属性指定给适当的d