最近在使用 Kafka 时,我的应用程序需要从头开始访问主题中的所有消息。因此,在编写Kafka Consumer(使用Java API)时,我可以从头开始读取消息,但它只返回主题中的前500条消息。试图增加
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,整数.MAX_VALUE);props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,Long.MAX_VALUE);
但它仍然不会返回所有消息,
kafka-console-consumer --bootstrap-server localhost:9092 --topic --from from
它会返回我所有的5000条记录。
伙计们缺少任何配置吗?任何帮助都将是可观的。
消费者代码。
public ConsumerRecords<byte[], byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
consumer.subscribe(new ArrayList<String>(Collections.singletonList(topicname)));
consumer.poll(0);
// Reading topic offset from beginning
consumer.seekToBeginning(consumer.assignment());
// poll and time-out if no replies
ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
consumer.close();
return records;
}
然而,我改变了消费者:
public Map<String, byte[]> pullFromKafka(String topicname, Map<String, Object> props) {
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
Map<String, byte[]> entityMap = new HashMap<String, byte[]>();
boolean stop = false;
consumer.subscribe(new ArrayList<String>(Collections.singletonList(topicname)));
consumer.poll(0);
// Reading topic offset from beginning
consumer.seekToBeginning(consumer.assignment());
while (!stop) {
// Request unread messages from the topic.
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(1000);
Iterator<ConsumerRecord<byte[], byte[]>> iterator = consumerRecords.iterator();
if (iterator.hasNext()) {
while (iterator.hasNext()) {
ConsumerRecord<byte[], byte[]> record = iterator.next();
// Iterate through returned records, extract the value
// of each message, and print the value to standard output.
entityMap.put(new String(record.key()), record.value());
}
} else {
stop = true;
}
}
return entityMap;
}
虽然现在它正在获取所有记录,但我想知道是否有更好的方法。
使用< code>seekToBeginning()来消费所有消息没有任何问题。
然而,有一种稍微灵活一点的方法可以达到同样的效果。你可以通过配置做到这一点,这允许你从开始到结束都使用相同的代码。这也是< code > Kafka-console-consumer . sh 工具使用的方法:
> < li>
将< code>auto.offset.reset设置为< code >最早
将 group.id
设置为新/随机值。如果您对跟踪此消费者位置不感兴趣,但总是想从头开始,您还可以将 enable.auto.commit
设置为 false 以避免污染偏移量主题。
从您的逻辑中删除search kTo的开始()
关于你的逻辑,有几件事你应该考虑:
> < li>
有时< code>poll()会返回一个空集合,即使它尚未到达末尾。也题是一个流(无界),端可动。无论哪种方式,您都可以使用< code>endOffsets()来查找当前的结束偏移量,并将其与返回的消息的偏移量进行比较
你可能不想投票,直到你到达终点。一个主题的大小可以是几GB,包含数百万条记录。将所有内容存储在地图中很容易导致内存不足问题。
嗨,我是新来的斯托姆和Kafka。我使用的是storm 1.0.1和kafka 0.10.0,我们有一个kafkaspout可以接收来自kafka主题的java bean。我花了几个小时来寻找正确的方法。发现很少文章是有用的,但没有一个方法为我工作到目前为止。 KafKaProducer: } Kyro串行器:
.from( target:Object, duration:Number, vars:Object, position:* ) : * 添加一个TweenLite.from()动画到时间轴,相当于add(TweenLite.from(...)),以下两行产生相同的结果: myTimeline.add( TweenLite.from(element, 1, {left:100, opacity:
from 将其他类型或者数据结构转换为 Observable 当你在使用 Observable 时,如果能够直接将其他类型转换为 Observable,这将是非常省事的。from 操作符就提供了这种功能。 演示 将一个数组转换为 Observable: let numbers = Observable.from([0, 1, 2]) 它相当于: let numbers = Observable<
From请求头中包含的 Internet 电子邮件地址谁控制了请求的用户代理的人类用户。 如果您正在运行机器人用户代理(例如搜寻器),From则应发送标题,以便在服务器出现问题(例如机器人发送过多,不需要或无效的请求)时联系您。 您不应该使用From标题进行访问控制或身份验证。 Header type Request header Forbidden header name no 语法 From:
from 函数签名: from(ish: ObservableInput, mapFn: function, thisArg: any, scheduler: Scheduler): Observable 将数组、promise 或迭代器转换成 observable 。 对于数组和迭代器,所有包含的值都会被作为序列发出! 此操作符也可以用来将字符串作为字符的序列发出! 示例 示例 1: 数组转换而
描述 (Description) 可以使用链接上的特殊类和数据属性打开和关闭所需的弹出框。 以下陈述简要描述了如何打开/关闭弹出窗口 - 要打开popover,请将open-popover类添加到任何HTML元素,并且可以使用add close-popover类来关闭popover。 当你在app中有很多popover时,你需要将data-popover=".mypopover"属性指定给适当的d