当前位置: 首页 > 知识库问答 >
问题:

如何在Kafka Streams DSL中查询状态存储以实现消费者幂等

刘修能
2023-03-14

我在一个场景中工作,重复的消息可能会到达消费者(KStream应用程序)。为了使用典型的情况,让我们假设它是一个OrderCreatedEvent,KStream有一个处理订单的逻辑。该事件有一个订单id,可以帮助我识别重复的消息。

我想做的是:

1)将每个订单添加到持久状态存储中

2)当处理KStream中的消息时,查询状态存储以检查消息是否已经被接收,在这种情况下不做任何事情。

        val persistentKeyValueStore = Stores.persistentKeyValueStore("order-store")

        val stateStore: Materialized<Int, Order, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Order>(persistentKeyValueStore)
                        .withKeySerde(intSerde)
                        .withValueSerde(orderSerde)

        val orderTable: KTable<Int, Order> = input.groupByKey(Serialized.with(intSerde, orderSerde))
                .reduce({ _, y -> y }, stateStore)

        var orderStream: KStream<Int, Order> = ...

        orderStream.filter { XXX }
                   .map { key, value -> 
                      processingLogic()
                      KeyValue(key, value)
                   }...

过滤器{XXX}位中,我想查询状态存储检查订单id是否存在(假设订单用作keyvaluestore的键),过滤掉已经处理的订单(存在于状态存储中)。

我的第一个问题是:如何在 KStream DSL 中查询状态存储,例如在筛选器操作内部。

第二个问题:在这种情况下,我如何处理新消息(以前未处理过的消息)的到达?如果 KTable 在订单流 KStream 执行之前将订单保存到状态存储中,则消息将已在存储中。只有在处理完成后,才应html" target="_blank">添加它们。我该怎么做?我可能不应该使用KTable,而是像这样的东西:

           orderStream.filter { keystore.get(key) == null }
                   .map { key, value -> 
                       processingLogic()
                       KeyValue(key, value)
                   }
                   .foreach { key, value -> 
                       keystore.put(key, value); 
                   }

共有1个答案

盛超
2023-03-14

根据马蒂亚斯的指示,我这样做了:

重复数据删除转换器

package com.codependent.outboxpattern.operations.stream

import com.codependent.outboxpattern.account.TransferEmitted
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.Transformer
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.state.KeyValueStore
import org.slf4j.LoggerFactory


@Suppress("UNCHECKED_CAST")
class DeduplicationTransformer : Transformer<String, TransferEmitted, KeyValue<String, TransferEmitted>> {

    private val logger = LoggerFactory.getLogger(javaClass)
    private lateinit var dedupStore: KeyValueStore<String, String>
    private lateinit var context: ProcessorContext

    override fun init(context: ProcessorContext) {
        this.context = context
        dedupStore = context.getStateStore(DEDUP_STORE) as KeyValueStore<String, String>
    }

    override fun transform(key: String, value: TransferEmitted): KeyValue<String, TransferEmitted>? {
        return if (isDuplicate(key)) {
            logger.warn("****** Detected duplicated transfer {}", key)
            null
        } else {
            logger.warn("****** Registering transfer {}", key)
            dedupStore.put(key, key)
            KeyValue(key, value)
        }
    }

    private fun isDuplicate(key: String) = dedupStore[key] != null

    override fun close() {
    }
}

FraudKafkaStreamsConfiguration配置

const val DEDUP_STORE = "dedup-store"

@Suppress("UNCHECKED_CAST")
@EnableBinding(TransferKafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val fraudDetectionService: FraudDetectionService) {

    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo(value = ["outputKo", "outputOk"])
    fun process(@Input("input") input: KStream<String, TransferEmitted>): Array<KStream<String, *>>? {
        val fork: Array<KStream<String, *>> = input
                .transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)
                .branch(Predicate { _: String, value -> fraudDetectionService.isFraudulent(value) },
                        Predicate { _: String, value -> !fraudDetectionService.isFraudulent(value) }) as Array<KStream<String, *>>
                 ...
 类似资料:
  • 我有多个Kafka消费者和制作人,主题不同。使用独立应用程序,我想监控Kafka消费者的延迟。 我使用Kafka0.10.0.1,因为Kafka现在存储消费者偏移Kafka本身,所以我怎么能读到相同的。 我能够读取每个分区的主题偏移量。

  • 我有一个Spring-boot应用程序,可以听Kafka。为了避免重复处理,我尝试手动提交。为此,我在阅读主题后异步提交了一条消息。但是我被困在如何实现消费者幂等,这样记录就不会被处理两次。

  • 我目前正在做一个kafka java项目。我是新来的,我发现很难理解与Kafka生产者/消费者设计相关的几个基本概念。 > 比方说,我有一个带有单个分区的主题,我有一个生产者正在写这个主题,一个消费者正在从这个主题中消费。如果我部署同一个应用程序的多个实例,每个实例将运行自己的消费者。在这种情况下,因为所有消费者都属于同一个group pId,所以消息是否会在多个实例上运行的消费者之间平均分配?

  • 本文向大家介绍在MySQL存储过程中实现动态SQL查询?,包括了在MySQL存储过程中实现动态SQL查询?的使用技巧和注意事项,需要的朋友参考一下 对于存储过程中的动态SQL查询,请使用PREPARE STATEMENT的概念。让我们首先创建一个表- 使用插入命令在表中插入一些记录- 使用select语句显示表中的所有记录- 这将产生以下输出- 以下是创建存储过程并实现动态SQL的查询- 调用存储

  • ...除了这不存在。但这是关于我在这里想做什么。 或相反的: ...它获取每个GameCharacter的最早版本。为此,我已经尝试了,但显然Javers没有从最后开始计算版本。 这方面的Gradle依赖关系是:

  • 如何提高Kafka消费者的绩效?我有(并且需要)至少一次Kafka消费语义学 我有以下配置。processInDB()需要2分钟才能完成。因此,仅处理10条消息(全部在单个分区中)就需要20分钟(假设每条消息2分钟)。我可以在不同的线程中调用processInDB,但我可能会丢失消息!。如何在2到4分钟的时间窗口内处理所有10条消息? 下面是我的Kafka消费者代码。