当前位置: 首页 > 知识库问答 >
问题:

用未来的运行关闭Google Cloud PubSub客户端

马浩淼
2023-03-14
def pubsub_callback(future):
    message_id = future.result()
    LOGGER.info("Successfully published %s", message_id)

def send_oneoff_pubsub_message(self, client=None):
    if not client:
        client = self.get_client('pubsubpub') # Creates a pubsub publisher client
    
    future = client.publish({...})
    try:
        future.exception(timeout=10)
    except Exception as exc:
        print("error")

    future.add_done_callback(pubsub_callback)

现在,在许多地方,我们正在缓慢地重构,以显式地在函数之外创建客户机(因此我们不会创建太多客户机)。然而,我仍然希望重构它,以便在消息发布后关闭客户端。

链接的问题建议在处理完客户端后client.api.transport._channel.close()。但是,在本例中,我只是在pubsub_callback被触发之后才完成它。

我看不出有任何方法可以从将来获得客户机,而且回调add_done_callback不允许发送参数。

Update:
查看代码,似乎这将在将来成功关闭客户端:

def send_oneoff_pubsub_message(self, client=None):
    if not client:
        client = self.get_client('pubsubpub') # Creates a pubsub publisher client

    future = client.publish({...})
    try:
        future.exception(timeout=10)
    except Exception as exc:
        print("error")

    future.add_done_callback(pubsub_callback)
    future.result(timeout=10)
    client.api.transport._channel.close()

这种asn方法有什么缺点吗?除了函数块直到发布(这对我来说是可以的)

共有1个答案

微生耘豪
2023-03-14

您的代码混合执行异步操作和同步操作。在调用publish之后,您将等待长达10秒的时间来等待Future上的异常。大多数情况下,publish将在10秒内完成,因此在这一点上,您不妨同步调用future.result,甚至不必使用add_done_callback:

def pubsub_callback(future):
  try:
    message_id = future.result()
    LOGGER.info("Successfully published %s", message_id)
  except Exception as exc:
    print("error")

def send_oneoff_pubsub_message(self, client=None):
  if not client:
    client_created = True
    client = self.get_client('pubsubpub') # Creates a pubsub publisher client

  future = client.publish({...})
  pubsub_callback(future)

  if client_created:
    client.api.transport._channel.close()

如果希望异步执行,可以使用functools.partial:

from functools import partial

def pubsub_callback(client, future):
  try:
    message_id = future.result()
    LOGGER.info("Successfully published %s", message_id)
  except Exception as exc:
    print("error")

  if client:
    client.api.transport._channel.close()

def send_oneoff_pubsub_message(self, client=None):
  if not client:
    client_created = True
    client = self.get_client('pubsubpub') # Creates a pubsub publisher client

  if client_created:
    callback = partial(pubsub_callback, client)
  else:
    callback = partial(pubsub_callback, None)

  future = client.publish({...})
  future.add_done_callback(callback)

任何一种方式都应该允许您在所需的点关闭客户端。

 类似资料:
  • Android studio不在Windows 8中运行。我下载并安装了它,但当我按下图标运行它时,什么都没发生。我尝试与管理员一起运行。我尝试设置与Windows 7的兼容性。

  • 我正在尝试使用WorkManager每10秒运行一个简单的工作。当应用程序在后台或前台运行时,它可以完美地工作。当我关闭应用程序(杀死应用程序)时,工作将不再被调用。 我在创建MainActivity时调用以下代码 工人阶级 这些代码在我的应用程序未关闭时运行良好。 我如何让它运行,即使应用程序关闭?

  • 问题内容: 我正在使用一些示例代码,该代码使我可以将消息从Python客户端发送到Android服务器(TCP)。但是,仅在关闭客户端后,该消息才会显示在Android仿真器上。 我可能缺少tcp套接字(首​​次使用和实现)背后的一些基本知识。 我的主要目的是使Android App中的按钮可以在单击时将不同的消息发送到单独的Linux系统上的Python客户端,并且Python客户端在收到该消息

  • socket.io 0.9 node.js 0.10.15 速递3.3.4 即:调用 --服务器端 --客户端

  • 问题内容: 如何关闭客户端的套接字连接? 我在用: socket.io 0.9 node.js 0.10.15 express3.3.4 即:呼叫 -服务器端 - 客户端 如果加载测试页,则需要来自服务器的一些值(getInitData)。 在第一页上,我获取一次数据,在重新加载或第二遍上,我获取两次数据,依此类推。 重新加载页面以及离开页面后,服务器端的连接将自动关闭。 但是在客户端,连接仍然打

  • 我刚刚开始使用hazelcast[3.3.1]。根据hazelcast应用程序和客户端教程,我创建了一个hazelcast应用程序实例和一个客户端(使用eclipse IDE)。 从客户端,我能够将对象添加到地图并成功获取它们。但是,我在实例的控制台上看到以下警告,它们似乎警告客户端断开连接。这是每个客户端get/put的正常行为吗? 在退出客户端程序之前,是否有适当的方法断开客户端与实例的连接?