首先讲一下Java中的线程, 通常情况下,我们线程操作都是 Thread + Runnable, 而Runnable是没有返回值的
public class FutureTask<V> implements RunnableFuture<V>
复制代码
FutureTask类的继承
public interface RunnableFuture<V> extends Runnable, Future<V>{
void run();
}
复制代码
Future接口:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码
FutureTask源码:
package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
/**
* Returns result or throws exception for completed task.
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
U.compareAndSwapInt(this, STATE, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
protected void done() { }
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
protected void setException(Throwable t) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
finishCompletion();
}
}
public void run() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
/**
* Executes the computation without setting its result, and then
* resets this future to initial state, failing to do so if the
* computation encounters an exception or is cancelled. This is
* designed for use with tasks that intrinsically execute more
* than once.
*
* @return {@code true} if successfully run and reset
*/
protected boolean runAndReset() {
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
/**
* Ensures that any interrupt from a possible cancel(true) is only
* delivered to a task while in run or runAndReset.
*/
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion or at timeout
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
/**
* Tries to unlink a timed-out or interrupted wait node to avoid
* accumulating garbage. Internal nodes are simply unspliced
* without CAS since it is harmless if they are traversed anyway
* by releasers. To avoid effects of unsplicing from already
* removed nodes, the list is retraversed in case of an apparent
* race. This is slow when there are a lot of nodes, but we don't
* expect lists to be long enough to outweigh higher-overhead
* schemes.
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!U.compareAndSwapObject(this, WAITERS, q, s))
continue retry;
}
break;
}
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long RUNNER;
private static final long WAITERS;
static {
try {
STATE = U.objectFieldOffset
(FutureTask.class.getDeclaredField("state"));
RUNNER = U.objectFieldOffset
(FutureTask.class.getDeclaredField("runner"));
WAITERS = U.objectFieldOffset
(FutureTask.class.getDeclaredField("waiters"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
}
复制代码
Future + Callable + ExcutorService的应用
AsyncTask 通过excute(Runnable) 返回 Future,Futrue支持cancel(), get()等操作。通过Handler进行线程间通信,内部有个静态的线程池,128长度的任务队列,但是依旧是串行执行的,也可以自己传入线程池
Android中AsyncTask的源码:
/*
* Copyright (C) 2008 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package android.os;
import android.annotation.MainThread;
import android.annotation.Nullable;
import android.annotation.WorkerThread;
import java.util.ArrayDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class AsyncTask<Params, Progress, Result> {
private static final String LOG_TAG = "AsyncTask";
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
// We want at least 2 threads and at most 4 threads in the core pool,
// preferring to have 1 less than the CPU count to avoid saturating
// the CPU with background work
private static final int CORE_POOL_SIZE = Math.max(2, Math.min(CPU_COUNT - 1, 4));
private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1;
private static final int KEEP_ALIVE_SECONDS = 30;
private static final ThreadFactory sThreadFactory = new ThreadFactory() {
private final AtomicInteger mCount = new AtomicInteger(1);
public Thread newThread(Runnable r) {
return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
}
};
private static final BlockingQueue<Runnable> sPoolWorkQueue =
new LinkedBlockingQueue<Runnable>(128);
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
/**
* An {@link Executor} that executes tasks one at a time in serial
* order. This serialization is global to a particular process.
*/
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
private static final int MESSAGE_POST_RESULT = 0x1;
private static final int MESSAGE_POST_PROGRESS = 0x2;
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
private static InternalHandler sHandler;
private final WorkerRunnable<Params, Result> mWorker;
private final FutureTask<Result> mFuture;
private volatile Status mStatus = Status.PENDING;
private final AtomicBoolean mCancelled = new AtomicBoolean();
private final AtomicBoolean mTaskInvoked = new AtomicBoolean();
private final Handler mHandler;
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
/**
* Indicates the current status of the task. Each status will be set only once
* during the lifetime of a task.
*/
public enum Status {
/**
* Indicates that the task has not been executed yet.
*/
PENDING,
/**
* Indicates that the task is running.
*/
RUNNING,
/**
* Indicates that {@link AsyncTask#onPostExecute} has finished.
*/
FINISHED,
}
private static Handler getMainHandler() {
synchronized (AsyncTask.class) {
if (sHandler == null) {
sHandler = new InternalHandler(Looper.getMainLooper());
}
return sHandler;
}
}
private Handler getHandler() {
return mHandler;
}
/** @hide */
public static void setDefaultExecutor(Executor exec) {
sDefaultExecutor = exec;
}
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
*/
public AsyncTask() {
this((Looper) null);
}
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
*
* @hide
*/
public AsyncTask(@Nullable Handler handler) {
this(handler != null ? handler.getLooper() : null);
}
/**
* Creates a new asynchronous task. This constructor must be invoked on the UI thread.
*
* @hide
*/
public AsyncTask(@Nullable Looper callbackLooper) {
mHandler = callbackLooper == null || callbackLooper == Looper.getMainLooper()
? getMainHandler()
: new Handler(callbackLooper);
mWorker = new WorkerRunnable<Params, Result>() {
public Result call() throws Exception {
mTaskInvoked.set(true);
Result result = null;
try {
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//noinspection unchecked
result = doInBackground(mParams);
Binder.flushPendingCommands();
} catch (Throwable tr) {
mCancelled.set(true);
throw tr;
} finally {
postResult(result);
}
return result;
}
};
mFuture = new FutureTask<Result>(mWorker) {
@Override
protected void done() {
try {
postResultIfNotInvoked(get());
} catch (InterruptedException e) {
android.util.Log.w(LOG_TAG, e);
} catch (ExecutionException e) {
throw new RuntimeException("An error occurred while executing doInBackground()",
e.getCause());
} catch (CancellationException e) {
postResultIfNotInvoked(null);
}
}
};
}
private void postResultIfNotInvoked(Result result) {
final boolean wasTaskInvoked = mTaskInvoked.get();
if (!wasTaskInvoked) {
postResult(result);
}
}
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult<Result>(this, result));
message.sendToTarget();
return result;
}
/**
* Returns the current status of this task.
*
* @return The current status.
*/
public final Status getStatus() {
return mStatus;
}
@WorkerThread
protected abstract Result doInBackground(Params... params);
/**
* Runs on the UI thread before {@link #doInBackground}.
*
* @see #onPostExecute
* @see #doInBackground
*/
@MainThread
protected void onPreExecute() {
}
/**
* <p>Runs on the UI thread after {@link #doInBackground}. The
* specified result is the value returned by {@link #doInBackground}.</p>
*
* <p>This method won't be invoked if the task was cancelled.</p>
*
* @param result The result of the operation computed by {@link #doInBackground}.
*
* @see #onPreExecute
* @see #doInBackground
* @see #onCancelled(Object)
*/
@SuppressWarnings({"UnusedDeclaration"})
@MainThread
protected void onPostExecute(Result result) {
}
/**
* Runs on the UI thread after {@link #publishProgress} is invoked.
* The specified values are the values passed to {@link #publishProgress}.
*
* @param values The values indicating progress.
*
* @see #publishProgress
* @see #doInBackground
*/
@SuppressWarnings({"UnusedDeclaration"})
@MainThread
protected void onProgressUpdate(Progress... values) {
}
/**
* <p>Runs on the UI thread after {@link #cancel(boolean)} is invoked and
* {@link #doInBackground(Object[])} has finished.</p>
*
* <p>The default implementation simply invokes {@link #onCancelled()} and
* ignores the result. If you write your own implementation, do not call
* <code>super.onCancelled(result)</code>.</p>
*
* @param result The result, if any, computed in
* {@link #doInBackground(Object[])}, can be null
*
* @see #cancel(boolean)
* @see #isCancelled()
*/
@SuppressWarnings({"UnusedParameters"})
@MainThread
protected void onCancelled(Result result) {
onCancelled();
}
/**
* <p>Applications should preferably override {@link #onCancelled(Object)}.
* This method is invoked by the default implementation of
* {@link #onCancelled(Object)}.</p>
*
* <p>Runs on the UI thread after {@link #cancel(boolean)} is invoked and
* {@link #doInBackground(Object[])} has finished.</p>
*
* @see #onCancelled(Object)
* @see #cancel(boolean)
* @see #isCancelled()
*/
@MainThread
protected void onCancelled() {
}
/**
* Returns <tt>true</tt> if this task was cancelled before it completed
* normally. If you are calling {@link #cancel(boolean)} on the task,
* the value returned by this method should be checked periodically from
* {@link #doInBackground(Object[])} to end the task as soon as possible.
*
* @return <tt>true</tt> if task was cancelled before it completed
*
* @see #cancel(boolean)
*/
public final boolean isCancelled() {
return mCancelled.get();
}
public final boolean cancel(boolean mayInterruptIfRunning) {
mCancelled.set(true);
return mFuture.cancel(mayInterruptIfRunning);
}
public final Result get() throws InterruptedException, ExecutionException {
return mFuture.get();
}
public final Result get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return mFuture.get(timeout, unit);
}
@MainThread
public final AsyncTask<Params, Progress, Result> execute(Params... params) {
return executeOnExecutor(sDefaultExecutor, params);
}
@MainThread
public final AsyncTask<Params, Progress, Result> executeOnExecutor(Executor exec,
Params... params) {
if (mStatus != Status.PENDING) {
switch (mStatus) {
case RUNNING:
throw new IllegalStateException("Cannot execute task:"
+ " the task is already running.");
case FINISHED:
throw new IllegalStateException("Cannot execute task:"
+ " the task has already been executed "
+ "(a task can be executed only once)");
}
}
mStatus = Status.RUNNING;
onPreExecute();
mWorker.mParams = params;
exec.execute(mFuture);
return this;
}
@MainThread
public static void execute(Runnable runnable) {
sDefaultExecutor.execute(runnable);
}
@WorkerThread
protected final void publishProgress(Progress... values) {
if (!isCancelled()) {
getHandler().obtainMessage(MESSAGE_POST_PROGRESS,
new AsyncTaskResult<Progress>(this, values)).sendToTarget();
}
}
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result);
}
mStatus = Status.FINISHED;
}
private static class InternalHandler extends Handler {
public InternalHandler(Looper looper) {
super(looper);
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult<?> result = (AsyncTaskResult<?>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]);
break;
case MESSAGE_POST_PROGRESS:
result.mTask.onProgressUpdate(result.mData);
break;
}
}
}
private static abstract class WorkerRunnable<Params, Result> implements Callable<Result> {
Params[] mParams;
}
@SuppressWarnings({"RawUseOfParameterizedType"})
private static class AsyncTaskResult<Data> {
final AsyncTask mTask;
final Data[] mData;
AsyncTaskResult(AsyncTask task, Data... data) {
mTask = task;
mData = data;
}
}
}
复制代码
AsyncTask的机制原理
1、AsyncTask的本质是一个静态的线程池,AsyncTask派生的子类可以实现不同的异步任务,这些任务都是提交到 静态的线程池中执行。 2、线程池中的工作线程执行doInBackground()方法执行异步任务 3、当任务状态改变之后,工作线程会向UI线程发送消息,AsyncTask内部的InternalHandler响应这些消息,并调用相关的回调函数 注意
-
AsyncTask不适合大量数据的请求,因为AsyncTask中线程池一个时间只能执行一个,因为使用了同步锁;
-
由于Handler需要和主线程交互,而Handler又是内置于AsyncTask中,所有AsyncTask的创建必须在主线程。
AsyncTaskResult的doInBackground(Params)方法执行异步任务运行在子线程中,其他方法运行在主线程中,可以操作UI组件。
-
不要手动的去调用AsyncTask的onPreExecute, doInBackground, publishProgress, onProgressUpdate, onPostExecute方法,这些都是由android系统自动调用的。
-
一个AsyncTask任务只能被执行一次。
-
运行中可以随时调用cancel(boolean)方法取消任务,如果成功调用isCancel()会返回true,并不会执行onPostExecute(),取而代之的是调用onCancelled()。从源码看,如果这个任务已经执行了这个时候调用cancel是不会真正的把task结束,而是继续执行,只不过改变的是执行之后的回调方法的onPostExecute还是onCancelled.
-
可能存在内存泄露情况,即非静态内部类持有外部类引用,解决办法同,Handler内存泄露解决办法一样,(在activity的onDestory 方法中调用 AsyncTask的cancel()方法)
-
并行或串行:在android 1.6之前的版本asynctask都是串行,即把任务放线程池中一串一串的执行,1.6到2.3改成并行,2.3之后为了维护系统稳定改成串行,但是任然可以执行并行操作。
-
可能会导致结果丢失