在Node.js项目微服务架构中使用Kafka通信

在Node.js项目微服务架构中使用Kafka通信

Kafka相比于RabbitMQ性能更好。

安装

安装Kafka客户端库

bash
1
npm install -S kafkajs

配置Kafka broker集群

通常会使用多个Kafka代理(broker)构成集群。

Kafka broker集群可以提高系统的可用性和吞吐量。

js
1
2
3
4
5
6
7
8
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['broker1:9092', 'broker2:9092'],
// 重试机制
retry: {
retries: 5
}
});

创建生产者

Kafka生产者向Topic发送消息:

js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
const { Kafka } = require('kafkajs');

// 创建Kafka实例并指定Kafka代理(broker)的地址
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});

// 创建一个生产者
const producer = kafka.producer();

const run = async () => {
// 连接生产者
await producer.connect();

// 发送一条消息到指定的主题
await producer.send({
topic: 'test-topic',
messages: [
{ value: 'Hello KafkaJS user!' },
],
});

// 断开连接
await producer.disconnect();
};

run().catch(console.error);

创建消费者

Kafka消费者从Topic中读取消息并消费:

js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
const { Kafka } = require('kafkajs');

// 创建Kafka实例并指定Kafka代理(broker)的地址
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});

// 创建一个消费者并指定消费的组ID
const consumer = kafka.consumer({ groupId: 'test-group' });

const run = async () => {
// 连接消费者
await consumer.connect();

// 订阅主题
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

// 处理消息
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
});
},
});
};

run().catch(console.error);

评论