【nest】bull的基本使用

梅跃
2023-12-01

前言

安装

$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull

使用

  • 本次 demo 实现四个接口:一个向队列添加任务,一个显示队列中的任务,一个暂停或恢复队列,一个清除已完成和已失败的旧任务。
  • 实际上,被 @Processor 装饰的类就相当于一个服务(provider)。
  • 首先新建一个新模块。
  • 需要把bull引入进来:
/**
 * Registration of the Bull queue named 'task'.
 * The factory supplies the Redis connection (localhost:6379) used by the queue.
 */
const taskQueueRegistration = BullModule.registerQueueAsync({
  name: 'task',
  useFactory: () => {
    const redis = { host: 'localhost', port: 6379 };
    return { redis };
  },
});

/**
 * Feature module for the queue demo: imports the 'task' queue and wires up
 * the HTTP controller together with the service that processes the jobs.
 */
@Module({
  imports: [taskQueueRegistration],
  controllers: [QueneController],
  providers: [QueneService],
})
export class QueneModule {}
  • controller上注入task这个队列:
/**
 * HTTP endpoints of the queue demo, all mounted under the `quene` route.
 *
 * The 'task' queue is injected directly for adding jobs and for pausing or
 * resuming; listing and cleanup are delegated to {@link QueneService}.
 */
@Controller('quene')
export class QueneController {
  constructor(
    @InjectQueue('task') private readonly taskQueue: Queue,
    private readonly taskSrv: QueneService,
  ) {}

  /** GET: forwards to `QueneService.showJobs()` and returns whatever it yields. */
  @Get()
  async getTasl() {
    const listing = await this.taskSrv.showJobs();
    return listing;
  }

  /** POST: enqueues a job named 'yehuozhili' carrying the payload 20. */
  @Post()
  async addtask() {
    const created = await this.taskQueue.add('yehuozhili', 20);
    return created;
  }

  /** DELETE: forwards to `QueneService.cleanOldJobs()`. */
  @Delete()
  async deleteOld() {
    const outcome = await this.taskSrv.cleanOldJobs();
    return outcome;
  }

  /** PUT: flips the queue state — resumes it when paused, pauses it otherwise. */
  @Put()
  async toggle() {
    const paused = await this.taskQueue.isPaused();
    const nextAction = paused ? 'resume' : 'pause';
    console.log('now quene will change to ', nextAction);
    return paused
      ? await this.taskQueue.resume()
      : await this.taskQueue.pause();
  }
}
  • 至于 service,可以直接把它改造成消费者(Processor),并在其中加入任务处理逻辑:
import {
  InjectQueue,
  OnQueueActive,
  OnQueueCleaned,
  OnQueueCompleted,
  OnQueueDrained,
  OnQueueError,
  OnQueueFailed,
  OnQueuePaused,
  OnQueueProgress,
  OnQueueRemoved,
  OnQueueResumed,
  OnQueueStalled,
  OnQueueWaiting,
  Process,
  Processor,
} from '@nestjs/bull';
import { Job, Queue } from 'bull';

/**
 * Consumer for the 'task' queue.
 *
 * Besides processing jobs named 'yehuozhili', this class exposes two helpers
 * (`cleanOldJobs`, `showJobs`) and registers a logging handler for each queue
 * event decorator imported from `@nestjs/bull`.
 */
@Processor('task')
export class QueneService {
  constructor(@InjectQueue('task') private readonly taskQueue: Queue) {}

  /**
   * Handles jobs named 'yehuozhili' by waiting `job.data` seconds.
   *
   * @param job Job whose numeric payload is the number of seconds to wait.
   */
  @Process('yehuozhili')
  async processTask(job: Job<number>) {
    console.log('Processing', job.id, 'for', job.data, 'seconds');
    // Simulate work: resolve once the requested number of seconds has elapsed.
    await new Promise<void>(resolve => {
      setTimeout(resolve, job.data * 1000);
    });
    console.log('Processing done', job.id);
  }

  /**
   * Removes every completed and failed job from the queue.
   *
   * The returned promise settles only after all removals have finished, and
   * it rejects if any removal fails, so callers see the error instead of it
   * surfacing as an unhandled rejection.
   */
  async cleanOldJobs() {
    const finishedJobs = await this.taskQueue.getJobs(['completed', 'failed']);
    // Promise.all is required: mapping to async calls alone would only start
    // the removals without waiting for them.
    await Promise.all(finishedJobs.map(job => job.remove()));
  }

  /**
   * Logs the ids of the completed, waiting, paused and failed jobs.
   * Nothing is returned; the output goes to the console only.
   */
  async showJobs() {
    console.log('----------------------------------------');
    console.log(
      'completedJobs',
      (await this.taskQueue.getJobs(['completed'])).map(job => job.id),
    );
    console.log(
      'waiting',
      (await this.taskQueue.getJobs(['waiting'])).map(job => job.id),
    );
    console.log(
      'paused',
      (await this.taskQueue.getJobs(['paused'])).map(job => job.id),
    );
    console.log(
      'failedJobs',
      (await this.taskQueue.getJobs(['failed'])).map(job => job.id),
    );
  }

  /** Handler registered with @OnQueueActive; logs the job id. */
  @OnQueueActive()
  onQueueActive(job: Job) {
    console.log('OnQueueActive', job.id);
  }

  /** Handler registered with @OnQueueError; logs the error. */
  @OnQueueError()
  onQueueError(error: Error) {
    console.log('OnQueueError', error);
  }

  /** Handler registered with @OnQueueWaiting; logs the job id it receives. */
  @OnQueueWaiting()
  onQueueWaiting(jobId: number | string) {
    console.log('OnQueueWaiting', jobId);
  }

  /** Handler registered with @OnQueueStalled; logs the job id. */
  @OnQueueStalled()
  onQueueStalled(job: Job) {
    console.log('OnQueueStalled', job.id);
  }

  /** Handler registered with @OnQueueProgress; logs the job id. */
  @OnQueueProgress()
  onQueueProgress(job: Job) {
    console.log('OnQueueProgress', job.id);
  }

  /** Handler registered with @OnQueueCompleted; logs the job id and result. */
  @OnQueueCompleted()
  onQueueCompleted(job: Job, result: any) {
    console.log('OnQueueCompleted', job.id, result);
  }

  /** Handler registered with @OnQueueFailed; logs the job id and the error. */
  @OnQueueFailed()
  onQueueFailed(job: Job, err: Error) {
    console.log('onQueueFailed', job.id, err);
  }

  /** Handler registered with @OnQueuePaused; logs a marker line. */
  @OnQueuePaused()
  onQueuePaused() {
    console.log('OnQueuePaused');
  }

  /** Handler registered with @OnQueueResumed; logs a marker line. */
  @OnQueueResumed()
  onQueueResumed() {
    console.log('OnQueueResumed');
  }

  /** Handler registered with @OnQueueCleaned; logs the jobs and the type. */
  @OnQueueCleaned()
  onQueueCleaned(jobs: Job[], type: string) {
    console.log('OnQueueCleaned', jobs, type);
  }

  /** Handler registered with @OnQueueDrained; logs a marker line. */
  @OnQueueDrained()
  onQueueDrained() {
    console.log('OnQueueDrained');
  }

  /** Handler registered with @OnQueueRemoved; logs the job id. */
  @OnQueueRemoved()
  onQueueRemoved(job: Job) {
    console.log('OnQueueRemoved', job.id);
  }
}
  • 此时对接口进行请求即可看到对应效果。
 类似资料: