假设我有有状态的 Kafka Streams 应用程序,该应用程序使用具有 3 个分区的主题中的数据。目前,我正在运行上述应用程序的2个实例。让我们这样说: 分配了 和 , 分配了 。 因此,现在我想添加新实例,以完全利用并行化。 在我的理解中,一旦我启动了一个新实例,就会发生重新平衡: 或 中的一个以及相应的本地状态存储将从现有实例迁移到新添加的实例。在此示例中,让我们假设 在 上迁移。 同时,
我正在编写一个Kafka Streams应用程序。它执行以下步骤“1)消耗输入数据2)在1小时窗口内根据新密钥消重记录3)重新选择密钥4)在1小时窗口内计数密钥5)发送到下游。 我是Kafka流的新手。我的理解是,为了将窗口保持为1小时,我将 也设置为1小时。这是正确的做法吗? 一旦我部署了我的应用程序与真实的流量,它似乎应用程序不断发送消息,而我认为它每小时只会发送一堆消息? 感谢任何帮助!!
我有两个字符串列表,希望比较列表中的值并构造另一个对象的新列表。我可以用嵌套循环来实现这一点,但我正在寻找一种更高效、更整洁的解决方案。 有一个车辆物体。 现在,我需要通过匹配模型(列表1中的第一个字符)和列表2中的第一个字符来构建车辆对象,并构建这样的东西 使用流可以避免嵌套循环吗?
在索引位置1的输入中遇到的字符串的前半部分将被替换为字符“-”使用流我们如何执行操作? 我有上面的列表,我想用我这样做的循环将每个嵌套列表值的第一个位置替换为“-” 样本输出:[[0,-],[6,-],[0,-],[6,gh],[4,ij],[0,ab],[6,cd]] 但是任何人都可以解释如何使用流来实现同样的目标
下面是我在Apache Camel反应性流解决方案中的尝试,该解决方案通过JVM将发布者连接到订阅者(骆驼路线的代码如下所示) 为了使通信能够跨越JVM,似乎需要一个“代理”服务器。因此,我实现了Artemis broker并相应地修改了application.properties文件(根据我对如何做的最好理解)。 另外,为了缩小焦点,选择了使用小黑道安培连接器。 问题: 订阅者应该接收并记录字符
我有一个带有Actuator和Kafka活页夹的Spring Cloud Stream项目。我正在探索执行器,并试图停止生产者作为一个练习。我通过curl提出以下POST请求: 实际结果:查询返回204。生产者的状态(从GET /actuator/bindings/producer-out-0看到)现在是。然而,生产者仍在产生消息,这可以从日志记录和消费者关于该主题的活动中看到。 预期结果:我希望
我正在将Spring Cloud Stream应用程序迁移到功能方法。到目前为止,我使用了提供Spring集成的千分尺指标。请参见Spring集成-千分尺集成。 自迁移以来,使用StreamBridge生成的消息不再生成度量。这似乎是因为Spring集成在将通道定义为bean时创建了它的度量,而StreamBridge则动态创建通道。 话虽如此,我的问题是: SCS是否提供任何其他类型的度量? 如
我试图让自己更适应Java8流API。目前我想翻译像流这样的东西,但似乎我还不够舒服,因为我不知道如何做这件事。
我正在使用spring cloud stream阅读来自Kafka主题的消息。正在从队列中读取并处理消息,如果消息在处理时失败,则该消息应进入配置的错误队列,但会出现以下错误。 从消息中提取标题时出现异常,解决此问题的最佳方法是什么? kafka版本为1.0,kafka客户端为2.11-1.0
我正在为服务开发测试用例。 另一个代码:
当使用Spring-boot:1.5.1时,我得到了下面提到的错误,而当使用Spring-boot:1.4.4时,则没有 有人遇到过这个吗? 应用程序属性:
在Spring-Cloud-Stream中是否有支持或计划支持avro和/或来自汇流平台的模式注册表?我发现spring-integration-kafka 1.3.0版中存在对avro的依赖,而spring-cloud-stream-binder-kafka的主分支(2.0)和spring-kafka没有任何avro依赖。
我正在尝试为SpringCloudAzure服务总线队列流绑定器配置错误通道,但未成功。我已通过启用错误通道 并尝试定义一个: 我还尝试了和。显然,我在这里遗漏了一些东西,但我找不到一个有效的例子。 编辑:我正在使用以下供应商bean: 根据绑定命名约定,绑定的名称将为。我可以看到消息确实发送到了(另一侧有一个消费者)。 编辑和解决方案: 看来我的环境出了问题,重建后一切都如期进行,正如Garry
在使用流进行聚合时,Brian Goetz将使用stream.collect()填充集合与使用stream.foreach()进行相同的操作进行了比较,其中有以下两个片段: 根据我的理解,只有在流是并行的情况下,多个线程才能在forEach()中工作。但是,在给出的示例中,forEach()是在一个顺序流上操作的(没有调用parallelStream())。 那么,是forEach()总是并行工作
我有一个返回的接口。 然而迭代不能“流”。 知道如何使用迭代作为流而不将其转换为列表吗?