tencent cloud

消息队列 CKafka 版

动态与公告
新功能发布记录
Broker 版本升级记录
公告
产品简介
TDMQ 产品系列介绍与选型
什么是消息队列 CKafka 版
产品优势
应用场景
技术架构
产品系列介绍
开源 Kafka 版本支持说明
与开源 Kafka 对比
高可用
使用限制
地域和可用区
相关云服务
产品计费
计费概述
价格说明
计费示例
按小时付费转包年包月
续费说明
查看消费明细
欠费说明
退费说明
快速入门
入门流程指引
准备工作
VPC 网络接入
公网域名接入
用户指南
使用流程指引
配置账号权限
创建实例
配置 Topic
连接实例
管理消息
管理消费组
管理实例
变更实例规格
配置限流
配置弹性伸缩策略
配置高级特性
查看监控和配置告警
使用连接器同步数据
实践教程
集群资源评估
客户端实践教程
日志接入
开源生态对接
替换支撑路由(旧)
迁移指南
迁移方案概述
使用开源工具迁移集群
故障处理
Topic 相关
客户端相关
消息相关
API 参考
History
Introduction
API Category
Making API Requests
Other APIs
ACL APIs
Instance APIs
Routing APIs
DataHub APIs
Topic APIs
Data Types
Error Codes
SDK 参考
SDK 概述
Java SDK
Python SDK
Go SDK
PHP SDK
C++ SDK
Node.js SDK
连接器相关 SDK
安全与合规
权限管理
网络安全
删除保护
事件记录
云 API 审计
常见问题
实例相关
Topic 相关
Consumer Group 相关
客户端相关
网络问题
监控相关
消息相关
服务协议
服务等级协议
联系我们
词汇表

数据上报 SDK

PDF
聚焦模式
字号
最后更新时间: 2026-01-05 15:20:01

操作场景

本文档以 Java 语言为例,介绍客户端通过集成 Java 版本的数据上报 SDK 快捷地将数据上报到 Datahub 中的操作方法。

操作步骤

步骤1: 创建 HTTP 接入点

参见 创建 HTTP 上报接入点 在 Datahub 控制台创建一个 HTTP 接入点,获取到标识上报 EndPoint 的DatahubId。

步骤2:引入 Java SDK

在 Java 项目通过 Maven、Gradle 等方式引入数据上报 SDK。

步骤3:数据上报

引入 SDK 后,可以通过调用 SDK 的 SendMessage 接口单条/批量上报数据,整体分为四步:
1. 实例化认证对象。
2. 实例化 Client 对象。
3. SendMessage 请求上报数据。
4. 处理返回结果。
import com.tencentcloudapi.common.Credential;
import com.tencentcloudapi.common.profile.ClientProfile;
import com.tencentcloudapi.common.profile.HttpProfile;
import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import com.tencentcloudapi.ckafka.v20190819.CkafkaClient;
import com.tencentcloudapi.ckafka.v20190819.models.*;

public class SendMessage
{
public static void main(String [] args) {
try{
// 实例化一个认证对象,入参需要传入腾讯云账户secretId,secretKey,此处还需注意密钥对的保密
// 密钥可前往https://console.tencentcloud.com/cam/capi网站进行获取
Credential cred = new Credential("SecretId", "SecretKey");
// 实例化一个http选项,可选的,没有特殊需求可以跳过
HttpProfile httpProfile = new HttpProfile();
httpProfile.setEndpoint("ckafka.tencentcloudapi.com");
// 实例化一个client选项,可选的,没有特殊需求可以跳过
ClientProfile clientProfile = new ClientProfile();
clientProfile.setHttpProfile(httpProfile);
// 实例化要请求产品的client对象,clientProfile是可选的
CkafkaClient client = new CkafkaClient(cred, "ap-beijing", clientProfile);
// 实例化一个请求对象,每个接口都会对应一个request对象
SendMessageRequest req = new SendMessageRequest();
req.setDataHubId("datahub-r6gkngy3");

BatchContent[] batchContents1 = new BatchContent[2];
BatchContent batchContent1 = new BatchContent();
batchContent1.setBody("test1");
batchContents1[0] = batchContent1;

BatchContent batchContent2 = new BatchContent();
batchContent2.setBody("test2");
batchContents1[1] = batchContent2;

req.setMessage(batchContents1);

// 返回的resp是一个SendMessageResponse的实例,与请求对象对应
SendMessageResponse resp = client.SendMessage(req);
// 输出json格式的字符串回包
System.out.println(SendMessageResponse.toJsonString(resp));
} catch (TencentCloudSDKException e) {
System.out.println(e.toString());
}
}
}


步骤4:消息查询

数据发送成功后,可以在消息查询页面,查看数据是否发送成功,详情请参见 消息查询

源码 DEMO

Java
Python
Node.JS
PHP
GoLang
.Net
C++
import com.tencentcloudapi.common.Credential;
import com.tencentcloudapi.common.profile.ClientProfile;
import com.tencentcloudapi.common.profile.HttpProfile;
import com.tencentcloudapi.common.exception.TencentCloudSDKException;
import com.tencentcloudapi.ckafka.v20190819.CkafkaClient;
import com.tencentcloudapi.ckafka.v20190819.models.*;

public class SendMessage
{
public static void main(String [] args) {
try{
// 实例化一个认证对象,入参需要传入腾讯云账户secretId,secretKey,此处还需注意密钥对的保密
// 密钥可前往https://console.tencentcloud.com/cam/capi网站进行获取
Credential cred = new Credential("SecretId", "SecretKey");
// 实例化一个http选项,可选的,没有特殊需求可以跳过
HttpProfile httpProfile = new HttpProfile();
httpProfile.setEndpoint("ckafka.tencentcloudapi.com");
// 实例化一个client选项,可选的,没有特殊需求可以跳过
ClientProfile clientProfile = new ClientProfile();
clientProfile.setHttpProfile(httpProfile);
// 实例化要请求产品的client对象,clientProfile是可选的
CkafkaClient client = new CkafkaClient(cred, "ap-beijing", clientProfile);
// 实例化一个请求对象,每个接口都会对应一个request对象
SendMessageRequest req = new SendMessageRequest();
req.setDataHubId("datahub-r6gkngy3");

BatchContent[] batchContents1 = new BatchContent[2];
BatchContent batchContent1 = new BatchContent();
batchContent1.setBody("test1");
batchContents1[0] = batchContent1;

BatchContent batchContent2 = new BatchContent();
batchContent2.setBody("test2");
batchContents1[1] = batchContent2;

req.setMessage(batchContents1);

// 返回的resp是一个SendMessageResponse的实例,与请求对象对应
SendMessageResponse resp = client.SendMessage(req);
// 输出json格式的字符串回包
System.out.println(SendMessageResponse.toJsonString(resp));
} catch (TencentCloudSDKException e) {
System.out.println(e.toString());
}
}
}

import json
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.ckafka.v20190819 import ckafka_client, models
try:
cred = credential.Credential("SecretId", "SecretKey")
httpProfile = HttpProfile()
httpProfile.endpoint = "ckafka.tencentcloudapi.com"

clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = ckafka_client.CkafkaClient(cred, "ap-beijing", clientProfile)

req = models.SendMessageRequest()
params = {
"DataHubId": "datahub-r6gkngy3",
"Message": [
{
"Body": "test1"
},
{
"Body": "test2"
}
]
}
req.from_json_string(json.dumps(params))

resp = client.SendMessage(req)
print(resp.to_json_string())

except TencentCloudSDKException as err:
print(err)

// Depends on tencentcloud-sdk-nodejs version 4.0.3 or higher
const tencentcloud = require("tencentcloud-sdk-nodejs");

const CkafkaClient = tencentcloud.ckafka.v20190819.Client;

const clientConfig = {
credential: {
secretId: "SecretId",
secretKey: "SecretKey",
},
region: "ap-beijing",
profile: {
httpProfile: {
endpoint: "ckafka.tencentcloudapi.com",
},
},
};

const client = new CkafkaClient(clientConfig);
const params = {
"DataHubId": "datahub-r6gkngy3",
"Message": [
{
"Body": "test1"
},
{
"Body": "test2"
}
]
};
client.SendMessage(params).then(
(data) => {
console.log(data);
},
(err) => {
console.error("error", err);
}
);

<?php
require_once 'vendor/autoload.php';
use TencentCloud\\Common\\Credential;
use TencentCloud\\Common\\Profile\\ClientProfile;
use TencentCloud\\Common\\Profile\\HttpProfile;
use TencentCloud\\Common\\Exception\\TencentCloudSDKException;
use TencentCloud\\Ckafka\\V20190819\\CkafkaClient;
use TencentCloud\\Ckafka\\V20190819\\Models\\SendMessageRequest;
try {

$cred = new Credential("SecretId", "SecretKey");
$httpProfile = new HttpProfile();
$httpProfile->setEndpoint("ckafka.tencentcloudapi.com");

$clientProfile = new ClientProfile();
$clientProfile->setHttpProfile($httpProfile);
$client = new CkafkaClient($cred, "ap-beijing", $clientProfile);

$req = new SendMessageRequest();

$params = array(
"DataHubId" => "datahub-r6gkngy3",
"Message" => array(
array(
"Body" => "test1"
),
array(
"Body" => "test2"
)
)
);
$req->fromJsonString(json_encode($params));

$resp = $client->SendMessage($req);

print_r($resp->toJsonString());
}
catch(TencentCloudSDKException $e) {
echo $e;
}

package main

import (
"fmt"

"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/errors"
"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile"
ckafka "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/ckafka/v20190819"
)

func main() {

credential := common.NewCredential(
"SecretId",
"SecretKey",
)
cpf := profile.NewClientProfile()
cpf.HttpProfile.Endpoint = "ckafka.tencentcloudapi.com"
client, _ := ckafka.NewClient(credential, "ap-beijing", cpf)

request := ckafka.NewSendMessageRequest()

request.DataHubId = common.StringPtr("datahub-r6gkngy3")
request.Message = []*ckafka.BatchContent {
&ckafka.BatchContent {
Body: common.StringPtr("test1"),
},
&ckafka.BatchContent {
Body: common.StringPtr("test2"),
},
}

response, err := client.SendMessage(request)
if _, ok := err.(*errors.TencentCloudSDKError); ok {
fmt.Printf("An API error has returned: %s", err)
return
}
if err != nil {
panic(err)
}
fmt.Printf("%s", response.ToJsonString())
}

using System;
using System.Threading.Tasks;
using TencentCloud.Common;
using TencentCloud.Common.Profile;
using TencentCloud.Ckafka.V20190819;
using TencentCloud.Ckafka.V20190819.Models;

namespace TencentCloudExamples
{
class SendMessage
{
static void Main(string[] args)
{
try
{
Credential cred = new Credential {
SecretId = "SecretId",
SecretKey = "SecretKey"
};

ClientProfile clientProfile = new ClientProfile();
HttpProfile httpProfile = new HttpProfile();
httpProfile.Endpoint = ("ckafka.tencentcloudapi.com");
clientProfile.HttpProfile = httpProfile;

CkafkaClient client = new CkafkaClient(cred, "ap-beijing", clientProfile);
SendMessageRequest req = new SendMessageRequest();
req.DataHubId = "datahub-r6gkngy3";
BatchContent batchContent1 = new BatchContent();
batchContent1.Body = "test1";

BatchContent batchContent2 = new BatchContent();
batchContent2.Body = "test2";
req.Message = new BatchContent[] { batchContent1, batchContent2 };

SendMessageResponse resp = client.SendMessageSync(req);
Console.WriteLine(AbstractModel.ToJsonString(resp));
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
Console.Read();
}
}
}

#include <tencentcloud/core/Credential.h>
#include <tencentcloud/core/profile/ClientProfile.h>
#include <tencentcloud/core/profile/HttpProfile.h>
#include <tencentcloud/ckafka/v20190819/CkafkaClient.h>
#include <tencentcloud/ckafka/v20190819/model/SendMessageRequest.h>
#include <tencentcloud/ckafka/v20190819/model/SendMessageResponse.h>
#include <iostream>
#include <string>
#include <vector>

using namespace TencentCloud;
using namespace TencentCloud::Ckafka::V20190819;
using namespace TencentCloud::Ckafka::V20190819::Model;
using namespace std;

int main() {

Credential cred = Credential("SecretId", "SecretKey");

HttpProfile httpProfile = HttpProfile();
httpProfile.SetEndpoint("ckafka.tencentcloudapi.com");

ClientProfile clientProfile = ClientProfile();
clientProfile.SetHttpProfile(httpProfile);
CkafkaClient client = CkafkaClient(cred, "ap-beijing", clientProfile);

SendMessageRequest req = SendMessageRequest();

req.SetDataHubId("datahub-r6gkngy3");
BatchContent batchContent1;
batchContent1.SetBody("test1");
BatchContent batchContent2;
batchContent2.SetBody("test2");

vector<BatchContent> batchContents1 = {batchContent1, batchContent2};
req.SetMessage(batchContents1);

auto outcome = client.SendMessage(req);
if (!outcome.IsSuccess())
{
cout << outcome.GetError().PrintAll() << endl;
return -1;
}
SendMessageResponse resp = outcome.GetResult();
cout << resp.ToJsonString() << endl;

return 0;
}



帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈