tencent cloud

Data Lake Compute

Python にアクセス

PDF
フォーカスモード
フォントサイズ
最終更新日: 2025-12-25 11:01:13
データレイクコンピューティング DLC は、DBAPI 2.0 規格に準拠したツールを提供します。Python を使用して DLC の Presto/Spark エンジンに接続し、SQL 方式で DLC のライブラリテーブルを操作できます。

環境準備

1. Python 3.9 以降のバージョン。
2. tencentcloud-dlc-connectorをインストールします。
pip install -i https://mirrors.tencent.com/pypi/simple/ tencentcloud-dlc-connector

使用例

ステップ1:エンジンに接続する

コード:
import tdlc_connector
import datetime
from tdlc_connector import constants

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
token=None,
endpoint=None,
catalog=constants.Catalog.DATALAKECATALOG,
engine="<ENGINE>",
engine_type=constants.EngineType.AUTO,
result_style=constants.ResultStyles.LIST,
download=False,
mode=constants.Mode.LASY,
database='',
config={},
callback=None,
callback_events=None,
)
パラメータ説明:
パラメータ
説明
region
エンジンの所在地域、例えば ap-nanjing、ap-beijing、ap-guangzhou、ap-shanghai、ap-chengdu、ap-chongqing、na-siliconvalley、ap-singapore、ap-hongkong
secret_id
テンセントクラウド SecretID
secret_key
テンセントクラウド SecretKey
token
(オプション)一時キートークン
endpoint
(オプション)サービスノードに接続
engine
使用するエンジン名、例えば「test_python」
engine_type
(オプション)エンジンタイプ:対応するエンジン名のエンジンタイプ、デフォルト値 constants.EngineType.AUTO
例えば:AUTO、PRESTO、SPARK、SPARK_BATCH
result_style
(オプション)返される結果の形式、LIST/DICT から選択可能
download
(オプション)データを直接ダウンロードするかどうか True/False、詳細はダウンロードモード説明
mode
(オプション)モード。ALL/LASY/STREAM をサポート
database
(オプション)デフォルトデータベース
config
(オプション)クラスター構成に送信
callback
(オプション)コールバック関数、関数シグネチャ def cb(statement_id, status)
callback_events
(オプション)コールバックトリガーイベント、callbackと併用します。詳細はコールバックメカニズムの説明を参照してください
driver_size
(オプション)Driver ノードサイズ、デフォルト値 constants.PodSize.SMALL (SPARK_BATCH クラスターでのみ有効)
オプション値:SMALL、MEDIUM、LARGE、XLARGE、M_SMALL、M_MEDIUM、M_LARGE、M_XLARGE
executor_size
(オプション)Executorノードサイズ、デフォルト値 constants.PodSize.SMALL(SPARK_BATCHクラスタのみ有効)<br>選択可能な値:SMALL、MEDIUM、LARGE、XLARGE、M_SMALL、M_MEDIUM、M_LARGE、M_XLARGE
executor_num
(オプション)Executor ノード数、デフォルト値1 (SPARK_BATCH クラスターでのみ有効)
executor_max_num
(オプション)Executor 最大ノード数、executor_num と異なる場合、リソース動的割り当てを有効化 (SPARK_BATCH クラスターでのみ有効)

ダウンロードモード説明
シリアル番号
download
mode
説明
1
False
ALL
インターフェースからすべてのデータを取得し、取得が完了して初めてデータをフェッチできます
2
False
LASY
インターフェースからデータを取得し、fetchのデータ量に応じてサーバーからのデータ取得を遅延させます
3
False
STREAM
LASYモード
4
True
ALL
COSからすべての結果をダウンロードします(COS読み取り権限が必要)。ローカルの一時ストレージを占有するため、データ量が大きい場合は使用を推奨します。
5
True
LASY
COSから結果をダウンロードします(COS読み取り権限が必要)。fetchのデータ量に応じてファイルのダウンロードを遅延させます
6
True
STREAM
リアルタイムでCOSから結果ストリームを読み取ります(COS読み取り権限が必要)。パフォーマンスは遅いですが、ローカルのメモリとディスク使用率は非常に低いです。

ステップ2:SQLを実行する

コード
# 基本操作

cursor = conn.cursor()
count = cursor.execute("SELECT 1")
print(cursor.fetchone()) # 一行のデータを読み取ります
for row in cursor.fetchall(): # 残りの複数行のデータを読み取ります
print(row)

{assistant_name}」機能をオンにしますか?

cursor.execute("SELECT * FROM dummy WHERE date < %s", datetime.datetime.now())
cursor.execute("SELECT * FROM dummy WHERE status in %s", (('SUCCESS', 'INIT', 'FAIL'),))
cursor.execute("SELECT * FROM dummy WHERE date < %(date)s AND status = %(status)s", {'date': datetime.datetime.now(), 'status': 'SUCCESS'})

BULK方式を使用します

cursor.executemany("INSERT INTO dummy VALUES(%s, %s)", [('張三', 18), ('李四', 20)])

基本操作プロセス

上記のコードのプロセスは以下の通りです:
1. conn.cursor()を通じてカーソルオブジェクトを作成しました。
2. cursor.execute("SELECT 1")を通じてSQLクエリ文を実行し、結果を変数countに代入しました。
3. cursor.fetchone()メソッドを呼び出して一行のデータを読み取り、それを出力しました。
4. cursor.fetchall()メソッドをループ内で呼び出して残りの複数行のデータを読み取り、行ごとに出力しました。

パラメータの受け渡し方法

2種類の異なるパラメータ渡し方法をサポートしています
pyformat形式を使用:SQL文を実行する際、%sをプレースホルダーとして使用し、実際のパラメータをタプルまたは辞書形式で渡すことができます。
BULK方式を使用:executemany()メソッドを呼び出すことで、複数のレコードをデータベーステーブルに一括挿入できます。

機能特性

コールバックメカニズムの使用説明

import tdlc_connector
import datetime
from tdlc_connector import constants


def tdlc_connector_callback(statement_id, state):
'''
parmas: statement_id タスクID
params: state タスクの状態、列挙値はconstants.TaskStatusを参照
'''
print(statement_id, state)


conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>",
engine_type=constants.EngineType.SPARK,
result_style=constants.ResultStyles.LIST,
callback=tdlc_connector_callback,
callback_events=[constants.CallbackEvent.ON_INIT, constants.CallbackEvent.ON_SUCCESS]
)

cursor = conn.cursor()
cursor.execute("SELECT 1")
cursor.fetchone()

# コールバック関数は、タスクの初期化時とタスク成功時に呼び出されます


ジョブクラスタにタスクを送信する

現在、Sparkジョブクラスターへのタスクの送信がサポートされています。具体的な例については、以下のサンプルを参照してください。
from tdlc_connector import constants

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>", # spark ジョブエンジンを選択してください
result_style=constants.ResultStyles.LIST,
driver_size=constants.PodSize.SMALL, # Driver 仕様を選択してください
executor_size=constants.PodSize.SMALL, # Executor 仕様を選択してください
executor_num=1, # Executor 数を設定してください
executor_max_num=1, # Executor 最大数を設定してください。{executor_num}と等しくない場合、動的リソース割り当てを開始します
)
説明:
この機能を使用するには、コネクタをバージョン1.1.0以上にアップグレードしてください。

自動推論エンジンタイプ

現在、エンジンを指定すると、エンジンタイプを指定する必要がなくなり、コネクタが自動的に推論します。詳細は以下の例を参照してください。
from tdlc_connector import constants

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>",
engine_type=constants.EngineType.AUTO # AUTOに設定するか、このパラメータを渡さずにドライバーに自動推論させることができます
)
説明:
この機能を使用するには、コネクタをバージョン1.1.0以上にアップグレードしてください。

空値変換

現在の結果セットはCSV形式で保存されており、エンジンはデフォルトで空値を空の文字列に変換します。空値を区別する必要がある場合は、空値の表現を指定してください。例えば「\\1」を指定すると、エンジンのクエリ結果は空値を「\\1」に変換し、ドライバーは「\\1」フィールドをNoneに変換します。詳細は以下の例を参照してください。
from tdlc_connector import constants, formats

formats.FORMAT_STRING_NULL = '\\1'

conn = tdlc_connector.connect(region="<REGION>",
secret_id="<SECRET_ID>",
secret_key="<SECRET_KEY>",
engine="<ENGINE>",
result_style=constants.ResultStyles.LIST
)
説明:
空値変換は現在SparkSQLクラスターのみサポートしています。コネクタを>= 1.1.3にアップグレードしてください。


ヘルプとサポート

この記事はお役に立ちましたか?

フィードバック