消息队列(MQ)可以让系统中不同部分的应用程序或者不同的微服务之间进行异步通信。
Node.js常用的MQ有RabbitMQ、Kafka、或Redis等。
安装
搭建RabbitMQ服务器
构建一个RabbitMQ服务器的Docker容器并暴露服务端口。
bash1
| docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:management
|
5672端口是RabbitMQ的默认通信端口,15672端口是web管理界面的端口。
安装amqplib包
在Node.js项目中安装amqplib:
使用
连接RabbitMQ服务器并发送消息到队列
创建一个消息生产者,连接RabbitMQ并发送消息到队列:
js1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| const amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; } connection.createChannel(function(error1, channel) { if (error1) { throw error1; }
const queue = 'task_queue';
channel.assertQueue(queue, { durable: true });
const msg = 'Hello World';
channel.sendToQueue(queue, Buffer.from(msg), { persistent: true });
console.log(" [x] Sent '%s'", msg); });
setTimeout(function() { connection.close(); process.exit(0); }, 500); });
|
从队列中消费消息
创建一个消息消费者,从消息队列中取出消息并消费:
js1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| const amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(error0, connection) { if (error0) { throw error0; }
connection.createChannel(function(error1, channel) { if (error1) { throw error1; }
const queue = 'task_queue';
channel.assertQueue(queue, { durable: true });
channel.prefetch(1);
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
channel.consume(queue, function(msg) { const secs = msg.content.toString().split('.').length - 1;
console.log(" [x] Received %s", msg.content.toString());
setTimeout(function() { console.log(" [x] Done"); channel.ack(msg); }, secs * 1000); }, { noAck: false }); }); });
|
持久化
- 队列持久化
消息消费者连接队列时开启durable: true
可以实现队列持久化。
- 消息持久化
消息生产者发送消息时开启persistent: true
可以实现消息持久化。