
Custom transport

小牛编辑
2023-12-01


Nest provides TCP and Redis as built-in transport methods. They make prototyping fast and easy, but sometimes you may want to use another type of transport, e.g. RabbitMQ messaging. That is possible.

You can port any transport strategy to Nest. You only have to create a class that extends Server and implements the CustomTransportStrategy interface.

The Server class provides a getHandlers() method, which returns the MessagePattern mappings (an object in which each key is a pattern and each value is a callback), while CustomTransportStrategy requires you to implement both the listen() and close() methods.
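
To make the handler mapping concrete, here is a minimal sketch in plain TypeScript, without Nest. The names Handler, HandlerMap, register and dispatch are hypothetical and only illustrate the idea of an object keyed by a serialized pattern; they are not part of the Nest API.

```typescript
// Hypothetical stand-in for the mapping described above:
// keys are serialized patterns, values are callbacks.
type Handler = (data: any) => any;
type HandlerMap = { [pattern: string]: Handler };

const handlers: HandlerMap = {};

// Register a callback under the JSON-serialized form of its pattern.
function register(pattern: object, handler: Handler): void {
    handlers[JSON.stringify(pattern)] = handler;
}

// Look up the callback for an incoming message and call it with the payload.
// Returns undefined when no handler matches the pattern.
function dispatch(msg: { pattern: object; data: any }): any {
    const handler = handlers[JSON.stringify(msg.pattern)];
    return handler ? handler(msg.data) : undefined;
}

register({ cmd: 'sum' }, (data: number[]) => data.reduce((a, b) => a + b, 0));

const result = dispatch({ pattern: { cmd: 'sum' }, data: [1, 2, 3] });
const missing = dispatch({ pattern: { cmd: 'unknown' }, data: [] });
```

Because the key is produced with JSON.stringify(), the sender and the receiver have to serialize a pattern in the same way (including property order) for the lookup to succeed.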

Let's create a simple RabbitMQServer class. We will use the amqplib library.

import * as amqp from 'amqplib';
import { Server, CustomTransportStrategy } from '@nestjs/microservices';
import { Observable } from 'rxjs/Observable';

export class RabbitMQServer extends Server implements CustomTransportStrategy {
    private server = null;
    private channel = null;

    constructor(
        private readonly host: string,
        private readonly queue: string) {
            super();
        }

    public async listen(callback: () => void) {
        await this.init();
        this.channel.consume(`${this.queue}_sub`, this.handleMessage.bind(this), { noAck: true });
        // Notify the caller that the server is ready to receive messages.
        callback();
    }

    public close() {
        this.channel && this.channel.close();
        this.server && this.server.close();
    }

    private handleMessage(message) {
        const { content } = message;
        const msg = JSON.parse(content.toString());

        // Patterns are stored under their JSON-serialized form.
        const handlers = this.getHandlers();
        const pattern = JSON.stringify(msg.pattern);
        const handler = handlers[pattern];
        if (!handler) {
            return;
        }
        const response$ = handler(msg.data) as Observable<any>;
        response$ && this.send(response$, (data) => this.sendMessage(data));
    }

    private sendMessage(message) {
        this.channel.sendToQueue(`${this.queue}_pub`, Buffer.from(JSON.stringify(message)));
    }

    private async init() {
        this.server = await amqp.connect(this.host);
        this.channel = await this.server.createChannel();
        // assertQueue() returns a promise; wait until both queues exist.
        await this.channel.assertQueue(`${this.queue}_sub`, { durable: false });
        await this.channel.assertQueue(`${this.queue}_pub`, { durable: false });
    }
}

The most interesting method is handleMessage(). Its responsibility is to match the pattern with the appropriate handler and call that handler with the received data. Also, notice the send() method inherited from the Server class. You should use it too if you want to avoid handling details yourself, such as sending the disposed message when the Observable completes.
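
The role of send() can be pictured with a simplified stand-in. The sketch below is not Nest's implementation: SimpleObservable, fromArray and sendAll are hypothetical names, and it assumes that every emitted value is forwarded as a response and that completion is signalled by one final message with disposed set to true. The { err, response, disposed } shape is the one the client in this article reads.

```typescript
// Minimal observable-like types, defined here so the sketch needs no libraries.
interface Observer<T> {
    next: (value: T) => void;
    error: (err: any) => void;
    complete: () => void;
}

interface SimpleObservable<T> {
    subscribe(observer: Observer<T>): void;
}

// Emits every array element synchronously, then completes.
function fromArray<T>(values: T[]): SimpleObservable<T> {
    return {
        subscribe(observer) {
            values.forEach((value) => observer.next(value));
            observer.complete();
        },
    };
}

interface OutgoingMessage {
    err: any;
    response: any;
    disposed?: boolean;
}

// Hypothetical counterpart of send(): forwards each value as a response,
// forwards an error as err, and marks the end of the stream as disposed.
function sendAll<T>(stream$: SimpleObservable<T>, respond: (msg: OutgoingMessage) => void): void {
    stream$.subscribe({
        next: (value) => respond({ err: null, response: value }),
        error: (err) => respond({ err, response: null }),
        complete: () => respond({ err: null, response: null, disposed: true }),
    });
}

const sent: OutgoingMessage[] = [];
sendAll(fromArray([1, 2]), (msg) => sent.push(msg));
```

After this runs, sent holds two response messages followed by a single disposed message, which is the signal the client uses to close its connection.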

The last step is to set up our RabbitMQ strategy:

const app = NestFactory.createMicroservice(ApplicationModule, {
    strategy: new RabbitMQServer('amqp://localhost', 'example'),
});

That's all!

Client

The RabbitMQ server is listening for messages. Now we must create a client class that extends the built-in ClientProxy. We only have to override the abstract sendSingleMessage() method.

Let's create the RabbitMQClient class:

import * as amqp from 'amqplib';
import { ClientProxy } from '@nestjs/microservices';

export class RabbitMQClient extends ClientProxy {
    constructor(
        private readonly host: string,
        private readonly queue: string) {
            super();
        }

    protected async sendSingleMessage(msg, callback: (err, result, disposed?: boolean) => void) {
        const server = await amqp.connect(this.host);
        const channel = await server.createChannel();
        const sub = this.getSubscriberQueue();
        const pub = this.getPublisherQueue();

        // assertQueue() returns a promise; wait until both queues exist.
        await channel.assertQueue(sub, { durable: false });
        await channel.assertQueue(pub, { durable: false });

        channel.consume(pub, (message) => this.handleMessage(message, server, callback), { noAck: true });
        channel.sendToQueue(sub, Buffer.from(JSON.stringify(msg)));
    }

    private handleMessage(message, server, callback: (err, result, disposed?: boolean) => void) {
        const { content } = message;
        const { err, response, disposed } = JSON.parse(content.toString());
        if (disposed) {
            server.close();
        }
        callback(err, response, disposed);
    }

    private getPublisherQueue(): string {
        return `${this.queue}_pub`;
    }

    private getSubscriberQueue(): string {
        return `${this.queue}_sub`;
    }
}
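
The request and reply flow between the two classes can be traced without a running broker. The following sketch uses a hypothetical in-memory FakeBroker (not part of amqplib or Nest) that delivers messages synchronously; it only assumes the queue naming from the code above, where requests go to the _sub queue and replies come back on the _pub queue.

```typescript
// Hypothetical in-memory stand-in for a broker: one consumer per queue,
// messages are delivered synchronously as JSON strings.
type Consumer = (content: string) => void;

class FakeBroker {
    private consumers: { [queue: string]: Consumer } = {};

    consume(queue: string, consumer: Consumer): void {
        this.consumers[queue] = consumer;
    }

    sendToQueue(queue: string, content: string): void {
        const consumer = this.consumers[queue];
        if (consumer) {
            consumer(content);
        }
    }
}

const broker = new FakeBroker();
const queue = 'example';

// Server side: consume requests from `<queue>_sub`, reply on `<queue>_pub`.
broker.consume(`${queue}_sub`, (content) => {
    const msg = JSON.parse(content);
    const response = (msg.data as number[]).reduce((a, b) => a + b, 0);
    broker.sendToQueue(`${queue}_pub`, JSON.stringify({ err: null, response }));
    broker.sendToQueue(`${queue}_pub`, JSON.stringify({ disposed: true }));
});

// Client side: start consuming replies first, then publish the request.
const received: any[] = [];
let closed = false;
broker.consume(`${queue}_pub`, (content) => {
    const { err, response, disposed } = JSON.parse(content);
    if (disposed) {
        closed = true; // the client in this article closes its connection here
        return;
    }
    received.push({ err, response });
});
broker.sendToQueue(`${queue}_sub`, JSON.stringify({ pattern: { cmd: 'sum' }, data: [1, 2, 3] }));
```

The client registers its consumer before sending, as sendSingleMessage() does, so that a reply cannot arrive before anyone is listening for it.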

How do you use it? There is nothing special about it; just create an instance:

export class ClientController {
    private readonly client = new RabbitMQClient('amqp://localhost', 'example');
}

The rest works the same way as with the built-in transports (use the send() method).