当前位置: 首页 > 知识库问答 >
问题:

SpringXD:自动启动=false在Kafka消息驱动通道适配器上不起作用

松俊美
2023-03-14

我使用的是< code>SpringXD,我的配置如下:

  • Spring集成kafka 2.1.0.释放
  • kafka客户端0.10.0.1
  • Kafka0.10.x.x
  • Spring-xd-1.3.1.释放

我的 xml 文件中有以下配置:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">


    <int:channel id="input" />
    <int:channel id="output" />

    <int:control-bus input-channel="input" />

    <int-kafka:message-driven-channel-adapter
        id="kafka-inbound-channel-adapter-testing" listener-container="container1"
        auto-startup="false" phase="100" send-timeout="5000"
        channel="output" mode="record"
        message-converter="messageConverter" />

    <bean id="messageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter" />

    <!--Consumer -->
    <bean id="container1"
        class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="localhost:9092" />
                        <entry key="enable.auto.commit" value="false" />
                        <entry key="auto.commit.interval.ms" value="100" />
                        <entry key="session.timeout.ms" value="15000" />
                        <entry key="max.poll.records" value="3" />
                        <entry key="group.id" value="bridge-stream-testing" />
                        <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer" />
                        <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>

        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="testing-topic" />
            </bean>
        </constructor-arg>
    </bean>

</beans>

这是我用来启动/停止频道的Java类:

package com.kafka.source.logic;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableScheduling
@ImportResource("classpath:/config/kafka-source-context.xml")
public class KafkaSourceRetry {

    @Autowired
    MessageChannel input;

    @Scheduled(cron="*/50 * * * * *")
    void startAdapter(){
        //CODE COMMENTED OUT TO MAKE SURE THE ADAPTER IS NOT BEING STARTED
        //EVEN IF I UNCOMMENT THE CODE, THE 50 secs defined related to the cron are not respected.
        //That is, if I send a message to the topic, it is inmediately consumed
        //input.send(new GenericMessage<String>("@kafka-inbound-channel-adapter-testing.start()"));
    }
}

然后我创建了一个基本流来检查我发送到主题的一些消息是否通过

stream create --name bridgeStream --definition "kafkaSourceLatestApi_v2|bridge|file" --deploy

我检查了创建的文件,它包含我发送到 Kafka 主题的所有消息

hola _ que _ tal que _ bonito bridgestream . out(END)

同样在日志中,我发现了这个:

2017-04-10T22:37:06-0300 1.3.1.发布信息 部署路径儿童缓存-0 支持。默认生命周期处理器 - 在第 0 阶段启动 bean 2017-04-10T22:37:06-0300 1.3.1.发布调试部署路径子缓存-0 支持。默认生命周期处理器 - 启动 bean 'container1' 的类型 [class org.springframework.kafka.listener.KafkaMessageListenerContainer] 2017-04-10T22:37:06-0300 1.3.1.发布调试部署路径儿童缓存-0 支持。默认生命周期处理器 - 已成功启动 bean 'container1' 2017-04-10T22:37:06-0300 1.3.1.发布信息 部署路径儿童缓存-0 支持。默认生命周期处理器 - 在第 100 阶段启动 bean 2017-04-10T22:37:06-0300 1.3.1.发布调试部署路径儿童缓存-0 支持。默认生命周期处理器 - 启动 bean 'kafka-inbound-channel-adapter-testing' 类型 [class org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter] 2017-04-10T22:37:06-0300 1.3.1.发布信息 部署路径儿童缓存-0 入站。KafkaMessageDrivenChannelAdapter - 已启动 kafka-inbound-channel-adapter-test 2017-04-10T22:37:06-0300 1.3.1.发布调试部署PathChildrenCache-0支持。默认生命周期处理器 - 已成功启动 bean 'kafka-inbound-channel-adapter-testing'

我的问题是:为什么频道会自动启动?

共有1个答案

周宏胜
2023-03-14

它是这样设计的;所有模块的自动启动都设置为 false,因此它们不会乱序启动;部署流时,各个模块将从右到左部署和启动。

部署/取消部署是启动/停止流的方式。

请参阅模块部署程序

 类似资料:
  • 试图调用不存在的方法。尝试是从以下位置进行的: 以下方法不存在: 3.如果您使用kafka版本2.5及以上版本,但却被2021-03-22 13:56:05.102-0400 org{local_sparta}WARN[data-pipeline,,,][DP-ACCOUNT][DPA][]annotationConfigServletWebServerApplicationContext:上下文

  • 我使用了spring io文档中列出的示例配置,它运行良好。 然而,当我用下游应用程序测试它时,我从Kafka那里消费并将其发布到下游。如果下游关闭,则消息仍在使用中,不会重播。 或者说,在使用kafka主题后,如果我在service activator中发现一些异常,我还想抛出一些异常,这些异常应该回滚事务,以便可以重播kafka消息。 简而言之,如果消费应用程序有一些问题,那么我想回滚事务,这

  • 我正在尝试运行一个简单的Spring Boot Kafka应用程序,但我无法使其工作。我遵循了各种教程,现在我正在实现这个教程,但当我启动应用程序时,会发生以下情况: 我可以在控制台中写入,但消费者没有收到任何消息。 这是我的SpringApplication类: application.yml: 消费者类、生产者类及其配置类与教程中所写的相同。< br >在我的server.properties

  • 我在处理spring集成流中的错误时遇到了一个问题。流程的工作方式如下:我的入口点是消息驱动的通道适配器->路由器->过滤器->转换器->服务激活器->数据库。我已经使用自定义消息侦听器容器编写了自己的错误处理程序,它的工作方式与预期一致,但是当我出现异常时,我需要将原始消息保存到数据库中。 问题是,当我从数据库中获得异常时,错误处理策略会重新使用一个MessageHandleException,

  • 我的广播接收器由事件BOOT_COMPLETED触发,在除Nomi C10103以外的许多设备上工作正常。该消息出现在设备上的日志中:d/activitymanager:send broadcast:Android.intent.action.boot_completed,跳过package:com.example.MyPackageName 发送消息am广播-来自adb shell的Androi