当前位置: 首页 > 知识库问答 >
问题:

Kafka·Flink:Kafka制作人错误取决于Flink作业

龙默
2023-03-14

当我与Kafka·Flink一起进行概念验证时,我发现了以下几点:似乎Kafka制作人的错误可能会由于Flink方面的工作量而发生?!

以下是更多细节:

我有样本文件,如样本??。EDR由约700000行组成,具有“实体”、“值”、“时间戳”等值

我使用以下命令创建kafka主题:

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic gprs

我使用以下命令加载主题的示例文件:

[13:00] kafka@ubu19: ~/fms
% /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic gprs < ~/sample/sample01.EDR

我有一些在flink方面的工作,这些工作通过6小时和72小时的滑动窗口为每个实体聚合价值(aggregationeachsix,aggregationeachsentytwo)。

我做了三个场景:

  1. 在不运行任何作业的情况下加载主题中的文件
  2. 在aggregationeachsix作业运行时加载主题中的文件
  3. 在运行aggregationeachsix和aggregationeachsentytwo作业的情况下加载主题中的文件

结果是前两个场景正在工作,但对于第三个场景,我在加载文件时在kafka生产者端出现以下错误(不总是在同一个文件中,它可以是第一个、第二个、第三个甚至更高的文件):

    [plenty of lines before this part]
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 35 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1627 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1626 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1625 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1624 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender)
    [2017-08-09 12:56:53,515] ERROR Error when sending message to topic gprs with key: null, value: 35 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for gprs-0: 27850 ms has passed since batch creation plus linger time
    [2017-08-09 12:56:53,515] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for gprs-0: 27850 ms has passed since batch creation plus linger time
    [plenty of lines after this part]

我的问题是为什么Flink会对Kafka制作人产生影响,然后,我需要做什么改变来避免这个错误?

共有1个答案

江阳夏
2023-03-14

当flink和kafka制作人都在使用它时,你的网络似乎已经饱和了,因此你得到了超时异常。

 类似资料:
  • 我有两个简单的Flink流式作业,从Kafka读取,做一些转换,并将结果放入Cassandra sink。他们从不同的Kafka主题阅读,并存入不同的卡桑德拉表。 当我单独运行这两个工作中的任何一个时,一切都很好。检查点被触发并完成,数据被保存到Cassandra。 我找不到关于这个错误的很多信息,它可能是由下列任何一个引起的: Flink(V1.10.0-Scala2.12), Flink Ca

  • 我正在考虑创建一个独立的Kafka生产者,它作为守护进程运行,通过套接字接收消息,并将其可靠地发送给Kafka。 但是,我决不能是第一个想到这个想法的人。这样做的目的是避免使用PHP或Node编写Kafka生成器,而只是通过套接字将消息从这些语言传递到独立的守护进程,这些语言负责传递,而主应用程序则一直在做自己的事情。 此守护进程应负责在发生中断时进行重试传递,并充当服务器上运行的所有程序的传递点

  • 我想让我的Kafka制作人变得富有交易性。我正在发送10条消息。如果发生任何错误,则不应向Kafka发送任何消息,即无或全部。 我使用的是Spring Boot KafkaTemplate。 我正在发送文件中提到的10条信息,如下所示。应发送9条消息,且I消息大小超过1MB,由于 https://docs.spring.io/spring-kafka/reference/html/#using-K

  • 我试图实现这里提到的生产者(https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-java/blob/master/src/main/demo/com/amazonaws/kinesisvideo/demoapp/putmediademo.java)。 我有一个mkv文件,我想上传在循环中作为制片人在Kinesis

  • 下面给出的kafka producer程序不是通过Eclipse在Windows中运行的,而是在Unix平台上运行的(即,当我在承载kafka代理的Unix中运行它时,它工作正常)。windows不支持Kafka制作人吗?但是,我可以从windows计算机ping ip地址。请帮忙。 这是我得到的异常错误。 log4j:WARN找不到记录器的附加程序(kafka.utils.VerifiableP

  • 我正在探索反应性Kafka,只是想确认反应性Kafka是否等同于同步制作人。与同步生产者,我们得到消息传递保证与确认字符和生产者序列保持。但是,ASYNC不能保证交付和测序。反应式生产者等同于SYNC还是ASYNC?