下面的最小示例(针对我遇到的问题)将在通过同一通道发送的第二个命令时失败,错误为“channel closed”。
use strictures 2;
use AnyEvent::RabbitMQ;
main();
############################################################################
sub main {
_log( debug => 'main' );
my $condvar = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ->new;
$ar->load_xml_spec;
_log( debug => 'Connecting to RabbitMQ...' );
$ar->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0,
on_success => sub { _on_connect_success( $condvar, $ar, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
$condvar->recv;
$ar->close;
return;
}
############################################################################
sub _on_connect_success {
my ( $condvar, $ar, $new_ar ) = @_;
_log( debug => 'Connected to RabbitMQ.' );
_open_channel( $condvar, $new_ar );
return;
}
############################################################################
sub _open_channel {
my ( $condvar, $ar ) = @_;
_log( debug => 'Opening RabbitMQ channel...' );
$ar->open_channel(
on_success => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_open_channel_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Opened RabbitMQ channel.' );
_declare_queue( $condvar, $ar, $channel );
return;
}
############################################################################
sub _declare_queue {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ queue...' );
$channel->declare_queue(
queue => 'test',
auto_delete => 1,
passive => 0,
durable => 0,
exclusive => 0,
no_ack => 1,
ticket => 0,
on_success =>
sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_declare_queue_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ queue.' );
_bind_queue( $condvar, $ar, $channel );
return;
}
############################################################################
sub _bind_queue {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ queue...' );
$channel->bind_queue(
queue => 'test',
exchange => '',
routing_key => '',
on_success => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_bind_queue_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binded RabbitMQ queue.' );
_log( info => 'Master ready to publish messages.' );
_publish_message( $condvar, $ar, $channel, 'Hello, world!' );
return;
}
############################################################################
sub _publish_message {
my ( $condvar, $ar, $channel, $message ) = @_;
_log( debug => "Publishing RabbitMQ message ($message)..." );
$channel->publish(
queue => 'test',
exchange => '',
routing_key => '',
body => $message,
header => {},
mandatory => 0,
immediate => 0,
on_success =>
sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
on_ack => sub { _error( $condvar, $ar, 'ack', @_ ) },
on_nack => sub { _error( $condvar, $ar, 'nack', @_ ) },
);
return;
}
############################################################################
sub _on_publish_message_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => "Published RabbitMQ message." );
sleep 1;
_publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
return;
}
############################################################################
sub _error {
my ( $condvar, $ar, $type, @error ) = @_;
_log( error => sprintf '%s - %s', $type, join ', ', @error );
$condvar->send( $condvar, $ar, $type, @error );
return;
}
############################################################################
sub _log {
my ( $level, $message ) = @_;
my @time = gmtime time;
$time[5] += 1900;
$time[4] += 1;
my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
my @caller0 = caller(0);
my @caller1 = caller(1);
my $subroutine = $caller1[3];
$subroutine =~ s/^$caller0[0]:://;
print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
return;
}
该方案应:
2015-08-12T13:02:07+00:00 [debug] main at minimal.pl line 9 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connecting to RabbitMQ... at minimal.pl line 13 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connected to RabbitMQ. at minimal.pl line 36 (_on_connect_success; from minimal.pl line 22)
2015-08-12T13:02:07+00:00 [debug] Opening RabbitMQ channel... at minimal.pl line 44 (_open_channel; from minimal.pl line 37)
2015-08-12T13:02:07+00:00 [debug] Opened RabbitMQ channel. at minimal.pl line 58 (_on_open_channel_success; from minimal.pl line 46)
2015-08-12T13:02:07+00:00 [debug] Declaring RabbitMQ queue... at minimal.pl line 66 (_declare_queue; from minimal.pl line 59)
2015-08-12T13:02:07+00:00 [debug] Declared RabbitMQ queue. at minimal.pl line 88 (_on_declare_queue_success; from minimal.pl line 76)
2015-08-12T13:02:07+00:00 [debug] Binding RabbitMQ queue... at minimal.pl line 96 (_bind_queue; from minimal.pl line 89)
2015-08-12T13:02:07+00:00 [error] failure - Channel closed at minimal.pl line 155 (_error; from minimal.pl line 102)
2015-08-12T13:02:07+00:00 [error] close - Net::AMQP::Frame::Method=HASH(0x38fe1c8) at minimal.pl line 155 (_error; from minimal.pl line 50)
为什么anyevent::RabbitMQ
或RabbitMQ本身会关闭通道(而不是连接或我错过了什么)?
如果您查看RabbitMQ服务器日志,您将看到如下内容:
{amqp_error,access_rejiend,“不允许在默认Exchange上操作”,“queue.bind”}
显然,它不允许您在默认交换上绑定队列。所以您需要先声明并绑定自己的exchange。
sub _declare_exchange {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ exchange...' );
$channel->declare_exchange(
exchange => 'testest',
type => 'fanout',
on_success =>
sub { _on_declare_exchange_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_declare_exchange_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ exchange.' );
_bind_exchange( $condvar, $ar, $channel );
return;
}
############################################################################
sub _bind_exchange {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ exchange...' );
$channel->bind_exchange(
source => 'testest',
destination => 'testest',
routing_key => '',
on_success => sub { _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
sub _on_open_channel_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Opened RabbitMQ channel.' );
$channel->confirm;
_declare_exchange( $condvar, $ar, $channel );
return;
}
$channel->bind_queue(
queue => 'test',
exchange => 'testest', # <-- here
routing_key => '',
# ...
);
需要在_publish_message
中使用publish()
调用来完成同样的操作。在这里,您还应该用实际处理确认的东西替换on_ack
处理程序。我认为您打算这样做,但出现了复制/粘贴错误1。
$channel->publish(
queue => 'test',
exchange => 'testest', # <-- here
routing_key => '',
# ...
on_ack => sub {
_on_publish_message_success( $condvar, $ar, $channel, @_ );
},
);
还有一点是,在使用任何事件时,在_on_publish_message_success
中调用sleep
不是一个好主意,因为这将停止整个程序。请改用ae::timer
。
my $t;
$t = AE::timer(1,0,sub {
_publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
undef $t;
});
这里是包含所有更改的完整代码。
use strictures 2;
use AnyEvent::RabbitMQ;
main();
############################################################################
sub main {
_log( debug => 'main' );
my $condvar = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ->new;
$ar->load_xml_spec;
_log( debug => 'Connecting to RabbitMQ...' );
$ar->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/guest',
timeout => 1,
tls => 0,
on_success => sub { _on_connect_success( $condvar, $ar, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
$condvar->recv;
$ar->close;
return;
}
############################################################################
sub _on_connect_success {
my ( $condvar, $ar, $new_ar ) = @_;
_log( debug => 'Connected to RabbitMQ.' );
_open_channel( $condvar, $new_ar );
return;
}
############################################################################
sub _open_channel {
my ( $condvar, $ar ) = @_;
_log( debug => 'Opening RabbitMQ channel...' );
$ar->open_channel(
on_success => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_open_channel_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Opened RabbitMQ channel.' );
$channel->confirm;
_declare_exchange( $condvar, $ar, $channel );
return;
}
############################################################################
sub _declare_exchange {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ exchange...' );
$channel->declare_exchange(
exchange => 'testest',
type => 'fanout',
on_success =>
sub { _on_declare_exchange_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_declare_exchange_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ exchange.' );
_bind_exchange( $condvar, $ar, $channel );
return;
}
############################################################################
sub _bind_exchange {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ exchange...' );
$channel->bind_exchange(
source => 'testest',
destination => 'testest',
routing_key => '',
on_success => sub { _on_bind_exchange_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_bind_exchange_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binded RabbitMQ exchange.' );
_declare_queue( $condvar, $ar, $channel );
return;
}
############################################################################
sub _declare_queue {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ queue...' );
$channel->declare_queue(
queue => 'test',
auto_delete => 1,
passive => 0,
durable => 0,
exclusive => 0,
no_ack => 1,
ticket => 0,
on_success =>
sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_declare_queue_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ queue.' );
_bind_queue( $condvar, $ar, $channel );
return;
}
############################################################################
sub _bind_queue {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ queue...' );
$channel->bind_queue(
queue => 'test',
exchange => 'testest',
routing_key => '',
on_success => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_bind_queue_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binded RabbitMQ queue.' );
_log( info => 'Master ready to publish messages.' );
_publish_message( $condvar, $ar, $channel, 'Hello, world!' );
return;
}
############################################################################
sub _publish_message {
my ( $condvar, $ar, $channel, $message ) = @_;
_log( debug => "Publishing RabbitMQ message ($message)..." );
$channel->publish(
queue => 'test',
exchange => 'testest',
routing_key => '',
body => $message,
header => {},
mandatory => 0,
immediate => 0,
on_success =>
sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
on_ack => sub {
_on_publish_message_success( $condvar, $ar, $channel, @_ );
# _error( $condvar, $ar, 'ack', @_ )
},
on_nack => sub { _error( $condvar, $ar, 'nack', @_ ) },
);
return;
}
############################################################################
sub _on_publish_message_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => "Published RabbitMQ message." );
my $t; $t=AE::timer(1,0,sub {
_publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
undef $t;
});
return;
}
############################################################################
sub _error {
my ( $condvar, $ar, $type, @error ) = @_;
_log( error => sprintf '%s - %s', $type, join ', ', @error );
$condvar->send( $condvar, $ar, $type, @error );
return;
}
############################################################################
sub _log {
my ( $level, $message ) = @_;
my @time = gmtime time;
$time[5] += 1900;
$time[4] += 1;
my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
my @caller0 = caller(0);
my @caller1 = caller(1);
my $subroutine = $caller1[3];
$subroutine =~ s/^$caller0[0]:://;
print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
return;
}
1)在某些地方,你需要为那些:)的同事请一杯啤酒
我有一个多线程应用程序,它将传入的消息发布到rabbitmq交换。使用rabbitmq java客户端,我在应用程序启动时创建一个rabbitmq连接,并在所有线程之间共享它。每个线程都创建一个新通道(threadlocal),这样通道就不会像rabbitmq文档所建议的那样在多个线程之间共享。我正在使用netty,我看到相同数量的rabbitmq通道被创建为netty通道管道线程。到目前为止还不
关闭通道的意思是该通道将不再允许写入数据。这个方法可以让通道数据的接受端知道数据已经全部发送完成了。 package main import "fmt" // 在这个例子中,我们使用通道jobs在main函数所在的协程和一个数据 // 接收端所在的协程通信。当我们数据发送完成后,我们关闭jobs通道 func main() { jobs := make(chan int, 5) d
RabbitMQ Java客户端有以下概念: -与RabbitMQ服务器实例的连接 -??? 使用者线程池-使用RabbitMQ服务器队列中的消息的线程池 队列-按FIFO顺序保存消息的结构 我试图理解他们之间的关系,更重要的是,他们之间的联系。 我仍然不太清楚是什么,除了这是您发布和使用的结构,并且它是从一个开放的连接创建的。如果有人能向我解释一下“通道”代表什么,可能有助于澄清一些问题。 通道
我从RabbitMq服务器收到此错误 服务器关闭通道:406(前提条件-失败),并显示消息“前提条件_失败-未知交付标签80” 发生这种情况是因为在消费者任务期间连接丢失,最后,当消息被确认/nack时,我收到此错误,因为我无法在与我接收它的通道不同的通道上确认消息。 这是 RabbitMq 连接的代码 如您所见,连接建立后会创建一个通道,在我收到连接问题后,通道设置为NULL,1秒钟后连接重试,
问题内容: 不知道频道 长度时,我无法关闭频道 它给我错误 这是合乎逻辑的-当第二个goroutine关闭通道试图发送给它时,它关闭了它。在这种情况下关闭渠道的最佳方法是什么? 问题答案: 通道关闭后,您将无法在该通道上发送更多值,否则会出现混乱。这就是您的经验。 这是因为您启动了使用同一个通道的多个goroutine,并且它们在该通道上发送值。然后关闭每个通道。而且由于它们未同步,因此一旦第一个
问题内容: 我正在玩Golang,我创建了这个小应用程序,使用goroutines进行了多个并发的api调用。 当应用程序运行时,调用完成后,该应用程序将卡住,这是有道理的,因为由于通道未关闭,无法退出 范围c 循环。 我不确定在哪种情况下可以更好地关闭该通道。 问题答案: 当没有更多值要发送时,您将关闭通道,因此在这种情况下,所有goroutine已完成。 (请注意,from 仅将反映连接和协议