程序员人生 网站导航

基于Hadoop生态圈的数据仓库实践 —— 环境搭建(三)

栏目:数据库应用时间:2016-07-01 13:30:49
3、建立数据仓库示例模型
        Hadoop及其相干服务安装配置好后,下面用1个小而完全的示例说明多维模型及其相干ETL技术在Hadoop上的具体实现。

1. 设计ERD
        操作型系统是1个销售定单系统,初始时只有产品、客户、定单3个表,ERD以下图所示。


        多维数据仓库包括有1个销售定单事实表,产品、客户、定单、日期4个维度表,ERD以下图所示。


        作为示例,上面这些ERD里的属性都很简单,看属性名字便知其含义。维度表除日期维度外,其它3个表都在源表的基础上增加了代理键、版本号、生效日期、过期日期4个属性,用来处理渐变维(SCD)。日期维度有其特殊性,该维度数据1旦生成绩不会改变,所以不需要版本号、生效日期、过期日期。代理键是维度表的主键。事实表援用维度表的代理键作为自己的外键,销售金额是当前事实表中的唯1度量。

2. Hive相干配置
        使用Hive作为多维数据仓库的主要挑战是处理渐变维(SCD)和生成代理键。处理渐变维需要配置Hive支持行级更新,并在建表时选择适当的文件格式。生成代理键在关系数据库中1般都是用自增列或序列对象,但Hive中没有这样的机制,得用其它办法实现,在后面ETL部份再详细讨论。
(1)选择文件格式
(本段摘译自https://acadgild.com/blog/file-formats-in-apache-hive/)
         Hive是Hadoop上的数据仓库组件,它便于查询和管理散布式存储上的大数据集。Hive提供了1种称为HiveQL的语言,允许用户进行类似于SQL的查询。和SQL1样,HiveQL只处理结构化数据。缺省时Hive使用内建的derby数据库存储元数据,也能够配置Hive使用MySQL数据库存储元数据。Hive里的数据终究存储在HDFS的文件中,它可以处理以下4种文件格式:
  • TEXTFILE
  • SEQUENCEFILE
  • RCFILE
  • ORCFILE
        在深入各种类型的文件格式前,先看1下甚么是文件格式。

        文件格式
        所谓文件格式是1种信息被存储或编码成计算机文件的方式。在Hive中文件格式指的是记录怎样被存储到文件中。当我们处理结构化数据时,每条记录都有自己的结构。记录在文件中是如何编码的即定义了文件格式。
        不同文件格式的主要区分在于它们的数据编码、紧缩率、使用的空间和磁盘I/O。
        Hive在导入数据时其实不验证数据与表模式是不是匹配,但是它会验证文件格式是不是和表定义的相匹配。

        TEXTFILE
        TEXTFILE是Hadoop里最经常使用的输入输出格式,也是Hive的缺省文件格式。如果表定义为TEXTFILE,则可以向该表中导入以逗号、Tab或空格作为分隔符的数据,也能够导入JSON数据。TEXTFILE格式缺省每行被认为是1条记录。
        TEXTFILE格式的输入输出包是:
org.apache.hadoop.mapred.TextInputFormat org.apache.hadoop.mapred.TextOutputFormat
        示例:
-- 建立TEXTFILE格式的表 create table olympic(athelete STRING,age INT,country STRING,year STRING,closing STRING,sport STRING,gold INT,silver INT,bronze INT,total INT) row format delimited fields terminated by '\t' stored as textfile; -- 向表中导入数据 load data local inpath '/home/kiran/Downloads/olympic_data.csv' into table olympic; -- 查询表 select athelete from olympic;

        SEQUENCEFILE
        我们知道Hadoop处理少许大文件比大量小文件的性能要好。如果文件小于Hadoop里定义的块尺寸,可以认为是小文件。如果有大量小文件,那末元数据的增长将转化为NameNode的开消。为了解决这个问题,Hadoop引入了sequence文件,将sequence作为存储小文件的容器。
        Sequence文件是由2进制键值对组成的平面文件。Hive将查询转换成MapReduce作业时,决定1个给定记录的哪些键值对被使用。Sequence文件是可分割的2进制格式,主要的用处是联合两个或多个小文件组成1个sequence文件。
        SEQUENCEFILE格式的输入输出包是:
org.apache.hadoop.mapred.SequenceFileInputFormat org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
        示例:
-- 建立SEQUENCEFILE格式的表 create table olympic_sequencefile(athelete STRING,age INT,country STRING,year STRING,closing STRING,sport STRING,gold INT,silver INT,bronze INT,total INT) row format delimited fields terminated by '\t' stored as sequencefile; -- 向表中导入数据 -- 与TEXTFILE有些不同,由于SEQUENCEFILE是2进制格式,所以需要从其它表向SEQUENCEFILE表插入数据。 INSERT OVERWRITE TABLE olympic_sequencefile SELECT * FROM olympic; -- 查询表 select athelete from olympic_sequencefile;

        RCFILE
        RCFILE指的是Record Columnar File,1种高紧缩率的2进制文件格式,被用于在1个时间点操作多行的场景。RCFILEs是由2进制键值对组成的平面文件,这点与SEQUENCEFILE非常相似。RCFILE以记录的情势存储表中的列,即列存储方式。它先分割行做水平分区,然后分割列做垂直分区。RCFILE把1行的元数据作为键,把行数据作为值。这类面向列的存储在履行数据分析时更高效。
        RCFILE格式的输入输出包是:
org.apache.hadoop.hive.ql.io.RCFileInputFormat org.apache.hadoop.hive.ql.io.RCFileOutputFormat
        示例:
-- 建立RCFILE格式的表 create table olympic_rcfile(athelete STRING,age INT,country STRING,year STRING,closing STRING,sport STRING,gold INT,silver INT,bronze INT,total INT) row format delimited fields terminated by '\t' stored as rcfile -- 向表中导入数据 -- 不能直接向RCFILE表中导入数据,需要从其它表向RCFILE表插入数据。 INSERT OVERWRITE TABLE olympic_rcfile SELECT * FROM olympic; -- 查询表 select athelete from olympic_rcfile;

        ORCFILE
        ORC指的是Optimized Row Columnar,就是说相对其它文件格式,它以更优化的方式存储数据。ORC能将原始数据的大小缩减75%,从而提升了数据处理的速度。OCR比Text、Sequence和RC文件格式有更好的性能。而且ORC是目前Hive中唯1支持事务的文件格式。
        ORCFILE格式的输入输出包是:
org.apache.hadoop.hive.ql.io.orc
        示例:
-- 建立ORCFILE格式的表 create table olympic_orcfile(athelete STRING,age INT,country STRING,year STRING,closing STRING,sport STRING,gold INT,silver INT,bronze INT,total INT) row format delimited fields terminated by '\t' stored as orcfile; -- 向表中导入数据 -- 不能直接向ORCFILE表中导入数据,需要从其它表向ORCFILE表插入数据。 INSERT OVERWRITE TABLE olympic_orcfile SELECT * FROM olympic; -- 查询表 select athelete from olympic_orcfile;

        应当根据数据需求选择适当的文件格式,例如,
        a)如果数据有参数化的分隔符,那末可以选择TEXTFILE格式。
        b)如果数据所在文件比块尺寸小,可以选择SEQUENCEFILE格式。
        c)如果想履行数据分析,并高效地存储数据,可以选择RCFILE格式。
        d)如果希望减小数据所需的存储空间并提升性能,可以选额ORCFILE格式。

        对多维数据仓库来讲,需要处理SCD,必定要用到行级更新,所以所有TDS(转换后的数据存储)里的表,除日期维度表外,其它表都是用ORCFILE格式。日期维度表数据1旦生成绩不会修改,所以使用TEXTFILE格式。RDS(原始数据存储)里的表使用缺省的TEXTFILE格式。


(2)支持行级更新
        在1个典型的星型模式数据仓库中,维度表随时间的变化很缓慢。例如,1个零售商开了1家新商店,需要将新店数据加到商店表,或1个已有商店的营业面积或其它需要跟踪的特性改变了。这些改变会致使插入或修改个别记录。Hive从0.14版本开始支持事务和行级更新,但缺省是不支持的,需要1些附加的配置。

        a)配置Hive支持事务
        CDH 5.7.0包括的Hive版本是1.1.0,可以支持事务及行级更新,但此版本的中文支持问题较多。编辑hive-site.xml配置文件,添加支持事务的属性。
vi /etc/hive/conf.cloudera.hive/hive-site.xml
<!-- 添加以下配置项以支持事务 --> <property> <name>hive.support.concurrency</name> <value>true</value> </property> <property> <name>hive.exec.dynamic.partition.mode</name> <value>nonstrict</value> </property> <property> <name>hive.txn.manager</name> <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value> </property> <property> <name>hive.compactor.initiator.on</name> <value>true</value> </property> <property> <name>hive.compactor.worker.threads</name> <value>1</value> </property>

        b)添加Hive元数据
[root@cdh2~]#mysql -u root -p hive
INSERT INTO NEXT_LOCK_ID VALUES(1); INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1); INSERT INTO NEXT_TXN_ID VALUES(1); COMMIT;
        说明:如果这3个表没有数据,履行行级更新时会报以下毛病:org.apache.hadoop.hive.ql.lockmgr.DbTxnManager FAILED: Error in acquiring locks: Error communicating with the metastore

        c)测试
        重启Hive,然后履行下面的HiveQL语句
[root@cdh2~]#beeline -u jdbc:hive2://
use test; -- 建立测试表 create table t1(id int, name string) clustered by (id) into 8 buckets stored as orc TBLPROPERTIES ('transactional'='true');
        说明:
  • 必须存储为ORC格式
  • 建表语句必须带有into buckets子句和stored as orc TBLPROPERTIES ('transactional'='true')子句,并且不能带有sorted by子句。
-- 测试insert insert into t1 values (1,'aaa'); insert into t1 values (2,'bbb'); select* from t1;
        查询结果以下图所示。
-- 测试update update t1 set name='ccc' where id=1; select* from t1;
        查询结果以下图所示。
-- 测试delete delete from t1 where id=2; select* from t1;
        查询结果以下图所示。
-- 对已有非ORC表的转换
-- 在本地文件/root/a.txt中写入以下4行数据
1,a,US,CA 2,b,US,CB 3,c,CA,BB 4,d,CA,BC

-- 建立非分区表并加载数据 use test; CREATE TABLE t1 (id INT, name STRING, cty STRING, st STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; LOAD DATA LOCAL INPATH '/root/a.txt' INTO TABLE t1; SELECT * FROM t1; -- 建立外部份区事务表并加载数据 CREATE EXTERNAL TABLE t2 (id INT, name STRING) PARTITIONED BY (country STRING, state STRING) CLUSTERED BY (id) INTO 8 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true'); INSERT INTO T2 PARTITION (country, state) SELECT * FROM T1; SELECT * FROM t2;
        查询结果以下图所示。

-- 修改数据 INSERT INTO TABLE t2 PARTITION (country, state) VALUES (5,'e','DD','DD'); UPDATE t2 SET name='f' WHERE id=1; DELETE FROM t2 WHERE name='b'; SELECT * FROM t2;
        查询结果以下图所示。

        说明:
  • 不能修改bucket列的值,否则会报以下毛病:FAILED: SemanticException [Error 10302]: Updating values of bucketing columns is not supported.  Column id.
  • 对已有非ORC表的转换,只能通过新建ORC表再向新表迁移数据的方式,直接修改原表的文件格式属性是不行的(有兴趣的可以试试,我是踩到过坑了)。
3. 建立数据库
        在本示例中,源数据库表就是前面提到的操作型系统的摹拟。在CDH1上的MySQL中建立源数据库表。RDS存储原始数据,作为源数据到数据仓库的过渡,在CDH2上的Hive中建RDS库表。TDS即为转化后的多维数据仓库,在CDH2上的Hive中建TDS库表。

(1)建立源数据数据库
        建立源数据数据库表的SQL脚本以下:
-- 建立源数据库 DROP DATABASE IF EXISTS source; CREATE DATABASE source; -- 建立源库表 USE source; -- 建立客户表 CREATE TABLE customer ( customer_number INT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '客户编号,主键', customer_name VARCHAR(50) comment '客户名称', customer_street_address VARCHAR(50) comment '客户住址', customer_zip_code INT comment '邮编', customer_city VARCHAR(30) comment '所在城市', customer_state VARCHAR(2) comment '所在省分' ); -- 建立产品表 CREATE TABLE product ( product_code INT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '产品编码,主键', product_name VARCHAR(30) comment '产品名称', product_category VARCHAR(30) comment '产品类型' ); -- 建立销售定单表 CREATE TABLE sales_order ( order_number INT NOT NULL AUTO_INCREMENT PRIMARY KEY comment '定单号,主键', customer_number INT comment '客户编号', product_code INT comment '产品编码', order_date DATE comment '定单日期', entry_date DATE comment '登记日期', order_amount DECIMAL(10 , 2 ) comment '销售金额', foreign key (customer_number) references customer (customer_number) on delete cascade on update cascade, foreign key (product_code) references product (product_code) on delete cascade on update cascade );
(2)生成源库测试数据
        生成源库测试数据的SQL脚本以下:
USE source; -- 生成客户表测试数据 INSERT INTO customer (customer_name, customer_street_address, customer_zip_code, customer_city, customer_state) VALUES ('Really Large Customers', '7500 Louise Dr.',17050, 'Mechanicsburg','PA'), ('Small Stores', '2500 Woodland St.',17055, 'Pittsburgh','PA'), ('Medium Retailers','1111 Ritter Rd.',17055,'Pittsburgh','PA'), ('Good Companies','9500 Scott St.',17050,'Mechanicsburg','PA'), ('Wonderful Shops','3333 Rossmoyne Rd.',17050,'Mechanicsburg','PA'), ('Loyal Clients','7070 Ritter Rd.',17055,'Pittsburgh','PA'), ('Distinguished Partners','9999 Scott St.',17050,'Mechanicsburg','PA'); -- 生成产品表测试数据 INSERT INTO product (product_name, product_category ) VALUES ('Hard Disk Drive', 'Storage'), ('Floppy Drive', 'Storage'), ('LCD Panel', 'Monitor'); -- 生成销售定单表测试数据 INSERT INTO sales_order VALUES (1, 1, 1, '2013-02-01', '2013-02-01', 1000) , (2, 2, 2, '2013-02⑴0', '2013-02⑴0', 1000) , (3, 3, 3, '2013-03-01', '2013-03-01', 4000) , (4, 4, 1, '2013-04⑴5', '2013-04⑴5', 4000) , (5, 5, 2, '2013-05⑵0', '2013-05⑵0', 6000) , (6, 6, 3, '2013-07⑶0', '2013-07⑶0', 6000) , (7, 7, 1, '2013-09-01', '2013-09-01', 8000) , (8, 1, 2, '2013⑴1⑴0', '2013⑴1⑴0', 8000) , (9, 2, 3, '2014-01-05', '2014-01-05', 1000) , (10, 3, 1, '2014-02⑴0', '2014-02⑴0', 1000) , (11, 4, 2, '2014-03⑴5', '2014-03⑴5', 2000) , (12, 5, 3, '2014-04⑵0', '2014-04⑵0', 2500) , (13, 6, 1, '2014-05⑶0', '2014-05⑶0', 3000) , (14, 7, 2, '2014-06-01', '2014-06-01', 3500) , (15, 1, 3, '2014-07⑴5', '2014-07⑴5', 4000) , (16, 2, 1, '2014-08⑶0', '2014-08⑶0', 4500) , (17, 3, 2, '2014-09-05', '2014-09-05', 1000) , (18, 4, 3, '2014⑴0-05', '2014⑴0-05', 1000) , (19, 5, 1, '2015-01⑴0', '2015-01⑴0', 4000) , (20, 6, 2, '2015-02⑵0', '2015-02⑵0', 4000) , (21, 7, 3, '2015-02⑵8', '2015-02⑵8', 4000); COMMIT;
(3)建立RDS库表
        建立RDS库表的HiveQL脚本以下:
-- 建立RDS数据库 DROP DATABASE IF EXISTS rds CASCADE; CREATE DATABASE rds; -- 建立RDS库表 USE rds; -- 建立客户过渡表 CREATE TABLE customer ( customer_number INT comment 'number', customer_name VARCHAR(30) comment 'name', customer_street_address VARCHAR(30) comment 'address', customer_zip_code INT comment 'zipcode', customer_city VARCHAR(30) comment 'city', customer_state VARCHAR(2) comment 'state' ); -- 建立产品过渡表 CREATE TABLE product ( product_code INT comment 'code', product_name VARCHAR(30) comment 'name', product_category VARCHAR(30) comment 'category' ); -- 建立销售定单过渡表 CREATE TABLE sales_order ( order_number INT comment 'order number', customer_number INT comment 'customer number', product_code INT comment 'product code', order_date DATE comment 'order date', entry_date DATE comment 'entry date', order_amount DECIMAL(10 , 2 ) comment 'order amount' );
(4)建立TDS库表
        建立TDS库表的HiveQL脚本以下:
-- 建立数据仓库数据库 DROP DATABASE IF EXISTS dw CASCADE; CREATE DATABASE dw; -- 建立数据仓库表 USE dw; -- 建立客户维度表 CREATE TABLE customer_dim ( customer_sk INT comment 'surrogate key', customer_number INT comment 'number', customer_name VARCHAR(50) comment 'name', customer_street_address VARCHAR(50) comment 'address', customer_zip_code INT comment 'zipcode', customer_city VARCHAR(30) comment 'city', customer_state VARCHAR(2) comment 'state', version INT comment 'version', effective_date DATE comment 'effective date', expiry_date DATE comment 'expiry date' ) CLUSTERED BY (customer_sk) INTO 8 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true'); -- 建立产品维度表 CREATE TABLE product_dim ( product_sk INT comment 'surrogate key', product_code INT comment 'code', product_name VARCHAR(30) comment 'name', product_category VARCHAR(30) comment 'category', version INT comment 'version', effective_date DATE comment 'effective date', expiry_date DATE comment 'expiry date' ) CLUSTERED BY (product_sk) INTO 8 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true'); -- 建立定单维度表 CREATE TABLE order_dim ( order_sk INT comment 'surrogate key', order_number INT comment 'number', version INT comment 'version', effective_date DATE comment 'effective date', expiry_date DATE comment 'expiry date' ) CLUSTERED BY (order_sk) INTO 8 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true'); -- 建立销售定单事实表 CREATE TABLE sales_order_fact ( order_sk INT comment 'order surrogate key', customer_sk INT comment 'customer surrogate key', product_sk INT comment 'product surrogate key', order_date_sk INT comment 'date surrogate key', order_amount DECIMAL(10 , 2 ) comment 'order amount' ) CLUSTERED BY (order_sk) INTO 8 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true');
(5)建立日期维度表并生成数据
        使用下面的shell命令建立日期维度表并生成数据:
./date_dim_generate.sh 2000-01-01 2020⑴2⑶1
        date_dim_generate.sh shell脚本文件内容以下:
#!/bin/bash date1="$1" date2="$2" tempdate=`date -d "$date1" +%F` tempdateSec=`date -d "$date1" +%s` enddateSec=`date -d "$date2" +%s` min=1 max=`expr \( $enddateSec - $tempdateSec \) / \( 24 \* 60 \* 60 \) + 1` cat /dev/null > ./date_dim.csv while [ $min -le $max ] do month=`date -d "$tempdate" +%m` month_name=`date -d "$tempdate" +%B` quarter=`echo $month | awk '{print int(($0⑴)/3)+1}'` year=`date -d "$tempdate" +%Y` echo ${min}","${tempdate}","${month}","${month_name}","${quarter}","${year} >> ./date_dim.csv tempdate=`date -d "+$min day $date1" +%F` tempdateSec=`date -d "+$min day $date1" +%s` min=`expr $min + 1` done beeline -u jdbc:hive2://cdh2:10000/dw -f create_table_date_dim.sql --silent hdfs dfs -put -f date_dim.csv /user/hive/warehouse/dw.db/date_dim/
        create_table_date_dim.sql SQL脚本内容以下:
drop table if exists date_dim; create table date_dim ( date_sk int comment 'surrogate key', date date comment 'date,yyyy-mm-dd', month tinyint comment 'month', month_name varchar(9) comment 'month name', quarter tinyint comment 'quarter', year smallint comment 'year' ) comment 'date dimension table' row format delimited fields terminated by ',' stored as textfile;
        说明:
        a)HiveQL脚本中的列注释没有使用中文,这是由于Hive 1.1.0中,中文注释会在show create table命令中显示乱码,要解决这个问题需要重新编译Hive的源码,简单起见,这里都是用了英文列注释。关于1.1.0中的这个bug,可参考https://issues.apache.org/jira/browse/HIVE⑴1837。示例数据中没有中文,也是出于类似的缘由。
        b)维度表虽然使用了代理键,但不能将它设置为主键,在数据库级也不能确保其唯1性。Hive中并没有主外键、唯1非空束缚这些关系数据库的概念。
        c)sales_order.entry_date表示定单登记的日期,1般情况下应当同等于定单日期(sales_order.order_date),但有时二者是不同的,等实验进行到“迟到的事实”时会详细说明。
        d)关于日期维度数据装载
        日期维度在数据仓库中是1个特殊角色。日期维度包括时间概念,而时间是最重要的,由于数据仓库的主要功能之1就是存储历史数据,所以每一个数据仓库里的数据都有1个时间特点。装载日期数据有3个经常使用方法:

  • 预装载
  • 逐日装载1天
  • 从源数据装载日期
        在3种方法中,预装载最容易,也是本实验所采取的方法。使用预装载插入1个时间段里的所有日期。比如,本示例预装载21年的日期维度数据,从2000年1月1日到2020年12月31日。使用这个方法,在数据仓库生命周期中,只需要预装载日期维度1次。预装载的缺点是:
  • 提早消耗磁盘空间
  • 可能不需要所有的日期(稀疏使用)
------分隔线----------------------------
------分隔线----------------------------

最新技术推荐