当前位置: 首页 > 知识库问答 >
问题:

是否有任何可用于 spring 集成 kafka 版本 3.x 中的入站通道适配器的 xml 配置

欧阳勇
2023-03-14

<int-kafka:zookeeper-connect
    id="zookeeperConnect" zk-connect="localhost:2181"
    zk-connection-timeout="6000" zk-session-timeout="6000"
    zk-sync-time="2000" />

<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter"
    kafka-consumer-context-ref="consumerContext" auto-startup="true"
    channel="inputFromKafka">
    <int:poller fixed-delay="2000" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>

<bean id="consumerProperties"
    class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="auto.offset.reset">smallest</prop>
            <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
            <prop key="fetch.message.max.bytes">5242880</prop>
            <prop key="auto.commit.interval.ms">1000</prop>
        </props>
    </property>
</bean>

<int-kafka:consumer-context
    id="consumerContext" consumer-timeout="1000"
    zookeeper-connect="zookeeperConnect"
    consumer-properties="consumerProperties">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
            group-id="Group1" max-messages="5000"
            key-decoder="deccoder" value-decoder="deccoder">
            <int-kafka:topic id="Helloworld-Topic" streams="3" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<bean id="deccoder"
    class="org.springframework.integration.kafka.serializer.common.StringDecoder" />

共有1个答案

雷曜灿
2023-03-14

请参阅文档(Spring for Apache Kafka 参考中的一章)。

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>
 类似资料:
  • 问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更

  • 如何通过注释而不是常规配置文件配置入站通道适配器?我可以为会话工厂定义bean,如下所示: 如何配置通过注释下给出的入站通道适配器? 我正在寻找的是在应用程序启动时连接所有bean,然后公开一些方法来开始轮询服务器,处理它们,然后从本地删除它们,类似于 其中getPollableChannel()为我提供了用于轮询的bean。

  • 我发现了一个xml配置的入站适配器示例,但我并不完全理解。配置指定REST请求设置请求方法、使用的格式等。 我认为,从Spring集成的角度来看,响应应该更加重要,因为响应实际上是为消息通道提供信息的。我说得对吗? HTTP入站适配器用作消息endpoint(实际上是消息起始点),它调用HTTP请求,例如REST服务的URL。”http://myRest/transfer/next“-向SI消息通

  • 使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息 通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题 Spring XML配置 JUnit测试代码 测试用例成功,在调试时,我发现channel.send()返回true 我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。 bin/kaf

  • 我试图将从Quickfix读取消息(读取修复消息)配置到spring集成中。我知道我可以使用入站通道适配器从外部源(如QuickFix)读取数据。您能提供如何编写事件驱动入站通道适配器的示例吗?我有以下配置不起作用

  • 我有一个模型对象,它是在多次转换和解析之后填充的。现在,我需要使用spring集成将模型中的消息属性发送给kafka。我可以使用messageKey方法构造键,但如何从m.getPayload()之类的模型中获取实际消息。getMessage()并将其发送给Kafka。