当前位置: 首页 > 知识库问答 >
问题:

在Kafka加入的某些事件中失败

夔庆
2023-03-14

我的Spring Cloud应用程序中有3个不同的流。每天有数十万条记录,然而,每天大约有3到4条记录消失。我在日志中看到了它们,但并没有完成所有的连接。

代码:

  @StreamListener
fun processOrderEvent(
    @Input(StreamBindings.BUSINESS_CONDITION_ORDER_CHARGED_CHARGE_IN)
    chargeEvent: KStream<String, ChargeEvent>,
    @Input(StreamBindings.BUSINESS_CONDITION_ORDER_CHARGED_IN)
    orderEvent: KStream<String, OrderChargedEvent>,
    @Input(StreamBindings.BUSINESS_CONDITION_ORDER_CREATED_CHARGE_IN)
    orderCreatedEvent: KStream<String, OrderCreatedEvent>
) {

        val tracing = Tracing.newBuilder().build()
        val kafkaStreamsTracing = KafkaStreamsTracing.create(tracing)

        val chargeKeyValue = chargeEvent
            .filter { _, event -> shouldProcessBusinessConditions(event) && (event.flow == "___________" || event.flow == "____")}
            .transform(
                kafkaStreamsTracing.map<String, ChargeEvent, ByteArray, ByteArray>("processOrderEvent_ChargeEvent") { _, value ->
                    var traceId = tracing.tracer().currentSpan().context().traceIdString()
                    val keyValue = KeyValue(value.id.toString().toByteArray(), objectMapper.writeValueAsString(
                        Charge(
                            value.id.toString(),
                            value.amount?.value,
                            value.status?.name,
                            LocalDateTime.now().toString(),
                            value.paymentMethod?.type?.name,
                            value.creditor?.customerId,
                            value.channel?.name,
                            value.amount?.currency?.name,
                            value.paymentMethod?.installments,
                            value.card?.brand,
                            buildSellerEmail(value),
                            value.createdAt,
                            value.amount?.summary?.total,
                            value.amount?.summary?.paid,
                            value.amount?.summary?.refunded,
                            value.connect?.id,
                            value.connect?.name,
                            value.flow
                        )).toByteArray())

                    log.info("m=processOrderEvent traceId=$traceId chargeId=${value.id} step=chargeKeyValue")
                    keyValue
                }
            )

        val orderKeyValue = orderEvent
            .transform(
                kafkaStreamsTracing.map<String, OrderChargedEvent, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent") { _, value ->
                    var traceId = tracing.tracer().currentSpan().context().traceIdString()
                    log.info("m=processOrderEvent traceId=$traceId chargeId=${value.chargeId} orderId=${value.orderId} step=orderKeyValue")
                    KeyValue(value.chargeId.toByteArray(), objectMapper.writeValueAsString(Order(value.orderId, value.chargeId)).toByteArray())
                }
            )

        val orderCreatedKeyValue = orderCreatedEvent
            .transform(
                kafkaStreamsTracing.map<String, OrderCreatedEvent, ByteArray, ByteArray>("processOrderEvent_OrderCreatedEvent") { _, value ->
                    var traceId = tracing.tracer().currentSpan().context().traceIdString()
                    log.info("m=processOrderEvent traceId=$traceId orderId=${value.orderId} step=before_orderCreatedKeyValue")
                    val originalValue = buildOrderOriginalValue(value)
                    val keyValue = KeyValue(value.orderId.toByteArray(), objectMapper.writeValueAsString(OrderCreated(value.orderId, originalValue)).toByteArray())
                    log.info("m=processOrderEvent traceId=$traceId orderId=${value.orderId} originalValue=${originalValue} step=orderCreatedKeyValue")
                    keyValue
                }
            )

        chargeKeyValue.join(orderKeyValue, OrderChargeValueJoiner(), JoinWindows.of(Duration.ofHours(5)))
            .transform(
                kafkaStreamsTracing.map<ByteArray, ByteArray, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent_ChargeEvent") { _, value ->
                    var traceId = tracing.tracer().currentSpan().context().traceIdString()
                    val orderWithChargeJson = objectMapper.readValue(value, OrderWithCharge::class.java)
                    val keyValue = KeyValue(orderWithChargeJson.order.orderId!!.toByteArray(), value)
                    log.info("m=processOrderEvent traceId=$traceId chargeId=${orderWithChargeJson.charge.chargeId} orderId=${orderWithChargeJson.order.orderId} step=orderKeyValueJoin")
                    keyValue
                }
            )
            .join(orderCreatedKeyValue, OrderCreatedValueJoiner(), JoinWindows.of(Duration.ofHours(5)))
            .transform(
                kafkaStreamsTracing.map<ByteArray, ByteArray, ByteArray, ByteArray>("processOrderEvent_OrderChargedEvent_ChargeEvent_OrderCreatedEvent") { key, value ->
                    var traceId = tracing.tracer().currentSpan().context().traceIdString()
                    val event = objectMapper.readValue(value, OrderWithChargeAndOrderCreated::class.java)
                    log.info("m=processOrderEvent traceId=$traceId chargeId=${event.charge.chargeId} orderId=${event.order.orderId} step=orderCreatedKeyValueJoin")
                    KeyValue(key, objectMapper.writeValueAsString(OrderWithChargeAndOrderCreatedTraceId(traceId, event)).toByteArray())
                }
            ).process(ProcessorSupplier { businessConditionCkoutEventProcessor })
}

少数几个不起作用的日志之一:

21年9月1日上午5:48:15.863 e2e64c0faa9e 05:48:15.863[订单-chargeds-charges-v3-df80aa80-d3f6-48c7-a862-7a05c55d5d24-StreamThread-1]信息u.p.s.SomeEventStreams-m=处理订单事件跟踪ID=06352e7dfb290f6f chargeId=-c798-4891-58ecdbf7ccf7订单ID=ORDE\u413D-4D5C-\u1873F0DB5C ADE步骤=orderKeyValueJoin

21年9月1日5:48:15.763 AM
c232aa303f2f 05:48:15.763[orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]信息u.p.p.s.SomeEventStreams-m=processOrderEvent traceId=06352e7dfb290f6f chargeId=-c798-4891-58ecdbf7ccf7 orderId=Order\UUU413D-4D5C-\U1873F0DB5ADE步骤=OrderKeyKeyId价值观

9/1/21 5:48:15.749AM
c232aa303f2f05:48:15.749[orders-chargeds-charge-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]INFO u. p. p. s.

21年8月31日下午6:31:50.499
c232aa303f2f 18:31:50.499[orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]信息u.p.p.s.SomeEventStreams-m=processOrderEvent traceId=95fc78a495f03b64 orderId=Order\uUU-413D-4D5C-1873F0DB5ADE原始值=22000步骤=orderCreatedKeyValue

8/31/21 6:31:50.499PM
c232aa303f2f18:31:50.499[orders-chargeds-charices-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]INFO u. p. p. s.一些事件流-m=Process OrderEvent traceId=95fc78a495f03b64 orderId=ORDE_3_-413D-4D5C-_-1873F0DB5ADE步骤=before_orderCreatedKeyValue

数千个工作之一的日志:

9/2/21 6:34:33.547PM
f84980d99867 18:34:33.547[order-chargeds-chargeds-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1]INFO u. p. p. s.一些事件流-m=进程事件跟踪ID=3b5981d48ac7e130 chargeId=-243a--8576-3cb1173804d5 orderId=ORDE__-084F-4CCE-_-BA4DCAF1D50A步骤=orderCreatedKeyValueJoin

21年9月2日下午6:34:33.446 e2e64c0faa9e 18:34:33.446[订单-chargeds-charges-v3-df80aa80-d3f6-48c7-a862-7a05c55d5d24-StreamThread-1]信息u.p.s.SomeEventStreams-m=processOrderEvent traceId=3B5981D8AC7E130 chargeId=3834ad80-243a-4ffe-8576-3cb1173804d5 orderId=Order\U084F-4CE-BAU4DCAF1D50A步骤=orderKeyValueJoin

21年9月2日下午6:34:33.346
f84980d99867 18:34:33.346[订单-chargeds-charges-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1]信息u.p.p.s.SomeEventStreams-m=processOrderEvent traceId=6501784c0bf59948 orderId=Order\uUU-084F-4CCE-\u4DCAF1D50A原始值=85400步骤=orderCreatedKeyValue

21年9月2日下午6:34:33.346
f84980d99867 18:34:33.346[orders-chargeds-charges-v3-63c5bb8a-8026-4b05-937f-c41360f90201-StreamThread-1]信息u.p.p.s.SomeEventStreams-m=processOrderEvent traceId=6501784c0bf59948 orderId=Order\uUU-084F-4CCE-\u4DCAF1D50A步骤=在orderCreatedKeyValue之前

21年9月2日下午6:34:33.344
c232aa303f2f 18:34:33.344[orders-chargeds-charges-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]信息u.p.p.s.SomeEventStreams-m=processOrderEvent traceId=3b5981d48ac7e130 chargeId=-243a-4ffe-3cb1173804d5 orderId=-084F-4CCE-BA4DCAF1D50A步骤=orderKeyValue

9/2/21 6:34:33.326PM
c232aa303f2f18:34:33.326[orders-chargeds-charge-v3-a1f58fb2-63d6-4195-bffd-6a9009b00707-StreamThread-1]INFO u. p. p. s.

“OrderCreatedKeyValuejoe”日志是结束,并且在大多数情况下会显示,但是,在某些情况下,事件永远不会在结束时出现。

共有1个答案

滕成双
2023-03-14

您需要检查如何使用Kafka实现DLQ。

要实现DLQ,您需要创建单独的队列,无论主题如何失败,都需要将其放入DLQ中。写入重试逻辑以读取它。

这些文件对你有帮助吗https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC1/multi/multi_kafka-dlq-processing.html

带spring Kafka的Kafka死信队列(DLQ)

 类似资料:
  • Kafka 服务器和客户端 JAR 移至最新库:0.10.0.1 我的消费者和生产者代码使用如上所述的最新kafka jars,但仍然使用旧的消费者API(0 . 8 . 2)。 我在调用commit offset时在消费者端遇到问题。 kafka服务器端配置: 以下 Kafka 消费者的配置: 要创建消费者,我使用以下api: 和提交调用 在从 Kafka 读取消息时,我们使用以下方法来处理超时

  • 我有一个spring boot应用程序与单个Kafka消费者从一些主题获取消息。但有时在处理消息时会出现错误。 我理解我需要禁用自动提交并手动提交成功的消息,但是,在这种情况下,如果我没有为这个异常情况抛出任何异常,并手动提交每个下一个成功的消息,那么我将丢失前一个不成功的消息,对吗?

  • 我正在用Python编写一个带有Tkinter UI的程序。我想有一个没有标题栏的小窗口。此窗口必须接收键盘输入。我并不挑剔这是一个条目小部件的形式,还是仅仅绑定到KeyPress。 通常是禁用标题栏的方式。不幸的是,(除了在Windows中,这似乎阻止了许多事件被接收。我写了这段代码来说明问题: 这将创建一个小窗口(没有标题栏),当它接收到公共事件时,该窗口将打印这些事件的名称。我已经在Wind

  • 我需要延迟处理一些事件。 我有三件事(发表在Kafka上): A(id: 1, retry At: now) B(id: 2, retry At: 10分钟后) C(id: 3, retry At: now) 我需要立即处理记录A和C,而记录B需要在十分钟后处理。这在Apache Flink中实现可行吗? 到目前为止,无论我研究了什么,“触发器”似乎都有助于在Flink中实现它,但还没有能够正确实

  • 我正在使用libgdx和box2d开发我的第一个游戏。我正在使用调试渲染器测试我的对象。我创造了一些类似汽车的物体。每辆车都有一个主体,它是一个由6个点组成的大多边形(约1米长,0.7米高),有两个轮子通过旋转关节连接。 主车有一门大炮和一把机枪,也通过旋转接头连接。 我面临的问题是,汽车的大部分没有按预期进行碰撞。当两辆车相互碰撞时,它们是重叠的,如下所示: 一些注意事项: 轮子和大炮(较小的形

  • 这是一个非常奇怪的问题,但我只是在JPanel中添加了一个简单的keyListener,它在keyPressed和KeyRelease上打印。通常它工作得很好,但在像'A'这样的某些键上,如果我在释放之前按住不放,在释放之后没有其他键会触发按下的键,直到我按住'D'这样的某些键。之后,它又回到了平常,除非我按下一个“坏”键,并保持太长时间。 最后一个注意,KeyRelease总是正确地触发,只是按