这是我发布的另一个问题的延续。以下是之前问题的链接:来自多个主题的消息聚合
我的要求是确保node1应该接收响应R1M1
实施的事情:
>
设置Kafka领导人。来自两个节点的生产者记录中的相关ID。
KafkCamessageListenerContainerBean使用容器属性中的两个响应主题创建。ContainerProperties ContainerProperties=新的ContainerProperty(output-message-topic1,output-message-topic2);
聚合回复KafkatTemplate。setSharedReplyTopic(true);
consumerFactory配置为单个groupId。
consumerConfigProps.put(ConsumerConfig.GROUP_ID_CONFIG, "xxxx.yyyyy.zzzz");
注:
>
由于infra的限制,无法为每个使用者(节点)创建单独的主题。
我需要关于需要实现的更改的帮助,以获得请求的相关聚合响应。
看到留档。
您必须将每个实例配置为使用离散的回复主题分区,并且服务器必须将回复路由到请求的分区;或者您必须使用不同的组。每个实例的id
,以便向这两个实例发送回复(并被未发出请求的实例丢弃)。
使用单个回复TopicPartitionOffset
进行配置时,只要每个实例侦听不同的分区,就可以对多个模板使用相同的回复主题。使用单个回复主题进行配置时,每个实例必须使用不同的组。id
。在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例才能找到相关ID。这可能有助于自动缩放,但会带来额外网络流量的开销和丢弃每个不需要的回复的小成本。使用此设置时,我们建议您将模板的sharedReplyTopic
设置为true,这将降低要调试的意外回复的日志记录级别,而不是默认错误。
如果您有多个客户端实例,并且您没有如上一段所述配置它们,则每个实例都需要一个专用的回复主题。另一种选择是设置KafkaHeaders。REPLY_PARTITION
并为每个实例使用专用分区。Header包含一个四个字节的int(big-endian)。服务器必须使用此标头将回复路由到正确的分区(@KafkaListener
执行此操作)。然而,在这种情况下,回复容器不能使用Kafka的组管理功能,必须配置为在固定分区上侦听(通过在其构造函数中使用ContainerProperties
中的TopicPARtionOffset
)。
我有一个要求,即我必须使用Kafka的同步请求-应答模式,因此我正在使用ReplyingKafkatTemplate。 作为实现的一部分,有一个生产者在一个主题(input-message-topic1)上推送请求消息,但作为回报,我期待来自两个主题(output-message-topic1和output-message-topic2)的响应,我必须进一步聚合和处理。 问题:是否可以使用Repl
这更多的是一个系统设计问题。 让我们假设我有一个微服务体系结构,我有X个实例(用于负载平衡对服务的HTTP请求)。但是,也是Kafka主题的消费者。如何避免将同一消息处理X次(X是的实例数)<如果处理是幂等的,至少一次就可以了。它不需要是,但不能是。 服务A可以是订单服务。它生成关于用户向订单主题下单的消息。 服务B可以是支付服务。它使用订单主题中的消息向用户收费。 支付订单可能是幂等操作。但是,
我们正在开发一个应用程序,我们想听Kafka中不止一个主题。所有主题都有一个分区。所有主题名称都有一个公共的前缀,例如“test-x”、“test-y”,所以我们可以对它使用spring。 我们希望编写一个java spring使用者,它使用模式监听所有主题。我们的想法是,我们可以运行同一个消费者(属于同一个组)的多个实例,Kafka将为不同的消费者分发来自不同主题的消息。 然而,这似乎并不奏效。
我有一个Windows服务器,目前运行两个不同的Tomcat实例作为Windows服务。两者都有自己的目录,并且在它们之间不共享任何文件。通过设置向导安装的第一个Tomcat实例设置了CATALINA_HOME和CATALINA_BASE环境变量。第二个以相同的方式安装。它忽略全局设置并作为独立的实例运行。 问题来了。我需要安装一个自带Tomcat的产品。我已经完成了产品的安装,但是现在我需要配置
我正在开发一个使用的软件。我有一个用户订阅了多个主题,我想知道是否有一个订单接收来自这些主题的消息。我在我的电脑上尝试了一些组合,但我需要确定这一点。例 null [编辑]我想指定这两个主题各有一个分区,并且只有一个生产者和一个消费者。我需要首先阅读来自第一个主题的所有消息,然后阅读来自另一个主题的消息
我正在尝试用https://github.com/php-amqplib/rabbitMQBundle和Symfony2框架实现RabbitMQ。 我已经设法使这个东西在一个生产者和一个消费者的情况下工作,但问题是当我使用多个消费者的时候。 这是我的配置: [Symfony\Component\DependencyInjection\Exception\ServiceNotFoundExcepti