当前位置: 首页 > 知识库问答 >
问题:

Spring集成Kafka线程配置

杨海
2023-03-14

我正在使用带有以下配置的spring集成kafka 1.1.0。我不太了解流的配置。当我增加这个值时,Spring会自动生成更多线程来处理消息吗?e、 g.当我有流=2时,相关的转换器和服务激活器是否都在2个线程中运行?我想错过一些线程执行器配置,但不知道如何配置。任何提示都将不胜感激。谢谢

<int:poller default="true" fixed-delay="10"/>

<int:channel id="tag.track">
</int:channel>

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapterForTagTrack" kafka-consumer-context-ref="consumerContextForTagTrack" auto-startup="true" channel="tag.track">
</int-kafka:inbound-channel-adapter>

<int-kafka:consumer-context id="consumerContextForTagTrack"
	consumer-timeout="${kafka.consumer.timeout}" zookeeper-connect="zookeeperConnect">
	<int-kafka:consumer-configurations>
		<int-kafka:consumer-configuration group-id="${kafka.consumer.group.track}" max-messages="200">
			<int-kafka:topic id="tag.track" streams="2" />
		</int-kafka:consumer-configuration>
	</int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<int:channel id="tag.track.transformed">
	<int:interceptors>
		<int:wire-tap channel="event.logging" />
	</int:interceptors>
</int:channel>

<int:transformer id="kafkaMessageTransformerForTagTrack"
	ref="kafkaMessageTransformer" input-channel="tag.track" method="transform"
	output-channel="tag.track.transformed" />

<int:service-activator input-channel="tag.track.transformed" ref="tagTrackMessageHandler" method="handleTagMessage">
	<int:request-handler-advice-chain>
		<ref bean="userTagRetryAdvice" />
	</int:request-handler-advice-chain>
</int:service-activator>

已尝试使用消息驱动通道适配器,但无法使其工作,以下配置未拾取任何消息。还尝试了组织。springframework。集成。Kafka。侦听器。KafkaTopicOffsetManager,它抱怨偏移管理主题不能有多个分区。此外,在此适配器中,如何配置使用者组?是否有关于如何使用消息驱动通道适配器的详细示例?项目页面上的说明非常高。

<int:channel id="tag.track">
	<int:queue capacity="100"/>
</int:channel>

<bean id="kafkaConfiguration" class="org.springframework.integration.kafka.core.ZookeeperConfiguration">
	<constructor-arg ref="zookeeperConnect"/>
</bean>

<bean id="connectionFactory" class="org.springframework.integration.kafka.core.DefaultConnectionFactory">
	<constructor-arg ref="kafkaConfiguration"/>
</bean>

<bean id="decoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder"/>

<int-kafka:message-driven-channel-adapter
		id="adapter"
		channel="tag.track"
		connection-factory="connectionFactory"
		key-decoder="decoder"
		payload-decoder="decoder"
		max-fetch="100"
		topics="tag.track"
		auto-startup="true"
		/>

共有1个答案

娄飞鸾
2023-03-14

streams属性与Spring本身无关;调用ConsumerConnector时,它只会传递给Kafka。createMessageStreams()(每个主题/流条目都在map参数中传递)。

请参阅Kafka文档。

编辑:

使用高级使用者时,会轮询kafka入站通道适配器,因此下游集成流运行的线程与kafka客户端线程无关;它们在轮询器配置中进行管理。

您可以考虑改用消息驱动的通道适配器。

 类似资料:
  • 我想创建一个简单的IntegrationFlow与Spring集成,我有困难。 我想创建一个集成流,从Rabbit Mq中的多个队列中获取消息,并将消息发布到不同的Restendpoint。 我想知道我是否可以并行化这个。 我想检查两个场景的可行性: 首先,我想为每个RabbitMq队列创建一个线程,该队列在接收到消息后将侦听并执行流: 场景1 第二个场景:在这个场景中,我想为每个队列创建一个动态

  • 我的pom.xml是: 我错过了什么?

  • 我想了解Spring集成中如何处理消息:串行或并行。特别是我有一个带有轮询器和HTTP出站网关的入站通道适配器。我猜拆分器、变压器、标头丰富器等不会产生自己的线程。 我可能错过了,但是这些细节在留档的某个地方指定了吗? 还可以通过编程方式获取系统中的所有频道吗?

  • 通过和注释,我得到以下错误: 然后添加以下配置: 错误消失了。但是,我想我应该能够按照文档中的建议,使用属性自动配置它。 null

  • 我目前正在尝试编写一个适配器,它将使用来自ActiveMQ的消息并将其发布到Kafka。 我正在考虑使用Spring集成来集成这两个消息传递系统。 我的问题是,我的应用程序不会维护模型的注册表,许多应用程序将使用该注册表将记录发布到activeMQ。我想接收这些javax-jms消息,并想执行一些转换,比如将jmscorrelationId添加到kafka消息中。 另外,另一个要求是仅当kafka

  • 在Spring MVC项目中,我试图通过Spring Websockets将使用过的Kafka数据发送到前端(JavaScript)。 为了建立服务器和客户端之间的通信,我有以下内容。 客户端(app.js) 服务器(KafkaController.java) 要使用来自特定Kafka主题的数据,我使用@KafkaListener注释如下: 我有一个适当的Kafkanconfig类,包含所有必要的