分享

spark分布式处理同一个文件的问题

回帖奖励 8 云币      回复本帖可获得 1 云币奖励! 每人限 1 次
应用场景:
       间隔一个时间段来一次数据,使用spark做数据的清洗。有效数据写到hive表中,但是有一部分数据只有开始,结束标记还没有过来,需要写到文件中暂存,这里不管是本地文件系统还是hdfs文件,我应该如何去设计,写这个暂存文件。

       分布式是在同时处理一个问题,那么这么多条数据同时去写一个文件,应该会产生问题,那么应该选择什么样更好的办法解决


       还有一个问题,就是 saveAsTextFile  保存文件是先建立一个目录,再有一个_SUCCESS 和 part 文件。那么如何去直接保存为一个txt文件呢。
注意此处是保存为一个txt文件,不是加一个 .repartiton(1) 就可以解决的,我试过了。


        还希望能给出建议,多谢!

已有(9)人评论

跳转到指定楼层
liuzhixin137 发表于 2016-6-15 11:02:42
@howtodown @pig2 @hyj

3位大神,看你们的帖子看的最多了。
回复

使用道具 举报

nextuser 发表于 2016-6-15 13:33:37

回帖奖励 +1 云币

有一部分数据只有开始,结束标记还没有过来,需要写到文件中暂存,这里不管是本地文件系统还是hdfs文件,我应该如何去设计,写这个暂存文件。
无无论是flume还是kafka.正在传输的应该都有标记的。如果还没有看到结束标记,就不能读取了。


如果按照楼主的意思,可能会存在诸多bug,甚至运行不正常
回复

使用道具 举报

liuzhixin137 发表于 2016-6-15 13:46:02
nextuser 发表于 2016-6-15 13:33
有一部分数据只有开始,结束标记还没有过来,需要写到文件中暂存,这里不管是本地文件系统还是hdfs文件,我 ...

每个用户都有一个开始观看和结束观看的行为,我现在是统计开始观看和结束观看的时间。

有可能该用户的结束观看的信息 在这个时间段还没有过来,那么我需要记录下他的开始观看时间,等到某个时间段他的结束观看的数据发过来之后
合起来作为一条记录写到hive表中。每个用户都可能出现这样的情况。

那么当很多用户的数据过来之后,我应该如何去处理这个暂时存储的数据呢?
回复

使用道具 举报

liuzhixin137 发表于 2016-6-15 13:49:26
开始观看和结束观看的数据都是完整的单条数据。只是数据中的标记不同。

我使用RDD做的转换已经完成了数据清洗和计算时间。只剩下这部分。

回复

使用道具 举报

arsenduan 发表于 2016-6-15 16:41:47

回帖奖励 +1 云币

楼主考虑问题还是比较细的,剩下的可以多找些事实的框架资料。楼主可以参考下其他框架,其实 原理都是差不多的,更多的楼主可以在搜一下
大数据架构:flume-ng+Kafka+Storm+HDFS 实时系统组合
http://www.aboutyun.com/forum.php?mod=viewthread&tid=6855



回复

使用道具 举报

liuzhixin137 发表于 2016-6-15 16:49:11
arsenduan 发表于 2016-6-15 16:41
楼主考虑问题还是比较细的,剩下的可以多找些事实的框架资料。楼主可以参考下其他框架,其实 原理都是差不 ...

额,都已经确定了使用spark core做数据清洗,而且到了现在也换不了其他的。

所以只能想想怎么去解决这个问题。



@Riordon
回复

使用道具 举报

arsenduan 发表于 2016-6-15 17:02:49
liuzhixin137 发表于 2016-6-15 16:49
额,都已经确定了使用spark core做数据清洗,而且到了现在也换不了其他的。

所以只能想想怎么去解决这 ...

并非让楼主换框架,而是看他们的原理。
楼主的文件需要完全读完或则使用暂时文件应该是不可取的,这样效率太慢了。
可以参考下kafka和storm的原理。
kafka你传递完毕之后,storm就可以读取了。
同样如果楼主使用spark,也可以传递一部分之后,接着读取,而不是非要暂时缓存或则整个传递完才能读取。
所以了解他们的原理,对于楼主当前的思路是由帮助的
回复

使用道具 举报

liuzhixin137 发表于 2016-6-15 17:16:22
arsenduan 发表于 2016-6-15 17:02
并非让楼主换框架,而是看他们的原理。
楼主的文件需要完全读完或则使用暂时文件应该是不可取的,这样效 ...

我考虑了一下,假如有很多条记录,每次记录都要从文件去匹配,效率确实慢。

我再考虑一下有没有其他的方法可以做这种记录暂时的存储。

多谢!
回复

使用道具 举报

liuzhixin137 发表于 2016-6-16 16:30:04
我想了一下,觉得写到一个文件是不可行的。

那么就需要写到多个hdfs文件中去,这样应该不会有问题,至于效率的问题,这里是需要一个近实时的分析,所以影响应该不大。

那么写到多个文件应该怎么操作呢?
我要写的数据格式是    id, channel_id, start_time
那么我可以写到以id为文件夹的文件中去。下次从文本中可以直接读取到RDD中,类似于
val text = sc.textFile("hdfs://master:8020/usr/data/Storage/*")这样读取。
写文件 就用hdfs的java API 写就可以了。

我最开始问到的:
就是 saveAsTextFile  保存文件是先建立一个目录,再有一个_SUCCESS 和 part 文件。那么如何去直接保存为一个txt文件呢。

这个问题其实是没必要的,因为saveAsTextFile("hdfs://master:8020/usr/data/Storage/201606161629")
会建立该目录,并且在该目录下会有 _SUCCESS 和 part_ 文件

下次读取的时候可以这样写 val text = sc.textFile("hdfs://master:8020/usr/data/Storage/*/part_*")就可以读到了。
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条