我需要创建一个库,其中将包含同步和异步方法。
executeSynchronous()
-等到得到结果,然后返回结果。executeAsynchronous()
-立即返回Future,如果需要,可以在完成其他操作后进行处理。我图书馆的核心逻辑
客户将使用我们的库,他们将通过传递DataKey
构建器对象来调用它。然后,我们将使用该DataKey
对象构造一个URL,并通过执行该URL对该URL进行HTTP客户端调用,然后将响应作为JSON字符串返回给我们,然后通过创建DataResponse
对象将该JSON字符串发送回给我们的客户。有些客户会打电话executeSynchronous()
,有些可能会打电话,executeAsynchronous()
所以这就是为什么我需要在库中分别提供两个方法。
接口:
public interface Client {
// for synchronous
public DataResponse executeSynchronous(DataKey key);
// for asynchronous
public Future<DataResponse> executeAsynchronous(DataKey key);
}
然后我有了DataClient
实现以上Client
接口的代码:
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
private ExecutorService executor = Executors.newFixedThreadPool(10);
// for synchronous call
@Override
public DataResponse executeSynchronous(DataKey key) {
DataResponse dataResponse = null;
Future<DataResponse> future = null;
try {
future = executeAsynchronous(key);
dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
// does this look right the way I am doing it?
future.cancel(true); // terminating tasks that have timed out.
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
//for asynchronous call
@Override
public Future<DataResponse> executeAsynchronous(DataKey key) {
Future<DataResponse> future = null;
try {
Task task = new Task(key, restTemplate);
future = executor.submit(task);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
}
return future;
}
}
简单类将执行实际任务:
public class Task implements Callable<DataResponse> {
private DataKey key;
private RestTemplate restTemplate;
public Task(DataKey key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() {
DataResponse dataResponse = null;
String response = null;
try {
String url = createURL();
response = restTemplate.getForObject(url, String.class);
// it is a successful response
dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
} catch (RestClientException ex) {
PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
} catch (Exception ex) {
PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
}
return dataResponse;
}
// create a URL by using key object
private String createURL() {
String url = somecode;
return url;
}
}
问题陈述:-
当我开始研究此解决方案时,我并没有终止超时的任务。我正在向客户端报告超时,但是任务继续在线程池中运行(很长时间可能占用了我有限的10个线程之一)。所以,我做了一些研究,并在网上我发现我可以取消我的任务有那些通过使用超时cancel
在future
如下图所示-
future.cancel(true);
但是我想确保,在我executeSynchronous
取消超时的任务的方法中,它看起来是否正确?
由于我正在调用cancel()
,Future
如果任务仍在队列中,它将停止运行,所以我不确定我在做什么对吗?什么是正确的方法?
如果有更好的方法,那么有人可以提供示例吗?
如果任务仍在队列中,则只需调用即可取消它,future.cancel()
但是显然您不知道该任务是否在队列中。同样,即使您请求future
中断任务,它也可能无法正常工作,因为您的任务仍然可以执行忽略线程中断状态的操作。
因此,您可以使用,future.cancel(true)
但您需要确保您的任务(线程)确实考虑了线程中断的状态。例如,您提到要进行http调用,因此线程中断后,您可能需要关闭http客户端资源。
请参考下面的例子。
我试图实现任务取消方案。通常,线程可以检查isInterrupted()
并尝试终止自身。但是,当您使用可调用的线程池执行程序时,如果任务不是真正喜欢的话,这将变得更加复杂while(!Thread.isInterrupted()) {// execute task}
。
在此示例中,一个任务正在写入文件(我没有使用http调用来简化它)。线程池执行程序开始运行任务,但是调用方希望在100毫秒后取消它。现在,future将中断信号发送到线程,但是可调用任务无法在写入文件时立即检查它。因此,为了使这种情况发生,callable会维护一个将要使用的IO资源列表,并且在将来要取消该任务时,它仅调用cancel()
所有IO资源,这将使该任务以IOException终止,然后线程完成。
public class CancellableTaskTest {
public static void main(String[] args) throws Exception {
CancellableThreadPoolExecutor threadPoolExecutor = new CancellableThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
long startTime = System.currentTimeMillis();
Future<String> future = threadPoolExecutor.submit(new CancellableTask());
while (System.currentTimeMillis() - startTime < 100) {
Thread.sleep(10);
}
System.out.println("Trying to cancel task");
future.cancel(true);
}
}
class CancellableThreadPoolExecutor extends ThreadPoolExecutor {
public CancellableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new CancellableFutureTask<T>(callable);
}
}
class CancellableFutureTask<V> extends FutureTask<V> {
private WeakReference<CancellableTask> weakReference;
public CancellableFutureTask(Callable<V> callable) {
super(callable);
if (callable instanceof CancellableTask) {
this.weakReference = new WeakReference<CancellableTask>((CancellableTask) callable);
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean result = super.cancel(mayInterruptIfRunning);
if (weakReference != null) {
CancellableTask task = weakReference.get();
if (task != null) {
try {
task.cancel();
} catch (Exception e) {
e.printStackTrace();
result = false;
}
}
}
return result;
}
}
class CancellableTask implements Callable<String> {
private volatile boolean cancelled;
private final Object lock = new Object();
private LinkedList<Object> cancellableResources = new LinkedList<Object>();
@Override
public String call() throws Exception {
if (!cancelled) {
System.out.println("Task started");
// write file
File file = File.createTempFile("testfile", ".txt");
BufferedWriter writer = new BufferedWriter(new FileWriter(file));
synchronized (lock) {
cancellableResources.add(writer);
}
try {
long lineCount = 0;
while (lineCount++ < 100000000) {
writer.write("This is a test text at line: " + lineCount);
writer.newLine();
}
System.out.println("Task completed");
} catch (Exception e) {
e.printStackTrace();
} finally {
writer.close();
file.delete();
synchronized (lock) {
cancellableResources.clear();
}
}
}
return "done";
}
public void cancel() throws Exception {
cancelled = true;
Thread.sleep(1000);
boolean success = false;
synchronized (lock) {
for (Object cancellableResource : cancellableResources) {
if (cancellableResource instanceof Closeable) {
((Closeable) cancellableResource).close();
success = true;
}
}
}
System.out.println("Task " + (success ? "cancelled" : "could not be cancelled. It might have completed or not started at all"));
}
}
对于与REST Http客户端相关的要求,您可以修改工厂类,例如:
public class CancellableSimpleClientHttpRequestFactory extends SimpleClientHttpRequestFactory {
private List<Object> cancellableResources;
public CancellableSimpleClientHttpRequestFactory() {
}
public CancellableSimpleClientHttpRequestFactory(List<Object> cancellableResources) {
this.cancellableResources = cancellableResources;
}
protected HttpURLConnection openConnection(URL url, Proxy proxy) throws IOException {
HttpURLConnection connection = super.openConnection(url, proxy);
if (cancellableResources != null) {
cancellableResources.add(connection);
}
return connection;
}
}
在这里,您需要RestTemplate
在可运行任务中创建时使用此工厂。
RestTemplate template = new RestTemplate(new CancellableSimpleClientHttpRequestFactory(this.cancellableResources));
确保传递与您维护相同的可取消资源列表CancellableTask
。
现在,您需要像这样修改cancel()
方法CancellableTask
-
synchronized (lock) {
for (Object cancellableResource : cancellableResources) {
if (cancellableResource instanceof HttpURLConnection) {
((HttpURLConnection) cancellableResource).disconnect();
success = true;
}
}
}
问题内容: 我是python和线程的新手。我已经编写了充当网络爬虫的python代码,并在网站中搜索特定的关键字。我的问题是,如何使用线程同时运行类的三个不同实例。当实例之一找到关键字时,所有三个实例都必须关闭并停止爬网。这是一些代码。 如何使用线程让Crawler同时执行三个不同的爬网? 问题答案: 似乎没有一种(简单的)方法可以终止Python中的线程。 这是一个并行运行多个HTTP请求的简单
问题内容: 我正在尝试以这种方式使用python的多处理程序包: 从池的进程中,我要避免等待等待60多个返回结果的进程。那可能吗? 问题答案: 这是一种无需更改功能即可执行此操作的方法。需要两个步骤: 使用您可以传递的选项来确保每次执行任务后重新启动池中的工作进程。 将现有的辅助函数包装在另一个函数中,该函数将调用守护程序线程,然后等待该线程的结果数秒钟。使用守护程序线程很重要,因为进程在退出之前
我正在编写一个JavaFX应用程序,我的对象扩展任务提供了JavaFXGUI线程之外的并发性。 我的主要课程是这样的: 我的GUI控制器示例如下(略作抽象): 目前,我的任务只是进行睡眠并打印数字1到10: 我遇到的问题是,一旦任务完成,就好像启动任务的线程继续运行一样。因此,当我按下右上角的“X”退出JavaFX应用程序时,JVM继续运行,我的应用程序不会终止。如果你看一下我的主课,我已经把系统
我有一个压缩图像的任务,它在图像中使用了许多循环: 我在普通线程中运行此方法,如下所示: 或者在后台工作线程中运行 问题是:这种方法有时会出错,在接收无效输入时会导致无限循环。在这种情况下,它将永远运行,并损害CPU,即使当设备的屏幕关闭时,这会增加设备的温度(如果我使用工作线程,它还会阻止等待队列中的其他任务)。 我想我需要设置一个超时来终止长时间运行的任务。在正常Java线程中实现这一点的最佳
问题内容: 如何使多线程python程序响应Ctrl + C键事件? 编辑: 代码是这样的: 我试图在所有线程上删除join(),但仍然无法正常工作。是否因为每个线程的run()过程中的锁段? 编辑: 上面的代码应该可以工作,但是当当前变量在5,000-6,000范围内并遍历以下错误时,它总是会中断 问题答案: 在启动主线程之前,将除主线程之外的每个线程都设为守护进程(在2.6或更高版本中,在2.
问题内容: 如果我在无限循环中有一个线程,有没有办法在主程序结束时(例如,当我按 +时 )终止它? 问题答案: 检查这个问题。正确答案对如何以正确的方式终止线程有很好的解释: 是否有任何方法可以杀死Python中的线程? 要使线程在键盘中断信号(ctrl + c)上停止,您可以在退出之前捕获异常“ KeyboardInterrupt”并进行清除。像这样: 这样,您可以控制程序突然终止时的处理方式。