在Node.js项目微服务架构中使用RabbitMQ通信

在Node.js项目微服务架构中使用RabbitMQ通信

消息队列(MQ)可以让系统中不同部分的应用程序或者不同的微服务之间进行异步通信。

Node.js常用的MQ有RabbitMQ、Kafka、或Redis等。

安装

搭建RabbitMQ服务器

构建一个RabbitMQ服务器的Docker容器并暴露服务端口。

bash
1
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management

5672端口是RabbitMQ的默认通信端口,15672端口是web管理界面的端口。

安装amqplib包

在Node.js项目中安装amqplib:

bash
1
npm install -S amqplib

使用

连接RabbitMQ服务器并发送消息到队列

创建一个消息生产者,连接RabbitMQ并发送消息到队列:

js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
// 创建一个通道
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}

const queue = 'task_queue';

// 声明一个队列
channel.assertQueue(queue, {
durable: true
});

// 发送消息到队列
const msg = 'Hello World';

channel.sendToQueue(queue, Buffer.from(msg), {
persistent: true
});

console.log(" [x] Sent '%s'", msg);
});

// 关闭连接
setTimeout(function() {
connection.close();
process.exit(0);
}, 500);
});

从队列中消费消息

创建一个消息消费者,从消息队列中取出消息并消费:

js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}

connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}

const queue = 'task_queue';

channel.assertQueue(queue, {
durable: true
});

channel.prefetch(1);

console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);

// 消费消息
channel.consume(queue, function(msg) {
const secs = msg.content.toString().split('.').length - 1;

console.log(" [x] Received %s", msg.content.toString());

setTimeout(function() {
console.log(" [x] Done");
channel.ack(msg);
}, secs * 1000);
}, {
noAck: false
});
});
});

持久化

  1. 队列持久化

消息消费者连接队列时开启durable: true可以实现队列持久化。

  1. 消息持久化

消息生产者发送消息时开启persistent: true可以实现消息持久化。

评论