当前位置: 首页 > 知识库问答 >
问题:

Kafka使用者无法使用Spring Boot 2.2.0.m4连接到除LocalHost:9092以外的代理

丁振海
2023-03-14

我使用Spring Boot 2.2.0.m4和Kafka 2.2.0试图基于https://www.baeldung.com/Spring-Kafka的示例构建一个应用程序。当我为我的主题启用侦听器时,我在使用者上得到以下错误。

[AdminClient Clientid=AdminClient-2]无法建立到节点-1(本地主机/127.0.0.1:9092)的连接。代理可能不可用。

@KafkaListener(topics = "add_app", groupId = "foo")
public void listen(String message) {
    System.out.println("Received Message in group foo: " + message);
}

下面是引用Kafka.BootStrapAddress值的使用者配置类。它被正确地记录。

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    public ConsumerFactory<String, String> consumerFactory(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        log.info("Created {} using address {}.", this.getClass(), bootstrapAddress);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> fooKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory("foo"));
        return factory;
    }

共有1个答案

爱茂勋
2023-03-14

解决这个问题的方法相当简单。我只需要将以下内容添加到application.properties文件中。

bootstrap-servers=174.22.22.55:9092

在查看了kafkaproperties.java之后,我发现了这一行:

private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
private Map<String, Object> buildCommonProperties() {
        Map<String, Object> properties = new HashMap();
        if (this.bootstrapServers != null) {
            properties.put("bootstrap.servers", this.bootstrapServers);
        }

        if (this.clientId != null) {
            properties.put("client.id", this.clientId);
        }

        properties.putAll(this.ssl.buildProperties());
        if (!CollectionUtils.isEmpty(this.properties)) {
            properties.putAll(this.properties);
        }

        return properties;
    }

将containerFactory属性添加到侦听器注释中也可以修复它,并消除对Application.Properties的更改。

@KafkaListener(topics = "add_app", groupId = "foo", containerFactory = "fooKafkaListenerContainerFactory")
public void listen(String message) {
    System.out.println("Received Message in group foo: " + message);
}
 类似资料:
  • 我有一个Zookeeper在端口2181(默认)上运行,一个Kafka服务器在我的本地机器上监听端口9090。 当我在本地运行kafka CLI或在本地运行消费者/生产者应用程序时,我在连接上没有问题。 然后尝试将Kafka使用者绑定到Docker容器中,并在本地运行该Docker容器,例如: 这将给出错误: 这是有意义的,因为Kafka服务器绑定到9092,如:所示 通过将Docker容器映射到

  • 问题内容: 有某些功能,例如不带https的JavaScript服务工作者,仅可在localhost上运行,但是当我使用在docker- machine上运行的docker-compose在docker容器内运行我的应用程序时,我需要使用我从那里得到的地址 有没有办法映射到该IP? 问题答案: 您可以向前添加VirtualBox端口,以将Docker主机上的端口映射到本地计算机。 假设您的dock

  • 下面是Qpid客户端的配置,使用Camel上下文连接Artemis Broker。 上下文文件正像预期的那样开始,但是在本例中,qpid客户机无法连接到Apache Artemis 2.14.0代理。 我所看到的只是消费者端的一些线程块。 common-context.xml 注意:在使用CAMEL2.20.0版本时,即使在所有上下文都得到的情况下,这个单个xml文件中也有两个camel上下文

  • 嗨,我最近下载了wamp服务器来帮我做网站。我的问题是,当我点击phpmyadmin时,我收到一条错误消息“HTTP错误404.0-未找到您要查找的资源已被删除、名称已更改或暂时不可用。” 最后,要看到我的网页,我必须写http://localhost:8888/。如果我将8888放在phpmyadmin的前面,那么我可以看到phpmyadmin,但它不会与我创建的mysql数据库和表同步,它只是

  • 我有这个代码可以通过IMAP连接到Gmail 我总是会遇到这个例外,指向商店。connect() 我已经检查了谷歌的所有信息,如果我是对的,IMAP的端口是993。用户名和密码绝对正确。我想我错过了一些东西,但我无法弄清楚。 任何帮助都将不胜感激! 编辑 添加邮件后。debug,我得到了这些日志,但我仍然停留在商店里。connect() 它已登录,但仍卡在connect上。。

  • 我使用的是Oracle 11g R2数据库。我使用Oracle SQL Developer。如果我创建了一个新连接并检查了基本连接类型并填写了我可以连接的字段。如果我选择TNS并在下拉菜单中选择我想要的,它会显示失败E/S异常:网络适配器无法建立连接。 这是tnsnames.ora。我用它连接到2个数据库,dblilly和astrea。我可以正确连接到astrea。侦听器已打开,实例已准备就绪。你