import time
from datetime import datetime, timedelta

import jaydebeapi
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# DLC JDBC endpoint. Fill in the {dataBaseName}/{dataSourceName}/{region}/{engineName}
# placeholders before use.
# FIX: the original literal contained "®ion" (a mis-encoded "&region"), which broke
# the region query parameter; restored to "&region={region}".
jdbc_url = (
    'jdbc:dlc:dlc.tencentcloudapi.com?task_type=SparkSQLTask'
    '&database_name={dataBaseName}'
    '&datasource_connection_name={dataSourceName}'
    '&region={region}'
    '&data_engine_name={engineName}'
)
user = 'xxx'  # SecretId
pwd = 'xxx'   # SecretKey
# Fully qualified class name of the DLC JDBC driver.
# NOTE(review): variable name "dirver" is a typo for "driver"; kept as-is because
# the accompanying parameter table documents it under this exact name.
dirver = 'com.tencent.cloud.dlc.jdbc.DlcDriver'
# Absolute path to the driver JAR on the Airflow worker.
jar_file = '/root/airflow/jars/dlc-jdbc-2.5.3-jar-with-dependencies.jar'


def _run_sql(sql_str):
    """Execute one SQL statement against DLC over JDBC.

    Opens a fresh connection, runs ``sql_str``, and — when the statement
    reported a non-zero rowcount — fetches, prints, and returns the rows.
    The cursor and connection are always closed, even if execution raises
    (the original leaked both on error).
    """
    conn = jaydebeapi.connect(dirver, jdbc_url, [user, pwd], jar_file)
    try:
        curs = conn.cursor()
        try:
            curs.execute(sql_str)
            # rowcount is already an int; the original's ".real" was a no-op.
            if curs.rowcount != 0:
                result = curs.fetchall()
                print(result)
                return result
            return None
        finally:
            curs.close()
    finally:
        conn.close()


def createTable():
    """Create the demo table db.tb1 if it does not already exist."""
    _run_sql('create table if not exists db.tb1 (c1 int, c2 string)')


def insertValues():
    """Insert one demo row into db.tb1."""
    _run_sql("insert into db.tb1 values (111, 'this is test')")


def selectColums():
    """Select and print every row of db.tb1."""
    _run_sql('select * from db.tb1')


def get_time():
    """Print the current local time and return the current epoch timestamp."""
    print('Current time is:', datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
    return time.time()


default_args = {
    'owner': 'tencent',                   # owner's name
    'start_date': datetime(2024, 11, 1),  # the first execution start time, in UTC
    'retries': 2,                         # number of retry attempts on failure
    'retry_delay': timedelta(minutes=1),  # retry interval on failure
}

dag = DAG(
    # DAG ID, should consist only of letters, numbers, and underscores
    dag_id='airflow_dlc_test',
    # externally defined parameters in dict format
    default_args=default_args,
    # how often the DAG runs; can be days, weeks, hours, minutes, seconds, ...
    schedule_interval=timedelta(minutes=1),
    # False: do not back-fill runs between start_date and now (default is True,
    # which would execute every missed schedule up to the present)
    catchup=False,
)

t1 = PythonOperator(task_id='create_table', python_callable=createTable, dag=dag)
t2 = PythonOperator(task_id='insert_values', python_callable=insertValues, dag=dag)
t3 = PythonOperator(task_id='select_values', python_callable=selectColums, dag=dag)
t4 = PythonOperator(task_id='print_time', python_callable=get_time, dag=dag)

# create the table, then insert, then select and print-time in parallel
t1 >> t2 >> [t3, t4]
Parameters | Description |
jdbc_url | JDBC connection address and configuration parameters. For more details, see Hive JDBC Access, Presto JDBC Access, and DLC JDBC Access. |
user | SecretId |
pwd | SecretKey |
dirver | The fully qualified class name of the JDBC driver to load (note: the variable is spelled `dirver` in the sample code). For more details, see Hive JDBC Access, Presto JDBC Access, and DLC JDBC Access. |
jar_file | The storage path of the driver JAR package. Replace it with the absolute path where the corresponding engine's JDBC driver JAR package is stored. For more details, see Hive JDBC Access, Presto JDBC Access, and DLC JDBC Access. |


Feedback