Type | Description
Interface operations | 1. View a feature table's basic information and feature fields. 2. Batch-import feature tables. 3. Create feature synchronization tasks.
Feature Engineering API | Perform CRUD, synchronization, and consumption operations on online/offline feature tables via API calls in Studio.








%pip install tencent-wedata-feature-engineering
Parameter Name | Required | Field Type | Field Description | Input parameter example |
spark | No | Optional[SparkSession] | An initialized SparkSession object; auto-created if not provided. | SparkSession.builder.getOrCreate()
cloud_secret_id | Yes | str | Cloud service key ID | "your_secret_id" |
cloud_secret_key | Yes | str | Cloud service secret key | "your_secret_key"
Result name | Field Type | Field Description |
client | FeatureStoreClient | The constructed FeatureStoreClient instance
```python
from wedata.feature_store.client import FeatureStoreClient

# Build a feature engineering client instance
client = FeatureStoreClient(spark, cloud_secret_id=cloud_secret_id, cloud_secret_key=cloud_secret_key)
```
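Keys are usually kept out of source code. One common pattern (an assumption, not an SDK requirement; the environment variable names below are illustrative) is to read them from environment variables:

```python
import os

# Hypothetical environment variable names -- adjust to your own setup
cloud_secret_id = os.environ.get("TENCENTCLOUD_SECRET_ID", "")
cloud_secret_key = os.environ.get("TENCENTCLOUD_SECRET_KEY", "")
```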
Parameter Name | Required | Field Type | Field Description | Input parameter example |
name | Yes | str | feature table full name (format: <table>) | "user_features" |
primary_keys | Yes | Union[str, List[str]] | Primary key column name (supports composite primary key) | "user_id" or ["user_id", "session_id"] |
timestamp_key | Yes | str | Timestamp key (for point-in-time features) | "timestamp"
engine_type | Yes | wedata.feature_store.constants.engine_types.EngineTypes | Engine type. EngineTypes.HIVE_ENGINE -- Hive engine; EngineTypes.ICEBERG_ENGINE -- Iceberg engine | EngineTypes.HIVE_ENGINE
data_source_name | Yes | str | Data Source Name | "hive_datasource" |
database_name | No | Optional[str] | Database Name | "feature_db" |
df | No | Optional[DataFrame] | initial data (used for inferring schema) | spark.createDataFrame([...]) |
partition_columns | No | Union[str, List[str], None] | partition column (optimize storage queries) | "date" or ["date", "region"] |
schema | No | Optional[StructType] | Table structure definition (required when df is not provided) | StructType([...]) |
description | No | Optional[str] | Business description | "User feature table"
tags | No | Optional[Dict[str, str]] | Business Tags | {"domain": "user", "version": "v1"} |
catalog_name | Required if catalog is enabled; must be omitted otherwise | str | Feature table catalog name | "DataLakeCatalog"
Result name | Field Type | Field Description |
feature_table | FeatureTable | FeatureTable object containing table metadata |
```python
from wedata.feature_store.constants.engine_types import EngineTypes

# Create user feature table -- EMR on Hive
feature_table = client.create_table(
    name=table_name,                       # Table name
    primary_keys=["wine_id"],              # Primary key
    df=wine_features_df,                   # Data
    engine_type=EngineTypes.HIVE_ENGINE,
    data_source_name=data_source_name,
    timestamp_key="event_timestamp",
    tags={                                 # Business tags
        "purpose": "demo",
        "create_by": "wedata",
    },
)

# Create user feature table -- DLC on Iceberg (catalog not enabled)
feature_table = client.create_table(
    name=table_name,
    database_name=database_name,
    primary_keys=["wine_id"],
    df=wine_features_df,
    engine_type=EngineTypes.ICEBERG_ENGINE,
    data_source_name=data_source_name,
    timestamp_key="event_timestamp",
    tags={"purpose": "demo", "create_by": "wedata"},
)

# Create user feature table -- DLC on Iceberg (catalog enabled)
feature_table = client.create_table(
    name=table_name,
    primary_keys=["wine_id"],
    df=wine_features_df,
    engine_type=EngineTypes.ICEBERG_ENGINE,
    data_source_name=data_source_name,
    timestamp_key="event_timestamp",
    tags={"purpose": "demo", "create_by": "wedata"},
    catalog_name=catalog_name,
)
```
Parameter Name | Required | Field Type | Field Description | Input parameter example |
name | Yes | str | feature table name | "user_features" |
timestamp_key | Yes | str | Timestamp key (used for subsequent online/offline feature sync) | "timestamp"
engine_type | Yes | wedata.feature_store.constants.engine_types.EngineTypes | Engine type. EngineTypes.HIVE_ENGINE -- Hive engine; EngineTypes.ICEBERG_ENGINE -- Iceberg engine | EngineTypes.HIVE_ENGINE
data_source_name | Yes | str | Data Source Name | "hive_datasource" |
database_name | No | Optional[str] | Feature database name | "feature_db"
primary_keys | No | Union[str, List[str]] | Primary key column name (Valid only when engine_type is EngineTypes.HIVE_ENGINE) | "user_id" |
catalog_name | Required if catalog is enabled; must be omitted otherwise | str | Feature table catalog name | "DataLakeCatalog"
Result name | Field Type | Field Description |
dataframe | pyspark.DataFrame | DataFrame object containing table data |
```python
from wedata.feature_store.constants.engine_types import EngineTypes

# Register feature table -- EMR on Hive
client.register_table(database_name=database_name, name=register_table_name, timestamp_key="event_timestamp",
                      engine_type=EngineTypes.HIVE_ENGINE, data_source_name=data_source_name,
                      primary_keys=["wine_id"])

# Register feature table -- DLC on Iceberg (catalog not enabled)
client.register_table(database_name=database_name, name=register_table_name, timestamp_key="event_timestamp",
                      engine_type=EngineTypes.ICEBERG_ENGINE, data_source_name=data_source_name,
                      primary_keys=["wine_id"])

# Register feature table -- DLC on Iceberg (catalog enabled)
client.register_table(database_name=database_name, name=register_table_name, timestamp_key="event_timestamp",
                      engine_type=EngineTypes.ICEBERG_ENGINE, data_source_name=data_source_name,
                      primary_keys=["wine_id"], catalog_name=catalog_name)
```
Parameter Name | Required | Field Type | Field Description | Input parameter example |
name | Yes | str | feature table name | "user_features" |
database_name | No | Optional[str] | Feature database name | "feature_db"
is_online | No | bool | Whether to read the online feature table (default: False) | True |
online_config | No | Optional[RedisStoreConfig] | Online feature table configuration (Valid only when is_online is True) | RedisStoreConfig(...) |
entity_row | No | Optional[List[Dict[str, Any]]] | Entity row data (valid only when is_online is True) | [{"user_id": ["123", "456"]}]
catalog_name | Required if catalog is enabled; must be omitted otherwise | str | Feature table catalog name | "DataLakeCatalog"
Result name | Field Type | Field Description |
dataframe | pyspark.DataFrame | DataFrame object containing table data |
```python
from wedata.feature_store.common.store_config.redis import RedisStoreConfig

# Read offline feature table -- EMR on Hive & DLC on Iceberg (catalog not enabled)
get_df = client.read_table(name=table_name, database_name=database_name)
get_df.show(30)

# Read online feature table -- EMR on Hive & DLC on Iceberg (catalog not enabled)
online_config = RedisStoreConfig(host=redis_host, port=redis_port, db=redis_db, password=redis_password)
primary_keys_rows = [{"wine_id": 1}, {"wine_id": 3}]
result = client.read_table(name=table_name, database_name=database_name, is_online=True,
                           online_config=online_config, entity_row=primary_keys_rows)
result.show()

# Read offline feature table -- DLC on Iceberg (catalog enabled)
get_df = client.read_table(name=table_name, database_name=database_name, catalog_name=catalog_name)
get_df.show(30)

# Read online feature table -- DLC on Iceberg (catalog enabled)
primary_keys_rows = [{"wine_id": 1}, {"wine_id": 3}]
result = client.read_table(name=table_name, database_name=database_name, is_online=True,
                           online_config=online_config, entity_row=primary_keys_rows,
                           catalog_name=catalog_name)
result.show()
```
Parameter Name | Required | Field Type | Field Description | Input parameter example |
name | Yes | str | feature table name | "user_features" |
database_name | No | Optional[str] | Feature database name | "feature_db"
catalog_name | Required if catalog is enabled; must be omitted otherwise | str | Feature table catalog name | "DataLakeCatalog"
Result name | Field Type | Field Description |
feature_table | FeatureTable | FeatureTable object containing table metadata |
```python
# Get feature table metadata -- EMR on Hive & DLC on Iceberg (catalog not enabled)
featureTable = client.get_table(name=table_name, database_name=database_name)

# Get feature table metadata -- DLC on Iceberg (catalog enabled)
featureTable = client.get_table(name=table_name, database_name=database_name, catalog_name=catalog_name)
```
Parameter Name | Required | Field Type | Field Description | Input parameter example |
name | Yes | str | Name of the feature table to delete | "user_features" |
database_name | No | Optional[str] | Feature database name | "feature_db"
catalog_name | Required if catalog is enabled | str | Feature table catalog name | "DataLakeCatalog" |
Result name | Field Type | Field Description |
N/A | None | No return value |
```python
# Drop feature table -- EMR on Hive & DLC on Iceberg (catalog disabled)
client.drop_table(table_name)

# Drop feature table -- DLC on Iceberg (catalog enabled)
client.drop_table(table_name, catalog_name="DataLakeCatalog")
```
Parameter Name | Required | Field Type | Field Description | Input parameter example |
name | Yes | str | feature table full name (format: <table>) | "user_features" |
df | No | Optional[DataFrame] | DataFrame with data to write | spark.createDataFrame([...]) |
database_name | No | Optional[str] | Feature database name | "feature_db"
mode | No | Optional[str] | Write Mode (default APPEND) | "overwrite" |
checkpoint_location | No | Optional[str] | Stream processing checkpoint location | "/checkpoints/user_features" |
trigger | No | Dict[str, Any] | Stream processing trigger configuration | {"processingTime": "10 seconds"} |
catalog_name | Required if catalog is enabled; must be omitted otherwise | str | Feature table catalog name | "DataLakeCatalog"
Result name | Field Type | Field Description |
streaming_query | Optional[StreamingQuery] | For streaming writes, a StreamingQuery object; otherwise None.
```python
from wedata.feature_store.constants.constants import APPEND

# Batch write -- EMR on Hive & DLC on Iceberg (catalog disabled)
client.write_table(
    name="user_features",
    df=user_data_df,
    database_name="feature_db",
    mode="append",
)

# Stream write -- EMR on Hive & DLC on Iceberg (catalog disabled)
streaming_query = client.write_table(
    name="user_features",
    df=streaming_df,
    database_name="feature_db",
    checkpoint_location="/checkpoints/user_features",
    trigger={"processingTime": "10 seconds"},
)

# Batch write -- EMR on Hive & DLC on Iceberg (catalog enabled)
client.write_table(
    name="user_features",
    df=user_data_df,
    database_name="feature_db",
    mode="append",
    catalog_name=catalog_name,
)

# Stream write -- EMR on Hive & DLC on Iceberg (catalog enabled)
streaming_query = client.write_table(
    name="user_features",
    df=streaming_df,
    database_name="feature_db",
    checkpoint_location="/checkpoints/user_features",
    trigger={"processingTime": "10 seconds"},
    catalog_name=catalog_name,
)
```
Parameter Name | Required | Field Type | Field Description | Input parameter example |
table_name | Yes | str | feature table name | "user_features" |
lookup_key | Yes | Union[str, List[str]] | The key used to join the feature table and training set, typically the primary key. | "user_id" or ["user_id", "session_id"] |
is_online | No | bool | Whether to read the online feature table (default: False) | True |
online_config | No | Optional[RedisStoreConfig] | Online feature table configuration (Valid only when is_online is True) | RedisStoreConfig(...) |
feature_names | No | Union[str, List[str], None] | Feature names to look up in the feature table | ["age", "gender", "preferences"]
rename_outputs | No | Optional[Dict[str, str]] | Feature rename map | {"age": "user_age"} |
timestamp_lookup_key | No | Optional[str] | Timestamp key for point-in-time lookup | "event_timestamp"
lookback_window | No | Optional[datetime.timedelta] | Lookback window for point-in-time lookup | datetime.timedelta(days=1)
Result name | Field Type | Field Description |
FeatureLookup object | FeatureLookup | Constructed FeatureLookup instance |
```python
from wedata.feature_store.entities.feature_lookup import FeatureLookup

# Feature lookup -- works for EMR on Hive and DLC on Iceberg (with or without catalog)
wine_feature_lookup = FeatureLookup(
    table_name=table_name,
    lookup_key="wine_id",
    timestamp_lookup_key="event_timestamp",
)
```
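`lookback_window` is a plain `datetime.timedelta`, so the point-in-time window can be expressed at whatever granularity is needed; a quick standalone sketch (the variable names are illustrative):

```python
import datetime

# One-day lookback window for point-in-time joins
one_day = datetime.timedelta(days=1)

# Finer-grained windows are built the same way
ninety_minutes = datetime.timedelta(hours=1, minutes=30)

print(one_day.total_seconds())        # 86400.0
print(ninety_minutes.total_seconds()) # 5400.0
```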
Parameter Name | Required | Field Type | Field Description | Input parameter example |
df | No | Optional[DataFrame] | DataFrame with data to write | spark.createDataFrame([...]) |
feature_lookups | Yes | List[Union[FeatureLookup, FeatureFunction]] | Feature query list | [FeatureLookup(...), FeatureFunction(...)] |
label | Yes | Union[str, List[str], None] | Label column name | "is_churn" or ["label1", "label2"]
exclude_columns | No | Optional[List[str]] | Column names to exclude | ["user_id", "timestamp"]
database_name | No | Optional[str] | Feature database name | "feature_db"
catalog_name | Required if catalog is enabled; must be omitted otherwise | str | Feature table catalog name | "DataLakeCatalog"
Result name | Field Type | Field Description |
training_set | TrainingSet | Constructed TrainingSet instance |
```python
from wedata.feature_store.entities.feature_lookup import FeatureLookup
from wedata.feature_store.entities.feature_function import FeatureFunction

# Define feature lookup
wine_feature_lookup = FeatureLookup(
    table_name=table_name,
    lookup_key="wine_id",
    timestamp_lookup_key="event_timestamp",
)

# Prepare training data
inference_data_df = wine_df.select("wine_id", "quality", "event_timestamp")

# Create training set -- EMR on Hive & DLC on Iceberg (catalog disabled)
training_set = client.create_training_set(
    df=inference_data_df,                           # Base data
    feature_lookups=[wine_feature_lookup],          # Feature lookup configuration
    label="quality",                                # Label column
    exclude_columns=["wine_id", "event_timestamp"]  # Exclude unnecessary columns
)

# Create training set -- DLC on Iceberg (catalog enabled)
training_set = client.create_training_set(
    df=inference_data_df,
    feature_lookups=[wine_feature_lookup],
    label="quality",
    exclude_columns=["wine_id", "event_timestamp"],
    database_name=database_name,
    catalog_name=catalog_name,
)

# Get the final training DataFrame
training_df = training_set.load_df()

# Print training set data
print("\n=== Training set data ===")
training_df.show(10, True)
```
Parameter Name | Required | Field Type | Field Description | Input parameter example |
model | Yes | Any | Model object to log | sklearn.ensemble.RandomForestClassifier()
artifact_path | Yes | str | Model storage path | "churn_model" |
flavor | Yes | ModuleType | MLflow model type module | mlflow.sklearn |
training_set | No | Optional[TrainingSet] | TrainingSet object used by the training model | training_set |
registered_model_name | No | Optional[str] | Model name to register (enter the catalog model name when catalog is enabled) | "churn_prediction_model" |
model_registry_uri | No | Optional[str] | Address of the model registry | "databricks://model-registry" |
await_registration_for | No | int | Seconds to wait for model registration to complete (default: 300) | 600
infer_input_example | No | bool | Whether to automatically record input example (default False) | True |
```python
# Train the model
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from sklearn.ensemble import RandomForestClassifier
import mlflow.sklearn
import pandas as pd
import os

project_id = os.environ["WEDATA_PROJECT_ID"]
mlflow.set_experiment(experiment_name=experiment_name)

# Set the MLflow tracking URI
mlflow.set_tracking_uri("http://30.22.36.75:5000")

# Convert the Spark DataFrame to a pandas DataFrame for training
train_pd = training_df.toPandas()

# Drop the timestamp column if needed
# train_pd.drop('event_timestamp', axis=1)

# Prepare features and labels
X = train_pd.drop('quality', axis=1)
y = train_pd['quality']

# Split into training set and test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Or convert datetime columns to epoch timestamps (seconds)
for col in X_train.select_dtypes(include=['datetime', 'datetimetz']):
    X_train[col] = X_train[col].astype('int64') // 10**9  # Convert nanoseconds to seconds

# Verify that no missing values cause the dtype to be downgraded to object
X_train = X_train.fillna(X_train.median(numeric_only=True))

# Initialize, train, and log the model -- EMR on Hive & DLC on Iceberg (with or without catalog)
model = RandomForestClassifier(n_estimators=100, max_depth=3, random_state=42)
model.fit(X_train, y_train)

with mlflow.start_run():
    client.log_model(
        model=model,
        artifact_path="wine_quality_prediction",  # Model artifact path
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name=model_name,  # Model name (must be the catalog model name if catalog is enabled)
    )
```
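The datetime-to-epoch conversion used above is easy to get wrong; here is the same idiom on a tiny standalone pandas frame (the column name is illustrative):

```python
import pandas as pd

df = pd.DataFrame({"event_timestamp": pd.to_datetime(["1970-01-01", "1970-01-02"])})

# datetime64[ns] values are nanoseconds since the epoch;
# integer-dividing by 10**9 yields whole seconds
for col in df.select_dtypes(include=["datetime", "datetimetz"]):
    df[col] = df[col].astype("int64") // 10**9

print(df["event_timestamp"].tolist())  # [0, 86400]
```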
Parameter Name | Required | Field Type | Field Description | Input parameter example |
model_uri | Yes | str | MLflow model URI location | "models:/churn_model/1" |
df | Yes | DataFrame | DataFrame to run inference on | spark.createDataFrame([...])
result_type | No | str | Model return type (default "double") | "string" |
Result name | Field Type | Field Description |
predictions | DataFrame | DataFrame containing prediction results
```python
# Run score_batch -- EMR on Hive & DLC on Iceberg (with or without catalog)
result = client.score_batch(model_uri=f"models:/{model_name}/1", df=wine_df)
result.show()
```
Parameter Name | Required | Field Type | Field Description | Input parameter example |
table_name | Yes | str | Offline feature table full name (format: <table>) | "user_features" |
data_source_name | Yes | str | Data Source Name | "hive_datasource" |
database_name | No | Optional[str] | Database Name | "feature_db" |
is_cycle | No | bool | Whether to enable periodic release (default: False) | True |
cycle_obj | No | TaskSchedulerConfiguration | Periodic task configuration object | TaskSchedulerConfiguration(...) |
is_use_default_online | No | bool | Whether to use the default online storage configuration (default: True) | False |
online_config | No | RedisStoreConfig | Custom Online Storage Configuration | RedisStoreConfig(...) |
catalog_name | Required if catalog is enabled; must be omitted otherwise | str | Feature table catalog name | "DataLakeCatalog"
Result name | Field Type | Field Description |
N/A | None | No return value |
```python
from wedata.feature_store.common.store_config.redis import RedisStoreConfig
from wedata.feature_store.cloud_sdk_client.models import TaskSchedulerConfiguration

online_config = RedisStoreConfig(host=redis_host, port=redis_port, db=redis_db, password=redis_password)

# Publish an offline feature table to an online feature table -- EMR on Hive & DLC on Iceberg (catalog disabled)

# Case 1: Publish the offline table from the specified offline data source to the
# default online data source in snapshot mode.
result = client.publish_table(table_name=table_name, data_source_name=data_source_name, database_name=database_name,
                              is_cycle=False, is_use_default_online=True)

# Case 2: Sync the offline table from the specified offline data source to a designated
# non-default data source in snapshot mode. Note that the IP address here must match
# the login IP information of the registered data source.
result = client.publish_table(table_name=table_name, data_source_name=data_source_name, database_name=database_name,
                              is_cycle=False, is_use_default_online=False, online_config=online_config)

# Case 3: Sync the offline table from the specified offline data source to the default
# online data source in cycle mode, running every five minutes.
cycle_obj = TaskSchedulerConfiguration()
cycle_obj.CrontabExpression = "0 0/5 * * * ?"
result = client.publish_table(table_name=table_name, data_source_name=data_source_name, database_name=database_name,
                              is_cycle=True, cycle_obj=cycle_obj, is_use_default_online=True)

# Publish an offline feature table to an online feature table -- DLC on Iceberg (catalog enabled)

# Case 1: Publish the offline table from the specified offline data source to the
# default online data source in snapshot mode.
result = client.publish_table(table_name=table_name, data_source_name=data_source_name, database_name=database_name,
                              is_cycle=False, is_use_default_online=True, catalog_name=catalog_name)

# Case 2: Sync the offline table from the specified offline data source to a designated
# non-default data source in snapshot mode. Note that the IP address here must match
# the login IP information of the registered data source.
result = client.publish_table(table_name=table_name, data_source_name=data_source_name, database_name=database_name,
                              is_cycle=False, is_use_default_online=False, online_config=online_config,
                              catalog_name=catalog_name)

# Case 3: Sync the offline table from the specified offline data source to the default
# online data source in cycle mode, running every five minutes.
cycle_obj = TaskSchedulerConfiguration()
cycle_obj.CrontabExpression = "0 0/5 * * * ?"
result = client.publish_table(table_name=table_name, data_source_name=data_source_name, database_name=database_name,
                              is_cycle=True, cycle_obj=cycle_obj, is_use_default_online=True,
                              catalog_name=catalog_name)
```
Parameter Name | Required | Field Type | Field Description | Input parameter example |
table_name | Yes | str | Offline feature table full name (format: <table>) | "user_features" |
online_config | No | RedisStoreConfig | Custom Online Storage Configuration | RedisStoreConfig(...) |
database_name | No | Optional[str] | Database Name | "feature_db" |
Result name | Field Type | Field Description |
N/A | None | No return value |
```python
from wedata.feature_store.common.store_config.redis import RedisStoreConfig

# Delete the online feature table -- EMR on Hive & DLC on Iceberg (with or without catalog)
online_config = RedisStoreConfig(host=redis_host, port=redis_port, db=redis_db, password=redis_password)
client.drop_online_table(table_name=table_name, database_name=database_name, online_config=online_config)
```