当前位置: 首页 > 知识库问答 >
问题:

Kafka流并发行为

逄边浩
2023-03-14
KStream<byte[], byte[]> input = ...;
int counter = 0;

KStream<byte[], byte[]>[] processed = input.map(
    (k, v) -> {
      ....
      ....
      //update counter by multiple threads.
);

谢谢!

共有1个答案

卢志行
2023-03-14

这取决于您配置了多少线程来执行任务。如果您有一个线程执行所有任务,那么您不必使共享变量线程安全。但是如果有多个线程,则需要使其线程安全,因为应用程序实例中的任务将分布在多个线程中。您的Kafka Streams应用程序只是一个运行中的JVM,它以main()开始。Kafka Streams框架根据您指定的线程数编排处理。但它只是一个普通的Java运行时,并发访问仍然是并发访问。

更多关于线程和任务的信息:Kafka Streams线程号

更多关于线程、任务和共享状态:Kafka流处理器线程安全?

 类似资料:
  • 问题内容: 假设我有一堂课 我试图按班上所有领域分组。如何在JAVA 8中使用并行流来转换 映射的键是类中每个字段的值。JAVA 8以下示例将单个字段分组,如何将一个类的所有字段归为一个Map? 问题答案: 您可以使用的静态工厂方法来实现: 正如Holger在评论中所建议的那样,以下方法可能比上述方法更可取: 它使用的重载方法的行为与我上面建议的语句相同。

  • 我理解了Kafka分区和Spark RDD分区之间的自动映射,并最终理解了Spark任务。然而,为了正确地调整我的执行器(核心数量)的大小,并最终确定节点和集群的大小,我需要理解文档中似乎掩盖了的一些内容。 null 例如,关于如何使用 --master local启动spark-streaming的建议。每个人都会说,在spark streaming的情况下,应该把local[2]最小化,因为其

  • 我是kafka流的新手,我正在尝试使用groupBy函数将一些流数据聚合到KTable中。问题如下: 生成的消息是json msg,格式如下: 我想隔离json字段“after”,然后用“key”=“ID”创建一个KTable,并对整个json赋值“after”。 首先,我创建了一个KStream来隔离“after”JSON,它工作得很好。 KStream代码块:(不要注意if语句,因为“befo

  • 我正在用Spring-Kafka写我的第一个Kafka消费者。看了一下framework提供的不同选项,对相同的选项几乎没有疑问。能否有人请澄清以下,如果你已经在它工作。 问题1:根据Spring-Kafka文档,有两种实现Kafka-Consumer的方法;“您可以通过配置MessageListenerContainer并提供消息侦听器或使用@Kafkalistener注释来接收消息”。有人能告

  • 我知道在你的流中的任何时间点都可能发生再平衡。当它发生时,由于没有提交给定偏移量的最新偏移量,可能会发生事件的重新处理。 Kafka流是否允许在重新平衡发生之前完成任何飞行中处理?我的意思是,你的应用程序正在消耗一个记录(在你的过程方法内部),发生一个再平衡事件。该处理是否立即中止或允许处理方法完成? 一个具体的例子是 最后一次计算是否会在状态存储中结束并转发到接收器主题?因此,这意味着当重新平衡

  • 使用Spring Cloud Stream版本Chelsea. SR2,RabbitMQ作为消息代理。要拥有多个消费者,我们使用属性并发(入站消费者的并发)。 如果我们将并发设置为50。它从1开始,慢慢地增加消费者计数。有没有任何可能的解决方案可以使用更高的数字而不是一个来启动初始消费者计数,以提高消费者性能。