Friday, July 6, 2012

Touch Hadoop Streaming

First of all, why hadoop streaming?

我能想象到需要使用hadoop streaming的原因:
  • 有很多非java的积累代码必须运行在hadoop上;
  • 使用其他语言(C/C++)编写mapreduce会比java更节省CPU或内存,或者更快得足以抵消掉hadoop streaming引入的性能开销;
  • 有时候使用脚本临时处理任务比较方便;
  • 我就是爱使用X语言!!

Hadoop Wiki上已经有一篇很完整介绍 hadoop streaming 的文章了。大抵上会用java写hadoop mapreduce,就对hadoop streaming没什么理解障碍。

假如我有两个脚本,mapcmd 和 reducecmd,分别作为map和reduce的处理函数。则使用hadoop streaming执行这个mapreduce job,命令为:



使用起来跟java写mapreduce程序一样简单(甚至你觉得更简单些)。

Hadoop streaming 通过 stdin/stdout 来在java进程与mapcmd/reducecmd之间传输数据。默认的情况下,将<key,value>按照 "key + \t + value" 的形式传输给 mapcmd/reducecmd;而mapcmd/reducecmd 也必须按照 "key + \t + value" 的形式返回给 java进程。


相对于原生的 java mapreduce接口,唯一不同的是reducecmd每接收一行是<key,value>,而不是<key,[value1,value2,...]>。至于为什么不直接给reducecmd按照 "key\tvalue1,value2,..."类似的形式传过去,初衷无法考证,或许考虑到<key,value>对于脚本来说接口更简单一些。

总而言之,无论是怎么将key和value编码给可执行程序的,甚至通过什么传输给可执行程序的,终究就是一个协议而已。而这一切,完全是在hadoop原生的mapreduce框架之外处理的。

实现上,只要在mapper/reducer处将往返可执行程序的“桥”搭建好即可。因此,hadoop streaming本身就是一个mapreduce程序,跟我们平时所写的java mapreduce程序并无多大差别。

在 hadoop streaming中,扮演“搭桥”角色的 mapper 和 reducer 分别是 PipeMapper 和 PipeReducer。此外,旧的mapreduce接口(即mapred) 中除了Mapper之外,还有一个MapRunnable的接口可以扩展,方便用户扩展调用Mapper的方式(譬如多线程);而没有对应的ReduceRunnable。所以,hadoop streaming中,负责map和可执行程序传输数据的为PipeMapRunner和PipeMapper,负责reduce和可执行程序传输数据的为PipeReducer。

Hadoop streaming 之所以说可以桥接任何可执行程序,是因为:

  • 使用stdin/stdout做进程间数据交互
  • 交互的默认数据编码足够简单

因此,hadoop streaming并不需要为每种可能的可执行程序开发任何适配接口。不同于hadoop pipes,利用socket和binary编码来进行数据交互,譬如脚本用户是不愿编写过多的代码来接收数据的,自然需要接口库适配(当然,hadoop pipes接口用起来明显自然一点,至于效率是不是更好,则不得而知了)。

hadoop streaming 将数据编码做成可扩展的,下行(Java to STDIN) 的编码由InputWriter负责,默认为TextInputWriter,上行(STDOUT to Java)的编码为OutputReader,默认为TextOutputReader。

PipeMapper和PipeReducer所做的事情是类似的,在进行真正的map和reduce之前,运行用户指定的可执行程序,并启动单独的线程读取程序的stdout,以便在map开始之后进行捕捉。然后map过程中,将数据用InputWriter编码写到可执行程序的stdin。用户进程处理完数据输出到stdout之后,之前启动读取stdout的线程读取到数据,用OutputReader解码写到OutputCollector上面去。其余流程与原生的一致。



Hadoop pipes 的实现层次与hadoop streaming 相差不大。主要的差别在于hadoop pipes隐藏了数据交互细节,而hadoop streaming将这个交给了用户。


Friday, September 16, 2011

slaves.sh的小细节


Hadoop集群中,批量地对slaves节点进行操作,对于略懂得shell脚本技巧的人其实并不是一个大的问题。简单地说,就是对slaves文件中的每个host进行循环,然后ssh执行命令即可。近来看同事操作,总算发现bin下有一个slaves.sh,已经做了同样的事情,实在是省下不少typing work!

后来因为一个不相干的问题稍微被看一下slaves.sh的代码,下面最关键的一句,却没完全看明白:

ssh $HADOOP_SSH_OPTS $slave $"${@// /\\ }" \
   2>&1 | sed "s/^/$slave: /" &

其中 ${@// /\\ } 难理解。看到这里,你就知道下面文章实际与hadoop没有多大关系,而是跟 shell 脚本有关系了。

先简单说明变量字符串字串替换的语法,譬如有变量v:

v="hello borqs, hello me"
[bing@myvm ~]$ echo ${v/hello/great}
great borqs, hello me
[bing@myvm ~]$ echo ${v//hello/great}
great borqs, great me

当v后面是一个斜杠时,是指替换第一处"hello"为"great";当后面是两个斜杠时,是指替换所有的"hello"为"great"。

其次,是 ${@// /\\ } 的语义。用小脚本 print_args.sh 作为演示:

$ cat print_args.sh
#!/bin/bash

echo "${@// /\\ }"
echo "$@"
echo "$#"

分别执行下面的命令:

$ ./print_args.sh i say "hello hanborqs"     
i say hello\ hanborqs
i say hello hanborqs
3

也就是说,${@// /\\ } 是对$@这个数组中的每一个元素进行了空格转义,而不是对"$@"这一个整个字符串进行空格转义(这是最初困扰我的)。起初,我以为 ${@// /\\ } 会将参数替换为"i\ say\ hello\ hanborqs"。这也是我最开始无法理解 slaves.sh 中这处代码的地方:有与没有是一样的。
其实,$@ 也无非就是一个数组罢了。也就是说,这个字符串替换语法,在应用于一个Array上面时,应当是对先每个元素替换。当然,这样设计才是合理的 :) 。Anyway,让我们再尝试一下:

$ arr=("i" "say" "hello borqs")
$ echo ${arr[@]}
i say hello borqs
$ echo ${#arr[@]}
3
$ echo ${arr[@]// /\\ /}
i say hello\ /borqs

这样,就能确定这个字串替换语法中应用于Array时,的确考虑了计算的优先次序的问题。
回过来看 slaves.sh。slaves.sh这样实现,无非是为了解决传递带空格的参数或命令的问题。譬如,想在slaves.sh 上都创建一个名为“hello world.txt”,则可以这样运用:
slaves.sh touch "hello world.txt"
如果想创建"hello"文件和"world.txt",则应当:
slaves.sh touch hello world.txt
同样地,如果你要执行的命令(第三方的命令吧?)或脚本的名字中有空格,用引号括起来就行了,slaves.sh是可以应付的。everything works just like in a bash shell !

Monday, August 16, 2010

万恶的指针退化

我估计,还是有人像我一样,懒得不想知道一个指针与数组的区别, 反正用起来都是一样的。比如:
char buff[SIZE];
或者
char *pBuff = new char[SIZE];
除了一个是静态内存,一个是动态,恐怕也再也没有多大分别了。毕竟,用法是大致相仿。如:
sprintf(buff, "hello buff!\n"); // or pBuff
printf("%s\n", buff); // or pBuff, it's OK

至此,一切都还不错。有一天,我们为了安全起见,希望将sprintf改成snprintf,于是:
snprintf(buff, sizeof(buff), "hello buff!\n");
printf("%s\n", buff);
好,完成。嗒嗒嗒,编译运行,通过。一切都还风平浪静。于是,我们继续航行,今天,又写了3000行代码。次日,你发现buff可能行不通,要换成pBuff。所以,你轻易地:
snprintf(pBuff, sizeof(pBuff), "hello buff!\n");
printf("%s\n", buff);
可是,当再次运行时,你发现你的“船”出了问题了。结果,只打印了"hel",不知道是"help",还是"hell"。要知道,sizeof(buff)的结果为SIZE,但sizeof(pBuff)的结果恒为4,即指针的大小。幸运的是,所有的一切祸根来自于你把“buff”改成了“pBuff”,所以你意味到了数组与指针有多么不同。但是,并不是每次都是这么幸运的。我的朋友多次问起我指针与数组的区别。我总是随意地进行了答复,用起来都一样,别理它。终于有一天,我发现了自己犯下了一个低级无耻的错误——别忘了,数组是有大小信息的。
于是,我觉得,我应该将pBuff改回buff比较好:
snprintf(buff, sizeof(buff), "hello buff!\n");
大后天,代码越来越多了,适当的重构变成了一个不错的主意。于是,我将打印放到了一个函数中(当然,真的是算是一个好主意):
void printbuff(char buff[]) {
snprintf(buff, sizeof(buff), "hello buff!\n");
printf("%s\n", buff);
}

int main() {
char buff[SIZE];
printbuff(buff);
...
}

老问题又出现了。buff明明还是数组,为何会错呢?莫非是printbuff的形参中,没有指定大小信息。实际上,数组名在传入函数的时候,数组就退化成了指针,这个退化集中表现为数组的大小信息丢失了(还有其它表现么?)。这就是为什么,C语言的老师在教我们写函数时,除了传递数组名,还要传一个数组大小。
void printbuff(char *buff, int buffSize) {
...
关于数组退化,我记得在 Effective C++ 还是哪本小册子上看过。推荐如我这般的菜鸟阅读!