Question:

Apache Flink TaskExecutor shutting down

谭志用
2023-03-14

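I am new to Apache Flink, so I am currently running some experiments. I am reading from a Kafka topic and printing the records to the console. After printing roughly 100k Kafka messages, the job throws an exception. The log output is shown below.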

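I am using a custom class that extends AbstractDeserializationSchema to deserialize the Kafka record values. I even tried adding some exception handling inside it, but it was never triggered.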

My Kafka consumer code is very simple:

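import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {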

  private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
  private static final int FLINK_PARALLELISM = 1;

  public static void main(String[] args) {
    LOGGER.info("Starting Flink Kafka Consumer");

    try {

        // Kafka consumer settings, stored as plain strings so getProperty() can read them.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test_flink");


        StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        flinkEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        flinkEnv.setParallelism(FLINK_PARALLELISM);

        FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), props);
        DataStream<String> kafkaStream = flinkEnv.addSource(kafkaConsumer);

        kafkaStream.print();


        flinkEnv.execute("Flink Test");
    } catch (Exception e) {
        // Pass the exception itself so the full stack trace is logged, not only the message.
        LOGGER.error("Exception thrown while running the Flink job", e);
    }

  }
}

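Even after the exception, output from the topic keeps being appended to the file. The Kafka topic is up and running, but the mini cluster shuts down whether or not I publish anything to it. I have not been able to pin down where the problem is.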

Can anyone point me in the right direction? Thanks.

INFO [Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.f.s.c.k.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='test', partition=0}]
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.c.ConsumerConfig - ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test_flink
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.2
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.u.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group test_flink.
INFO [main] o.a.f.r.m.MiniCluster - Shutting down Flink Mini Cluster
INFO [main] o.a.f.r.d.DispatcherRestEndpoint - Shutting down rest endpoint.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopping TaskExecutor akka://flink/user/taskmanager_0.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (eb1f62611a047c5da09d8fa6f4e49084) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Job Flink Test (670e9073fbab507c41a26b5641a265eb) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.i.d.i.IOManager - I/O manager removed spill file directory /tmp/flink-io-763eca47-ab9c-4985-aa30-c7ac21442635
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.i.n.NetworkEnvironment - Shutting down the network environment and its components.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Try to restart or fail the job Flink Test (670e9073fbab507c41a26b5641a265eb) if no longer possible.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Job Flink Test (670e9073fbab507c41a26b5641a265eb) switched from state FAILING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Could not restart the job Flink Test (670e9073fbab507c41a26b5641a265eb) because the restart strategy prevented it.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
    at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.c.CheckpointCoordinator - Stopping checkpoint coordinator for job 670e9073fbab507c41a26b5641a265eb.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.c.StandaloneCompletedCheckpointStore - Shutting down
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.JobLeaderService - Stop job leader service.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_0.
INFO [ForkJoinPool.commonPool-worker-9] o.a.f.r.d.DispatcherRestEndpoint - Removing cache directory /tmp/flink-web-ui
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.r.s.SlotManager - Closing the SlotManager.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.r.s.SlotManager - Suspending the SlotManager.
INFO [ForkJoinPool.commonPool-worker-9] o.a.f.r.d.DispatcherRestEndpoint - Shut down complete.
INFO [flink-akka.actor.default-dispatcher-3] o.a.f.r.j.JobMaster - Close ResourceManager connection 3a6541e469014e5685a7510403385dcb: ResourceManager leader changed to new address null.
INFO [PermanentBlobCache shutdown hook] o.a.f.r.b.PermanentBlobCache - Shutting down BLOB cache
INFO [TransientBlobCache shutdown hook] o.a.f.r.b.TransientBlobCache - Shutting down BLOB cache
INFO [BlobServer shutdown hook] o.a.f.r.b.BlobServer - Stopped BLOB server at 0.0.0.0:46065

1 answer

仲智
2023-03-14

Change the JDK version to JDK-8u231-windows-x64. I ran into the same problem and finally solved it by switching to a different JDK version.
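
A side note on the consumer configuration, separate from the JDK version: java.util.Properties.getProperty returns null for any value that is not a String, so a setting stored as a List is invisible to code that reads it through getProperty. Whether this plays any role in the shutdown above is not established; the log shows bootstrap.servers = [localhost:9092], so the Kafka client did read the value. The sketch below uses only the JDK, and the class name PropsDemo and its helper methods are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.Properties;

public class PropsDemo {

    // Stores the broker list as a List object via put().
    static Properties withListValue() {
        Properties props = new Properties();
        props.put("bootstrap.servers", Arrays.asList("localhost:9092"));
        return props;
    }

    // Stores the broker list as a plain String via setProperty().
    static Properties withStringValue() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        return props;
    }

    public static void main(String[] args) {
        // getProperty only returns String values, so the List entry reads as null.
        System.out.println(withListValue().getProperty("bootstrap.servers"));   // prints "null"
        // The String entry is returned unchanged.
        System.out.println(withStringValue().getProperty("bootstrap.servers")); // prints "localhost:9092"
    }
}
```

Keeping every value a String (comma-separated when there are several brokers) means the setting reads the same way through both get() and getProperty().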
