Tencent Cloud
TDMQ for CKafka

Accessing CKafka via Schema Registry

Last updated: 2026-01-20 17:10:14
Classes can be serialized and deserialized with the Avro APIs or the Twitter Bijection library, but both approaches embed the full schema in every record, which multiplies the Kafka record size, even though the schema is still required to read the records back. TDMQ for CKafka (CKafka) lets all records share a single schema by registering the schema content in Confluent Schema Registry. Kafka producers and consumers then serialize and deserialize records by looking up the schema content in Confluent Schema Registry.
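Instead of embedding the schema, Confluent's serializers prepend a small framing header to each record: a magic byte (0x00) followed by the 4-byte big-endian ID of the registered schema, with the Avro-encoded payload after it. The sketch below (Python, standard library only; the schema ID 42 and the payload bytes are invented for illustration) shows how that framing works and why the per-record overhead stays at 5 bytes regardless of schema size:

```python
import struct

MAGIC_BYTE = 0  # Confluent wire-format magic byte

def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the Confluent framing header: magic byte + 4-byte big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload

def unframe(message: bytes) -> tuple:
    """Split a framed message back into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a Confluent-framed message")
    return schema_id, message[5:]

# Hypothetical schema ID and Avro bytes, for illustration only.
framed = frame(42, b"\x02\x08name")
schema_id, payload = unframe(framed)
print(schema_id, payload)
```

A consumer uses the extracted schema ID to fetch the schema from the registry (caching it locally) before decoding the Avro payload.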




Prerequisites

You have downloaded JDK 8.
You have downloaded Confluent OSS 4.1.1.

Operation Steps

Step 1: Obtaining the Instance Access Address and Enabling Automatic Topic Creation

1. Log in to the CKafka console.
2. In the left sidebar, select Instance List and click the ID of the target instance to go to the basic instance information page.
3. In the Access Mode module on the basic instance information page, you can obtain the access address of the instance.


4. Enable automatic topic creation in the Automatic Topic Creation module.
Note
Automatic topic creation must be enabled, because a topic named schemas is created automatically when Confluent OSS starts.

Step 2: Preparing Confluent Configurations

1. Modify the server address and other information in the Confluent OSS configuration file. For the PLAINTEXT access method, the configuration is as follows:
kafkastore.bootstrap.servers=PLAINTEXT://xxxx
kafkastore.topic=schemas
debug=true
For the SASL_PLAINTEXT access method, the configuration is as follows:
kafkastore.bootstrap.servers=SASL_PLAINTEXT://ckafka-xxxx.ap-xxx.ckafka.tencentcloudmq.com:50004
kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.sasl.mechanism=PLAIN
kafkastore.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="ckafka-xxxx#xxxx" password="xxxx";
kafkastore.topic=schemas
debug=true
Note
bootstrap.servers: the instance access address. Copy it from the Network column in the Access Mode section of the instance details page in the CKafka console.

2. Run the following command to start Schema Registry:
bin/schema-registry-start etc/schema-registry/schema-registry.properties
The running result is as follows:



Step 3: Sending and Receiving Messages

Below is the content of the schema file:
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}
1. Register the schema under the subject named test. The script below registers the schema by calling the REST API with the curl command in the environment where Schema Registry is deployed.
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \
http://127.0.0.1:8081/subjects/test/versions
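Hand-escaping the schema string for the registration request is error-prone. A small standard-library Python sketch (assuming the same User schema as above) that builds the exact request body:

```python
import json

# The Avro schema as a plain Python dict.
user_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
}

# The registry expects {"schema": "<schema as a JSON string>"}, so the
# schema is serialized twice: once to a string, then inside the body.
body = json.dumps({"schema": json.dumps(user_schema)})
print(body)
```

Posting `body` to `http://127.0.0.1:8081/subjects/test/versions` with the `application/vnd.schemaregistry.v1+json` content type is equivalent to the curl call above.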
2. The Kafka producer sends messages.
package schemaTest;

import java.util.Properties;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SchemaProduce {
    public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
            "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
            "{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Add the access address of the CKafka instance.
        props.put("bootstrap.servers", "xx.xx.xx.xx:xxxx");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Use the Confluent KafkaAvroSerializer.
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Add the address of the schema service to obtain the schema.
        props.put("schema.registry.url", "http://127.0.0.1:8081");

        Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        Random rand = new Random();
        int id = 0;
        while (id < 100) {
            id++;
            String name = "name" + id;
            int age = rand.nextInt(40) + 1;
            GenericRecord user = new GenericData.Record(schema);
            user.put("id", id);
            user.put("name", name);
            user.put("age", age);
            ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("test", user);
            producer.send(record);
            Thread.sleep(1000);
        }
        producer.close();
    }
}
After the script has run for a while, go to the CKafka console, select the Topic Management tab, select the target topic, and choose More > Message Query to view the messages just sent.


3. The Kafka consumer consumes messages.
package schemaTest;

import java.util.Collections;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SchemaConsume {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Access address of the CKafka instance.
        props.put("bootstrap.servers", "xx.xx.xx.xx:xxxx");
        props.put("group.id", "schema");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use the Confluent KafkaAvroDeserializer.
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // Add the schema service address to obtain the schema.
        props.put("schema.registry.url", "http://127.0.0.1:8081");

        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));
        try {
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(10);
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    GenericRecord user = record.value();
                    System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
                            + user.get("name") + ", " + "user.age = " + user.get("age") + "], "
                            + "partition = " + record.partition() + ", " + "offset = " + record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
On the Consumer Group tab in the CKafka console, select the consumer group named schema, enter the topic name, and click View Consumer Details to view the consumption details.


Start the consumer for consumption. Below is a screenshot of the consumption log:


