我是Apache Flink的新手,所以我目前正在尝试做一些实验。我正在读Kafka的一个主题,然后在控制台上打印出来。打印大约100kkafka消息后,它抛出异常。日志输出如下。
我正在使用一个自定义类来扩展AbstractDeserializationSchema,以反序列化kafka记录值。我甚至尝试过在其中添加一些异常处理,但没有触发。
我使用Kafka的代码非常简单:
public class Main {
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
private static final int FLINK_PARALLELISM = 1;
public static void main(String[] args) {
LOGGER.info("Starting Flink Kafka Consumer");
try {
Properties props = new Properties();
props.put("bootstrap.servers", Arrays.asList(
"localhost:9092"
));
props.put("group.id", "test_flink");
StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
flinkEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
flinkEnv.setParallelism(FLINK_PARALLELISM);
FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>("test", new SimpleStringSchema(), props);
DataStream<String> kafkaStream = flinkEnv.addSource(kafkaConsumer);
kafkaStream.print();
flinkEnv.execute("Flink Test");
} catch (Exception e) {
LOGGER.error("Exception thrown: {}", e.getMessage());
}
}
}
即使在异常之后,输出仍然会从主题附加到文件中。Kafka主题已经启动并运行,但无论我是否发布任何内容,我都会让微型集群停止。我还没能确定问题出在哪里。
有人能给我指一下正确的方向吗?谢谢
INFO [Source: Custom Source -> Sink: Print to Std.
Out (1/1)] o.a.f.s.c.k.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading the following 1 partitions from the committed group offsets in Kafka: [KafkaTopicPartition{topic='test', partition=0}]
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.c.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = test_flink
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.u.AppInfoParser - Kafka version : 0.11.0.2
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.u.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
INFO [Kafka 0.10 Fetcher for Source: Custom Source -> Sink: Print to Std. Out (1/1)] o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator localhost:9092 (id: 2147483647 rack: null) for group test_flink.
INFO [main] o.a.f.r.m.MiniCluster - Shutting down Flink Mini Cluster
INFO [main] o.a.f.r.d.DispatcherRestEndpoint - Shutting down rest endpoint.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopping TaskExecutor akka://flink/user/taskmanager_0.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.s.TaskExecutorLocalStateStoresManager - Shutting down TaskExecutorLocalStateStoresManager.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Source: Custom Source -> Sink: Print to Std. Out (1/1) (eb1f62611a047c5da09d8fa6f4e49084) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Job Flink Test (670e9073fbab507c41a26b5641a265eb) switched from state RUNNING to FAILING.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.i.d.i.IOManager - I/O manager removed spill file directory /tmp/flink-io-763eca47-ab9c-4985-aa30-c7ac21442635
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.i.n.NetworkEnvironment - Shutting down the network environment and its components.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Try to restart or fail the job Flink Test (670e9073fbab507c41a26b5641a265eb) if no longer possible.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Job Flink Test (670e9073fbab507c41a26b5641a265eb) switched from state FAILING to FAILED.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.e.ExecutionGraph - Could not restart the job Flink Test (670e9073fbab507c41a26b5641a265eb) because the restart strategy prevented it.
org.apache.flink.util.FlinkException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.postStop(TaskExecutor.java:308) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) ~[flink-runtime_2.11-1.7.1.jar:1.7.1]
at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:374) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:224) ~[akka-actor_2.11-2.4.20.jar:na]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234) ~[akka-actor_2.11-2.4.20.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.11.12.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.11.12.jar:na]
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.c.CheckpointCoordinator - Stopping checkpoint coordinator for job 670e9073fbab507c41a26b5641a265eb.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.c.StandaloneCompletedCheckpointStore - Shutting down
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.JobLeaderService - Stop job leader service.
INFO [flink-akka.actor.default-dispatcher-4] o.a.f.r.t.TaskExecutor - Stopped TaskExecutor akka://flink/user/taskmanager_0.
INFO [ForkJoinPool.commonPool-worker-9] o.a.f.r.d.DispatcherRestEndpoint - Removing cache directory /tmp/flink-web-ui
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.r.s.SlotManager - Closing the SlotManager.
INFO [flink-akka.actor.default-dispatcher-2] o.a.f.r.r.s.SlotManager - Suspending the SlotManager.
INFO [ForkJoinPool.commonPool-worker-9] o.a.f.r.d.DispatcherRestEndpoint - Shut down complete.
INFO [flink-akka.actor.default-dispatcher-3] o.a.f.r.j.JobMaster - Close ResourceManager connection 3a6541e469014e5685a7510403385dcb: ResourceManager leader changed to new address null.
INFO [PermanentBlobCache shutdown hook] o.a.f.r.b.PermanentBlobCache - Shutting down BLOB cache
INFO [TransientBlobCache shutdown hook] o.a.f.r.b.TransientBlobCache - Shutting down BLOB cache
INFO [BlobServer shutdown hook] o.a.f.r.b.BlobServer - Stopped BLOB server at 0.0.0.0:46065
将JDK版本更改为JDK-8u231-windows-x64
。我也遇到了同样的问题,最终通过更改JDK的版本解决了这个问题。
编辑问题,以包括预期行为、特定问题或错误以及重现问题所需的最短代码。这将有助于其他人回答问题。 哪里出错:< br >任务执行失败”:app:checkDevDebugAarMetadata。 无法解析配置': app: devDebugRuntime Classpath'的所有文件。无法解析net.minidev: json-Smart:[1.3.1,2.3]。需要:项目:app 我在构建项目时
我无法找到必要的信息,无论是在文档中还是在这里已经存在的问题中,这就是为什么我自己创建了一个(我还不能在类似的帖子下提问)。 我需要知道的是Spring任务执行器和调度器之间的关系。我当前的配置如下所示: 我不确定的是它是如何工作的。“谁”运行我的任务?是调度器,因为任务是和他一起安排的吗?或者调度器只是创建它们,放在队列中,由执行者运行它们? 如果没有,运行的是scheduler,我必须在特定类
我找不到关于我们被要求进行的调查的具体答案 我看到并行流在使用少量线程时性能可能不是那么好,而且当DB在处理当前请求的同时阻止下一个请求时,它的表现显然也不是那么好 然而,我发现实现任务执行器与并行流的开销是巨大的,我们实现了一个POC,它只需要这一行代码就能满足并发需求: 而在Task Executor中,我们需要重写Runnable接口并编写一些繁琐的代码,以使Runnable不是空的,并返回
每隔一段时间需要调度任务执行,也许你想注册一个任务在客户端完成连接5分钟后执行,一个常见的用例是发送一个消息“你还活着?”到远端通,如果远端没有反应,则可以关闭通道(连接)和释放资源。 本节介绍使用强大的 EventLoop 实现任务调度,还会简单介绍 Java API的任务调度,以方便和 Netty 比较加深理解。 使用普通的 Java API 调度任务 在 Java 中使用 JDK 提供的 S
原文链接:Serverless 应用开发指南:CRON 定时执行 Lambda 任务 在上一篇文章《Serverless 应用开发指南:基于 Serverless 的 GitHub Webhook》里,我们介绍了如何用 Webhook 来触发定时的 Lambda 函数。这种方式与我们平时的 CI(持续集成)服务器相似,而CI(持续集成)服务器除了会监听 PUSH 事件。还会执行一些定时的任务,比如