tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

Node.js SDK

PDF
聚焦模式
字号
最后更新时间: 2026-01-05 15:20:02

操作场景

本文介绍使用 Node.js 客户端连接 CKafka 弹性 Topic 并收发消息的操作步骤。

前提条件

操作步骤

步骤1:准备环境

安装 C++依赖库

1. 执行以下命令切换到 yum 源配置目录 /etc/yum.repos.d/
cd /etc/yum.repos.d/
2. 创建 yum 源配置文件 confluent.repo。
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
3. 执行以下命令安装 C++ 依赖库。
yum install librdkafka-devel

安装 Node.js 依赖库

1. 执行以下命令为预处理器指定 OpenSSL 头文件路径。
export CPPFLAGS=-I/usr/local/opt/openssl/include
2. 执行以下命令为连接器指定 OpenSSL 库路径。
export LDFLAGS=-L/usr/local/opt/openssl/lib
3. 执行以下命令安装 Node.js 依赖库。
npm install i --unsafe-perm node-rdkafka

步骤2:创建 Topic 和订阅关系

1. 在控制台的弹性 Topic 列表页面创建一个 Topic。


2. 单击 Topic 的 “ID” 进入基本信息页面,获取用户名、密码和地址信息。


3. 订阅关系页签,新建一个订阅关系(消费组)。
img



步骤2:添加配置文件

module.exports = {
'sasl_plain_username': 'your_user_name',
'sasl_plain_password': 'your_user_password',
'bootstrap_servers': ["xxx.xx.xx.xx:port"],
'topic_name': 'xxx',
'group_id': 'xxx'
}
参数
描述
bootstrapServers
接入地址,在控制台的弹性 Topic 基本信息页面获取。



sasl_plain_username
用户名,在控制台的弹性 Topic 基本信息页面获取。
sasl_plain_password
用户密码,在控制台的弹性 Topic 基本信息页面获取。
topic_name
Topic 名称,在控制台的弹性 Topic 基本信息页面获取。
group.id
消费组名称,在控制台的弹性 Topic 的订阅关系列表获取。



步骤3:生产消息

1. 编写生产消息程序 producer.js
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);

var producer = new Kafka.Producer({
'api.version.request': 'true',
'bootstrap.servers': config['bootstrap_servers'],
'dr_cb': true,
'dr_msg_cb': true,
'security.protocol' : 'SASL_PLAINTEXT',
'sasl.mechanisms' : 'PLAIN',
'sasl.username' : config['sasl_plain_username'],
'sasl.password' : config['sasl_plain_password']
});

var connected = false

producer.setPollInterval(100);

producer.connect();

producer.on('ready', function() {
connected = true
console.log("connect ok")

});

function produce() {
try {
producer.produce(
config['topic_name'],
new Buffer('Hello CKafka SASL'),
null,
Date.now()
);
} catch (err) {
console.error('Error occurred when sending message(s)');
console.error(err);
}
}

producer.on("disconnected", function() {
connected = false;
producer.connect();
})

producer.on('event.log', function(event) {
console.log("event.log", event);
});

producer.on("error", function(error) {
console.log("error:" + error);
});

producer.on('delivery-report', function(err, report) {
console.log("delivery-report: producer ok");
});
// Any errors we encounter, including connection errors
producer.on('event.error', function(err) {
console.error('event.error:' + err);
})

setInterval(produce,1000,"Interval");
2. 执行以下命令发送消息。
node producer.js
3. 查看运行结果。



步骤4:消费消息

1. 创建消费消息程序 consumer.js。
consumer.on('event.log', function(event) {
console.log("event.log", event);
});

consumer.on('error', function(error) {
console.log("error:" + error);
});

consumer.on('event', function(event) {
console.log("event:" + event);
});
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config)

var consumer = new Kafka.KafkaConsumer({
'api.version.request': 'true',
'bootstrap.servers': config['bootstrap_servers'],
'security.protocol' : 'SASL_PLAINTEXT',
'sasl.mechanisms' : 'PLAIN',
'message.max.bytes': 32000,
'fetch.message.max.bytes': 32000,
'max.partition.fetch.bytes': 32000,
'sasl.username' : config['sasl_plain_username'],
'sasl.password' : config['sasl_plain_password'],
'group.id' : config['group_id']
});

consumer.connect();

consumer.on('ready', function() {
console.log("connect ok");
consumer.subscribe([config['topic_name']]);
consumer.consume();
})

consumer.on('data', function(data) {
console.log(data);
});

consumer.on('event.log', function(event) {
console.log("event.log", event);
});

consumer.on('error', function(error) {
console.log("error:" + error);
});

consumer.on('event', function(event) {
console.log("event:" + event);
});
2. 执行以下命令消费消息。
node consumer.js
3. 查看运行结果。



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈