动态发布记录(2026年)









字段分类 | | 描述 | 说明 |
服务名称 | | 模型服务的唯一标识名称。 | 建议根据业务用途命名,支持点击进入服务详情页。 |
服务状态 | | 服务当前的运行生命周期状态。 | 常见状态包括:草稿(启动中)、运行中、已停止等。 |
服务类型 | | 标识服务所属的应用领域。 | 例如:机器学习服务、大模型服务等 |
运行中版本/版本数量 | | 当下活跃服务版本/总版本数 | 展示服务组的多版本管理情况,如 0/1 表示总共 1 个版本但未启动。 |
服务组描述 | | 对该服务组的描述说明。 | 方便跨团队协作时,快速了解该服务组的用途 |
创建人 | | 创建该服务的用户账号。 | 用于审计及权限追溯。 |
创建时间 | | 服务首次被创建的时间。 | 可按时间顺序排列,方便快速查找近期服务。 |
操作 | 编辑 | 修改服务组的基本配置。 | 查看服务组详情,支持更新服务组描述、资源配置等参数。 |
| 调试 | 进入在线调试界面测试模型接口。 | 提供标准化的 REST API 调用地址(要求服务组为运行状态) |
| 新增版本 | 在同一服务下部署新的模型版本。 | 实现模型的灰度发布或 A/B 测试场景,最多支持两个服务版本 |
| 监控 | 查看服务的性能指标视图。 | 包含 QPS、并发请求数及 CPU/MEM/GPU 资源使用率监控。 |
| 日志 | 查看模型运行时的容器日志。 | 支持按实例筛选和时间范围检索,用于定位服务组异常原因 |
| 启动 | 控制服务组的运行状态。 | 点击启动会使服务组变更为运行状态,停止则释放服务资源。 |
| 删除 | 彻底移除模型服务资产及其关联资源。 | 操作不可逆,删除前需确认该服务组已停止运行。 |
{"client_request_id": "123456","dataframe_split": {"columns": ["fixed_acidity", "volatile_acidity", "citric_acid", "residual_sugar", "chlorides", "free_sulfur_dioxide", "total_sulfur_dioxide", "density", "pH", "sulphates", "alcohol"],"data": [[1, 0.8, 0.7, 0.5, 1, 1.5, 0.8, 0.7, 0.5, 0.8, 0.5]]}}
{"client_request_id": "123456","dataframe_split": {"columns": ["fixed_acidity", "volatile_acidity", "citric_acid", "residual_sugar", "chlorides", "free_sulfur_dioxide", "total_sulfur_dioxide", "density", "pH", "sulphates", "alcohol"],"data": [[1, 0.8, 0.7, 0.5, 1, 1.5, 0.8, 0.7, 0.5, 0.8, 0.5],[0.9, 0.7, 0.6, 0.4, 0.9, 1.2, 0.7, 0.6, 0.4, 0.7, 0.4]]}}
curl -X POST https://[服务调用地址]/predict \\-H 'Authorization: [鉴权Token]' \\-H 'Content-Type: application/json' \\-d '{"client_request_id":"123456","dataframe_split":{"columns":["fixed_acidity","volatile_acidity","citric_acid","residual_sugar","chlorides","free_sulfur_dioxide","total_sulfur_dioxide","density","pH","sulphates","alcohol"],"data":[[1,0.8,0.7,0.5,1,1.5,0.8,0.7,0.5,0.8,0.5]]}}'




字段分类 | 描述 | 说明 | |
ID | | 服务版本实例的唯一标识符。 | 由系统自动生成的唯一字符串,用于精准定位和区分不同的服务版本。 |
服务版本名称 | | 该服务的具体版本标识。 | 方便进行多版本管理和灰度发布。 |
状态 | | 服务版本当前的运行生命周期状态。 | 展示该版本运行状态:草稿 /运行中/已停止等。 |
描述 | | 对该服务版本的用途或特性进行说明。 | 帮助团队协作时理解各服务版本差异。 |
模型来源 | | 发布该服务所关联的底层模型资产。 | 展示该服务使用的是模型管理模块中哪个具体模型名称(如 ml_wine_db_wine_model)。 |
资源来源 | | 该版本运行所消耗的计算资源类型。 | 标识资源来自“平台按量计费资源”或“CVM 资源组”。 |
流量分配 | | 在多版本并存时,该版本接收的流量权重。 | 展示在 A/B 测试或灰度发布场景下,该版本所分配流量的百分比。 |
创建人 | | 初始化该服务版本的用户账号。 | 记录该版本的创建者,用于权限管理和操作审计。 |
创建时间 | | 该服务版本首次被创建的时间。 | 系统自动记录,方便按时间维度追踪模型迭代过程。 |
操作 | 更新 | 修改当前服务版本的配置信息。 | 查看服务版本详情,支持更新服务版本描述、资源配置等参数。 |
| 监控 | 查看该版本的实时性能指标。 | 访问 QPS、调用延迟、CPU/GPU 占用率等可视化监控视图。 |
| 日志 | 检索该服务实例的运行日志。 | 用于排查模型代码报错、输入输出异常等运行期问题。 |
| 启动/停止 | 手动控制服务实例的运行状态。 | 点击启动使服务组变更为运行状态,停止则释放服务资源。 |
| 删除 | 永久移除该服务版本。 | 操作不可逆,删除前需确认该版本已停止运行。 |

字段名称 | 描述 | 说明 |
服务名称 | 该服务的具体版本标识。 | 方便进行多版本管理和灰度发布。 |
服务 ID | 服务版本实例的唯一标识符。 | 由系统自动生成的唯一字符串,用于精准定位和区分不同的服务版本。 |
服务描述 | 对该服务版本的用途或特性进行说明。 | 帮助团队协作时理解各服务版本差异。 |
计费模式 | 资源的结算方式 | 如 Pay-as-you-go(按量计费),展示了模型运行的成本结构。 |
资源规格 | 分配给该版本的算力大小。 | 如 4C8G,决定了该版本能承载的模型复杂度及并发上限。 |
接入模型和版本 | 关联的模型资产详情。 | 显示模型管理中的模型名及具体 MLflow 版本号(如 Version2)。 |
副本数量 | 运行中的容器实例总数。 | 决定了服务的高可用性。多副本可分担流量并提供容灾保护。 |
请求限流 | 流量防护阈值。 | 展示 $QPS$ 上限。若达到阈值,系统将触发丢包或排队保护。 |
生成鉴权 | 服务安全控制开关。 | 显示是否开启了签名认证(API Key)。未开启则地址可直接访问。 |
字段/操作 | 描述 | 运维指导 |
实例名称 | 底层计算单元 (Pod) 的标识。 | 唯一标识一个运行中的微服务节点。 |
实例状态 | 实例的物理状态。 | Waiting 表示正在拉取镜像或等待调度;Running 表示服务已就绪。 |
重启次数 | 容器异常退出的频率。 | 核心排障指标。若次数不断增加,说明模型可能存在内存泄漏或代码报错。 |
监控 | 实例级监控入口。 | 查看单个实例的 $CPU/MEM$ 消耗,判断是否存在资源倾斜。 |
日志 | 容器标准输出流。 | 查看模型加载逻辑、预测报错堆栈的最直接手段。 |
重启 | 强制重启实例。 | 当实例出现僵死或响应极慢时,用于快速恢复服务的紧急手段。 |
访问容器 | 交互式终端。 | 允许进入容器内部执行命令,检查文件路径、权限或环境依赖。 |

指标分类 | 核心指标项 | 业务价值说明 |
流量信息 | 网络流量、QPS、QPS 限流次数、并发请求数 | 评估 API 调用压力;并发请求数与限流次数是判断是否需要增加副本数量的关键信号。 |
资源信息 | CPU 使用率、MEM 使用率、显存使用率、GPU 使用率 | 监控底层硬件负载;包括显存使用率和 GPU 使用率 |
实例信息 | 实例总数量、运行中实例数量 | 直观展示服务的可用性,确认实际运行的容器数是否符合配置预期。 |





"""该样例演示了如何使用 Spark Streaming 流式读取 COS 存储中的推理表,并对关键字段(特征、预测值)进行加工处理后写入 DLC 内表的处理过程。## 前提条件1. 确保该 DLC 引擎关联的 COS 访问凭证有访问推理表存储桶的权限。2. 确保模型服务开启了推理表监控。## 如何运行在修改教程中的参数后,在离线开发->编排空间中选择DLC PySpark任务运行。## 定时调度修改相应参数并调试完成后,您可以将该任务配置为定时执行,以便持续加工推理表数据。您还可以将加工后推理表数据与标签数据进行关联,生成包含标签数据的推理表,并在数据质量模块为推理表配置机器学习相关监控指标,持续监控模型质量。"""from pyspark.sql import DataFrame, SparkSessionfrom pyspark.sql import functions as Ffrom pyspark.sql import types as Tfrom typing import Union, List, Dict, Optional, Sequence, Anyfrom pyspark.sql.streaming import StreamingQueryfrom pyspark.sql.types import (StructType, StructField, StringType, LongType, DoubleType,ArrayType, TimestampType)"""### 定义原始推理表数据格式原始推理表数据按照特定结构以JSON格式存储在用户指定的COS桶中,request_schema代表用户输入请求结构,response_schema代表模型输出结构,请根据模型服务对应的实际输入输出格式进行调整,其余字段为推理表固定结构,一般不需要调整。"""request_schema = StructType([StructField("client_request_id", StringType(), True),StructField("dataframe_split", StructType([StructField("columns", ArrayType(StringType()), True),StructField("data", ArrayType(ArrayType(DoubleType())), True),]), True),])request_metadata_schema = StructType([StructField("model_name", StringType(), True),StructField("model_version", StringType(), True),StructField("service_id", StringType(), True),])response_schema = StructType([StructField("predictions", ArrayType(LongType()), True),])schema = StructType([StructField("client_request_id", StringType(), True),StructField("execution_duration_ms", LongType(), True),StructField("request", request_schema, True),StructField("request_date", StringType(), True),StructField("request_metadata", request_metadata_schema, True),StructField("request_time", TimestampType(), True),StructField("requester", StringType(), True),StructField("response", response_schema, True),StructField("sampling_fraction", DoubleType(), True),StructField("status_code", LongType(), True),StructField("wedata_request_id", StringType(), True),])# 数据源参数SOURCE_PATH = "cosn://存储桶路径/" # 推理表存储路径CHECKPOINT_PATH = "cosn://存储桶路径/checkpoint" # check_point存储路径,建议与推理表存储路径一致# 结果表参数UNPACKED_TABLE_NAME = "DataLakeCatalog.test.unpacked_test_inference_table" # 结果表表名,需要为DLC内表的三段式表名称:<数据目录>.<数据库>.<数据表>MODEL_ID_COL = "model_id" # 结果表模型ID列,用于标识模型EXAMPLE_ID_COL = "example_id" # 结果表记录的唯一ID列,用户更新操作PREDICTION_COL = "prediction" # 预测值列名称FEATURE_COLUMNS = ["sepal_length", "sepal_width", "petal_length", "petal_width"] # 特征列名称,根据模型特征调整def process_requests(requests_raw: DataFrame) -> DataFrame:"""将请求特征展开为单独列,并与对应的预测结果配对Args:requests_raw: 待处理数据Return:处理后数据"""# 过滤成功请求requests_success = requests_raw.filter(F.col("status_code") == 200).drop("status_code")# 生成模型标识requests_identified = requests_success \\.withColumn(MODEL_ID_COL, F.concat(F.col("request_metadata").getItem("model_name"), F.lit("_"), F.col("request_metadata").getItem("model_version"))) \\.drop("request_metadata")# 展开特征列和预测结果# 1. 获取特征列名、特征数据、预测值requests_with_features = (requests_identified.withColumn("feature_columns", F.col("request.dataframe_split.columns")).withColumn("feature_data", F.col("request.dataframe_split.data")).withColumn("predictions", F.col("response.predictions")))# 2. 将每个数据行与对应的预测结果配对requests_exploded = (requests_with_features.withColumn("feature_prediction_pairs",F.arrays_zip(F.col("feature_data"), F.col("predictions"))).withColumn("feature_prediction_pairs",F.explode(F.col("feature_prediction_pairs"))).withColumn("feature_row", F.col("feature_prediction_pairs.feature_data")).withColumn(PREDICTION_COL, F.col("feature_prediction_pairs.predictions")))# 3. 动态创建特征列requests_with_feature_cols = requests_explodedfor i, col_name in enumerate(FEATURE_COLUMNS):requests_with_feature_cols = (requests_with_feature_cols.withColumn(col_name, F.col("feature_row")[i]))# 4. 添加记录唯一ID,便于后续进行upsert操作requests_with_example_id = requests_with_feature_cols \\.withColumn(EXAMPLE_ID_COL, F.expr("uuid()"))# 5. 清理临时列requests_processed = (requests_with_example_id.drop("feature_columns", "feature_data", "feature_prediction_pairs", "feature_row", "predictions", "request", "response"))return requests_processeddef create_table(spark: SparkSession,table_name: str,df: Optional[DataFrame] = None,partition_expr: str = None,description: Optional[str] = None,):"""创建dlc内表Args:spark: SparkSession 实例table_name: 表全称(格式:数据目录.数据库.数据表)df: 初始数据(用于推断schema)partition_expr: 分区列(优化存储查询)description: 表描述Raises:ValueError: 创建异常时抛出"""# 推断表schematable_schema = df.schema# 构建列定义columns_ddl = []for field in table_schema.fields:data_type = field.dataType.simpleString().upper()col_def = f"`{field.name}` {data_type}"if not field.nullable:col_def += " NOT NULL"columns_ddl.append(col_def)# 构建建表语句ddl = f"""CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns_ddl)})USING icebergPARTITIONED BY ({partition_expr})TBLPROPERTIES ('comment'= '{description or ''}','format-version'= '2','write.metadata.previous-versions-max'= '100','write.metadata.delete-after-commit.enabled'= 'true','smart-optimizer.inherit' = 'none','smart-optimizer.written.enable' = 'enable')"""# 打印sqlprint(f"create table ddl: {ddl}\\n")# 执行DDLtry:spark.sql(ddl)except Exception as e:raise ValueError(f"Failed to create table: {str(e)}") from eprint(f"create table {table_name} done")spark = SparkSession.builder.appName("Operate DB Example").getOrCreate()# 构建流式 DataFramedf = (spark.readStream.format("json").schema(schema).load(SOURCE_PATH))# 处理流式数据unpacked_df = process_requests(df)# 创建结果表,以request_time作为分区字段,按天分区,可以按照模型服务请求量进行调整create_table(spark, UNPACKED_TABLE_NAME, unpacked_df, 'days(request_time)')# 写入数据query = unpacked_df.writeStream \\.format("parquet") \\.outputMode("append") \\.option("checkpointLocation", CHECKPOINT_PATH) \\.trigger(once=True) \\.toTable(UNPACKED_TABLE_NAME)query.awaitTermination()spark.sql(f"select * from {UNPACKED_TABLE_NAME}").show(10)
"""该样例演示了如何使用 Spark Streaming 流式读取 COS 存储中的推理表,并对关键字段(特征、预测值)进行加工处理后写入 EMR Hive 表的处理过程。## 前提条件1. 确保该 DLC 引擎关联的 COS 访问凭证有访问推理表存储桶的权限。2. 确保模型服务开启了推理表监控。## 如何运行在修改教程中的参数后,在离线开发->编排空间中选择EMR PySpark任务运行。## 定时调度修改相应参数并调试完成后,您可以将该任务配置为定时执行,以便持续加工推理表数据。您还可以将加工后推理表数据与标签数据进行关联,生成包含标签数据的推理表,并在数据质量模块为推理表配置机器学习相关监控指标,持续监控模型质量。"""from pyspark.sql import DataFrame, SparkSessionfrom pyspark.sql import functions as Ffrom pyspark.sql import types as Tfrom typing import Union, List, Dict, Optional, Sequence, Anyfrom pyspark.sql.streaming import StreamingQueryfrom pyspark.sql.types import (StructType, StructField, StringType, LongType, DoubleType,ArrayType, TimestampType)"""### 定义原始推理表数据格式原始推理表数据按照特定结构以JSON格式存储在用户指定的COS桶中,request_schema代表用户输入请求结构,response_schema代表模型输出结构,请根据模型服务对应的实际输入输出格式进行调整,其余字段为推理表固定结构,一般不需要调整。"""request_schema = StructType([StructField("client_request_id", StringType(), True),StructField("dataframe_split", StructType([StructField("columns", ArrayType(StringType()), True),StructField("data", ArrayType(ArrayType(DoubleType())), True),]), True),])request_metadata_schema = StructType([StructField("model_name", StringType(), True),StructField("model_version", StringType(), True),StructField("service_id", StringType(), True),])response_schema = StructType([StructField("predictions", ArrayType(LongType()), True),])schema = StructType([StructField("client_request_id", StringType(), True),StructField("execution_duration_ms", LongType(), True),StructField("request", request_schema, True),StructField("request_date", StringType(), True),StructField("request_metadata", request_metadata_schema, True),StructField("request_time", TimestampType(), True),StructField("requester", StringType(), True),StructField("response", response_schema, True),StructField("sampling_fraction", DoubleType(), True),StructField("status_code", LongType(), True),StructField("wedata_request_id", StringType(), True),])# 数据源参数SOURCE_PATH = "cosn://存储桶路径/" # 推理表存储路径CHECKPOINT_PATH = "cosn://存储桶路径/checkpoint" # check_point存储路径,建议与推理表存储路径一致# 结果表参数UNPACKED_TABLE_NAME = "testdb.unpacked_test_inference_table" # 结果表表名,需要为hive表名称:<数据库>.<数据表>MODEL_ID_COL = "model_id" # 结果表模型ID列,用于标识模型EXAMPLE_ID_COL = "example_id" # 结果表记录的唯一ID列,用户更新操作PREDICTION_COL = "prediction" # 预测值列名称FEATURE_COLUMNS = ["sepal_length", "sepal_width", "petal_length", "petal_width"] # 特征列名称,根据模型特征调整def process_requests(requests_raw: DataFrame) -> DataFrame:"""将请求特征展开为单独列,并与对应的预测结果配对Args:requests_raw: 待处理数据Return:处理后数据"""# 过滤成功请求requests_success = requests_raw.filter(F.col("status_code") == 200).drop("status_code")# 生成模型标识requests_identified = requests_success \\.withColumn(MODEL_ID_COL, F.concat(F.col("request_metadata").getItem("model_name"), F.lit("_"), F.col("request_metadata").getItem("model_version"))) \\.drop("request_metadata")# 展开特征列和预测结果# 1. 获取特征列名、特征数据、预测值requests_with_features = (requests_identified.withColumn("feature_columns", F.col("request.dataframe_split.columns")).withColumn("feature_data", F.col("request.dataframe_split.data")).withColumn("predictions", F.col("response.predictions")))# 2. 将每个数据行与对应的预测结果配对requests_exploded = (requests_with_features.withColumn("feature_prediction_pairs",F.arrays_zip(F.col("feature_data"), F.col("predictions"))).withColumn("feature_prediction_pairs",F.explode(F.col("feature_prediction_pairs"))).withColumn("feature_row", F.col("feature_prediction_pairs.feature_data")).withColumn(PREDICTION_COL, F.col("feature_prediction_pairs.predictions")))# 3. 动态创建特征列requests_with_feature_cols = requests_explodedfor i, col_name in enumerate(FEATURE_COLUMNS):requests_with_feature_cols = (requests_with_feature_cols.withColumn(col_name, F.col("feature_row")[i]))# 4. 添加记录唯一ID,便于后续进行upsert操作requests_with_example_id = requests_with_feature_cols \\.withColumn(EXAMPLE_ID_COL, F.expr("uuid()"))# 5. 清理临时列requests_processed = (requests_with_example_id.drop("feature_columns", "feature_data", "feature_prediction_pairs", "feature_row", "predictions", "request", "response"))return requests_processeddef create_table(spark: SparkSession,table_name: str,df: Optional[DataFrame] = None,partition_expr: str = None,description: Optional[str] = None,):"""创建hive表Args:spark: SparkSession 实例table_name: hive表全称(格式:数据库.数据表)df: 初始数据(可选,用于推断schema)partition_expr: 分区列(优化存储查询)description: 表描述Raises:ValueError: 创建异常时抛出"""# 推断表schematable_schema = df.schemapart_col_nam = partition_expr.split(' ')[0]# 构建列定义columns_ddl = []for field in table_schema.fields:# 建表时过滤掉分区列if field.name == part_col_nam:continuedata_type = field.dataType.simpleString().upper()col_def = f"`{field.name}` {data_type}"if not field.nullable:col_def += " NOT NULL"columns_ddl.append(col_def)# 构建建表语句ddl = f"""CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns_ddl)})PARTITIONED BY ({partition_expr})TBLPROPERTIES ('comment'= '{description or ''}')"""# 打印sqlprint(f"create table ddl: {ddl}\\n")# 执行DDLtry:spark.sql(ddl)except Exception as e:raise ValueError(f"Failed to create table: {str(e)}") from eprint(f"create table {table_name} done")def write_batch_to_hive(batch_df: DataFrame, batch_id: int, table_name: str):# 控制小文件:按分区列重分区out_df = batch_df.repartition(8, "dt")out_df.write.mode("append").format("hive").saveAsTable(table_name,partitionBy="dt")spark = SparkSession.builder.appName("Operate DB Example").enableHiveSupport().getOrCreate()# 构建流式 DataFramedf = (spark.readStream.format("json").schema(schema).load(SOURCE_PATH))# 处理流式数据unpacked_df = process_requests(df)# 插入分区列requests_with_dt = (unpacked_df.withColumn("dt", F.substring(F.col("request_date"), 1, 10)) # 提取 YYYY-MM-DD 格式的日期)# 创建结果表create_table(spark, UNPACKED_TABLE_NAME, unpacked_df, 'dt STRING')# 设置Hive动态分区spark.sql("SET hive.exec.dynamic.partition=true")spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")# 分批写入结果表query = (requests_with_dt.writeStream.outputMode("append").option("checkpointLocation", CHECKPOINT_PATH).trigger(once=True).foreachBatch(lambda df, id: write_batch_to_hive(df, id, UNPACKED_TABLE_NAME)).start())query.awaitTermination()# 查询结果数据spark.sql(f"select * from {UNPACKED_TABLE_NAME}").show(10)

from pyspark.sql import DataFrame, SparkSessionfrom typing import Union, List, Dict, Optional, Sequence, Any# 待关联标签的推理表表名:需要为DLC内表的三段式表名称:<数据目录>.<数据库>.<数据表>UNPACKED_TABLE_NAME = "DataLakeCatalog.test.unpacked_test_inference_table"# 标签数据表配置:可以为多个,每个表中必须包含结构(<表名>, <join是需要保留的字段名列表>, <join时用于等值条件连接的字段列表>)JOIN_TABLES = [('DataLakeCatalog.test.test_label_table', ['label', 'example_id'], ['example_id'])]# 关联标签数据后的推理表表名FULLY_QUALIFIED_TABLE_NAME = "DataLakeCatalog.test.test_fully_qualified_table_name"# 窗口大小: 限制每次运行此Notebook脚本最多处理多长时间的数据,早于此窗口的数据如果尚未被处理,将被忽略,为了保证所有数据都被处理,调度周期应该小于此窗口大小PROCESSING_WINDOW_DAYS = 10# 插入结果表时的主键字段MERGE_COLS = ["request_time", "example_id"]def create_table(spark: SparkSession,table_name: str,df: Optional[DataFrame] = None,partition_expr: str = None,description: Optional[str] = None,):"""创建dlc内表Args:spark: SparkSession 实例table_name: 表全称(格式:数据目录.数据库.数据表)df: 初始数据(用于推断schema)partition_expr: 分区列(优化存储查询)description: 表描述Raises:ValueError: 创建异常时抛出"""# 推断表schematable_schema = df.schema# 构建列定义columns_ddl = []for field in table_schema.fields:data_type = field.dataType.simpleString().upper()col_def = f"`{field.name}` {data_type}"if not field.nullable:col_def += " NOT NULL"columns_ddl.append(col_def)# 构建建表语句ddl = f"""CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns_ddl)})USING icebergPARTITIONED BY ({partition_expr})TBLPROPERTIES ('comment'= '{description or ''}','format-version'= '2','write.metadata.previous-versions-max'= '100','write.metadata.delete-after-commit.enabled'= 'true','smart-optimizer.inherit' = 'none','smart-optimizer.written.enable' = 'enable')"""# 打印sqlprint(f"create table ddl: {ddl}\\n")# 执行DDLtry:spark.sql(ddl)except Exception as e:raise ValueError(f"Failed to create table: {str(e)}") from eprint(f"create table {table_name} done")def upsert_table(spark: SparkSession,target_table_name: str,merge_cols: list,df: DataFrame,):"""使用upsert方式向目标表写入数据Args:spark: SparkSession 实例target_table_name: 结果表全称(格式:数据目录.数据库.数据表)merge_cols: 更新时的主键df: 待更新数据Raises:ValueError: 写入异常时抛出"""merge_condition = " AND ".join([f"target.{col} = source.{col}" for col in merge_cols])# 创建临时视图df.createOrReplaceTempView("source_data")# 执行MERGE操作merge_sql = f"""MERGE INTO {target_table_name} AS targetUSING source_data AS sourceON {merge_condition}WHEN MATCHED THEN UPDATE SET *WHEN NOT MATCHED THEN INSERT *"""# 执行DMLtry:spark.sql(merge_sql)except Exception as e:raise ValueError(f"Failed to create table: {str(e)}") from eprint(f"create table {table_name} done")spark = SparkSession.builder.appName("Operate DB Example").getOrCreate()# 每次读取 PROCESSING_WINDOW_DAYS的数据requests_processed = spark.table(UNPACKED_TABLE_NAME) \\.filter(f"CAST(request_time AS DATE) >= current_date() - (INTERVAL {PROCESSING_WINDOW_DAYS} DAYS)")if requests_processed.count() > 0:# 关联标签表for table_name, preserve_cols, join_cols in JOIN_TABLES:join_data = spark.table(table_name)requests_processed = requests_processed.join(join_data.select(preserve_cols), on=join_cols, how="left")# 创建结果表create_table(spark, FULLY_QUALIFIED_TABLE_NAME, requests_processed, 'days(request_time)')# 写入结果表upsert_table(spark, FULLY_QUALIFIED_TABLE_NAME, MERGE_COLS, requests_processed)spark.sql(f"select * from {FULLY_QUALIFIED_TABLE_NAME}").show(10)
from pyspark.sql import DataFrame, SparkSessionfrom typing import Union, List, Dict, Optional, Sequence, Any# 待关联标签的推理表表名:需要为Hive表名称:<数据库>.<数据表>UNPACKED_TABLE_NAME = "testdb.unpacked_test_inference_table"# 标签数据表配置:可以为多个,每个表中必须包含结构(<表名>, <join是需要保留的字段名列表>, <join时用于等值条件连接的字段列表>)JOIN_TABLES = [('testdb.test_label_table', ['label', 'example_id'], ['example_id'])]# 关联标签数据后的推理表表名FULLY_QUALIFIED_TABLE_NAME = "testdb.test_fully_qualified_table_name"# 窗口大小: 限制每次运行此Notebook脚本最多处理多长时间的数据,早于此窗口的数据如果尚未被处理,将被忽略,为了保证所有数据都被处理,调度周期应该小于此窗口大小PROCESSING_WINDOW_DAYS = 10def create_table(spark: SparkSession,table_name: str,df: Optional[DataFrame] = None,partition_expr: str = None,description: Optional[str] = None,):"""创建hive表Args:spark: SparkSession 实例table_name: hive表全称(格式:数据库.数据表)df: 初始数据(可选,用于推断schema)partition_expr: 分区列(优化存储查询)description: 表描述Raises:ValueError: 创建异常时抛出"""# 推断表schematable_schema = df.schemapart_col_nam = partition_expr.split(' ')[0]# 构建列定义columns_ddl = []for field in table_schema.fields:# 建表时过滤掉分区列if field.name == part_col_nam:continuedata_type = field.dataType.simpleString().upper()col_def = f"`{field.name}` {data_type}"if not field.nullable:col_def += " NOT NULL"columns_ddl.append(col_def)# 构建建表语句ddl = f"""CREATE TABLE IF NOT EXISTS {table_name} ({', '.join(columns_ddl)})PARTITIONED BY ({partition_expr})TBLPROPERTIES ('comment'= '{description or ''}')"""# 打印sqlprint(f"create table ddl: {ddl}\\n")# 执行DDLtry:spark.sql(ddl)except Exception as e:raise ValueError(f"Failed to create table: {str(e)}") from eprint(f"create table {table_name} done")def partition_overwrite(spark, source_df, target_table):""""""try:# 检查数据是否为空if source_df.count() == 0:print(f"警告: 没有数据需要写入到 {target_table}")return# 显示要覆盖的分区partitions = source_df.select("dt").distinct().collect()partition_list = [row.dt for row in partitions]print(f"将要覆盖的分区: {partition_list}")# 设置参数并写入spark.sql("SET hive.exec.dynamic.partition=true")spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")source_df.repartition(8, "dt") \\.write \\.mode("overwrite") \\.format("hive") \\.saveAsTable(target_table)print(f"成功覆盖 {len(partition_list)} 个分区到表 {target_table}")except Exception as e:print(f"写入表 {target_table} 时出错: {str(e)}")raisespark = SparkSession.builder.appName("Operate DB Example").getOrCreate()# 每次读取 PROCESSING_WINDOW_DAYS分区的数据,处理后进行覆盖写入requests_processed = spark.table(UNPACKED_TABLE_NAME) \\.filter(f"dt >= date_format(current_date() - INTERVAL {PROCESSING_WINDOW_DAYS} DAYS, 'yyyy-MM-dd')")for table_name, preserve_cols, join_cols in JOIN_TABLES:join_data = spark.table(table_name)requests_processed = requests_processed.join(join_data.select(preserve_cols), on=join_cols, how="left")create_table(spark, FULLY_QUALIFIED_TABLE_NAME, requests_processed, 'dt STRING')partition_overwrite(spark, requests_processed, FULLY_QUALIFIED_TABLE_NAM)spark.sql(f"select * from {FULLY_QUALIFIED_TABLE_NAME}").show(10)










文档反馈