分享

spark技术分享



公众号:
gh_38b2ebb9fcfe

功能介绍
大数据技术分享,主要关于spark、Scala技术

经典文章推荐:


1.spark 比 mapreduce 好在哪


工作中还有面试中, 经常被问到一个问题,  spark 究竟比 mapreduce 好在哪里,为什么备受推崇, 有些人宣称spark 是大数据的未来, spark 宣布了 Hadoop 的死刑,  这种话到底能不能讲, 会不会被打脸
首先,理清一个基本概念, hadoop = hdfs + yarn + mapreduce
hdfs 现在是大数据分布式存储的标配, 如果公司没有牛x到可以自主开发一套分布式存储, 一般开源都是选择 hdfs 作为大数据分布式存储
yarn 是一个资源管理框架, 是负责把底层物理资源抽象为容器,然后管理调度这些容器, 交付给应用者。 spark on yarn 也是很常见的一种使用场景。
mapreduce 通过简单的Mapper和Reducer的抽象提供一个编程模型,就是对任务进行划分, 然后把划分后的任务分发到多个机器上去执行(map 过程), 最后对多台机器执行的结果进行汇总(reduce 过程), 全部灵感都来自于google 的这篇论文, 是这篇论文的一个开源实现
https://static.googleusercontent.com/media/research.google.com/zh-CN//archive/mapreduce-osdi04.pdf
这个论文可经典了, 建议没事读一读。
好了,  spark 宣布了 Hadoop 的死刑, 大家现在再来看这句话, 是不是在胡扯,
spark 只能来和 mapreduce 进行比较, 因为都是计算引擎, 才有比较的意义。
我认为 spark 是比mapreduce 要牛x的, 因为我是一路从 mapreduce  转到 spark 的,一路踩着坑过来的,对两种计算框架的使用都有比较深刻的体会吧。
spark 相对mapreduce 的优势,个人觉得有这几点吧
  • 写 mapreduce 应用都是使用 java, 总感觉抽象层次比较低, 要写很多代码才能完成业务逻辑,好多都是费代码, spark 基于RDD的抽象, 提供了一系列简洁易用, 抽象层次比较高的 api,而且表达力比较强,效率快了不止一点半点,谁用谁知道,用的都说好
  • mapreduce 一个job 只有简单的 map reduce 两步, 如果业务逻辑比较复杂, 那就得搞很多有依赖关系的job, 先跑哪个后跑哪个, 也要开发者自己来管理解决, 这个很恶心, spark 则通过DAG 的方式对多阶段应用支持的特别好,可以更自然表达出计算逻辑。如果多个map操作的RDD的分区不变,是可以放在同一个Task中进行,
  • mapreduce 中间结果都是使用 hdfs 存储,这意味着大量的磁盘io,磁盘io 读写是非常低效的,而 spark 中间结果放在内存中,内存放不下了会写入本地磁盘,而不是HDFS。 spark 调优后,一般中间结果都是 cache 到内存中的,这种机制无疑可以简单有效地解决MapReduce在迭代计算时反复读取写出磁盘的问题 , spark 刚一出道的时候擅长的就是处理复杂迭代的交互任务,可以进行无限次的迭代操作, 事实上当前机器学习的大多数算法都是迭代算法,因此spark 对于解决这一问题具有很大的应用价值。


2.spark bulkload 写入hbase



为什么
  • BulkLoad不会写WAL,也不会产生flush以及split。
  • 如果我们大量调用PUT接口插入数据,可能会导致大量的GC操作。除了影响性能之外,严重时甚至可能会对HBase节点的稳定性造成影响。但是采用BulkLoad就不会有这个顾虑。
  • 过程中没有大量的接口调用消耗性能
  • 可以利用spark 强大的计算能力
原理
上面是一个总的执行流程图, 数据生成,HFile转换以及HFile加载, 下面是HFile 的格式, 就是个key value 存储结构,
key 是由行健column family 和限定符指定,  然后再加上key的索引,
1.png
2.png
3.png



数据生成
我的源数据是个域名数据库,表示域名的归属, 数据json举例 {“domain”:”www.a.com”,  “uid”:”12345678”} ,
这些数据在线上一个服务的接口可以获取
创建了两个actor, 一个actor 使用scalaj库发http请求获取数据,  发送给另外一个actor输出到一个文本里面,
由于数据里面只有250w条数据, 我给每个域名后面添加一个uuid字符串, 把数据放大40倍,最终生成一个1亿条数据的文件
大约10个G, 每行类似 www.a.com,12345678上传hdfs备用,
case class DomainId(domain:String, id:String)
processMessage ! DomainId(domain, uid)

更多链接






已有(3)人评论

跳转到指定楼层
desehawk 发表于 2018-4-8 17:36:55
3.spark 资源动态调整设计和实现剖析

总体描述
这里有executor 个数动态变化的图
1.png
可以看到, 初始状态的会积压等待  schedulerBacklogTimeout 时间, 然后以越来越快的速度递增,斜率随sustainedSchedulerBacklogTimeout 2倍速度增加, 一个stage 结束后,恢复初始状态,executor 数目根据配置有一个上限和下限, 当 job 结束, executor 空闲一定时间, 然后会一个个 被 remove 掉。

ExecutorAllocationManager 是 一个根据工作负载动态分配和删除 executors 的管家, ExecutorAllocationManager 维持一个动态调整的目标executors数目,  并且定期同步到资源管理者,也就是 yarn 或者 mesos。
启动的时候根据配置设置一个目标executors数目, spark 运行过程中会根据等待(pending)和正在运行(running)的tasks数目动态调整目标executors数目。
当前的executors 个数超过实际负载所需的个数时,会触发减少目标executors数目。
目标executors数目一般会调整为等待(pending)和正在运行(running)的tasks的数目和,
当有过多的task 积压, 等待调度的时候, 会触发增加目标executors数目。
如果调度队列没有在 N 秒内消费完, 就增加新的 executors。 如果积压的情况还会持续 M 秒, 就会触发增加更多的executors, 每一轮增加executors的幅度指数上升, 直到到达上限, 需求的数目根据配置的属性 和当前的 正在运行(running)的tasks的数目和 确定。
增长的幅度受到两个因素控制,
  • 开始的时候, 应该缓慢增加,防止激增超出实际需要, 我们还得一个个删除
  • 目标executors数目很高的情况持续了一段时间后, 就应该快速递增,否则会让少量 executors 吃重太长时间。

删除的策略倒是很简单:
如果一个 executor 已经空闲了 K秒,  意外着它短时间内都不会被调度到, 删除之。
在任何一种情况下, 都没有重试逻辑, 因为我们假定, 资源管理器收到异步请求后, 可以很好的完成。
以下是相关的配置参数
  • spark.dynamicAllocation.enabled           是否开启动态资源调整
  • spark.dynamicAllocation.minExecutors      executors 数目的下限
  • spark.dynamicAllocation.maxExecutors        executors 数目的上限
  • spark.dynamicAllocation.initialExecutors   初始  executors 数目
  • spark.dynamicAllocation.schedulerBacklogTimeout(M)  第一次积压的情况持续多少秒会,会触发调整
  • spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(N)  第一次初始积压超时触发调整后, 如果积压的情况持续多少秒会,会触发调整
  • spark.dynamicAllocation.executorIdleTimeout(K) 空闲多长时间, 会触发删除 executor



监听获取压力信息
在 new  ExecutorAllocationManager 的时候会new 一个ExecutorAllocationListener放在spark 的 listenBus 中, 这里使用了监听者模式, 就是发生以下事件的时候, 我们注册的钩子函数都会被调用到, 下面表格中列出了, 监听那些事件, 以及事件发生的时候 ExecutorAllocationManager 做了什么
这个  监听器 给动态资源调整提供了调整的依据,  因为只有 资源调整器知道了积压情况 才能进行调整。

消息
处理
onStageSubmitted
一个stage 开始的时候,更新 stageid 和 task个数关系, 启动第一次积压定时,更新资源管理器的task 偏向 host的信息
onStageCompleted
恢复到没有积压过task的状态,递增的幅度设置为 1
onTaskStart
numRunningTasks加一,如果当前task是最后一个pending  task, 恢复到没有积压过task的状态,递增的幅度设置为 1 , 运行task的executor如果是空闲状态,标记为不空闲
onTaskEnd
numRunningTasks减一,判断如果运行task的executor里面没有运行一个task,标记为空闲 idle 状态
onExecutorAdded
executorIds中加入 Executor
onExecutorRemoved
从各种集合中删除该  Executor
我们可以看到, 当stage 开始的时候, 启动第一次积压定时, 定时的时长可以通过 spark.dynamicAllocation.schedulerBacklogTimeout 参数控制,  如果超过了这个时间, 就会触发增加更多的executors, 如果一个 stage 运行完了, 就会恢复到没有积压过task的状态, 递增的幅度设置为 1 , 因为下一个stage 开始的时候应该缓慢增加。不能在上一个 stage的递增速度的基础下递增的更快,防止激增超出实际需要, 我们还得一个个删除。
如果一个executor里面运行了task, 就会把这个 executor 从待删除的executor集合中拿走,


更多链接



回复

使用道具 举报

desehawk 发表于 2018-4-8 17:49:52
如何给你的 spark streaming 做单元测试


我们都知道,从写代码到测试再到上线, 越到后期认真,你付出的时间和精力成本越大,单元测试是阻挡 bug 的第一道防线,不写单元测试的程序员不是一个合格的程序员,有了单元测试后,妈妈再也不用担心我进行重构了,今天我来谈下如何对 spark streaming 来进行单元测试

拿最常见的一个 spark streaming 业务场景来看, spark streaming 消费 kafka 中的数据, 首先你需要 mock 你的数据源, 启动一个内嵌的 zookeeper 和 kafka server
引用相关依赖的jar包
[mw_shl_code=bash,true] compile 'org.apache.kafka:kafka-clients:0.10.0.1'
compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.10.0.1'

testCompile group: 'org.scalatest', name: 'scalatest_2.11', version: '2.2.6'[/mw_shl_code]
好了下面我们启动一个内嵌的 zookeeper 和一个内嵌的 kafka server
1.png

引用相关依赖的jar包

[mw_shl_code=bash,true]brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)
server = new KafkaServer(brokerConf)
server.startup()[/mw_shl_code]
brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)server = new KafkaServer(brokerConf)server.startup()
这时候我们有了 mock 的 zk 和 kakfa  server, 下面我们来使用 scalatest 框架写几个测试例子

2.png

这个例子中,我创建一个 spark streaming 实例, 使用 kafkaTestUtils 创建一个 topic 并且打入一些json 格式的消息,

[mw_shl_code=bash,true]val data = Map("{'domain': 'www.a.com', 'size': 6}" -> 7, "{'domain': 'www.b.com', 'size': 7}" -> 9)
kafkaTestUtils.createTopic(topic)
kafkaTestUtils.sendMessages(topic, data)[/mw_shl_code]
后面的数字代表打入本条消息重复打入的次数
创建一个 DirectKafkaInputDStream, 读取 kafka 中的消息,转换成 json 格式, 转换为 Dataset, 使用sql语句进行聚合,在外部定义一个变量,用来累加每次聚合的结果,
最后判断总的聚合结果是否等于期待值
[mw_shl_code=bash,true]eventually(timeout(40000.milliseconds), interval(200.milliseconds)) {
   assert(resultresult.get("www.a.com") == 42L)
   assert(resultresult.get("www.b.com") == 63L)
}[/mw_shl_code]
这里使用了了 scalatest 里面的 eventually 方法,也就是我们主线程会一直阻塞在这里,这时候 spark streaming 会不断生成批次job运行, 在40s内,每隔 200 ms 去检测一次,如果等于期待值,就pass, 如果超时也没等于期待值,那么这个 unit test 就fail了。

链接


回复

使用道具 举报

chengcong 发表于 2018-4-9 09:55:02
谢谢楼主分享!~
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条