让我们假设Spring Cloud Stream应用程序从Order主题
创建KStream
。它对ordercreated{“id”:x,“productid”:y,“customerid”:z}
事件感兴趣。一旦到达,它将对其进行处理并生成一个输出事件ordershipped{“id”:x,“productID”:y,“customername”:<,“customeraddress”:z}
到相同的订单主题
。
我面临的问题是,由于Kafka流应用程序从/向同一个主题读写,所以它试图处理自己的写操作,这是没有意义的。
如何防止此应用程序处理它生成的事件?
更新:正如Artem Bilan和sobychako指出的,我曾考虑过使用kstream.filter()
,但有些细节让我怀疑如何处理:
interface ShippingKStreamProcessor {
...
@Input("order")
fun order(): KStream<String, OrderCreated>
@Output("output")
fun output(): KStream<String, OrderShipped>
@StreamListener
@SendTo("output")
fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {
OrderCreated类:
data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
constructor() : this(null, null, null)
}
OrderShipped类
data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
constructor() : this(null, null, null, null)
}
我使用JSON作为消息格式,因此消息如下所示:
我已经接受了布鲁诺的回答,认为这是解决这个问题的有效方法。但是,我想我已经提出了一个更直接/逻辑的方法,使用了用JSONTypeInfo
注释的事件结构。
首先,您需要一个用于Order事件的基类,并指定所有子类。请注意,JSON文档中将添加一个type属性,它将帮助Jackson封送/解封DTO:
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent
data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
constructor() : this(null, null, null)
}
data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
constructor() : this(null, null, null, null)
}
有了这一点,OrderCreatedEvent对象的生产者将生成如下所示的消息:
orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
完整的KStream逻辑:
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
.selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
.join(customerTable, { orderIt, customer ->
OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
}, Joined.with(intSerde, orderCreatedSerde, customerSerde))
.selectKey { _, value -> value.id }
//.to("order", Produced.with(intSerde, orderShippedSerde))
}
在此过程之后,我将生成一个新消息键:1值:{“type”:“ordershipped”,“id”:1,“productID”:24,“customername”:“anna”,“customeraddress”:“cipress street”}
到订单主题中,但这将被流过滤掉。
我正在尝试使用一个新的特性(https://www.confluent.io/blog/put-moulation-event-types-kafka-topic/)来存储同一主题上的两种不同类型的事件。实际上,我正在使用Confluent版本4.1.0并设置下面的属性来实现这一点 数据被写入主题而没有问题,并且可以从Kafka Streams应用程序中看到为通用的Avro记录。另外,在Kafka
我正在编写一个使用Kafka流的应用程序。它从主题A读取,进行一些转换,然后写入主题B。在转换期间,值按键分组,因此输出键、值类型不同于输入值类型。Kafka流使用特定类型的Serdes(例如String serdes序列化和反序列化字符串)进行序列化和反序列化,因此在数据转换后它将无法工作。如何在Streams API中定义不同的序列化器和反序列化器?
假设我有一个抽象类“车辆”和另外两个继承自名为“汽车”和“自行车”的车辆类的子类。 所以我可以随机创建一系列汽车和自行车对象: 并将它们全部添加到Arraylist: 是否有可能将这些对象写入单个文本文件,并根据特定的对象类型(汽车、自行车)从文本文件读取回Arraylist,而无需为每种类型维护不同的文本文件
我正在尝试使用头读取CSV文件。我正在使用以下代码: 是否有任何方法,我可以读取csv使用头,而跳过其他行。提前谢谢你。
使用StreamBridge,我将包含两种不同类型的对象的消息发送到单个Kafka主题。有没有办法定义一个能够使用两种类型消息的Spring Cloud Stream的功能消费者?
请任何人解释Oracle中的锁定模式,即共享、独占和更新锁定。我找到了很多关于这个和那个的理论 共享锁:没有人可以改变数据,只读目的 独占锁定:只允许一个用户/连接更改数据。 更新锁:行被锁定,直到用户提交/回滚。 然后,我尝试共享以检查它的工作原理 然后,我发现,用户可以在共享锁定后更改数据。那么,它与独占锁和更新锁有什么不同呢。 另一个问题,更新锁和排他锁有什么不同,即使它们看起来几乎是等同的