我在StreamListener初始化时遇到问题。我无法解决我的问题。 我在我的项目中使用Spring Cloud Stream Kafka和Spring Cache。Spring Cache在SmartLifeCycle的start()方法之后初始化。但是StreamListener在SmartLifeCycle的start()方法之前开始消费数据。因此,我无法在StreamListener开始
与不同,它看起来不支持参数。有没有办法为实现相同的行为?这是我的用例: 我有一个通用的Spring应用程序,它可以监听任何Kafka主题并写入我数据库中的相应表。对于某些主题,音量较低,因此可以以非常低的延迟处理单个消息。对于其他大容量的主题,代码应该接收消息的微批处理并使用Jdbc批处理写入数据库的频率较低。理想情况下,侦听器的定义如下所示: 对于小容量的主题,我会设置启用为true和为fals
我有下面的代码,可以在Redis流附加新记录时接收值。问题是,当与,始终处于非活动状态。这个代码有什么问题?我错过了什么?文件供参考。 在spring启动时,初始化必要的redis资源 生菜连接厂 从连接工厂重新创建模板 Rest控制器将数据附加到redis流 JMS风格的命令式消息侦听器 初始化侦听器 不过,我可以通过api将其附加到流中。
我已经开始使用Akka Streams和Op Rabbit,我有点困惑。 我需要根据谓词拆分流,然后将它们组合起来,就像我在创建图形和使用分区和合并时所做的那样。 我已经能够使用GraphDSL. Builder做这样的事情,但似乎无法让它与AckedSource/Flow/Sink一起工作 图表如下所示: 我不确定是否拆分什么时候是我应该使用的,因为我总是需要正好2个流。 这是一个不进行分区且不
嗨,我有一个代码块,它将一种窗口对象的列表转换为另一种,并将其填充到一个地图中,由ID键控。可能有许多具有相同id但具有不同名称和属性的对象实例。我所做的是做一个列表[W1(1,2,3),W1(2,3,4),W2(1,3,4)...]并将其转换为映射[键W1,值(1,2,3),(2,3,4)键W2,....] 我想知道我是否可以使用流和lambdas来崩溃这个。 list.stream().col
我正在努力加深对fs2的了解,并想尝试fs2Kafka的一个用例,在这个用例中我将取代akka stream。想法很简单,从Kafka读取数据,通过http请求将数据发布到接收器,然后在成功后提交回Kafka。到目前为止,我还不能真正理解http部分。在akka stream/akka http中,您有一个现成的流https://doc.akka.io/docs/akka-http/current
我有以下简单的case类层次结构: 我有一个(来自一个基于Websocket的协议,已经有了编解码器)。 我想将此解复用为Foo和Baz类型的单独流,因为它们由完全不同的路径处理。 最简单的方法是什么?应该很明显,但我错过了一些东西。。。
假设我有一个披萨烤箱和一系列需要烘烤的披萨。我的烤箱一次只能烤4个披萨,一天中至少有4个披萨排队,所以烤箱需要尽可能多地满负荷工作。 每次我把披萨放进烤箱,我都会在手机上设置一个计时器。一旦它响了,我就把披萨从烤箱里拿出来,给任何想要的人,容量就变得可用了。 我这里有两个来源,一个是等待烹制的披萨队列,另一个是当披萨烹制完成时发出的鸡蛋计时器。系统中还有两个水槽,一个是熟食披萨的目的地,另一个是发
我正在学习JavaAkka流,并使用https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html定义了以下内容: 运行此代码的行为符合预期。 Akka Streams正在解决这个代码可以重复执行的问题。 在现实场景中,不会是静态的,Akka Streams对如何处理不断变化的数据有意见吗,还是由开发人员决定? 在
我们有一个Akka应用程序,它使用Kafka主题,并将收到的消息发送给Akka参与者。我不确定我编程的方式是否充分利用了Akka Streams内置背压机制的所有优点。 以下是我的配置。。。 这做了我所期望的商业案例,myActor收到命令更新(MyAvro) 我更讨厌背压的技术概念,据我所知,背压机制部分由水槽控制,但在这种水流配置中,我的水槽只是“水槽”。“忽略”。所以我的水槽可以缓解背压。
我正在使用作为Akka-Stream。我不知道如何可预测地命名输入参与者,以便我可以从应用程序的其他部分向其发送消息。我正在像这样实例化我的源: 当我具体化流时,我会得到一个,但它的路径是动态生成的,并且它只使用我提供的名称作为代码生成的以流为中心的命名方案的一部分。 有没有办法让这个前端参与者源有一个明确的名称,或者我在传递ActorRef时卡住了? 如果我不能明确命名它,这是否意味着您不能直接
让我们假设这样一个简单的例子: KStream使用带有转换器的转换操作来消除ORDER_主题中的重复消息,该转换器通过密钥/id将消息存储在持久本地状态存储中。这样,如果相同的顺序到达两次,它将被忽略。 现在一个新订单到达,它不是重复的,所以它存储在本地存储中,但在将其发送到VALIDATED_ORDER_TOPIC应用程序崩溃之前。 我想知道KStream中的事务保证是什么:记录是否已存储并提交
我希望只使用一次语义,但我不想和消费者一起阅读信息。我宁愿读Kafka的留言。如果我加上处理。保证=恰好一次流配置,将恰好一次语义保留?
我正在浏览文档,我知道通过启用 幂等性:幂等生成函数对一个主题对一个生成函数只启用一次。基本上,每一条消息发送都有更高的保证,并且在出现错误时不会重复 那么,如果我们已经有幂等性,那么为什么我们需要在Kafka Stream中另一个恰好一次的属性呢?幂等性和恰好一次之间有什么区别 为什么在普通Kafka制作人中不提供一次房产?
在启动应用程序时,Kafka流出现了奇怪的错误 结果,关于失败流的错误: