产品概述
产品优势
应用场景
import timefrom datetime import datetime, timedeltaimport jaydebeapifrom airflow import DAGfrom airflow.operators.python_operator import PythonOperatorjdbc_url='jdbc:dlc:dlc.tencentcloudapi.com?task_type=SparkSQLTask&database_name={dataBaseName}&datasource_connection_name={dataSourceName}®ion={region}&data_engine_name={engineName}'user = 'xxx'pwd = 'xxx'dirver = 'com.tencent.cloud.dlc.jdbc.DlcDriver'jar_file = '/root/airflow/jars/dlc-jdbc-2.5.3-jar-with-dependencies.jar'def createTable():sqlStr = 'create table if not exists db.tb1 (c1 int, c2 string)'conn = jaydebeapi.connect(dirver, jdbc_url, [user, pwd], jar_file)curs = conn.cursor()curs.execute(sqlStr)rows = curs.rowcount.realif rows != 0:result = curs.fetchall()print(result)curs.close()conn.close()def insertValues():sqlStr = "insert into db.tb1 values (111, 'this is test')"conn = jaydebeapi.connect(dirver,jdbc_url, [user, pwd], jar_file)curs = conn.cursor()curs.execute(sqlStr)rows = curs.rowcount.realif rows != 0:result = curs.fetchall()print(result)curs.close()conn.close()def selectColums():sqlStr = 'select * from db.tb1'conn = jaydebeapi.connect(dirver, jdbc_url, [user, pwd], jar_file)curs = conn.cursor()curs.execute(sqlStr)rows = curs.rowcount.realif rows != 0:result = curs.fetchall()print(result)curs.close()conn.close()def get_time():print('当前时间是:', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))return time.time()default_args = {'owner': 'tencent', # 拥有者名称'start_date': datetime(2024, 11, 1), # 第一次开始执行的时间,为 UTC 时间'retries': 2, # 失败重试次数'retry_delay': timedelta(minutes=1), # 失败重试间隔}dag = DAG(dag_id='airflow_dlc_test', # DAG id ,必须完全由字母、数字、下划线组成default_args=default_args, # 外部定义的 dic 格式的参数schedule_interval=timedelta(minutes=1), # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒catchup=False # 执行DAG时,将开始时间到目前所有该执行的任务都执行,默认为True)t1 = PythonOperator(task_id='create_table',python_callable=createTable,dag=dag)t2 = PythonOperator(task_id='insert_values',python_callable=insertValues,dag=dag)t3 = PythonOperator(task_id='select_values',python_callable=selectColums,dag=dag)t4 = PythonOperator(task_id='print_time',python_callable=get_time,dag=dag)t1 >> t2 >> [t3, t4]
参数 | 说明 |
jdbc_url | |
user | SecretId |
pwd | SecretKey |
dirver | |
jar_file |


文档反馈