howtodown 发表于 2015-1-31 21:25:29

为ZooKeeper, Kafka 和 Spark 应用编写单元测试示例


问题导读


1.如何为zookeeper编写单元测试?
2.如何为Kafka编写单元测试?
3.如何为Spark编写单元测试?

static/image/hrline/4.gif






ZooKeeper, Kafka和Spark是当下流行的大数据平台工具之一。这两年得到飞速的发展,国内厂商也越来越多的使用它们。 本站有多篇文章介绍了它们的开发指南, 如: 跟着实例学习ZooKeeper的用法 Kafka和Spring集成实践 Kafka快速入门 Spark Streaming 集成 Kafka 总结 Spark 开发指南 ......官方网站提供了很多的代码例子,互联网上也有很多的开发例子,你可以很容易的学习如果编写基于这些平台框架的技术。 但是如何为这些应用编写单元测试呢? 本文提供了几种编写单元测试的技术。所有的例子都可以在 这里 找到。为ZooKeeper应用编写单元测试一般我们不直接使用ZooKeeper的API编写Client代码,而是使用Curator或者i0itec-zkclient等包装的API开发。 原因是ZooKeeper本身提供的API太过底层,我们需要处理各种异常,并且API使用不方便。有几种方式可以实现一个ZooKeeper simulator。
[*]Curator TestingServer Curator提供了一个TestingServer类,可以模拟ZooKeeper,用来测试Curator应用或者其它使用ZooKeeper的应用。 你需要在pom.xml中引用curator-test:
<dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-test</artifactId>
        <version>${curator.version}</version>
</dependency>

测试代码如下:public class CuratorAppTest {
        private TestingServer server;
       
        @BeforeClass
        public void setUp() throws Exception {
                server = new TestingServer();
                server.start();
        }
        @AfterClass
        public void tearDown() throws IOException {
                server.stop();
        }
       
        @Test
        public void testSetAndGetData() {               
                CuratorApp app = new CuratorApp();
                String payload = System.currentTimeMillis() + "";
                String result = app.setAndGetData(server.getConnectString(), payload);
                assertEquals(result, payload);
        }
       
        @Test
        public void testWatch() throws Exception {               
                CuratorApp app = new CuratorApp();
                app.watch(server.getConnectString());
        }
}


[*]EmbeddedZookeeper Kafka为了它的单元测试实现一个嵌入式的ZooKeeper,代码相当简单:
package kafka.zk
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.zookeeper.server.NIOServerCnxnFactory
import kafka.utils.TestUtils
import java.net.InetSocketAddress
import kafka.utils.Utils
import org.apache.kafka.common.utils.Utils.getPort
class EmbeddedZookeeper(val connectString: String) {
val snapshotDir = TestUtils.tempDir()
val logDir = TestUtils.tempDir()
val tickTime = 500
val zookeeper = new ZooKeeperServer(snapshotDir, logDir, tickTime)
val factory = new NIOServerCnxnFactory()
factory.configure(new InetSocketAddress("127.0.0.1", getPort(connectString)), 0)
factory.startup(zookeeper)
def shutdown() {
    Utils.swallow(zookeeper.shutdown())
    Utils.swallow(factory.shutdown())
    Utils.rm(logDir)
    Utils.rm(snapshotDir)
}

}



测试工具类:package kafka.zk
import org.scalatest.junit.JUnit3Suite
import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ZKStringSerializer, TestZKUtils, Utils}
trait ZooKeeperTestHarness extends JUnit3Suite {
val zkConnect: String = TestZKUtils.zookeeperConnect
var zookeeper: EmbeddedZookeeper = null
var zkClient: ZkClient = null
val zkConnectionTimeout = 6000
val zkSessionTimeout = 6000
override def setUp() {
    super.setUp
    zookeeper = new EmbeddedZookeeper(zkConnect)
    zkClient = new ZkClient(zookeeper.connectString, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
}
override def tearDown() {
    Utils.swallow(zkClient.close())
    Utils.swallow(zookeeper.shutdown())
    super.tearDown
}
}


[*]自己实现 Kafka的类无法直接在java中使用,你可以使用一些花招, 比如把trait改为abstract class改造使用。 我们可以参考Kafka的scala代码使用java自己实现一个, 主要例如org.apache.zookeeper.server.ZooKeeperServer和org.apache.zookeeper.server.NIOServerCnxnFactory。 如这个代码: gist 或者如stackoverflow上讨论的:
Properties startupProperties = ...
QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
try {
    quorumConfiguration.parseProperties(startupProperties);
} catch(Exception e) {
    throw new RuntimeException(e);
}
zooKeeperServer = new ZooKeeperServerMain();
final ServerConfig configuration = new ServerConfig();
configuration.readFrom(quorumConfiguration);
new Thread() {
    public void run() {
      try {
            zooKeeperServer.runFromConfig(configuration);
      } catch (IOException e) {
            log.error("ZooKeeper Failed", e);
      }
    }
}.start();

为Kafka应用编写单元测试Kafka代码本身就提供了单元测试,所以我们编写Kafka producer应用和consumer应用时可以参考这些应用。 你可以在线查看这些单元测试以及它们可以重用的文件: kafka单元测试类 KafkaServerTestHarness 提供了一个基本测试trait:trait KafkaServerTestHarness extends JUnit3Suite with ZooKeeperTestHarness {
val configs: List
var servers: List = null
override def setUp() {
    super.setUp
    if(configs.size <= 0)
      throw new KafkaException("Must suply at least one server config.")
    servers = configs.map(TestUtils.createServer(_))
}
override def tearDown() {
    servers.map(server => server.shutdown())
    servers.map(server => server.config.logDirs.map(Utils.rm(_)))
    super.tearDown
}
}

以及提供一个初始化好producer和consumer的Trait:trait ProducerConsumerTestHarness extends JUnit3Suite with KafkaServerTestHarness {
    val port: Int
    val host = "localhost"
    var producer: Producer = null
    var consumer: SimpleConsumer = null
override def setUp() {
      super.setUp
      val props = TestUtils.getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), "kafka.utils.StaticPartitioner")
      producer = new Producer(new ProducerConfig(props))
      consumer = new SimpleConsumer(host, port, 1000000, 64*1024, "")
    }
   override def tearDown() {
   producer.close()
   consumer.close()
   super.tearDown
   }
}


如果你想在代码中使用它们,你需要引入:<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <classifier>test</classifier>
        <scope>test</scope>
        <version>${kafka.version}</version>
</dependency>
如果你想在Java直接实现这些trait是不可以的,这些Trait都有实现方法,在Java中无法直接使用,可能你需要把trait改为abstract class。 或者你使用它的TestUtils创建。@BeforeClass
public void setup() {
        ......
}
@AfterClass
public void teardown() {
        ......
}
@Test
public void testReceive() {
        Properties consumerProps = TestUtils.createConsumerProperties(zkServer.connectString(), "group_1", "consumer_id", 1000);
        consumerProps.putAll(kafkaProps);
        ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        // now launch all the threads
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
                executor.submit(new ConsumerThread(stream, threadNumber));
                threadNumber++;
        }
        // setup producer
        Properties properties = TestUtils.getProducerConfig("localhost:" + port);
        properties.put("serializer.class", StringEncoder.class.getCanonicalName());
        ProducerConfig pConfig = new ProducerConfig(properties);
        Producer<Integer, String> producer = new Producer<>(pConfig);
        // send message
        for (int i = 0; i < 10; i++) {
                KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, "test-message-" + i);
                List<KeyedMessage<Integer, String>> messages = new ArrayList<KeyedMessage<Integer, String>>();
                messages.add(data);
                // producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
                producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
        }
        try {
                Thread.sleep(20000);
        } catch (InterruptedException e) {
                e.printStackTrace();
        }
        executor.shutdownNow();
        producer.close();
}
为Spark应用编写单元测试如果是为普通的Spark应用编写单元测试代码, 比较简单,创建SparkContext时只需将master设为local即可。确保在finally或者teardown方法中调用SparkContext.stop(),因为Spark不允许在同一个程序中拥有两个SparkContext。 以SparkPi为例。public final class JavaSparkPi {
        public static void main(String[] args) throws Exception {
                double pi = calculatePi(args);               
                System.out.println("Pi is roughly " + pi);
               
        }
        public static double calculatePi(String[] args) {
                SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
                JavaSparkContext jsc = new JavaSparkContext(sparkConf);
                int slices = (args.length == 1) ? Integer.parseInt(args) : 2;
                int n = 100000 * slices;
                List<Integer> l = new ArrayList<Integer>(n);
                for (int i = 0; i < n; i++) {
                        l.add(i);
                }
                JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
                int count = dataSet.map(new Function<Integer, Integer>() {
                        @Override
                        public Integer call(Integer integer) {
                                double x = Math.random() * 2 - 1;
                                double y = Math.random() * 2 - 1;
                                return (x * x + y * y < 1) ? 1 : 0;
                        }
                }).reduce(new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer integer, Integer integer2) {
                                return integer + integer2;
                        }
                });
                double pi = 4.0 * count / n;
                jsc.stop();
                return pi;
        }
}
单元测试类:public class JavaSparkPiTest {
        @Test
        public void testPi() {
                Properties props = System.getProperties();               
                props.setProperty("spark.master", "local");
                props.setProperty("spark.rdd.compress", "true");
                props.setProperty("spark.executor.memory", "1g");
                try {
                        double pi = JavaSparkPi.calculatePi(new String[]{"1"});
                        assertTrue((pi -3.14) < 1);
                } catch (Exception e) {
                        fail(e.getMessage(),e);
                }
        }
}

对于Spark Streaming应用, 单元测试相对复杂,因为你需要集成其它的框架。 如果你的应用集成Kafka,你可以使用上面的Kafka的测试类, 如果你的应用集成Mongo,你可以使用真实的Mongo或者fake Mongo如Fongo, 如果你的应用集成TCP 流, 你需要实现一个TCP server simulator, 基本上你应该寻找或者实现要集成的框架的simulator。



hb1984 发表于 2015-2-3 17:59:14

谢谢楼主分享。   
页: [1]
查看完整版本: 为ZooKeeper, Kafka 和 Spark 应用编写单元测试示例