当前位置: 首页 > 知识库问答 >
问题:

如何使用GCP Pub/Sub从订阅中提取消息?

公孙锋
2023-03-14

我对GCP Pub/Sub REST API感到困惑。

背景:我正在尝试使用GCP发布/订阅编写一个应用程序,其中该语言不会作为客户端库退出(我正在尝试使用R)。

因此,我需要依赖提供的REST API:https://cloud.google.com/pubsub/docs/reference/rest

根据我对REST API的理解,我们必须使用pull订阅:https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions/pull

问题:我的困惑是这是一个帖子请求

POST https://pubsub.googleapis.com/v1/{subscription}:pull

此POST请求有响应:

{
  "receivedMessages": [
    {
      object (ReceivedMessage)
    }
  ]
}

我如何收到POST请求的回复?这对我来说没有意义。

我的目标是订阅消息的发布/订阅,类似于此处的Python库:

要订阅Cloud Pub/Sub中的html" target="_blank">数据,您可以根据主题创建订阅,并通过传递回调函数订阅该订阅。

import os
from google.cloud import pubsub_v1

topic_name = 'projects/{project_id}/topics/{topic}'.format(
    project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
    topic='MY_TOPIC_NAME',  # Set this to something appropriate.
)

subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
    project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
    sub='MY_SUBSCRIPTION_NAME',  # Set this to something appropriate.
)

def callback(message):
    print(message.data)
    message.ack()

with pubsub_v1.SubscriberClient() as subscriber:
    subscriber.create_subscription(
        name=subscription_name, topic=topic_name)
    future = subscriber.subscribe(subscription_name, callback)
    try:
        future.result()
    except KeyboardInterrupt:
        future.cancel()

共有1个答案

丁勇
2023-03-14

云发布/订阅客户端库使用流式pull来接收消息,而不是pull。它还将用户从这些实际请求中抽象出来。客户端库本身接收消息列表,然后调用用户提供的回调。

如果您直接使用pull请求,那么您希望迭代响应中的每条消息,并对每条消息调用用户回调。在所有受支持的语言(如Python)的云发布/订阅文档中有一个例子:

from google.api_core import retry
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

NUM_MESSAGES = 3

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    # The subscriber pulls a specific number of messages. The actual
    # number of messages pulled may be smaller than max_messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
        retry=retry.Retry(deadline=300),
    )

    ack_ids = []
    for received_message in response.received_messages:
        print(f"Received: {received_message.message.data}.")
        ack_ids.append(received_message.ack_id)

    # Acknowledges the received messages so they will not be sent again.
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": ack_ids}
    )

    print(
        f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
    )
 类似资料:
  • 我使用SockJS和StompJS,当我在浏览器中打开我的应用程序时,有时它会在连接到websocket之前尝试订阅一些主题。我希望主题订阅等待应用程序连接到websocket。 这就是我实现此代码的原因,我将其称为: 因此,我只在连接状态为时才订阅该主题,并且只有在客户端首次成功连接时才会调用该主题。 我想稍后从主题中取消订阅,所以我需要内部订阅返回的对象,我还需要内部订阅的消息。 我所实现的很

  • 我遵循这篇文档来实现上述场景。 那么,有没有人可以建议我如何一次使用多个订阅者从主题中读取消息。

  • 问题内容: 在该程序包下,您有诸如和之类的类,我想可以将其描述为一些可用的示例。 这些主题如何退订?没有方法,并且调用完全结束了Observable,对吗? 问题答案: 同时是an 和an ,可以像普通可观察对象一样取消订阅。使主题特别之处在于它是可观察者和观察者之间的桥梁。它可以通过释放观测到的项目,也可以发射新的项目。就像对期货的承诺一样,主体是可观察的对象。 这是主题科的简短说明: Asyn

  • 在这篇文章之后,我正在使用带有RxJava/RxKotlin Flowable的Room。我让它运行,但在带有3个片段的ViewPager中使用它存在问题。 我将向您介绍我的代码: 我有一个带有选项卡布局和三个片段(A、B和收藏夹)的视图分页器。前两个片段包含可以添加到收藏夹的数据列表。 在最喜欢的片段中,我使用Flowable来监听A和B所做的更改并相应地更新列表。但是,当一个项目在A和B中成为

  • 语境: 给定一个WooCommerce和WooCommerce订阅的WordPress网站,我试图获取特定用户订阅的列表。在最近的更新之前,这一行代码已经为我解决了这个问题。以下是我一直在使用的代码: 其中$user\u id是WordPress中的有效用户id。 问题: 自上次更新以来,我们经常看到以下错误: 致命错误:在/home/warfarep/public_html/wp content

  • 我用的是带条纹付款的Laravel收银员。一个用户可以有多个订阅。用户应该能够取消特定订阅。是否仍然可以按条带id或计划id取消订阅?