Continuing from the previous part:
Hadoop includes a browser interface for checking the status of HDFS. Figure 7 shows the output of the word-count job.
Figure 7. Exploring HDFS with a browser
The Cloudera web site offers a more sophisticated console free of charge. It provides a number of capabilities beyond the standard Hadoop web interfaces. Notice that the HDFS health status in Figure 8 shows as Bad.
Figure 8. Hadoop services managed by Cloudera Manager
Why Bad? Because in a single virtual machine, HDFS cannot make three copies of each data block. When blocks are under-replicated there is a risk of data loss, so the health of the system is reported as bad. It is a good thing you aren't trying to run production Hadoop jobs on a single node.
Your MapReduce jobs are not limited to Java. This last MapReduce example uses Hadoop Streaming to support a mapper written in Python and a reducer written in AWK. No, you don't have to be a Java guru to write MapReduce!
Mark Twain was not a big fan of Cooper. For this use case, Hadoop will provide some simple literary criticism comparing Twain and Cooper. The Flesch-Kincaid test calculates the reading level of a given text; one of the factors in that analysis is average sentence length. Parsing sentences turns out to be more complicated than just looking for the period character. The openNLP package and the Python NLTK package have excellent sentence parsers. To keep things simple, the example in Listing 8 uses word length as a surrogate for the number of syllables in a word. If you want to take this to the next level, implement the Flesch-Kincaid test in MapReduce, crawl the web, and calculate reading levels for your favorite news sites.
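For reference, the grade-level form of the Flesch-Kincaid test combines average sentence length with average syllables per word (grade = 0.39 * words-per-sentence + 11.8 * syllables-per-word - 15.59). The short Python sketch below only illustrates that formula and is not part of the article's listings; the vowel-counting syllable estimator is a crude stand-in for a real syllable counter such as the one in NLTK.

#!/usr/bin/env python
# Illustrative sketch of the Flesch-Kincaid grade-level formula.
# count_syllables() is a naive approximation used only for this example.
def count_syllables(word):
    vowels = "aeiouy"
    word = word.lower()
    count = 0
    prev_was_vowel = False
    for ch in word:
        is_vowel = ch in vowels
        if is_vowel and not prev_was_vowel:
            count += 1
        prev_was_vowel = is_vowel
    return max(count, 1)

def flesch_kincaid_grade(sentences):
    # sentences: a list of strings, one sentence per entry
    words = [w for s in sentences for w in s.split()]
    syllables = sum(count_syllables(w) for w in words)
    return (0.39 * (len(words) / float(len(sentences)))
            + 11.8 * (syllables / float(len(words)))
            - 15.59)

# example: flesch_kincaid_grade(["The quick brown fox jumps over the lazy dog."])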
Listing 8. A Python-based mapper for literary criticism
# here is the mapper we'll connect to the streaming hadoop interface

# the mapper is reading the text in the file - not really appreciating Twain's humor
#

# modified from
# http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
$ cat mapper.py
#!/usr/bin/env python
import sys

# read stdin
for linein in sys.stdin:
    # strip blanks
    linein = linein.strip()
    # split into words
    mywords = linein.split()
    # loop on mywords, output the length of each word
    for word in mywords:
        # the reducer just cares about the first column,
        # normally there is a key - value pair
        print '%s %s' % (len(word), 0)
The mapper output for the word "Twain" would be 5 0. The numerical word lengths are sorted and presented to the reducer in sorted order. In the examples in Listings 9 and 10 the data does not need to be sorted to produce the correct output, but the sort is built into the MapReduce infrastructure and happens anyway.
Listing 9. An AWK reducer for literary criticism
# the awk code is modified from http://www.commandlinefu.com

# awk is calculating
# NR - the number of words in total
# sum/NR - the average word length
# sqrt(mean2/NR) - the standard deviation

$ cat statsreducer.awk
awk '{delta = $1 - avg; avg += delta / NR; \
mean2 += delta * ($1 - avg); sum=$1+sum } \
END { print NR, sum/NR, sqrt(mean2 / NR); }'
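The AWK one-liner computes the running mean and variance in a single pass (Welford's online method) rather than buffering all of the word lengths. If the incremental arithmetic is hard to follow in AWK, here is a line-for-line Python sketch of the same calculation; it is not part of the original listings and assumes the Python 2 interpreter used by the other examples.

#!/usr/bin/env python
# Sketch of the same running mean / variance calculation statsreducer.awk
# performs (Welford's online algorithm). Reads one word length per line
# on stdin, just like the reducer.
import math
import sys

n = 0
avg = 0.0
mean2 = 0.0
total = 0

for line in sys.stdin:
    fields = line.split()
    if not fields:
        continue
    x = int(fields[0])          # first column is the word length
    n += 1
    delta = x - avg
    avg += delta / n
    mean2 += delta * (x - avg)
    total += x

if n:
    print n, total / float(n), math.sqrt(mean2 / n)

Piping the mapper output through this script should produce the same three numbers as statsreducer.awk.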
Listing 10. Running the Python mapper and AWK reducer with Hadoop Streaming
# test locally

# because we're using Hadoop Streaming, we can test the
# mapper and reducer with simple pipes

# the "sort" phase is a reminder the keys are sorted
# before presentation to the reducer
# in this example it doesn't matter what order the
# word length values are presented for calculating the std deviation

$ zcat ../DS.txt.gz | ./mapper.py | sort | ./statsreducer.awk
215107 4.56068 2.50734

# now run in hadoop with streaming

# CDH4
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input HF.txt -output HFstats -file ./mapper.py -file \
./statsreducer.awk -mapper ./mapper.py -reducer ./statsreducer.awk

# CDH3
$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \
-input HF.txt -output HFstats -file ./mapper.py -file ./statsreducer.awk \
-mapper ./mapper.py -reducer ./statsreducer.awk

$ hls HFstats
Found 3 items
-rw-r--r-- 1 cloudera supergroup 0 2012-08-12 15:38 /user/cloudera/HFstats/_SUCCESS
drwxr-xr-x - cloudera supergroup 0 2012-08-12 15:37 /user/cloudera/HFstats/_logs
-rw-r--r-- 1 cloudera ... 24 2012-08-12 15:37 /user/cloudera/HFstats/part-00000

$ hcat /user/cloudera/HFstats/part-00000
113365 4.11227 2.17086

# now for cooper

$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \
-input DS.txt.gz -output DSstats -file ./mapper.py -file ./statsreducer.awk \
-mapper ./mapper.py -reducer ./statsreducer.awk

$ hcat /user/cloudera/DSstats/part-00000
215107 4.56068 2.50734
Mark Twain fans can happily relax knowing that Hadoop finds Cooper to use longer words, with a shocking standard deviation (humor intended). That does, of course, assume that shorter words are better. Let's move on; the next step is writing data from HDFS into Informix and DB2.
Using Sqoop to write data from HDFS into Informix, DB2, or MySQL via JDBC
The Apache Sqoop project is an open-source, JDBC-based utility for moving data between Hadoop and relational databases. Sqoop was originally created during a hackathon at Cloudera and was then open sourced.
Moving data from HDFS into a relational database is a common use case. HDFS and MapReduce are great at doing the heavy lifting, but for simple queries or as a back-end store for a web site, caching MapReduce output in a relational store is a good design pattern. You can avoid re-running the MapReduce word count and simply Sqoop the results into Informix and DB2; a short query-side sketch of using the cached table follows Listing 15. You have generated data about Twain and Cooper, so now let's move it into a database, as shown in Listing 11.
Listing 11. JDBC driver installation
# Sqoop needs access to the JDBC driver for every
# database that it will access

# please copy the driver for each database you plan to use for these exercises
# the MySQL database and driver are already installed in the virtual image
# but you still need to copy the driver to the sqoop/lib directory

# one time copy of jdbc driver to sqoop lib directory
$ sudo cp Informix_JDBC_Driver/lib/ifxjdbc*.jar /usr/lib/sqoop/lib/
$ sudo cp db2jdbc/db2jcc*.jar /usr/lib/sqoop/lib/
$ sudo cp /usr/lib/hive/lib/mysql-connector-java-5.1.15-bin.jar /usr/lib/sqoop/lib/
The examples shown in Listings 12 through 15 are presented for each database. Skip ahead to the example you care about, whether Informix, DB2, or MySQL; for the database polyglots, have fun working through all of them. If your preferred database is not included here, making these samples work elsewhere should not be a huge challenge.
Listing 12. Informix users: Sqoop writing the results of the word count into Informix
# create a target table to put the data
# fire up dbaccess and use this sql
# create table wordcount ( word char(36) primary key, n int);

# now run the sqoop command
# this is best put in a shell script to help avoid typos...

$ sqoop export -D sqoop.export.records.per.statement=1 \
--fields-terminated-by '\t' --driver com.informix.jdbc.IfxDriver \
--connect \
"jdbc:informix-sqli://myhost:54321/stores_demo:informixserver=i7;user=me;password=mypw" \
--table wordcount --export-dir /user/cloudera/HF.out
Listing 13. Informix users: Sqoop writing the results of the word count into Informix
Listing 14. DB2 users: Sqoop writing the results of the word count into DB2
# here is the db2 syntax
# create a destination table for db2
#
#db2 => connect to sample
#
# Database Connection Information
#
# Database server = DB2/LINUXX8664 10.1.0
# SQL authorization ID = DB2INST1
# Local database alias = SAMPLE
#
#db2 => create table wordcount ( word char(36) not null primary key , n int)
#DB20000I The SQL command completed successfully.
#

sqoop export -D sqoop.export.records.per.statement=1 \
--fields-terminated-by '\t' \
--driver com.ibm.db2.jcc.DB2Driver \
--connect "jdbc:db2://192.168.1.131:50001/sample" \
--username db2inst1 --password db2inst1 \
--table wordcount --export-dir /user/cloudera/HF.out

12/08/09 12:32:59 WARN tool.BaseSqoopTool: Setting your password on the
command-line is insecure. Consider using -P instead.
12/08/09 12:32:59 INFO manager.SqlManager: Using default fetchSize of 1000
12/08/09 12:32:59 INFO tool.CodeGenTool: Beginning code generation
12/08/09 12:32:59 INFO manager.SqlManager: Executing SQL statement:
SELECT t.* FROM wordcount AS t WHERE 1=0
12/08/09 12:32:59 INFO manager.SqlManager: Executing SQL statement:
SELECT t.* FROM wordcount AS t WHERE 1=0
12/08/09 12:32:59 INFO orm.CompilationManager: HADOOP_HOME is /usr/lib/hadoop
12/08/09 12:32:59 INFO orm.CompilationManager: Found hadoop core jar
at: /usr/lib/hadoop/hadoop-0.20.2-cdh3u4-core.jar
12/08/09 12:33:00 INFO orm.CompilationManager: Writing jar
file: /tmp/sqoop-cloudera/compile/5532984df6e28e5a45884a21bab245ba/wordcount.jar
12/08/09 12:33:00 INFO mapreduce.ExportJobBase: Beginning export of wordcount
12/08/09 12:33:01 INFO manager.SqlManager: Executing SQL statement:
SELECT t.* FROM wordcount AS t WHERE 1=0
12/08/09 12:33:02 INFO input.FileInputFormat: Total input paths to process : 1
12/08/09 12:33:02 INFO input.FileInputFormat: Total input paths to process : 1
12/08/09 12:33:02 INFO mapred.JobClient: Running job: job_201208091208_0002
12/08/09 12:33:03 INFO mapred.JobClient: map 0% reduce 0%
12/08/09 12:33:14 INFO mapred.JobClient: map 24% reduce 0%
12/08/09 12:33:17 INFO mapred.JobClient: map 44% reduce 0%
12/08/09 12:33:20 INFO mapred.JobClient: map 67% reduce 0%
12/08/09 12:33:23 INFO mapred.JobClient: map 86% reduce 0%
12/08/09 12:33:24 INFO mapred.JobClient: map 100% reduce 0%
12/08/09 12:33:25 INFO mapred.JobClient: Job complete: job_201208091208_0002
12/08/09 12:33:25 INFO mapred.JobClient: Counters: 16
12/08/09 12:33:25 INFO mapred.JobClient: Job Counters
12/08/09 12:33:25 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=21648
12/08/09 12:33:25 INFO mapred.JobClient: Total time spent by all
reduces waiting after reserving slots (ms)=0
12/08/09 12:33:25 INFO mapred.JobClient: Total time spent by all
maps waiting after reserving slots (ms)=0
12/08/09 12:33:25 INFO mapred.JobClient: Launched map tasks=1
12/08/09 12:33:25 INFO mapred.JobClient: Data-local map tasks=1
12/08/09 12:33:25 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
12/08/09 12:33:25 INFO mapred.JobClient: FileSystemCounters
12/08/09 12:33:25 INFO mapred.JobClient: HDFS_BYTES_READ=138350
12/08/09 12:33:25 INFO mapred.JobClient: FILE_BYTES_WRITTEN=69425
12/08/09 12:33:25 INFO mapred.JobClient: Map-Reduce Framework
12/08/09 12:33:25 INFO mapred.JobClient: Map input records=13838
12/08/09 12:33:25 INFO mapred.JobClient: Physical memory (bytes) snapshot=105148416
12/08/09 12:33:25 INFO mapred.JobClient: Spilled Records=0
12/08/09 12:33:25 INFO mapred.JobClient: CPU time spent (ms)=9250
12/08/09 12:33:25 INFO mapred.JobClient: Total committed heap usage (bytes)=42008576
12/08/09 12:33:25 INFO mapred.JobClient: Virtual memory (bytes) snapshot=596447232
12/08/09 12:33:25 INFO mapred.JobClient: Map output records=13838
12/08/09 12:33:25 INFO mapred.JobClient: SPLIT_RAW_BYTES=126
12/08/09 12:33:25 INFO mapreduce.ExportJobBase: Transferred 135.1074 KB
in 24.4977 seconds (5.5151 KB/sec)
12/08/09 12:33:25 INFO mapreduce.ExportJobBase: Exported 13838 records.

# check on the results...
#
#db2 => select count(*) from wordcount
#
#1
#-----------
# 13838
#
# 1 record(s) selected.
#
#
Listing 15. MySQL users: Sqoop writing the results of the word count into MySQL
# if you don't have Informix or DB2 you can still do this example
# mysql - it is already installed in the VM, here is how to access

# one time copy of the JDBC driver

sudo cp /usr/lib/hive/lib/mysql-connector-java-5.1.15-bin.jar /usr/lib/sqoop/lib/

# now create the database and table

$ mysql -u root
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 45
Server version: 5.0.95 Source distribution

Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database mydemo;
Query OK, 1 row affected (0.00 sec)

mysql> use mydemo
Database changed
mysql> create table wordcount ( word char(36) not null primary key, n int);
Query OK, 0 rows affected (0.00 sec)

mysql> exit
Bye

# now export

$ sqoop export --connect jdbc:mysql://localhost/mydemo \
--table wordcount --export-dir /user/cloudera/HF.out \
--fields-terminated-by '\t' --username root
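Once the word counts are cached in a relational table, a front end can serve them with an ordinary SQL query instead of re-running the MapReduce job; this is the caching design pattern mentioned earlier. The sketch below assumes the mydemo database and wordcount table created in Listing 15 and a separately installed mysql-connector-python package, which is not part of the stock virtual image.

#!/usr/bin/env python
# Minimal sketch: read the Sqoop-exported word counts straight from MySQL.
# Assumes the mydemo database from Listing 15 and mysql-connector-python.
import mysql.connector

conn = mysql.connector.connect(host='localhost', user='root', database='mydemo')
cur = conn.cursor()

# the ten most frequent words from Huckleberry Finn
cur.execute("SELECT word, n FROM wordcount ORDER BY n DESC LIMIT 10")
for word, n in cur:
    print word.strip(), n       # Python 2 print, as in the other examples

cur.close()
conn.close()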
Importing data from Informix and DB2 into HDFS using Sqoop
Inserting data into Hadoop HDFS can also be accomplished with Sqoop; this bi-directional functionality is controlled via the import argument.
The sample databases that ship with both products include simple datasets you can use for this purpose. Listing 16 shows the syntax and the results of Sqooping each server.
MySQL users: please adapt the syntax from the Informix or DB2 examples that follow.
Listing 16. Sqoop importing from the Informix sample database into HDFS
$ sqoop import --driver com.informix.jdbc.IfxDriver \
--connect \
"jdbc:informix-sqli://192.168.1.143:54321/stores_demo:informixserver=ifx117" \
--table orders \
--username informix --password useyours

12/08/09 14:39:18 WARN tool.BaseSqoopTool: Setting your password on the command-line
is insecure. Consider using -P instead.
12/08/09 14:39:18 INFO manager.SqlManager: Using default fetchSize of 1000
12/08/09 14:39:18 INFO tool.CodeGenTool: Beginning code generation
12/08/09 14:39:19 INFO manager.SqlManager: Executing SQL statement:
SELECT t.* FROM orders AS t WHERE 1=0
12/08/09 14:39:19 INFO manager.SqlManager: Executing SQL statement:
SELECT t.* FROM orders AS t WHERE 1=0
12/08/09 14:39:19 INFO orm.CompilationManager: HADOOP_HOME is /usr/lib/hadoop
12/08/09 14:39:19 INFO orm.CompilationManager: Found hadoop core jar
at: /usr/lib/hadoop/hadoop-0.20.2-cdh3u4-core.jar
12/08/09 14:39:21 INFO orm.CompilationManager: Writing jar
file: /tmp/sqoop-cloudera/compile/0b59eec7007d3cff1fc0ae446ced3637/orders.jar
12/08/09 14:39:21 INFO mapreduce.ImportJobBase: Beginning import of orders
12/08/09 14:39:21 INFO manager.SqlManager: Executing SQL statement:
SELECT t.* FROM orders AS t WHERE 1=0
12/08/09 14:39:22 INFO db.DataDrivenDBInputFormat: BoundingValsQuery:
SELECT MIN(order_num), MAX(order_num) FROM orders
12/08/09 14:39:22 INFO mapred.JobClient: Running job: job_201208091208_0003
12/08/09 14:39:23 INFO mapred.JobClient: map 0% reduce 0%
12/08/09 14:39:31 INFO mapred.JobClient: map 25% reduce 0%
12/08/09 14:39:32 INFO mapred.JobClient: map 50% reduce 0%
12/08/09 14:39:36 INFO mapred.JobClient: map 100% reduce 0%
12/08/09 14:39:37 INFO mapred.JobClient: Job complete: job_201208091208_0003
12/08/09 14:39:37 INFO mapred.JobClient: Counters: 16
12/08/09 14:39:37 INFO mapred.JobClient: Job Counters
12/08/09 14:39:37 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=22529
12/08/09 14:39:37 INFO mapred.JobClient: Total time spent by all reduces
waiting after reserving slots (ms)=0
12/08/09 14:39:37 INFO mapred.JobClient: Total time spent by all maps
waiting after reserving slots (ms)=0
12/08/09 14:39:37 INFO mapred.JobClient: Launched map tasks=4
12/08/09 14:39:37 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=0
12/08/09 14:39:37 INFO mapred.JobClient: FileSystemCounters
12/08/09 14:39:37 INFO mapred.JobClient: HDFS_BYTES_READ=457
12/08/09 14:39:37 INFO mapred.JobClient: FILE_BYTES_WRITTEN=278928
12/08/09 14:39:37 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=2368
12/08/09 14:39:37 INFO mapred.JobClient: Map-Reduce Framework
12/08/09 14:39:37 INFO mapred.JobClient: Map input records=23
12/08/09 14:39:37 INFO mapred.JobClient: Physical memory (bytes) snapshot=291364864
12/08/09 14:39:37 INFO mapred.JobClient: Spilled Records=0
12/08/09 14:39:37 INFO mapred.JobClient: CPU time spent (ms)=1610
12/08/09 14:39:37 INFO mapred.JobClient: Total committed heap usage (bytes)=168034304
12/08/09 14:39:37 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2074587136
12/08/09 14:39:37 INFO mapred.JobClient: Map output records=23
12/08/09 14:39:37 INFO mapred.JobClient: SPLIT_RAW_BYTES=457
12/08/09 14:39:37 INFO mapreduce.ImportJobBase: Transferred 2.3125 KB in 16.7045
seconds (141.7585 bytes/sec)
12/08/09 14:39:37 INFO mapreduce.ImportJobBase: Retrieved 23 records.

# now look at the results

$ hls
Found 4 items
-rw-r--r-- 1 cloudera supergroup 459386 2012-08-08 19:34 /user/cloudera/DS.txt.gz
drwxr-xr-x - cloudera supergroup 0 2012-08-08 19:38 /user/cloudera/HF.out
-rw-r--r-- 1 cloudera supergroup 597587 2012-08-08 19:35 /user/cloudera/HF.txt
drwxr-xr-x - cloudera supergroup 0 2012-08-09 14:39 /user/cloudera/orders
$ hls orders
Found 6 items
-rw-r--r-- 1 cloudera supergroup 0 2012-08-09 14:39 /user/cloudera/orders/_SUCCESS
drwxr-xr-x - cloudera supergroup 0 2012-08-09 14:39 /user/cloudera/orders/_logs
-rw-r--r-- 1 cloudera ...roup 630 2012-08-09 14:39 /user/cloudera/orders/part-m-00000
-rw-r--r-- 1 cloudera supergroup 564 2012-08-09 14:39 /user/cloudera/orders/part-m-00001
-rw-r--r-- 1 cloudera supergroup 527 2012-08-09 14:39 /user/cloudera/orders/part-m-00002
-rw-r--r-- 1 cloudera supergroup 647 2012-08-09 14:39 /user/cloudera/orders/part-m-00003

# wow there are four files part-m-0000x
# look inside one

# some of the lines are edited to fit on the screen
$ hcat /user/cloudera/orders/part-m-00002
1013,2008-06-22,104,express ,n,B77930 ,2008-07-10,60.80,12.20,2008-07-31
1014,2008-06-25,106,ring bell, ,n,8052 ,2008-07-03,40.60,12.30,2008-07-10
1015,2008-06-27,110, ,n,MA003 ,2008-07-16,20.60,6.30,2008-08-31
1016,2008-06-29,119, St. ,n,PC6782 ,2008-07-12,35.00,11.80,null
1017,2008-07-09,120,use ,n,DM354331 ,2008-07-13,60.00,18.00,null
Why are there four different files, each containing only part of the data? Sqoop is a highly parallelized utility. If a 4,000-node cluster running Sqoop did a full-throttle import from a database, the 4,000 connections would look very much like a denial-of-service attack against the database, so Sqoop's default connection limit is four JDBC connections. Each connection generates a data file in HDFS, hence the four files. Don't worry; you'll see how Hadoop works across these files without any difficulty.
The next step is to import a DB2 table. As shown in Listing 17, by specifying the -m 1 option a table without a primary key can be imported, and the result is a single file.
To be continued in the next post!