Compared with RabbitMQ, Kafka generally achieves higher throughput, especially for high-volume streaming workloads.
Installation
Install the Kafka client library (the examples here use kafkajs: npm install kafkajs)
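Configure the Kafka broker cluster
A Kafka deployment typically runs multiple brokers as a cluster.
A broker cluster improves availability and throughput: partitions are replicated across brokers, so the cluster can tolerate the loss of a broker, and load is spread among them.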
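```javascript
const { Kafka } = require('kafkajs');

// List several brokers so the client can still bootstrap if one is unreachable.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['broker1:9092', 'broker2:9092'],
  retry: { retries: 5 }, // retry a failed call up to 5 times
});
```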
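In practice the broker list often comes from deployment configuration rather than source code. The sketch below shows one possible way to do that. Both `parseBrokers` and the `KAFKA_BROKERS` environment variable are illustrative names assumed here, not part of kafkajs. The helper only turns a comma-separated string into the array that the `brokers` option expects.

```javascript
// Hypothetical helper (not part of kafkajs): turn a comma-separated
// string such as "broker1:9092, broker2:9092" into a brokers array.
function parseBrokers(list, fallback = ['localhost:9092']) {
  if (typeof list !== 'string') return fallback;
  const brokers = list
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  // Fall back when the string held no usable entries.
  return brokers.length > 0 ? brokers : fallback;
}

// KAFKA_BROKERS is an assumed variable name; use whatever your deployment defines.
const brokers = parseBrokers(process.env.KAFKA_BROKERS);
console.log(brokers);
```

The resulting array could then be passed to the client as `new Kafka({ clientId: 'my-app', brokers })`.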
Create a producer
A Kafka producer sends messages to a topic:
```javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
const producer = kafka.producer();

const run = async () => {
  // Connect before sending.
  await producer.connect();
  await producer.send({
    topic: 'test-topic',
    messages: [
      { value: 'Hello KafkaJS user!' },
    ],
  });
  // Disconnect so the process can exit cleanly.
  await producer.disconnect();
};

run().catch(console.error);
```
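Messages often carry structured data together with a key. As a minimal sketch, assuming JSON payloads, the hypothetical helper `toKafkaMessage` below builds the `{ key, value }` objects that `producer.send` accepts in its `messages` array. With the default partitioner, messages that share a key should land on the same partition, which keeps them in order relative to each other.

```javascript
// Hypothetical helper: build a message object for producer.send().
// The key is stringified; the payload is serialized as JSON (an assumption
// about the message format, not something Kafka requires).
function toKafkaMessage(key, payload) {
  return {
    key: key == null ? null : String(key),
    value: JSON.stringify(payload),
  };
}

// Illustrative data: two events for the same order share the key "42".
const orders = [
  { id: 42, status: 'created' },
  { id: 42, status: 'paid' },
];
const messages = orders.map((order) => toKafkaMessage(order.id, order));
console.log(messages);
```

Inside an async function with a connected producer, the array could be sent with `await producer.send({ topic: 'test-topic', messages })`.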
Create a consumer
A Kafka consumer reads messages from a topic and processes them:
```javascript
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'my-app', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'test-group' });

const run = async () => {
  await consumer.connect();
  // fromBeginning: true starts at the earliest offset when the group has no committed offset.
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

  // eachMessage is invoked once per consumed message.
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        // value is a Buffer, or null for a tombstone record
        value: message.value ? message.value.toString() : null,
      });
    },
  });
};

run().catch(console.error);
```
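In `eachMessage`, kafkajs delivers `message.key` and `message.value` as Buffers, and `value` can be `null` (for example, for a tombstone record). The sketch below shows one way to decode such a message defensively. `decodeMessage` is a hypothetical helper and it assumes JSON-encoded values; when parsing fails it falls back to the raw string.

```javascript
// Hypothetical helper: decode a consumed message into plain values.
// Assumes values are JSON text; non-JSON values are returned as strings.
function decodeMessage(message) {
  const key = message.key ? message.key.toString() : null;
  if (message.value == null) {
    return { key, offset: message.offset, value: null }; // e.g. a tombstone
  }
  const text = message.value.toString();
  try {
    return { key, offset: message.offset, value: JSON.parse(text) };
  } catch (err) {
    // Not valid JSON: hand back the raw text instead of throwing.
    return { key, offset: message.offset, value: text };
  }
}

// Stand-in for a message as delivered to eachMessage (Buffers for key and value).
const sample = {
  key: Buffer.from('42'),
  value: Buffer.from('{"status":"paid"}'),
  offset: '7',
};
console.log(decodeMessage(sample));
```

A handler could call it as `eachMessage: async ({ message }) => { const decoded = decodeMessage(message); }`, which keeps the decoding logic testable without a running broker.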