我正在尝试用https://github.com/php-amqplib/rabbitMQBundle和Symfony2框架实现RabbitMQ。
我已经设法使这个东西在一个生产者和一个消费者的情况下工作,但问题是当我使用多个消费者的时候。
这是我的配置:
old_sound_rabbit_mq:
connections:
default:
host: 'localhost'
port: 5672
user: 'guest'
password: 'guest'
vhost: '/'
lazy: false
connection_timeout: 3
read_write_timeout: 3
# requires php-amqplib v2.4.1+ and PHP5.4+
keepalive: false
# requires php-amqplib v2.4.1+
heartbeat: 0
#requires php_sockets.dll
# use_socket: true # default false
producers:
soccer_team_stat:
connection: default
exchange_options: {name: 'soccer_team_stat_ex', type: direct}
queue_options: {name: 'soccer_team_stat_qu'}
soccer_team_stat_form:
connection: default
exchange_options: {name: 'soccer_team_stat_ex', type: direct}
queue_options: {name: 'soccer_team_stat_form_qu'}
consumers:
soccer_team_stat:
connection: default
exchange_options: {name: 'soccer_team_stat_ex', type: direct}
queue_options: {name: 'soccer_team_stat_qu'}
callback: myapp.soccer_team_stat.consume
soccer_team_stat_form:
connection: default
exchange_options: {name: 'soccer_team_stat_ex', type: direct}
queue_options: {name: 'soccer_team_stat_form_qu'}
callback: myapp.soccer_team_stat_form.consume
<services>
<service class="MyApp\EtlBundle\Producers\SoccerTeamStatProducer" id="myapp.soccer_team_stat.produce">
<argument type="service" id="old_sound_rabbit_mq.soccer_team_stat_producer"/>
</service>
<service class="MyApp\EtlBundle\Producers\SoccerTeamStatProducer" id="myapp.soccer_team_stat_form.produce">
<argument type="service" id="old_sound_rabbit_mq.soccer_team_stat_producer"/>
</service>
<service class="MyApp\EtlBundle\Consumers\SoccerTeamStatConsumer" id="myapp.soccer_team_stat.consume">
<argument type="service" id="service_container"/>
</service>
<service class="MyApp\EtlBundle\Consumers\SoccerTeamStatFormConsumer" id="myapp.soccer_team_stat_form.consume">
<argument type="service" id="service_container"/>
</service>
</services>
[Symfony\Component\DependencyInjection\Exception\ServiceNotFoundException]您请求的服务“old_sound_rabbit_mq.soccer_team_stat_form_consumer”不存在。
我尝试了各种组合,包括使用multiple_consumers配置键,但没有成功。我错过了什么?
如果routing_key
和binding_key
都未设置,则direction
exchange将像fanout
那样运行,并将消息发送到它知道的所有队列,根据我从您的配置中看到的情况,您最好使用fanout
。
old_sound_rabbit_mq:
connections:
default:
host: %rabbit_mq_host%
port: %rabbit_mq_port%
user: %rabbit_mq_user%
password: %rabbit_mq_pswd%
vhost: /
lazy: true
producers:
soccer_team_stat:
connection: default
exchange_options: { name: 'soccer_team_stat_ex', type: fanout }
soccer_team_stat_form:
connection: default
exchange_options: { name: 'soccer_team_stat_form_ex', type: fanout }
consumers:
soccer_team_stat:
connection: default
exchange_options: { name: 'soccer_team_stat_ex', type: fanout }
queue_options: { name: 'soccer_team_stat_qu' }
callback: myapp.soccer_team_stat.consume
soccer_team_stat_form:
connection: default
exchange_options: { name: 'soccer_team_stat_form_ex', type: fanout }
queue_options: { name: 'soccer_team_stat_form_qu' }
callback: myapp.soccer_team_stat_form.consume
这个symfony的RabbitMQ扇出示例包括2个Producer&2个Exchange&2个Queue&N个Worker和2个Consumer。它是完整的示例(实际上是您的问题的完整答案/您想要做的事情的已经制作的版本),展示了在symfony应用程序中是如何完成事情的。我建议你按照那里使用的模式。非常容易遵循和维护。如果您想要更多的示例,只需在该博客中搜索rabbitMQ
关键字即可。
我编写了一个定制的NiFi处理器,它使用一些Hadoop类,处理流文件,并在Avro之间序列化流文件。 处理器的pom.xml文件如下所示: 因为我已经将标记为,所以它不会绑定在生成的NAR文件中。现在,我可以做一个快速修复,完全删除作用域并创建NAR,但NiFi会抱怨next class not found错误。 我想知道:
问题内容: 我有一个简单的应用程序,其中要求用户提供以下某些信息。 请提供您的域名。 **用户:www.google.com** 请提供您庞大的网址。 **用户:www.vast.xx.com** 请选择职位。a)左下。b)右下。 用户: b)右下 用户提供了这些信息后,按钮出现,用户单击以生成代码。他得到以下代码。 这是我完整的webpack配置文件:webpack config 使用此脚本,用
问题内容: 这是基于此答案的跟进。 我有一个看起来像的结构 和内装看起来像 作为的一部分,我想捆绑看起来应该包含 如何将其创建为或并与一起生产? 问题答案: 您可以用来执行该任务。 在中创建程序集定义文件 在文件中,添加插件声明,执行阶段和目标。 可以在这里找到更多格式文件或自定义格式文件:https : //maven.apache.org/plugins/maven-assembly- plu
我对一个难以捉摸但可能非常强大的DropWizard特性Bundles感到好奇。根据文件: Dropwizard捆绑包是一组可重用的功能,用于定义应用程序的行为块。 鉴于DropWizard(DW)的文档非常丰富,我感到震惊的是,这实际上是对bundle的唯一解释。我在野外看到了一些这样的例子: 资产组合 但我不明白的是:bundle似乎只是在jar中进行代码打包和分发。那么,为什么我不能编写“原
我目前正在创建一个时间线卡片,方式如下: 这将在时间线中创建一张活卡。我尝试添加另一个相同的标签,以为它会把他们捆绑在一起,最近的将是捆绑封面卡,但它没有做到这一点。 我看了一下文档,我能找到的只是如何使用镜像API来实现它。那么有没有一种方法可以使用GDK创建捆绑卡呢?还是只有通过镜像API才可用?多谢了。
问题内容: 我正在使用Ant构建一些Java项目。在某些情况下,我有一个lib/目录,该目录包含JAR文件形式的外部依赖项。 在构建过程中,我通过将目录中zipfileset每个jar的a添加到bundlejar文件中,创建了一个捆绑jar,其中包含项目代码以及相关性lib/。 问题是,每次添加一个jar或更改名称时,我都需要记住要更新build.xml文,因为我找不到zipfilesets一种自