视频缓存AndroidVideoCache攻略

麹培
2023-12-01

好久没有更新博客了,一个是因为太忙,业务是在太多,另外一个就是也比较懒,所以拖着就有接近两个月的时间没有写博客了,实在是罪过。今天分享一个开源库 AndroidVideoCache 。这个库主要是做视频缓存管理功能,支持边下边播,离线播放,缓存管理等。用过MediaPlayer的小伙伴都知道,可以支持在线播放和播放本地资源,但是不支持缓存,这样很消耗用户流量,这个时候AndroidVideoCache就派上用场了。

1.基本原理

AndroidVideoCache 通过代理的策略将我们的网络请求代理到本地服务,本地服务再决定是从本地缓存拿还是发起网络请求,如果需要发起网络请求就先向本地写入数据,再从本地提供数据给视频播放器。这样就做到了数据的复用。

借用一张AndroidVideoCache - 视频边播放边缓存的代理策略里面的图片看的比较清楚:

在视频播放器,比如VideoView发起一个urlA,通过HttpProxyCacheServer转成一个本地host和端口的urlB,这样视频播放器发起请求就是向HttpProxyCacheServer请求,返回视频播放器的Socket,Server再建立一个HttpProxyCacheServerClients来发起网络请求处理缓存等工作,然后把数据通过前面的Socket返回给视频播放器。

了解了基本原理,再看下代码结构。

2.代码结构

整个代码结构还是比较清晰,涉及到的类比较多,这里只画出了一些主要的相关类,看下我的手绘图?:

HttpProxyCacheServer是库对外的接口,通过这个和视频播放器联系,判断本地是否有缓存,有的话直接返回本地文件;没有就建立一个和url对应的HttpProxyCacheServerClients处理本次请求,请求工作是交给Source接口,缓存工作是通过Cache接口。文件缓存是用LRU算法实现,可以根据文件大小或者文件个数管理缓存。

CacheListener是缓存本地成功后回调接口,可以用于更新视频进度条等UI需求。

上面整体介绍了下原理和代码结构,接下来是时候看下使用方法了,暴露出来的接口比较少,所以使用起来也简单。

3. 使用

首先是导包,截止到写这边博客,最新的版本是2.7.1:

dependencies {
   compile 'com.danikula:videocache:2.7.1'
}
复制代码

然后在全局初始化一个本地代理服务器,这里选择在 Application 的实现类中

public class App extends Application {

   private HttpProxyCacheServer proxy;

   public static HttpProxyCacheServer getProxy(Context context) {
       App app = (App) context.getApplicationContext();
       return app.proxy == null ? (app.proxy = app.newProxy()) : app.proxy;
   }

   private HttpProxyCacheServer newProxy() {
       return new HttpProxyCacheServer(this);
   }
}
复制代码

有了代理服务器,我们就可以使用了,把自己的网络视频 url 用提供的方法替换成另一个 URL

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);

   HttpProxyCacheServer proxy = getProxy();
   String proxyUrl = proxy.getProxyUrl(VIDEO_URL);
   videoView.setVideoPath(proxyUrl);
}
复制代码

提供了更多的可以自定义的地方,比如缓存的文件最大大小,以及文件个数,缓存采取的是 LruCache 的方法,对于老文件在达到上限后会自动清理。

private HttpProxyCacheServer newProxy() {
    return new HttpProxyCacheServer.Builder(this)
            .maxCacheSize(1024 * 1024 * 1024)       // 1 Gb for cache
            .build();
}

private HttpProxyCacheServer newProxy() {
    return new HttpProxyCacheServer.Builder(this)
            .maxCacheFilesCount(20)
            .build();
}
复制代码

除了这个,还有一个就是生成的文件名,默认是使用的 MD5 方式生成 key,考虑到一些业务逻辑,我们也可以继承一个 FileNameGenerator 来实现自己的策略

public class MyFileNameGenerator implements FileNameGenerator {

    // Urls contain mutable parts (parameter 'sessionToken') and stable video's id (parameter 'videoId').
    // e. g. http://example.com?videoId=abcqaz&sessionToken=xyz987
    public String generate(String url) {
        Uri uri = Uri.parse(url);
        String videoId = uri.getQueryParameter("videoId");
        return videoId + ".mp4";
    }
}

...
HttpProxyCacheServer proxy = HttpProxyCacheServer.Builder(context)
    .fileNameGenerator(new MyFileNameGenerator())
    .build()
复制代码

很明显,构造Server是通过建造者的模式,看下Builder的代码就知道支持哪些配置和默认配置是什么了。

        private static final long DEFAULT_MAX_SIZE = 512 * 1024 * 1024;

        private File cacheRoot;
        private FileNameGenerator fileNameGenerator;
        private DiskUsage diskUsage;
        private SourceInfoStorage sourceInfoStorage;
        private HeaderInjector headerInjector;

        public Builder(Context context) {
            this.sourceInfoStorage = SourceInfoStorageFactory.newSourceInfoStorage(context);
            this.cacheRoot = StorageUtils.getIndividualCacheDirectory(context);
            this.diskUsage = new TotalSizeLruDiskUsage(DEFAULT_MAX_SIZE);
            this.fileNameGenerator = new Md5FileNameGenerator();
            this.headerInjector = new EmptyHeadersInjector();
        }
复制代码
  • cacheRoot就是缓存默认的文件夹,如果有sd卡并且申请了权限,会放到下面的目录
<i>("/Android/data/[app_package_name]/cache")</i> 
复制代码

否则放到手机的内部存储

cacheDirPath = "/data/data/" + context.getPackageName() + "/cache/";
复制代码
  • FileNameGenerator用于生成文件名,默认是 Md5FileNameGenerator,生成MD5串作为文件名。

  • DiskUsage是用于管理本地缓存,默认是通过文件大小进行管理,大小默认是512M

private static final long DEFAULT_MAX_SIZE = 512 * 1024 * 1024;
this.diskUsage = new TotalSizeLruDiskUsage(DEFAULT_MAX_SIZE);
复制代码
  • SourceInfoStorage是用于存储SourInfo,默认是数据库存储
this.sourceInfoStorage = SourceInfoStorageFactory.newSourceInfoStorage(context);

public static SourceInfoStorage newSourceInfoStorage(Context context) {
        return new DatabaseSourceInfoStorage(context);
}
复制代码

SourInfo是什么?主要用于存储http请求源的一些信息,比如url,数据长度length,请求资源的类型mime:

    public final String url;
    public final long length;
    public final String mime;
复制代码
  • HeaderInjector主要用于添加一些自定义的头部字段,默认是空
this.headerInjector = new EmptyHeadersInjector();
复制代码

最后把这些字段构造成Config,构造HttpProxyCacheServer需要,后面会再传给HttpProxyCacheServerClients用于发起请求(url,length,mime)等,和本地缓存(DiskUsage,SourceInfoStorage,cacheRoot)等。

        /**
         * Builds new instance of {@link HttpProxyCacheServer}.
         *
         * @return proxy cache. Only single instance should be used across whole app.
         */
        public HttpProxyCacheServer build() {
            Config config = buildConfig();
            return new HttpProxyCacheServer(config);
        }

        private Config buildConfig() {
            return new Config(cacheRoot, fileNameGenerator, diskUsage, sourceInfoStorage, headerInjector);
        }
复制代码

4. 源码分析

从上面分析知道入口是HttpProxyCacheServer,所以我们先看下它:

    HttpProxyCacheServer.java

    private static final Logger LOG = LoggerFactory.getLogger("HttpProxyCacheServer");
    private static final String PROXY_HOST = "127.0.0.1";

    private final Object clientsLock = new Object();
    private final ExecutorService socketProcessor = Executors.newFixedThreadPool(8);
    private final Map<String, HttpProxyCacheServerClients> clientsMap = new ConcurrentHashMap<>();
    private final ServerSocket serverSocket;
    private final int port;
    private final Thread waitConnectionThread;
    private final Config config;
    private final Pinger pinger;

    public HttpProxyCacheServer(Context context) {
        this(new Builder(context).buildConfig());
    }

    private HttpProxyCacheServer(Config config) {
        this.config = checkNotNull(config);
        try {
            InetAddress inetAddress = InetAddress.getByName(PROXY_HOST);
            this.serverSocket = new ServerSocket(0, 8, inetAddress);
            this.port = serverSocket.getLocalPort();
            IgnoreHostProxySelector.install(PROXY_HOST, port);
            CountDownLatch startSignal = new CountDownLatch(1);
            this.waitConnectionThread = new Thread(new WaitRequestsRunnable(startSignal));
            this.waitConnectionThread.start();
            startSignal.await(); // freeze thread, wait for server starts
            this.pinger = new Pinger(PROXY_HOST, port);
            LOG.info("Proxy cache server started. Is it alive? " + isAlive());
        } catch (IOException | InterruptedException e) {
            socketProcessor.shutdown();
            throw new IllegalStateException("Error starting local proxy server", e);
        }
    }
复制代码

首先是构造一个本地127.0.0.1ServerSocker,随机分配了一个端口,然后启动一个线程去执行WaitRequestsRunnable,在这里面执行 waitForRequest,通过 accept() 方法监听这个服务器 socket 的入站连接,accept() 方法会一直阻塞,直到有一个客户端尝试建立连接。

    private void waitForRequest() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Socket socket = serverSocket.accept();
                LOG.debug("Accept new socket " + socket);
                socketProcessor.submit(new SocketProcessorRunnable(socket));
            }
        } catch (IOException e) {
            onError(new ProxyCacheException("Error during waiting connection", e));
        }
    }
复制代码

再回到前面的构造函数中,有个信号量用来保证Server启动后再走往下的流程,Server启动后会构造一个pinger,用来看服务是否可用。

CountDownLatch startSignal = new CountDownLatch(1);
this.waitConnectionThread = new Thread(new WaitRequestsRunnable(startSignal));
this.waitConnectionThread.start();
startSignal.await(); // freeze thread, wait for server starts
this.pinger = new Pinger(PROXY_HOST, port);
复制代码

通过上面几步,HttpProxyCacheServer就已经启动起来了,在等待客户端的连接,那客户端怎么连接到服务?再看下第三章节使用里面提到的另外一个方法getProxyUrl,看下官方解释,如果本地有缓存那么会返回本地地址的 Uri,file:// uri,否则返回一个代理的url。

    /**
     * Returns url that wrap original url and should be used for client (MediaPlayer, ExoPlayer, etc).
     * <p>
     * If parameter {@code allowCachedFileUri} is {@code true} and file for this url is fully cached
     * (it means method {@link #isCached(String)} returns {@code true}) then file:// uri to cached file will be returned.
     *
     * @param url                a url to file that should be cached.
     * @param allowCachedFileUri {@code true} if allow to return file:// uri if url is fully cached
     * @return a wrapped by proxy url if file is not fully cached or url pointed to cache file otherwise (if {@code allowCachedFileUri} is {@code true}).
     */
复制代码

再看下代码就很简单了, 如果本地已经缓存了,就直接拿本地地址的 Uri,并且 touch 一下文件,把时间更新后最新,因为后面 LruCache 是根据文件被访问的时间进行排序的。

    public String getProxyUrl(String url, boolean allowCachedFileUri) {
        if (allowCachedFileUri && isCached(url)) {
            File cacheFile = getCacheFile(url);
            touchFileSafely(cacheFile);
            return Uri.fromFile(cacheFile).toString();
        }
        return isAlive() ? appendToProxyUrl(url) : url;
    }
复制代码

如果文件没有被缓存那么就会先走一下 isAlive() 方法,这里会ping一下Server,确保是通的。如果不通就直接返回原url,通的话就返回代理url:

private static final String PROXY_HOST = "127.0.0.1";
    private String appendToProxyUrl(String url) {
    return String.format(Locale.US, "http://%s:%d/%s", PROXY_HOST, port, ProxyCacheUtils.encode(url));
    }
复制代码

所以在视频播放器拿着这个代理url发起请求会和Server进行连接,然后前面提到的waitForRequest会返回一个客户端的Socket,用于和客户端通信。然后会用线程池处理这个请求,可以看到最多支持8个并发连接。

private final ExecutorService socketProcessor = Executors.newFixedThreadPool(8);
socketProcessor.submit(new SocketProcessorRunnable(socket));
复制代码

SocketProcessorRunnable请求会通过processSocket进行处理,前面 ping 的过程其实也被会这个 socket 监听并且走进来这一段。资源请求会走到else逻辑里面。

private void processSocket(Socket socket) {
        try {
            GetRequest request = GetRequest.read(socket.getInputStream());
            LOG.debug("Request to cache proxy:" + request);
            String url = ProxyCacheUtils.decode(request.uri);
            if (pinger.isPingRequest(url)) {
                pinger.responseToPing(socket);
            } else {
                HttpProxyCacheServerClients clients = getClients(url);
                clients.processRequest(request, socket);
            }
        } catch (SocketException e) {
            // There is no way to determine that client closed connection http://stackoverflow.com/a/10241044/999458
            // So just to prevent log flooding don't log stacktrace
            LOG.debug("Closing socket… Socket is closed by client.");
        } catch (ProxyCacheException | IOException e) {
            onError(new ProxyCacheException("Error processing request", e));
        } finally {
            releaseSocket(socket);
            LOG.debug("Opened connections: " + getClientsCount());
        }
    }
复制代码

首先在内存缓存,其实就是ConcurrentHashMap,看看有没有url对应的HttpProxyCacheServerClients,没有的话构造一个。HttpProxyCacheServerClients就是用来处理一个请求url对应的工作。

    public void processRequest(GetRequest request, Socket socket) throws ProxyCacheException, IOException {
        startProcessRequest();
        try {
            clientsCount.incrementAndGet();
            proxyCache.processRequest(request, socket);
        } finally {
            finishProcessRequest();
        }
    }
复制代码

通过startProcessRequest()构造HttpProxyCache:

    private synchronized void startProcessRequest() throws ProxyCacheException {
        proxyCache = proxyCache == null ? newHttpProxyCache() : proxyCache;
    }

    private HttpProxyCache newHttpProxyCache() throws ProxyCacheException {
        HttpUrlSource source = new HttpUrlSource(url, config.sourceInfoStorage, config.headerInjector);
        FileCache cache = new FileCache(config.generateCacheFile(url), config.diskUsage);
        HttpProxyCache httpProxyCache = new HttpProxyCache(source, cache);
        httpProxyCache.registerCacheListener(uiCacheListener);
        return httpProxyCache;
    }
复制代码

在前面第二章节代码结构中可以看到不管网络请求HttpUrlSource还是缓存FileCache都是通过HttpProxyCache管理。然后注册一个回调CacheListener,在HttpProxyCache缓存可用的时候会回调通知HttpProxyCacheServerClients,Clients就可以通知监听者:

httpProxyCache.registerCacheListener(uiCacheListener);
this.uiCacheListener = new UiListenerHandler(url, listeners);

    private static final class UiListenerHandler extends Handler implements CacheListener {

        private final String url;
        private final List<CacheListener> listeners;

        public UiListenerHandler(String url, List<CacheListener> listeners) {
            super(Looper.getMainLooper());
            this.url = url;
            this.listeners = listeners;
        }

        @Override
        public void onCacheAvailable(File file, String url, int percentsAvailable) {
            Message message = obtainMessage();
            message.arg1 = percentsAvailable;
            message.obj = file;
            sendMessage(message);
        }

        @Override
        public void handleMessage(Message msg) {
            for (CacheListener cacheListener : listeners) {
                cacheListener.onCacheAvailable((File) msg.obj, url, msg.arg1);
            }
        }
    }
复制代码

再回到HttpProxyCacheServerClients构造函数中,接下来会调用proxyCache.processRequest(request, socket):

    public void processRequest(GetRequest request, Socket socket) throws IOException, ProxyCacheException {
        OutputStream out = new BufferedOutputStream(socket.getOutputStream());
        String responseHeaders = newResponseHeaders(request);
        out.write(responseHeaders.getBytes("UTF-8"));

        long offset = request.rangeOffset;
        if (isUseCache(request)) {
            responseWithCache(out, offset);
        } else {
            responseWithoutCache(out, offset);
        }
    }
复制代码

首先通过Socket回消息给视频播放器头部信息,接下来判断是否需要走缓存,不走缓存就直接通过HttpUrlSource发起HttpURLConnection,读取数据通过Socket返回给播放器。如果需要走缓存,会走下面代码,先调用read读取8k的数据,读取成功通过Socket先返回给播放器,再重复读直到完成。

    HttpProxyCache.java

    static final int DEFAULT_BUFFER_SIZE = 8 * 1024;

    private void responseWithCache(OutputStream out, long offset) throws ProxyCacheException, IOException {
        byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
        int readBytes;
        while ((readBytes = read(buffer, offset, buffer.length)) != -1) {
            out.write(buffer, 0, readBytes);
            offset += readBytes;
        }
        out.flush();
    }
复制代码

read方法是调用的父类ProxyCache的read方法:

    public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
        ProxyCacheUtils.assertBuffer(buffer, offset, length);

        while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
            readSourceAsync();
            waitForSourceData();
            checkReadSourceErrorsCount();
        }
        int read = cache.read(buffer, offset, length);
        if (cache.isCompleted() && percentsAvailable != 100) {
            percentsAvailable = 100;
            onCachePercentsAvailableChanged(100);
        }
        return read;
    }
复制代码

通过循环不断读取数据,直到下面其中一个条件满足:

  • 文件读取完成
  • 或者读取的数据已经达到length的要求,默认是8k
  • Clients已经调用shutdown

读取数据会启动一个新的线程去读取:

    private synchronized void readSourceAsync() throws ProxyCacheException {
        boolean readingInProgress = sourceReaderThread != null && sourceReaderThread.getState() != Thread.State.TERMINATED;
        if (!stopped && !cache.isCompleted() && !readingInProgress) {
            sourceReaderThread = new Thread(new SourceReaderRunnable(), "Source reader for " + source);
            sourceReaderThread.start();
        }
    }
复制代码

SourceReaderRunnable中主要就是调用readSource,这里主要是通过HttpUrlSource.read读取网络数据,然后通过FileCache写入到本地缓存,在缓存结束后同样也会发送一个通知通知自己已经缓存完了,回调由外界控制。

    private void readSource() {
        long sourceAvailable = -1;
        long offset = 0;
        try {
            offset = cache.available();
            source.open(offset);
            sourceAvailable = source.length();
            byte[] buffer = new byte[ProxyCacheUtils.DEFAULT_BUFFER_SIZE];
            int readBytes;
            while ((readBytes = source.read(buffer)) != -1) {
                synchronized (stopLock) {
                    if (isStopped()) {
                        return;
                    }
                    cache.append(buffer, readBytes);
                }
                offset += readBytes;
                notifyNewCacheDataAvailable(offset, sourceAvailable);
            }
            tryComplete();
            onSourceRead();
        } catch (Throwable e) {
            readSourceErrorsCount.incrementAndGet();
            onError(e);
        } finally {
            closeSource();
            notifyNewCacheDataAvailable(offset, sourceAvailable);
        }
    }
复制代码

同时调用ProxyCache.read的线程现在在做什么?在看下read方法里面的代码:

    public int read(byte[] buffer, long offset, int length) throws ProxyCacheException {
        ProxyCacheUtils.assertBuffer(buffer, offset, length);

        while (!cache.isCompleted() && cache.available() < (offset + length) && !stopped) {
            readSourceAsync();
            waitForSourceData();
            checkReadSourceErrorsCount();
        }
        int read = cache.read(buffer, offset, length);
        if (cache.isCompleted() && percentsAvailable != 100) {
            percentsAvailable = 100;
            onCachePercentsAvailableChanged(100);
        }
        return read;
    }
复制代码

readSourceAsync启动另外一个线程(为了方便这里简称为ThreadB)后,本线程(为了方便这里简称为ThreadA)会接下来执行 waitForSourceData, 先获得wc这个锁,然后调用ThreadA会挂起1s的时间或者ThreadB已经写完缓存,通过notifyAll通知。

    private void waitForSourceData() throws ProxyCacheException {
        synchronized (wc) {
            try {
                wc.wait(1000);
            } catch (InterruptedException e) {
                throw new ProxyCacheException("Waiting source data is interrupted!", e);
            }
        }
    }

    private void notifyNewCacheDataAvailable(long cacheAvailable, long sourceAvailable) {
        onCacheAvailable(cacheAvailable, sourceAvailable);

        synchronized (wc) {
            wc.notifyAll();
        }
    }
复制代码

接下来ThreadA会继续执行checkReadSourceErrorsCount方法,如果ThreadB在readSource出现异常,会增加一次错误次数,然后会抛出异常。

ProxyCache.java

private static final int MAX_READ_SOURCE_ATTEMPTS = 1;

    private void checkReadSourceErrorsCount() throws ProxyCacheException {
        int errorsCount = readSourceErrorsCount.get();
        if (errorsCount >= MAX_READ_SOURCE_ATTEMPTS) {
            readSourceErrorsCount.set(0);
            throw new ProxyCacheException("Error reading source " + errorsCount + " times");
        }
    }
复制代码

线程ThreadA会在while循环中继续判断条件,如果满足会跳出,然后从FileCache中读取length字节的数据返回到HttpProxyCacheresponseWithCache方法中,通过Socket写回给播放器。

到此整个读取数据,缓存数据的流程就结束了。

5. 总结

写的比较长,先介绍了下AndroidVideoCache的基本原理,然后手绘了张代码框架图,方便全局了解,然后看了下使用方法,最后分析了主要流程的源码。简单说起来就是通过代理策略,拦截网络请求,从本地拿出数据给到播放器。后面如果有时间可以再简单说下本地缓存的一些代码。

如果本文对你有帮助,欢迎关注哈。

感谢@右倾倾,希望你能少点痛苦,平平安安,快快乐乐。

下车了,提前祝大家新年快乐!

 类似资料: