我试图把一个Cro服务,它有一个反应/每当块消耗数据“在后台”所以不像许多使用Cro的网络插座的例子,这与可能通过浏览器访问的路由无关。
我的用例是使用通过MQTT主题接收的消息,并对它们进行一些处理。在开发的后期阶段,我可能会使用这些数据创建一个供应,但现在,当收到数据时,它将存储在一个变量中,并取决于某些条件,通过http post发送到另一个服务。
我的想法是在Cro::HTTP::Server
设置中包含一个provider()
,如下所示:
use Cro::HTTP::Log::File;
use Cro::HTTP::Server;
use Routes;
use DataProvider; # Here
my Cro::Service $http = Cro::HTTP::Server.new(
http => <1.1>,
host => ...,
port => ...,
application => [routes(), provider()], # Made this into an array of subs?
after => [
Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
]
);
在数据提供者中。pm6:
use MQTT::Client;
sub provider() is export {
my $mqtt = MQTT::Client.new: server => 'localhost';
react {
whenever $mqtt.subscribe('some/mqtt/topic') {
say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
}
}
}
这会引发一系列错误:
A react block:
in sub provider at DataProvider.pm6 (DataProvider) line 5
in block <unit> at service.p6 line 26
Died because of the exception:
Invocant of method 'write' must be an object instance of type
'IO::Socket::Async', not a type object of type 'IO::Socket::Async'. Did
you forget a '.new'?
in method subscribe at /home/cam/raku/share/perl6/site/sources/42C762836A951A1C11586214B78AD34262EC465F (MQTT::Client) line 133
in sub provider at DataProvider.pm6 (DataProvider) line 6
in block <unit> at service.p6 line 26
老实说,我完全猜测这就是我如何处理在Cro服务后台订阅数据的需求,但我找不到任何关于推荐方法的信息。
最初,我在主服务中设置了react/where块。pm6文件,但这似乎不正确。需要包装在
start{}
块中,因为正如我刚刚了解到的,react是blocking:),而cro实际上无法启动。
但是遵循路由实现的模式似乎是合乎逻辑的,但是我错过了一些东西。这个错误涉及到设置一个新方法,但我不相信这是根本原因。
Routes.pm6
没有构造函数。
谁能给我指一下正确的方向吗?
你现在看起来很好,但当我第一次看到这个时,我做了这个https://github.com/jonathanstowe/Cro-MQTT这将MQTT客户机转换为一级Cro服务。
我还没有发布它,但它可能会有所启发。
这个错误涉及到设置一个new
方法,但我不相信这是根本原因。
这不是关于建立一种新的方法。这是一个应该定义的值,而不是未定义的值。这通常意味着无法尝试初始化它,这通常意味着无法调用。新建
。
谁能给我指一下正确的方向吗?
希望这个问题有帮助。
我完全在猜测,这就是我在Cro服务的背景下处理订阅数据需求的方法,但我无法找到任何关于推荐方法的信息。
列出您从Cro入门开始所遵循的最新步骤可能会对您有所帮助,包括基础知识,但也包括最后的“了解”步骤。
A react block:
in sub provider ...
Died because of the exception:
...
in method subscribe ...
错误消息以内置的react
构造开始,该构造报告它捕获了异常(并通过在响应中抛出自己的异常进行处理)。与代码中出现的react
相对应的“回溯”从初始的“A react block:”缩进。
错误消息继续使用反应
构造来总结它自己的异常(死亡是因为...
),并通过在随后的行中报告进一步缩进的原始异常来解释它自己。这包括另一个回溯,这一次对应于原始异常,这可能发生在具有不同调用栈的不同线程上。
(Raku的所有结构化多线程构造[1]都使用这种由两部分组成的错误报告方法,用于捕获异常并通过抛出另一个异常进行处理。)
第一个回溯指示反应
行:
in sub provider at DataProvider.pm6 (DataProvider) line 5
use MQTT::Client;
sub provider() is export {
my $mqtt = MQTT::Client.new: server => 'localhost';
react {
第二个回溯是关于原始异常的:
Invocant of method 'write' must be an object instance of type
'IO::Socket::Async', not a type object of type 'IO::Socket::Async'. ...
in method subscribe at ... (MQTT::Client) line 133
这报告了在MQTT::Client
的第133行调用的写
方法要求其调用者是类型'IO::Socket::Async'的实例。它得到的值属于该类型,但不是实例,而是“类型对象”。(非本机类型的所有值要么是类型对象,要么是其类型的实例。)
错误消息以以下内容结束:
Did you forget a '.new'?
这是一个简洁的提示,它基于这样一个事实:当需要实例时,遇到类型对象的原因有99次是因为代码未能初始化变量。(类型对象的用途之一是在Perl等语言中充当“未定义”的角色。)
那么,您能理解为什么应该是“IO::Socket::Async”的初始化实例的东西是未初始化的吗?
[1]Raku的并行、并发和异步结构遵循结构化编程范式。乔纳森·沃辛顿(Jonathan Worthington)关于这种总体方法的视频演示,请参见Raku中的并行、并发和异步。像react
这样的结构化构造可以清晰地观察、包含和管理在其执行范围内任何地方发生的事件,包括错误(如错误异常),即使它们发生在其他线程上。
感谢所有提供信息的人,这是一次非常有价值的学习活动。
通过附加的子例程,沿侧路由器()
在应用程序
参数Cro::HTTP::Server.new
给进一步的麻烦允许,并打破路由)
相反,我将后台工作移动到它自己的类中,并给它一个更类似于Cro::HTTP::Server的方法。
我的新方法:
服务pm6
use Cro::HTTP::Log::File;
use Cro::HTTP::Server;
use Routes;
use KlineDataSubscriber; # Moved mqtt functionality here
use Database;
my $dsn = "host=localhost port=5432 dbname=act user=.. password=..";
my $dbh = Database.new :$dsn;
my $mqtt-host = 'localhost';
my $subscriber = KlineDataSubscriber.new :$mqtt-host;
$subscriber.start; # Inspired by $http.start below
my Cro::Service $http = Cro::HTTP::Server.new(
http => <1.1>,
host => ...,
port => ...,
application => routes($dbh), # Basically back the way it was originally
after => [
Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
]
);
$http.start;
say "Listening at...";
react {
whenever signal(SIGINT) {
say "Shutting down...";
$subscriber.stop;
$http.stop;
done;
}
}
在KlineDataSubscriber.pm
use MQTT::Client;
class KlineDataSubscriber {
has Str $.mqtt-host is required;
has MQTT::Client $.mqtt = Nil;
submethod TWEAK() {
$!mqtt = MQTT::Client.new: server => $!mqtt-host;
await $!mqtt.connect;
}
method start(Str $topic = 'act/feed/exchange/binance/kline-closed/+/json') {
start {
react {
whenever $!mqtt.subscribe($topic) {
say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
}
}
}
}
method stop() {
# TODO Figure how to unsubscribe and cleanup nicely
}
}
对我来说,这感觉更“Cro惯用”,但我很乐意被纠正。更重要的是,它的工作正如预期的那样,我觉得它在某种程度上证明了未来。我应该能够创建一个供应,使路由器可以使用实时数据,并将数据推送到任何连接的web客户端。
我还打算使用http GETendpoint/status
进行各种检查,以确保一切正常
第一,我试过了 第二,我尝试了下面的。(使用onErrorResumeNext),但取消了订阅。 (未调用onError,但调用OnComplete。因此已取消订阅) 第三,我试了下面。(带重试) 这比第一好。但没有刻录。 我想使刷新按钮,工作后错误。 我想知道 null 对不起,我的英语太差了。
我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”
在数据时代,及时了解产品数据表现是非常好的习惯。 为了帮助你随时掌握数据,诸葛在提供移动端的基础上,进一步提供“微信扫码订阅数据日报”功能。 一、如何订阅 1、 在WEB端查看数据看板 在WEB端 你可以查看配置好的数据看板,关注各类数据表现。 2、 微信扫码直接订阅 你关心的看板 选择感兴趣的看板,点击右上角“数据日报”,微信扫码即可开始订阅。 3、绑定账号后可选订阅周期 目前支持的订阅周期有:
我想创建一个小应用程序,在后台记录数据。所以我试着用绑定服务。这很好,但如果我关闭应用程序,服务也会停止。< br >那么,我的问题是:使用即时服务来执行这一操作是不是一个好方法?当应用程序关闭时,我如何保持服务在后台运行(我也想在启动后启动它)?
来自第三次订阅的消息会发生什么情况,是否会在TTL之后发送到死信队列 有没有办法找出消息未被使用的订阅
我试图利用固有的WSO2ESB主题发布到jms队列。我已经创建了主题,并提供了一个订阅者URL:jms:/topictest?transport.jms.destinationtype=queue。然而,当我将消息发布到主题时,它不能被传递到队列。日志生成以下内容 “系统无法从jms:/queue?destination=topictest URL推断传输信息。” 另外,我似乎不知道如何发布到WS