当前位置: 首页 > 知识库问答 >
问题:

Spring Integration Kafka适配器不生成消息

穆宾白
2023-03-14

我为此挣扎了好几天。

我正在Spring启动容器下使用SI适配器进行Kafka。

我已经在我的机器上配置了zookeeper和kafka。我还创建了控制台生产者和消费者测试它,一切正常(我设法生成控制台消息并让控制台消费者使用它们)。

我现在尝试通过Spring集成kafka出站适配器生成消息,但控制台消费者不会使用该消息

SI/Spring xd xml:

<int:publish-subscribe-channel id="inputToKafka"/>


    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-producer-context-ref="kafkaProducerContext"
                                        auto-startup="true"
                                        order="1"
                                        channel="inputToKafka">

    </int-kafka:outbound-channel-adapter>


    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="localhost:9092"
                                              async="true"
                                              topic="zerg.hydra"
                                              compression-codec="default"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
</beans>

Java:

@Named
public class KafkaProducer {

    @Autowired
    @Qualifier("inputToKafka")
    MessageChannel inputToKafka;


    public void sendMessageToKafka(String message)
    {
        inputToKafka.send(
                MessageBuilder.withPayload(message)
                        .setHeader("messageKey", "3")
                        .setHeader("topic", "zerg.hydra").build());

    }

}

我是这样运行kafka控制台消费者的:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zerg.hydra --from-beginning

日志:

Testing started at 12:49 PM ...
12:49:54 PM: Executing external tasks 'cleanTest test'...
:cleanTest
:compileJava
:processResources UP-TO-DATE
:classes
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test
12:50:07,165 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
12:50:07,166 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
12:50:07,166 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/Users/idan/dev/Projects/CalcMicroService/build/resources/test/logback.xml]
12:50:07,167 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs multiple times on the classpath.
12:50:07,167 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs at [file:/Users/idan/dev/Projects/CalcMicroService/build/resources/test/logback.xml]
12:50:07,167 |-WARN in ch.qos.logback.classic.LoggerContext[default] - Resource [logback.xml] occurs at [file:/Users/idan/dev/Projects/CalcMicroService/build/resources/main/logback.xml]
12:50:07,247 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - debug attribute not set
12:50:07,250 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
12:50:07,259 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [stdout]
12:50:07,281 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
12:50:07,327 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [reactor] to INFO
12:50:07,327 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.projectreactor] to INFO
12:50:07,328 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.springframework] to WARN
12:50:07,328 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [org.springframework.integration] to DEBUG
12:50:07,328 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
12:50:07,328 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [stdout] to Logger[ROOT]
12:50:07,331 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
12:50:07,332 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@3de433b4 - Registering current configuration as safe fallback point


  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.1.9.RELEASE)

12:50:09.530 [Test worker] INFO  o.s.i.config.IntegrationRegistrar - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
12:50:09.544 [Test worker] DEBUG o.s.i.config.IntegrationRegistrar - SpEL function '#xpath' isn't registered: there is no spring-integration-xml.jar on the classpath.
12:50:10.717 [Test worker] INFO  o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
12:50:10.719 [Test worker] INFO  o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
12:50:10.973 DEBUG [Test worker][org.jboss.logging] Logging Provider: org.jboss.logging.Log4jLoggerProvider
12:50:10.974 INFO  [Test worker][org.hibernate.validator.internal.util.Version] HV000001: Hibernate Validator 5.0.3.Final
12:50:10.991 DEBUG [Test worker][org.hibernate.validator.internal.engine.resolver.DefaultTraversableResolver] Cannot find javax.persistence.Persistence on classpath. Assuming non JPA 2 environment. All properties will per default be traversable.
12:50:11.006 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom MessageInterpolator of type org.springframework.validation.beanvalidation.LocaleContextMessageInterpolator
12:50:11.011 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ConstraintValidatorFactory of type org.springframework.validation.beanvalidation.SpringConstraintValidatorFactory
12:50:11.018 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ParameterNameProvider of type com.sun.proxy.$Proxy46
12:50:11.024 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] Trying to load META-INF/validation.xml for XML based Validator configuration.
12:50:11.032 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] No META-INF/validation.xml found. Using annotation based configuration only.
12:50:12.089 [Test worker] INFO  o.a.catalina.core.StandardService - Starting service Tomcat
12:50:12.091 [Test worker] INFO  o.a.catalina.core.StandardEngine - Starting Servlet Engine: Apache Tomcat/7.0.56
12:50:14.162 [localhost-startStop-1] INFO  o.a.c.c.C.[Tomcat].[localhost].[/] - Initializing Spring embedded WebApplicationContext
12:50:16.567 [Test worker] INFO  o.s.i.k.support.ProducerFactoryBean - Using producer properties => {metadata.broker.list=localhost:9092, compression.codec=0, producer.type=async}
12:50:17.036 INFO  [Test worker][kafka.utils.VerifiableProperties] Verifying properties
12:50:17.096 INFO  [Test worker][kafka.utils.VerifiableProperties] Property compression.codec is overridden to 0
12:50:17.096 INFO  [Test worker][kafka.utils.VerifiableProperties] Property metadata.broker.list is overridden to localhost:9092
12:50:17.096 INFO  [Test worker][kafka.utils.VerifiableProperties] Property producer.type is overridden to async
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.resolver.DefaultTraversableResolver] Cannot find javax.persistence.Persistence on classpath. Assuming non JPA 2 environment. All properties will per default be traversable.
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom MessageInterpolator of type org.springframework.validation.beanvalidation.LocaleContextMessageInterpolator
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ConstraintValidatorFactory of type org.springframework.validation.beanvalidation.SpringConstraintValidatorFactory
12:50:17.591 DEBUG [Test worker][org.hibernate.validator.internal.engine.ConfigurationImpl] Setting custom ParameterNameProvider of type com.sun.proxy.$Proxy46
12:50:17.592 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] Trying to load META-INF/validation.xml for XML based Validator configuration.
12:50:17.592 DEBUG [Test worker][org.hibernate.validator.internal.xml.ValidationXmlParser] No META-INF/validation.xml found. Using annotation based configuration only.
12:50:18.967 [Test worker] DEBUG o.s.i.c.GlobalChannelInterceptorProcessor - No global channel interceptors.
12:50:18.978 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {mongo:outbound-channel-adapter:mongoAdapter.adapter} as a subscriber to the 'mongoAdapter' channel
12:50:18.979 [Test worker] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.mongoAdapter' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started mongoAdapter.adapter
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {mongo:outbound-channel-adapter:adapterWithConverter.adapter} as a subscriber to the 'adapterWithConverter' channel
12:50:18.979 [Test worker] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.adapterWithConverter' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started adapterWithConverter.adapter
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {message-handler:kafkaOutboundChannelAdapter} as a subscriber to the 'inputToKafka' channel
12:50:18.979 [Test worker] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.inputToKafka' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started kafkaOutboundChannelAdapter
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
12:50:18.979 [Test worker] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.errorChannel' has 1 subscriber(s).
12:50:18.979 [Test worker] INFO  o.s.i.endpoint.EventDrivenConsumer - started _org.springframework.integration.errorLogger
12:50:19.026 [Test worker] INFO  o.a.coyote.http11.Http11NioProtocol - Initializing ProtocolHandler ["http-nio-8091"]
12:50:19.040 [Test worker] INFO  o.a.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8091"]
12:50:19.047 [Test worker] INFO  o.a.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
12:50:19.338 [Test worker] DEBUG o.s.i.c.PublishSubscribeChannel - preSend on channel 'inputToKafka', message: [Payload String content=Hello Kafka From SI][Headers={messageKey=3, topic=zerg.hydra, id=be46fb68-c762-f16e-6ccb-6841ef3fe868, timestamp=1416999019338}]
12:50:19.338 [Test worker] DEBUG o.s.i.k.o.KafkaProducerMessageHandler - org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0 received message: [Payload String content=Hello Kafka From SI][Headers={messageKey=3, topic=zerg.hydra, id=be46fb68-c762-f16e-6ccb-6841ef3fe868, timestamp=1416999019338}]
12:50:19.362 [Test worker] DEBUG o.s.i.c.PublishSubscribeChannel - postSend (sent=true) on channel 'inputToKafka', message: [Payload String content=Hello Kafka From SI][Headers={messageKey=3, topic=zerg.hydra, id=be46fb68-c762-f16e-6ccb-6841ef3fe868, timestamp=1416999019338}]
Empty test suite.
12:50:19.402 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {mongo:outbound-channel-adapter:mongoAdapter.adapter} as a subscriber to the 'mongoAdapter' channel
12:50:19.417 [Thread-4] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.mongoAdapter' has 0 subscriber(s).
12:50:19.419 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped mongoAdapter.adapter
12:50:19.420 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {mongo:outbound-channel-adapter:adapterWithConverter.adapter} as a subscriber to the 'adapterWithConverter' channel
12:50:19.422 [Thread-4] INFO  o.s.i.channel.DirectChannel - Channel 'application:8091.adapterWithConverter' has 0 subscriber(s).
12:50:19.422 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped adapterWithConverter.adapter
12:50:19.423 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {message-handler:kafkaOutboundChannelAdapter} as a subscriber to the 'inputToKafka' channel
12:50:19.424 [Thread-4] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.inputToKafka' has 0 subscriber(s).
12:50:19.424 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped kafkaOutboundChannelAdapter
12:50:19.425 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
12:50:19.426 [Thread-4] INFO  o.s.i.c.PublishSubscribeChannel - Channel 'application:8091.errorChannel' has 0 subscriber(s).
12:50:19.426 [Thread-4] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped _org.springframework.integration.errorLogger
BUILD SUCCESSFUL
Total time: 25.469 secs
12:50:20 PM: External tasks execution finished 'cleanTest test'.

我尝试在同一个应用程序中使用官方Kafka客户端制作人,它工作得很好:

@Named
public class KafkaProducerJava {

    ProducerConfig config=null;
    Producer<String, String> producer;
    Properties props=null;

    @PostConstruct
    public void init()
    {
        props = new Properties();

        props.put("metadata.broker.list", "localhost:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
       // props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");



    }


    public void sendMsgToKafka(String msg)
    {
        config= new ProducerConfig(props);
        producer=new Producer<String, String>(config);
        KeyedMessage<String, String> data = new KeyedMessage<String, String>("test", "", msg);

        producer.send(data);
        producer.close();
    }

知道为什么我的消费者从未通过Spring Integration kafka adaptor收到这条消息吗??

共有2个答案

柏麒
2023-03-14

我从早上开始就面临着同样的问题,终于找到了解决方案。如果您以 kafka 作为主题的前缀,如下所示

.setHeader("kafka_topic", "zerg.hydra")

这是API内部的一个问题,当从消息头查找密钥时,kafka_被加上前缀。

孔海超
2023-03-14

我只是在XD中运行它,它对我来说很好...

$ bin/xd-shell
 _____                           __   _______
/  ___|          (-)             \ \ / /  _  \
\ `--. _ __  _ __ _ _ __   __ _   \ V /| | | |
 `--. \ '_ \| '__| | '_ \ / _` |  / ^ \| | | |
/\__/ / |_) | |  | | | | | (_| | / / \ \ |/ /
\____/| .__/|_|  |_|_| |_|\__, | \/   \/___/
      | |                  __/ |
      |_|                 |___/
eXtreme Data
1.1.0.BUILD-SNAPSHOT | Admin Server Target: http://localhost:9393
Welcome to the Spring XD shell. For assistance hit TAB or type "help".
xd:>stream create --name foo --definition "time | kafka --topic=test" --deploy
Created and deployed new stream 'foo'
xd:>stream destroy foo
Destroyed stream 'foo'
xd:>

.

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
2014-11-26 10:03:09
2014-11-26 10:03:10
2014-11-26 10:03:11
2014-11-26 10:03:12
2014-11-26 10:03:13

我还写了这个测试用例...

public class OutboundTests {

    @Test
    public void test() throws Exception {
        KafkaProducerContext<String, String> kafkaProducerContext = new KafkaProducerContext<String, String>();
        ProducerMetadata<String, String> producerMetadata = new ProducerMetadata<String, String>("test");
        producerMetadata.setValueClassType(String.class);
        producerMetadata.setKeyClassType(String.class);
        Encoder<String> encoder = new StringEncoder<String>();
        producerMetadata.setValueEncoder(encoder);
        producerMetadata.setKeyEncoder(encoder);
        ProducerFactoryBean<String, String> producer = new ProducerFactoryBean<String, String>(producerMetadata, "localhost:9092");
        ProducerConfiguration<String, String> config = new ProducerConfiguration<String, String>(producerMetadata, producer.getObject());
        kafkaProducerContext.setProducerConfigurations(Collections.singletonMap("test", config));
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<String, String>(kafkaProducerContext);
        handler.handleMessage(MessageBuilder.withPayload("foo")
                .setHeader("messagekey", "3")
                .setHeader("topic", "test")
                .build());
    }

}

...来模拟你正在做的事情,这也很有效。您是否看到任何记录的异常?我看到你没有设置键/值类型和编码器。

编辑:

如评论中所述,问题是您正在使用异步生产者。在您的“有效”测试示例中,您正在关闭生产者,这会刷新队列。默认情况下,队列不会在5秒内被刷新,并且您的测试用例等待时间不够长。

我更新了我的测试以包含 XML 配置的版本,并将 queue.buffering.max.ms 减少到 500 毫秒,并向测试添加了代码,以便在终止之前等待几秒钟。

有关详细信息,请参阅新提交。

 类似资料:
  • 这就是我的配置 这个想法是每3秒轮询一个目录,并根据通道向调度程序发送3条消息,以允许异步执行。然后根据消息数量聚合消息,然后发送到下一个服务激活器。第一个服务激活器将文件放在源目录中,第二个服务激活器获取聚合列表以将这些文件移动到暂存目录。 似乎发生的情况是,源文件夹跳过了一些文件,但临时文件夹确实获取了所有文件。我的猜测是,轮询器将消息发送到dispatcher通道,但当其线程池变满时,它会忽

  • 使用Spring Integration Kafka,使用出站通道适配器,我尝试向名为“test”的主题发送消息 通过命令行终端,我启动了动物园管理员、kafka并创建了名为“test”的主题 Spring XML配置 JUnit测试代码 测试用例成功,在调试时,我发现channel.send()返回true 我使用下面的命令通过命令行检查了主题,但是我在测试主题中看不到任何消息。 bin/kaf

  • API 2:GET:/school/student/all

  • 从for循环讲起 我们在控制语句里学习了Rust的for循环表达式,我们知道,Rust的for循环实际上和C语言的循环语句是不同的。这是为什么呢?因为,for循环不过是Rust编译器提供的语法糖! 首先,我们知道Rust有一个for循环能够依次对迭代器的任意元素进行访问,即: for i in 1..10 { println!("{}", i); } 这里我们知道, (1..10) 其本身

  • 我设置了一个EJB项目,使用JMS将持久性实体对象发送到MDB。我使用JBoss EAP 7,使用Apache ActiveMQ作为消息传递提供程序。我像这样设置ConnectionFactory和队列: 这是我的消息生成器,它接收“Account”实体对象作为参数并将其发送到队列: EntityEnqueueBean。Java语言 MDB从队列接收消息并对其进行处理: java账户 不确定我做错

  • 我正试图用我的MERN stack web应用程序自动化构建过程。 目前,我使用CodePipeline,它: 从GitHub获取我的代码作为源代码 使用CodeBuild(Ubuntu 2.0)运行构建 并将其部署到我的Elastic BeanStalk环境中 步骤1 在尝试使用CodeBuild之后,即使客户端似乎完全按照日志进行构建,前端似乎也不会更新。 以下是我CodeBuild项目的一些