当前位置: 首页 > 知识库问答 >
问题:

Spring云数据流DLQ配置不工作

徐杰
2023-03-14

我正在尝试在Spring云数据流中配置DLQ。下面是流定义以及我如何部署它

  stream create --definition ":someTestTopic > custom-transform
     --spring.cloud.stream.bindings.input.consumer.headerMode=raw | log --spring.cloud.stream.bindings.input.consumer.headerMode=raw" --name ticktran


    stream deploy ticktran --properties
  "apps.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,apps.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.output.destination=test-tran,app.log.spring.cloud.stream.bindings.input.destination=test-tran,app.custom-transform.spring.cloud.stream.kafka.bindings.test-tran.consumer.enableDlq=true"

在自定义转换处理器代码中,我已经提到过

if(out.contains("ERROR")) {
            throw new RuntimeException("Error ");
        }

这意味着若消息包含错误,那个么RunTimeException和我想在DLQ中捕获这些消息。但当我运行代码时,似乎没有得到任何名为test tran的Kafka DL队列。

我是否需要设置更多属性来启用DLQ,还是需要更改代码中的某些内容以正确使用DLQ。

自定义转换代码

TransformationServiceApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.hateoas.config.EnableEntityLinks;

@SpringBootApplication
@EnableEntityLinks
public class TransformationServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(TransformationServiceApplication.class, args);
    }
}

TransformationMessageEndPoint。Java语言

@EnableBinding(Processor.class)
@MessageEndpoint
public class TransformationMessageEndpoint {

    private static final String NS = "http://openrisk.com/ingestion/";

    AtomicInteger index = new AtomicInteger(1);
    @ServiceActivator(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object process(Message<?> message) {
        String out =  new String((byte[])message.getPayload());

        System.out.println("*****" + out);

        if(out.contains("ERROR")) {
            throw new RuntimeException("Error ");
        }

        return message;

    }
}

pom。xml

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.3.6.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>1.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <version>1.0.0.BUILD-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud.stream.module</groupId>
            <artifactId>spring-cloud-stream-modules-test-support</artifactId>
            <version>1.0.0.BUILD-SNAPSHOT</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.jena</groupId>
            <artifactId>jena-core</artifactId>
            <version>3.1.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

正在添加模块

app register --name custom-transform --type processor --uri maven://com.openrisk.openmargin:TransformationService:0.0.1-SNAPSHOT

正在添加流

stream create --definition ":someTesstTopic > custom-transform | log " --name ticktran

部署流

stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.custom-transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"

共有2个答案

房光临
2023-03-14

我将数据流版本更改为1.1 M1版本,使用以下提及命令创建和部署属性,它现在正在工作

stream create --definition ":someTesstTopic > transform | log " --name ticktran


stream deploy ticktran --properties "app.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.bindings.input.consumer.headerMode=raw,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"

谢谢Sabby Anandan

汪兴旺
2023-03-14

流定义几乎没有问题。

  • 部署属性从app开始。

流创建-定义”:一些TesssstTopic

流部署ticktran--属性"app.log.spring.cloud.stream.bindings.input.consumer.header模式=原始,app.transform.spring.cloud.stream.bindings.input.consumer.header模式=原始,app.transform.spring.cloud.stream.kafka.bindings.input.consumer.enableDlq"

  • 在应用程序中引用时,目的地测试传输的格式不可接受。使改变Spring云流动Kafka。绑定。

我们将通过#885向参考指南中添加一些DSL示例

编辑:我更新了流定义以反映正确的部署属性前缀。

 类似资料:
  • 我试图了解运行批处理任务时通过Spring Cloud数据流WRT数据源配置的预期行为。 Spring批处理数据库表(Batch\u JOB\u EXECUTION等)是否在SCDF数据库本身中?当通过SCDF启动任务时,似乎发生了一些神奇的事情,它在SCDF数据库中创建了这些表,并似乎在使用它们。它似乎正在将SCDF数据源注入我的应用程序? 我目前在localhost服务器版本2.0.1上运行。

  • 我用的是Apache Kafka 2.7.0和Spring Cloud Stream Kafka Streams。 在我的Spring Cloud Stream (Kafka Streams)应用程序中,我已经将我的application.yml配置为当输入主题中的消息出现反序列化错误时使用sendToDlq机制: 我启动了我的应用程序,但我看不到这个主题存在。文档指出,如果 DLQ 主题不存在,

  • 大家好,我完全是Spring Cloud Streams框架的新手。 在用于Kafka Streams的spring cloud stream文档中,我可以看到在示例中使用的应用程序yaml/properties文件中引用了前缀为spring.cloud.stream.function.definition等的属性。 我知道Cloud streams使用Cloud函数,但是Cloud stream

  • 根据本文档,应该可以订阅Spring Integration提供的全局错误通道--“ErrorChannel”。 在我这个非常简单的例子中,它不起作用: 应用:

  • 我有以下docker-compose.yml我有数据流服务器运行,kafka,zoomaster,mysql,指标收集器。 配置:(我只提供dataflow server(1.5.2版本),metrics collector)metrics collector:图片:springcloud/metrics-collector-kafka-10环境:-spring。安全使用者名称=Spring-Sp

  • 我有一个现有的过程,我正试图转换成SCDF实现。目前的流程是, HTTP接收器(接收HTTP POST数据)->RabbitMQ->MQ接收器服务->处理/转换->DB接收器