我正在尝试在Spring云数据流中配置DLQ。下面是流定义以及我如何部署它
stream create --definition ":someTestTopic > custom-transform
--spring.cloud.stream.bindings.input.consumer.headerMode=raw | log --spring.cloud.stream.bindings.input.consumer.headerMode=raw" --name ticktran
stream deploy ticktran --properties
"apps.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,apps.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.output.destination=test-tran,app.log.spring.cloud.stream.bindings.input.destination=test-tran,app.custom-transform.spring.cloud.stream.kafka.bindings.test-tran.consumer.enableDlq=true"
在自定义转换处理器代码中,我已经提到过
if(out.contains("ERROR")) {
throw new RuntimeException("Error ");
}
这意味着若消息包含错误,那个么RunTimeException和我想在DLQ中捕获这些消息。但当我运行代码时,似乎没有得到任何名为test tran的Kafka DL队列。
我是否需要设置更多属性来启用DLQ,还是需要更改代码中的某些内容以正确使用DLQ。
自定义转换代码
TransformationServiceApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.hateoas.config.EnableEntityLinks;
@SpringBootApplication
@EnableEntityLinks
public class TransformationServiceApplication {
public static void main(String[] args) {
SpringApplication.run(TransformationServiceApplication.class, args);
}
}
TransformationMessageEndPoint。Java语言
@EnableBinding(Processor.class)
@MessageEndpoint
public class TransformationMessageEndpoint {
private static final String NS = "http://openrisk.com/ingestion/";
AtomicInteger index = new AtomicInteger(1);
@ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object process(Message<?> message) {
String out = new String((byte[])message.getPayload());
System.out.println("*****" + out);
if(out.contains("ERROR")) {
throw new RuntimeException("Error ");
}
return message;
}
}
pom。xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.6.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.module</groupId>
<artifactId>spring-cloud-stream-modules-test-support</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.jena</groupId>
<artifactId>jena-core</artifactId>
<version>3.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
正在添加模块
app register --name custom-transform --type processor --uri maven://com.openrisk.openmargin:TransformationService:0.0.1-SNAPSHOT
正在添加流
stream create --definition ":someTesstTopic > custom-transform | log " --name ticktran
部署流
stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"
我将数据流版本更改为1.1 M1版本,使用以下提及命令创建和部署属性,它现在正在工作
stream create --definition ":someTesstTopic > transform | log " --name ticktran
stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"
谢谢Sabby Anandan
流定义几乎没有问题。
流创建-定义”:一些TesssstTopic
流部署ticktran--属性"app.log.spring.cloud.stream.bindings.input.consumer.header模式=原始,app.transform.spring.cloud.stream.bindings.input.consumer.header模式=原始,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"
我们将通过#885向参考指南中添加一些DSL示例。
编辑:我更新了流定义以反映正确的部署属性前缀。
我试图了解运行批处理任务时通过Spring Cloud数据流WRT数据源配置的预期行为。 Spring批处理数据库表(Batch\u JOB\u EXECUTION等)是否在SCDF数据库本身中?当通过SCDF启动任务时,似乎发生了一些神奇的事情,它在SCDF数据库中创建了这些表,并似乎在使用它们。它似乎正在将SCDF数据源注入我的应用程序? 我目前在localhost服务器版本2.0.1上运行。
我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,
大家好,我完全是Spring Cloud Streams框架的新手。 在用于Kafka Streams的spring cloud stream文档中,我可以看到在示例中使用的应用程序yaml/properties文件中引用了前缀为spring.cloud.stream.function.definition等的属性。 我知道Cloud streams使用Cloud函数,但是Cloud stream
根据本文档,应该可以订阅Spring Integration提供的全局错误通道--“ErrorChannel”。 在我这个非常简单的例子中,它不起作用: 应用:
我有以下docker-compose.yml我有数据流服务器运行,kafka,zoomaster,mysql,指标收集器。 配置:(我只提供dataflow server(1.5.2版本),metrics collector)metrics collector:图片:springcloud/metrics-collector-kafka-10环境:-spring。安全使用者名称=Spring-Sp
我有一个现有的过程,我正试图转换成SCDF实现。目前的流程是, HTTP接收器(接收HTTP POST数据)->RabbitMQ->MQ接收器服务->处理/转换->DB接收器