我们正在使用2.1.3版本的Spring云流kafka流- 应用yml具有属性集- 但是我们仍然得到了Kafka活页夹未知的地位- 感谢对此的任何帮助。
我正在使用Spring云流Kafka流编写Java应用程序。下面是我正在使用的函数方法片段: fetch_data_from_database()可以抛出异常。 如果fetch\u from\u database()发生异常,如何停止对入站KStream的处理(不应提交偏移量),并使其使用相同的偏移量数据重试处理?
一段时间以来,我一直试图让Spring Cloud Stream与Kafka Streams一起使用,我的项目使用嵌入式kafka进行Kafka DSL测试,我使用这个存储库作为我的测试实现的基础(它本身就是这个问题的测试用例)。 我在这里制作了一个存储库来演示这一点。 基本上,当使用“Processor.class”的“DemoApplicationTest.ExampleAppWorking.
我有一个在请求/答复配置中使用的Kafka生产者。当producer的一个实例启动时,它可以完美地工作。然而,当启动生产者的第二个实例时,seconds实例将无法工作。它将正确地将消息写入主题,消费者将处理消息并将回复发送回来,然而生产者将找不到它正在等待的回复消息,它将超时。消息似乎是由生产者的第一个实例接收的。因为第一个实例不期望此答复消息。请求/应答消息失败。是否缺少任何使第二个实例工作的配
我正在使用一个非事务性生产者,并试图理解如何处理成功/失败场景的回调。 对于一个成功的发送,我看到回调由kafka-producer-network-thread线程执行(“send ok”消息)。 发送消息成功-kafka-producer-network-thread 00:59:17.522
如果每个Kafka消息属于一个特定的会话,如何管理会话关联,以便同一个Spark执行器看到链接到一个会话的所有消息? 如何确保属于会话的消息被Spark executor按照在Kafka中报告的顺序处理?我们能以某种方式实现这一点而不对线程计数施加限制并导致处理开销(如按消息时间戳排序)吗? 何时检查会话状态?在执行器节点崩溃的情况下,如何从最后一个检查点恢复状态?在驱动程序节点崩溃的情况下,如何
我开发了一个Python Kafka生成器,它将多个json记录作为nd-json二进制字符串发送到一个Kafka主题。然后,我尝试用PySpark在Spark结构化流媒体中读取这些消息,如下所示:
Kafka大约有5000万张唱片库存(即将消耗)。主题是3个分区。 我的消费应用程序: 我限制了spark streaming的消耗大小,在我的例子中,我将设置为10000,这意味着在我的例子中,它每批消耗30000条记录。 有什么方法可以让spark streaming在每个批处理中提交? Spark streaming日志,证明它每批消耗的记录num:
我正在使用spark结构流发送记录到一个Kafka主题。kafka主题是用config- 这样做使得目标Kafka主题记录具有与原始记录相同的时间戳。 我的Kafka流代码:
Spring Cloud Kafka Streams与Spring Cloud Stream、Spring Cloud Function、Spring AMQP和Spring for Apache Kafka有什么区别?
我怀疑为每一个传入的事件记录执行postgresql操作将是一个性能问题。 对于这种情况,有什么更好的或可供选择的设计?
谢谢!
有人能举例说明如何使用Spring Cloud Kafka Streams metric
我知道这里之前有人问过这个问题:Kafka流并发? 但这对我来说很奇怪。根据文档(或者我可能遗漏了什么),每个分区都有一个任务,这意味着不同的处理器实例,每个任务由不同的线程执行。但是当我测试它的时候,我看到不同的线程可以得到不同的处理器实例。因此,如果你想在处理器中保持内存状态(老式的方式),你必须锁定? 线程ID:88 ID:c667e669-9023-494b-9345-236777e9df
我写了Kafka流应用程序,我想把它部署在Kafka集群上。因此,我构建了一个jar文件,并使用以下命令运行它: