我有一个流应用程序,具有以下驱动程序代码,用于实时消息转换。
String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);
source.transform(() -> new MyTransformer()).to(...);
KafkaStreams streams = new KafkaStreams(builder, appConfig);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
logger.error("UncaughtExceptionHandler " + e.getMessage());
System.exit(0);
}
});
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
执行几分钟后,应用会引发以下异常,然后不会在流中前进。
[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
... 13 more
我试图清除/tmp/kafka streams/TRANSFORMATION-APP
目录并重新启动应用程序,但再次抛出相同的异常。我注意到的一件事是,该应用程序在转换所有积压消息之前工作正常,但在处理一些新消息后引发异常!
有时它也会抛出下面未被捕获的异常。
[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created
[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed
to rebalance
引发这些异常(其中一个)后,应用仍在运行,但未在流中前进。
处理这些错误的正确方法是什么?。是否可以以编程方式重新启动流,而不杀死应用程序?此应用程序在monit下。在最坏的情况下,我宁愿正确终止应用程序(没有任何消息丢失),以便monit可以重新启动它。
输入主题有100个分区,我设置了num.stream。threads
设置为100。该应用程序位于Kafka 0.10.1.1-cp1上。
我知道这个问题很久以前就被问过了,但会发布有关新的 Kafka-Streams 功能的更新。从 Kafka-Streams
2.8.0
开始,您可以使用 KafkaStreams
方法 void 自动替换失败的流线程(由未捕获的异常引起)
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
在
Kafka Streams
2.8.0
之前,您可以自行实现重新启动失败的KafkaStreams的逻辑。想法如下:
KafkaStreams kafkaStreams = createYourKafkaStreams();
kafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, kafkaStreams));
private KafkaStreams.StateListener createErrorStateListener(String sourceTopicName, KafkaStreams kafkaStreams) {
return (newState, oldState) -> {
if (newState == KafkaStreams.State.ERROR) {
log.error("Kafka Stream is in ERROR state for source topic [{}]", sourceTopicName);
replaceFailedKafkaStream(kafkaStreams, sourceTopicName);
}
};
}
// invoke this method either right after stream died, or by scheduling
private void replaceFailedKafkaStream(KafkaStreams kafkaStreams, String sourceTopicName) {
kafkaStreams.close();
KafkaStreams newKafkaStreams = createYourKafkaStreams();
newKafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, newKafkaStreams));
newKafkaStreams.start();
}
Kakfa0.10.1. x
在多线程方面存在一些错误。您可以升级到0.10.2
(AK今天发布,CP 3.2应该很快就会发布),或者应用以下解决方法:
在重新启动之前,您可能还需要删除本地状态目录(仅一次),以进入整体一致的应用程序状态。
在任何情况下,都不会丢失数据。Kafka Streams即使在失败的情况下也保证至少一次处理语义。这也适用于您的本地存储——在您删除本地状态dir之后,在启动时,这些状态将从底层Kafka changelog主题中重新创建(尽管这是一个昂贵的操作)。
UncaughtExceptionHandler
只为你提供了一种确定线程死亡的方法。它(直接)不会(直接)帮助重新启动应用程序。要恢复已死亡的线程,您需要完全关闭 KafkaStreams
实例并创建/启动一个新实例。我们希望将来能为此添加更好的支持。
我在应用程序中使用Activiti框架。Activiti通过Activiti API进行管理。 真是个问题: 我需要重新启动服务器wtih应用程序。这意味着两件事: 我需要正确暂停/停止所有活动 目前,我的应用程序已通过系统停止。退出(0) 问题: 我怎么会那样做呢?(意指以上两项) System.exit(0)会遇到什么问题? 编辑: 是的,谢谢。但我使用嵌入式Activiti。这些适用于嵌入式
我正在使用CloseableHttpResponse(来自apache-httpclient-4.5.3),但我不确定我是否正确使用了它,我看到了一个答案,没有投票支持使用: 是抽象的,没有可调用的close方法,尽管在这个回答中使用了: 目前,我正在使用try with resources,用于send方法内部的和。 我是否缺少任何打开的资源或以错误的方式使用它?
本文向大家介绍nginx关闭/重启/启动的操作方法,包括了nginx关闭/重启/启动的操作方法的使用技巧和注意事项,需要的朋友参考一下 关闭 service nginx stop systemctl stop nginx 启动 service nginx start systemctl start nginx 重启 service nginx reload systemctl restart ng
问题内容: 我正在尝试在我的c程序中检测从Linux关闭或重新启动。我发现程序可以使用signal(SIGTERM,handler)(SIGKILL,handler)。但是,如果用户也使用命令杀死该进程,这两个触发器也会触发。 他们说,在某些解决方案中,可以使用运行级别,但无法运行。在系统初始化运行级别之前,不知道该进程是否被杀死。我什至尝试将脚本放在rcx.d中,但仍然无法正常工作。 有人建议吗
为了效率和成本,我不想将这些标志存储在memcache或Datastore中。 我正在寻找一种向所有实例发送消息的方法(请参阅我之前的文章GAE向所有活动实例发送请求): 1)向我的应用程序或服务的所有实例发送关闭消息/命令 2)向我的应用程序或服务的所有实例发送重新启动消息/命令 我只使用自动缩放,所以我不能发送针对特定实例的请求(我可以使用GAE管理API获得活动实例的列表)。 有没有办法在P
我用一个JDesktopPane和几个JButton做了一个简单的用户界面。该程序的工作原理是,当单击按钮时,JInternalFrame将加载并禁用该按钮,以防止创建内部框架的副本。JInternalFrame上的取消按钮关闭帧,JButton再次启用。我的代码如下: adminAddUser类上Cancel按钮的操作侦听器。dispose()方法用于关闭JInternalFrame。 最后,我