Question:

Kafka: local producer unable to send or receive messages

邢烨烨
2023-03-14

I am trying to use a Google Cloud Compute Engine VM instance as a Kafka consumer. I found that the VM was blocking communication from any external machine, and I successfully set up firewall rules so that the VM can be reached from my local machine.

I am able to create and list topics on the cloud VM instance, but I cannot send or receive messages on a Kafka topic; it throws a timeout exception.

I used telnet to check whether the port (9092) is open, and I got the telnet escape-character prompt, so the port appears to be reachable.
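For reference, the reachability check from my local machine looks roughly like this (the masked external IP is a placeholder for my broker's address):

# TCP-level check of the broker port from the local machine
telnet 35.196.XXX.XXX 9092

# equivalent check with netcat
nc -vz 35.196.XXX.XXX 9092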

When I tried the same thing from another cloud VM instance, I was able to perform all Kafka operations (send/receive messages, create/list topics).

So far I have only tried the Kafka console producer and console consumer.
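The commands I am using are of this form (a sketch; the topic name test and the masked IP stand in for my actual values, using the Kafka 1.0 console-tool syntax):

# console producer on the external machine
bin/kafka-console-producer.sh --broker-list 35.196.XXX.XXX:9092 --topic test

# console consumer
bin/kafka-console-consumer.sh --bootstrap-server 35.196.XXX.XXX:9092 --topic test --from-beginning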

I have been struggling with this problem for the past week. If anyone could help me out, it would be a great help.

I am running the Kafka server and a consumer on one cloud VM. I want to use a Raspberry Pi as the producer.

Thanks in advance.

Update

My config/server.properties file looks like this:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092

listeners=PLAINTEXT://localhost:9092
# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().

advertised.listeners = PLAINTEXT://35.196.XXX.XXX:9092 #Google VM External IP

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

When I run the following command on my local machine:

bin/kafka-console-consumer.sh --bootstrap-server 35.196.227.191:9092 --topic test --from-beginning

I get the following error on my Google VM instance:

[2018-02-04 12:42:19,839] ERROR [KafkaApi-0] Number of alive brokers '0' does not meet the required replication factor '1' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

But when I look at the logs from server startup:

[2018-02-04 12:33:39,995] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-02-04 12:33:40,012] INFO [ProducerId Manager 0]: Acquired new producerId block (brokerId:0,blockStartProducerId:1000,blockEndProducerId:1999) by writing to Zk with path version 2 (kafka.coordinator.transaction.ProducerIdManager)
[2018-02-04 12:33:40,081] INFO [TransactionCoordinator id=0] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-02-04 12:33:40,095] INFO [TransactionCoordinator id=0] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2018-02-04 12:33:40,099] INFO [Transaction Marker Channel Manager 0]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2018-02-04 12:33:40,249] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2018-02-04 12:33:40,268] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
---> [2018-02-04 12:33:40,270] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(35.196.XXX.XXX,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2018-02-04 12:33:40,282] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser)
[2018-02-04 12:33:40,282] INFO Kafka commitId : aaa7af6d4a11b29d (org.apache.kafka.common.utils.AppInfoParser)
[2018-02-04 12:33:40,286] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Looking at the line marked ---> above, the broker does seem to have registered itself with the external IP and port 9092.
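(A client-side way to cross-check what address the broker actually advertises is to dump the cluster metadata, for example with kafkacat, assuming it is installed on the local machine; the broker host:port printed in the metadata is what producers and consumers will subsequently connect to:)

# list cluster metadata as seen by an external client
kafkacat -b 35.196.XXX.XXX:9092 -L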

1 Answer

东方俊材
2023-03-14

I think there are a couple of potential problems.

  1. The bootstrap server IP you provide is only used for the initial connection; after that, the advertised host name (advertised.host.name / advertised.listeners) is what clients use to connect to the broker. If your consumer cannot resolve the advertised host name, it will not be able to connect to the broker, even if the bootstrap server IP and port you supplied are fully reachable. So update the advertised host name configuration to your broker's IP, as sketched below.
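For illustration, the socket-server section of config/server.properties could then look like the sketch below (not a verified fix for this exact setup; the IP is a placeholder). Binding the listener to 0.0.0.0 makes the broker accept connections on all network interfaces, while advertised.listeners is the address handed to clients in metadata responses. The inline comment is also moved to its own line, since Java properties files do not treat a trailing # on a value line as a comment:

# Accept connections on all interfaces
listeners=PLAINTEXT://0.0.0.0:9092

# Google VM external IP: the address returned to producers and consumers in metadata
advertised.listeners=PLAINTEXT://35.196.XXX.XXX:9092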