当前位置: 首页 > 知识库问答 >
问题:

Spring Integration:如何访问从上一个订阅服务器返回的值

佟颖逸
2023-03-14

我的Spring Boot配置:

@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
    final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    final Properties jschProps = new Properties();
    jschProps.put("StrictHostKeyChecking", "no");
    jschProps.put("PreferredAuthentications", "publickey,password");
    factory.setSessionConfig(jschProps);

    factory.setHost(sftpHost);
    factory.setPort(sftpPort);
    factory.setUser(sftpUser);
    if (sftpPrivateKey != null) {
        factory.setPrivateKey(sftpPrivateKey);
        factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
    } else {
        factory.setPassword(sftpPasword);
    }
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<>(factory);
}

@Bean
@BridgeTo
public MessageChannel toSftpChannel() {
    return new PublishSubscribeChannel();
}


@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
@Order(0)
public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
    handler.setFileNameGenerator(message -> {
        if (message.getPayload() instanceof byte[]) {
            return (String) message.getHeaders().get("filename");
        } else {
            throw new IllegalArgumentException("File expected as payload.");
        }
    });
    return handler;
}

@ServiceActivator(inputChannel = "toSftpChannel")
@Order(1)
public String transferComplete(@Payload byte[] file, @Header("filename") String filename) {
    return "The SFTP transfer complete for file: " + filename;
}

@MessagingGateway
public interface UploadGateway {

    @Gateway(requestChannel = "toSftpChannel")
    String upload(@Payload byte[] file, @Header("filename") String filename);

}

我的测试用例:

final String pdfStatus = uploadGateway.upload(content, documentName);
log.info("Upload of {} completed, {}.", documentName, pdfStatus);

从网关上传调用的返回中,我希望得到确认上传的字符串,例如“文件的SFTP传输完成:...”但我得到的是上传文件的返回内容(以字节[])为单位):

Upload of 123456789.1.pdf completed, 37,80,68,70,45,49,46,54,13,37,-30,-29,-49,-45,13,10,50,55,53,32,48,32,111,98,106,13,60,60,47,76,105,110,101,97,114,105,122,101,100,32,49,47,76,32,50,53,52,55,49,48,47,79,32,50,55,55,47,69,32,49,49,49,55,55,55,47,78,32,49,47,84,32,50,53,52,51,53,57,47,72,32,91,32,49,49,57,55,32,53,51,55,93,62,62,13,101,110,100,111,98,106,13,32,32,32,32,32,32,32,32,32,32,32,32,13,10,52,55,49,32,48,32,111,98,106,13,60,60,47,68,101,99,111,100,101,80,97,114,109,115,60,60,47,67,111,108,117,109,110,115,32,53,47,80,114,101,100,105,99,116,111,114,32,49,50,62,62,47,70,105,108,116,101,114,47,70,108,97,116,101,68,101,99,111,100,101,47,73,68,91,60,57,66,53,49,56,54,69,70,53,66,56,66,49,50,52,49,65,56,50,49,55,50,54,56,65,65,54,52,65,57,70,54,62,60,68,52,50,68,51,55,54,53,54,65,67,48,55,54,52,65,65,53,52,66,52,57,51,50,56,52,56,68,66 etc.

我错过了什么?

共有1个答案

龚奕
2023-03-14

我认为@order(0)不能与@bean一起工作。

要修复它,您应该在bean定义IBEAD中执行以下操作:

final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setOrder(0);

有关更多信息,请参见参考手册:

@Bean
@BridgeTo
public MessageChannel toSftpChannel() {
    return new PublishSubscribeChannel();
}

日志确认了这一点:

Adding {bridge:dmsSftpConfig.toSftpChannel.bridgeTo} as a subscriber to the 'toSftpChannel' channel
Channel 'org.springframework.context.support.GenericApplicationContext@b3d0f7.toSftpChannel' has 3 subscriber(s).
started dmsSftpConfig.toSftpChannel.bridgeTo

因此,TosFTPChannel实际上还有一个订阅服务器,它是一个BridgeHandler,具有对ReplyChannel标头的输出。缺省顺序类似于private volatile int order=ordered.lowest_preasence;这一个将作为第一个订阅服务器,并且正是这一个返回字节[],因为它是请求的有效负载

你需要决定你是否真的需要这样一座桥。但是,@order没有解决办法...

 类似资料:
  • 我正在调用一个函数,其中有一个订户函数。此函数必须返回一个数组,但它给出了一个错误: 缺少订阅。 如何做到这一点?

  • 问题内容: 这就是我想要做的。 如果在内部调用,您可能建议将其分配给变量like 并使用i like 。但是我出于个人目的需要使用like 。请提出建议? 问题答案: 您只是不能直接返回该值,因为它是一个异步调用。异步调用意味着它在后台运行(实际上已计划在以后执行),同时代码继续执行。 您也不能直接在类中有这样的代码。需要将其移至方法或构造函数中。 您可以做的不是直接使用而是使用像 此外,您可以将

  • 刚开始玩推送通知,我设法处理了所有的订阅过程,我正在数据库中保存endpoint和密钥。我的问题是,如果有的话,我应该遵循什么策略来删除数据库中的旧字幕详细信息?。所以,如果有人允许通知,他们撤销了权限,我怎么知道是谁从数据库中删除了详细信息?。因为如果用户取消订阅,我只会从pushManager获得空订阅。

  • 我试图从Laravel项目、DBeaver和Artisan访问我的PostgreSQL数据库。从Laravel project或Artisan()访问它总是会导致: 无法连接到服务器:连接被拒绝 服务器是否在主机“127.0.0.1”上运行,并在端口5432上接受TCP/IP连接 因为我不依赖我的Laradock设置,而且我对Laravel也不太熟悉,所以我确实用默认设置重新下载了Laradock

  • 尝试从同一命名空间中的另一个服务连接到一个服务。使用ClusterIP创建服务。创建服务后使用该Ip访问服务。请求有时成功,有时失败,我看到两个pod都启动并运行。以下是服务配置

  • 我对一个发布者-多个订阅者模式的实现有疑问。发布者使用固定大小的缓冲区并将消息排队。消息被发送给所有订户。订阅者获取消息的顺序必须与发布消息的顺序相同。 我使用阻止队列来保存发布者消息(发布者队列)并将其传递给每个订阅者阻止队列(订阅者队列)。 问题是缓冲区和订阅服务器工作正常,但缓冲区大小 (发布者Queue.size()) 始终返回 1。 这是我的完整代码: PublisherSubscrib

  • 我基本上需要从python服务器向设备发送命令,设备将发布对主题的回复,我需要捕获回复服务器端。要从服务器发布到设备,我正在使用boto3物联网数据模块。但是我如何订阅另一个主题以从设备获得回复?似乎没有办法使用aws python库。我需要使用像paho这样的遗传MQTT客户机吗? 谢谢你。

  • 第一,我试过了 第二,我尝试了下面的。(使用onErrorResumeNext),但取消了订阅。 (未调用onError,但调用OnComplete。因此已取消订阅) 第三,我试了下面。(带重试) 这比第一好。但没有刻录。 我想使刷新按钮,工作后错误。 我想知道 null 对不起,我的英语太差了。