我们正在开发一个使用低级别处理器API的Kafka流应用程序。
根据Kafka的文档,所有线程和并行性都由流线程和流任务处理。使用主题上的分区,并行性也是可伸缩的。
public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Do processing on the stream thread itself
...
// Write back to output topic
context.forward(key, updatedValue)
});
}
}
public class Processor implements Processor<K, V> {
@Override
public void process(String key, V value) {
//Spawn new thread to do the processing
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("Hello " + threadName);
//Do more processing
...
// Write back to output topic
context.forward(key, updatedValue)
});
}
}
我尝试了最基本的代码,但不能确定它是否介入了Kafka提供的自动功能。例如自动提交偏移量、超时等。
还是坚持Kafka streams已经提供的默认行为并利用stream线程快速处理数据总是更好?
不建议启动您自己的线程,因为这破坏了Kafka Streams的容错保证。如果process()
返回,则Kafka Streams假设消息已被完全处理,并且所有潜在的输出消息都通过forward()
发送。在这种情况下,Kafka流可能提交输入记录偏移量。
但是,如果您在后台线程中处理消息,而线程处理失败,则Kafka流将不知道它的任何信息,因此,即使发生故障并且消息丢失,也可能提交偏移量。
此外,后台线程在返回process()
后不允许调用forward()
。如果forward()
被称为process()
的“外部”,则Kafka流将引发异常。
使用您自己的后台线程并保留至少一次的处理保证并不是不可能的,但是,这是相当复杂的,因此不推荐使用。
问题内容: 有没有一种方法可以在运行时从家庭启动器中删除活动?我的意思是从其属性或类似的东西删除。 问题答案: 您可以通过禁用组件,将其从启动器中删除。
问题内容: 我目前正在开发一个Python应用程序,希望在该应用程序上查看实时统计信息。我想使用它以使其易于使用和理解。 问题是我的Flask服务器应该在我的Python应用程序的最开始处启动,而在最末尾停止。它看起来应该像这样: 因为我需要我的应用程序上下文(用于统计),所以不能使用multiprocessing.Process。然后,我尝试使用threading.Thread,但是Werkze
这是我的第一个问题。我一路寻找,尝试了很多,但没有得到我想做的。问题是:我有一个应用程序使用SoundPool播放声音,仅此而已。但是要加载的东西很多(>50),一个一个加载需要时间。看一看; @Override公共视图onCreateView(LayoutInflater inflater,ViewGroup container,Bundle savedInstanceState){View V
问题内容: 我需要以编程方式启动新的Java进程并动态设置JMX端口。所以不要这样做 我想做以下 但这不起作用。知道为什么吗? 问题答案: 在调用代码时,您已经错过了配置jmxremote连接器的机会。 您需要做的是创建您自己的rmi注册表和JMXConnectorServer来侦听rmi调用并将它们传递给MBeanServer。
在我的应用程序内,我想检查是否有任何更新的版本,我的应用程序是在app Store。如果有,那么必须通过警告消息通知用户,如果他/她选择升级,我想更新新版本。我想通过我的应用程序完成所有这些。这可能吗?
本文向大家介绍如何以编程方式“重启” iOS应用程序?,包括了如何以编程方式“重启” iOS应用程序?的使用技巧和注意事项,需要的朋友参考一下 无论如何,您都无法重启iOS应用程序,即使您能够使用某些私有api,您的应用程序也将被Apple拒绝,并且不会被视为App Store版本。