当前位置: 首页 > 知识库问答 >
问题:

Raku Cro服务订阅“后台”数据一般指南

王飞英
2023-03-14

我试图把一个Cro服务,它有一个反应/每当块消耗数据“在后台”所以不像许多使用Cro的网络插座的例子,这与可能通过浏览器访问的路由无关。

我的用例是使用通过MQTT主题接收的消息,并对它们进行一些处理。在开发的后期阶段,我可能会使用这些数据创建一个供应,但现在,当收到数据时,它将存储在一个变量中,并取决于某些条件,通过http post发送到另一个服务。

我的想法是在Cro::HTTP::Server设置中包含一个provider(),如下所示:

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use DataProvider; # Here

my Cro::Service $http = Cro::HTTP::Server.new(
        http => <1.1>,
        host => ...,
        port => ...,
        application => [routes(), provider()], # Made this into an array of subs?
        after => [
            Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
        ]
    );

在数据提供者中。pm6:

use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {
        whenever $mqtt.subscribe('some/mqtt/topic') {
            say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
        }
    }
}

这会引发一系列错误:

A react block:
  in sub provider at DataProvider.pm6 (DataProvider) line 5
  in block <unit> at service.p6 line 26

Died because of the exception:
    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'.  Did
    you forget a '.new'?
      in method subscribe at /home/cam/raku/share/perl6/site/sources/42C762836A951A1C11586214B78AD34262EC465F (MQTT::Client) line 133
      in sub provider at DataProvider.pm6 (DataProvider) line 6
      in block <unit> at service.p6 line 26

老实说,我完全猜测这就是我如何处理在Cro服务后台订阅数据的需求,但我找不到任何关于推荐方法的信息。

最初,我在主服务中设置了react/where块。pm6文件,但这似乎不正确。需要包装在start{}块中,因为正如我刚刚了解到的,react是blocking:),而cro实际上无法启动。

但是遵循路由实现的模式似乎是合乎逻辑的,但是我错过了一些东西。这个错误涉及到设置一个新方法,但我不相信这是根本原因。Routes.pm6没有构造函数。

谁能给我指一下正确的方向吗?


共有3个答案

嵇浩然
2023-03-14

你现在看起来很好,但当我第一次看到这个时,我做了这个https://github.com/jonathanstowe/Cro-MQTT这将MQTT客户机转换为一级Cro服务。

我还没有发布它,但它可能会有所启发。

龙安阳
2023-03-14

这个错误涉及到设置一个new方法,但我不相信这是根本原因。

这不是关于建立一种新的方法。这是一个应该定义的值,而不是未定义的值。这通常意味着无法尝试初始化它,这通常意味着无法调用。新建

谁能给我指一下正确的方向吗?

希望这个问题有帮助。

我完全在猜测,这就是我在Cro服务的背景下处理订阅数据需求的方法,但我无法找到任何关于推荐方法的信息。

列出您从Cro入门开始所遵循的最新步骤可能会对您有所帮助,包括基础知识,但也包括最后的“了解”步骤。

A react block:
  in sub provider ...

Died because of the exception:
    ... 
      in method subscribe ...

错误消息以内置的react构造开始,该构造报告它捕获了异常(并通过在响应中抛出自己的异常进行处理)。与代码中出现的react相对应的“回溯”从初始的“A react block:”缩进。

错误消息继续使用反应构造来总结它自己的异常(死亡是因为...),并通过在随后的行中报告进一步缩进的原始异常来解释它自己。这包括另一个回溯,这一次对应于原始异常,这可能发生在具有不同调用栈的不同线程上。

(Raku的所有结构化多线程构造[1]都使用这种由两部分组成的错误报告方法,用于捕获异常并通过抛出另一个异常进行处理。)

第一个回溯指示反应行:

in sub provider at DataProvider.pm6 (DataProvider) line 5
use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {

第二个回溯是关于原始异常的:

    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'. ...
      in method subscribe at ... (MQTT::Client) line 133

这报告了在MQTT::Client的第133行调用的方法要求其调用者是类型'IO::Socket::Async'的实例。它得到的值属于该类型,但不是实例,而是“类型对象”。(非本机类型的所有值要么是类型对象,要么是其类型的实例。)

错误消息以以下内容结束:

  Did you forget a '.new'?

这是一个简洁的提示,它基于这样一个事实:当需要实例时,遇到类型对象的原因有99次是因为代码未能初始化变量。(类型对象的用途之一是在Perl等语言中充当“未定义”的角色。)

那么,您能理解为什么应该是“IO::Socket::Async”的初始化实例的东西是未初始化的吗?

[1]Raku的并行、并发异步结构遵循结构化编程范式。乔纳森·沃辛顿(Jonathan Worthington)关于这种总体方法的视频演示,请参见Raku中的并行、并发和异步。像react这样的结构化构造可以清晰地观察、包含和管理在其执行范围内任何地方发生的事件,包括错误(如错误异常),即使它们发生在其他线程上。

贺彬
2023-03-14

感谢所有提供信息的人,这是一次非常有价值的学习活动。

通过附加的子例程,沿侧路由器()应用程序参数Cro::HTTP::Server.new给进一步的麻烦允许,并打破路由)

相反,我将后台工作移动到它自己的类中,并给它一个更类似于Cro::HTTP::Server的方法。

我的新方法:

服务pm6

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use KlineDataSubscriber; # Moved mqtt functionality here 
use Database;

my $dsn         = "host=localhost port=5432 dbname=act user=.. password=..";
my $dbh         = Database.new :$dsn;

my $mqtt-host   = 'localhost';
my $subscriber  = KlineDataSubscriber.new :$mqtt-host;

$subscriber.start; # Inspired by $http.start below

my Cro::Service $http = Cro::HTTP::Server.new(
    http => <1.1>,
    host => ...,
    port => ...,
    application => routes($dbh), # Basically back the way it was originally 
    after => [
        Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
    ]
);

$http.start;
say "Listening at...";
react {
    whenever signal(SIGINT) {
        say "Shutting down...";
        $subscriber.stop;
        $http.stop;
        done;
    }
}

在KlineDataSubscriber.pm

use MQTT::Client;

class KlineDataSubscriber {
    has Str $.mqtt-host is required;
    has MQTT::Client $.mqtt = Nil;

    submethod TWEAK() {
        $!mqtt = MQTT::Client.new: server => $!mqtt-host;
        await $!mqtt.connect;
    }

    method start(Str $topic = 'act/feed/exchange/binance/kline-closed/+/json') {
        start {
            react {
                whenever $!mqtt.subscribe($topic) {
                    say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
                }
            }
        }
    }

    method stop() {
        # TODO Figure how to unsubscribe and cleanup nicely
    }
}

对我来说,这感觉更“Cro惯用”,但我很乐意被纠正。更重要的是,它的工作正如预期的那样,我觉得它在某种程度上证明了未来。我应该能够创建一个供应,使路由器可以使用实时数据,并将数据推送到任何连接的web客户端

我还打算使用http GETendpoint/status进行各种检查,以确保一切正常

 类似资料:
  • 第一,我试过了 第二,我尝试了下面的。(使用onErrorResumeNext),但取消了订阅。 (未调用onError,但调用OnComplete。因此已取消订阅) 第三,我试了下面。(带重试) 这比第一好。但没有刻录。 我想使刷新按钮,工作后错误。 我想知道 null 对不起,我的英语太差了。

  • 我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”

  • 在数据时代,及时了解产品数据表现是非常好的习惯。 为了帮助你随时掌握数据,诸葛在提供移动端的基础上,进一步提供“微信扫码订阅数据日报”功能。 一、如何订阅 1、 在WEB端查看数据看板 在WEB端 你可以查看配置好的数据看板,关注各类数据表现。 2、 微信扫码直接订阅 你关心的看板 选择感兴趣的看板,点击右上角“数据日报”,微信扫码即可开始订阅。 3、绑定账号后可选订阅周期 目前支持的订阅周期有:

  • 我想创建一个小应用程序,在后台记录数据。所以我试着用绑定服务。这很好,但如果我关闭应用程序,服务也会停止。< br >那么,我的问题是:使用即时服务来执行这一操作是不是一个好方法?当应用程序关闭时,我如何保持服务在后台运行(我也想在启动后启动它)?

  • 来自第三次订阅的消息会发生什么情况,是否会在TTL之后发送到死信队列 有没有办法找出消息未被使用的订阅

  • 我试图利用固有的WSO2ESB主题发布到jms队列。我已经创建了主题,并提供了一个订阅者URL:jms:/topictest?transport.jms.destinationtype=queue。然而,当我将消息发布到主题时,它不能被传递到队列。日志生成以下内容 “系统无法从jms:/queue?destination=topictest URL推断传输信息。” 另外,我似乎不知道如何发布到WS