我在一台Windows主机上安装了两个Kafka 2.1.0代理。默认复制因子设置为2。所有其他设置均为默认设置。
networkClient:[Consumer ClientID=Consumer-1,GroupID=SOUT]无法建立到节点-2(/192.168.0.1:19092)的连接。代理可能不可用。
消费者:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
public final class PumpToConsoleSimple {
private static final Duration pollTimeout = Duration.ofSeconds(10);
public static void main(String[] args) {
final Properties consumerProperties = new Properties();
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "sout");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.1:19092");
consumerProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
try (final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerProperties)) {
kafkaConsumer.subscribe(Collections.singleton("test"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//do nothing
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Partitions were assigned");
kafkaConsumer.seekToBeginning(partitions);
}
});
while (true) {
final ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(pollTimeout);
consumerRecords.forEach(r -> System.out.println(r.value()));
kafkaConsumer.commitSync();
}
}
}
}
一个制作人:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.locks.LockSupport;
public final class OnceInASecondProducerSimple {
public static void main(String[] args) {
final Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.1:9092,192.168.0.1:19092");
long counter = 0;
while (true) {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProperties)) {
producer.send(new ProducerRecord<>("test", "msg" + counter++));
}
LockSupport.parkNanos(Duration.ofSeconds(1).getNano());
}
}
}
通过kafka-topics
脚本检查偏移量主题__consumer_offsets
状态。失败的代理必须是组的协调器,并且__consumer_offsets的复制因子可能为1,因此使用者找不到协调器。即使您重新启动消费者,它仍然无法查找协调器。
在您的情况下,可以增加__consumer_offsets的复制因子,然后重试看看它是否按预期工作。
我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认
知道为什么当server0关闭时,即使主题test1的复制因子设置为2,使用者仍然停止获取消息吗? 已经有一个类似的问题,但它没有完全回答Kafka0.10快速入门:当“主要”经纪人被扳倒时,消费者失败
有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。
我有两台机器localhost和192.168.1.110来运行两台独立的单机kafka。 kafka2.11-0.10.0.0 bin/kafka-console-producer.sh--broker-list 192.168.1.110:9092--topic test这是一条消息[2016-08-24 18:15:27,441]错误将消息发送到topic test时出错,关键字:null,
我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用
我正在使用带有KafkaListener注释的spring kafka v2.5.2。 在运行时,我希望能够向消费者发送停止消费的信号。 我看到了autoStartup参数,但它似乎只对初始化有效,之后无法更改。 我看到了KafkaListenerEndpointRegistry的methode close()。。。 你有什么建议吗? 提前谢谢。