Nodejs中使用Kafka

首先,需要安装kafka支持程序包:

npm i kafkajs

然后,编写producer如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 导入所需的模块
const Kafka = require('kafkajs')

async function produce() {

// 创建一个 Kafka 实例,配置客户端ID和代理服务器地址(broker)
const kafka = new Kafka.Kafka({
clientId: 'test', // 客户端ID,用于在 Kafka 中标识此客户端
brokers: ['localhost:9092'], // 代理服务器地址(broker),这里使用本地地址和默认端口
})

// 创建一个生产者实例
const producer = await kafka.producer()

// 连接到 Kafka 代理服务器
await producer.connect()

// 发送消息到指定主题
await producer.send({
topic: 'task-1', // 指定要发送消息的主题
messages: [
{ value: '这是一条测试数据' }, // 要发送的消息内容
{ value:"必须是字符串"}
]
})

// 断开与 Kafka 代理服务器的连接
await producer.disconnect()
}

produce()

接下来编写consumer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 导入所需的模块
const Kafka = require('kafkajs')
async function consumer() {
// 创建一个 Kafka 实例,配置客户端ID和代理服务器地址(broker)
const kafka = new Kafka.Kafka({
clientId: 'xiaoMan', // 客户端ID,用于在 Kafka 中标识此客户端
brokers: ['47.93.16.109:8013'] // 代理服务器地址(broker),这里使用本地地址和默认端口
})

// 创建一个消费者实例,指定消费者组ID
const consumer = await kafka.consumer({ groupId: 'my-group' })

// 连接到 Kafka 代理服务器
await consumer.connect()

// 订阅指定主题的消息,从头开始消费
await consumer.subscribe({ topic: 'task-1', fromBeginning: true })

// 启动消费者并处理每条消息
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
value: message.value.toString(),
})
},
})
}

consumer()