我有一个简单的SCDF流,如下所示:
http--端口=12346 | mvmn转换|文件--名称=tmp。txt--目录=/tmp
mvmn变换是一个简单的自定义变压器,如下所示:
@SpringBootApplication
@EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
public static void main(String args[]) {
SpringApplication.run(ScdfTestTransformer.class, args);
}
@Autowired
protected ScdfTestTransformerProperties config;
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(Message<?> message) {
Object payload = message.getPayload();
Map<String, Object> result = new HashMap<>();
Map<String, String> headersStr = new HashMap<>();
message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));
result.put("headers", headersStr);
result.put("payload", payload);
result.put("configProp", config.getSomeConfigProp());
return result;
}
// See https://stackoverflow.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
@Bean("kafkaBinderHeaderMapper")
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
BinderHeaderMapper mapper = new BinderHeaderMapper();
mapper.setEncodeStrings(true);
return mapper;
}
}
这很好用。
但我读到,Spring云函数应该允许我实现这样的应用程序,而无需指定绑定和转换器注释,所以我将其更改为:
@SpringBootApplication
// @EnableBinding(Processor.class)
@EnableConfigurationProperties(ScdfTestTransformerProperties.class)
@Configuration
public class ScdfTestTransformer {
public static void main(String args[]) {
SpringApplication.run(ScdfTestTransformer.class, args);
}
@Autowired
protected ScdfTestTransformerProperties config;
// @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
@Bean
public Function<Message<?>, Map<String, Object>> transform(
// Message<?> message
) {
return message -> {
Object payload = message.getPayload();
Map<String, Object> result = new HashMap<>();
Map<String, String> headersStr = new HashMap<>();
message.getHeaders().forEach((k, v) -> headersStr.put(k, v != null ? v.toString() : null));
result.put("headers", headersStr);
result.put("payload", payload);
result.put("configProp", "Config prop val: " + config.getSomeConfigProp());
return result;
};
}
// See https://stackoverflow.com/questions/59155689/could-not-decode-json-type-for-key-file-name-in-a-spring-cloud-data-flow-stream
@Bean("kafkaBinderHeaderMapper")
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
BinderHeaderMapper mapper = new BinderHeaderMapper();
mapper.setEncodeStrings(true);
return mapper;
}
}
现在我有一个问题-Spring云函数显然忽略了SCDF源和目标主题名称,而创建了主题transform-in-0
和transform-out-0
。
SCDF创建名称如下的主题
以前它们用于变压器-应该如此,因为它是SCDF流的一部分。但现在它们被Spring-Cloud-Function忽略,并创建了transform-in-0
和transform-out-0
。
因此,我的transformer不再接收任何输入,因为它期望它针对错误的Kafka主题。而且可能也不会向流输出,因为它也会输出到错误的Kafka主题。
P、 为了以防万一,GitHub上的完整项目代码:https://github.com/mvmn/scdftest-transformer/tree/scfunc
要在本地运行启动Kafka、Skipper、SCDF和SCDF控制台,请在app文件夹中执行mvn clean install,然后执行app register--名称mvmn-transform-1--类型processor--urimaven://x.mvmn.study.scdf.scdftest:scdftest-转换器:0.1.1-SNAPSHOT--元数据urimaven://x.mvmn.study.scdf.scdftest:scdftest-transformer:0.1.1-SNAPSHOT(快照)(位于坐标系中)。然后,您可以使用definitionhttp--port=12346 | mvmn transform | file--name=tmp部署流。txt--目录=/tmp
由于您使用的是编写Spring Cloud Stream应用程序的功能模型,因此在部署此应用程序时,您需要在自定义处理器上传递两个属性以恢复Spring Cloud Data Flow行为。
spring.cloud.stream.function.bindings.transformspring.cloud.stream.function.bindings.transform输出
你能试试看有没有区别吗?
我创建了一个定制的Spring云流处理器应用程序,并将其部署为源|处理器|汇流中的处理器步骤。一切似乎都很好,但我的自定义应用程序在数据流UI中显示“部署”。如果这会影响任何事情,我会将其部署为mavenLocal的快照。我是否遗漏了一些让SCDF知道部署成功的信息?
我正在从Spring XD迁移到Spring Cloud Data Flow。当我寻找模块列表时,我意识到一些源码没有在Spring Cloud Flow中列出--其中一个是Kafka源码。 我的问题是为什么在spring cloud data flow中KAFKA源从标准源列表中删除?
我已经使用firebase云函数一段时间了,今天在代码中修复了一个小错误,在尝试部署时出现了以下错误。我取消了该更改,并尝试使用上次提交的稳定更改再次部署,但仍然是相同的错误。有什么解决办法吗?PS:这是一个typescript项目,我用tsc编译它。
我正在使用处理器 API 对状态存储进行一些低级处理。关键是我还需要在存储到商店后写入主题。如何在Spring Cloud Streams Kafka应用程序中完成?
我正在尝试用《Spring的云流》和《Kafka》。下面是示例代码。但它似乎没有任何作用。它总是创建一个名为“输出”的主题。但这些价值观尚未公布。 应用亚马尔 我的目标就是创造价值。 依赖性-2.2.6。释放