当前位置: 首页 > 知识库问答 >
问题:

闪烁:无法取消正在运行的作业(流)

莫兴言
2023-03-14

我想运行流作业。
当我尝试使用start-clusted.sh和Flink Web界面在本地运行该作业时,没有问题。

但是,我当前正在尝试使用Flink on YARN(部署在Google Dataproc上)运行我的作业,并且当我尝试取消它时,取消状态将永远持续,并且TaskManager中仍有一个插槽被占用。

这是我得到的日志:

2016-10-18 16:56:04,053 INFO org.apache.flink.runtime.taskmanager.Task - 
Attempting to cancel task Source: pubSubMessageAcknowledgingSource -> 
TrackingDisplayPushDeduplicater -> TrackingDisplayPushDeserializer -> 
(Sink: TrackingDisplayPushErrorFlumeSink, Map -> Sink: 
TrackingDisplayPushValidFlumeSink) (1/1)
2016-10-18 16:56:04,053 INFO org.apache.flink.runtime.taskmanager.Task - 
Source: pubSubMessageAcknowledgingSource -> 
TrackingDisplayPushDeduplicater -> TrackingDisplayPushDeserializer -> 
(Sink: TrackingDisplayPushErrorFlumeSink, Map -> Sink: 
TrackingDisplayPushValidFlumeSink) (1/1) switched to CANCELING
2016-10-18 16:56:04,053 INFO org.apache.flink.runtime.taskmanager.Task - 
Triggering cancellation of task code Source: 
pubSubMessageAcknowledgingSource -> TrackingDisplayPushDeduplicater -> 
TrackingDisplayPushDeserializer -> (Sink: 
TrackingDisplayPushErrorFlumeSink, Map -> Sink: 
TrackingDisplayPushValidFlumeSink) (1/1) (38bf32d9199a0c9383a8b1e8d73a1f65).
2016-10-18 16:56:34,055 WARN org.apache.flink.runtime.taskmanager.Task - 
Task 'Source: pubSubMessageAcknowledgingSource -> 
TrackingDisplayPushDeduplicater -> TrackingDisplayPushDeserializer -> 
(Sink: TrackingDisplayPushErrorFlumeSink, Map -> Sink: 
TrackingDisplayPushValidFlumeSink) (1/1)' did not react to cancelling 
signal, but is stuck in method:
java.net.PlainSocketImpl.socketConnect(Native Method)
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
java.net.Socket.connect(Socket.java:589)
java.net.Socket.connect(Socket.java:538)
sun.net.NetworkClient.doConnect(NetworkClient.java:180)
sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
sun.net.www.http.HttpClient.New(HttpClient.java:308)
sun.net.www.http.HttpClient.New(HttpClient.java:326)
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1169)
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1105)
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:999)
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:933)
sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1283)
sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1258)
com.accengage.bigdata.flink.streaming.sinks.FlumeSink.flush(FlumeSink.java:107)
com.accengage.bigdata.flink.streaming.sinks.FlumeSink.invoke(FlumeSink.java:80)
com.accengage.bigdata.flink.streaming.sinks.FlumeSink.invoke(FlumeSink.java:25)l
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:126)
org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:35)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:160)
com.accengage.bigdata.flink.streaming.sources.PubSubAcknowledgingSource.run(PubSubAcknowledgingSource.java:148)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80)
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53)
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
java.lang.Thread.run(Thread.java:745)

共有1个答案

何星鹏
2023-03-14

我假设您使用的是自定义接收器(com.accengage.bigdata.flink.streaming.sinks.flumesink),它使用某种HTTP库与Flume进行通信。

最有可能的情况是,当中断被发送到线程时,HTTP库在循环中受到攻击(例如,当中断的异常被忽略时,就会发生这种情况)

要解决此问题,您可以使用正确处理中断的HTTP库,也可以从其他线程调用该库,因为其他线程不会在主线程上接收中断。

 类似资料:
  • 我有一个使用Apache Flink(Flink版本:1.8.1)使用Scala进行流式处理的工作。flow作业要求如下:Kafka->写给Hbase->用不同的主题再次发送给Kafka 在向Hbase写入过程中,需要从另一个表中检索数据。为确保数据不为空(NULL),作业必须(在一定时间内)重复检查数据是否为空。 编辑:我的意思是,有了我在内容中描述的问题,我想过必须在作业流中创建某种类型的作业

  • 一个好的基于GUI的应用程序向用户提供有关交互的反馈。 例如,桌面应用程序使用对话框或消息框,JavaScript使用警报用于类似目的。 在Flask Web应用程序中生成这样的信息性消息很容易。 Flask框架的闪烁系统可以在一个视图中创建消息,并在名为next的视图函数中呈现它。 Flask模块包含flash()方法。 它将消息传递给下一个请求,该请求通常是一个模板。 flash(messag

  • 我有一个关于在Kinesis流中分片数据的问题。我想在向我的kinesis流发送用户数据时使用一个随机分区键,这样碎片中的数据是均匀分布的。为了使这个问题更简单,我想通过在Flink应用程序中键入用户ID来聚合用户数据。

  • 问题内容: 我为创建的每个对象都有一个名称。每个对象在表格视图中填充一行。现在,我希望在表行持续闪烁的橙色是。 只要属性为true,如何使表格行一直 闪烁? 问题答案: 要使内容闪烁,请使用: 在这种情况下,更改颜色的最好方法是使用CSS: 然后在外部CSS文件中,您可以配置Flash高亮显示的样式: 要将其绑定到布尔属性,只需使用该属性创建一个侦听器: 要将其应用于表格行,您必须编写一个。您只需

  • 我试图在Flink中编写一个需要两个阶段的计算。 在第一阶段,我创建一个Graph并获取它的顶点id: 在第二阶段,我想使用这些ID为每个顶点运行SingleSourceShortestPath。 它在本地工作(在IntelliJ IDE和命令行中使用),但当我使用其WebUI在Flink上提交作业时,程序只是执行直到方法并且不运行程序的剩余部分(用于语句和)。 问题是什么? 这是我的代码:

  • 我正在kubernetes上试用最新版本的Flink1.5的flink工作。 我的问题是如何在上面的flink集群上运行一个示例应用程序。flink示例项目提供了如何使用flink应用程序构建docker映像并将该应用程序提交给flink的信息。我遵循了这个例子,只是把flink的版本改成了最新版本。我发现应用程序(example-app)提交成功,并且在kubernetes的pod中显示,但是f