当前位置: 首页 > 知识库问答 >
问题:

从 Spring 云流中的处理器写入主题 Kafka 流应用程序

闻人仲渊
2023-03-14

我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入主题。如何在Spring Cloud Streams Kafka应用程序中完成?

@Bean
fun processEvent() = Consumer<KStream<EventId, EventValue>> { event ->

    event.map{
        ...
    }.process(ProcessorSupplier {

            object : Processor<EventId, MappedEventValue> {

                private lateinit var store: KeyValueStore<EventId, MappedEventValue>

                override fun init(context: ProcessorContext) {
                    store = context.getStateStore("event-store") as KeyValueStore<EventId, MappedEventValue>
                }

                override fun process(key: EventId, value: MappedEventValue) {
                    ...
                    store.put(key, processedMappedEventValue)

                    //TODO Write into a topic
                }
            }
    }
}  

共有1个答案

蒙墨竹
2023-03-14

你不能。< code>process()方法是一个终端操作,不允许您向下游发出数据。相反,您可以使用< code>transform()(基本上与< code>process()相同,但是允许您向下游发送数据);或者根据您的应用程序,< code>transformValues()或< code>flatTransform()等。

使用

 类似资料:
  • 我有以下Spring Cloud Stream Kafka Streams Binder 3. x应用程序: 当我通过这个应用程序运行X消息时,通过使用和从联调将它们发布到,点1和点2的消息计数是相等的,正如我所期望的那样。 当我使用连接到Kafka代理的实时应用程序做同样的事情时,点1和点2的计数仍然显着不同: 消费者在< code >主题2上有很大的滞后,并且该滞后保持不变(在我停止发布消息后

  • 我正在开发一个应用程序,在该应用程序中,事件会导致spring data repository保存数据; 此代码可以引发各种异常,如DataIntegrityViolationException(运行时异常)。 处理此类异常和 生成带有导致此错误的有效负载的消息 例外, 允许生产者采取操作。

  • 我对Kafka和Kafka流很陌生,所以请容忍我。我想知道我是否在正确的轨道上。 我正在给一个Kafka主题写信,试图通过rest服务访问数据。在访问原始数据之前,需要对其进行转换。 到目前为止,我拥有的是一个将原始数据写入主题的制作人。 1)现在我想要streams应用程序(应该是一个在容器中运行的jar),它可以将数据转换为我想要的形状。遵循这里的物化视图范式。 1的过度简化版本。) 2)和另

  • 我正在使用Spring Cloud Stream Kafka Binder。我有以下Kafka活页夹函数。 在yml中,我有: 如果我想从同一个功能向两个不同的主题发送数据,我需要做什么?

  • 我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。

  • 我刚开始接触Kafka。我已经经历了这一切。它只表示kafka流DSL的数据/主题管理。任何人都可以共享Kafka流处理器API的相同数据管理的任何链接吗?我对处理器API的用户和内部主题管理特别感兴趣。 在流处理器开始使用输入数据之前,从哪里用输入数据填充此源主题? 简而言之,我们可以像制片人写主题一样,使用流来写Kafka的“源”主题吗?或者流仅用于主题的并行消费?我相信我们应该像“Kafka