当前位置: 首页 > 知识库问答 >
问题:

春云流>SendTo不发送给Kafka而是通过直接通道直接发送给Kafka

云瑞
2023-03-14

我在我的应用程序中有两个通道,它们与Kafka的两个主题绑定在一起:

  1. 输入
  2. error.input.my-group
@StreamListener(Channels.DLQ)
@SendTo(Channels.INPUT)
public Message<?> reRoute(Message<?> failed){
    messageDeliveryService.waitUntilCanBeDelivered(failed);
    processed.incrementAndGet();
    Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
    retries = retries == null ? 1 : retries+1;
     if (retries < MAX_RETRIES) {
        logger.info("Retry (count={}) for {}", retries, failed);
        return buildRetryMessage(failed, retries);
    }
    else {
        logger.error("Retries exhausted (-> sent to parking lot) for {}", failed);
        Channels.parkingLot().send(MessageBuilder.fromMessage(failed)
                .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                        failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                .build());
    }
    return null;
}

private Message<?> buildRetryMessage(Message<?> failed, int retries) {
    return MessageBuilder.fromMessage(failed)
            .setHeader(X_RETRIES_HEADER, retries)
            .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                    failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
            .build();
}
        @Component
    public interface Channels {

        String INPUT = "INPUT";
        //Default name use by SCS (error.<input-topic-name>.<group-name>)
        String DLQ = "error.input.my-group";
        String PARKING_LOT = "parkingLot.input.my-group";

        @Input(INPUT)
        SubscribableChannel input();

        @Input(DLQ)
        SubscribableChannel dlq();

        @Output(PARKING_LOT)
        MessageChannel parkingLot();


}
spring:
  cloud:
    stream:
      default:
        group: my-group
      binder:
        headerMode: headers      kafka:
        binder:
          # Necessary in order to commit the message to all the Kafka brokers handling the partition -> maximum durability
          # -1 = all
          requiredAcks: -1
          brokers: bootstrap.kafka.svc.cluster.local:9092,bootstrap.kafka.svc.cluster.local:9093,bootstrap.kafka.svc.cluster.local:9094,bootstrap.kafka.svc.cluster.local:9095,bootstrap.kafka.svc.cluster.local:9096,bootstrap.kafka.svc.cluster.local:9097
        bindings:
          input:
            consumer:
              partitioned: true
              enableDlq: true
              dlqProducerProperties:
                configuration:
                  key.serializer: "org.apache.kafka.common.serialization.ByteArraySerializer"
          "[error.input.my-group]":
            consumer:
              # We cannot loose any message and we don't have any DLQ for the DLQ, therefore we only commit in case of success
              autoCommitOnError: false
              ackEachRecord: true
              partitioned: true
              enableDlq: false
      bindings:
        input:
          contentType: application/xml
          destination: input
        "[error.input.my-group]":
          contentType: application/xml
          destination: error.input.my-group
        "[parkingLot.input.my-group]":
          contentType: application/xml
          destination: parkingLot.input.my-group

问题是我的信息再也不会被推送到Kafka,而是直接送到我的输入通道。我是不是误解了什么?

共有1个答案

太叔烨霖
2023-03-14

为了@sendto而不是直接到达kafka目的地,您需要一个输出绑定。

 类似资料:
  • 问题内容: 您好,我尝试使用Java中的客户端-服务器类发送文件。由于某种原因,当调用发送文件的方法时,套接字关闭。这是代码: 和来自客户端的代码: 和我得到的错误消息:严重:null java.net.SocketException:套接字已关闭 我对此并没有真正的经验,所以如果有帮助的话会很棒。 问题答案: 该方法返回一个,代表它实际读取的字节数。不能保证从字节数组中读取所需的字节数。它通常会

  • 我正试图通过Java的socket发送一个custome对象。我知道我需要将具有我需要发送的对象的类放在相同的包中,具有相同的serialVersionUID并实现Serializable。我已经这样做了,但我仍然不能通过套接字发送对象。我错在哪里了? 以下是客户端代码: 客户端中的类用户 发送user类对象的代码: 这里是服务器的代码: user类的代码:与客户端的user完全相同(我从客户端复

  • 如何通过Azure从我的UWP-App向不同设备上的应用程序的其他实例发送推送通知? 以下是注册设备以接收推送的说明。(这是可行的)第二部分是关于如何在控制台应用程序上发送推送(这也是可行的)https://azure.microsoft.com/en-us/documentation/articles/notification-hubs-windows-store-dotnet-get-star

  • 我有一个问题,然后通过套接字发送列表,它必须是字节样的对象,好吧,我可以转换它字符串,然后做,但问题是字符串,这是很难重建它 列表从字符串,和从库 没有工作,然后我有这样的东西: 这就是问题所在,我必须拥有这些对象,我的问题是如何发送python对象而不需要将其转换为字符串,或者类似于JSON之类的对象符号? 这可以通过Python套接字文档在基本套接字服务器上进行测试。 失败的文本评估: 错误:

  • 我正在尝试运行此脚本以发送电子邮件 但它会返回以下错误: 2015-03-09 21:29:37客户- 2015-03-09 21:29:37客户- 2015-03-09 21:29:37客户- 2015-03-09 21:29:37客户- 2015-03-09 21:29:37SMTP错误:密码命令失败:534-5.7.14请通过Web浏览器登录,然后重试。534-5.7.14在534了解更多信

  • ap.sendSocketMessage(OPTION, CALLBACK) 通过 WebSocket 连接发送数据,需要先 ap.connectSocket,并在 ap.onSocketOpen 回调之后才能发送。 OPTION 参数说明 名称 类型 必选 描述 data String/ArrayBuffer 否 请求的参数 代码示例 <script src="https://gw.alipay