当前位置: 首页 > 知识库问答 >
问题:

即使在确认之后,应用程序也接受来自google Pub/Sub的重复消息

丁星火
2023-03-14
2019-12-06 12:37:47.348  INFO 1 --- [sub-subscriber3] .i.g.MyAcknowledgementHandler : Acknowledged message - 1575635858865987
        var generation = message.getHeaders().get("objectGeneration");
        pubSubMessage = message.getHeaders().get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class)
        pubSubMessage.ack().addCallback(
                v -> {
                    removeFromIdempotentStore(targetMessage, false);
                    log.info("Acknowledged message - {}", generation);
                },
                e -> {
                    removeFromIdempotentStore(targetMessage, false);
                    log.error("Failed to acknowledge message - {}", generation, e);
                }
        );
2019-12-06 12:37:48.868 WARN 1 --- [sub-subscriber1] c.b.m.i.MyDiscardedMessagesHandler : Duplicate message received GenericMessage [... headers={gcp_pubsub_acknowledgement=org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter$1@1abafe68, bxwid=12345, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@3c3efd63, idempotent.keys=[objectId.mixed emails.csv, objectGeneration.1575635858865987].....

它会无止境地重复。此外,我在订阅关系图中看到消息仍然存在(在确认回调调用之后)

丢弃逻辑:

....
.gateway(nexrFlow, idempotentByHeader("objectId")); 


Consumer<GatewayEndpointSpec> idempotentByHeader(String objectIdHeader) {
    return endpointSpec -> endpointSpec.advice(idempotentByHeaderInterceptor(objectIdHeader))
            .errorChannel(errorChannel())
            .replyTimeout(0L);
}

default IdempotentReceiverInterceptor idempotentByHeaderInterceptor(String header) {
    MessageProcessor<String> headerSelector = message -> headerExpression(header).apply(message);
    var interceptor = new IdempotentReceiverInterceptor(new MetadataStoreSelector(headerSelector, idempotencyStore()));
    interceptor.setDiscardChannel(idempotentDiscardChannel());
    return interceptor;
}

我不知道如何排除故障。有什么想法吗?

暂时还没有答案

 类似资料:
  • 我将pub/sub订阅逻辑包装在subscribe方法中,该方法在服务初始化期间为每个订阅调用一次: 此方法是这样调用的: 回调方法执行一系列操作,发送2封电子邮件,然后确认消息 null 有什么想法吗?

  • 图像元素不显示。它显示具有替代名称的默认图像文件。 这是我的代码。我渲染它有对象头像有值null.so如果它的null它应该显示图像。 这是我的文件夹结构

  • 在下面的代码中,我单击Submit按钮。在backing\u home中,通过ajax调用将布尔值更改为true。 如果我删除渲染的,我会正确地看到更新的输出。但是对于下面的代码,它是。我认为它没有呈现新值 可能是什么原因?

  • 好吧,我不需要详细介绍我设置的整个系统, 我遇到的问题是,当使用者取消(amqpchannel->basic_cancel)对队列的监听时,会留下一个额外的消息未被这个工作者确认。它也不会触发正常的回调来处理此消息。 队列正在阻塞(使用wait) 预取为1,这是您所能拥有的最小值 使用者可以动态侦听() 使用者可以动态忘记() 我不会告诉给定使用者使用或取消给定队列的确切方式。但这一切都是完美的工

  • 问题内容: 我正在使用Node.js,带有Redisstore的Socket.io,来自Socket.io的集群以及Redis。 我有一个发布/订阅应用程序,仅在一个Node.js节点上运行良好。但是,当它承受沉重的负担时,它最多只能耗尽服务器的一个核心,因为Node.js并不是为多核计算机编写的。 如您在下面看到的,我现在使用的是Learnboost的Cluster模块,这是制造Socket.i

  • 我已经为这个问题挣扎了几天了 我有一个android应用程序,用户可以指定植物浇水的频率。我想根据植物浇水的频率发送通知。(例如,每3天发送一次通知) 我知道有很多问题可以解决这个问题,但我仍然无法让它发挥作用 我从使用报警管理器实现/启动服务开始 主要问题是,一旦我的应用程序被终止,通知将不再发送 我正在使用firebase存储所有数据 我读了一些关于firebase函数的内容,但我不知道这是否