当前位置: 首页 > 知识库问答 >
问题:

作为Spring云数据流的一部分部署的Spring云函数应用程序的Kafka主题名称错误

章心水
2023-03-14

我有一个简单的SCDF流,如下所示:

http--端口=12346 | mvmn转换|文件--名称=tmp。txt--目录=/tmp

mvmn变换是一个简单的自定义变压器,如下所示:

@SpringBootApplication
@EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
    public static void main(String args[]) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object transform(Message<?> message) {
        Object payload = message.getPayload();
        Map<String, Object> result = new HashMap<>();
        Map<String, String> headersStr = new HashMap<>();

        message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));

        result.put("headers", headersStr);
        result.put("payload", payload);
        result.put("configProp", config.getSomeConfigProp());

        return result;
    }

    // See https://stackoverflow.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}

这很好用。

但我读到,Spring云函数应该允许我实现这样的应用程序,而无需指定绑定和转换器注释,所以我将其更改为:

@SpringBootApplication
// @EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
    public static void main(String args[]) {
        SpringApplication.run(ScdfTestTransformer.class, args);
    }

    @Autowired
    protected ScdfTestTransformerProperties config;

    // @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    @Bean
    public Function<Message<?>, Map<String, Object>> transform(
    // Message<?> message
    ) {
        return message -> {
            Object payload = message.getPayload();
            Map<String, Object> result = new HashMap<>();
            Map<String, String> headersStr = new HashMap<>();

            message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));

            result.put("headers", headersStr);
            result.put("payload", payload);
            result.put("configProp", "Config prop val: " + config.getSomeConfigProp());

            return result;
        };
    }

    // See https://stackoverflow.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
    @Bean("kafkaBinderHeaderMapper")
    public KafkaHeaderMapper kafkaBinderHeaderMapper() {
        BinderHeaderMapper mapper = new BinderHeaderMapper();
        mapper.setEncodeStrings(true);
        return mapper;
    }
}

现在我有一个问题-Spring云函数显然忽略了SCDF源和目标主题名称,而创建了主题transform-in-0transform-out-0

SCDF创建名称如下的主题

以前它们用于变压器-应该如此,因为它是SCDF流的一部分。但现在它们被Spring-Cloud-Function忽略,并创建了transform-in-0transform-out-0

因此,我的transformer不再接收任何输入,因为它期望它针对错误的Kafka主题。而且可能也不会向流输出,因为它也会输出到错误的Kafka主题。

P、 为了以防万一,GitHub上的完整项目代码:https://github.com/mvmn/scdftest-transformer/tree/scfunc

要在本地运行启动Kafka、Skipper、SCDF和SCDF控制台,请在app文件夹中执行mvn clean install,然后执行app register--名称mvmn-transform-1--类型processor--urimaven://x.mvmn.study.scdf.scdftest:scdftest-转换器:0.1.1-SNAPSHOT--元数据urimaven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT(快照)(位于坐标系中)。然后,您可以使用definitionhttp--port=12346 | mvmn transform | file--name=tmp部署流。txt--目录=/tmp

共有1个答案

花飞扬
2023-03-14

由于您使用的是编写Spring Cloud Stream应用程序的功能模型,因此在部署此应用程序时,您需要在自定义处理器上传递两个属性以恢复Spring Cloud Data Flow行为。

spring.cloud.stream.function.bindings.transformspring.cloud.stream.function.bindings.transform输出

你能试试看有没有区别吗?

 类似资料:
  • 我创建了一个定制的Spring云流处理器应用程序,并将其部署为源|处理器|汇流中的处理器步骤。一切似乎都很好,但我的自定义应用程序在数据流UI中显示“部署”。如果这会影响任何事情,我会将其部署为mavenLocal的快照。我是否遗漏了一些让SCDF知道部署成功的信息?

  • 我正在从Spring XD迁移到Spring Cloud Data Flow。当我寻找模块列表时,我意识到一些源码没有在Spring Cloud Flow中列出--其中一个是Kafka源码。 我的问题是为什么在spring cloud data flow中KAFKA源从标准源列表中删除?

  • 我已经使用firebase云函数一段时间了,今天在代码中修复了一个小错误,在尝试部署时出现了以下错误。我取消了该更改,并尝试使用上次提交的稳定更改再次部署,但仍然是相同的错误。有什么解决办法吗?PS:这是一个typescript项目,我用tsc编译它。

  • 我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入主题。如何在Spring Cloud Streams Kafka应用程序中完成?

  • 我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放