{"version": "2.0","statement": [{"effect": "allow","action": ["cam:BuildDataFlowAuthToken"],"resource": ["qcs::cam::uin/<用户 uin>:resourceUser/<实例 ID>/<账号名>"]}]}




<dependency><groupId>com.tencentcloudapi</groupId><artifactId>tencentcloud-dbauth-sdk-java</artifactId><version>1.0.4</version></dependency>
<dependency><groupId>com.tencentcloudapi</groupId><artifactId>tencentcloud-sdk-java</artifactId><version>3.1.1039</version></dependency>
package com.tencentcloud.dbauth;import com.tencentcloudapi.common.Credential;import com.tencentcloud.dbauth.model.GenerateAuthenticationTokenRequest;import com.tencentcloudapi.common.exception.TencentCloudSDKException;import com.tencentcloudapi.common.profile.ClientProfile;import com.tencentcloudapi.common.profile.HttpProfile;public class GenerateDBAuthentication {public static void main(String[] args) {// 定义认证令牌的参数String region = "<实例所在地域>";String instanceId = "<实例 ID>";String userName = "<账号名>";// 从环境变量中获取凭证Credential credential = new Credential(System.getenv("<TENCENTCLOUD_SECRET_ID>"), System.getenv("<TENCENTCLOUD_SECRET_KEY>"));System.out.println(getAuthToken(region, instanceId, userName, credential));}public static String getAuthToken(String region, String instanceId, String userName, Credential credential) {try {// 实例化一个 http 选项,可选的,没有特殊需求可以跳过HttpProfile httpProfile = new HttpProfile();httpProfile.setEndpoint("cam.tencentcloudapi.com");// 实例化一个 client 选项,可选的,没有特殊需求可以跳过ClientProfile clientProfile = new ClientProfile();clientProfile.setHttpProfile(httpProfile);// 构建 GenerateAuthenticationTokenRequestGenerateAuthenticationTokenRequest tokenRequest = GenerateAuthenticationTokenRequest.builder().region(region).credential(credential).userName(userName).instanceId(instanceId).clientProfile(clientProfile) // clientProfile 是可选的.build();return DBAuthentication.generateAuthenticationToken(tokenRequest);} catch (TencentCloudSDKException e) {e.printStackTrace();}return "";}}
mysql --host=<IP 地址> --port=<端口号> --user=<账号名> --password=<密码>;
package com.tencentcloud.examples;import com.tencentcloud.dbauth.DBAuthentication;import com.tencentcloud.dbauth.model.GenerateAuthenticationTokenRequest;import com.tencentcloudapi.common.Credential;import com.tencentcloudapi.common.exception.TencentCloudSDKException;import com.tencentcloudapi.common.profile.ClientProfile;import com.tencentcloudapi.common.profile.HttpProfile;import java.sql.Connection;import java.sql.DriverManager;import java.sql.ResultSet;import java.sql.Statement;public class CAMDatabaseAuthenticationTester {public static void main(String[] args) throws Exception {// 定义连接所需的变量String region = "ap-guangzhou";String instanceId = "cynosdb-123456";String userName = "test";String host = "192.*.*.11";int port = 3306;String dbName = "mysql";String secretId = System.getenv("TENCENTCLOUD_SECRET_ID");String secretKey = System.getenv("TENCENTCLOUD_SECRET_KEY");// 获取连接Connection connection = getDBConnectionUsingCAM(secretId, secretKey, region,instanceId, userName, host, port, dbName);// 验证连接是否成功Statement stmt = connection.createStatement();ResultSet rs = stmt.executeQuery("SELECT 'Success!';");while (rs.next()) {String id = rs.getString(1);System.out.println(id); // 应打印 "Success!"}// 关闭连接stmt.close();connection.close();}/*** 使用 CAM 数据库认证获取数据库连接** @param secretId 密钥 ID* @param secretKey 密钥* @param region 地区* @param instanceId 实例 ID* @param userName 用户名* @param host 主机* @param port 端口* @param dbName 数据库名称* @return Connection 对象* @throws Exception 异常*/private static Connection getDBConnectionUsingCAM(String secretId, String secretKey, String region, String instanceId, String userName,String host, int port, String dbName) throws Exception {// 从 secretId 和 secretKey 获取凭证Credential credential = new Credential(secretId, secretKey);// 定义最大尝试次数int maxAttempts = 3;Exception lastException = null;for (int attempt = 1; attempt <= maxAttempts; attempt++) {try {// 使用凭证获取认证令牌String authToken = getAuthToken(region, instanceId, userName, credential);String connectionUrl = 
String.format("jdbc:mysql://%s:%d/%s", host, port, dbName);return DriverManager.getConnection(connectionUrl, userName, authToken);} catch (Exception e) {lastException = e;System.out.println("Attempt " + attempt + " failed.");Thread.sleep(5000);}}System.out.println("All attempts failed. error: " + lastException.getMessage());throw lastException;}/*** 获取认证令牌** @param region 区域* @param instanceId 实例 ID* @param userName 用户名* @param credential 凭证* @return 认证令牌*/private static String getAuthToken(String region, String instanceId, String userName, Credential credential) throws TencentCloudSDKException {// 实例化一个 http 选项,可选的,没有特殊需求可以跳过HttpProfile httpProfile = new HttpProfile();httpProfile.setEndpoint("cam.tencentcloudapi.com");// 实例化一个 client 选项,可选的,没有特殊需求可以跳过ClientProfile clientProfile = new ClientProfile();clientProfile.setHttpProfile(httpProfile);// 构建 GenerateAuthenticationTokenRequestGenerateAuthenticationTokenRequest tokenRequest = GenerateAuthenticationTokenRequest.builder().region(region).credential(credential).userName(userName).instanceId(instanceId).clientProfile(clientProfile) // clientProfile 是可选的.build();return DBAuthentication.generateAuthenticationToken(tokenRequest);}}



pip install git+https://github.com/TencentCloud/dbauth-sdk-python.git
import logging
import os
import time

import pymysql
from dbauth.db_authentication import DBAuthentication
from dbauth.model.generate_authentication_token_request import GenerateAuthenticationTokenRequest
from tencentcloud.common import credential
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile

# Configure the root logger.
logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s] - [%(threadName)s] - {%(module)s:%(funcName)s:%(lineno)d} %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
log = logging.getLogger(__name__)


def main():
    """Connect to the database with a CAM token and log the result of a test query."""
    region = "ap-guangzhou"
    instance_id = "cynosdb-123456"
    user_name = "camtest"
    host = "192.*.*.11"
    port = 3306
    db_name = "test"
    secret_id = os.environ['AK']
    secret_key = os.environ['SK']

    connection = None
    try:
        # Obtain the connection.
        connection = get_db_connection_using_cam(secret_id, secret_key, region,
                                                 instance_id, user_name, host, port, db_name)

        # Verify that the connection works.
        with connection.cursor() as cursor:
            cursor.execute("SELECT 'Success!';")
            result = cursor.fetchone()
            log.info(result[0])  # should log "Success!"
    except Exception as e:
        log.error(f"An error occurred: {e}")
    finally:
        if connection and connection.open:
            connection.close()


def get_db_connection_using_cam(secret_id, secret_key, region, instance_id, user_name, host, port, db_name):
    """Open a pymysql connection that uses a CAM authentication token as the password.

    A fresh token is requested for every attempt. Up to three attempts are made,
    with a five-second pause between consecutive attempts.

    Returns:
        The connection returned by ``pymysql.connect``.

    Raises:
        Exception: the exception from the last failed attempt.
    """
    cred = credential.Credential(secret_id, secret_key)

    max_attempts = 3
    last_exception = None
    for attempt in range(1, max_attempts + 1):
        try:
            auth_token = get_auth_token(region, instance_id, user_name, cred)

            connection = pymysql.connect(
                host=host,
                port=port,
                user=user_name,
                password=auth_token,
                database=db_name
            )
            return connection
        except Exception as e:
            last_exception = e
            log.info(f"Attempt {attempt} failed.")
            # Wait only when another attempt follows; sleeping after the final
            # failure would just delay reporting the error.
            if attempt < max_attempts:
                time.sleep(5)

    log.error(f"All attempts failed. error: {last_exception}")
    raise last_exception


def get_auth_token(region, instance_id, user_name, cred):
    """Request an authentication token for the given instance and account.

    Raises:
        TencentCloudSDKException: logged and re-raised when the SDK fails.
    """
    try:
        # Optional HTTP profile; can be skipped when there are no special requirements.
        http_profile = HttpProfile()
        http_profile.endpoint = "cam.tencentcloudapi.com"

        # Optional client profile; can be skipped when there are no special requirements.
        client_profile = ClientProfile()
        client_profile.httpProfile = http_profile

        request = GenerateAuthenticationTokenRequest(
            region=region,
            instance_id=instance_id,
            user_name=user_name,
            credential=cred,
            client_profile=client_profile,  # optional
        )
        return DBAuthentication.generate_authentication_token(request)
    except TencentCloudSDKException as err:
        log.error(err)
        raise


if __name__ == "__main__":
    main()

go get -v -u github.com/tencentcloud/dbauth-sdk-go
package mainimport ("database/sql""fmt""os""time"_ "github.com/go-sql-driver/mysql""github.com/sirupsen/logrus""github.com/tencentcloud/dbauth-sdk-go/dbauth""github.com/tencentcloud/dbauth-sdk-go/dbauth/model""github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common""github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile")func init() {logrus.SetOutput(os.Stdout)logrus.SetFormatter(&logrus.TextFormatter{FullTimestamp: true})logrus.SetLevel(logrus.InfoLevel)}func main() {// 定义数据库连接参数region := "ap-guangzhou"instanceId := "cynosdb-123456"userName := "camtest"host := "192.*.*.11"port := 3306dbName := "test"ak := os.Getenv("TENCENTCLOUD_SECRET_ID")sk := os.Getenv("TENCENTCLOUD_SECRET_KEY")// 获取连接connection, err := getDBConnectionUsingCam(ak, sk, region, instanceId, userName, host, port, dbName)if err != nil {logrus.Error("Failed to get connection:", err)return}// 验证连接是否成功stmt, err := connection.Query("SELECT 'Success!';")if err != nil {logrus.Error("Failed to execute query:", err)return}for stmt.Next() {var result stringstmt.Scan(&result)logrus.Info(result) // Success!}// 关闭连接if err := stmt.Close(); err != nil {logrus.Error("Failed to close statement:", err)}if err := connection.Close(); err != nil {logrus.Error("Failed to close connection:", err)}}// 使用 CAM 获取数据库连接func getDBConnectionUsingCam(secretId, secretKey, region, instanceId, userName, host string, port int, dbName string) (*sql.DB, error) {credential := common.NewCredential(secretId, secretKey)maxAttempts := 3var lastErr errorfor attempt := 1; attempt <= maxAttempts; attempt++ {// 获取认证 TokenauthToken, err := getAuthToken(region, instanceId, userName, credential)if err != nil {return nil, err}connectionUrl := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", userName, authToken, host, port, dbName)db, err := sql.Open("mysql", connectionUrl)if err != nil {lastErr = errlogrus.Warnf("Open connection failed. 
Attempt %d failed.", attempt)time.Sleep(5 * time.Second)continue}if err = db.Ping(); err != nil {lastErr = errlogrus.Warnf("Ping failed. Attempt %d failed.", attempt)time.Sleep(5 * time.Second)continue}return db, nil}logrus.Error("All attempts failed. error:", lastErr)return nil, lastErr}// 获取认证 Tokenfunc getAuthToken(region, instanceId, userName string, credential *common.Credential) (string, error) {// 实例化一个 client 选项,可选的,没有特殊需求可以跳过cpf := profile.NewClientProfile()cpf.HttpProfile.Endpoint = "cam.tencentcloudapi.com"// 创建一个 GenerateAuthenticationTokenRequest 对象,ClientProfile 是可选的tokenRequest, err := model.NewGenerateAuthenticationTokenRequest(region, instanceId, userName, credential, cpf)if err != nil {logrus.Errorf("Failed to create GenerateAuthenticationTokenRequest: %v", err)return "", err}return dbauth.GenerateAuthenticationToken(tokenRequest)}
文档反馈