我将pub/sub订阅逻辑包装在subscribe方法中,该方法在服务初始化期间为每个订阅调用一次:
def subscribe(self,
callback: typing.Callable,
subscription_name: str,
topic_name: str,
project_name: str = None) -> typing.Optional[SubscriberClient]:
"""Subscribes to Pub/Sub topic and return subscriber client
:param callback: subscription callback method
:param subscription_name: name of the subscription
:param topic_name: name of the topic
:param project_name: optional project name. Uses default project if not set
:return: subscriber client or None if testing
"""
project = project_name if project_name else self.pubsub_project_id
self.logger.info('Subscribing to project `{}`, topic `{}`'.format(project, topic_name))
project_path = self.pubsub_subscriber.project_path(project)
topic_path = self.pubsub_subscriber.topic_path(project, topic_name)
subscription_path = self.pubsub_subscriber.subscription_path(project, subscription_name)
# check if there is an existing subscription, if not, create it
if subscription_path not in [s.name for s in self.pubsub_subscriber.list_subscriptions(project_path)]:
self.logger.info('Creating new subscription `{}`, topic `{}`'.format(subscription_name, topic_name))
self.pubsub_subscriber.create_subscription(subscription_path, topic_path)
# subscribe to the topic
self.pubsub_subscriber.subscribe(
subscription_path, callback=callback,
scheduler=self.thread_scheduler
)
return self.pubsub_subscriber
此方法是这样调用的:
self.subscribe_client = self.subscribe(
callback=self.pubsub_callback,
subscription_name='subscription_topic',
topic_name='topic'
)
回调方法执行一系列操作,发送2封电子邮件,然后确认消息
def pubsub_callback(self, data: gcloud_pubsub_subscriber.Message):
self.logger.debug('Processing pub sub message')
try:
self.do_something_with_message(data)
self.logger.debug('Acknowledging the message')
data.ack()
self.logger.debug('Acknowledged')
return
except:
self.logger.warning({
"message": "Failed to process Pub/Sub message",
"request_size": data.size,
"data": data.data
}, exc_info=True)
self.logger.debug('Acknowledging the message 2')
data.ack()
有什么想法吗?
在pub/sub中,确认是尽力而为的,所以消息被重新传递是可能的,但不寻常。
如果总是收到重复的邮件,可能是由于重复发布了相同的邮件内容。就pub/sub而言,这些是不同的消息,并将被分配不同的消息ID。检查pub/sub-provided消息ID,以确保您实际上多次接收相同的消息。
使用流式拉(这是Python客户机库使用的)处理大量积压的小消息时存在一个边缘情况。如果您正在运行多个订阅同一订阅的客户端,则此边缘情况可能与此相关。
我有一个从pub/sub消息触发的云函数。此函数从不显式确认源代码中的消息。 那么,如果确认在源代码中从未发生过,这个函数什么时候确认pub/sub消息呢? 进入test_topic并发布以Go为文本的消息。 test_function日志中将出现错误。但是,只有一个函数调用会出现错误,即使几天后也会出现这种情况。
我试图了解GCP pub/sub,但我有一个关于pub/sub中消息生命周期的问题。事实上,我把这篇文章作为我的参考。在这篇文章中,他们说: 一旦每个订阅的至少一个订阅者确认了该消息,Pub/Sub就从存储中删除该消息。 所以我的第一个问题是:例如,我有一个订阅a,它连接到订阅者X和订阅者Y。根据文档,当订阅者X收到消息并向订阅者a发送ACK时,Pub/Sub将从存储中删除消息,而不考虑订阅者Y是
我正在使用最新版本的google-cloud-pubsub,并且正在经历一个据称已经修复的bug。 我正在使用这个版本和其中的代码示例:https://pypi.org/project/google-cloud-pubsub/ 问题:因此,在我运行呼叫订阅者的订阅者工作者大约4-5小时后,它停止接收消息。 对如何修复它有什么建议吗?
如果我想发送消息到谷歌PubSub并使用它的消息。您建议使用Spring cloud GCP库还是只使用Google cloud Java API。 有人能区分这两者吗?或者与谷歌云pubsub库相比,Spring Cloud gcp提供了哪些功能。
我对GCP Pub/Sub REST API感到困惑。 背景:我正在尝试使用GCP发布/订阅编写一个应用程序,其中该语言不会作为客户端库退出(我正在尝试使用R)。 因此,我需要依赖提供的REST API:https://cloud.google.com/pubsub/docs/reference/rest 根据我对REST API的理解,我们必须使用pull订阅:https://cloud.goo
它会无止境地重复。此外,我在订阅关系图中看到消息仍然存在(在确认回调调用之后) 丢弃逻辑: 我不知道如何排除故障。有什么想法吗?