我有自己的Spring Cloud数据流处理器,里面有Python,我使用这个示例作为指导:https://dataflow.Spring.io/docs/recipes/polyglot/processor/。然后我想缩放并创建其中的三个处理器,因此使用spring.cloud.deployer.myapp.count=3
创建了3个Python内部的POD。我稍微修改了示例中的一段代码:当我创建一个Kafka消费者时,我也会传递一个组id,因此消息应该是负载平衡的。
consumer = KafkaConsumer(get_input_channel(), group_id=get_consumer_group(), bootstrap_servers=[get_kafka_binder_brokers()])
问题是,SCDF创建了一个只有1个分区的Kafka主题,因此消息只能到达一个pod。所以我在想:
请花点时间回顾一下Spring Cloud Data Flow的职责。在不清楚的情况下,SCDF既不与支持消息传递中间件(如Kafka)交互,也不在运行时使用它。换句话说,SCDF并不创建与之相关的主题或分区--它只是自动配置Spring Cloud Stream(SCSt)属性。
但是,如果您在自定义处理器中使用SCSt,框架会自动将所需通道绑定到中间件中的底层主题。该框架还具有更改分区行为的功能。您也可以部署带有过度分区主题的处理器。还有其他几个配置选项来构建所需的流数据处理行为。
您正在查看的Python示例并不具备SCSt提供的所有特性。该方法是一个示例演练,说明如何在Python中构建本机处理器风格的应用程序,其中生产者和消费者配置是在Python代码本身中手动创建的。SCDF和SCSt都不会影响此配方中的应用程序行为。
我应该如何配置SCDF来创建一个有3个分区的Kafka主题吗?
如前所述,SCDF不与Kafka交互。
或者我应该不依赖于SCDF,在Python中自己创建主题吗?我想这将是多余的,因为SCDF也创建了这个主题。
如果您的自定义处理器不是Spring Cloud Stream应用程序,是的,明确定义代码中的主题+分区是您的责任。
SCDF中的哪个组件实际负责Kafaka主题的创建?我如何影响它的分区数?
Spring-Cloud-Stream。请参阅上面的解释。
如果我停止这个流并用4个处理器步骤再次启动,是否应该用第4个分区扩展主题?因为当前没有创建新的分区。
您不一定需要重新启动流数据管道。如果您的主题预先被过度分区,是的,运行时的任何额外消费者都应该能够自动参与竞争的消费者关系。关注Spring-io/dataflow.spring.io#156-我们正在添加一个配方,演示使用SCSt+SCDF+Kafka进行手动缩放和自动缩放的可能性。
我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。
其中一个Kafka流应用程序在Kafka代理和消费者端产生了大量未知生产者ID错误。 流配置如下: 消费者方面的错误: 这背后的原因是什么?
创建数据流主要包括如下两个部分: 1. 获取相关信息 主要为获取FDS Bucket相关的信息 2. 创建/迁移Topic 数据最终需要收集到Talos 的Topic中,因此需要首先创建Topic,并迁移到生态云账号体系下;关于Talos相关,可以参见Talos-流式消息队列 3. 配置数据流 配置数据流需要的信息
“输入主题”已创建。我没有创建“输出主题”,似乎“Kstream”为我和其他内部主题创建了一个。此外,在“to”函数的javadoc中看到了这一点,指定的主题应该在使用之前手动创建(即,在Kafka Streams应用程序启动之前) 所以我的问题是,我们总是必须手动创建“输出主题”吗?
我正在编程一个客户端工作与Kafka0.9。我想知道如何制造话题。这个答案:如何通过Java在Kafka中创建一个主题,与我所问的类似。除此之外,该解决方案仅适用于Kafka 0.8.2,这与Kafka 0.9的API有很大不同。
我正在测试在一个正在运行的系统中添加Kafka分区,但我不清楚如果您将分区添加到一个现有的主题中,Kafka如何管理现有的数据。 例如: 我有一个主题为的Kafka实例,有一个分区和一个副本。 生产者组开始插入该主题,消费者组开始消费。 我更改主题以添加另一个分区。 在本例中,主题数据发生了什么?是在两个分区之间重新平衡,还是只有新生成的数据才会使用新分区?