Custom Transport
Nest provides TCP and Redis as built-in transport methods. They make prototyping fast and easy, but sometimes you might want to use another type of transport, e.g. RabbitMQ messaging. Is it possible? Yes.
You can port any transport strategy to Nest. You only have to create a class that extends Server and implements the CustomTransportStrategy interface.
The Server class provides a getHandlers() method, which returns the MessagePattern mappings (an object whose keys are patterns and whose values are callbacks), while CustomTransportStrategy requires you to implement both the listen() and close() methods.
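To make the shape of that mapping concrete, here is a minimal, framework-free sketch. The Handler and HandlerMap types and the findHandler helper are hypothetical names used only for illustration and are not part of the Nest API. The one idea carried over from this section is that keys are patterns serialized with JSON.stringify and values are callbacks.

```typescript
// Hypothetical types for illustration; Nest's own typings may differ.
type Handler = (data: unknown) => unknown;
type HandlerMap = { [pattern: string]: Handler };

// A mapping shaped like the one getHandlers() is described as returning:
// key = serialized pattern, value = callback.
const handlers: HandlerMap = {
  [JSON.stringify({ cmd: 'sum' })]: (data) =>
    (data as number[]).reduce((a, b) => a + b, 0),
};

// Look up the callback for an incoming pattern; undefined if nothing matches.
function findHandler(map: HandlerMap, pattern: unknown): Handler | undefined {
  return map[JSON.stringify(pattern)];
}

const sum = findHandler(handlers, { cmd: 'sum' });
const result = sum ? sum([1, 2, 3]) : undefined;
const missing = findHandler(handlers, { cmd: 'unknown' });
```

Because the lookup key is a JSON string, two patterns match only if they serialize identically, so property order in object patterns matters in this sketch.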
Let's create a simple RabbitMQServer class. We will use the amqplib library.
import * as amqp from 'amqplib';
import { Server, CustomTransportStrategy } from '@nestjs/microservices';
import { Observable } from 'rxjs/Observable';
export class RabbitMQServer extends Server implements CustomTransportStrategy {
  private server = null;
  private channel = null;
  constructor(
    private readonly host: string,
    private readonly queue: string) {
    super();
  }
  public async listen(callback: () => void) {
    await this.init();
    await this.channel.consume(`${this.queue}_sub`, this.handleMessage.bind(this), { noAck: true });
    // Signal that the transport is ready to receive messages.
    callback();
  }
  public close() {
    this.channel && this.channel.close();
    this.server && this.server.close();
  }
  private handleMessage(message) {
    const { content } = message;
    const msg = JSON.parse(content.toString());
    // Handlers are keyed by the JSON-serialized pattern.
    const handlers = this.getHandlers();
    const pattern = JSON.stringify(msg.pattern);
    const handler = handlers[pattern];
    if (!handler) {
      return;
    }
    const response$ = handler(msg.data) as Observable<any>;
    response$ && this.send(response$, (data) => this.sendMessage(data));
  }
  private sendMessage(message) {
    this.channel.sendToQueue(`${this.queue}_pub`, Buffer.from(JSON.stringify(message)));
  }
  private async init() {
    this.server = await amqp.connect(this.host);
    this.channel = await this.server.createChannel();
    // Wait for both queues to exist before consuming or publishing.
    await this.channel.assertQueue(`${this.queue}_sub`, { durable: false });
    await this.channel.assertQueue(`${this.queue}_pub`, { durable: false });
  }
}
The most interesting method is handleMessage(). Its responsibility is to match the pattern with the appropriate handler and call that handler with the received data. Also, notice that I used the send() method inherited from the Server class. You should use it too, so that you don't have to handle details yourself, such as sending the disposed message when the Observable completes.
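The following is a rough, framework-free approximation of the behaviour described above: each emitted value is forwarded as a response, and one final "disposed" message follows when the stream completes. It is not Nest's actual send() implementation. SimpleStream and forward are hypothetical stand-ins for an Observable and for send(), and the envelope shape { err, response, disposed } is inferred from the fields the client code in this section destructures.

```typescript
// Envelope shape inferred from the client code: { err, response, disposed }.
interface Envelope {
  err?: unknown;
  response?: unknown;
  disposed?: boolean;
}

// Hypothetical stand-in for an Observable: calls next() per value, then complete().
type SimpleStream = (
  next: (value: unknown) => void,
  complete: () => void,
) => void;

// Forward each value as a response, then emit a single "disposed" envelope.
function forward(stream: SimpleStream, respond: (e: Envelope) => void): void {
  stream(
    (value) => respond({ err: null, response: value }),
    () => respond({ disposed: true }),
  );
}

// Collect everything that would be published for a stream of two values.
const sent: Envelope[] = [];
const twoValues: SimpleStream = (next, complete) => {
  next('a');
  next('b');
  complete();
};
forward(twoValues, (e) => sent.push(e));
```

In this sketch, the final envelope is the one that tells the receiving side that no further responses will arrive.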
The last step is to set up our RabbitMQ strategy:
const app = NestFactory.createMicroservice(ApplicationModule, {
  strategy: new RabbitMQServer('amqp://localhost', 'example'),
});
That's all!
Client
The RabbitMQ server is now listening for messages. Next, we must create a client class, which should extend the built-in ClientProxy. We only have to override the abstract sendSingleMessage() method.
Let's create the RabbitMQClient class:
import * as amqp from 'amqplib';
import { ClientProxy } from '@nestjs/microservices';
export class RabbitMQClient extends ClientProxy {
  constructor(
    private readonly host: string,
    private readonly queue: string) {
    super();
  }
  protected async sendSingleMessage(msg, callback: (err, result, disposed?: boolean) => void) {
    // A new connection is opened for every message and closed once the response is disposed.
    const server = await amqp.connect(this.host);
    const channel = await server.createChannel();
    const sub = this.getSubscriberQueue();
    const pub = this.getPublisherQueue();
    // Wait for both queues to exist before consuming or publishing.
    await channel.assertQueue(sub, { durable: false });
    await channel.assertQueue(pub, { durable: false });
    // Listen for responses on the server's publish queue, then send the request.
    channel.consume(pub, (message) => this.handleMessage(message, server, callback), { noAck: true });
    channel.sendToQueue(sub, Buffer.from(JSON.stringify(msg)));
  }
  private handleMessage(message, server, callback: (err, result, disposed?: boolean) => void) {
    const { content } = message;
    const { err, response, disposed } = JSON.parse(content.toString());
    if (disposed) {
      server.close();
    }
    callback(err, response, disposed);
  }
  private getPublisherQueue(): string {
    return `${this.queue}_pub`;
  }
  private getSubscriberQueue(): string {
    return `${this.queue}_sub`;
  }
}
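The client and server only agree on a JSON message format, so it can help to see the round trip in isolation. The sketch below is a simplified illustration rather than code from either class. Packet, encode, and decode are hypothetical names, the { pattern, data } shape is inferred from the fields the server's handleMessage() reads, and the Buffer conversion used on the real queue is left out.

```typescript
// Message shape inferred from the server's handleMessage(): msg.pattern and msg.data.
interface Packet {
  pattern: unknown;
  data: unknown;
}

// What the client serializes before publishing (Buffer conversion omitted).
function encode(packet: Packet): string {
  return JSON.stringify(packet);
}

// What the server does on receipt: parse, then derive the handler lookup key.
function decode(raw: string): { key: string; data: unknown } {
  const msg = JSON.parse(raw) as Packet;
  return { key: JSON.stringify(msg.pattern), data: msg.data };
}

const raw = encode({ pattern: { cmd: 'sum' }, data: [1, 2, 3] });
const decoded = decode(raw);
```

Here decoded.key is the string the server would use to look up a handler, and decoded.data is the payload passed to it.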
How do you use it? There is nothing special about it; just create an instance:
export class ClientController {
  private readonly client = new RabbitMQClient('amqp://localhost', 'example');
}
The rest works the same way as with the built-in clients (use the send() method).