Question:

Design questions related to a service

孟谭三
2023-03-14

I would like to validate the design of a multithreaded application I wrote, and get clarification/reassurance on a few points. I apologize in advance for such a long post - I thought about splitting it into several questions, but they would all have to reference the same code and they seem to be interrelated, so I opted to put everything in one post. If this is not appropriate - please let me know and I will break it into multiple posts.

Here is what I have:

  1. BatchService (a Spring singleton bean): accepts requests to upload a specified directory or a zip archive. For that, it holds an ExecutorService servicePool. On each request it submits a new BatchUploader Callable task to the pool and stores the returned Future in a map (in a @Transactional method). It provides methods to get the status of all uploads and to cancel all uploads. It also starts a new BatchMonitor thread that monitors the progress of the uploads and updates the queues that hold information about completed and not-completed uploads. It also cleans up all resources when the bean is about to be destroyed (using Spring's @PreDestroy callback).
  2. BatchUploader is a Callable task that also has its own ExecutorService batchPool for uploading individual files. In its call() method it scans the directory or zip archive and, for each file, submits a SingleFileUploader Callable task to its pool.
  3. SingleFileUploader is a Callable task that, in its call() method, does all the work of uploading and processing a file and returns some status.

Here is some real code mixed with pseudocode:

public class BatchService {

private ExecutorService servicePool;
private ConcurrentHashMap<String, Future<SingleBatchUploadResult>> activeBatchFutures = new ConcurrentHashMap<String, Future<SingleBatchUploadResult>>();
// keep last 100 unsuccessful uploads
private ConcurrentLinkedQueue<SingleBatchUploadResult> notCompletedBatches = new ConcurrentLinkedQueue<SingleBatchUploadResult>();
// keep last 100 successful uploads
private ConcurrentLinkedQueue<String> completedBatches = new ConcurrentLinkedQueue<String>();
private Thread monitorThread;

public BatchService() {
    servicePool = Executors.newFixedThreadPool(MAX_BATCH_UPLOAD_THREADS);
    monitorThread = new Thread(new BatchMonitor());
    monitorThread.setDaemon(true);
    monitorThread.start();
}

@Transactional
public void processUpload(String uploadId, String contentName) {
    Future<SingleBatchUploadResult> taskFuture = servicePool.submit(new BatchUploader(uploadId, contentName));
    activeBatchFutures.put(uploadId, taskFuture);
}

@PreDestroy
public void preDestroy() {
    // stop the monitor thread
    monitorThread.interrupt();
    // stop all executors and their threads
    cancelAllTasks();
}

public void cancelAllTasks(){
    List<Runnable> waitingTasks =  servicePool.shutdownNow();
    for (Runnable task: waitingTasks){
        // examine which tasks are still waiting, if necessary            
    }
}

public boolean cancelBatchById(String uploadId){
    Future<SingleBatchUploadResult> resultFuture = activeBatchFutures.get(uploadId);
    if (resultFuture != null && !resultFuture.isDone()){
        resultFuture.cancel(true);
        return true;
    } 
    // this task was either already finished, cancelled, not submitted or unknown
    return false;
}

public void getCurrentStatus(){
    // just print out the sizes of queues for now
    System.out.println("number of active uploads: " + activeBatchFutures.size());            
    System.out.println("number of successfully completed uploads: " + completedBatches.size());            
    System.out.println("number of failed uploads: " + notCompletedBatches.size());                   
}


public class BatchMonitor implements Runnable {
    @Override
    public void run() {
        boolean cont = true;
        while (cont) {
            if (Thread.currentThread().isInterrupted()){
                // the thread is being shut down - get out
                cont = false;
                break;
            }                 
            Iterator<Entry<String, Future<SingleBatchUploadResult>>> iterator = activeBatchFutures.entrySet().iterator();
            // remove completed Futures from the map
            // add successfully completed batches to completedBatches queue
            // add all other batches to notCompletedBatches queue
            while (iterator.hasNext() && cont){
               …
                if (batchUploadFuture.isCancelled()) {                        
                    addToNotCompleted(defaultResult);
                    // remove this future from the active list
                    activeBatchFutures.remove(uploadId);                        
                } else if (batchUploadFuture.isDone()){
                    try {
                        SingleBatchUploadResult result = batchUploadFuture.get();
                        if (UploadStatus.SUCCESS.equals(result.getUploadStatus()))
                            addToCompleted(uploadId);
                        else 
                            addToNotCompleted(result);
                    } catch (InterruptedException e) {
                        // the thread is being shut down - stop processing
                        cont = false;
                        // preserve interruption state of the thread
                        Thread.currentThread().interrupt();
                        break;
                    } catch (ExecutionException e) {
                        addToNotCompleted(defaultResult);
                    }
                    // remove this future from the active list
                    activeBatchFutures.remove(uploadId);
                } else {
                    // the task has not finished yet - let it be
                    // TODO if a Future is not complete - see how old it is [how ?] If older then timeout - cancel it
                    // For now, rely on the ExecutorService timeout set on the BatchUploader 
                }

            }
            // try to sleep for 5 sec, unless the thread is being shutdown
            if (!Thread.currentThread().isInterrupted()){
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    cont = false;
                    // preserve interruption state of the thread
                    Thread.currentThread().interrupt();
                }
            }

        }
        System.out.println("BatchMonitor.run() has terminated");
    }

    public void addToCompleted(String uploadId){
        int currentSize = completedBatches.size();
        // bring the size of the queue below MAX
        if (currentSize >= MAX_SUCCESSFUL_RESULTS) {
            int delta = currentSize - MAX_SUCCESSFUL_RESULTS + 1;
            while (delta > 0){
                completedBatches.poll();
                delta--;
            }
        }
        completedBatches.offer(uploadId);            
    }

    public void addToNotCompleted(SingleBatchUploadResult result){
        int currentSize = notCompletedBatches.size();
        // bring the size of the queue below MAX
        if (currentSize >= MAX_UNSUCCESSFUL_RESULTS) {
            int delta = currentSize - MAX_UNSUCCESSFUL_RESULTS + 1;
            while (delta > 0){
                notCompletedBatches.poll();
                delta--;
            }
        }
        notCompletedBatches.offer(result);            
    }

}
}

public class BatchUploader implements Callable<SingleBatchUploadResult> {

private ExecutorService executorService;
// Map<fileName, Future result> - holds Futures for all files that were submitted for upload (those that did not fail validation)
private ConcurrentHashMap<String, Future<SingleFileUploadResult>> uploadTaskFutures = new ConcurrentHashMap<String, Future<SingleFileUploadResult>>();
private ConcurrentHashMap<String, SingleFileUploadResult> notUploadedFiles = new ConcurrentHashMap<String, SingleFileUploadResult>();
private int totalFilesToUpload = 0;

public BatchUploader(...) {
    executorService = Executors.newFixedThreadPool(MAX_THREADS_PER_BATCH);
}

public SingleBatchUploadResult call() {
// do some validation
     if ( this is a correct ZIP file){
        String errorMessage = processZipArchive(threadName, contentName);
        // the errorMessage will be not null if there were some exceptions that happened during the zip archive read:
        // opening the ZIP archive, reading entries or thread interruption exceptions
        if (errorMessage != null) {
    ...
            return errorBatchUploadResult;                
        }
     }        
    // all tasks are submitted - stop the service from accepting new requests and shutdown when done
    executorService.shutdown();

    // now wait until all tasks have finished - but only up to BATCH_UPLOAD_TIMEOUT_IN_SEC seconds
    try {
        executorService.awaitTermination(BATCH_UPLOAD_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // try to shutdown all running tasks and stop waiting tasks from being scheduled;
        executorService.shutdownNow();
        // preserve interruption state of the thread
        Thread.currentThread().interrupt();
        return errorBatchUploadResult; 
    }

    // at this point, we either finished all tasks (awaitTermination finished before timeout),
    // or we timed out waiting. Get the latest status of each task
    List<String> successfullyUploadedFiles = new LinkedList<String>();
    for (String entryName : uploadTaskFutures.keySet()) {
        Future<SingleFileUploadResult> future = uploadTaskFutures.get(entryName);
        try {
            if (future.isCancelled()) {
                ...
                notUploadedFiles.putIfAbsent(entryName, taskResult);                   
            } else if (future.isDone()) {
                // this task has finished
                taskResult = future.get();
                if (taskResult.getUploadStatus().equals(UploadStatus.SUCCESS))
                    successfullyUploadedFiles.add(entryName);
                else
                    notUploadedFiles.putIfAbsent(entryName, taskResult);                   
            } else {
                // this task is either not started yet or not finished yet 
                …
                notUploadedFiles.putIfAbsent(entryName, sometaskResult);
            }
        } catch (InterruptedException e){
            // this is  a signal to stop processing
            batchUploadResult.setTotalFilesToUpload(totalFilesToUpload);
            batchUploadResult.setNotUploadedFiles(notUploadedFiles);
            batchUploadResult.setSuccessfullyUploadedFiles(successfullyUploadedFiles);
            batchUploadResult.setStatusMessage(statusMessage);
            batchUploadResult.setUploadStatus(UploadStatus.PARTIAL_FAILURE);
            // cancel/stop all executing/waiting SingleFileUpload tasks
            executorService.shutdownNow();
            // preserve interruption state of the thread
            Thread.currentThread().interrupt();
            return batchUploadResult;
        } catch (ExecutionException e) {
            // we do not know what the state of this task is 
            …
            notUploadedFiles.putIfAbsent(entryName, sometaskResult);
        }            
    }
    ...
    return batchUploadResult;
}

private String processZipArchive(String threadName, String zipName) {
   // do all ZIP-reading work here
        while ( valid file found )
        {
            if (Thread.currentThread().isInterrupted()){
                // this batch uploader thread is being shut down -  stop all SingleFileUpload tasks
                executorService.shutdownNow();
                return errorMessage;
            } 
            // do a try while processing individual files to be able to gather info about failed files but continue processing good ones
            try {
                // read the file and pass it for processing to SingleFileUploader
                Future<SingleFileUploadResult> taskFuture = executorService.submit(new SingleFileUploader(uploadId, bytesContent, zipEntryName));
                uploadTaskFutures.put(zipEntryName, taskFuture);
                ...
             } catch (some exceptions) {
                  notUploadedFiles.put(zipEntryName, taskResult);
            }
        }
return errorMessage;
}    
}

public class SingleFileUploader implements Callable<SingleFileUploadResult> {
...    
@Override
public SingleFileUploadResult call() {
    // check if there was a cancellation request
    if (Thread.currentThread().isInterrupted()){
        // this file uploader thread is being shut down - get out            
        return errorResult;
    } 
    // do the real work here
    return result;
}

}

All of this works fine in the regular scenarios. However, I would still like to hear your opinion on whether there are better/more reliable ways to do what I want, especially in the following areas:


  • I use a separate thread, BatchMonitor, to keep track of which uploads are active, done, and not done, by periodically scanning the map of active Futures and moving them to the "successfully completed" or "not completed [failed]" queues. I wonder if there is a better way to do this?

    For that, I use unbounded concurrent queues and bound them to a specified maximum size myself as I keep adding items to them. I could not find a "bounded concurrent queue" in the standard JDK libraries, only unbounded ones, and I was hoping to use EvictingQueue from Guava, but it is bundled into the 15.0 release which does not seem to be out yet... So I decided to limit the sizes of the queues myself, at the cost of using the size() operation, which I know is a problem with concurrent queues, since it does a full scan of the queue... My reasoning is that it is probably OK if I keep the queues small (100 in my case).
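As an aside on the bounded-queue point above: even without Guava's EvictingQueue, a "keep the last N results" buffer can be built on the JDK's bounded ArrayBlockingQueue by evicting the oldest entry whenever an offer fails. This is only a sketch with names of my own choosing, not part of the application's code:

```java
import java.util.concurrent.ArrayBlockingQueue;

// A minimal "keep the last N items" buffer built on the JDK's bounded
// ArrayBlockingQueue: when the queue is full, drop the oldest entry to
// make room for the new one.
public class LastN<E> {
    private final ArrayBlockingQueue<E> queue;

    public LastN(int capacity) {
        queue = new ArrayBlockingQueue<E>(capacity);
    }

    public synchronized void add(E item) {
        while (!queue.offer(item)) { // offer() returns false when the queue is full
            queue.poll();            // evict the oldest entry
        }
    }

    public int size() {
        return queue.size(); // O(1) for ArrayBlockingQueue, unlike ConcurrentLinkedQueue
    }
}
```

With a capacity of 100 this would replace both the completedBatches and notCompletedBatches trimming logic, and it never needs the O(n) size() of ConcurrentLinkedQueue.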

    Do I need concurrent queues at all? The only thread that modifies the queues is the BatchMonitor thread, and the only other thread that reads them is the BatchService thread. The only time I could get an out-of-sync view is when BatchService tries to get the status of a particular upload: it may have already been removed from the activeBatchFutures map but not yet placed into the "completed" or "notCompleted" queues, because I deliberately do not synchronize reads/writes between the map and the queues, to avoid unnecessary locking. I can live with that, though - occasionally getting a "not found" status for a particular upload, since asking for the status a second time would return the correct result.

    Handling of timeouts and cancellations: I am trying to make this application bulletproof with regard to resource cleanup - I am trying to handle all thread-interruption cases and stop processing to allow the threads to terminate. I rely on catching and handling InterruptedException in BatchUploader, and on propagating this event to the individual SingleFileUploader tasks by calling batchPool.shutdownNow(). Can you see any cases where I might end up with runaway threads - when the JVM shuts down, or when the application is redeployed in a web container?
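On the runaway-threads-on-redeploy question, the ExecutorService javadoc documents a two-phase shutdown pattern that could be called from the @PreDestroy hook. A sketch, assuming a 10-second grace period is acceptable (the class and method names are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class PoolShutdown {
    // Two-phase shutdown, as recommended by the ExecutorService javadoc:
    // first let running tasks finish, then interrupt the stragglers.
    static void shutdownAndAwait(ExecutorService pool) {
        pool.shutdown(); // stop accepting new tasks
        try {
            // give running tasks a grace period to finish on their own
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt whatever is still running
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.err.println("pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            // interrupted while waiting: re-interrupt the tasks and ourselves
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
```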

    Thanks!

    Marina

  • 1 Answer

    法景明
    2023-03-14


  • Use Guava's ListenableFuture instead of your BatchMonitor - a ListenableFuture can execute a callback as soon as the Future completes, which removes the need to use a thread to monitor your Futures.

    Use an ArrayBlockingQueue, which is a bounded concurrent queue. Use take() in the consumer threads to remove an item, blocking if the queue is empty, and offer(E e, long timeout, TimeUnit unit) in the producer threads to add an item, blocking (for up to timeout units) if the queue is full.

    If you use ListenableFutures then you shouldn't need a BatchMonitor or a concurrent queue at all.

    I recommend that you check Thread.currentThread().isInterrupted() on each iteration of your for (String entryName : uploadTaskFutures.keySet()) loop, since you do not call a method that throws InterruptedException on all code paths (for example, if you keep going through the else paths, it might be a while before you notice that the interrupt flag has been set).
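For reference, the completion-callback idea in the first bullet is not Guava-specific: since Java 8 the JDK offers the same pattern via CompletableFuture. A minimal sketch with made-up names, not the question's actual code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackDemo {
    // Register a completion callback on a future instead of polling it from a
    // monitor thread. Returns the queue of completed upload ids.
    static ConcurrentLinkedQueue<String> runOnce() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ConcurrentLinkedQueue<String> completed = new ConcurrentLinkedQueue<>();
        ConcurrentLinkedQueue<String> failed = new ConcurrentLinkedQueue<>();

        CompletableFuture<String> upload =
            CompletableFuture.supplyAsync(() -> "SUCCESS", pool); // stand-in for the real upload work

        // The callback fires as soon as the task finishes - no 5-second polling loop.
        CompletableFuture<Void> done = upload.thenAccept(status -> {
            if ("SUCCESS".equals(status)) completed.add("upload-1");
            else failed.add("upload-1");
        });

        done.join();   // wait for the callback, for demo purposes only
        pool.shutdown();
        return completed;
    }

    public static void main(String[] args) {
        System.out.println("completed=" + runOnce());
    }
}
```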
