我知道这里之前有人问过这个问题:Kafka流并发?
但这对我来说很奇怪。根据文档(或者我可能遗漏了什么),每个分区都有一个任务,这意味着不同的处理器实例,每个任务由不同的线程执行。但是当我测试它的时候,我看到不同的线程可以得到不同的处理器实例。因此,如果你想在处理器中保持内存状态(老式的方式),你必须锁定?
public class SomeProcessor extends AbstractProcessor<String, JsonObject> {
private final String ID = UUID.randomUUID().toString();
@Override
public void process(String key, JsonObject value) {
System.out.println("Thread id: " + Thread.currentThread().getId() +" ID: " + ID);
线程ID:88 ID:c667e669-9023-494b-9345-236777e9dfda
线程ID:88 ID:c667e669-9023-494b-9345-236777e9dfda
线程ID:90 ID:0A43ECB0-26F2-440D-88E2-87E0C9CC4927
每个实例的线程数是一个配置参数(num.stream.threads
,默认值为1
)。因此,如果启动一个KafKastreams
实例,就会得到num.stream.threads
线程。
任务将工作分成并行单元(基于输入主题分区),并分配给线程。因此,如果您有多个任务和一个线程,所有任务都将分配给这个线程。如果您有两个线程(KafKastreams
实例的总和),每个线程执行大约50%的任务。
注意:因为Kafka Streams应用程序本质上是分布式的,所以如果运行一个Kafkastreams
实例和多个线程,或者运行多个Kafkastreams
实例和一个线程,则没有区别。任务将分布在应用程序的所有可用线程上。
如果您希望在任务之间共享任何数据结构,并且您有一个以上的线程,那么您有责任同步对该数据结构的访问。请注意,任务到线程的分配可以在运行时更改,因此,所有访问都必须同步。但是,不建议使用此模式,因为它限制了可伸缩性。你应该用没有共享数据结构来设计你的程序!主要原因是,您的程序通常分布在多台机器上,因此不同的KafKastreams
实例无论如何都无法访问共享数据结构。共享数据结构只能在单个JVM中工作,但是使用单个JVM可以防止应用程序的横向扩展。
我正在尝试使用Spring boot编写一个Kafka流处理器,但当消息产生到主题中时,它不会被调用。 主题消息有不同的类型,并且是Avro格式的。在模式注册表中使用Avro UNION注册模式。 这些是主题 application.yml我正在使用cp-all-in-one-community作为docker-file 但现在我得到以下错误:
我正在Kafka流中的处理器节点上工作。对于一个简单的代码,我编写如下代码只是为了过滤用户ID,这是在kafka流中处理处理器节点的正确方法吗? 但是,下面的代码没有编译,抛出了一个错误:
我发现JVM只有一个线程池用于并行处理流。我们在一个大的流上有一个I/O阻塞的函数,这导致了与不相关的并行流一起使用的不相关的或者快速的函数的活跃度问题。 stream上没有允许使用备用线程池的方法。 有没有一种简单的方法来避免这个问题,也许是以某种方式指定要使用哪个线程池?
我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出
我有以下Spring Cloud Stream Kafka Streams Binder 3. x应用程序: 当我通过这个应用程序运行X消息时,通过使用和从联调将它们发布到,点1和点2的消息计数是相等的,正如我所期望的那样。 当我使用连接到Kafka代理的实时应用程序做同样的事情时,点1和点2的计数仍然显着不同: 消费者在< code >主题2上有很大的滞后,并且该滞后保持不变(在我停止发布消息后
我想在我的Android应用程序中处理位图——位图可能很大,所以我使用多线程来执行更快的操作。以下是我的代码(Runnable child的一部分): 当我只使用池中的一个线程时,一切正常。不幸的是,当我使用的线程数等于处理器的内核数(在我的设备中为4)时,结果如下(对于灰度过滤器): 有时看起来像: bitmap.get像素(...)不起作用,因为输出中有黑线 bitmap.set像素(...)