Preface
This article walks through several of Jesque's DAOs.
DAO list
- FailureDAO
- KeysDAO
- QueueInfoDAO
- WorkerInfoDAO
FailureDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/FailureDAO.java
/**
* FailureDAO provides access to job failures.
*
* @author Greg Haines
*/
public interface FailureDAO {
/**
* @return total number of failures
*/
long getCount();
/**
* @param offset offset into the failures
* @param count number of failures to return
* @return a sub-list of the failures
*/
List<JobFailure> getFailures(long offset, long count);
/**
* Clear the list of failures.
*/
void clear();
/**
* Re-queue a job for execution.
* @param index the index into the failure list
* @return the date the job was re-queued
*/
Date requeue(long index);
/**
* Remove a failure from the list.
* @param index the index of the failure to remove
*/
void remove(long index);
}
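This DAO mainly operates on namespace:failed, which is a Redis list.
- getCount
Uses LLEN to get the length of the list.
- clear
Uses DEL to delete the namespace:failed list.
- getFailures
Uses LRANGE to fetch the requested range.
- requeue
Takes the failed job at the given index, updates its retry time, and puts it back on the queue.
- remove
Removes by index using LREM: it first uses LSET to write a random value at that index, then uses LREM to remove that random value.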
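The remove step is worth a closer look, because Redis lists have no "delete by index" command. The following is a minimal in-memory sketch of the idea, not Jesque's actual code: a plain Java list stands in for namespace:failed, and the class name and the helpers `lset`, `lrem`, and `removeAt` are hypothetical stand-ins that only mimic the Redis commands of the same name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

/** In-memory sketch of the "LSET a sentinel, then LREM it" idiom. Not Jesque's code. */
public class RemoveByIndexSketch {

    /** Mimics Redis LSET: overwrite the element at the given index. */
    static void lset(List<String> list, int index, String value) {
        list.set(index, value);
    }

    /** Mimics Redis LREM with count = 1: remove the first element equal to value. */
    static long lrem(List<String> list, String value) {
        return list.remove(value) ? 1L : 0L;
    }

    /** Removes the element at index by first overwriting it with a unique sentinel. */
    static void removeAt(List<String> failures, int index) {
        // Assumption: a random UUID is unique enough to never collide with a real entry.
        String sentinel = UUID.randomUUID().toString();
        lset(failures, index, sentinel);
        lrem(failures, sentinel);
    }

    public static void main(String[] args) {
        // Two identical entries: removing "job-a" by value would hit index 0, not index 2.
        List<String> failures = new ArrayList<>(Arrays.asList("job-a", "job-b", "job-a"));
        removeAt(failures, 2);
        System.out.println(failures); // prints [job-a, job-b]
    }
}
```

The sentinel matters when the list holds duplicate entries: removing by the original value could delete the wrong occurrence, while a unique marker pins the removal to the chosen index.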
KeysDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/KeysDAO.java
/**
* KeysDAO provides access to available keys.
*
* @author Greg Haines
*/
public interface KeysDAO {
/**
* Get basic key info.
* @param key the key name
* @return the key information or null if the key did not exist
*/
KeyInfo getKeyInfo(String key);
/**
* Get basic key info plus a sub-list of the array value for the key, if applicable.
* @param key the key name
* @param offset the offset into the array
* @param count the number of values to return
* @return the key information or null if the key did not exist
*/
KeyInfo getKeyInfo(String key, int offset, int count);
/**
* Get basic info on all keys.
* @return a list of key informations
*/
List<KeyInfo> getKeyInfos();
/**
* @return information about the backing Redis database
*/
Map<String, String> getRedisInfo();
}
- getKeyInfo
Uses TYPE to get the key's type.
- getKeyInfos
Uses KEYS *.
- getRedisInfo
Uses INFO.
QueueInfoDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/QueueInfoDAO.java
/**
* QueueInfoDAO provides access to the queues in use by Jesque.
*
* @author Greg Haines
*/
public interface QueueInfoDAO {
/**
* @return the list of queue names
*/
List<String> getQueueNames();
/**
* @return total number of jobs pending in all queues
*/
long getPendingCount();
/**
* @return total number of jobs processed
*/
long getProcessedCount();
/**
* @return the list of queue informations
*/
List<QueueInfo> getQueueInfos();
/**
* @param name the queue name
* @param jobOffset the offset into the queue
* @param jobCount the number of jobs to return
* @return the queue information or null if the queue does not exist
*/
QueueInfo getQueueInfo(String name, long jobOffset, long jobCount);
/**
* Delete the given queue.
* @param name the name of the queue
*/
void removeQueue(String name);
}
- getQueueNames
Uses SMEMBERS on namespace:queues.
- getPendingCount
Computes the size of each queue; how it does so depends on the queue type:
private long size(final Jedis jedis, final String queueName) {
    final String key = key(QUEUE, queueName);
    final long size;
    if (JedisUtils.isDelayedQueue(jedis, key)) {
        // If delayed queue, use ZCARD
        size = jedis.zcard(key);
    } else {
        // Else, use LLEN
        size = jedis.llen(key);
    }
    return size;
}
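Delayed queues are sorted sets, so their size comes from ZCARD.
Non-delayed queues are lists, so their size comes from LLEN.
- getProcessedCount
Reads the stat string value directly.
- getQueueInfos
Also computes the size of each queue along the way.
- removeQueue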
public void removeQueue(final String name) {
    PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
        /**
         * {@inheritDoc}
         */
        @Override
        public Void doWork(final Jedis jedis) throws Exception {
            jedis.srem(key(QUEUES), name);
            jedis.del(key(QUEUE, name));
            return null;
        }
    });
}
This touches two objects: the queues set and the queue itself.
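To make the getPendingCount logic above concrete, here is a toy model that runs without Redis. It is only a sketch under stated assumptions: a `Map` stands in for the Redis keyspace, a `SortedSet` value plays the role of a delayed queue (sorted set) and a `List` value plays the role of a regular queue. The class name, the `resque` namespace, and the queue names are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

/** Toy model of summing queue sizes by queue type. Not Jesque's code. */
public class PendingCountSketch {

    /** Size of one queue: "ZCARD" for a sorted set, "LLEN" for a list, 0 if absent. */
    static long size(Map<String, Object> store, String key) {
        Object value = store.get(key);
        if (value instanceof SortedSet) {
            return ((SortedSet<?>) value).size(); // delayed queue, like ZCARD
        } else if (value instanceof List) {
            return ((List<?>) value).size(); // regular queue, like LLEN
        }
        return 0L; // the key does not exist
    }

    /** Sums the sizes of all named queues under a hypothetical namespace prefix. */
    static long pendingCount(Map<String, Object> store, Collection<String> queueNames) {
        long total = 0L;
        for (String name : queueNames) {
            total += size(store, "resque:queue:" + name);
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Object> store = new HashMap<>();
        store.put("resque:queue:mail", Arrays.asList("job-1", "job-2"));
        store.put("resque:queue:later", new TreeSet<>(Arrays.asList("job-3", "job-4", "job-5")));
        System.out.println(pendingCount(store, Arrays.asList("mail", "later"))); // prints 5
    }
}
```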
WorkerInfoDAO
jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/WorkerInfoDAO.java
/**
* WorkerInfoDAO provides access to information about workers.
*
* @author Greg Haines
*/
public interface WorkerInfoDAO {
/**
* @return total number of workers known
*/
long getWorkerCount();
/**
* @return number of active workers
*/
long getActiveWorkerCount();
/**
* @return number of paused workers
*/
long getPausedWorkerCount();
/**
* @return information about all active workers
*/
List<WorkerInfo> getActiveWorkers();
/**
* @return information about all paused workers
*/
List<WorkerInfo> getPausedWorkers();
/**
* @return information about all workers
*/
List<WorkerInfo> getAllWorkers();
/**
* @param workerName the name of the worker
* @return information about the given worker or null if that worker does not exist
*/
WorkerInfo getWorker(String workerName);
/**
* @return a map of worker informations by hostname
*/
Map<String, List<WorkerInfo>> getWorkerHostMap();
/**
* Removes the metadata about a worker.
*
* @param workerName
* The worker name to remove
*/
void removeWorker(String workerName);
}
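- getAllWorkers
Uses SMEMBERS on namespace:workers.
- getActiveWorkers
Uses SMEMBERS on namespace:workers, then keeps the workers whose state is working.
- getPausedWorkers
Uses SMEMBERS on namespace:workers, then keeps the workers whose state is paused.
- getWorkerCount
Uses SCARD directly on namespace:workers.
- getActiveWorkerCount
Uses SMEMBERS on namespace:workers, then counts the workers whose state is working.
- getPausedWorkerCount
Uses SMEMBERS on namespace:workers, then counts the workers whose state is paused.
- getWorkerHostMap
Uses SMEMBERS on namespace:workers, then groups the workers into a map by host.
The SMEMBERS lookup is basically all-purpose; the other counts are mostly derived from it.
- removeWorker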
public void removeWorker(final String workerName) {
    PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
        /**
         * {@inheritDoc}
         */
        @Override
        public Void doWork(final Jedis jedis) throws Exception {
            jedis.srem(key(WORKERS), workerName);
            jedis.del(key(WORKER, workerName), key(WORKER, workerName, STARTED),
                    key(STAT, FAILED, workerName), key(STAT, PROCESSED, workerName));
            return null;
        }
    });
}
This operates on the workers set and the other objects related to the worker.
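The worker queries described above all follow one pattern: load every worker, then filter or group in memory. A minimal sketch of that pattern follows, assuming the workers have already been loaded into a list. The `Worker` class, the `State` enum, and the method names here are hypothetical simplifications, not Jesque's `WorkerInfo` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of deriving filtered lists, counts, and a host map from one worker list. */
public class WorkerQuerySketch {

    enum State { IDLE, WORKING, PAUSED }

    /** Hypothetical, simplified stand-in for a worker record. */
    static final class Worker {
        final String name;
        final String host;
        final State state;

        Worker(String name, String host, State state) {
            this.name = name;
            this.host = host;
            this.state = state;
        }
    }

    /** Keeps only the workers in the given state (active = WORKING, paused = PAUSED). */
    static List<Worker> withState(List<Worker> all, State state) {
        List<Worker> result = new ArrayList<>();
        for (Worker w : all) {
            if (w.state == state) {
                result.add(w);
            }
        }
        return result;
    }

    /** Groups workers by host name; a TreeMap keeps the hosts in sorted order. */
    static Map<String, List<Worker>> byHost(List<Worker> all) {
        Map<String, List<Worker>> map = new TreeMap<>();
        for (Worker w : all) {
            map.computeIfAbsent(w.host, h -> new ArrayList<>()).add(w);
        }
        return map;
    }

    public static void main(String[] args) {
        List<Worker> all = Arrays.asList(
                new Worker("w1", "hostA", State.WORKING),
                new Worker("w2", "hostA", State.PAUSED),
                new Worker("w3", "hostB", State.WORKING),
                new Worker("w4", "hostB", State.IDLE));
        System.out.println(withState(all, State.WORKING).size()); // prints 2
        System.out.println(withState(all, State.PAUSED).size());  // prints 1
        System.out.println(byHost(all).keySet());                 // prints [hostA, hostB]
    }
}
```

Filtering in memory is simple, but it means even the counts cost a full read of the worker set; only the total count can use SCARD directly.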