当前位置: 首页 > 知识库问答 >
问题:

连接到Kafka在Docker中跑步

范凡
2023-03-14

我在我的本地机器上设置了一个单节点Kafka Docker容器,就像在融合留档中描述的那样(步骤2-3)。

此外,我还公开了Zookeeper的端口2181和Kafka的端口9092,以便能够从本地机器上运行的客户端连接到它们:

$ docker run -d \
    -p 2181:2181 \
    --net=confluent \
    --name=zookeeper \
    -e ZOOKEEPER_CLIENT_PORT=2181 \
    confluentinc/cp-zookeeper:4.1.0

$ docker run -d \
    --net=confluent \
    --name=kafka \
    -p 9092:9092 \
    -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 \
    -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    confluentinc/cp-kafka:4.1.0

问题:当我试图从主机连接到Kafka时,连接失败,因为它无法解析地址:Kafka:9092。

这是我的Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "KafkaExampleProducer");
props.put("key.serializer", LongSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);
ProducerRecord<Long, String> record = new ProducerRecord<>("foo", 1L, "Test 1");
producer.send(record).get();
producer.flush();

例外情况:

java.io.IOException: Can't resolve address: kafka:9092
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.common.network.Selector.connect(Selector.java:214) ~[kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:265) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:266) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238) [kafka-clients-2.0.0.jar:na]
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:176) [kafka-clients-2.0.0.jar:na]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
Caused by: java.nio.channels.UnresolvedAddressException: null
    at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_144]
    at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233) ~[kafka-clients-2.0.0.jar:na]
    ... 7 common frames omitted

问:如何连接到在Docker中运行的Kafka?我的代码是从主机上运行的,而不是Docker。

注意:我知道理论上我可以使用DNS设置和/etc/host,但这是一种解决方法-不应该是这样的。

这里也有类似的问题,但它是基于图像的。我使用了不同的基于融合的图像。

共有3个答案

池永长
2023-03-14

在动物园管理员之前

  1. docker容器运行--名称zookeeper-p 2181:2181 zookeeper

Kafka之后

在Kafka消费者和生产者配置中

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.8.128:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

我按照这些规定运行我的项目。祝你好运,伙计。

柳联
2023-03-14
匿名用户

当您第一次连接到kafka节点时,它会返回所有kafka节点和连接位置的url。然后您的应用程序将尝试直接连接到每个kafka。

问题总是Kafka会给你什么作为url?这就是为什么会有Kafka的听众,Kafka将用它告诉世界如何访问它。

现在,对于您的用例,需要考虑多个小问题:

假设您设置了<代码>plaintext://kafka:9092

  • 如果您的docker compose中有一个使用kafka的应用程序,这是可以的。该应用程序将从Kafka获取带有Kafka的URL,该URL可通过docker网络解析
  • 如果您试图从主系统或不在同一docker网络中的另一个容器连接,这将失败,因为无法解析Kafka名称

==

如果你设置plaintext://localhost:9092

  • 如果您有端口映射(启动kafka时为-p 9092:9092),则这在您的系统上是可以的
  • 如果在容器(与docker网络是否相同)上的应用程序中进行测试,则会失败(localhost是容器本身,而不是kafka容器)

==

最后一个选项:在名称中设置IP:<代码>plaintext://x.y.z.a:9092(Kafka公布的url不能如文件中所述为0.0.0.0https://kafka.apache.org/documentation/#brokerconfigs_advertised.listeners )

这对每个人都没问题...但是你怎么能得到x. y. z. a的名字呢?

唯一的方法是在启动容器时硬编码此ip:docker run......--net conflent--ip 10. x. y. z...。请注意,您需要将ip适配为conflent子网中的一个有效ip。

查淮晨
2023-03-14

tl; dr-从容器转发到主机的简单端口不起作用,不应该修改主机文件。您要连接到哪个确切的IP/主机名端口?确保该值在代理上设置为advertised.listeners。确保地址和作为bootstrap.servers一部分列出的服务器实际上是可解析的(ping一个IP/主机名,使用netcat检查端口...)

要验证端口是否在主机上正确映射,请确保docker ps显示kafka容器是从0.0.0.0映射的:

下面的答案使用docker图像来解决所提出的问题,而不是wurstmeister/kafka。更具体地说,尽管后者是Kafka最受欢迎的形象之一,但并没有得到很好的维护。

以下部分尝试聚合使用另一个映像所需的所有细节。对于其他常用的Kafka映像,它都是在容器中运行的相同的Apache Kafka。
你只是取决于它是如何配置的。哪些变量使它如此。

请参阅他们关于侦听器配置的自述部分,也可以阅读他们的连接wiki。

如果你想要一个小容器,试试这些。图像比融合图像小得多,并且比wurstmeister保持得更好。请参阅其自述文件以了解侦听器配置。

这里提到了它的文档。

注意:已弃用播发的主机和端口设置。广告中的听众涵盖了这两个方面。与融合容器类似,Debezium可以使用带有前缀的代理设置来更新其属性。

spotify/kafka已弃用且过时。
快速数据开发lensesio/box非常适合一体化解决方案,但如果您只想要Kafka
您自己的Dockerfile-为什么?这些其他的东西不完整吗?从拉取请求开始,而不是从头开始。

有关补充阅读、功能齐全的docker compose和网络图,请参阅@rmoff的本博客

融合快速启动(Docker)文档假设所有生产和消费请求都将在Docker网络内。

您可以通过在使用Docker网桥的自己的容器中运行kafka客户端代码来修复连接到kafka:9092的问题,但否则,您需要添加更多的环境变量,以便在外部公开容器,同时使其在Docker网络中工作。

首先添加PLAINTEXT_HOST的协议映射:PLAINTEXT,将侦听器协议映射到Kafka协议

键:KAFKA\u LISTENER\u SECURITY\u PROTOCOL\u映射值:明文:明文,明文主机:明文

然后在不同的端口上设置两个通告的侦听器。(这里的kafka指的是docker容器名称;它也可能被命名为代理,因此请仔细检查您的服务主机名)。注意协议与上面映射的右侧值匹配

键:KAFKA\u adverted\u侦听器值:PLAINTEXT://KAFKA:9092,PLAINTEXT\u HOST://localhost:29092

运行容器时,为主机端口映射添加-p 29092:29092。

(具有上述设置)

如果仍然不工作,KAFKA_LISTENERS可以设置为包含

广告localhost和相关端口将允许您在容器外部连接,正如您所期望的那样。

换句话说,在Docker网络之外运行任何Kafka客户端(包括您可能在本地安装的CLI工具)时,使用本地主机:29092作为引导服务器,使用本地主机:2181作为Zookeeper(需要Docker端口转发)

如果试图从外部服务器连接,则需要公布主机的外部主机名/ip以及/代替本地主机<因为Kafka协议仍将继续通告您配置的侦听器,所以仅通过端口转发通告localhost将不起作用。

如果不在同一个本地网络中,此设置需要Docker端口转发和路由器端口转发(以及防火墙/安全组更改),例如,您的容器正在云中运行,您希望从本地计算机与它进行交互。

这是最不容易出错的配置;您可以直接使用DNS服务名称。

在Docker网络中运行应用程序时,与任何其他Docker服务通信一样(不需要任何端口转发),对引导服务器使用kafka:9092(请参阅上文公布的明文侦听器配置),对zookeeper使用zookeeper:2181

如果使用单独的docker run命令或撰写文件,则需要手动定义共享网络

请参阅完整ConFluent堆栈或单个代理的多个最小堆栈的示例Complace文件。

从Docker(ksqlDB)连接到主机上的Kafka

对于任何对库伯内特斯部署感兴趣的人:https://operatorhub.io/?keyword=Kafka

 类似资料:
  • 问题内容: 我在本地计算机上设置了一个单节点Kafka Docker容器,如Confluent文档中所述(步骤2-3)。 另外,我还公开了Zookeeper的端口2181和Kafka的端口9092,以便能够从在本地计算机上运行的客户端连接到它们: 问题: 当我尝试从主机连接到Kafka时,连接失败,因为它。 这是我的Java代码: 例外: 问题: 如何连接到在Docker中运行的Kafka?我的代

  • 我在docker compose yml文件中为广告侦听器配置了以下配置 警告O.apache.kafka.clients.networkclient-获取相关id为1的元数据时出错:{foo=leader_not_available} 主机可以从我的机器ping,因为它都在同一个网络中,没有防火墙问题。在docker容器中使用命令时,可以看到相同的主机名。我是不是漏掉了什么?

  • 问题内容: 我在本地计算机上使用docker设置了Single Node Basic Kafka Deployment,如Confluent Kafka文档中所述(步骤2-3)。 另外,我还公开了zookeeper的端口2181和kafka的端口9092,以便能够从在本地计算机上运行的Java客户端连接到它们: 问题:当我尝试从主机连接到kafka时,连接失败,因为它无法解析地址:kafka:90

  • 我为我的基于spring boot的库配置了以下Kafka属性,该库绑定在部署到的的目录中。通过从类路径()加载porperty文件,我能够成功启动spring组件 我通过docker compose运行Kafka和zookeeper,容器分别映射到主机端口和。发布失败,错误为 这是在提供了属性之后。有趣的是 Wildfly服务器错误日志显示应用程序实际上是通过它的容器ID连接到docker容器的

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我

  • 我有 3 个码头工人撰写文件。一个启动Kafka,另外两个是消费者和生产者。在 kafka 中添加了其他 docker 撰写文件中的external_links,但仍然无法从容器内部访问 kafka。从容器外部,我可以通过 localhost:9092 访问,但是在 docker 容器内部呢?