Hadoop is currently the most widely used big-data processing platform, but Hadoop itself is written in Java. How is a PHP developer supposed to get in on big data? Fortunately, Hadoop ships with plenty of tools for developers in other languages. Hadoop Streaming is the tool Hadoop provides for writing MapReduce programs in other languages. Below we will walk through developing, testing, and running a MapReduce job in PHP.
1. Prerequisites
- A Linux operating system
- PHP
- A working Hadoop environment (setting one up is beyond the scope of this article; see the manual)
2. How It Works
Hadoop Streaming is one tool in the Hadoop toolbox; in plain terms it is just a jar file. It creates, submits, and monitors MapReduce jobs, and it runs your mapper and reducer as subprocesses that read data from standard input and write intermediate results to standard output. That gives us our approach: all we have to write are programs that read from standard input and write to standard output.
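To make that contract concrete, here is a minimal PHP sketch (not part of the job we build below): any program that reads lines from STDIN and writes lines to STDOUT can act as a Streaming mapper or reducer. This one just echoes its input unchanged:

```php
#!/usr/bin/php
<?php
// Minimal illustration of the Hadoop Streaming contract:
// read lines from STDIN, write results to STDOUT.
// This identity filter echoes every input line back unchanged.
while ($line = fgets(STDIN)) {
    echo $line;
}
```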
The jar usually lives under the Hadoop installation directory; its exact location and file name vary by version. In version 2.6.2, for example, it is:
```
$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.2.jar
```
You can also find it by searching for the file name inside the Hadoop installation directory:
```
find -name '*streaming*'
```
Basic usage of Hadoop Streaming:
```
hadoop jar hadoop-streaming-2.6.2.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /usr/bin/wc
```
We won't go through this command here; it is explained later when we actually use it. For the full details, consult the reference manual.
3. A Brief Introduction to MapReduce
MapReduce is Hadoop's data-processing framework. It consists of two main phases: a Map phase, in which the data is processed in parallel across the nodes of the cluster, and a Reduce phase, which processes the results produced by the Map phase.
Given this, our job is to write a map program and a reduce program; below we implement both in PHP.
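To make the flow concrete, here is how a few sample records (of the kind introduced in the next section) travel through the two phases; the shuffle step in between groups identical keys before they reach the reducer:

```
input line                    map emits       shuffle groups        reduce emits
1,张三,23,beijing,10086,   →  beijing 1       beijing  [1, 1]   →   beijing 2
2,李四,34,shanghai,10000,  →  shanghai 1      shanghai [1]      →   shanghai 1
3,王五,20,beijing,10010,   →  beijing 1
```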
4. The Task
We will now implement the following:
Using PHP, process user data with MapReduce and count the number of registered users in each city.
The user data looks like this (user.txt):
```
1,张三,23,beijing,10086,
2,李四,34,shanghai,10000,
3,王五,20,beijing,10010,
...
```
Each line holds one record in the format "ID,name,age,city,telephone,".
5. Implementing the Mapper
mapper.php
```php
#!/usr/bin/php
<?php
// Read user records from STDIN and emit "<city> 1" for each one.
while ($line = fgets(STDIN)) {
    $line = trim($line);
    $user = explode(',', $line);
    echo $user[3]." 1\n";   // field 3 is the city
}
```
There is nothing special about this script; it is an ordinary CLI-mode PHP script. You may have noticed, though, that the first line is not `<?php` but `#!/usr/bin/php`. This deserves a brief explanation: it is a so-called shebang. Normally we would run a PHP script as `php mapper.php`; with that first line in place, we can simply run `./mapper.php` instead (look familiar?). If you don't know where PHP is installed, `which php` will give you its path.
Also, since Hadoop expects these scripts to be executable programs, we must grant mapper.php execute permission:
```
chmod +x mapper.php
```
The program is simple: it reads data from standard input (STDIN), extracts the city field, tags it with a 1, and prints one such pair per line.
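The mapper above trusts its input completely. As a hedged variant (a sketch, not the version used in this article), a slightly more defensive mapper might skip blank or malformed lines:

```php
#!/usr/bin/php
<?php
// Defensive variant of mapper.php: skip empty lines and
// records that do not have enough fields.
while ($line = fgets(STDIN)) {
    $line = trim($line);
    if ($line === '') continue;        // ignore blank lines
    $user = explode(',', $line);
    if (count($user) < 4) continue;    // ignore malformed records
    echo $user[3]." 1\n";              // field 3 is the city
}
```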
6. Implementing the Reducer
reducer.php

```php
#!/usr/bin/php
<?php
// Sum the counts emitted by the mappers, keyed by city.
$result = array();
while ($line = fgets(STDIN)) {
    list($city, $count) = explode(' ', trim($line));
    if (!isset($result[$city])) $result[$city] = 0;
    $result[$city] += $count;
}
foreach ($result as $key => $value) {
    echo "$key $value\n";
}
```
This program is also straightforward: it reads lines from standard input (STDIN) and tallies the count per city. As before, it needs the shebang, and reducer.php must be granted execute permission.
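As an aside: because Hadoop sorts mapper output by key before handing it to the reducer, a reducer never actually needs to hold every key in memory the way reducer.php does. Here is a hedged sketch of a constant-memory variant that exploits that ordering (the array version above is perfectly fine at this scale):

```php
#!/usr/bin/php
<?php
// Sketch of a constant-memory reducer: Hadoop delivers keys in
// sorted order, so we can flush a city's total as soon as the key changes.
$current = null;
$sum = 0;
while ($line = fgets(STDIN)) {
    $line = trim($line);
    if ($line === '') continue;
    list($city, $count) = explode(' ', $line);
    if ($city !== $current) {
        if ($current !== null) echo "$current $sum\n";
        $current = $city;
        $sum = 0;
    }
    $sum += (int)$count;
}
if ($current !== null) echo "$current $sum\n";   // flush the last key
```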
7. Testing the Code
At this point we have three source files:
- user.txt — the user data, one record per line (a few rows are enough for testing).
- mapper.php — the executable mapper script, with its shebang and execute permission in place.
- reducer.php — the executable reducer script, with its shebang and execute permission in place.
Test the mapper:
```
$ cat user.txt | ./mapper.php
shandong 1
beijing 1
beijing 1
```
Put a small amount of data into user.txt and test with ordinary Linux command-line tools.
Test the reducer:
```
$ cat user.txt | ./mapper.php | ./reducer.php
beijing 2
shandong 1
```
In reality the reducer processes the output of many mappers; here, simulating a single mapper's output and feeding it to the reducer is enough to exercise the logic and confirm that the job meets its goal.
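One subtlety: on the cluster, Hadoop sorts the mapper output by key before the reducer sees it. Our reducer.php does not depend on input order, but if yours does (like the constant-memory sketch in section 6 above), inserting a `sort` into the pipe mimics the shuffle more faithfully:

```
$ cat user.txt | ./mapper.php | sort | ./reducer.php
```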
8. Running for Real
8.1 Start the Hadoop pseudo-distributed cluster
```
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh
```
After running both commands, verify with the `jps` command: if DataNode, NodeManager, NameNode, ResourceManager, and SecondaryNameNode all appear, the cluster started successfully.
8.2 Create the directories and store the file in HDFS
```
$HADOOP_HOME/bin/hdfs dfs -mkdir /user
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/<username>
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/<username>/input
$HADOOP_HOME/bin/hdfs dfs -put user.txt /user/<username>/input
```
Note that <username> is the user name you are logged into Linux as; mine, for example, is 'hadoop'. The last command must be run from the directory containing user.txt, for obvious reasons.
8.3 Run the MapReduce job
```
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.2.jar \
    -input input/user.txt \
    -output output2 \
    -mapper /home/hadoop/Documents/src/php/mapper.php \
    -reducer /home/hadoop/Documents/src/php/reducer.php
```
Hadoop runs jar files; Java MapReduce programs rely on exactly that. We have no such option in PHP, so we go through Streaming instead, which is why we pass in the hadoop-streaming jar mentioned at the beginning of this article. The main parameters:
- input: the path of the input file; this is a path in HDFS.
- output: the directory where the MapReduce results are written, also in HDFS. This directory must not already exist; Hadoop creates it itself, and otherwise the job fails with a path-already-exists error.
- mapper: our Mapper script, i.e. mapper.php, given as an absolute local file path. A relative path, or a script that fails to execute, triggers a configuration error.
- reducer: our Reducer script, i.e. reducer.php, likewise given as an absolute path.
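Incidentally, the command above assumes mapper.php and reducer.php exist at that absolute path on every node, which is trivially true on a single-machine pseudo-distributed setup. On a real cluster, Hadoop Streaming's -file option can ship the scripts to the nodes along with the job instead; roughly like this (an untested variant of the command above):

```
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.2.jar \
    -input input/user.txt \
    -output output2 \
    -mapper mapper.php \
    -reducer reducer.php \
    -file /home/hadoop/Documents/src/php/mapper.php \
    -file /home/hadoop/Documents/src/php/reducer.php
```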
Output when the run finishes:
```
packageJobJar: [/tmp/hadoop-unjar2712470127417447142/] [] /tmp/streamjob561393005597605928.jar tmpDir=null
15/11/28 00:04:34 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 00:04:34 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 00:04:35 INFO mapred.FileInputFormat: Total input paths to process : 1
15/11/28 00:04:35 INFO mapreduce.JobSubmitter: number of splits:2
15/11/28 00:04:36 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448639746810_0003
15/11/28 00:04:36 INFO impl.YarnClientImpl: Submitted application application_1448639746810_0003
15/11/28 00:04:36 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1448639746810_0003/
15/11/28 00:04:36 INFO mapreduce.Job: Running job: job_1448639746810_0003
15/11/28 00:04:42 INFO mapreduce.Job: Job job_1448639746810_0003 running in uber mode : false
15/11/28 00:04:42 INFO mapreduce.Job:  map 0% reduce 0%
15/11/28 00:04:51 INFO mapreduce.Job:  map 100% reduce 0%
15/11/28 00:04:56 INFO mapreduce.Job:  map 100% reduce 100%
15/11/28 00:04:57 INFO mapreduce.Job: Job job_1448639746810_0003 completed successfully
15/11/28 00:04:57 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=46
        FILE: Number of bytes written=324442
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=338
        HDFS: Number of bytes written=23
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=12457
        Total time spent by all reduces in occupied slots (ms)=3178
        Total time spent by all map tasks (ms)=12457
        Total time spent by all reduce tasks (ms)=3178
        Total vcore-seconds taken by all map tasks=12457
        Total vcore-seconds taken by all reduce tasks=3178
        Total megabyte-seconds taken by all map tasks=12755968
        Total megabyte-seconds taken by all reduce tasks=3254272
    Map-Reduce Framework
        Map input records=3
        Map output records=3
        Map output bytes=34
        Map output materialized bytes=52
        Input split bytes=200
        Combine input records=0
        Combine output records=0
        Reduce input groups=2
        Reduce shuffle bytes=52
        Reduce input records=3
        Reduce output records=2
        Spilled Records=6
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=668
        CPU time spent (ms)=2320
        Physical memory (bytes) snapshot=674447360
        Virtual memory (bytes) snapshot=6298329088
        Total committed heap usage (bytes)=491257856
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=138
    File Output Format Counters
        Bytes Written=23
15/11/28 00:04:57 INFO streaming.StreamJob: Output directory: output2
```
8.4 View the results
```
$ $HADOOP_HOME/bin/hdfs dfs -cat output2/*
beijing 2
shandong 1
```
Since the results are stored in HDFS, we need to use the hdfs cat command to view them.
9. Pitfalls
The problems below were all reproduced deliberately for this article.
9.1 Output directory already exists
```
packageJobJar: [/tmp/hadoop-unjar986578381948307717/] [] /tmp/streamjob9098881249462859146.jar tmpDir=null
15/11/28 23:46:00 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:46:01 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:46:01 ERROR streaming.StreamJob: Error Launching job : Output directory hdfs://localhost:9000/user/hadoop/output2 already exists
Streaming Command Failed!
```
The message makes clear that the directory already exists; the fix is to delete it:
```
$HADOOP_HOME/bin/hdfs dfs -rm -r -f output2
```
9.2 waitOutputThreads error
```
packageJobJar: [/tmp/hadoop-unjar7696480168906810677/] [] /tmp/streamjob226646231265106849.jar tmpDir=null
15/11/28 23:52:02 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:52:02 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:52:03 INFO mapred.FileInputFormat: Total input paths to process : 1
15/11/28 23:52:03 INFO mapreduce.JobSubmitter: number of splits:2
15/11/28 23:52:04 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448639746810_0004
15/11/28 23:52:04 INFO impl.YarnClientImpl: Submitted application application_1448639746810_0004
15/11/28 23:52:04 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1448639746810_0004/
15/11/28 23:52:04 INFO mapreduce.Job: Running job: job_1448639746810_0004
15/11/28 23:52:12 INFO mapreduce.Job: Job job_1448639746810_0004 running in uber mode : false
15/11/28 23:52:12 INFO mapreduce.Job:  map 0% reduce 0%
15/11/28 23:52:28 INFO mapreduce.Job:  map 100% reduce 0%
15/11/28 23:52:31 INFO mapreduce.Job: Task Id : attempt_1448639746810_0004_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
15/11/28 23:52:31 INFO mapreduce.Job: Task Id : attempt_1448639746810_0004_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
```
This error means Hadoop could not execute your program; the cause is a missing shebang in the script.
9.3 Configuration error
```
packageJobJar: [/tmp/hadoop-unjar5507801207525106156/] [] /tmp/streamjob5584049820660820845.jar tmpDir=null
15/11/28 23:55:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:55:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:55:28 INFO mapred.FileInputFormat: Total input paths to process : 1
15/11/28 23:55:28 INFO mapreduce.JobSubmitter: number of splits:2
15/11/28 23:55:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448639746810_0005
15/11/28 23:55:29 INFO impl.YarnClientImpl: Submitted application application_1448639746810_0005
15/11/28 23:55:30 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1448639746810_0005/
15/11/28 23:55:30 INFO mapreduce.Job: Running job: job_1448639746810_0005
15/11/28 23:55:37 INFO mapreduce.Job: Job job_1448639746810_0005 running in uber mode : false
15/11/28 23:55:37 INFO mapreduce.Job:  map 0% reduce 0%
15/11/28 23:55:45 INFO mapreduce.Job: Task Id : attempt_1448639746810_0005_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:446)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    ... 9 more
Caused by: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:38)
    ... 14 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    ... 17 more
Caused by: java.lang.RuntimeException: configuration exception
    at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:222)
    at org.apache.hadoop.streaming.PipeMapper.configure(PipeMapper.java:66)
    ... 22 more
Caused by: java.io.IOException: Cannot run program "/home/hadoop/Documents/src/php/mapper.php": error=13, Permission denied
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:209)
    ... 23 more
Caused by: java.io.IOException: error=13, Permission denied
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:248)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 24 more
```
This error is rather cryptic: it only says the configuration is wrong. At first I assumed a configuration file could not be found; after some digging it turned out that "configuration" here means the values of the input/output/mapper/reducer parameters. Many things can trigger it, including but not limited to the following (a quick local check that catches most of them is shown after the list):
- The mapper or reducer program has an error.
- The mapper or reducer program cannot be found at its path; hence the absolute paths.
- The mapper or reducer program lacks execute permission: `chmod +x mapper.php`.
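Before resubmitting, this quick local sanity check covers all three:

```
$ head -1 mapper.php                               # should print: #!/usr/bin/php
$ ls -l mapper.php reducer.php                     # both should show the x bit, e.g. -rwxr-xr-x
$ echo '1,test,20,beijing,10086,' | ./mapper.php   # should print: beijing 1
```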
10. Summary
After working through these somewhat arduous steps, we have completed our first MapReduce program in PHP. PHP actually has a natural advantage for text processing, such as handling JSON or XML, so it feels like a good fit for writing MapReduce programs.
In the example above we counted registered users per city; in SQL it would look roughly like this:
```sql
SELECT city, count(*) AS _cnt FROM user GROUP BY city
```
This article is only meant to show how to develop, test, and run MapReduce jobs with PHP; the programs above are for reference only.