我的Spring Cloud应用程序中有3个不同的流。每天有数十万条记录,然而,每天大约有3到4条记录消失。我在日志中看到了它们,但并没有完成所有的连接。
代码:
@StreamListener
fun processOrderEvent(
@Input(StreamBindings.BUSINESS_CONDITION_ORDER_CHARGED_CHARGE_IN)
chargeEvent: KStream<String, ChargeEvent>,
@Input(StreamBindings.BUSINESS_CONDITION_ORDER_CHARGED_IN)
orderEvent: KStream<String, OrderChargedEvent>,
@Input(StreamBindings.BUSINESS_CONDITION_ORDER_CREATED_CHARGE_IN)
orderCreatedEvent: KStream<String, OrderCreatedEvent>
) {
val tracing = Tracing.newBuilder().build()
val kafkaStreamsTracing = KafkaStreamsTracing.create(tracing)
val chargeKeyValue = chargeEvent
.filter { _, event -> shouldProcessBusinessConditions(event) && (event.flow == "___________" || event.flow == "____")}
.transform(
kafkaStreamsTracing.map<String, ChargeEvent, ByteArray, ByteArray>("processOrderEvent_ChargeEvent") { _, value ->
var traceId = tracing.tracer().currentSpan().context().traceIdString()
val keyValue = KeyValue(value.id.toString().toByteArray(), objectMapper.writeValueAsString(
Charge(
value.id.toString(),
value.amount?.value,
value.status?.name,
LocalDateTime.now().toString(),
value.paymentMethod?.type?.name,
value.creditor?.customerId,
value.channel?.name,
value.amount?.currency?.name,
value.paymentMethod?.installments,
value.card?.brand,
buildSellerEmail(value),
value.createdAt,
value.amount?.summary?.total,
value.amount?.summary?.paid,
value.amount?.summary?.refunded,
value.connect?.id,
value.connect?.name,
value.flow
)).toByteArray())
log.info("m=processOrderEvent traceId=$traceId chargeId=${value.id} step=chargeKeyValue")
keyValue
}
)
val orderKeyValue = orderEvent
.transform(
kafkaStreamsTracing.map<String, OrderChargedEvent, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent") { _, value ->
var traceId = tracing.tracer().currentSpan().context().traceIdString()
log.info("m=processOrderEvent traceId=$traceId chargeId=${value.chargeId} orderId=${value.orderId} step=orderKeyValue")
KeyValue(value.chargeId.toByteArray(), objectMapper.writeValueAsString(Order(value.orderId, value.chargeId)).toByteArray())
}
)
val orderCreatedKeyValue = orderCreatedEvent
.transform(
kafkaStreamsTracing.map<String, OrderCreatedEvent, ByteArray, ByteArray>("processOrderEvent_OrderCreatedEvent") { _, value ->
var traceId = tracing.tracer().currentSpan().context().traceIdString()
log.info("m=processOrderEvent traceId=$traceId orderId=${value.orderId} step=before_orderCreatedKeyValue")
val originalValue = buildOrderOriginalValue(value)
val keyValue = KeyValue(value.orderId.toByteArray(), objectMapper.writeValueAsString(OrderCreated(value.orderId, originalValue)).toByteArray())
log.info("m=processOrderEvent traceId=$traceId orderId=${value.orderId} originalValue=${originalValue} step=orderCreatedKeyValue")
keyValue
}
)
chargeKeyValue.join(orderKeyValue, OrderChargeValueJoiner(), JoinWindows.of(Duration.ofHours(5)))
.transform(
kafkaStreamsTracing.map<ByteArray, ByteArray, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent_ChargeEvent") { _, value ->
var traceId = tracing.tracer().currentSpan().context().traceIdString()
val orderWithChargeJson = objectMapper.readValue(value, OrderWithCharge::class.java)
val keyValue = KeyValue(orderWithChargeJson.order.orderId!!.toByteArray(), value)
log.info("m=processOrderEvent traceId=$traceId chargeId=${orderWithChargeJson.charge.chargeId} orderId=${orderWithChargeJson.order.orderId} step=orderKeyValueJoin")
keyValue
}
)
.join(orderCreatedKeyValue, OrderCreatedValueJoiner(), JoinWindows.of(Duration.ofHours(5)))
.transform(
kafkaStreamsTracing.map<ByteArray, ByteArray, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent_ChargeEvent_OrderCreatedEvent") { key, value ->
var traceId = tracing.tracer().currentSpan().context().traceIdString()
val event = objectMapper.readValue(value, OrderWithChargeAndOrderCreated::class.java)
log.info("m=processOrderEvent traceId=$traceId chargeId=${event.charge.chargeId} orderId=${event.order.orderId} step=orderCreatedKeyValueJoin")
KeyValue(key, objectMapper.writeValueAsString(OrderWithChargeAndOrderCreatedTraceId(traceId, event)).toByteArray())
}
).process(ProcessorSupplier { businessConditionCkoutEventProcessor })
}
少数几个不起作用的日志之一:
21年9月1日上午5:48:15.863 e2e64c0faa9e 05:48:15.863[订单-chargeds-charges-v3-df80aa80-d3f6-48c7-a862-7a05c55d5d24-StreamThread-1]信息u.p.s.SomeEventStreams-m=处理订单事件跟踪ID=06352e7dfb290f6f chargeId=-c798-4891-58ecdbf7ccf7订单ID=ORDE\u413D-4D5C-\u1873F0DB5C ADE步骤=orderKeyValueJoin
21年9月1日5:48:15.763 AM
c232aa303f2f 05:48:15.763[orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]信息u.p.p.s.SomeEventStreams-m=processOrderEvent traceId=06352e7dfb290f6f chargeId=-c798-4891-58ecdbf7ccf7 orderId=Order\UUU413D-4D5C-\U1873F0DB5ADE步骤=OrderKeyKeyId价值观
9/1/21 5:48:15.749AM
c232aa303f2f05:48:15.749[orders-chargeds-charge-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]INFO u. p. p. s.
21年8月31日下午6:31:50.499
c232aa303f2f 18:31:50.499[orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]信息u.p.p.s.SomeEventStreams-m=processOrderEvent traceId=95fc78a495f03b64 orderId=Order\uUU-413D-4D5C-1873F0DB5ADE原始值=22000步骤=orderCreatedKeyValue
8/31/21 6:31:50.499PM
c232aa303f2f18:31:50.499[orders-chargeds-charices-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]INFO u. p. p. s.一些事件流-m=Process OrderEvent traceId=95fc78a495f03b64 orderId=ORDE_3_-413D-4D5C-_-1873F0DB5ADE步骤=before_orderCreatedKeyValue
数千个工作之一的日志:
9/2/21 6:34:33.547PM
f84980d99867 18:34:33.547[order-chargeds-chargeds-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1]INFO u. p. p. s.一些事件流-m=进程事件跟踪ID=3b5981d48ac7e130 chargeId=-243a--8576-3cb1173804d5 orderId=ORDE__-084F-4CCE-_-BA4DCAF1D50A步骤=orderCreatedKeyValueJoin
21年9月2日下午6:34:33.446 e2e64c0faa9e 18:34:33.446[订单-chargeds-charges-v3-df80aa80-d3f6-48c7-a862-7a05c55d5d24-StreamThread-1]信息u.p.s.SomeEventStreams-m=processOrderEvent traceId=3B5981D8AC7E130 chargeId=3834ad80-243a-4ffe-8576-3cb1173804d5 orderId=Order\U084F-4CE-BAU4DCAF1D50A步骤=orderKeyValueJoin
21年9月2日下午6:34:33.346
f84980d99867 18:34:33.346[订单-chargeds-charges-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1]信息u.p.p.s.SomeEventStreams-m=processOrderEvent traceId=6501784c0bf59948 orderId=Order\uUU-084F-4CCE-\u4DCAF1D50A原始值=85400步骤=orderCreatedKeyValue
21年9月2日下午6:34:33.346
f84980d99867 18:34:33.346[orders-chargeds-charges-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1]信息u.p.p.s.SomeEventStreams-m=processOrderEvent traceId=6501784c0bf59948 orderId=Order\uUU-084F-4CCE-\u4DCAF1D50A步骤=在orderCreatedKeyValue之前
21年9月2日下午6:34:33.344
c232aa303f2f 18:34:33.344[orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]信息u.p.p.s.SomeEventStreams-m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=-243a-4ffe-3cb1173804d5 orderId=-084F-4CCE-BA4DCAF1D50A步骤=orderKeyValue
9/2/21 6:34:33.326PM
c232aa303f2f18:34:33.326[orders-chargeds-charge-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]INFO u. p. p. s.
“OrderCreatedKeyValuejoe”日志是结束,并且在大多数情况下会显示,但是,在某些情况下,事件永远不会在结束时出现。
您需要检查如何使用Kafka实现DLQ。
要实现DLQ,您需要创建单独的队列,无论主题如何失败,都需要将其放入DLQ中。写入重试逻辑以读取它。
这些文件对你有帮助吗https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC1/multi/multi_kafka-dlq-processing.html
带spring Kafka的Kafka死信队列(DLQ)
Kafka 服务器和客户端 JAR 移至最新库:0.10.0.1 我的消费者和生产者代码使用如上所述的最新kafka jars,但仍然使用旧的消费者API(0 . 8 . 2)。 我在调用commit offset时在消费者端遇到问题。 kafka服务器端配置: 以下 Kafka 消费者的配置: 要创建消费者,我使用以下api: 和提交调用 在从 Kafka 读取消息时,我们使用以下方法来处理超时
我有一个spring boot应用程序与单个Kafka消费者从一些主题获取消息。但有时在处理消息时会出现错误。 我理解我需要禁用自动提交并手动提交成功的消息,但是,在这种情况下,如果我没有为这个异常情况抛出任何异常,并手动提交每个下一个成功的消息,那么我将丢失前一个不成功的消息,对吗?
我正在用Python编写一个带有Tkinter UI的程序。我想有一个没有标题栏的小窗口。此窗口必须接收键盘输入。我并不挑剔这是一个条目小部件的形式,还是仅仅绑定到KeyPress。 通常是禁用标题栏的方式。不幸的是,(除了在Windows中,这似乎阻止了许多事件被接收。我写了这段代码来说明问题: 这将创建一个小窗口(没有标题栏),当它接收到公共事件时,该窗口将打印这些事件的名称。我已经在Wind
我需要延迟处理一些事件。 我有三件事(发表在Kafka上): A(id: 1, retry At: now) B(id: 2, retry At: 10分钟后) C(id: 3, retry At: now) 我需要立即处理记录A和C,而记录B需要在十分钟后处理。这在Apache Flink中实现可行吗? 到目前为止,无论我研究了什么,“触发器”似乎都有助于在Flink中实现它,但还没有能够正确实
我正在使用libgdx和box2d开发我的第一个游戏。我正在使用调试渲染器测试我的对象。我创造了一些类似汽车的物体。每辆车都有一个主体,它是一个由6个点组成的大多边形(约1米长,0.7米高),有两个轮子通过旋转关节连接。 主车有一门大炮和一把机枪,也通过旋转接头连接。 我面临的问题是,汽车的大部分没有按预期进行碰撞。当两辆车相互碰撞时,它们是重叠的,如下所示: 一些注意事项: 轮子和大炮(较小的形
这是一个非常奇怪的问题,但我只是在JPanel中添加了一个简单的keyListener,它在keyPressed和KeyRelease上打印。通常它工作得很好,但在像'A'这样的某些键上,如果我在释放之前按住不放,在释放之后没有其他键会触发按下的键,直到我按住'D'这样的某些键。之后,它又回到了平常,除非我按下一个“坏”键,并保持太长时间。 最后一个注意,KeyRelease总是正确地触发,只是按