我使用200个分区的主题。flink使用这个主题。
最近,我进行了手动分区重新分配。
当我重新分配分区时,Flink连续失败并出现此错误。
错误1。
[2021-07-28 18:21:15,926] WARN Attempting to send response via channel for which there is no open connection, connection id ..(kafka.network.Processor)
错误2.
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for -126: 30042 ms has passed since batch creation plus linger time
错误3。
java.lang.Exception: Error while triggering checkpoint 656 for Source: Custom Source -> Sink: ... (32/200)
at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1174)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not perform checkpoint 656 for operator Source: Custom Source -> Sink: ... (32/200).
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:570)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:116)
at org.apache.flink.runtime.taskmanager.Task$1.run(Task.java:1163)
... 5 more
Caused by: java.lang.Exception: Could not complete snapshot 656 for operator Source: Custom Source -> Sink: ... (32/200).
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:564)
... 7 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for ...-86: 30049 ms has passed since batch creation plus linger time
当我重新启动失败的作业时,这个错误会不断发生。
ClassLoader info: URL ClassLoader:
file: '/blobStore-29c572a3-4ed4-48a6-b604-d93b7e4a9a10/job_8bd41a7e0690e75bd61d148d89dca963/blob_p-5c10d03a5cbb09c9a9459f1bc2a70804d0b08290-26b5562cbe83b0403b06717637e7ab47' (invalid JAR: /blobStore-29c572a3-4ed4-48a6-b604-d93b7e4a9a10/job_8bd41a7e0690e75bd61d148d89dca963/blob_p-5c10d03a5cbb09c9a9459f1bc2a70804d0b08290-26b5562cbe83b0403b06717637e7ab47 (Too many open files))
Class not resolvable through given classloader.
所以我重新启动了所有mesos和flink集群,并获得了动物园管理员的许可<有什么原因需要寻找吗?
群集中的某些代理存在网络问题。如果由于网络问题,对特定分区的请求处理缓慢,则预计将显示该消息。
随后,分区对应的作业无法正常工作,似乎出现了flink的检查点问题。
这个问题通过更换经纪人的设备得到了解决。
Kafka主题分区偏移位置始终从0或随机值开始,如何确保使用者记录是分区中的第一条记录?有没有办法找出答案?如果有的话,请让我知道。谢谢。
简单问题: 假设我有一个具有3个分区的主题:Topic:StateEvents P1、P2和P3。 让我们假设生产者生成20条消息: 1, 2, 3, ..........20 我的问题是: 当制作人生成这些消息时: 1)每个消息将只在且仅在1个分区?也就是说,1在P1,2在P2,3在P3,然后4在P1,5在P2,6在P3,以此类推? 2)如果#1为真,当消费者订阅时,它将订阅所有分区,以便获得所
我有以下配置: 一个具有 2 个分区的 kafka 主题 一个动物园管理员实例 一个 kafka 实例 具有相同组 ID 的两个使用者 Flink 作业片段: 方案 1: 我在eclipse上写了一个flink job (Producer ),它从一个文件夹中读取一个文件,并在kafka主题上放置msgs。 所以当我使用eclipse运行这段代码时,它工作得很好。 例如:如果我放置一个有100条记
通过Kafka文档和各种其他资源,我了解到Kafka中的消息被组织成主题。此外,主题可以分解为多个分区,每个分区可以托管在不同的服务器上。这提供了冗余和可伸缩性。 我不确定这里的“破碎”这个词是什么意思。这是否意味着,如果添加到主题的消息是,例如“1 2 3 4 5 6 7”,那么在将其分解为分区后,我们将有一个分区仅包含整个主题的子部分。就像一个分区有“1 2 3”,而另一个分区有“4 5 6”
我需要实现下面的数据流。我有一个kafka主题,它有9个分区。我可以用9个并行级别阅读这个主题。我还有3个节点Flink集群。这个集群的每个节点都有24个任务槽。 首先,我想传播我的kafka,每个服务器有3个分区,如下所示。顺序没关系,我只转换kafka消息并发送DB。 第二件事是,我想在保存NoSQL DB的同时提高并行度。如果我增加并行度48,因为发送DB是IO操作,它不会消耗CPU,我想确
我对Kafka是新的,所以道歉,如果我听起来很愚蠢,但我目前所理解的是…消息流可以定义为主题,就像类别一样。并且每个主题被分成一个或多个分区(每个分区可以有多个副本)。所以它们是平行的 他们说Kafka的主要网站 生成器能够选择将哪个消息分配给主题中的哪个分区。这可以通过循环的方式简单地平衡负载,也可以根据某个语义分区函数(例如基于消息中的某个键)来完成。 在0.8 beta版中创建produce