








Field | Operation | Description | Details |
Service name | | The unique name identifying the model service. | Name the service after its business purpose. Click it to open the service details page. |
Service status | | Current lifecycle status of the service. | Common statuses include draft (starting), running, and stopped. |
Service type | | Identifies the application domain of the service. | For example: machine learning service, model service. |
Running versions/Number of versions | | Active service versions / total number of versions. | Reflects the multi-version management of the service group; for example, 0/1 means one version in total, none started. |
Service group description | | Description of the service group. | Helps team members quickly understand the purpose of the service group. |
Created by | | The user account that created this service. | Used for auditing and permission tracing. |
Creation time | | The time when the service was first created. | Sorted chronologically for quick lookup of recent services. |
Operation | Edit | Modify the basic configuration of the service group. | View service group details; supports updating the service group description and parameters such as resource configuration. |
 | Debug | Enter the online debugging page to test the API. | Provides a standardized REST API call address (the service group must be in the running state). |
 | Add version | Deploy a new model version under the same service. | Enables grayscale release or A/B testing scenarios; up to two service versions are supported. |
 | Monitoring | View performance metrics. | Monitors QPS, number of concurrent requests, and CPU/MEM/GPU utilization. |
 | Logs | View container logs at runtime. | Supports filtering by instance and searching by time range to locate the causes of service group exceptions. |
 | Start/Stop | Control the running state of the service group. | Click Start to switch the service group to the running state; Stop releases the service resources. |
 | Delete | Completely remove the model service asset and its associated resources. | This operation is irreversible. Confirm the service group has stopped running before deleting it. |
{"client_request_id": "123456","dataframe_split": {"columns": ["fixed_acidity", "volatile_acidity", "citric_acid", "residual_sugar", "chlorides", "free_sulfur_dioxide", "total_sulfur_dioxide", "density", "pH", "sulphates", "alcohol"],"data": [[1, 0.8, 0.7, 0.5, 1, 1.5, 0.8, 0.7, 0.5, 0.8, 0.5]]}}
{"client_request_id": "123456","dataframe_split": {"columns": ["fixed_acidity", "volatile_acidity", "citric_acid", "residual_sugar", "chlorides", "free_sulfur_dioxide", "total_sulfur_dioxide", "density", "pH", "sulphates", "alcohol"],"data": [[1, 0.8, 0.7, 0.5, 1, 1.5, 0.8, 0.7, 0.5, 0.8, 0.5],[0.9, 0.7, 0.6, 0.4, 0.9, 1.2, 0.7, 0.6, 0.4, 0.7, 0.4]]}}
curl -X POST https://[service call address]/predict \
  -H 'Authorization: [Authentication Token]' \
  -H 'Content-Type: application/json' \
  -d '{"client_request_id":"123456","dataframe_split":{"columns":["fixed_acidity","volatile_acidity","citric_acid","residual_sugar","chlorides","free_sulfur_dioxide","total_sulfur_dioxide","density","pH","sulphates","alcohol"],"data":[[1,0.8,0.7,0.5,1,1.5,0.8,0.7,0.5,0.8,0.5]]}}'
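The same single-record call can be issued from Python. The sketch below is illustrative: `build_payload` is a hypothetical helper, and the service address and token placeholders must be replaced with the values shown on the call information page before sending.

```python
import json

# Hypothetical helper that assembles the dataframe_split request body shown above.
def build_payload(columns, rows, client_request_id="123456"):
    return {
        "client_request_id": client_request_id,
        "dataframe_split": {"columns": columns, "data": rows},
    }

columns = ["fixed_acidity", "volatile_acidity", "citric_acid", "residual_sugar",
           "chlorides", "free_sulfur_dioxide", "total_sulfur_dioxide",
           "density", "pH", "sulphates", "alcohol"]
payload = build_payload(columns, [[1, 0.8, 0.7, 0.5, 1, 1.5, 0.8, 0.7, 0.5, 0.8, 0.5]])
body = json.dumps(payload)

# To actually send the request, fill in the placeholders from the call
# information page (requires the third-party requests library):
# import requests
# resp = requests.post("https://[service call address]/predict", data=body,
#                      headers={"Authorization": "[Authentication Token]",
#                               "Content-Type": "application/json"})
# print(resp.json())
```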




Field | Operation | Description | Details |
ID | | Unique identifier of the service version instance. | A unique, case-sensitive string generated automatically by the system for precisely locating service versions. |
Service version name | | The specific version identifier of this service. | Facilitates multi-version management and grayscale release. |
Status | | Current lifecycle status of the service version. | Shows the version running state: draft/running/stopped. |
Description (optional) | | Describes the purpose or features of the service version. | Helps team members understand the differences between service versions. |
Model source | | The underlying model asset associated with the service. | Shows which model in the model management module the service uses (for example, ml_wine_db_wine_model). |
Resource source | | The compute resource type consumed by this version at runtime. | Identifies whether the resource comes from "platform pay-as-you-go resources" or a "CVM resource group". |
Traffic allocation | | The traffic weight this version receives when multiple versions coexist. | Displays the percentage of traffic assigned to this version in A/B testing or grayscale release scenarios. |
Created by | | The user account that created this service version. | Records the creator of the version for permission management and CloudAudit. |
Creation time | | The time when the service version was first created. | Recorded automatically by the system to track model iterations over time. |
Operation | Update | Modify the configuration of the current service version. | View service version details; supports updating the version description and parameters such as resource configuration. |
 | Monitoring | View real-time performance metrics of this version. | Visual monitoring view for QPS, call latency, CPU/GPU utilization, etc. |
 | Logs | Retrieve the running logs of the service instance. | Used to troubleshoot runtime issues such as model code errors and input/output exceptions. |
 | Start/Stop | Manually control the running status of the service instance. | Click Start to switch the version to the running state; Stop releases the service resources. |
 | Delete | Permanently remove the service version. | This operation is irreversible. Confirm the version has stopped running before deleting it. |
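The traffic allocation field above can be thought of as weighted routing between coexisting versions. The following sketch is purely illustrative (not the platform's actual routing implementation) and shows how percentage weights translate into request distribution:

```python
import random

def pick_version(weights):
    """Pick a service version according to percentage weights, e.g. {"v1": 90, "v2": 10}."""
    versions = list(weights)
    return random.choices(versions, weights=[weights[v] for v in versions], k=1)[0]

random.seed(0)  # deterministic for illustration
counts = {"v1": 0, "v2": 0}
for _ in range(10_000):
    counts[pick_version({"v1": 90, "v2": 10})] += 1
# counts["v1"] ends up near 9000 and counts["v2"] near 1000
```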

Field | Description | Details |
Service name | The specific version identifier of this service. | Facilitates multi-version management and grayscale release. |
Service ID | Unique identifier of the service version instance. | A unique, case-sensitive string generated automatically by the system for precisely locating service versions. |
Service description | Describes the purpose or features of the service version. | Helps team members understand the differences between service versions. |
Billing mode | Settlement method for resources. | Pay-as-you-go shows the cost structure of running the model. |
Resource specification | The compute capacity assigned to this version. | For example, 4C8G determines the model complexity and concurrency this version can handle. |
Access model and version | Details of the associated model asset. | Displays the model name and the specific MLflow version number (such as Version 2) in model management. |
Number of replicas | Total number of running container instances. | Determines the service's availability; multiple replicas share traffic and provide disaster recovery. |
Request traffic throttling | Traffic protection threshold. | Shows the QPS cap. When the threshold is reached, the system triggers packet dropping or queuing protection. |
Generate authentication | Service security control switch. | Shows whether signature authentication (API Key) is enabled. If disabled, the address can be accessed directly. |
Field/Operation | Description | Operations guidance |
Instance name | Identifier of the underlying compute unit (Pod). | Assigns a unique ID to each running service node. |
Instance state | Physical status of the instance. | Waiting indicates the instance is pulling images from the registry or pending scheduling; Running means the service is ready. |
Restart count | Number of times the container has exited abnormally. | A core troubleshooting metric: if the count keeps increasing, the model may contain memory leaks or code errors. |
Monitoring | Instance-level monitoring entry. | View an individual instance's CPU/MEM consumption and check for resource skew. |
Logs | Container standard output stream. | The most direct way to inspect model loading logic and prediction error stacks. |
Restart | Force-reboot an instance. | An emergency way to quickly restore service when an instance hangs or responds extremely slowly. |
Access a container | Interactive terminal. | Allows entering the container to run commands and check file paths, permissions, or environment dependencies. |

Metric category | Core metric items | Business value |
Traffic information | Network traffic, QPS, QPS throttling count, number of concurrent requests | Assesses API call pressure; the concurrent request count and throttling count are key signals for deciding whether to increase the replica count. |
Resource information | CPU usage, MEM usage, video memory utilization, GPU usage | Monitors the underlying hardware load, including video memory and GPU utilization. |
Instance information | Total number of instances, number of running instances | Intuitively shows service availability and confirms whether the actual number of running containers matches the expected configuration. |
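As a back-of-the-envelope illustration of using these metrics for capacity planning: the per-replica throughput figure below is a made-up assumption obtained from load testing, not a platform value.

```python
import math

def required_replicas(peak_qps, per_replica_qps, min_replicas=1):
    """Estimate how many replicas are needed to absorb the observed peak traffic."""
    return max(min_replicas, math.ceil(peak_qps / per_replica_qps))

# If monitoring shows a peak of 450 QPS and load testing suggests one replica
# sustains about 120 QPS, four replicas are needed:
print(required_replicas(450, 120))  # -> 4
```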





"""
This example demonstrates how to use Spark Structured Streaming to read the inference table stored in COS, process the key fields (features and predicted values), and write the result to a DLC internal table.

## Prerequisites
1. Ensure the COS access credential associated with the DLC engine has permission to access the inference table storage bucket.
2. Ensure the model service has enabled inference table monitoring.

## How to run
After modifying the parameters in this tutorial, select a DLC PySpark task under Offline Development > Orchestration Space and run it.

## Scheduled job
After modifying the corresponding parameters and completing debugging, you can schedule the task to continuously process inference table data. You can also associate the processed inference table data with label data to generate an inference table containing labels, and configure machine-learning monitoring metrics for that table in the Data Quality module to continuously monitor model quality.
"""
from typing import Optional

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, DoubleType,
    ArrayType, TimestampType,
)

"""
### Define the original inference table data format
The original inference table data is stored in the user-specified COS bucket in JSON format with a specific structure. request_schema describes the input request structure and response_schema describes the model output structure; adjust both according to the actual input/output format of the model service. The other fields are part of the fixed inference table structure and generally do not need adjusting.
"""
request_schema = StructType([
    StructField("client_request_id", StringType(), True),
    StructField("dataframe_split", StructType([
        StructField("columns", ArrayType(StringType()), True),
        StructField("data", ArrayType(ArrayType(DoubleType())), True),
    ]), True),
])
request_metadata_schema = StructType([
    StructField("model_name", StringType(), True),
    StructField("model_version", StringType(), True),
    StructField("service_id", StringType(), True),
])
response_schema = StructType([
    StructField("predictions", ArrayType(LongType()), True),
])
schema = StructType([
    StructField("client_request_id", StringType(), True),
    StructField("execution_duration_ms", LongType(), True),
    StructField("request", request_schema, True),
    StructField("request_date", StringType(), True),
    StructField("request_metadata", request_metadata_schema, True),
    StructField("request_time", TimestampType(), True),
    StructField("requester", StringType(), True),
    StructField("response", response_schema, True),
    StructField("sampling_fraction", DoubleType(), True),
    StructField("status_code", LongType(), True),
    StructField("wedata_request_id", StringType(), True),
])

# Data source parameters
SOURCE_PATH = "cosn://bucket path/"  # storage path of the inference table
CHECKPOINT_PATH = "cosn://bucket path/checkpoint"  # checkpoint path; recommended to sit next to the inference table storage path

# Result table parameters
UNPACKED_TABLE_NAME = "DataLakeCatalog.test.unpacked_test_inference_table"  # result table; must be a three-segment DLC name: <data catalog>.<database>.<table>
MODEL_ID_COL = "model_id"  # model ID column in the result table, used to identify the model
EXAMPLE_ID_COL = "example_id"  # unique ID column for result table rows, used for update operations
PREDICTION_COL = "prediction"  # column name for the predicted value
FEATURE_COLUMNS = ["sepal_length", "sepal_width", "petal_length", "petal_width"]  # feature column names; adjust to the model's features


def process_requests(requests_raw: DataFrame) -> DataFrame:
    """Expand request features into separate columns and pair them with prediction results.

    Args:
        requests_raw: pending data
    Returns:
        processed data
    """
    # Keep successful requests only
    requests_success = requests_raw.filter(F.col("status_code") == 200).drop("status_code")
    # Generate the model identifier
    requests_identified = requests_success \
        .withColumn(MODEL_ID_COL, F.concat(
            F.col("request_metadata").getItem("model_name"),
            F.lit("_"),
            F.col("request_metadata").getItem("model_version"))) \
        .drop("request_metadata")
    # Unfold the feature columns and prediction results
    # 1. Get the feature column names, feature data, and predicted values
    requests_with_features = (requests_identified
        .withColumn("feature_columns", F.col("request.dataframe_split.columns"))
        .withColumn("feature_data", F.col("request.dataframe_split.data"))
        .withColumn("predictions", F.col("response.predictions")))
    # 2. Pair each data row with its prediction result
    requests_exploded = (requests_with_features
        .withColumn("feature_prediction_pairs",
                    F.arrays_zip(F.col("feature_data"), F.col("predictions")))
        .withColumn("feature_prediction_pairs", F.explode(F.col("feature_prediction_pairs")))
        .withColumn("feature_row", F.col("feature_prediction_pairs.feature_data"))
        .withColumn(PREDICTION_COL, F.col("feature_prediction_pairs.predictions")))
    # 3. Dynamically create the feature columns
    requests_with_feature_cols = requests_exploded
    for i, col_name in enumerate(FEATURE_COLUMNS):
        requests_with_feature_cols = (requests_with_feature_cols
            .withColumn(col_name, F.col("feature_row")[i]))
    # 4. Add a unique record ID so that upsert operations are easy
    requests_with_example_id = requests_with_feature_cols \
        .withColumn(EXAMPLE_ID_COL, F.expr("uuid()"))
    # 5. Clean up the temporary columns
    requests_processed = (requests_with_example_id
        .drop("feature_columns", "feature_data", "feature_prediction_pairs",
              "feature_row", "predictions", "request", "response"))
    return requests_processed


def create_table(
    spark: SparkSession,
    table_name: str,
    df: Optional[DataFrame] = None,
    partition_expr: Optional[str] = None,
    description: Optional[str] = None,
):
    """Create a DLC internal table.

    Args:
        spark: SparkSession instance
        table_name: full table name (format: <data catalog>.<database>.<table>)
        df: initial data, used to infer the schema
        partition_expr: partition expression (optimizes storage and queries)
        description: table description
    Raises:
        ValueError: raised when table creation fails
    """
    # Infer the table schema
    table_schema = df.schema
    # Build the column definitions
    columns_ddl = []
    for field in table_schema.fields:
        data_type = field.dataType.simpleString().upper()
        col_def = f"`{field.name}` {data_type}"
        if not field.nullable:
            col_def += " NOT NULL"
        columns_ddl.append(col_def)
    # Build the CREATE TABLE statement
    ddl = f"""CREATE TABLE IF NOT EXISTS {table_name} (
        {', '.join(columns_ddl)}
    )
    USING iceberg
    PARTITIONED BY ({partition_expr})
    TBLPROPERTIES (
        'comment' = '{description or ''}',
        'format-version' = '2',
        'write.metadata.previous-versions-max' = '100',
        'write.metadata.delete-after-commit.enabled' = 'true',
        'smart-optimizer.inherit' = 'none',
        'smart-optimizer.written.enable' = 'enable'
    )"""
    # Print the SQL
    print(f"create table ddl: {ddl}\n")
    # Execute the DDL
    try:
        spark.sql(ddl)
    except Exception as e:
        raise ValueError(f"Failed to create table: {str(e)}") from e
    print(f"create table {table_name} done")


spark = SparkSession.builder.appName("Operate DB Example").getOrCreate()
# Build a streaming DataFrame
df = (spark.readStream
      .format("json")
      .schema(schema)
      .load(SOURCE_PATH))
# Process the streaming data
unpacked_df = process_requests(df)
# Create the result table with request_time as the partition field, using daily partitioning; adjust to the service request volume
create_table(spark, UNPACKED_TABLE_NAME, unpacked_df, 'days(request_time)')
# Write the data
query = (unpacked_df.writeStream
         .format("parquet")
         .outputMode("append")
         .option("checkpointLocation", CHECKPOINT_PATH)
         .trigger(once=True)
         .toTable(UNPACKED_TABLE_NAME))
query.awaitTermination()
spark.sql(f"select * from {UNPACKED_TABLE_NAME}").show(10)
"""
This example demonstrates how to use Spark Structured Streaming to read the inference table stored in COS, process the key fields (features and predicted values), and write the result to an EMR Hive table.

## Prerequisites
1. Ensure the COS access credential associated with the DLC engine has permission to access the inference table storage bucket.
2. Ensure the model service has enabled inference table monitoring.

## How to run
After modifying the parameters in this tutorial, select an EMR PySpark task under Offline Development > Orchestration Space and run it.

## Scheduled job
After modifying the corresponding parameters and completing debugging, you can schedule the task to continuously process inference table data. You can also associate the processed inference table data with label data to generate an inference table containing labels, and configure machine-learning monitoring metrics for that table in the Data Quality module to continuously monitor model quality.
"""
from typing import Optional

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, LongType, DoubleType,
    ArrayType, TimestampType,
)

"""
### Define the original inference table data format
The original inference table data is stored in the user-specified COS bucket in JSON format with a specific structure. request_schema describes the input request structure and response_schema describes the model output structure; adjust both according to the actual input/output format of the model service. The other fields are part of the fixed inference table structure and generally do not need adjusting.
"""
request_schema = StructType([
    StructField("client_request_id", StringType(), True),
    StructField("dataframe_split", StructType([
        StructField("columns", ArrayType(StringType()), True),
        StructField("data", ArrayType(ArrayType(DoubleType())), True),
    ]), True),
])
request_metadata_schema = StructType([
    StructField("model_name", StringType(), True),
    StructField("model_version", StringType(), True),
    StructField("service_id", StringType(), True),
])
response_schema = StructType([
    StructField("predictions", ArrayType(LongType()), True),
])
schema = StructType([
    StructField("client_request_id", StringType(), True),
    StructField("execution_duration_ms", LongType(), True),
    StructField("request", request_schema, True),
    StructField("request_date", StringType(), True),
    StructField("request_metadata", request_metadata_schema, True),
    StructField("request_time", TimestampType(), True),
    StructField("requester", StringType(), True),
    StructField("response", response_schema, True),
    StructField("sampling_fraction", DoubleType(), True),
    StructField("status_code", LongType(), True),
    StructField("wedata_request_id", StringType(), True),
])

# Data source parameters
SOURCE_PATH = "cosn://bucket path/"  # storage path of the inference table
CHECKPOINT_PATH = "cosn://bucket path/checkpoint"  # checkpoint path; recommended to sit next to the inference table storage path

# Result table parameters
UNPACKED_TABLE_NAME = "testdb.unpacked_test_inference_table"  # result table; must be a Hive table name: <database>.<table>
MODEL_ID_COL = "model_id"  # model ID column in the result table, used to identify the model
EXAMPLE_ID_COL = "example_id"  # unique ID column for result table rows, used for update operations
PREDICTION_COL = "prediction"  # column name for the predicted value
FEATURE_COLUMNS = ["sepal_length", "sepal_width", "petal_length", "petal_width"]  # feature column names; adjust to the model's features


def process_requests(requests_raw: DataFrame) -> DataFrame:
    """Expand request features into separate columns and pair them with prediction results.

    Args:
        requests_raw: pending data
    Returns:
        processed data
    """
    # Keep successful requests only
    requests_success = requests_raw.filter(F.col("status_code") == 200).drop("status_code")
    # Generate the model identifier
    requests_identified = requests_success \
        .withColumn(MODEL_ID_COL, F.concat(
            F.col("request_metadata").getItem("model_name"),
            F.lit("_"),
            F.col("request_metadata").getItem("model_version"))) \
        .drop("request_metadata")
    # Unfold the feature columns and prediction results
    # 1. Get the feature column names, feature data, and predicted values
    requests_with_features = (requests_identified
        .withColumn("feature_columns", F.col("request.dataframe_split.columns"))
        .withColumn("feature_data", F.col("request.dataframe_split.data"))
        .withColumn("predictions", F.col("response.predictions")))
    # 2. Pair each data row with its prediction result
    requests_exploded = (requests_with_features
        .withColumn("feature_prediction_pairs",
                    F.arrays_zip(F.col("feature_data"), F.col("predictions")))
        .withColumn("feature_prediction_pairs", F.explode(F.col("feature_prediction_pairs")))
        .withColumn("feature_row", F.col("feature_prediction_pairs.feature_data"))
        .withColumn(PREDICTION_COL, F.col("feature_prediction_pairs.predictions")))
    # 3. Dynamically create the feature columns
    requests_with_feature_cols = requests_exploded
    for i, col_name in enumerate(FEATURE_COLUMNS):
        requests_with_feature_cols = (requests_with_feature_cols
            .withColumn(col_name, F.col("feature_row")[i]))
    # 4. Add a unique record ID so that upsert operations are easy
    requests_with_example_id = requests_with_feature_cols \
        .withColumn(EXAMPLE_ID_COL, F.expr("uuid()"))
    # 5. Clean up the temporary columns
    requests_processed = (requests_with_example_id
        .drop("feature_columns", "feature_data", "feature_prediction_pairs",
              "feature_row", "predictions", "request", "response"))
    return requests_processed


def create_table(
    spark: SparkSession,
    table_name: str,
    df: Optional[DataFrame] = None,
    partition_expr: Optional[str] = None,
    description: Optional[str] = None,
):
    """Create a Hive table.

    Args:
        spark: SparkSession instance
        table_name: full Hive table name (format: <database>.<table>)
        df: initial data, used to infer the schema
        partition_expr: partition column definition (optimizes storage and queries)
        description: table description
    Raises:
        ValueError: raised when table creation fails
    """
    # Infer the table schema
    table_schema = df.schema
    part_col_name = partition_expr.split(' ')[0]
    # Build the column definitions
    columns_ddl = []
    for field in table_schema.fields:
        # Filter out the partition column when creating the table
        if field.name == part_col_name:
            continue
        data_type = field.dataType.simpleString().upper()
        col_def = f"`{field.name}` {data_type}"
        if not field.nullable:
            col_def += " NOT NULL"
        columns_ddl.append(col_def)
    # Build the CREATE TABLE statement
    ddl = f"""CREATE TABLE IF NOT EXISTS {table_name} (
        {', '.join(columns_ddl)}
    )
    PARTITIONED BY ({partition_expr})
    TBLPROPERTIES ('comment' = '{description or ''}')"""
    # Print the SQL
    print(f"create table ddl: {ddl}\n")
    # Execute the DDL
    try:
        spark.sql(ddl)
    except Exception as e:
        raise ValueError(f"Failed to create table: {str(e)}") from e
    print(f"create table {table_name} done")


def write_batch_to_hive(batch_df: DataFrame, batch_id: int, table_name: str):
    # Control small files: repartition by the partition column
    out_df = batch_df.repartition(8, "dt")
    out_df.write.mode("append").format("hive").saveAsTable(table_name, partitionBy="dt")


spark = SparkSession.builder.appName("Operate DB Example").enableHiveSupport().getOrCreate()
# Build a streaming DataFrame
df = (spark.readStream
      .format("json")
      .schema(schema)
      .load(SOURCE_PATH))
# Process the streaming data
unpacked_df = process_requests(df)
# Insert the partition column
requests_with_dt = (unpacked_df
    .withColumn("dt", F.substring(F.col("request_date"), 1, 10))  # extract the date in YYYY-MM-DD format
)
# Create the result table
create_table(spark, UNPACKED_TABLE_NAME, unpacked_df, 'dt STRING')
# Set Hive dynamic partitioning
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
# Write to the result table in batches
query = (requests_with_dt.writeStream
         .outputMode("append")
         .option("checkpointLocation", CHECKPOINT_PATH)
         .trigger(once=True)
         .foreachBatch(lambda df, id: write_batch_to_hive(df, id, UNPACKED_TABLE_NAME))
         .start())
query.awaitTermination()
# Query the result data
spark.sql(f"select * from {UNPACKED_TABLE_NAME}").show(10)

from typing import Optional

from pyspark.sql import DataFrame, SparkSession

# Inference table to be associated with labels; must be a three-segment DLC name: <data catalog>.<database>.<table>
UNPACKED_TABLE_NAME = "DataLakeCatalog.test.unpacked_test_inference_table"
# Label table configuration: several tables are allowed; each entry must have the structure
# (<table name>, <list of field names to keep after the join>, <list of field names used as equality join keys>)
JOIN_TABLES = [('DataLakeCatalog.test.test_label_table', ['label', 'example_id'], ['example_id'])]
# Name of the inference table after associating label data
FULLY_QUALIFIED_TABLE_NAME = "DataLakeCatalog.test.test_fully_qualified_table_name"
# Window size: limits how far back each run of this script processes data. Data older than this
# window is ignored if not yet processed; to ensure all data is processed, the scheduling
# interval should be smaller than this window size.
PROCESSING_WINDOW_DAYS = 10
# Primary key fields for result table upserts
MERGE_COLS = ["request_time", "example_id"]


def create_table(
    spark: SparkSession,
    table_name: str,
    df: Optional[DataFrame] = None,
    partition_expr: Optional[str] = None,
    description: Optional[str] = None,
):
    """Create a DLC internal table.

    Args:
        spark: SparkSession instance
        table_name: full table name (format: <data catalog>.<database>.<table>)
        df: initial data, used to infer the schema
        partition_expr: partition expression (optimizes storage and queries)
        description: table description
    Raises:
        ValueError: raised when table creation fails
    """
    # Infer the table schema
    table_schema = df.schema
    # Build the column definitions
    columns_ddl = []
    for field in table_schema.fields:
        data_type = field.dataType.simpleString().upper()
        col_def = f"`{field.name}` {data_type}"
        if not field.nullable:
            col_def += " NOT NULL"
        columns_ddl.append(col_def)
    # Build the CREATE TABLE statement
    ddl = f"""CREATE TABLE IF NOT EXISTS {table_name} (
        {', '.join(columns_ddl)}
    )
    USING iceberg
    PARTITIONED BY ({partition_expr})
    TBLPROPERTIES (
        'comment' = '{description or ''}',
        'format-version' = '2',
        'write.metadata.previous-versions-max' = '100',
        'write.metadata.delete-after-commit.enabled' = 'true',
        'smart-optimizer.inherit' = 'none',
        'smart-optimizer.written.enable' = 'enable'
    )"""
    # Print the SQL
    print(f"create table ddl: {ddl}\n")
    # Execute the DDL
    try:
        spark.sql(ddl)
    except Exception as e:
        raise ValueError(f"Failed to create table: {str(e)}") from e
    print(f"create table {table_name} done")


def upsert_table(
    spark: SparkSession,
    target_table_name: str,
    merge_cols: list,
    df: DataFrame,
):
    """Write data to the target table using an upsert (MERGE INTO).

    Args:
        spark: SparkSession instance
        target_table_name: full result table name (format: <data catalog>.<database>.<table>)
        merge_cols: primary key columns used when updating
        df: data to be upserted
    Raises:
        ValueError: raised when the write fails
    """
    merge_condition = " AND ".join([f"target.{col} = source.{col}" for col in merge_cols])
    # Create a temporary view
    df.createOrReplaceTempView("source_data")
    # Perform the MERGE operation
    merge_sql = f"""MERGE INTO {target_table_name} AS target
    USING source_data AS source
    ON {merge_condition}
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *"""
    # Execute the DML
    try:
        spark.sql(merge_sql)
    except Exception as e:
        raise ValueError(f"Failed to upsert into table: {str(e)}") from e
    print(f"upsert into {target_table_name} done")


spark = SparkSession.builder.appName("Operate DB Example").getOrCreate()
# Read the last PROCESSING_WINDOW_DAYS of data on each run
requests_processed = spark.table(UNPACKED_TABLE_NAME) \
    .filter(f"CAST(request_time AS DATE) >= current_date() - (INTERVAL {PROCESSING_WINDOW_DAYS} DAYS)")
if requests_processed.count() > 0:
    # Join the label tables
    for table_name, preserve_cols, join_cols in JOIN_TABLES:
        join_data = spark.table(table_name)
        requests_processed = requests_processed.join(
            join_data.select(preserve_cols), on=join_cols, how="left")
    # Create the result table
    create_table(spark, FULLY_QUALIFIED_TABLE_NAME, requests_processed, 'days(request_time)')
    # Write to the result table
    upsert_table(spark, FULLY_QUALIFIED_TABLE_NAME, MERGE_COLS, requests_processed)
spark.sql(f"select * from {FULLY_QUALIFIED_TABLE_NAME}").show(10)
from typing import Optional

from pyspark.sql import DataFrame, SparkSession

# Inference table to be associated with labels; must be a Hive table name: <database>.<table>
UNPACKED_TABLE_NAME = "testdb.unpacked_test_inference_table"
# Label table configuration: several tables are allowed; each entry must have the structure
# (<table name>, <list of field names to keep after the join>, <list of field names used as equality join keys>)
JOIN_TABLES = [('testdb.test_label_table', ['label', 'example_id'], ['example_id'])]
# Name of the inference table after associating label data
FULLY_QUALIFIED_TABLE_NAME = "testdb.test_fully_qualified_table_name"
# Window size: limits how far back each run of this script processes data. Data older than this
# window is ignored if not yet processed; to ensure all data is processed, the scheduling
# interval should be smaller than this window size.
PROCESSING_WINDOW_DAYS = 10


def create_table(
    spark: SparkSession,
    table_name: str,
    df: Optional[DataFrame] = None,
    partition_expr: Optional[str] = None,
    description: Optional[str] = None,
):
    """Create a Hive table.

    Args:
        spark: SparkSession instance
        table_name: full Hive table name (format: <database>.<table>)
        df: initial data, used to infer the schema
        partition_expr: partition column definition (optimizes storage and queries)
        description: table description
    Raises:
        ValueError: raised when table creation fails
    """
    # Infer the table schema
    table_schema = df.schema
    part_col_name = partition_expr.split(' ')[0]
    # Build the column definitions
    columns_ddl = []
    for field in table_schema.fields:
        # Filter out the partition column when creating the table
        if field.name == part_col_name:
            continue
        data_type = field.dataType.simpleString().upper()
        col_def = f"`{field.name}` {data_type}"
        if not field.nullable:
            col_def += " NOT NULL"
        columns_ddl.append(col_def)
    # Build the CREATE TABLE statement
    ddl = f"""CREATE TABLE IF NOT EXISTS {table_name} (
        {', '.join(columns_ddl)}
    )
    PARTITIONED BY ({partition_expr})
    TBLPROPERTIES ('comment' = '{description or ''}')"""
    # Print the SQL
    print(f"create table ddl: {ddl}\n")
    # Execute the DDL
    try:
        spark.sql(ddl)
    except Exception as e:
        raise ValueError(f"Failed to create table: {str(e)}") from e
    print(f"create table {table_name} done")


def partition_overwrite(spark: SparkSession, source_df: DataFrame, target_table: str):
    """Dynamically overwrite the partitions covered by source_df in the target table."""
    try:
        # Check whether the data is empty
        if source_df.count() == 0:
            print(f"Warning: no data to write to {target_table}")
            return
        # Display the partitions that will be overwritten
        partitions = source_df.select("dt").distinct().collect()
        partition_list = [row.dt for row in partitions]
        print(f"Partitions to overwrite: {partition_list}")
        # Set the parameters and write
        spark.sql("SET hive.exec.dynamic.partition=true")
        spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
        spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
        source_df.repartition(8, "dt") \
            .write \
            .mode("overwrite") \
            .format("hive") \
            .saveAsTable(target_table)
        print(f"Successfully overwrote {len(partition_list)} partitions of table {target_table}")
    except Exception as e:
        print(f"Error occurred while writing to table {target_table}: {str(e)}")
        raise


spark = SparkSession.builder.appName("Operate DB Example").enableHiveSupport().getOrCreate()
# Read the last PROCESSING_WINDOW_DAYS of partitions on each run, then overwrite them after processing
requests_processed = spark.table(UNPACKED_TABLE_NAME) \
    .filter(f"dt >= date_format(current_date() - INTERVAL {PROCESSING_WINDOW_DAYS} DAYS, 'yyyy-MM-dd')")
# Join the label tables
for table_name, preserve_cols, join_cols in JOIN_TABLES:
    join_data = spark.table(table_name)
    requests_processed = requests_processed.join(
        join_data.select(preserve_cols), on=join_cols, how="left")
# Create the result table
create_table(spark, FULLY_QUALIFIED_TABLE_NAME, requests_processed, 'dt STRING')
# Overwrite the affected partitions of the result table
partition_overwrite(spark, requests_processed, FULLY_QUALIFIED_TABLE_NAME)
spark.sql(f"select * from {FULLY_QUALIFIED_TABLE_NAME}").show(10)









