我有一个简单的Java生产者,如下所示
public class Producer
{
private final static String TOPIC = "my-example-topi8";
private final static String BOOTSTRAP_SERVERS = "localhost:8092";
public static void main( String[] args ) throws Exception {
Producer<String, byte[]> producer = createProducer();
for(int i=0;i<3000;i++) {
String msg = "Test Message-" + i;
final ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
producer.send(record).get();
System.out.println("Sent message " + msg);
}
producer.close();
}
private static Producer<String, byte[]> createProducer() {
Properties props = new Properties();
props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
props.put("client.id", "AppFromJava");
props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.codec", "snappy");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<String, byte[]>(props);
}
}
我正在尝试读取以下数据
public class Consumer
{
private final static String TOPIC = "my-example-topi8";
private final static String BOOTSTRAP_SERVERS = "localhost:8092";
public static void main( String[] args ) throws Exception {
Consumer<String, byte[]> consumer = createConsumer();
start(consumer);
}
static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
final int giveUp = 10;
int noRecordsCount = 0;
int stopCount = 1000;
while (true) {
final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
if (consumerRecords.count()==0) {
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}
consumerRecords.forEach(record -> {
// Process the record System.out.printf("\nConsumer Record:(%s, %s, %s)", record.key(), new String(record.value()), record.topic());
});
consumer.commitSync();
break;
}
consumer.close();
System.out.println("DONE");
}
private static Consumer<String, byte[]> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG,
"KafkaExampleConsumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
props.put("enable.auto.commit", "false");
// Create the consumer using props.
final Consumer<String, byte[]> consumer = new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
}
但是消费者没有阅读来自kafka的任何消息。如果我在下面添加以下内容start()
consumer.poll(0);
consumer.seekToBeginning(consumer.assignment());
然后,消费者开始阅读该主题。但是,每当使用者重新启动时,它都会从我不希望的主题开头读取消息。如果我在启动Consumer时添加以下配置
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
然后它从主题读取消息,但是如果使用者在处理所有消息之前重新启动,则它不会读取未处理的消息。
有人可以让我知道出了什么问题吗,我该如何解决?
Kafka代理和Zookeeper正在使用默认配置运行。
您对commitSync()的调用将确认最后一个poll()中批处理中的所有消息,而不是在处理消息时确认每个消息,这是我认为您要尝试的。
从文档中
“以上示例使用commitSync将所有收到的记录标记为已提交。在某些情况下,您可能希望通过显式指定偏移量来更好地控制已提交的记录。在下面的示例中,我们在处理完每个分区中的记录后提交偏移量。
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
注意:提交的偏移量应始终是应用程序将读取的下一条消息的偏移量。因此,在调用commitSync(offsets)时,应在最后处理的消息的偏移量上添加一个。”
我遵循这篇文档来实现上述场景。 那么,有没有人可以建议我如何一次使用多个订阅者从主题中读取消息。
我们正在使用Kafka流将数据写入接收器主题。我正在运行一个avro消费者命令行来检查接收器主题中是否有数据: bin/kafka-avro控制台-消费者-主题sink.output.topic-从开始-新消费者-引导-服务器 当我在kafka streams应用程序运行时同时运行消费者时,我会看到数据,但如果我停止消费者并在几分钟后再次运行,我不会看到任何数据。几乎没有可能: 1) 这是因为Ka
我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决
我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。
因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:
我的目标是使用Flink KafkaSource阅读来自Kafka主题的所有消息。我尝试用批处理和流模式执行。问题如下:当我将env.setParallelism设置为高于2时,我必须使用包含bug的接收器。于是,我设置了例如:< code > streamexecutionenvironment . setparallelism(1); 我想使用的Kafka主题包含3个分区。这是我的代码片段: