A PHP Example of Hadoop MapReduce

Hadoop is currently the most widely used big-data processing platform, but Hadoop itself is written in Java, so how does a PHP developer get in on big data? Fortunately, Hadoop provides plenty of tools for developers in other languages; Hadoop Streaming is the one for writing MapReduce programs in languages other than Java. Below we walk through developing, testing, and running a MapReduce job in PHP.

1. Preparation

2. How It Works

Hadoop Streaming is one of the tools in the Hadoop toolkit; in plain terms it is just a jar file. It takes care of creating, submitting, and monitoring the MapReduce job, feeding input records to our programs on standard input and collecting their intermediate results from standard output. That suggests a simple approach: all we need to write is code that reads from standard input and writes to standard output, just like any other script.

The jar normally lives under the Hadoop installation directory; the exact path and file name vary with the version. For 2.6.2 it is:

$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.2.jar

You can also just search for the file name under the Hadoop installation directory:

find -name '*streaming*'

Basic usage of Hadoop Streaming:

hadoop jar hadoop-streaming-2.6.2.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper /bin/cat \
    -reducer /usr/bin/wc

We will not dissect this command here; the options are explained when we actually use them later. For the full details, consult the reference manual.

3. A Brief Introduction to MapReduce

MapReduce is the framework Hadoop uses to process data. It consists of two main phases: a Map phase, in which the data is processed in parallel on the nodes of the cluster, and a Reduce phase, which processes the results produced by the Map phase. See the figure below (image source).

[Figure: MapReduce data flow]

With that in mind, our task is to write a map program and a reduce program; below we implement both in PHP.

4. The Task

Next we implement the following:

Using PHP, process the user data with MapReduce and count the number of registered users per city.

The user data looks like this (user.txt):

1,张三,23,beijing,10086,
2,李四,34,shanghai,10000,
3,王五,20,beijing,10010,
……

Each record is one line with the structure "ID,name,age,city,telephone,".
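Since everything downstream hinges on splitting this format correctly, here is a quick sketch of parsing one record in PHP (the sample row is from user.txt above; note the trailing comma produces an empty final element):

```php
<?php
// Sketch: split one "ID,name,age,city,telephone," record on commas.
// The trailing comma produces an empty final element, which is harmless
// here because the job only needs index 3, the city.
$record = "1,张三,23,beijing,10086,";
$fields = explode(',', trim($record));
// [0] ID, [1] name, [2] age, [3] city, [4] telephone, [5] ""
echo $fields[3], "\n";  // prints "beijing"
```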

5. The Mapper

mapper.php

#!/usr/bin/php
<?php
// Read user records from STDIN; emit "<city> 1" for each one.
while ($line = fgets(STDIN)) {
    $line = trim($line);
    if ($line === '') continue;   // skip blank lines
    $user = explode(',', $line);
    echo $user[3] . " 1\n";       // field 3 is the city
}

There is nothing special about this script; it is just a CLI-mode PHP script. You may have noticed that the first line is not `<?php` but `#!/usr/bin/php`. That line deserves a quick explanation: it is the so-called shebang. Normally we might run a PHP script as `php mapper.php`; with the shebang in place we can simply run `./mapper.php` (look familiar?). If you are not sure where PHP is installed, `which php` will tell you.

Also, since Hadoop expects the scripts to be executable programs, we must give mapper.php execute permission:

chmod +x mapper.php

The program itself is very simple: it reads records from standard input (STDIN), extracts the city field, tags it with a 1, and emits one line per record.
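One detail worth knowing: Hadoop Streaming's default key/value separator is a tab, not a space; the space used above only works because our mapper and reducer agree on it. A minimal sketch of a tab-separated variant follows (the helper name map_line is our own; if you adopt this, the reducer must split on "\t" as well):

```php
<?php
// Sketch of a tab-separated mapper variant. Hadoop Streaming's default
// key/value separator is a tab, so emitting "<city>\t1" lets the
// framework treat the city as the key without extra options.
// map_line() is a helper name of our own; the real mapper would call it
// inside the usual while ($line = fgets(STDIN)) loop.
function map_line(string $line): ?string {
    $line = trim($line);
    if ($line === '') return null;   // skip blank lines
    $fields = explode(',', $line);
    return $fields[3] . "\t1";       // field 3 is the city
}

echo map_line("1,张三,23,beijing,10086,"), "\n";  // city, a tab, then "1"
```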

6. The Reducer

reducer.php

#!/usr/bin/php
<?php
// Tally "<city> <count>" lines from STDIN into per-city totals.
$result = array();
while ($line = fgets(STDIN)) {
    $line = trim($line);
    if ($line === '') continue;       // skip blank lines
    list($city, $count) = explode(' ', $line);
    if (!isset($result[$city])) $result[$city] = 0;
    $result[$city] += (int)$count;    // cast: the raw field is a string
}
foreach ($result as $city => $total) {
    echo "$city $total\n";
}

This program is also straightforward: it reads lines from standard input (STDIN) and tallies the number of users per city. As with the mapper, it needs the shebang and execute permission (chmod +x reducer.php).
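Because Hadoop sorts map output by key before it reaches the reducer, a reducer does not actually need to hold all the totals in memory: it can flush a total every time the key changes. Here is a constant-memory sketch under that assumption (reduce_sorted is a helper name of our own; the real script would read its lines from STDIN):

```php
<?php
// Sketch: a constant-memory reducer that relies on Hadoop delivering
// keys in sorted order. It emits "<city> <total>" each time the key
// changes. reduce_sorted() is a helper name of our own.
function reduce_sorted(array $lines): array {
    $out = [];
    $prev = null;
    $total = 0;
    foreach ($lines as $line) {
        $line = trim($line);
        if ($line === '') continue;
        list($city, $count) = explode(' ', $line);
        if ($prev !== null && $city !== $prev) {
            $out[] = "$prev $total";   // key changed: flush the total
            $total = 0;
        }
        $prev = $city;
        $total += (int)$count;
    }
    if ($prev !== null) $out[] = "$prev $total";  // flush the last key
    return $out;
}

// Input as a real reducer would see it: already sorted by key.
print_r(reduce_sorted(["beijing 1", "beijing 1", "shandong 1"]));
```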

7. Debugging

At this point we have three ingredient files:

  • user.txt, the user data, one record per line (a few rows are enough for testing).
  • mapper.php, the executable mapper script, with a shebang and execute permission.
  • reducer.php, the executable reducer script, with a shebang and execute permission.

Debugging the mapper:

$ cat user.txt|./mapper.php
shandong 1
beijing 1
beijing 1

Put a small amount of data into user.txt and debug with the ordinary Linux command-line tools.

Debugging the reducer:

$ cat user.txt | ./mapper.php | ./reducer.php 
beijing 2
shandong 1

Strictly speaking, the reducer processes the combined output of multiple mappers, which Hadoop sorts by key before the reducer sees it. For our purposes, piping one mapper's output straight into the reducer is enough to verify the logic (add `sort` to the pipeline to mimic the shuffle if your reducer relies on sorted input).
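To make the shuffle step concrete, the whole map, sort, reduce flow can be simulated locally in a few lines of PHP, with no Hadoop involved. This is purely an illustration; pipeline() and the sample rows are our own:

```php
<?php
// Sketch: simulate map -> shuffle (sort) -> reduce locally, without
// Hadoop. pipeline() and the sample rows are ours, for illustration.
function pipeline(array $records): array {
    // Map: one "<city> 1" line per record.
    $mapped = [];
    foreach ($records as $rec) {
        $fields = explode(',', trim($rec));
        $mapped[] = $fields[3] . " 1";
    }
    // Shuffle: Hadoop groups the map output by key; sorting mimics that.
    sort($mapped);
    // Reduce: tally the counts per city.
    $totals = [];
    foreach ($mapped as $line) {
        list($city, $count) = explode(' ', $line);
        $totals[$city] = ($totals[$city] ?? 0) + (int)$count;
    }
    return $totals;
}

$sample = [
    "1,张三,23,beijing,10086,",
    "2,李四,34,shanghai,10000,",
    "3,王五,20,beijing,10010,",
];
print_r(pipeline($sample));  // beijing => 2, shanghai => 1
```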

8. Running It for Real

8.1 Start the pseudo-distributed Hadoop cluster
$HADOOP_HOME/sbin/start-dfs.sh

$HADOOP_HOME/sbin/start-yarn.sh

After running both commands, verify with the jps command; if DataNode, NodeManager, NameNode, ResourceManager, and SecondaryNameNode are all listed, the cluster started successfully.

8.2 Create directories and put the file into HDFS
$HADOOP_HOME/bin/hdfs dfs -mkdir /user
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/<username>
$HADOOP_HOME/bin/hdfs dfs -mkdir /user/<username>/input

$HADOOP_HOME/bin/hdfs dfs -put user.txt /user/<username>/input

Note that <username> is the name of the Linux user you are currently logged in as (mine is 'hadoop'). The last command must be run from the directory containing user.txt, for obvious reasons.

8.3 Run the MapReduce job
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.2.jar \
-input input/user.txt \
-output output2 \
-mapper /home/hadoop/Documents/src/php/mapper.php \
-reducer /home/hadoop/Documents/src/php/reducer.php

Hadoop runs jar files; that is exactly how Java jobs work, and since PHP has no such option we go through Streaming instead, which is why we pass in the hadoop-streaming jar mentioned at the start. The main options:

  • input: the input file path, referring to a path in HDFS.
  • output: the directory for the job's results, also in HDFS. It must not already exist; Hadoop creates it itself, otherwise it fails with a "path already exists" error.
  • mapper: our mapper script, mapper.php, given as an absolute local path; a relative path (or a script that fails to run) triggers a configuration error.
  • reducer: our reducer script, reducer.php, likewise as an absolute path. (On a real multi-node cluster you would also pass the scripts with the -file option so they get shipped to the worker nodes.)

Output on completion:

packageJobJar: [/tmp/hadoop-unjar2712470127417447142/] [] /tmp/streamjob561393005597605928.jar tmpDir=null
15/11/28 00:04:34 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 00:04:34 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 00:04:35 INFO mapred.FileInputFormat: Total input paths to process : 1
15/11/28 00:04:35 INFO mapreduce.JobSubmitter: number of splits:2
15/11/28 00:04:36 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448639746810_0003
15/11/28 00:04:36 INFO impl.YarnClientImpl: Submitted application application_1448639746810_0003
15/11/28 00:04:36 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1448639746810_0003/
15/11/28 00:04:36 INFO mapreduce.Job: Running job: job_1448639746810_0003
15/11/28 00:04:42 INFO mapreduce.Job: Job job_1448639746810_0003 running in uber mode : false
15/11/28 00:04:42 INFO mapreduce.Job:  map 0% reduce 0%
15/11/28 00:04:51 INFO mapreduce.Job:  map 100% reduce 0%
15/11/28 00:04:56 INFO mapreduce.Job:  map 100% reduce 100%
15/11/28 00:04:57 INFO mapreduce.Job: Job job_1448639746810_0003 completed successfully
15/11/28 00:04:57 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=46
        FILE: Number of bytes written=324442
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=338
        HDFS: Number of bytes written=23
        HDFS: Number of read operations=9
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=2
        Launched reduce tasks=1
        Data-local map tasks=2
        Total time spent by all maps in occupied slots (ms)=12457
        Total time spent by all reduces in occupied slots (ms)=3178
        Total time spent by all map tasks (ms)=12457
        Total time spent by all reduce tasks (ms)=3178
        Total vcore-seconds taken by all map tasks=12457
        Total vcore-seconds taken by all reduce tasks=3178
        Total megabyte-seconds taken by all map tasks=12755968
        Total megabyte-seconds taken by all reduce tasks=3254272
    Map-Reduce Framework
        Map input records=3
        Map output records=3
        Map output bytes=34
        Map output materialized bytes=52
        Input split bytes=200
        Combine input records=0
        Combine output records=0
        Reduce input groups=2
        Reduce shuffle bytes=52
        Reduce input records=3
        Reduce output records=2
        Spilled Records=6
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=668
        CPU time spent (ms)=2320
        Physical memory (bytes) snapshot=674447360
        Virtual memory (bytes) snapshot=6298329088
        Total committed heap usage (bytes)=491257856
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=138
    File Output Format Counters 
        Bytes Written=23
15/11/28 00:04:57 INFO streaming.StreamJob: Output directory: output2
8.4 View the results
$ $HADOOP_HOME/bin/hdfs dfs -cat output2/*
beijing 2    
shandong 1

Since the results are stored in HDFS, we use hdfs's cat command to view them.

9. Pitfalls

All of the problems below were reproduced deliberately for this write-up.

9.1 Output directory already exists
packageJobJar: [/tmp/hadoop-unjar986578381948307717/] [] /tmp/streamjob9098881249462859146.jar tmpDir=null
15/11/28 23:46:00 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:46:01 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:46:01 ERROR streaming.StreamJob: Error Launching job : Output directory hdfs://localhost:9000/user/hadoop/output2 already exists
Streaming Command Failed!

The message makes the cause clear: the output directory already exists. The fix is simply to delete it:

$HADOOP_HOME/bin/hdfs dfs -rm -r -f output2
9.2 waitOutputThreads error
packageJobJar: [/tmp/hadoop-unjar7696480168906810677/] [] /tmp/streamjob226646231265106849.jar tmpDir=null
15/11/28 23:52:02 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:52:02 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:52:03 INFO mapred.FileInputFormat: Total input paths to process : 1
15/11/28 23:52:03 INFO mapreduce.JobSubmitter: number of splits:2
15/11/28 23:52:04 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448639746810_0004
15/11/28 23:52:04 INFO impl.YarnClientImpl: Submitted application application_1448639746810_0004
15/11/28 23:52:04 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1448639746810_0004/
15/11/28 23:52:04 INFO mapreduce.Job: Running job: job_1448639746810_0004
15/11/28 23:52:12 INFO mapreduce.Job: Job job_1448639746810_0004 running in uber mode : false
15/11/28 23:52:12 INFO mapreduce.Job:  map 0% reduce 0%
15/11/28 23:52:28 INFO mapreduce.Job:  map 100% reduce 0%
15/11/28 23:52:31 INFO mapreduce.Job: Task Id : attempt_1448639746810_0004_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

15/11/28 23:52:31 INFO mapreduce.Job: Task Id : attempt_1448639746810_0004_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 2
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

This error means Hadoop could not execute your script; the cause is a missing shebang line.

9.3 Configuration error
packageJobJar: [/tmp/hadoop-unjar5507801207525106156/] [] /tmp/streamjob5584049820660820845.jar tmpDir=null
15/11/28 23:55:27 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:55:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
15/11/28 23:55:28 INFO mapred.FileInputFormat: Total input paths to process : 1
15/11/28 23:55:28 INFO mapreduce.JobSubmitter: number of splits:2
15/11/28 23:55:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448639746810_0005
15/11/28 23:55:29 INFO impl.YarnClientImpl: Submitted application application_1448639746810_0005
15/11/28 23:55:30 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1448639746810_0005/
15/11/28 23:55:30 INFO mapreduce.Job: Running job: job_1448639746810_0005
15/11/28 23:55:37 INFO mapreduce.Job: Job job_1448639746810_0005 running in uber mode : false
15/11/28 23:55:37 INFO mapreduce.Job:  map 0% reduce 0%
15/11/28 23:55:45 INFO mapreduce.Job: Task Id : attempt_1448639746810_0005_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:446)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    ... 9 more
Caused by: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:38)
    ... 14 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    ... 17 more
Caused by: java.lang.RuntimeException: configuration exception
    at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:222)
    at org.apache.hadoop.streaming.PipeMapper.configure(PipeMapper.java:66)
    ... 22 more
Caused by: java.io.IOException: Cannot run program "/home/hadoop/Documents/src/php/mapper.php": error=13, Permission denied
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.hadoop.streaming.PipeMapRed.configure(PipeMapRed.java:209)
    ... 23 more
Caused by: java.io.IOException: error=13, Permission denied
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.(UNIXProcess.java:248)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 24 more

This error is rather opaque: it only reports a configuration problem. At first I assumed a configuration file could not be found; after some digging it turned out that "configuration" here means the values of the input/output/mapper/reducer options. Many things can trigger it, including but not limited to:

  • the mapper or reducer script itself has a bug;
  • the mapper or reducer path cannot be found (hence the absolute paths);
  • the mapper or reducer is not executable (`chmod +x mapper.php`); the "Permission denied" at the bottom of the trace above points to this case.

10. Conclusion

After those somewhat bumpy steps, we have completed our first MapReduce program in PHP. PHP actually has a natural edge in text processing, for instance with JSON or XML, so it feels like quite a good fit for writing MapReduce programs.

The example above counts registered users per city; expressed in SQL it would be roughly:

SELECT city, COUNT(*) AS _cnt
FROM user
GROUP BY city

This article is only meant to demonstrate how to develop, test, and run MapReduce jobs in PHP; the programs above are for reference only.
