logo

Queues(Bull)

대기열은 작업을 순차적으로 처리할 수 있게 해주며 작업 처리에 있어 확장 및 성능 문제를 처리할 수 있게 해주는 디자인 패턴입니다.
NestJS는 대기열 기능 제공을 위해 Node.js 라이브러리인 Bull을 사용합니다.
Bull은 Redis 기반의 대기열 시스템 라이브러리로 Redis로 작업 데이터를 유지하는 방식으로 대기열 기능을 제공합니다. 따라서 NestJS의 대기열 기능 사용을 위해 시스템에 Redis 설치가 필요합니다.

Diagram#center

대기열은 다음과 같이 사용될 수 있습니다.

  • Producer는 CPU 집약적이고 임의의 시간에 시작될 수 있는 작업을 동기적으로 수행하는 대신 대기열에 추가합니다.
  • Producer는 대기열의 작업을 자신의 별도의 프로세스에서 처리하려는 경우 Processor를 정의하고 구현합니다.
  • 대기열의 작업을 Producer가 아닌 별도의 Consumer에서 처리하려는 경우 Consumer를 추가하고 Consumer에 Processor를 정의하고 구현합니다.

대기열 사용 시 HTTP Server는 CPU 집약적인 작업을 비동기적으로 수행하는 것으로 응답성을 유지할 수 있습니다. 또한 작업의 처리능력 향상을 위한 시스템 확장이 필요할 때 Consumer를 추가하는 것으로 간단하게 시스템을 확장할 수 있습니다.
Bull은 Redis를 사용하여 대기열 기능을 제공하므로 개발 언어나 환경에 독립적으로 구현되거나 확장될 수 있습니다.


설치

Producer와 Consumer 프로젝트 폴더에서 @nestjs/bull, bull을 설치합니다.
Producer와 Consumer 프로젝트가 Monorepo로 구성된 경우 레포지포리에 @nestjs/bull, bull을 설치합니다.

npm i @nestjs/bull bull

테스트를 위해 Producer에 @nestjs/swagger를 설치합니다.

npm i @nestjs/swagger

Producer 구현

Producer는 HTTP 요청을 수신하고 Bull을 사용하여 Queue에 작업을 추가합니다.

main.ts 파일을 수정합니다.

main.ts
├  apps
│  └  producer
│     └  src
│        └  main.ts
└  ...
import { NestFactory } from '@nestjs/core';
import { DocumentBuilder, SwaggerModule } from '@nestjs/swagger';
import { ProducerModule } from './producer.module';

async function bootstrap() {
  const app = await NestFactory.create(ProducerModule, { cors: true });
  const config = new DocumentBuilder().setTitle('JihyunLab Bull').setVersion('1.0').build();

  SwaggerModule.setup('api', app, SwaggerModule.createDocument(app, config));
  await app.listen(3001);
}
bootstrap();

bootstrap 함수에서 Swagger로부터 HTTP 요청을 수신할 애플리케이션을 만듭니다.

producer.module.ts 파일을 수정합니다.

producer.module.ts
├  apps
│  └  producer
│     └  src
│        └  producer.module.ts
└  ...
import { Module } from '@nestjs/common';
import { ProducerController } from './producer.controller';
import { ProducerService } from './producer.service';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: '127.0.0.1',
        port: 6379,
      },
    }),
    BullModule.registerQueue({
      name: 'QUEUE',
    }),
  ],
  controllers: [ProducerController],
  providers: [ProducerService],
})
export class ProducerModule {}

Producer 컨트롤러와 Producer 서비스로 Producer 모듈을 구성하고 Redis에 대한 연결 정보로 @nestjs/bull의 BullModule을 등록합니다. BullModule을 등록한 후 추가할 작업을 위한 Queue를 등록합니다.

producer.controller.ts 파일을 수정합니다.

producer.controller.ts
├  apps
│  └  producer
│     └  src
│        └  producer.controller.ts
└  ...
import { Controller, Get, HttpStatus, Res } from '@nestjs/common';
import { ApiResponse, ApiTags } from '@nestjs/swagger';
import { Response } from 'express';
import { ProducerService } from './producer.service';

@ApiTags('Bull')
@Controller('/bull')
export class ProducerController {
  constructor(private readonly producerService: ProducerService) {}

  @Get('/produce')
  @ApiResponse({ status: HttpStatus.ACCEPTED, description: 'Accepted' })
  async produce(@Res() res: Response) {
    res.status(HttpStatus.ACCEPTED).send();
    this.producerService.produce();
  }
}

Producer 컨트롤러는 Swagger로부터 HTTP 요청을 수신 후 요청에 대한 Accepted 응답을 전송하고 Producer 서비스의 produce 함수를 호출합니다.

producer.service.ts 파일을 수정합니다.

producer.service.ts
├  apps
│  └  producer
│     └  src
│        └  producer.service.ts
└  ...
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Injectable()
export class ProducerService {
  constructor(@InjectQueue('QUEUE') private queue: Queue) {}

  async produce() {
    try {
      const message = String(Math.floor(Math.random() * (1000 - 1)) + 1);
      this.queue.add('job', { message: message });

      console.log(`produce: ${message}`);
    } catch (error) {
      console.log(error);
    }
  }
}

Producer 서비스의 produce 함수에서 작업을 등록된 Queue에 추가합니다.


Consumer 구현

Consumer는 Queue에 추가된 작업을 처리하기 위한 Processor를 정의하고 구현합니다.

main.ts 파일을 수정합니다.

main.ts
├  apps
│  └  consumer
│     └  src
│        └  main.ts
└  ...
import { NestFactory } from '@nestjs/core';
import { ConsumerModule } from './consumer.module';

async function bootstrap() {
  const app = await NestFactory.create(ConsumerModule);
  await app.listen(3002);
}
bootstrap();

bootstrap 함수에서 Queue에 추가된 작업을 처리하기 위한 애플리케이션을 만듭니다. Bull을 사용한 대기열 구현에서 애플리케이션과 마이크로서비스 모두 Queue에 작업을 추가하거나 처리할 수 있습니다.

consumer.module.ts 파일을 수정합니다.

consumer.module.ts
├  apps
│  └  consumer
│     └  src
│        └  consumer.module.ts
└  ...
import { Module } from '@nestjs/common';
import { ConsumerProcessor } from './consumer.processor';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: '127.0.0.1',
        port: 6379,
      },
    }),
    BullModule.registerQueue({
      name: 'QUEUE',
    }),
  ],
  providers: [ConsumerProcessor],
})
export class ConsumerModule {}

Consumer 컨트롤러와 Consumer 서비스로 Consumer 모듈을 구성하고 Redis에 대한 연결 정보로 @nestjs/bull의 BullModule을 등록합니다.
BullModule을 등록한 후 처리할 작업을 위한 Queue를 등록합니다. BullModule과 Queue에 대한 등록은 Producer와 Consumer 모두 동일한 방법으로 구현됩니다. 따라서 Producer에서 Queue의 작업을 처리하고자 할 경우 Producer에 Processor만 구현하면 별도의 Consumer를 구현할 필요 없이 Queue의 작업이 Producer에서 처리됩니다.

consumer.processor.ts 파일을 생성하고 수정합니다.

consumer.processor.ts
├  apps
│  └  consumer
│     └  src
│        └  consumer.processor.ts
└  ...
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('QUEUE')
export class ConsumerProcessor {
  constructor() {}

  @Process('job')
  async job(job: Job) {
    try {
      console.log(`process: ${JSON.stringify(job)}`);
    } catch (error) {
      console.log(error);
    }
  }
}

Queue에 추가된 작업은 @Processer 데코레이터와 @Process 데코레이터로 수신됩니다.
@Processer 데코레이터는 처리할 작업이 존재하는 Queue의 이름으로 정의되며 @Process 데코레이터는 처리할 작업의 이름으로 정의됩니다.
@Process 데코레이터를 정의한 job 함수에 작업 처리 요청 수신을 확인하기 위한 로그를 삽입합니다.

logo

ⓒ 2023-2024 JihyunLab. All rights reserved.

info@jihyunlab.com