tencent cloud

文档反馈

HDFS 数据导入

最后更新时间:2021-07-01 14:29:45

    概述

    本文介绍了 HDFS 数据导入 ClickHouse 集群的两种方案。本文介绍两种可行的实践方案,一种适合数据量较少的场景。另一种适合大数据场景。本文实战基于版本19.16.12.49。

    说明:

    想获取更多关于 ClickHouse 技术交流,可 提交工单,我们将您拉入 ClickHouse 技术交流群。

    详细步骤

    外表导入方案

    这种方式适合数据量较少的场景,导入步骤如下:

    • 在 ClickHouse 中建立 HDFS Engine 外表,用于读取 HDFS 数据。
    • 在 ClickHouse 中创建普通表(通常是 MergeTree 系列)存储 HDFS 中的数据。
    • 从外表中 SELECT 数据 INSERT 到普通表,完成数据导入。

    步骤1:创建 HDFS Engine 外表

    CREATE TABLE source
    (
       `id` UInt32, 
       `name` String, 
       `comment` String
    )
    ENGINE = HDFS('hdfs://172.30.1.146:4007/clickhouse/globs/*.csv', 'CSV')
    

    HDFS Engine 使用方法ENGINE = HDFS(URI, format),可参考 Table Engine HDFS

    URI为 HDFS 路径,如果包含通配符,则表是只读的。通配符的文件匹配在查询时执行,而不是在创建表时执行。也就是说,如果两次查询之间匹配的文件数目或者内容有变化,两次查询的结果才能够体现这种差异。支持的通配符如下:

    • *匹配除路径分隔符/外的任意数量的字符,包括空字符串。
    • ?匹配一个字符。
    • {some_string,another_string,yet_another_one}匹配some_stringanother_string或者yet_another_one
    • {N..M}匹配 N 到 M 的数字,包括 N 和 M,例如{1..3}匹配1、2、3。

    format支持的格式,详见 Formats for Input and Output Data

    步骤2:创建普通表

    CREATE TABLE dest
    (
       `id` UInt32, 
       `name` String, 
       `comment` String
    )
    ENGINE = MergeTree()
    ORDER BY id
    

    步骤3:导入数据

    INSERT INTO dest SELECT *
    FROM source
    

    步骤4:查询数据

    SELECT *
    FROM dest
    LIMIT 2
    

    JDBC Driver 并行导入方案

    ClickHouse 提供了 JDBC 的访问方式,并提供了官方的 Driver,此外还有第三方的 Driver,详情可参见 JDBC Driver

    ClickHouse 与 Hadoop/Spark 等大数据生态紧密结合,通过开发 Spark 或者 MapReduce 应用,利用大数据平台的并发处理能力,可以将 HDFS 上的大批量数据的快速导入 ClickHouse。Spark 还支持 Hive 等其他数据源,因此这种方式也可实现 Hive 等其他数据源的导入。

    下面举例说明 Spark Python 并发导入数据:

    步骤1:创建普通表

    CREATE TABLE default.hdfs_loader_table
    (
      `id` UInt32, 
      `name` String, 
      `comment` String
    )
    ENGINE = MergeTree()
    PARTITION BY id
    ORDER BY id
    

    步骤2:开发 Spark Python 应用

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    from pyspark.sql import SparkSession
    import sys
    if __name__ == '__main__':
      if len(sys.argv) != 2:
          print("Usage: clickhouse-spark <path>", file=sys.stderr)
          sys.exit(-1)
        spark = SparkSession.builder \
          .appName("clickhouse-spark") \
          .enableHiveSupport() \
          .getOrCreate()
        url = "jdbc:clickhouse://172.30.1.15:8123/default"
      driver = 'ru.yandex.clickhouse.ClickHouseDriver'
      properties = {
          'driver': driver,
          "socket_timeout": "300000",
          "rewriteBatchedStatements": "true",
          "batchsize": "1000000",
          "numPartitions": "4",
          'user': 'default',
          'password': 'test'
      }
      spark.read.csv(sys.argv[1], schema="""id INT, name String, comment String""").write.jdbc(
          url=url,
          table='hdfs_loader_table',
          mode='append',
          properties=properties,
      )
    

    url 的格式为jdbc:clickhouse://host:port/database,其中 port 为 clickhosue 的 HTTP 协议端口,默认为8123。

    properties 中的部分参数含义如下:

    • socket_timeout 为超时时间,单位 ms,详情可参考 这里
    • rewriteBatchedStatements 打开 JDBC Driver 的批量执行 SQL 功能。
    • batchsize 批量写入数据条数,可以适当调高,有利于提高写入性能。
    • numPartitions 数据写入并行度,也决定了 JDBC 并发连接数。batchsizenumPartitions 可参考 JDBC To Other Databases

    步骤3:提交 Spark 任务

    #!/usr/bin/env bash
    spark-submit \
    --master yarn \
    --jars ./clickhouse-jdbc-0.2.4.jar,./guava-19.0.jar \
    clickhouse-spark.py hdfs:///clickhouse/globs
    

    Spark Python 需要注意clickhouse-jdbc-0.2.4.jar依赖的 jar 版本,可以解压该 jar 文件,查看 pom.xml 中的配置,对比 Spark 环境的 jar 包是否版本匹配。版本不匹配时可能会出现错误 Could not initialize class ru.yandex.clickhouse.ClickHouseUtil。这时需要下载正确版本的 jar 包,通过 spark-submit 命令行参数--jars提交。

    步骤4:查询数据

    SELECT *
    FROM hdfs_loader_table
    LIMIT 2
    

    补充阅读

    下面介绍两种直接读写 HDFS 的方式,一般用作从 HDFS 导入数据到 ClickHouse。这两种方式的读写速度比较慢,且不支持如下功能,可参考 Table Engine HDFS

    • ALTERSELECT...SAMPLE操作
    • 索引(Indexes)
    • 复制(Replication)

    Table Engine

    1. 创建表
      CREATE TABLE hdfs_engine_table(id UInt32, name String, comment String) ENGINE=HDFS('hdfs://172.30.1.146:4007/clickhouse/hdfs_engine_table', 'CSV')
      
    2. Insert 测试数据
      INSERT INTO hdfs_engine_table VALUES(1, 'zhangsan', 'hello zhangsan'),(2, 'lisi', 'hello lisi')
      
    3. 查询
      SELECT * FROM hdfs_engine_table
      ┌─id─┬─name─────┬─comment────────┐
      │  1 │ zhangsan │ hello zhangsan │
      │  2 │ lisi     │ hello lisi     │
      └────┴──────────┴────────────────┘
      
    4. 查看 HDFS 文件
      hadoop fs -cat /clickhouse/hdfs_engine_table
      1,"zhangsan","hello zhangsan"
      2,"lisi","hello lisi"
      

    Table Function

    在使用上与 Table Engine 方式的区别仅是创建表语法稍有差异,示例如下:

    CREATE TABLE hdfs_function_table AS hdfs('hdfs://172.30.1.146:4007/clickhouse/hdfs_function_table', 'CSV', 'id UInt32, name String, comment String')
    

    参考资料

    联系我们

    联系我们,为您的业务提供专属服务。

    技术支持

    如果你想寻求进一步的帮助,通过工单与我们进行联络。我们提供7x24的工单服务。

    7x24 电话支持