当前位置: 首页 > 知识库问答 >
问题:

Spring.cloud.stream.kafka.binder.headers未按预期工作

柯甫
2023-03-14

我试图使用spring.cloud.stream.kafka.binder.headers来传输我根据前面的问题设置的自定义标头。

我在文件中读到...

spring.cloud.stream.kafka.binder.headers
The list of custom headers that will be transported by the binder.

Default: empty.
@MessagingGateway(name = "redemptionGateway", defaultRequestChannel = Channels.GATEWAY_OUTPUT, defaultHeaders = @GatewayHeader(name = "orderId", expression = "#gatewayMethod.name"))
public interface RedemptionGateway {
    ...
}
2016-08-15 15:09:04 http-nio-8080-exec-2 DEBUG DirectChannel:430 - preSend on channel 'gatewayOutput', message: GenericMessage [payload=x.TrivialRedemption@2d052d2a[orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={orderId=create, id=5dccea6f-266e-82b9-54c6-57ec441a26ac, timestamp=1471288144882}] - {applicationSystemCode=x, clientIP=0:0:0:0:0:0:0:1, clusterId=Cluster-Id-NA, containerId=Container-Id-NA, correlationId=UNDEFINED, domainName=defaultDomain, hostName=Host-NA, messageId=10.113.21.144-eb8404d0-de93-4f94-80cb-e5b638e8aeef, userId=anonymous, webAnalyticsCorrelationId=|}
2016-08-15 15:09:05 kafka-binder- DEBUG DirectChannel:430 - preSend on channel 'enrichingInput', message: GenericMessage [payload=x.TrivialRedemption@357bd4dd[orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={kafka_offset=10, orderId=create, kafka_messageKey=null, kafka_topic=received, kafka_partitionId=0, kafka_nextOffset=11, contentType=application/x-java-object;type=x.TrivialRedemption}] - {}

我的属性包括:


    spring.cloud.stream.kafka.binder.headers=orderId

共有1个答案

席嘉祯
2023-03-14

您使用的是什么版本的spring-cloud-stream?

我刚写了一个快速测试用例,它运行得很好...

spring.cloud.stream.kafka.binder.headers=bar
spring.cloud.stream.bindings.output.destination=foobar
spring.cloud.stream.bindings.input.destination=foobar
spring.cloud.stream.bindings.input.group=foo

应用程序:

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

@SpringBootApplication
@EnableBinding(Processor.class)
public class So38961697Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38961697Application.class, args);
        Foo foo = context.getBean(Foo.class);
        foo.start();
        foo.send();
        Thread.sleep(30000);
        context.close();
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    private static class Foo {

        @Autowired
        Processor processor;

        public void send() {
            Message<?> m = MessageBuilder.withPayload("foo")
                    .setHeader("bar", "baz")
                    .build();
            processor.output().send(m);
        }

        public void start() {
            this.processor.input().subscribe(new MessageHandler() {

                @Override
                public void handleMessage(Message<?> m) throws MessagingException {
                    System.out.println(m);
                }

            });
        }

    }

}

结果:

GenericMessage [payload=foo, headers={bar=baz, kafka_offset=0, kafka_messageKey=null, kafka_topic=foobar, kafka_partitionId=0, kafka_nextOffset=1, contentType=text/plain}]

添加一个组以确保使用者从最早的消息开始消费。见下面的评论。

 类似资料:
  • 我正在使用spring Roo并希望访问Controller类中的一个bean,该类在ApplicationContext.xml中具有以下配置: 配置类本身是: 在我的Controller中,我认为一个简单的Autowired注释应该可以完成这项工作 在启动过程中,spring在setSkipWeeks方法中打印消息。不幸的是,每当我在控制器中调用config.getSkipWeeks()时,它

  • 当我运行以下程序时,它只打印 然而,从Java 8的equalsIgnoreCase文档中我们发现: 如果以下至少一项为真,则两个字符c1和c2被视为相同的忽略情况: •对每个字符应用java.lang.character.ToUpperCase(char)方法会产生相同的结果 所以我的问题是为什么这个程序不打印 在这两种操作中,都使用了大写字符。

  • 我正在和selenium一起工作,刮一些数据。 有一个按钮在页面上,我正在点击说“Custom_Cols”。这个按钮为我打开了一个窗口,我可以在那里选择我的列。 我的问题是为什么新窗口上的元素不可见,即使我正在等待元素的可见。补充一下,我已经尝试增加延迟时间,但我还是会偶尔出现这个错误。 我的密码在这里

  • 我正在使用Grails 2.0.1中的springsecurity插件。我的角色层次结构和其他s2属性如下所示。

  • 我有一个问题jsPlumb的deleteEndpoint函数。 我想删除没有任何连接的endpoint。我希望在“连接”时触发此操作,但我认为可能存在一些问题。 但是我把代码移到了“dblclick”,我发现了同样的问题。标记为删除的一个节点被删除,但随后系统中的每个endpoint都被冻结在原地——一旦其元素被拖动,就与该元素断开连接。我不确定是什么原因造成的:以下是相关代码和一些屏幕截图。。。