kue api文档说明
慕铭
2023-12-01
## 创建job queue job队列
var jobs = kue.createQueue();
##创建job
var job = queue.create('email', {
title: 'welcome email for tj'
, to: 'tj@learnboost.com'
, template: 'welcome-email'
}).save( function(err){
if( !err ) console.log( job.id );
});
##设置job优先级
queue.create('email', {
title: 'welcome email for tj'
, to: 'tj@learnboost.com'
, template: 'welcome-email'
}).priority('high').save();
{
low: 10
, normal: 0
, medium: -5
, high: -10
, critical: -15
};
##失败后重试次数
queue.create('email', {
title: 'welcome email for tj'
, to: 'tj@learnboost.com'
, template: 'welcome-email'
}).priority('high').attempts(5).save();
##失败后重试的延迟设置
// Honor job's original delay (if set) at each attempt, defaults to fixed backoff 采用默认delay设置
job.attempts(3).backoff( true )
// Override delay value, fixed backoff 延迟1分钟
job.attempts(3).backoff( {delay: 60*1000, type:'fixed'} )
// Enable exponential backoff using original delay (if set)
job.attempts(3).backoff( {type:'exponential'} )
// Use a function to get a customized next attempt delay value
job.attempts(3).backoff( function( attempts, delay ){
return my_customized_calculated_delay;
})
##job有效时间 TTL
queue.create('email', {title: 'email job with TTL'}).ttl(milliseconds).save();
##job log
job.log('$%d sent to %s', amount, user.name);
##job进度
job.progress(completed, total [, data])
job.progress(frames, totalFrames);
##job event
event types
- `enqueue` the job is now queued
- `promotion` the job is promoted from delayed state to queued
- `progress` the job's progress ranging from 0-100
- `failed attempt` the job has failed, but has remaining attempts yet
- `failed` the job has failed and has no remaining attempts
- `complete` the job has completed
- `remove` the job has been removed
var job = queue.create('video conversion', {
title: 'converting loki\'s to avi'
, user: 1
, frames: 200
});
job.on('complete', function(result){
console.log('Job completed with data ', result);
}).on('failed attempt', function(errorMessage, doneAttempts){
console.log('Job failed');
}).on('failed', function(errorMessage){
console.log('Job failed');
}).on('progress', function(progress, data){
console.log('\r job #' + job.id + ' ' + progress + '% complete with data ', data );
});
Note Kue stores job objects in memory until
they are complete/failed to be able to emit events on them.
If you have a huge concurrency in uncompleted jobs,
turn this feature off and use queue level events for better memory scaling.
job对象是保存在内存的,因此如果有大量未完成的并发jobs,建议将jobEvents特性关闭,
并使用队列级别的events
kue.createQueue({jobEvents: false})
##queue events
queue.on('job enqueue', function(id, type){
console.log( 'Job %s got queued of type %s', id, type );
}).on('job complete', function(id, result){
kue.Job.get(id, function(err, job){
if (err) return;
job.remove(function(err){
if (err) throw err;
console.log('removed completed job #%d', job.id);
});
});
});
##delay jobs 延迟job执行
var email = queue.create('email', {
title: 'Account renewal required'
, to: 'tj@learnboost.com'
, template: 'renewal-email'
}).delay(milliseconds)
.priority('high')
.save();
##processing jobs 处理job
process是个单例对象
Note that unlike what the name createQueue suggests,
it currently returns a singleton Queue instance.
So you can configure and use only a single Queue object within your node.js process.
var kue = require('kue')
, queue = kue.createQueue();
queue.process('email', function(job, done){
email(job.data.to, done);
});
function email(address, done) {
if(!isValidEmail(address)) {
//done('invalid to address') is possible but discouraged
return done(new Error('invalid to address'));
}
// email send stuff...
done();
}
##process Concurrency并发
By default a call to queue.process() will only accept one job at a time
for processing. For small tasks like sending emails this is not ideal,
so we may specify the maximum active jobs for this type by passing a number:
第二个参数的数字表示jobs并发数量
queue.process('email', 20, function(job, done){
// ...
});
##pause process 暂停和恢复任务处理
queue.process('email', function(job, ctx, done){
ctx.pause( 5000, function(err){
console.log("Worker is paused... ");
setTimeout( function(){ ctx.resume(); }, 10000 );
});
});
##update progress 更新进度
// 创建job
queue.create('slideshow pdf', {
title: user.name + "'s slideshow"
, slides: [...] // keys to data stored in redis, mongodb, or some other store
});
queue.process('slideshow pdf', 5, function(job, done){
var slides = job.data.slides
, len = slides.length;
function next(i) {
var slide = slides[i]; // pretend we did a query on this slide id ;)
job.log('rendering %dx%d slide', slide.width, slide.height);
renderSlide(slide, function(err){
if (err) return done(err);
job.progress(i, len, {nextSlide : i == len ? 'itsdone' : i + 1});
if (i == len) done()
else next(i + 1);
});
}
next(0);
});
##Graceful shutdown 优雅的停止
showdown 信号 的两种处理方式
.在给定的时间内,等jobs正在处理的job都执行完毕,所有workers都停止
.所有正在处理的job变成失败状态,并记录失败原因为shutdown
Queue#shutdown([timeout,] fn) signals all workers to stop processing
after their current active job is done.
Workers will wait timeout milliseconds for their active job's done
to be called or mark the active job failed with shutdown error reason.
When all workers tell Kue they are stopped fn is called.
var queue = require('kue').createQueue();
process.once( 'SIGTERM', function ( sig ) {
queue.shutdown( 5000, function(err) {
console.log( 'Kue shutdown: ', err||'' );
process.exit( 0 );
});
});
Note that shutdown method signature is changed from Kue >=0.9.0 to move the callback function to the last.
##Error Handling
var queue = require('kue').createQueue();
queue.on( 'error', function( err ) {
console.log( 'Oops... ', err );
});
##Prevent from Stuck Active Jobs 防止jobs一直处于active状态
Kue marks a job complete/failed when done is called by your worker,
so you should use proper error handling to prevent uncaught exceptions
in your worker's code and node.js process exiting before in handle jobs get done.
This can be achieved in two ways:
1 Wrapping your worker's process function in Domains(domain是node.js的异常处理模块)
queue.process('my-error-prone-task', function(job, done){
var domain = require('domain').create();
domain.on('error', function(err){
done(err);
});
domain.run(function(){ // your process function
throw new Error( 'bad things happen' );
done();
});
});
##Queue Maintenance
Queue object has two type of methods to tell you about the number of jobs in each state
queue.inactiveCount( function( err, total ) { // others are activeCount, completeCount, failedCount, delayedCount
if( total > 100000 ) {
console.log( 'We need some back pressure here' );
}
});
you can also query on an specific job type:
queue.failedCount( 'my-critical-job', function( err, total ) {
if( total > 10000 ) {
console.log( 'This is tOoOo bad' );
}
});
##Programmatic Job Management
##Redis Connection Settings
定义redis 数据库连接
var kue = require('kue');
var q = kue.createQueue({
prefix: 'job',
redis: {
port: 1234,
host: '10.0.50.20',
auth: 'password',
db: 0, // if provided select a non-default redis db
options: {
// see https://github.com/mranney/node_redis#rediscreateclient
}
}
});
或者这种方式
var q = kue.createQueue({
redis: 'redis://example.com:1234?redis_option=value&redis_option=value'
});
##Connecting using Unix Domain Sockets
或者通过socket方式
var kue = require('kue');
var q = kue.createQueue({
prefix: 'q',
redis: {
socket: '/data/sockets/redis.sock',
auth: 'password',
options: {
// see https://github.com/mranney/node_redis#rediscreateclient
}
}
});
##User-Interface
express.js开发的ui
##Third-party interfaces
https://github.com/StreetHub/kue-ui
##JSON API
###查询jobs
Query jobs, for example "GET /job/search?q=avi video":
GET /job/search?q=
在创建job的时候要加上searchKeys
var kue = require('kue');
queue = kue.createQueue();
queue.create('email', {
title: 'welcome email for tj'
, to: 'tj@learnboost.com'
, template: 'welcome-email'
}).searchKeys( ['to', 'title'] ).save();
在创建队列的时候要开启search设置,默认是关闭的
var kue = require('kue');
q = kue.createQueue({
disableSearch: false
});
##GET /stats
获取jobs统计数据
http://127.0.0.1:3000/stats
Currently responds with state counts, and worker activity time in milliseconds:
{"inactiveCount":0,"completeCount":0,"activeCount":0,"failedCount":0,"delayedCount":7,"workTime":null}
##GET /job/:id
Get a job by :id:
http://127.0.0.1:3000/job/7
{"id":"3","type":"email","data":{"title":"welcome email for tj","to":"tj@learnboost.com","template":"welcome-email"},"priority":-10,"progress":"100","state":"complete","attempts":null,"created_at":"1309973155248","updated_at":"1309973155248","duration":"15002"}
##GET /job/:id/log
##DELETE /job/:id
Delete job :id:
$ curl -X DELETE http://local:3000/job/2
{"message":"job 2 removed"}
##POST /job
通过API创建jobs
Create a job:
$ curl -H "Content-Type: application/json" -X POST -d \
'{
"type": "email",
"data": {
"title": "welcome email for tj",
"to": "tj@learnboost.com",
"template": "welcome-email"
},
"options" : {
"attempts": 5,
"priority": "high"
}
}' http://localhost:3000/job
{"message": "job created", "id": 3}
You can create multiple jobs at once by passing an array. In this case, the response will be an array too, preserving the order:
$ curl -H "Content-Type: application/json" -X POST -d \
'[{
"type": "email",
"data": {
"title": "welcome email for tj",
"to": "tj@learnboost.com",
"template": "welcome-email"
},
"options" : {
"attempts": 5,
"priority": "high"
}
},
{
"type": "email",
"data": {
"title": "followup email for tj",
"to": "tj@learnboost.com",
"template": "followup-email"
},
"options" : {
"delay": 86400,
"attempts": 5,
"priority": "high"
}
}]' http://localhost:3000/job
[
{"message": "job created", "id": 4},
{"message": "job created", "id": 5}
]
Note: when inserting multiple jobs in bulk, if one insertion fails Kue
will keep processing the remaining jobs in order.
The response array will contain the ids of the jobs added successfully,
and any failed element
will be an object describing the error: {"error": "error reason"}.
##Parallel Processing With Cluster
并行处理(使用cluster和wokers)
var kue = require('kue')
, cluster = require('cluster')
, queue = kue.createQueue();
var clusterWorkerSize = require('os').cpus().length;
if (cluster.isMaster) {
kue.app.listen(3000);
for (var i = 0; i < clusterWorkerSize; i++) {
cluster.fork();
}
} else {
queue.process('email', 10, function(job, done){
var pending = 5
, total = pending;
var interval = setInterval(function(){
job.log('sending!');
job.progress(total - pending, total);
--pending || done();
pending || clearInterval(interval);
}, 1000);
});
}
##Securing Kue
var app = express.createServer({ ... tls options ... });
app.use(express.basicAuth('foo', 'bar'));
app.use(kue.app);
app.listen(3000);