当前位置: 首页 > 知识库问答 >
问题:

在Kafka Streams应用程序中启动新线程(使用编程方式)是否可取?

越宣
2023-03-14

我们正在开发一个使用低级别处理器API的Kafka流应用程序。

根据Kafka的文档,所有线程和并行性都由流线程和流任务处理。使用主题上的分区,并行性也是可伸缩的。

public class Processor implements Processor<K, V> {

@Override
  public void process(String key, V value) {

      //Do processing on the stream thread itself
      ...

      // Write back to output topic
      context.forward(key, updatedValue)
    }); 
  }
}
public class Processor implements Processor<K, V> {

@Override
  public void process(String key, V value) {

  //Spawn new thread to do the processing
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.submit(() -> {
      String threadName = Thread.currentThread().getName();
      System.out.println("Hello " + threadName);

      //Do more processing
      ...

      // Write back to output topic
      context.forward(key, updatedValue)
    }); 
  }
}

我尝试了最基本的代码,但不能确定它是否介入了Kafka提供的自动功能。例如自动提交偏移量、超时等。

还是坚持Kafka streams已经提供的默认行为并利用stream线程快速处理数据总是更好?

共有1个答案

华易安
2023-03-14

不建议启动您自己的线程,因为这破坏了Kafka Streams的容错保证。如果process()返回,则Kafka Streams假设消息已被完全处理,并且所有潜在的输出消息都通过forward()发送。在这种情况下,Kafka流可能提交输入记录偏移量。

但是,如果您在后台线程中处理消息,而线程处理失败,则Kafka流将不知道它的任何信息,因此,即使发生故障并且消息丢失,也可能提交偏移量。

此外,后台线程在返回process()后不允许调用forward()。如果forward()被称为process()的“外部”,则Kafka流将引发异常。

使用您自己的后台线程并保留至少一次的处理保证并不是不可能的,但是,这是相当复杂的,因此不推荐使用。

 类似资料:
  • 问题内容: 有没有一种方法可以在运行时从家庭启动器中删除活动?我的意思是从其属性或类似的东西删除。 问题答案: 您可以通过禁用组件,将其从启动器中删除。

  • 问题内容: 我目前正在开发一个Python应用程序,希望在该应用程序上查看实时统计信息。我想使用它以使其易于使用和理解。 问题是我的Flask服务器应该在我的Python应用程序的最开始处启动,而在最末尾停止。它看起来应该像这样: 因为我需要我的应用程序上下文(用于统计),所以不能使用multiprocessing.Process。然后,我尝试使用threading.Thread,但是Werkze

  • 这是我的第一个问题。我一路寻找,尝试了很多,但没有得到我想做的。问题是:我有一个应用程序使用SoundPool播放声音,仅此而已。但是要加载的东西很多(>50),一个一个加载需要时间。看一看; @Override公共视图onCreateView(LayoutInflater inflater,ViewGroup container,Bundle savedInstanceState){View V

  • 问题内容: 我需要以编程方式启动新的Java进程并动态设置JMX端口。所以不要这样做 我想做以下 但这不起作用。知道为什么吗? 问题答案: 在调用代码时,您已经错过了配置jmxremote连接器的机会。 您需要做的是创建您自己的rmi注册表和JMXConnectorServer来侦听rmi调用并将它们传递给MBeanServer。

  • 在我的应用程序内,我想检查是否有任何更新的版本,我的应用程序是在app Store。如果有,那么必须通过警告消息通知用户,如果他/她选择升级,我想更新新版本。我想通过我的应用程序完成所有这些。这可能吗?

  • 本文向大家介绍如何以编程方式“重启” iOS应用程序?,包括了如何以编程方式“重启” iOS应用程序?的使用技巧和注意事项,需要的朋友参考一下 无论如何,您都无法重启iOS应用程序,即使您能够使用某些私有api,您的应用程序也将被Apple拒绝,并且不会被视为App Store版本。