当前位置: 首页 > 知识库问答 >
问题:

Spring的Kafka。不是从Kafka布洛克开始的

尉迟龙光
2023-03-14

我编码Kafka经纪人和消费者从应用程序捕捉消息。当试图从消费者获取消息时,会发生错误

java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:216)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:531)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:540)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:230)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:303)
    at org.springframework.kafka.test.utils.KafkaTestUtils.getRecords(KafkaTestUtils.java:280)

在应用程序端(Producer),还有一个连接错误

2020-03-25 12:29:33.689  WARN 25786 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1, transactionalId=tx0] Connection to node -1 (<here broker hostname>:9092) could not be established. Broker may not be available.

我的项目有以下依赖关系:

compile "org.springframework.kafka:spring-kafka-test:2.4.4.RELEASE"
compile "org.springframework.kafka:spring-kafka:2.4.4.RELEASE"

代码我的Kafka经纪人

public class KafkaServer {

    private static final String BROKERPORT = "9092";
    private static final String BROKERHOST = "localhost";
    public static final String TOPIC1 = "fss-fsstransdata";
    public static final String TOPIC2 = "fss-fsstransscores";
    public static final String TOPIC3 = "fss-fsstranstimings";
    public static final String TOPIC4 = "fss-fssdevicedata";
    @Getter
    private Consumer<String, String> consumer;

    private EmbeddedKafkaBroker embeddedKafkaBroker;

    public void run() {

        String[] topics = {TOPIC1, TOPIC2, TOPIC3, TOPIC4};

        this.embeddedKafkaBroker = new EmbeddedKafkaBroker(
                1,
                false,
                1,
                topics
        ).kafkaPorts(BROKERPORT);

        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", this.embeddedKafkaBroker));
        this.consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer()).createConsumer();

        this.consumer.subscribe(Arrays.asList(topics));
    } 
}

请帮助处理这种情况。我不擅长Kafka架构以及如何在Spring上实现它。

共有2个答案

庄阿苏
2023-03-14

Embedded Kafkabroker设计用于Spring应用程序上下文,或JUnit4@Rule@ClassRule或JUnit5条件。

要在这些环境之外使用它,必须调用afterPropertieSet()对其进行初始化,并调用destroy()将其关闭。

焦同
2023-03-14

如果您使用Spring,那么您需要用@EmbeddedKafka注释您的bean,然后在EmbeddedKafkaBroker上使用@Autowire

嵌入Kafka注释配置示例:

@EmbeddedKafka(
    partitions = 1, 
    controlledShutdown = false,
    brokerProperties = {// place your proerties here
})

我要做的是创建一个Springbean KafkaServerConfig,并将我所有的配置和bean构造逻辑放在里面。

PS:需要注意的是,嵌入式Kafkabroker用于单元测试。

 类似资料:
  • 如何确保我总是从Kafka主题的一开始就与Flink一起消费? Kafka0.9。x consumer是Flink 1.0.2的一部分,它似乎不再是Kafka,而是Flink来控制偏移量: Flink在内部快照偏移量,作为其分布式检查点的一部分。Kafka/动物园管理员promise的补偿只是为了让外界对进展的看法与Flink对进展的看法保持同步。通过这种方式,监控和其他工作可以了解Flink K

  • 我的服务器机器上运行单节点kafka。我使用以下命令创建主题“bin/kafka-topics.sh--创建--zookeeper本地主机:2181--复制因子1--分区1--主题测试”。我有两个logstash实例正在运行。第一个从一些java应用程序日志文件中读取数据,并将其注入kafka。它工作得很好,我可以使用“bin/kafka-console-consumer.sh——zookeepe

  • 我的spring boot项目有一个演示Kafka Streams API的应用程序。我可以使用以下命令使用主题中的所有消息 Kafka Streams API中使用KStream或ktable使用消息的类似命令是什么?我试过了 两者都不起作用。我确实创建了一个测试用例,用而不是流来使用,但它不起作用。代码上传到Github以供参考。任何帮助都会很好。

  • 我在本地机器上安装了Kafka,并启动了zookeeper和一个代理服务器。 现在我有一个单独的主题,描述如下: 我有一个生产者在消费者启动之前产生了一些消息,如下所示: 当我使用--从头开始选项启动消费者时,它不会显示生产者生成的所有消息: 但是,它显示的是新添加的消息。 我在这里怎么了?有什么帮助吗?

  • 虽然auto.offset.reset的值是最新的,但使用者从属于2天前的消息开始,然后就会赶上最新的消息。 我错过了什么?

  • 有没有其他方法可以做到这一点?