前言
安装
$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull
使用
- 本次 demo 实现四个接口:一个向队列添加任务,一个显示队列中的任务,一个暂停或恢复队列,一个清理已完成和已失败的旧任务。
- 实际上,被 @Processor 装饰的类本身就相当于一个服务(provider),可以像普通服务一样注入到控制器中使用。
- 首先新建一个新模块。
- 需要把bull引入进来:
/**
 * Builds the Bull options for the `task` queue.
 * Points the queue at a Redis instance on localhost:6379.
 */
const createTaskQueueOptions = () => {
  return {
    redis: { host: 'localhost', port: 6379 },
  };
};

/**
 * Feature module for the queue demo.
 *
 * Registers the `task` queue asynchronously through a factory and exposes
 * the controller and the service that work with it.
 */
@Module({
  controllers: [QueneController],
  providers: [QueneService],
  imports: [
    BullModule.registerQueueAsync({
      name: 'task',
      useFactory: createTaskQueueOptions,
    }),
  ],
})
export class QueneModule {}
/**
 * Routes under `quene` for driving the `task` queue: listing jobs,
 * enqueueing a job, removing old jobs, and toggling pause/resume.
 */
@Controller('quene')
export class QueneController {
  constructor(
    @InjectQueue('task') private readonly taskQueue: Queue,
    private readonly taskSrv: QueneService,
  ) {}

  /** GET handler: delegates to the service's `showJobs()`. */
  @Get()
  async getTasl() {
    return this.taskSrv.showJobs();
  }

  /** POST handler: enqueues a `yehuozhili` job carrying the payload `20`. */
  @Post()
  async addtask() {
    const createdJob = await this.taskQueue.add('yehuozhili', 20);
    return createdJob;
  }

  /** DELETE handler: delegates to the service's `cleanOldJobs()`. */
  @Delete()
  async deleteOld() {
    return this.taskSrv.cleanOldJobs();
  }

  /**
   * PUT handler: flips the queue between paused and running.
   * Resumes the queue when it is currently paused, otherwise pauses it.
   */
  @Put()
  async toggle() {
    const paused = await this.taskQueue.isPaused();
    console.log('now quene will change to ', paused ? 'resume' : 'pause');
    return paused
      ? await this.taskQueue.resume()
      : await this.taskQueue.pause();
  }
}
- 至于 service,可以直接把它改造成队列的消费者(Processor),并在其中加上任务处理逻辑和队列事件监听:
import {
InjectQueue,
OnQueueActive,
OnQueueCleaned,
OnQueueCompleted,
OnQueueDrained,
OnQueueError,
OnQueueFailed,
OnQueuePaused,
OnQueueProgress,
OnQueueRemoved,
OnQueueResumed,
OnQueueStalled,
OnQueueWaiting,
Process,
Processor,
} from '@nestjs/bull';
import { Job, Queue } from 'bull';
/**
 * Consumer and helper service for the `task` queue.
 *
 * Processes `yehuozhili` jobs, offers helpers to list and clean jobs, and
 * logs every queue lifecycle event it is subscribed to.
 */
@Processor('task')
export class QueneService {
  constructor(@InjectQueue('task') private readonly taskQueue: Queue) {}

  /**
   * Handles jobs named `yehuozhili`.
   *
   * Treats `job.data` as a number of seconds and waits that long before
   * finishing, logging at the start and at the end.
   *
   * @param job Job whose payload is the delay in seconds.
   */
  @Process('yehuozhili')
  async processTask(job: Job<number>) {
    console.log('Processing', job.id, 'for', job.data, 'seconds');
    await new Promise<void>(resolve => {
      setTimeout(resolve, job.data * 1000);
    });
    console.log('Processing done', job.id);
  }

  /**
   * Removes every job currently in the `completed` or `failed` state.
   *
   * The removals are awaited together, so the returned promise settles only
   * after all of them finished and rejects if any removal fails (instead of
   * leaving un-awaited promises behind).
   */
  async cleanOldJobs() {
    const finishedJobs = await this.taskQueue.getJobs(['completed', 'failed']);
    await Promise.all(finishedJobs.map(job => job.remove()));
  }

  /**
   * Logs the ids of the jobs in the `completed`, `waiting`, `paused` and
   * `failed` states, preceded by a separator line.
   *
   * The four lists are fetched in parallel; the log order is fixed.
   */
  async showJobs() {
    console.log('----------------------------------------');
    const [completed, waiting, paused, failed] = await Promise.all([
      this.taskQueue.getJobs(['completed']),
      this.taskQueue.getJobs(['waiting']),
      this.taskQueue.getJobs(['paused']),
      this.taskQueue.getJobs(['failed']),
    ]);
    console.log(
      'completedJobs',
      completed.map(job => job.id),
    );
    console.log(
      'waiting',
      waiting.map(job => job.id),
    );
    console.log(
      'paused',
      paused.map(job => job.id),
    );
    console.log(
      'failedJobs',
      failed.map(job => job.id),
    );
  }

  /** Logs the id of a job reported by the queue's "active" event. */
  @OnQueueActive()
  onQueueActive(job: Job) {
    console.log('OnQueueActive', job.id);
  }

  /** Logs an error reported by the queue's "error" event. */
  @OnQueueError()
  onQueueError(error: Error) {
    console.log('OnQueueError', error);
  }

  /** Logs the id reported by the queue's "waiting" event. */
  @OnQueueWaiting()
  onQueueWaiting(jobId: number | string) {
    console.log('OnQueueWaiting', jobId);
  }

  /** Logs the id of a job reported by the queue's "stalled" event. */
  @OnQueueStalled()
  onQueueStalled(job: Job) {
    console.log('OnQueueStalled', job.id);
  }

  /** Logs the id of a job reported by the queue's "progress" event. */
  @OnQueueProgress()
  onQueueProgress(job: Job) {
    console.log('OnQueueProgress', job.id);
  }

  /** Logs the id and result of a job reported by the "completed" event. */
  @OnQueueCompleted()
  onQueueCompleted(job: Job, result: any) {
    console.log('OnQueueCompleted', job.id, result);
  }

  /** Logs the id and error of a job reported by the "failed" event. */
  @OnQueueFailed()
  onQueueFailed(job: Job, err: Error) {
    console.log('onQueueFailed', job.id, err);
  }

  /** Logs that the queue's "paused" event fired. */
  @OnQueuePaused()
  onQueuePaused() {
    console.log('OnQueuePaused');
  }

  /** Logs that the queue's "resumed" event fired. */
  @OnQueueResumed()
  onQueueResumed() {
    console.log('OnQueueResumed');
  }

  /** Logs the jobs and type reported by the queue's "cleaned" event. */
  @OnQueueCleaned()
  onQueueCleaned(jobs: Job[], type: string) {
    console.log('OnQueueCleaned', jobs, type);
  }

  /** Logs that the queue's "drained" event fired. */
  @OnQueueDrained()
  onQueueDrained() {
    console.log('OnQueueDrained');
  }

  /** Logs the id of a job reported by the queue's "removed" event. */
  @OnQueueRemoved()
  onQueueRemoved(job: Job) {
    console.log('OnQueueRemoved', job.id);
  }
}