这样的操作在map端或者reduce端均可。下面以一个实际业务场景中的例子来简要说明。
问题简要描述:
假如reduce输入的key是Text(String),value是BytesWritable(byte[]),不同key的种类为100万个,value的大小平均为30k左右,每个key大概对应 100个value,要求对每一个key建立两个文件,一个用来不断添加value中的二进制数据,一个用来记录各个value在文件中的位置索引。(大量的小文件会影响HDFS的性能,所以最好对这些小文件进行拼接)
当文件数量较小时,可以考虑使用MultipleOutput来进行key-value的分流,可以按照key的不同,将其输出到不同的文件或者目录中。但是reduce的数量只能为1,不然每个reduce都会生成相同的目录或者文件,不能达到最终的目的。此外最重要的是,操作系统对每个进程打开的文件数量的限制,默认为1024,集群的各个datanode可能会配置更高的值,但最多在几万左右,仍然是一个限制因素。不能满足百万文件的需求。
reduce的主要目的是用来归并key-value并输出到HDFS上,我们当然也可以在reduce中进行其他的操作,比如文件读写。因为默认的partitioner保证同一个key的数据肯定会在同一个reduce中,所以在每个reduce中只用打开两个文件进行读写即可(一个索引文件,一个数据文件)。并发度由reduce数量决定,将reduce数量设为256,那我们就可以同时处理256个key的数据(partioner保证了不同reduce处理的key不同,不会引起文件读写冲突)。这样的并发度的效率是很客观的,可以在较短的时间内完成需求。
思路是这样,但同时由于hdfs的特性以及hadoop的任务调度,在文件读写过程中,仍有可能会出现很多问题,下面简要说些一些常见的会碰到的问题。
1.org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException异常
这可能是最经常碰到的一个问题。可能的原因如下:
(1)文件流冲突。
一般创建文件时都会打开一个供写入的文件流。而我们希望是追加,所以如果使用了错误的API ,就有可能引起上述问题。以FileSystem类为例,如果使用create()方法之后再调用append()方法,就会抛出上述异常。所以最好使用createNewFile方法,只创建文件,不打开流。
(2)mapreduce推测执行机制
mapreduce 为了提高效率,会在一个任务启动之后,同时启动一些相同的任务(attempt),其中有一个attempt成功完成之后,视为整个task完成,其结果 作为最终结果,并且杀掉那些较慢的attempt。集群一般会开启此选项以优化性能(以空间换时间)。但在本问题环境下推测执行却不太合适。因为我们一般希望一个task 用来处理一个文件,但如果启动推测执行,会有几个attempt同时试图操作同一个文件,就会引发异常。所以最好关掉此选项,将 mapred.reduce.max.attempts 设为1,或者将mapred.reduce.tasks.speculative.execution设为false.
但此时仍有可能会出现问题。因为如果一个task的唯一attempt出现问题,在被kill掉之后,task仍会另起一个attempt,此时因为前一个attempt异常终止,仍有可能会影响到新起的attempt的文件操作,引发异常。所以最安全的方法是,借鉴推测执行的机制(每个attempt各自生成自己的结果,最终选择一个作为最终结果),以每个attempt的id号为后缀附加到所操作的文件上,同时捕获所有文件操作的异常并处理,这样可避免文件的读写冲突。Context可以用来获取运行时的一些上下文信息,可以很容易得到attempt的id号。注意,此时如果开启推测执行也可以,但是会生成很多相同的文件(每个attempt一份),仍然不是最好的解决方法。
同时,我们可以利用reduce的输出来记录运行“不正常的” key.这些task大多数是attempt_0被杀掉而重启了一个attempt_1,所以下面的文件一般为两份。可以对这些情况的key输出(文件异常或者attemptID > 0),并进行一些后续处理,比如文件重命名,或者紧对这些key重新写入。因为此种情况的key一般只占极少数,所以并不影响总体的效率。
2.文件异常处理
最好能将mapreduce中的所有文件操作都设置好异常处理。不然一个文件异常就有可能会使整个job失败。所以从效率来讲,最好是在文件发生异常时将其key作为reduce的输出以进行记录。因为同时mapreduce会重启一个task attempts重新进行文件读写,可保证我们得到最终的数据,最后所需的只是对那些异常的key进行一些简单的文件重命名操作即可。
3.多目录以及文件拼接
如果我们将key的种类设为1000万,上述方法会生成太多的小文件从而影响hdfs的性能,另外,因为所有文件都在同一个目录下,会导致同一个目录下文件数目过多而影响访问效率。
在创建文件的同时建立多个子目录,一个有用的方法是以reduce的taskid来建立子目录。这样有多少个reduce就可以建立多少个子目录,不会有文件冲突。同一个reduce处理的key都会在同一个目录下。
文件拼接要考虑的一个索引的问题。为了将文件索引建立的尽量简单,应该尽量保证同一个key的所有数据都在同一个大文件中。这可以利用key的hashCode来实现。如果我们想在每个目录下建立1000个文件,只需将hashCode对1000取余即可。
hadoop,文件并发,操作
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?