当前位置: 首页 > 知识库问答 >
问题:

Kafka生产者网络异常和超时异常

郑宇
2023-03-14

我们的生产环境中出现了随机的NetworkExceptionsTimeoutExceptions

Brokers: 3
Zookeepers: 3
Servers: 3
Kafka: 0.10.0.1
Zookeeeper: 3.4.3

我们偶尔会在我的生产者日志中得到这个异常:

主题:XXXXXX:5608 ms的过期记录自批量创建加上逗留时间以来已经过去。

此类错误消息中的毫秒数不断变化。有时是5秒,有时是13秒!

我们很少能得到:

NetworkException: Server disconnected before response received. 

集群由3个经纪人和3个动物园管理员组成。生产者服务器和Kafka集群在同一个网络中。

我在打同步电话。有一个web服务,多个用户请求调用它来发送数据。Kafka web服务有一个Producer对象,负责所有发送操作。制作人的请求超时最初为1000毫秒,后来改为15000毫秒(15秒)。即使在增加超时时间后,TimeoutExceptions仍会显示在错误日志中。

原因是什么?

共有3个答案

司徒俊良
2023-03-14

解决方案1

修改

listeners=PLAINTEXT://hostname:9092

属性。属性文件到

listeners=PLAINTEXT://0.0.0.0:9092

解决方案2

换经纪人。将id设置为1001这样的值,通过设置环境变量KAFKA_BROKER_id来更改brocker id。

您必须将环境变量KAFKA_RESERVED_BROKER_MAX_ID设置为像1001这样的值,才能将BROKER ID设置为1001。

我希望能有所帮助

唐元青
2023-03-14

我们也面临过类似的问题。日志中的许多NetworkExceptions不时地TimeoutExc0019

原因

一旦我们从生产中收集了TCP日志,结果发现一些到Kafka代理(我们有3个代理节点)的TCP连接在空闲5分钟后没有通知客户端就被删除了(TCP层上没有FIN标志)。当客户端在此之后尝试重新使用此连接时,将返回RST标志。我们可以很容易地将TCP日志中的这些连接重置与应用程序日志中的NetworkExceptions相匹配。

至于TimeoutException,我们无法进行与找到原因时相同的匹配,这种类型的错误不再发生。然而,我们在另一项测试中确认,断开TCP连接也可能导致TimeoutException。我猜这是因为Java Kafka客户端在幕后使用Java NIO套接字通道。所有消息都将被缓冲,然后在连接就绪后被发送。如果连接在超时(30秒)内未就绪,则消息将过期,导致TimeoutException

解决方案

对我们来说,解决办法是减少连接。麦克斯,空闲。ms在我们的客户身上停留了4分钟。一旦我们应用了它,NetworkExceptions就从我们的日志中消失了。

我们仍在调查是什么导致连接中断。

编辑

问题的原因是AWS NAT网关在350秒后中断了传出连接。

https://docs.aws.amazon.com/vpc/latest/userguide/nat-gateway-troubleshooting.html#nat-gateway-troubleshooting-timeout

微生学
2023-03-14

要找到根本原因有点棘手,我将放弃我的经验,希望有人会发现它有用。一般来说,这可能是网络问题,或者与ack=ALL相结合的网络洪水过多。下面是一个图表,解释了在撰写本文时KafkaKIP-91中的TimeoutException(在1.1.0之前仍然适用):

除了网络配置问题或错误之外,您可以根据场景调整以下属性,以缓解或解决问题:

>

  • buffer.memory控制生产者用于缓冲的总可用存储器。如果记录被发送的速度快于它们可以被传输到Kafka然后,这个缓冲区将得到超过然后额外的发送调用阻止max.block.ms之后,生产者抛出一个TimeoutExc0019

    麦克斯街区。ms的值已经很高,我不建议进一步增加它。缓冲器内存的默认值为32MB,根据您的邮件大小,您可能希望增加它;如有必要,增加jvm堆空间。

    重试定义在放弃之前,在出现错误的情况下重新发送记录的尝试次数。如果您使用零重试,您可以尝试通过增加此值来缓解问题,除非您将max.in.flight.requests.per.connection设置为1,否则请注意记录顺序不再是保证。

    一旦达到批量大小或经过延迟时间(以先到者为准),就会发送记录。如果批量生产。大小(默认为16kb)小于最大请求大小,可能您应该使用更高的值。此外,变化依然存在。ms设置为更高的值,例如10、50或100,以优化批次和压缩的使用。这将减少网络中的洪水,并在使用时优化压缩。

    对于这类问题没有一个确切的答案,因为它们也取决于实现,在我的例子中,对上述价值观进行了实验。

  •  类似资料:
    • 我正在尝试提出一种配置,该配置将根据生产者的平均字节率强制实施生产者配额设置。我用一个3节点集群做了一个测试。但是,该主题是使用1个分区和1个复制因子创建的,因此只能为1个代理(leader代理)测量生产者字节率。 我在客户端IDtest_producer_quota将producer_byte_rate设置为20480。 我使用Kafka-生产者-性能-测试来测试吞吐量和油门。 我希望produ

    • 我尝试使用 kafka 实现一个简单的生产者消费者示例,并使用以下属性实现了: 然而,当我在另一个项目(数据可视化软件的插件)中尝试完全相同的配置时,我得到了以下错误: 在我说它工作的第一个版本中,我使用了“mvn clean compile assembly:single”,但是在第二个版本中,我为整个项目创建了一个jar文件。因为可视化软件需要一个jar文件来安装插件。因为每件事都是一样的(至

    • 我正在使用Spring Kafka 2.3.9编写一个Kafka制作人,该制作人假设向一个主题发布大约200000条消息。例如,我有一个从数据库中提取的200000个对象的列表,我想将这些对象的json消息发布到一个主题。 我写的制作人在发布1000条消息方面做得很好。然后它创建了一些空指针错误(我已经包括了下面的屏幕截图)。 在调试过程中,我发现Kafka Producer网络线程的数量非常高。

    • 我有一个循环缓冲区(数组/先进先出),一个消费者和一个生产者。生产者将随机数放入数组中,消费者获取第一个数字并检查它是否是相对素数。 我的代码工作正常,我认为它工作正常,但我想改进它。我不太确定我的“空运行”方法。我应该在其他地方做异常处理吗?改变“无限循环”?不应更改方法签名(它们是预定义的)。 我会很高兴每一个改进代码的建议。(不在乎知名度(公开,...),还有静态的东西,我刚刚把它们放在一个

    • 我们使用sping-cloud-stream-binder-kafka(3.0.3.RELEASE)向我们的Kafka集群(2.4.1)发送消息。时不时地,其中一个生产者线程会收到NOT_LEADER_FOR_PARTITION异常,甚至超过重试(当前设置为12,由依赖sping-retry激活)。我们限制了重试,因为我们发送了大约1kmsg/s(每个生产者实例),并且担心缓冲区的大小。这样我们会

    • 我使用带有幂等生产者配置的spring kafka: 这是我的配置道具: 我的Kafka制作人抛出OutOfOrderSequence异常: 2019-03-06 21:25:47发送者[ERROR][生产者clientId=生产者-1]代理返回org.apache.kafka.common.errors.OutOfOrderSequence异常:代理在偏移-1处收到主题分区主题-1的乱序序列号。