如何给你的 spark streaming 做单元测试
我们都知道,从写代码到测试再到上线, 越到后期认真,你付出的时间和精力成本越大,单元测试是阻挡 bug 的第一道防线,不写单元测试的程序员不是一个合格的程序员,有了单元测试后,妈妈再也不用担心我进行重构了,今天我来谈下如何对 spark streaming 来进行单元测试
拿最常见的一个 spark streaming 业务场景来看, spark streaming 消费 kafka 中的数据, 首先你需要 mock 你的数据源, 启动一个内嵌的 zookeeper 和 kafka server 引用相关依赖的jar包 [mw_shl_code=bash,true] compile 'org.apache.kafka:kafka-clients:0.10.0.1'
compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '0.10.0.1'
testCompile group: 'org.scalatest', name: 'scalatest_2.11', version: '2.2.6'[/mw_shl_code] 好了下面我们启动一个内嵌的 zookeeper 和一个内嵌的 kafka server
引用相关依赖的jar包
[mw_shl_code=bash,true]brokerConf = new KafkaConfig(brokerConfiguration, doLog = false) server = new KafkaServer(brokerConf) server.startup()[/mw_shl_code] brokerConf = new KafkaConfig(brokerConfiguration, doLog = false)server = new KafkaServer(brokerConf)server.startup()这时候我们有了 mock 的 zk 和 kakfa server, 下面我们来使用 scalatest 框架写几个测试例子
这个例子中,我创建一个 spark streaming 实例, 使用 kafkaTestUtils 创建一个 topic 并且打入一些json 格式的消息,
[mw_shl_code=bash,true]val data = Map("{'domain': 'www.a.com', 'size': 6}" -> 7, "{'domain': 'www.b.com', 'size': 7}" -> 9)
kafkaTestUtils.createTopic(topic)
kafkaTestUtils.sendMessages(topic, data)[/mw_shl_code] 后面的数字代表打入本条消息重复打入的次数
创建一个 DirectKafkaInputDStream, 读取 kafka 中的消息,转换成 json 格式, 转换为 Dataset, 使用sql语句进行聚合,在外部定义一个变量,用来累加每次聚合的结果,
最后判断总的聚合结果是否等于期待值
[mw_shl_code=bash,true]eventually(timeout(40000.milliseconds), interval(200.milliseconds)) {
assert(resultresult.get("www.a.com") == 42L)
assert(resultresult.get("www.b.com") == 63L)
}[/mw_shl_code] 这里使用了了 scalatest 里面的 eventually 方法,也就是我们主线程会一直阻塞在这里,这时候 spark streaming 会不断生成批次job运行, 在40s内,每隔 200 ms 去检测一次,如果等于期待值,就pass, 如果超时也没等于期待值,那么这个 unit test 就fail了。
链接
|