Questions to guide your reading:
1. How do you write unit tests for ZooKeeper applications?
2. How do you write unit tests for Kafka applications?
3. How do you write unit tests for Spark applications?
ZooKeeper, Kafka, and Spark are among the most popular big data platform tools today. They have grown rapidly over the past two years, and more and more Chinese vendors are adopting them. This site has published several development guides for them, such as: Learning ZooKeeper Usage by Example, Kafka and Spring Integration in Practice, Kafka Quick Start, Notes on Integrating Spark Streaming with Kafka, Spark Development Guide, and so on. The official websites provide many code samples, and plenty more can be found around the internet, so it is easy to learn how to develop against these platforms and frameworks. But how do you write unit tests for such applications? This article presents several techniques for doing so. All of the examples can be found here.

Writing unit tests for ZooKeeper applications

We usually do not write client code directly against the ZooKeeper API, but develop with a wrapper such as Curator or i0itec-zkclient instead, because the raw ZooKeeper API is too low-level: it forces us to handle all kinds of exceptions and is inconvenient to use. There are several ways to implement a ZooKeeper simulator.

- Curator TestingServer

Curator provides a TestingServer class that simulates ZooKeeper and can be used to test Curator applications, or any other application that uses ZooKeeper. You need to add curator-test to your pom.xml:
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>${curator.version}</version>
</dependency>
The test code looks like this:

import java.io.IOException;

import org.apache.curator.test.TestingServer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;

public class CuratorAppTest {
    private TestingServer server;

    @BeforeClass
    public void setUp() throws Exception {
        server = new TestingServer();
        server.start();
    }

    @AfterClass
    public void tearDown() throws IOException {
        server.stop();
    }

    @Test
    public void testSetAndGetData() {
        CuratorApp app = new CuratorApp();
        String payload = System.currentTimeMillis() + "";
        String result = app.setAndGetData(server.getConnectString(), payload);
        assertEquals(result, payload);
    }

    @Test
    public void testWatch() throws Exception {
        CuratorApp app = new CuratorApp();
        app.watch(server.getConnectString());
    }
}
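The CuratorApp class under test lives in the example repository. For context, a minimal sketch of what its setAndGetData method could look like; the znode path /test and the retry policy are assumptions, only the class and method names come from the test above:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Hypothetical implementation matching the test above; the znode path
// and retry settings are our own choices.
public class CuratorApp {
    public String setAndGetData(String connectString, String payload) {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                connectString, new ExponentialBackoffRetry(1000, 3));
        client.start();
        try {
            // Create the node if it is missing, write the payload, read it back.
            if (client.checkExists().forPath("/test") == null) {
                client.create().forPath("/test");
            }
            client.setData().forPath("/test", payload.getBytes());
            return new String(client.getData().forPath("/test"));
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            client.close();
        }
    }
}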
- EmbeddedZookeeper

Kafka implements an embedded ZooKeeper for its own unit tests, and the code is quite simple:
package kafka.zk

import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.TestUtils
import java.net.InetSocketAddress
import kafka.utils.Utils
import org.apache.kafka.common.utils.Utils.getPort

class EmbeddedZookeeper(val connectString: String) {
  val snapshotDir = TestUtils.tempDir()
  val logDir = TestUtils.tempDir()
  val tickTime = 500
  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
  val factory = new NIOServerCnxnFactory()
  factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
  factory.startup(zookeeper)

  def shutdown() {
    Utils.swallow(zookeeper.shutdown())
    Utils.swallow(factory.shutdown())
    Utils.rm(logDir)
    Utils.rm(snapshotDir)
  }
}
And the accompanying test harness trait:

package kafka.zk

import org.scalatest.junit.JUnit3Suite
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}

trait ZooKeeperTestHarness extends JUnit3Suite {
  val zkConnect: String = TestZKUtils.zookeeperConnect
  var zookeeper: EmbeddedZookeeper = null
  var zkClient: ZkClient = null
  val zkConnectionTimeout = 6000
  val zkSessionTimeout = 6000

  override def setUp() {
    super.setUp
    zookeeper = new EmbeddedZookeeper(zkConnect)
    zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
  }

  override def tearDown() {
    Utils.swallow(zkClient.close())
    Utils.swallow(zookeeper.shutdown())
    super.tearDown
  }
}
- Roll your own

Kafka's classes cannot be used directly from Java; you can resort to tricks such as rewriting the trait as an abstract class. Alternatively, we can follow Kafka's Scala code and implement an embedded server ourselves in Java, built mainly on org.apache.zookeeper.server.ZooKeeperServer and org.apache.zookeeper.server.NIOServerCnxnFactory. See this gist, or the approach discussed on Stack Overflow:
Properties startupProperties = ...

QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
try {
    quorumConfiguration.parseProperties(startupProperties);
} catch (Exception e) {
    throw new RuntimeException(e);
}

zooKeeperServer = new ZooKeeperServerMain();
final ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumConfiguration);

new Thread() {
    public void run() {
        try {
            zooKeeperServer.runFromConfig(configuration);
        } catch (IOException e) {
            log.error("ZooKeeper Failed", e);
        }
    }
}.start();
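If you prefer the first approach, a minimal Java sketch modeled on Kafka's Scala EmbeddedZookeeper above could look like this; the class name EmbeddedZooKeeperServer and the fixed tick time are our own choices:

import java.io.File;
import java.net.InetSocketAddress;
import java.nio.file.Files;

import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;

// A Java port of Kafka's EmbeddedZookeeper; the class name is our own.
public class EmbeddedZooKeeperServer {
    private final ZooKeeperServer zookeeper;
    private final NIOServerCnxnFactory factory;

    public EmbeddedZooKeeperServer(int port) throws Exception {
        // Temporary snapshot/log directories, as in Kafka's TestUtils.tempDir().
        File snapshotDir = Files.createTempDirectory("zk-snapshot").toFile();
        File logDir = Files.createTempDirectory("zk-log").toFile();
        zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500);
        factory = new NIOServerCnxnFactory();
        factory.configure(new InetSocketAddress("127.0.0.1", port), 0);
        factory.startup(zookeeper);
    }

    public void shutdown() {
        zookeeper.shutdown();
        factory.shutdown();
    }
}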
Writing unit tests for Kafka applications

Kafka's own codebase already contains unit tests, so when writing Kafka producer and consumer applications we can use them as a reference. You can browse these unit tests and their reusable classes online: Kafka unit test classes. KafkaServerTestHarness provides a basic test trait:

trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
  val configs: List[KafkaConfig]
  var servers: List[KafkaServer] = null

  override def setUp() {
    super.setUp
    if(configs.size <= 0)
      throw new KafkaException("Must supply at least one server config.")
    servers = configs.map(TestUtils.createServer(_))
  }

  override def tearDown() {
    servers.map(server => server.shutdown())
    servers.map(server => server.config.logDirs.map(Utils.rm(_)))
    super.tearDown
  }
}
It also provides a trait with a producer and a consumer already initialized:

trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
  val port: Int
  val host = "localhost"
  var producer: Producer[String, String] = null
  var consumer: SimpleConsumer = null

  override def setUp() {
    super.setUp
    val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
    producer = new Producer(new ProducerConfig(props))
    consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
  }

  override def tearDown() {
    producer.close()
    consumer.close()
    super.tearDown
  }
}
If you want to use them in your own code, you need to add this dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <classifier>test</classifier>
    <scope>test</scope>
    <version>${kafka.version}</version>
</dependency>
Implementing these traits directly in Java does not work: they contain concrete method implementations that Java cannot inherit, so you may need to rewrite them as abstract classes. Alternatively, you can build the fixtures yourself with Kafka's TestUtils, as in the test below (a sketch of the elided setup/teardown follows the test):

@BeforeClass
public void setup() {
    // ......
}

@AfterClass
public void teardown() {
    // ......
}

@Test
public void testReceive() {
    Properties consumerProps = TestUtils.createConsumerProperties(zkServer.connectString(), "group_1", "consumer_id", 1000);
    consumerProps.putAll(kafkaProps);
    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    // launch one consumer thread per stream
    ExecutorService executor = Executors.newFixedThreadPool(2);
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerThread(stream, threadNumber));
        threadNumber++;
    }
    // set up the producer
    Properties properties = TestUtils.getProducerConfig("localhost:" + port);
    properties.put("serializer.class", StringEncoder.class.getCanonicalName());
    ProducerConfig pConfig = new ProducerConfig(properties);
    Producer<Integer, String> producer = new Producer<>(pConfig);
    // send messages
    for (int i = 0; i < 10; i++) {
        KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, "test-message-" + i);
        List<KeyedMessage<Integer, String>> messages = new ArrayList<KeyedMessage<Integer, String>>();
        messages.add(data);
        producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
    }
    // give the consumer threads time to drain the topic
    try {
        Thread.sleep(20000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    executor.shutdownNow();
    producer.close();
}
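The setup and teardown bodies above are elided. For orientation only, here is a hedged sketch of what they might contain, assuming Kafka 0.8.x's Scala test utilities (TestUtils, TestZKUtils) called from Java; the field names come from the test above, and exact signatures vary between Kafka versions:

// Hypothetical fixture wiring; field names (zkServer, kafkaServer,
// kafkaProps, port) are taken from the test above, the calls below
// assume Kafka 0.8.x test utilities and may differ in other versions.
private EmbeddedZookeeper zkServer;
private KafkaServer kafkaServer;
private Properties kafkaProps;
private int port;

@BeforeClass
public void setup() {
    // Start an embedded ZooKeeper, then one Kafka broker registered with it.
    zkServer = new EmbeddedZookeeper(TestZKUtils.zookeeperConnect());
    port = TestUtils.choosePort();
    kafkaProps = TestUtils.createBrokerConfig(0, port);
    kafkaProps.setProperty("zookeeper.connect", zkServer.connectString());
    kafkaServer = TestUtils.createServer(new KafkaConfig(kafkaProps), SystemTime$.MODULE$);
}

@AfterClass
public void teardown() {
    kafkaServer.shutdown();
    zkServer.shutdown();
}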
Writing unit tests for Spark applications

Writing unit tests for an ordinary Spark application is straightforward: just set the master to local when creating the SparkContext. Make sure you call SparkContext.stop() in a finally block or in a teardown method, because Spark does not allow two SparkContexts in the same program. Take SparkPi as an example:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public final class JavaSparkPi {

    public static void main(String[] args) throws Exception {
        double pi = calculatePi(args);
        System.out.println("Pi is roughly " + pi);
    }

    public static double calculatePi(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        try {
            int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
            int n = 100000 * slices;
            List<Integer> l = new ArrayList<Integer>(n);
            for (int i = 0; i < n; i++) {
                l.add(i);
            }
            JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
            // Count the random points that fall inside the unit circle.
            int count = dataSet.map(new Function<Integer, Integer>() {
                @Override
                public Integer call(Integer integer) {
                    double x = Math.random() * 2 - 1;
                    double y = Math.random() * 2 - 1;
                    return (x * x + y * y < 1) ? 1 : 0;
                }
            }).reduce(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) {
                    return integer + integer2;
                }
            });
            return 4.0 * count / n;
        } finally {
            // Spark allows only one SparkContext per program, so always stop it.
            jsc.stop();
        }
    }
}
The unit test class:

import java.util.Properties;

import org.testng.annotations.Test;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class JavaSparkPiTest {

    @Test
    public void testPi() {
        // SparkConf picks up spark.* settings from system properties.
        Properties props = System.getProperties();
        props.setProperty("spark.master", "local[4]");
        props.setProperty("spark.rdd.compress", "true");
        props.setProperty("spark.executor.memory", "1g");
        try {
            double pi = JavaSparkPi.calculatePi(new String[]{"1"});
            // Allow some Monte Carlo error around pi.
            assertTrue(Math.abs(pi - 3.14) < 0.1);
        } catch (Exception e) {
            fail(e.getMessage(), e);
        }
    }
}
For Spark Streaming applications, unit testing is more complicated because you need to integrate with other frameworks. If your application integrates with Kafka, you can use the Kafka test classes above. If it integrates with MongoDB, you can use a real Mongo instance or a fake one such as Fongo. If it consumes a TCP stream, you need to implement a TCP server simulator. In general, you should find or implement a simulator for each framework you integrate with.
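For example, a minimal TCP server simulator that a Spark Streaming test could point socketTextStream at might look like the following sketch; the class name and the single-connection behavior are our own choices:

import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical test helper: accepts one connection and writes the given
// lines, the newline-delimited format a socketTextStream source expects.
public class TcpServerSimulator implements Runnable {
    private final ServerSocket serverSocket;
    private final String[] lines;

    public TcpServerSimulator(int port, String... lines) throws IOException {
        this.serverSocket = new ServerSocket(port);
        this.lines = lines;
    }

    @Override
    public void run() {
        try (Socket client = serverSocket.accept();
             PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
            for (String line : lines) {
                out.println(line); // one record per line
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void stop() throws IOException {
        serverSocket.close();
    }
}

A test would start the simulator on a free port in a background thread, create a JavaStreamingContext with a local master, call socketTextStream("localhost", port), and assert on the collected output before stopping both the streaming context and the simulator.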