我编码Kafka经纪人和消费者从应用程序捕捉消息。当试图从消费者获取消息时,会发生错误
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:303)
at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:280)
在应用程序端(Producer),还有一个连接错误
2020-03-25 12:29:33.689 WARN 25786 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-1, transactionalId=tx0] Connection to node -1 (<here broker hostname>:9092) could not be established. Broker may not be available.
我的项目有以下依赖关系:
compile "org.springframework.kafka:spring-kafka-test:2.4.4.RELEASE"
compile "org.springframework.kafka:spring-kafka:2.4.4.RELEASE"
代码我的Kafka经纪人
public class KafkaServer {
private static final String BROKERPORT = "9092";
private static final String BROKERHOST = "localhost";
public static final String TOPIC1 = "fss-fsstransdata";
public static final String TOPIC2 = "fss-fsstransscores";
public static final String TOPIC3 = "fss-fsstranstimings";
public static final String TOPIC4 = "fss-fssdevicedata";
@Getter
private Consumer<String, String> consumer;
private EmbeddedKafkaBroker embeddedKafkaBroker;
public void run() {
String[] topics = {TOPIC1, TOPIC2, TOPIC3, TOPIC4};
this.embeddedKafkaBroker = new EmbeddedKafkaBroker(
1,
false,
1,
topics
).kafkaPorts(BROKERPORT);
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", this.embeddedKafkaBroker));
this.consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();
this.consumer.subscribe(Arrays.asList(topics));
}
}
请帮助处理这种情况。我不擅长Kafka架构以及如何在Spring上实现它。
Embedded Kafkabroker
设计用于Spring应用程序上下文,或JUnit4@Rule
或@ClassRule
或JUnit5条件。
要在这些环境之外使用它,必须调用
afterPropertieSet()
对其进行初始化,并调用destroy()
将其关闭。
如果您使用Spring,那么您需要用@EmbeddedKafka注释您的bean,然后在EmbeddedKafkaBroker上使用@Autowire
嵌入Kafka注释配置示例:
@EmbeddedKafka(
partitions = 1,
controlledShutdown = false,
brokerProperties = {// place your proerties here
})
我要做的是创建一个Springbean KafkaServerConfig,并将我所有的配置和bean构造逻辑放在里面。
PS:需要注意的是,嵌入式Kafkabroker用于单元测试。
如何确保我总是从Kafka主题的一开始就与Flink一起消费? Kafka0.9。x consumer是Flink 1.0.2的一部分,它似乎不再是Kafka,而是Flink来控制偏移量: Flink在内部快照偏移量,作为其分布式检查点的一部分。Kafka/动物园管理员promise的补偿只是为了让外界对进展的看法与Flink对进展的看法保持同步。通过这种方式,监控和其他工作可以了解Flink K
我的服务器机器上运行单节点kafka。我使用以下命令创建主题“bin/kafka-topics.sh--创建--zookeeper本地主机:2181--复制因子1--分区1--主题测试”。我有两个logstash实例正在运行。第一个从一些java应用程序日志文件中读取数据,并将其注入kafka。它工作得很好,我可以使用“bin/kafka-console-consumer.sh——zookeepe
我的spring boot项目有一个演示Kafka Streams API的应用程序。我可以使用以下命令使用主题中的所有消息 Kafka Streams API中使用KStream或ktable使用消息的类似命令是什么?我试过了 两者都不起作用。我确实创建了一个测试用例,用而不是流来使用,但它不起作用。代码上传到Github以供参考。任何帮助都会很好。
我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?
虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?
有没有其他方法可以做到这一点?