当前位置: 首页 > 知识库问答 >
问题:

如何跟踪databufferutils.write()的进度和速度?

饶德本
2023-03-14
Flux<DataBuffer> dataBufferFlux = webClientEmbed.get()
  .uri(someUri)
  // ...
  .retrieve()
  .bodyToFlux(DataBuffer.class);


DataBufferUtils
  .write(dataBufferFlux, somePath, CREATE)
  .block();

使用DataBuffer类时,跟踪WebClient下载进度的正确方法是什么?

共有1个答案

滑畅
2023-03-14

下面是一个代码片段,说明如何跟踪WebClient和DataBufferUtils的进度

    public DownloadProgress download(String url, String targetFile) {

        WebClient client = WebClient.builder()
                .clientConnector(
                    new ReactorClientHttpConnector(HttpClient.create().followRedirect(true)
                 ))
                .baseUrl(url)
                .build();
        
        final DownloadProgress progress = new DownloadProgress();

        Flux<DataBuffer> dataBufferFlux = client.get()
            .exchangeToFlux( response -> {
                long contentLength = response.headers().contentLength().orElse(-1);
                progress.setLength(contentLength);
                return response.bodyToFlux(DataBuffer.class);
            });
        
        Path path = Paths.get(targetFile);
        FileOutputStream fout = null;
        try {
            fout = new FileOutputStream(path.toFile());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Unable to create targetFile", e);
        }
        
        DataBufferUtils.write(dataBufferFlux, progress.getOutputStream(fout) )
            .subscribe( DataBufferUtils.releaseConsumer() );
        
        return progress;
    }

此方法返回一个在写入outpout流时更新的“DownloadProgress”对象。下面是这个DownloadProgress类:

public class DownloadProgress {

    private long contentLength = -1;
    private long downloaded;

    public void setLength(long contentLength) {
        this.contentLength = contentLength;
    }
    
    public OutputStream getOutputStream(FileOutputStream fileOutputStream) {
        return new FilterOutputStream(fileOutputStream) {
            @Override
            public void write(byte[] b, int off, int len) throws IOException {
                out.write(b, off, len);
                downloaded += len;
            }
            @Override
            public void write(int b) throws IOException {
                out.write(b);
                downloaded++;
            }
            @Override
            public void close() throws IOException {
                super.close();
                done();
            }
        };
    }
    
    public void done() {
        if ( contentLength == -1 ) {
            contentLength = downloaded;
        } else {
            downloaded = contentLength;
        }
    }
    
    public double getProgress() {
        if ( contentLength == -1 ) return 0;
        return downloaded / (double) contentLength;
    }
    
    public long getDownloaded() {
        return downloaded;
    }

    public boolean finished() {
        return downloaded > 0 && downloaded == contentLength;
    }
    
}

然后,为了监视下载,您可以在finished返回false时循环并显示进度。

            do {
                String progress = String.format("%.2f", dl.getProgress() * 100.0);
                log.info( "Downloaded: {}%", progress );
                try {
                    Thread.sleep(250);
                } catch (InterruptedException e) {
                    return;
                }
            } while(!dl.finished());
 类似资料:
  • 问题内容: 我正在Go中为Linux编写一个ShareX克隆,该克隆通过http POST请求将文件和图像上传到文件共享服务。 我目前正在使用http.Client和Do()发送请求,但我希望能够跟踪较大文件的上传进度,这些文件最多需要一分钟的时间来上传。目前,我能想到的唯一方法是手动在端口80上打开与网站的TCP连接,并以块的形式写入HTTP请求,但我不知道它是否可以在https网站上使用,我不

  • 我有一个spring应用程序,它接受长期存在的连接——每个请求启动一个kafka消费者,然后它将消费和处理的消息写入请求客户端。每个已使用的kafka消息都包含侦探跟踪头,例如: 但是,由于kafka使用者是根据传入请求动态生成的,因此每个传入请求都会生成另一组span/id。 因此,在将kafka消息推送到客户端之前,在处理kafka消息期间生成的所有日志都会使用请求的跟踪/跨度进行跟踪,在我的

  • 问题内容: 我试图严格遵守使用$ http或$ resource进行文件上传的angularjs代码。 这些事件根本没有触发。我想念什么? 参考资料 https://github.com/angular/angular.js/issues/14436 https://github.com/angular/angular.js/pull/11547 问题答案: 发布查询后找到它。感觉很蠢。我必须在b

  • 问题内容: 我正在尝试使用boto3从S3下载文本文件。 这是我写的。 我用它来称呼它 这给我一个错误,指出该文件夹中没有该文件。显然,当我已经在同一文件夹中拥有此名称的文件时,它可以工作,但是当我下载新文件时,它会出错。 我需要进行什么纠正? 问题答案: 创建一个对象,运行其方法,然后将该对象传递给该方法。这意味着该方法 在 开始 之前 运行。 在该方法中,您尝试读取要下载到的本地文件的大小,这

  • 问题内容: 我曾经短暂地附加一个过程。该过程创建了90个线程。当我找到有问题的线程时,我必须繁琐地搜索父线程,然后是祖父母线程,一直到根进程。 是否有技巧或工具可以快速找出哪个线程创建了另一个线程?还是更好,打印类似树的线程创建树? 问题答案: 跟踪编辑的子进程。