当前位置: 首页 > 知识库问答 >
问题:

如何忽略Kafka Streams应用程序中从同一主题读写不同事件类型的某些类型的消息

詹高畅
2023-03-14

让我们假设Spring Cloud Stream应用程序从Order主题创建KStream。它对ordercreated{“id”:x,“productid”:y,“customerid”:z}事件感兴趣。一旦到达,它将对其进行处理并生成一个输出事件ordershipped{“id”:x,“productID”:y,“customername”:<,“customeraddress”:z}到相同的订单主题

我面临的问题是,由于Kafka流应用程序从/向同一个主题读写,所以它试图处理自己的写操作,这是没有意义的。

如何防止此应用程序处理它生成的事件?

更新:正如Artem Bilan和sobychako指出的,我曾考虑过使用kstream.filter(),但有些细节让我怀疑如何处理:

interface ShippingKStreamProcessor {
    ...
    @Input("order")
    fun order(): KStream<String, OrderCreated>

    @Output("output")
    fun output(): KStream<String, OrderShipped>
    @StreamListener
    @SendTo("output")
    fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {

OrderCreated类:

data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
    constructor() : this(null, null, null)
}

OrderShipped类

data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
    constructor() : this(null, null, null, null)
}

我使用JSON作为消息格式,因此消息如下所示:

    null

共有1个答案

隆扬
2023-03-14

我已经接受了布鲁诺的回答,认为这是解决这个问题的有效方法。但是,我想我已经提出了一个更直接/逻辑的方法,使用了用JSONTypeInfo注释的事件结构。

首先,您需要一个用于Order事件的基类,并指定所有子类。请注意,JSON文档中将添加一个type属性,它将帮助Jackson封送/解封DTO:

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
    JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
    JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent

data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
    constructor() : this(null, null, null)
}

data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
    constructor() : this(null, null, null, null)
}

有了这一点,OrderCreatedEvent对象的生产者将生成如下所示的消息:

orderEvent.filter { _, value -> value is OrderCreatedEvent }
                .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }

完整的KStream逻辑:

@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {

        val intSerde = Serdes.IntegerSerde()
        val customerSerde = JsonSerde<Customer>(Customer::class.java)
        val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)

        val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
                Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
                        .withKeySerde(intSerde)
                        .withValueSerde(customerSerde)

        val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
                .reduce({ _, y -> y }, stateStore)


        return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
                .map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
                .selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
                .join(customerTable, { orderIt, customer ->
                    OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
                }, Joined.with(intSerde, orderCreatedSerde, customerSerde))
                .selectKey { _, value -> value.id }
                //.to("order", Produced.with(intSerde, orderShippedSerde))
    }

在此过程之后,我将生成一个新消息键:1值:{“type”:“ordershipped”,“id”:1,“productID”:24,“customername”:“anna”,“customeraddress”:“cipress street”}到订单主题中,但这将被流过滤掉。

 类似资料:
  • 我正在尝试使用一个新的特性(https://www.confluent.io/blog/put-moulation-event-types-kafka-topic/)来存储同一主题上的两种不同类型的事件。实际上,我正在使用Confluent版本4.1.0并设置下面的属性来实现这一点 数据被写入主题而没有问题,并且可以从Kafka Streams应用程序中看到为通用的Avro记录。另外,在Kafka

  • 我正在编写一个使用Kafka流的应用程序。它从主题A读取,进行一些转换,然后写入主题B。在转换期间,值按键分组,因此输出键、值类型不同于输入值类型。Kafka流使用特定类型的Serdes(例如String serdes序列化和反序列化字符串)进行序列化和反序列化,因此在数据转换后它将无法工作。如何在Streams API中定义不同的序列化器和反序列化器?

  • 假设我有一个抽象类“车辆”和另外两个继承自名为“汽车”和“自行车”的车辆类的子类。 所以我可以随机创建一系列汽车和自行车对象: 并将它们全部添加到Arraylist: 是否有可能将这些对象写入单个文本文件,并根据特定的对象类型(汽车、自行车)从文本文件读取回Arraylist,而无需为每种类型维护不同的文本文件

  • 我正在尝试使用头读取CSV文件。我正在使用以下代码: 是否有任何方法,我可以读取csv使用头,而跳过其他行。提前谢谢你。

  • 使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?

  • 请任何人解释Oracle中的锁定模式,即共享、独占和更新锁定。我找到了很多关于这个和那个的理论 共享锁:没有人可以改变数据,只读目的 独占锁定:只允许一个用户/连接更改数据。 更新锁:行被锁定,直到用户提交/回滚。 然后,我尝试共享以检查它的工作原理 然后,我发现,用户可以在共享锁定后更改数据。那么,它与独占锁和更新锁有什么不同呢。 另一个问题,更新锁和排他锁有什么不同,即使它们看起来几乎是等同的