当前位置: 首页 > 工具软件 > jesque > 使用案例 >

聊聊jesque的几个dao

燕翼
2023-12-01

本文主要聊一下jesque的几个dao

dao列表

  • FailureDAO
  • KeysDAO
  • QueueInfoDAO
  • WorkerInfoDAO

FailureDAO

jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/FailureDAO.java

/**
 * FailureDAO provides access to job failures.
 * 
 * @author Greg Haines
 */
public interface FailureDAO {

    /**
     * @return total number of failures
     */
    long getCount();

    /**
     * @param offset offset into the failures
     * @param count number of failures to return
     * @return a sub-list of the failures
     */
    List<JobFailure> getFailures(long offset, long count);

    /**
     * Clear the list of failures.
     */
    void clear();

    /**
     * Re-queue a job for execution.
     * @param index the index into the failure list
     * @return the date the job was re-queued
     */
    Date requeue(long index);

    /**
     * Remove a failure from the list.
     * @param index the index of the failure to remove
     */
    void remove(long index);
}复制代码

主要操纵的是namespace:failed,是一个list类型

  • count

    使用llen方法获取队列长度

  • clear

    使用del删除namespace:failed队列

  • getFailures

    使用lrange命令查询

  • requeue

    根据index取出failed job,重新设定retry时间,放到入队列中

  • remove

    根据index删,使用lrem,这里是先lset一个随机值,再根据这个随机值lrem

KeysDAO

jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/KeysDAO.java

/**
 * KeysDAO provides access to available keys.
 * 
 * @author Greg Haines
 */
public interface KeysDAO {

    /**
     * Get basic key info.
     * @param key the key name
     * @return the key information or null if the key did not exist
     */
    KeyInfo getKeyInfo(String key);

    /**
     * Get basic key info plus a sub-list of the array value for the key, if applicable.
     * @param key the key name
     * @param offset the offset into the array
     * @param count the number of values to return
     * @return the key information or null if the key did not exist
     */
    KeyInfo getKeyInfo(String key, int offset, int count);

    /**
     * Get basic info on all keys.
     * @return a list of key informations
     */
    List<KeyInfo> getKeyInfos();

    /**
     * @return information about the backing Redis database
     */
    Map<String, String> getRedisInfo();
}复制代码
  • getKeyInfo

    使用type获取类型

  • getKeyInfos

    使用keys *方法

  • getRedisInfo

    使用info

QueueInfoDAO

jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/QueueInfoDAO.java

/**
 * QueueInfoDAO provides access to the queues in use by Jesque.
 * 
 * @author Greg Haines
 */
public interface QueueInfoDAO {

    /**
     * @return the list of queue names
     */
    List<String> getQueueNames();

    /**
     * @return total number of jobs pending in all queues
     */
    long getPendingCount();

    /**
     * @return total number of jobs processed
     */
    long getProcessedCount();

    /**
     * @return the list of queue informations
     */
    List<QueueInfo> getQueueInfos();

    /**
     * @param name the queue name
     * @param jobOffset the offset into the queue
     * @param jobCount the number of jobs to return
     * @return the queue information or null if the queue does not exist
     */
    QueueInfo getQueueInfo(String name, long jobOffset, long jobCount);

    /**
     * Delete the given queue.
     * @param name the name of the queue
     */
    void removeQueue(String name);
}复制代码
  • getQueueNames

    使用smembers方法操作namespace:queues

  • getPendingCount

    对每个queue计算大小,分queue类型

    private long size(final Jedis jedis, final String queueName) {
          final String key = key(QUEUE, queueName);
          final long size;
          if (JedisUtils.isDelayedQueue(jedis, key)) { // If delayed queue, use ZCARD
              size = jedis.zcard(key);
          } else { // Else, use LLEN
              size = jedis.llen(key);
          }
          return size;
      }复制代码

    延时队列使用的是zcard操作SortSet
    非延时队列使用llen操作list

  • getProcessedCount

    直接查询stat的string对象

  • getQueueInfos

    顺带计算每个queue的大小

  • removeQueue

    public void removeQueue(final String name) {
          PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
              /**
               * {@inheritDoc}
               */
              @Override
              public Void doWork(final Jedis jedis) throws Exception {
                  jedis.srem(key(QUEUES), name);
                  jedis.del(key(QUEUE, name));
                  return null;
              }
          });
      }复制代码

    操作了queues以及queue两个对象

WorkerInfoDAO

jesque-2.1.0-sources.jar!/net/greghaines/jesque/meta/dao/WorkerInfoDAO.java

/**
 * WorkerInfoDAO provides access to information about workers.
 * 
 * @author Greg Haines
 */
public interface WorkerInfoDAO {

    /**
     * @return total number of workers known
     */
    long getWorkerCount();

    /**
     * @return number of active workers
     */
    long getActiveWorkerCount();

    /**
     * @return number of paused workers
     */
    long getPausedWorkerCount();

    /**
     * @return information about all active workers
     */
    List<WorkerInfo> getActiveWorkers();

    /**
     * @return information about all paused workers
     */
    List<WorkerInfo> getPausedWorkers();

    /**
     * @return information about all workers
     */
    List<WorkerInfo> getAllWorkers();

    /**
     * @param workerName the name of the worker
     * @return information about the given worker or null if that worker does not exist
     */
    WorkerInfo getWorker(String workerName);

    /**
     * @return a map of worker informations by hostname
     */
    Map<String, List<WorkerInfo>> getWorkerHostMap();

    /**
     * Removes the metadata about a worker.
     * 
     * @param workerName
     *            The worker name to remove
     */
    void removeWorker(String workerName);
}复制代码
  • getAllWorkers

    smembers操作namespace:workers

  • getActiveWorkers

    smembers操作namespace:workers,然后过来出来state是working的

  • getPausedWorkers

    smembers操作namespace:workers,然后过来出来state是paused的

  • getWorkerCount

    直接scard操作namespace:workers

  • getActiveWorkerCount

    smembers操作namespace:workers,然后过来出来state是working的

  • getPausedWorkerCount

    smembers操作namespace:workers,然后过来出来state是paused的

  • getWorkerHostMap

    smembers操作namespace:workers,然后按照host来分map
    这个基本是万能的,其他的count基本是这个衍生出来

  • removeWorker
    public void removeWorker(final String workerName) {
          PoolUtils.doWorkInPoolNicely(this.jedisPool, new PoolWork<Jedis, Void>() {
              /**
               * {@inheritDoc}
               */
              @Override
              public Void doWork(final Jedis jedis) throws Exception {
                  jedis.srem(key(WORKERS), workerName);
                  jedis.del(key(WORKER, workerName), key(WORKER, workerName, STARTED), 
                          key(STAT, FAILED, workerName), key(STAT, PROCESSED, workerName));
                  return null;
              }
          });
      }复制代码

    操作works以及其他相关的对象

 类似资料: