tencent cloud

数据开发治理平台 WeData

产品动态
动态发布记录(2026年)
产品简介
产品概述
产品优势
产品架构
产品功能
应用场景
购买指南
计费概述
产品版本购买说明
执行资源购买说明
购买方式
欠费说明
退费说明
准备工作
账号和权限管理概述
添加白名单/安全组(可选)
通过 Microsoft Entra ID(Azure AD)单点登录(SSO)WeData
操作指南
管理控制台
项目管理
数据集成
Studio
数据开发
数据分析
数据科学
数据治理(with Unity Semantics)
API 文档
History
Introduction
API Category
Making API Requests
Smart Ops Related Interfaces
Project Management APIs
Resource Group APIs
Data Development APIs
Data Asset - Data Dictionary APIs
Data Development APIs
Ops Center APIs
Data Operations Related Interfaces
Data Exploration APIs
Asset APIs
Metadata Related Interfaces
Task Operations APIs
Data Security APIs
Instance Operation and Maintenance Related Interfaces
Data Map and Data Dictionary APIs
Data Quality Related Interfaces
DataInLong APIs
Platform Management APIs
Data Source Management APIs
Data Quality APIs
Platform Management APIs
Asset Data APIs
Data Source Management APIs
Data Types
Error Codes
WeData API 2025-08-06
服务等级协议
相关协议
隐私协议
数据处理和安全协议
联系我们
词汇表

PySpark

PDF
聚焦模式
字号
最后更新时间: 2025-03-04 12:21:44
注意:
需要在 EMR 集群中启动 Hive、Spark 组件服务。
1. 当前用户在 EMR 集群有权限。
2. 已在 Hive 中创建对应的数据库和表,如示例中的:wedata_demo_db。
3. PySpark 系统自动使用 cluster 模式提交任务。

代码示例

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("WeDataApp").getOrCreate()

schema = StructType([
StructField("user_id", IntegerType(), True),
StructField("user_name", StringType(), True),
StructField("age", IntegerType(), True)
])

data = [(1, "Alice", 25), (2, "Bob", 30)]
df = spark.createDataFrame(data, schema=schema)

df.show()
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WeDataApp").enableHiveSupport().getOrCreate()

df = spark.sql("SELECT * FROM WeData_demo_db.user_demo")

count = df.count()

print("The number of rows in the dataframe is:", count)

参数说明

参数
说明
Python 版本
支持 Python2、Python3。

在 PySpark 任务中使用调度资源组的 Python 环境

在调度资源组中安装 Python 库

1. 进入项目管理 > 执行资源组 > 标准调度资源组界面,单击资源详情,进入资源运维界面。

2. 在资源运维界面,单击 Python 包安装,可以安装内置的 Python 库,推荐安装 Python3 的版本。

3. 目前平台只支持内置库的安装,这里安装 sklearn 和 pandas 库,安装完成后,可以通过 Python 包查看功能,查看已安装的 Python 库。


编辑 PySpark 任务

1. 创建任务,调度资源组选中安装了 Python 包的调度资源组。
2. 编写 PySpark 代码使用 Python 库,这里使用了 pandas 和 sklearn。



from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, IntegerType, StringType import pandas as pd import sklearn spark = SparkSession.builder.appName("WeDataApp-1").getOrCreate() schema = StructType([ StructField("user_id", IntegerType(), True), StructField("user_name", StringType(), True), StructField("age", IntegerType(), True) ]) data = [(1, "Alice", 25), (2, "Bob", 30)] df = spark.createDataFrame(data, schema=schema) pandas_df = df.toPandas() df.show() print(pandas_df.head(10)) print(sklearn.__version__)

调试 PySpark 任务

1. 单击调试运行,查看调试运行的日志和结果。
示例:日志中可以查看使用调度资源组的 Python 环境作为任务运行的环境。
spark.yarn.dist.archives,file:///usr/local/python3/python3.zip#python3



2. 查看日志结果,即可查看使用安装的 pandas 库,正确打印了安装的 sklearn 库的版本。


周期调度 PySpark 任务

周期调度运行,查看调试运行的日志和结果。日志中可以查看使用调度资源组的 Python 环境作为任务运行的环境。







帮助和支持

本页内容是否解决了您的问题?

填写满意度调查问卷,共创更好文档体验。

文档反馈