当前位置: 首页 > 知识库问答 >
问题:

使用UncaughtExceptionHandler重新启动或关闭流的正确方法

澹台锐
2023-03-14

我有一个流应用程序,具有以下驱动程序代码,用于实时消息转换。

String topicName = ...
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(topicName);

source.transform(() -> new MyTransformer()).to(...);

KafkaStreams streams = new KafkaStreams(builder, appConfig);
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
    public void uncaughtException(Thread t, Throwable e) {
        logger.error("UncaughtExceptionHandler " + e.getMessage());
        System.exit(0);
    }
});


streams.cleanUp();
streams.start();

Runtime.getRuntime().addShutdownHook(new  Thread(streams::close));

执行几分钟后,应用会引发以下异常,然后不会在流中前进。

[2017-02-22 14:24:35,139] ERROR [StreamThread-14] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group TRANSFORMATION-APP failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: task [0_11] Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:89)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
Caused by: java.io.IOException: task [0_11] Failed to lock the state directory: /tmp/kafka-streams/TRANSFORMATION-APP/0_11
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:101)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 13 more

我试图清除/tmp/kafka streams/TRANSFORMATION-APP目录并重新启动应用程序,但再次抛出相同的异常。我注意到的一件事是,该应用程序在转换所有积压消息之前工作正常,但在处理一些新消息后引发异常!

有时它也会抛出下面未被捕获的异常。

[ERROR] 2017-02-22 12:40:54.804 [StreamThread-29] MyTransformer - UncaughtExceptionHandler task directory [/tmp/kafka-streams/TRANSFORMATION-APP/0_24] doesn't exist and couldn't be created

[ERROR] 2017-02-22 12:42:30.148 [StreamThread-179] MyTransformer - UncaughtExceptionHandler stream-thread [StreamThread-179] Failed 
to rebalance

引发这些异常(其中一个)后,应用仍在运行,但未在流中前进。

处理这些错误的正确方法是什么?。是否可以以编程方式重新启动流,而不杀死应用程序?此应用程序在monit下。在最坏的情况下,我宁愿正确终止应用程序(没有任何消息丢失),以便monit可以重新启动它。

输入主题有100个分区,我设置了num.stream。threads设置为100。该应用程序位于Kafka 0.10.1.1-cp1上。

共有2个答案

白文彬
2023-03-14

我知道这个问题很久以前就被问过了,但会发布有关新的 Kafka-Streams 功能的更新。从 Kafka-Streams 2.8.0 开始,您可以使用 KafkaStreams 方法 void 自动替换失败的流线程(由未捕获的异常引起)

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

Kafka Streams2.8.0之前,您可以自行实现重新启动失败的KafkaStreams的逻辑。想法如下:

KafkaStreams kafkaStreams = createYourKafkaStreams();
kafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, kafkaStreams));

private KafkaStreams.StateListener createErrorStateListener(String sourceTopicName, KafkaStreams kafkaStreams) {
    return (newState, oldState) -> {
        if (newState == KafkaStreams.State.ERROR) {
            log.error("Kafka Stream is in ERROR state for source topic [{}]", sourceTopicName);
            replaceFailedKafkaStream(kafkaStreams, sourceTopicName);
        }
    };
}

// invoke this method either right after stream died, or by scheduling
private void replaceFailedKafkaStream(KafkaStreams kafkaStreams, String sourceTopicName) {
    kafkaStreams.close();
    KafkaStreams newKafkaStreams = createYourKafkaStreams();
    newKafkaStreams.setStateListener(createErrorStateListener(sourceTopicName, newKafkaStreams));
    newKafkaStreams.start();
}

包翔
2023-03-14

Kakfa0.10.1. x在多线程方面存在一些错误。您可以升级到0.10.2(AK今天发布,CP 3.2应该很快就会发布),或者应用以下解决方法:

    < li >仅使用单线程执行 < li >如果需要更多线程,请启动更多实例 < li >对于每个实例,配置不同的状态目录

在重新启动之前,您可能还需要删除本地状态目录(仅一次),以进入整体一致的应用程序状态。

在任何情况下,都不会丢失数据。Kafka Streams即使在失败的情况下也保证至少一次处理语义。这也适用于您的本地存储——在您删除本地状态dir之后,在启动时,这些状态将从底层Kafka changelog主题中重新创建(尽管这是一个昂贵的操作)。

UncaughtExceptionHandler 只为你提供了一种确定线程死亡的方法。它(直接)不会(直接)帮助重新启动应用程序。要恢复已死亡的线程,您需要完全关闭 KafkaStreams 实例并创建/启动一个新实例。我们希望将来能为此添加更好的支持。

 类似资料:
  • 我在应用程序中使用Activiti框架。Activiti通过Activiti API进行管理。 真是个问题: 我需要重新启动服务器wtih应用程序。这意味着两件事: 我需要正确暂停/停止所有活动 目前,我的应用程序已通过系统停止。退出(0) 问题: 我怎么会那样做呢?(意指以上两项) System.exit(0)会遇到什么问题? 编辑: 是的,谢谢。但我使用嵌入式Activiti。这些适用于嵌入式

  • 我正在使用CloseableHttpResponse(来自apache-httpclient-4.5.3),但我不确定我是否正确使用了它,我看到了一个答案,没有投票支持使用: 是抽象的,没有可调用的close方法,尽管在这个回答中使用了: 目前,我正在使用try with resources,用于send方法内部的和。 我是否缺少任何打开的资源或以错误的方式使用它?

  • 本文向大家介绍nginx关闭/重启/启动的操作方法,包括了nginx关闭/重启/启动的操作方法的使用技巧和注意事项,需要的朋友参考一下 关闭 service nginx stop systemctl stop nginx 启动 service nginx start systemctl start nginx 重启 service nginx reload systemctl restart ng

  • 问题内容: 我正在尝试在我的c程序中检测从Linux关闭或重新启动。我发现程序可以使用signal(SIGTERM,handler)(SIGKILL,handler)。但是,如果用户也使用命令杀死该进程,这两个触发器也会触发。 他们说,在某些解决方案中,可以使用运行级别,但无法运行。在系统初始化运行级别之前,不知道该进程是否被杀死。我什至尝试将脚本放在rcx.d中,但仍然无法正常工作。 有人建议吗

  • 为了效率和成本,我不想将这些标志存储在memcache或Datastore中。 我正在寻找一种向所有实例发送消息的方法(请参阅我之前的文章GAE向所有活动实例发送请求): 1)向我的应用程序或服务的所有实例发送关闭消息/命令 2)向我的应用程序或服务的所有实例发送重新启动消息/命令 我只使用自动缩放,所以我不能发送针对特定实例的请求(我可以使用GAE管理API获得活动实例的列表)。 有没有办法在P

  • 我用一个JDesktopPane和几个JButton做了一个简单的用户界面。该程序的工作原理是,当单击按钮时,JInternalFrame将加载并禁用该按钮,以防止创建内部框架的副本。JInternalFrame上的取消按钮关闭帧,JButton再次启用。我的代码如下: adminAddUser类上Cancel按钮的操作侦听器。dispose()方法用于关闭JInternalFrame。 最后,我