You can serialize and deserialize classes with the Avro APIs or the Twitter Bijection library, but both approaches embed the full schema in every record, which significantly inflates the size of each Kafka message even though the schema is needed only when the records are read.
With CKafka, all records can share a single schema: the schema content is registered once in Confluent Schema Registry, and Kafka producers and consumers serialize and deserialize records by referencing the schema stored there.
Note: Automatic topic creation must be enabled, because a topic named schemas is created automatically when Schema Registry is started.
kafkastore.bootstrap.servers=PLAINTEXT://xxxx
kafkastore.topic=schemas
debug=true
Note: bootstrap.servers is the instance access address, which can be copied from the Network column in the Access Mode module on the instance details page in the CKafka console (https://console.tencentcloud.com/ckafka).
Run the following command to start Schema Registry.
bin/schema-registry-start etc/schema-registry/schema-registry.properties
If the command succeeds, Schema Registry starts and listens on port 8081 by default, which is the address used as schema.registry.url in the samples below.
Step 3. Send and receive messages
Below is the content of the schema file:
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
Register the schema under a subject named test by running the following curl command in the environment where Schema Registry is deployed:
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \
http://127.0.0.1:8081/subjects/test/versions
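If you prefer to manage schemas from Java instead of curl, the Confluent Schema Registry client library can register and look up schemas through the same REST interface. The following is only an illustrative sketch: it assumes the io.confluent:kafka-schema-registry-client dependency is on the classpath (a 5.x version, in which register() still accepts an Avro Schema; newer versions take a ParsedSchema), and the class name SchemaRegister is made up for this example.
package schemaTest;

import org.apache.avro.Schema;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class SchemaRegister {
    public static void main(String[] args) throws Exception {
        // Address of the Schema Registry instance started in the previous step
        CachedSchemaRegistryClient client = new CachedSchemaRegistryClient("http://127.0.0.1:8081", 10);
        // Parse the same User schema shown above
        Schema schema = new Schema.Parser().parse(
                "{\"type\": \"record\", \"name\": \"User\", "
                + "\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, "
                + "{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}");
        // Register the schema under the subject "test"; the call returns the schema ID
        int id = client.register("test", schema);
        System.out.println("registered schema id = " + id);
        // Fetch the latest registered version back to confirm the registration
        SchemaMetadata latest = client.getLatestSchemaMetadata("test");
        System.out.println("version = " + latest.getVersion() + ", schema = " + latest.getSchema());
    }
}
The producer sample below serializes messages with the same schema and sends them to the test topic.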
package schemaTest;
import java.util.Properties;
import java.util.Random;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class SchemaProduce {
public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
"\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
"{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// Add the access address of the CKafka instance
props.put("bootstrap.servers", "xx.xx.xx.xx:xxxx");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Use the Confluent `KafkaAvroSerializer`
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// Add the schema service address to obtain the schema
props.put("schema.registry.url", "http://127.0.0.1:8081");
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Random rand = new Random();
int id = 0;
while(id < 100) {
id++;
String name = "name" + id;
int age = rand.nextInt(40) + 1;
GenericRecord user = new GenericData.Record(schema);
user.put("id", id);
user.put("name", name);
user.put("age", age);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("test", user);
producer.send(record);
Thread.sleep(1000);
}
producer.close();
}
}
After the producer has run for a while, go to the CKafka console, select the Topic Management tab on the instance details page, select the topic, and click More > Message Query to view the messages just sent. Then consume them with the following consumer code:
package schemaTest;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SchemaConsume {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "xx.xx.xx.xx:xxxx"); // Access address of the CKafka instance
props.put("group.id", "schema");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Use the Confluent `KafkaAvroDeserializer`
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
// Add the schema service address to obtain the schema
props.put("schema.registry.url", "http://127.0.0.1:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(10);
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord user = record.value();
System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
+ user.get("name") + ", " + "user.age = " + user.get("age") + "], "
+ "partition = " + record.partition() + ", " + "offset = " + record.offset());
}
}
} finally {
consumer.close();
}
}
}
Run the consumer to start consumption; each consumed message is printed in the consumer's log output.
On the Consumer Group tab on the instance details page in the CKafka console, select the consumer group named schema, enter the topic name, and click View Consumer Details to view the consumption details.
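The consumer above works with untyped GenericRecord objects. If you generate a Java class from the Avro schema (for example with the Avro code-generation tooling; the schemaTest.User class below is assumed to be generated this way and is not part of this sample), the Confluent deserializer can return typed objects directly when specific.avro.reader is enabled. A minimal sketch under that assumption, with an illustrative class name and group ID:
package schemaTest;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SpecificSchemaConsume {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xx.xx.xx.xx:xxxx"); // Access address of the CKafka instance
        props.put("group.id", "schema-specific");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Use the Confluent `KafkaAvroDeserializer`
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://127.0.0.1:8081");
        // Ask the deserializer to return generated SpecificRecord classes instead of GenericRecord
        props.put("specific.avro.reader", true);
        // `User` is assumed to be generated from the Avro schema above (e.g. by avro-maven-plugin)
        KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));
        try {
            while (true) {
                for (ConsumerRecord<String, User> record : consumer.poll(100)) {
                    User user = record.value();
                    System.out.println("id = " + user.getId() + ", name = " + user.getName()
                            + ", age = " + user.getAge());
                }
            }
        } finally {
            consumer.close();
        }
    }
}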