当前位置: 首页 > 知识库问答 >
问题:

GCP消息在确认后停留在pub/sub中

严繁
2023-03-14

我将pub/sub订阅逻辑包装在subscribe方法中,该方法在服务初始化期间为每个订阅调用一次:

    def subscribe(self,
                  callback: typing.Callable,
                  subscription_name: str,
                  topic_name: str,
                  project_name: str = None) -> typing.Optional[SubscriberClient]:
        """Subscribes to Pub/Sub topic and return subscriber client

        :param callback: subscription callback method
        :param subscription_name: name of the subscription
        :param topic_name: name of the topic
        :param project_name: optional project name. Uses default project if not set
        :return: subscriber client or None if testing
        """
        project = project_name if project_name else self.pubsub_project_id
        self.logger.info('Subscribing to project `{}`, topic `{}`'.format(project, topic_name))

        project_path = self.pubsub_subscriber.project_path(project)
        topic_path = self.pubsub_subscriber.topic_path(project, topic_name)
        subscription_path = self.pubsub_subscriber.subscription_path(project, subscription_name)

        # check if there is an existing subscription, if not, create it
        if subscription_path not in [s.name for s in self.pubsub_subscriber.list_subscriptions(project_path)]:
            self.logger.info('Creating new subscription `{}`, topic `{}`'.format(subscription_name, topic_name))
            self.pubsub_subscriber.create_subscription(subscription_path, topic_path)

        # subscribe to the topic
        self.pubsub_subscriber.subscribe(
            subscription_path, callback=callback,
            scheduler=self.thread_scheduler
        )
        return self.pubsub_subscriber

此方法是这样调用的:

        self.subscribe_client = self.subscribe(
            callback=self.pubsub_callback,
            subscription_name='subscription_topic',
            topic_name='topic'
        )

回调方法执行一系列操作,发送2封电子邮件,然后确认消息

    def pubsub_callback(self, data: gcloud_pubsub_subscriber.Message):
        self.logger.debug('Processing pub sub message')

        try:
            self.do_something_with_message(data)

            self.logger.debug('Acknowledging the message')
            data.ack()
            self.logger.debug('Acknowledged')
            return

        except:
            self.logger.warning({
                "message": "Failed to process Pub/Sub message",
                "request_size": data.size,
                "data": data.data
            }, exc_info=True)

        self.logger.debug('Acknowledging the message 2')
        data.ack()
    null

有什么想法吗?

共有1个答案

穆浩皛
2023-03-14

在pub/sub中,确认是尽力而为的,所以消息被重新传递是可能的,但不寻常。

如果总是收到重复的邮件,可能是由于重复发布了相同的邮件内容。就pub/sub而言,这些是不同的消息,并将被分配不同的消息ID。检查pub/sub-provided消息ID,以确保您实际上多次接收相同的消息。

使用流式拉(这是Python客户机库使用的)处理大量积压的小消息时存在一个边缘情况。如果您正在运行多个订阅同一订阅的客户端,则此边缘情况可能与此相关。

    null
 类似资料:
  • 我有一个从pub/sub消息触发的云函数。此函数从不显式确认源代码中的消息。 那么,如果确认在源代码中从未发生过,这个函数什么时候确认pub/sub消息呢? 进入test_topic并发布以Go为文本的消息。 test_function日志中将出现错误。但是,只有一个函数调用会出现错误,即使几天后也会出现这种情况。

  • 我试图了解GCP pub/sub,但我有一个关于pub/sub中消息生命周期的问题。事实上,我把这篇文章作为我的参考。在这篇文章中,他们说: 一旦每个订阅的至少一个订阅者确认了该消息,Pub/Sub就从存储中删除该消息。 所以我的第一个问题是:例如,我有一个订阅a,它连接到订阅者X和订阅者Y。根据文档,当订阅者X收到消息并向订阅者a发送ACK时,Pub/Sub将从存储中删除消息,而不考虑订阅者Y是

  • 我正在使用最新版本的google-cloud-pubsub,并且正在经历一个据称已经修复的bug。 我正在使用这个版本和其中的代码示例:https://pypi.org/project/google-cloud-pubsub/ 问题:因此,在我运行呼叫订阅者的订阅者工作者大约4-5小时后,它停止接收消息。 对如何修复它有什么建议吗?

  • 如果我想发送消息到谷歌PubSub并使用它的消息。您建议使用Spring cloud GCP库还是只使用Google cloud Java API。 有人能区分这两者吗?或者与谷歌云pubsub库相比,Spring Cloud gcp提供了哪些功能。

  • 我对GCP Pub/Sub REST API感到困惑。 背景:我正在尝试使用GCP发布/订阅编写一个应用程序,其中该语言不会作为客户端库退出(我正在尝试使用R)。 因此,我需要依赖提供的REST API:https://cloud.google.com/pubsub/docs/reference/rest 根据我对REST API的理解,我们必须使用pull订阅:https://cloud.goo

  • 它会无止境地重复。此外,我在订阅关系图中看到消息仍然存在(在确认回调调用之后) 丢弃逻辑: 我不知道如何排除故障。有什么想法吗?