当前位置: 首页 > 知识库问答 >
问题:

如果Google PUBSUB中没有来自publisher的消息,如何让订阅者Hibernate

司浩壤
2023-03-14

我使用谷歌PubSub Publisher来触发通知,并在谷歌云存储桶中上传文档时向订阅者发送消息。

我通过->gsutil notification create-t[TOPIC_NAME]-f json-e OBJECT_FINALIZE gs://[BUCKET_NAME]创建了一个通知主题

我的订户功能是:**

def callback(message):
    try:
        #storage_client = storage.Client.from_service_account_json('storage_service_key.json')
        print('Received message: {}'.format(message.data.decode("utf-8")))
        data = json.loads(message.data.decode("utf-8"))

        filename = data['name']
        file_name = re.search(r'/(.*)', filename).group(1)
        #filelink = data['selfLink']
        print("Processing the file : {}".format(file_name))
        path = "sample/"+file_name
        download_files(path, file_name)
        rming.image_remover(file_name) ## my custom function
        message.ack()
        os.remove(file_name)


    except Exception as error_message:
         print("Error in callback method: {}".format(error_message))
         pass


flow_control = pubsub_v1.types.FlowControl(max_messages=1)

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(proj_name, sub_name)

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback,flow_control=flow_control
)
print('Listening for messages on: {}'.format(subscription_path))
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        #flag=0
        streaming_pull_future.result()
    except Exception as error_message:  # noqa
        print("exception occured while handling subscription: {}").format(error_message)
        pass

**我想在docker容器中html" target="_blank">部署这个订阅器函数。然后它将使订阅服务器24/7运行。

当没有可用的消息时,我可以让订阅服务器进入睡眠/(不是发出请求)到Publisher吗?或者我的另一个术语是,当没有传入消息时,我可以让订阅服务器在空闲时间进入睡眠吗。

任何帮助都很感激!!!!

共有1个答案

郜琦
2023-03-14

此外,在评论的总结中,还有许多事情

  • 您希望将函数放入容器中。因此,对您的应用程序使用云运行
  • 在调用云运行服务endpoint的主题上定义推送订阅
    • 小心,推送订阅和直接调用函数之间的消息格式略有不同

    云运行规模为0,您可以只处理时间,四舍五入到上限100ms。当没有消息时,它会Hibernate并不花费任何费用。

 类似资料:
  • 我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml

  • 对调试的任何帮助都将非常感谢。 这是我的堆栈: node.js socket.io express.js passport.js MongoDb react.js 流程: Anna在聊天中发送一条消息(这条消息写入数据库并发布到PubSubtopic“messages”) Node.js express应用程序运行订阅,然后根据消息内容发送给其他应该接收消息的人。 在本例中,与安娜在同一频道的鲍勃

  • 我在mac上运行Kafka和Flink作为docker容器。 我已经实现了Flink作业,它应该消耗来自Kafka主题的消息。我运行一个向主题发送消息的python生产者。 工作开始时没有问题,但没有收到任何消息。我相信这些消息被发送到了正确的主题,因为我有一个能够使用消息的python消费者。 flink作业(java): Flink作业日志: 生产者作业(python):(在主机上运行-不是d

  • 微信文档:https://developers.weixin.qq.com/miniprogram/dev/api-backend/open-api/subscribe-message/subscribeMessage.addTemplate.html 组合模板并添加至帐号下的个人模板库 $tid = 563; // 模板标题 id,可通过接口获取,也可登录小程序后台查看获取 $kidLi

  • 开普勒消息目前分为三大类:公告,告警和通知。 通知中根据不同的操作事件类型,分为十几个事件。每个事件都跟项目操作相关。便于接收项目操作变更的通知。 分类 事件 公告 Alarm 告警 Proclaim 通知 Build,Apply,Audit,Delete,Rollback,Logging,Reboot,Command,Storage,Extend... 订阅界面: 用户中心,点击头像,下拉菜单→

  • 我们为分布式进程之间的消息传输实现了ZMQ PUB/SUB机制。但是由于订阅者的流转时长,消息的处理会有一定的延迟(有时由于排队消息的数量,延迟以小时为单位)。为了克服这种延迟,我计划根据进入发布者队列的未决消息的数量来扩展订阅者进程。 是否有任何机制来获取 ZMQ 发布者队列的计数/长度? 目前,我正在考虑使用发布服务器 RAM 利用率阈值来纵向扩展/缩减订阅服务器进程。