tencent cloud

Data Lake Compute

PySparkジョブ開発ガイド

フォーカスモード
フォントサイズ
最終更新日: 2025-12-25 10:52:26

適用シーン

DLCはPython言語で書かれたプログラムの実行をサポートします。この例では、Pythonコードを記述してオブジェクトストレージ(COS)でデータを読み書きし、DLCでデータベーステーブルを作成し、テーブルを読み書きする詳細な操作を実演し、ユーザーがDLCでジョブ開発を完了するのを支援します。

環境準備

依存関係:PyCharm またはその他の Python プログラミング開発ツール。

開発フロー

開発フロー図

DLC Spark JAR ジョブ開発フロー図は以下の通りです:


リソースを作成

DLCで初めてジョブを実行する場合、「dlc-demo」などのSparkジョブリソースを作成する必要があります。
1. データレイクコンピューティング DLC コンソールにログインし、サービスが所在するリージョンを選択し、ナビゲーションメニューでデータエンジンをクリックします。
2. 左上隅のリソースを作成をクリックし、リソース構成購入ページに進みます。
3. クラスタ構成 > 計算エンジンタイプオプションで Spark ジョブエンジンを選択します。

情報設定 > リソース名に「dlc-demo」と入力します。新しいリソースの詳細については、専用データエンジンの購入を参照してください。

4. 今すぐ開通をクリックし、リソース構成情報を確認します。
5. 情報に誤りがないことを確認したら、送信をクリックして、リソース構成を完了します。

データをCOSにアップロード

「dlc-demo」という名前のバケットを作成し、people.jsonファイルをアップロードします。これはCOSからのデータ読み書きサンプル用で、people.jsonファイルの内容は以下の通りです:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":3}
{"name":"WangHua", "age":19}
{"name":"ZhangSan", "age":10}
{"name":"LiSi", "age":33}
{"name":"ZhaoWu", "age":37}
{"name":"MengXiao", "age":68}
{"name":"KaiDa", "age":89}
1. Cloud Object Storage (COS)コンソールにログインし、左側のメニューナビゲーションでバケットリストをクリックします。
2. バケットを作成
左上隅のバケットを作成をクリックし、名前欄に「dlc-dmo」と入力して、次へをクリックし設定を完了します。
3. ファイルをアップロード
ファイルリスト > ファイルをアップロードをクリックし、ローカルの「people.json」ファイルを選択して「dlc-demo-1305424723」バケットにアップロードします(-1305424723はバケット作成時にプラットフォームで生成されたランダムな文字列です)。アップロードをクリックして、ファイルのアップロードを完了します。新しいストレージバケットの作成詳細については、ストレージバケットの作成を参照してください。


新しいPythonプロジェクト

PyCharm で「demo」という名前の新しいプロジェクトを作成します。

コードを書く

1. 新しいcos.pyファイルを作成し、コードを記述します。機能はCOSからのデータの読み書きと、DLCでのデータベース作成、テーブル作成、データのクエリおよび書き込みです。
import sys
from pyspark.sql import SparkSession
from pyspark.sql import Row

if __name__ == "__main__":
spark = SparkSession\\
.builder\\
.appName("Operate data on cos")\\
.getOrCreate()

# 1. cos上のデータを読み取る。json、csv、parquet、orc、textなど、さまざまなタイプのファイルをサポートしています。
read_path = "cosn://dlc-demo-1305424723/people.json"
peopleDF = spark.read.json(read_path)
# 2. データを操作する
peopleDF.createOrReplaceTempView("people")
data_src = spark.sql("SELECT * FROM people WHERE age BETWEEN 13 AND 19")
data_src.show()
# 3. データを書き込む
write_path = "cosn://dlc-demo-1305424723/people_output"
data_src.write.csv(path=write_path, header=True, sep=",", mode='overwrite')

spark.stop()
2. 新規 db.py ファイルを作成し、DLC 上でデータベースの作成、テーブルの作成、データのクエリ、およびデータの書き込みを行うコードを記述します。
from os.path import abspath

from pyspark.sql import SparkSession



if __name__ == "__main__":

spark = SparkSession \\
.builder \\
.appName("Operate DB Example") \\
.getOrCreate()
# 1. データベースを作成
spark.sql("CREATE DATABASE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py` COMMENT 'demo test' ")
# 2. 内部テーブルを作成
spark.sql("CREATE TABLE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py`.`test`(`id` int,`name` string,`age` int) ")
3. 内部データを書き込む
spark.sql("INSERT INTO `DataLakeCatalog`.`dlc_db_test_py`.`test` VALUES (1,'Andy',12),(2,'Justin',3) ")
# 4. 内部データの照会
spark.sql("SELECT * FROM `DataLakeCatalog`.`dlc_db_test_py`.`test` ").show()
# 5. 外部テーブルの作成
spark.sql("CREATE EXTERNAL TABLE IF NOT EXISTS `DataLakeCatalog`.`dlc_db_test_py`.`ext_test`(`id` int, `name` string, `age` int) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS TEXTFILE LOCATION 'cosn://cry-1305424723/ext_test' ")
# 6. 外部データを書き込む
spark.sql("INSERT INTO `DataLakeCatalog`.`dlc_db_test_py`.`ext_test` VALUES (1,'Andy',12),(2,'Justin',3) ")
# 7. 外部データの照会
spark.sql("SELECT * FROM `DataLakeCatalog`.`dlc_db_test_py`.`ext_test` ").show()
spark.stop()
外表を作成する際には、COSにデータをアップロードする手順に従って、バケット内に対応するテーブル名のフォルダを作成し、テーブルファイルを保存できます。


デバッグ

PyCharm デバッグで構文エラーはありません。

pyファイルをCOSにアップロード

COSコンソールにログインし、前述のデータをCOSにアップロード手順を参考に、cos.pyとdb.pyをCOSにアップロードします。

Spark Jarデータジョブを作成

データジョブを作成する前に、データジョブが安全にデータにアクセスできるように、データアクセスポリシーの設定を完了する必要があります。データアクセスポリシーの設定の詳細については、データアクセスポリシーの設定を参照してください。すでにデータポリシー名が qcs::cam::uin/100018379117:roleName/dlc-demo として設定されている場合。
1. データレイクコンピューティング DLC コンソールにログインし、サービスが所在するリージョンを選択し、ナビゲーションメニューでデータジョブをクリックします。
2. 左上隅のジョブの作成ボタンをクリックし、作成ページに進みます。
3. ジョブ設定ページで、ジョブの実行パラメータを設定します。具体的な説明は以下の通りです。
設定パラメータ
説明
作業名
カスタム Spark 作業名、例:cosn_py
作業タイプ
バッチ処理タイプを選択
データエンジン
リソース作成手順で作成された dlc-demo 計算エンジンを選択
プログラムパッケージ
COSを選択し、pyファイルをCOSにアップロード手順でpyファイルをアップロード:
COSからデータを読み書きするには、以下を選択します: cosn://dlc-demo-1305424723/cos.py
DLCでデータベースやテーブルを作成するには、以下を選択します: cosn://dlc-demo-1305424723/db.py
データアクセスポリシー
この手順の前に作成したポリシー qcs::cam::uin/100018379117:roleName/dlc-demo を選択します
その他のパラメータ値はデフォルトのままにします。

4. 保存をクリックし、Spark ジョブページで作成したジョブを確認できます。

ジョブを実行して結果を確認します

1. ジョブを実行:Spark ジョブページで新しく作成したジョブを見つけ、実行をクリックすると、ジョブを実行できます。
2. ジョブの実行結果を確認:ジョブの実行ログと実行結果を確認できます。

ジョブの実行ログを確認

1. ジョブ名 > 履歴タスクをクリックして、タスクの実行状況を確認します:

2. タスクID > 実行ログをクリックして、ジョブの実行ログを確認します。

ジョブの実行結果を確認

1. COSからデータを読み書きするサンプルを実行する場合、COSコンソールでデータ書き込み結果を確認します。

2. DLCでテーブルやデータベースを作成する場合、DLCデータ探索ページでデータベースやテーブルの作成を確認します。


ヘルプとサポート

この記事はお役に立ちましたか?

フィードバック