This document describes how to use the SDK for Node.js to access CKafka and send and receive messages through SASL_PLAINTEXT over the public network.
Run the following command to switch to the yum source configuration directory /etc/yum.repos.d/.
cd /etc/yum.repos.d/
Create the yum source configuration file confluent.repo.
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1
Run the following command to install the C++ dependency library librdkafka-devel.
yum install librdkafka-devel
Run the following command to specify the OpenSSL header file path for the preprocessor.
export CPPFLAGS=-I/usr/local/opt/openssl/include
Run the following command to specify the OpenSSL library path for the linker.
export LDFLAGS=-L/usr/local/opt/openssl/lib
Run the following command to install the Node.js dependency library node-rdkafka.
npm install --unsafe-perm node-rdkafka
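Optionally, you can confirm that node-rdkafka and its bundled librdkafka load correctly before writing any producer or consumer code. A minimal check (run from the directory where node-rdkafka was installed) prints the same version and feature information that the sample programs below log:
const Kafka = require('node-rdkafka');
// Prints the librdkafka version and the compiled-in feature list (e.g. whether SASL/SSL support is available).
console.log('librdkafka version: ' + Kafka.librdkafkaVersion);
console.log('features: ' + Kafka.features);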
Create the CKafka configuration file setting.js.
module.exports = {
'sasl_plain_username': 'ckafka-xxxxxxx#ckafkademo',
'sasl_plain_password': 'ckafkademo123',
'bootstrap_servers': ["xxx.ckafka.tencentcloudmq.com:6018"],
'topic_name': 'xxx',
'group_id': 'xxx'
}
Parameter | Description |
---|---|
sasl_plain_username | Username in the format of instance ID + # + username. The instance ID can be obtained in Basic Info on the instance details page in the CKafka console, and the username is set when the user is created in User Management. |
sasl_plain_password | User password, which is set when the user is created in User Management on the instance details page in the CKafka console. |
bootstrap_servers | SASL access point, which can be obtained in Basic Info > Access Mode on the instance details page in the CKafka console. |
topic_name | Topic name, which can be created and obtained in Topic Management on the instance details page in the CKafka console. |
group_id | Consumer group ID, which can be customized based on business requirements. |
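For example, the sasl_plain_username value is simply the instance ID and the username joined by a # character. A minimal illustration with placeholder values (the IDs below are not real):
// Placeholder values for illustration only.
const instanceId = 'ckafka-xxxxxxx';   // Instance ID from Basic Info on the instance details page
const userName = 'ckafkademo';         // Username created in User Management
const saslPlainUsername = instanceId + '#' + userName;  // 'ckafka-xxxxxxx#ckafkademo'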
Create the message production program producer.js.
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log("features:" + Kafka.features);
console.log(Kafka.librdkafkaVersion);
var producer = new Kafka.Producer({
'api.version.request': 'true',
'bootstrap.servers': config['bootstrap_servers'],
'dr_cb': true,
'dr_msg_cb': true,
'security.protocol' : 'SASL_PLAINTEXT',
'sasl.mechanisms' : 'PLAIN',
'sasl.username' : config['sasl_plain_username'],
'sasl.password' : config['sasl_plain_password']
});
var connected = false
producer.setPollInterval(100);
producer.connect();
producer.on('ready', function() {
connected = true
console.log("connect ok")
});
function produce() {
try {
producer.produce(
config['topic_name'],
Buffer.from('Hello CKafka SASL'),
null,
Date.now()
);
} catch (err) {
console.error('Error occurred when sending message(s)');
console.error(err);
}
}
producer.on("disconnected", function() {
connected = false;
producer.connect();
})
producer.on('event.log', function(event) {
console.log("event.log", event);
});
producer.on("error", function(error) {
console.log("error:" + error);
});
producer.on('delivery-report', function(err, report) {
console.log("delivery-report: producer ok");
});
// Any errors we encounter, including connection errors
producer.on('event.error', function(err) {
console.error('event.error:' + err);
})
setInterval(produce,1000,"Interval");
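producer.js above keeps sending one message per second until the process is stopped. If you want the script to exit on its own, one option (not part of the original sample) is to flush any outstanding messages and disconnect after a fixed period; the 10-second values below are arbitrary:
// Optional shutdown sketch: stop after a while, flush pending messages, then disconnect.
setTimeout(function() {
    producer.flush(10000, function(err) {
        if (err) {
            console.error('flush failed: ' + err);
        }
        producer.disconnect();
        process.exit(0);
    });
}, 10000);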
Execute the following command to send messages.
node producer.js
View the execution result.
On the Topic Management tab on the instance details page in the CKafka console, select the target topic and click More > Message Query to view the message just sent.
Create the message consumption program consumer.js.
const Kafka = require('node-rdkafka');
const config = require('./setting');
console.log(Kafka.features);
console.log(Kafka.librdkafkaVersion);
console.log(config)
var consumer = new Kafka.KafkaConsumer({
'api.version.request': 'true',
'bootstrap.servers': config['bootstrap_servers'],
'security.protocol' : 'SASL_PLAINTEXT',
'sasl.mechanisms' : 'PLAIN',
'message.max.bytes': 32000,
'fetch.message.max.bytes': 32000,
'max.partition.fetch.bytes': 32000,
'sasl.username' : config['sasl_plain_username'],
'sasl.password' : config['sasl_plain_password'],
'group.id' : config['group_id']
});
consumer.connect();
consumer.on('ready', function() {
console.log("connect ok");
consumer.subscribe([config['topic_name']]);
consumer.consume();
})
consumer.on('data', function(data) {
console.log(data);
});
consumer.on('event.log', function(event) {
console.log("event.log", event);
});
consumer.on('error', function(error) {
console.log("error:" + error);
});
consumer.on('event', function(event) {
console.log("event:" + event);
});
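consumer.js also runs until the process is stopped. To shut it down cleanly (for example on Ctrl+C), one option (not part of the original sample) is to disconnect the consumer on SIGINT and exit once the disconnected event fires:
// Optional shutdown sketch: disconnect the consumer when the process is interrupted.
process.on('SIGINT', function() {
    consumer.disconnect();
});
consumer.on('disconnected', function() {
    process.exit(0);
});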
Execute the following command to consume messages.
node consumer.js
View the execution result.
On the Consumer Group tab on the instance details page in the CKafka console, select the corresponding consumer group name, enter the topic name, and click Query Details to view the consumption details.