package scala;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import java.nio.ByteBuffer;
/**
 * Wires a Spark Streaming context to a Flume source and, for every batch,
 * prints how many Flume events were received in that batch.
 *
 * @param master Spark master URL handed to the {@code JavaStreamingContext}
 * @param host   hostname the Flume receiver binds to
 * @param port   port the Flume receiver listens on
 */
public static void JavaFlumeEventTest(String master, String host, int port) {
    // Streaming context with 2000 ms micro-batches.
    // NOTE(review): jarOfClass references JavaFlumeEventCount rather than a
    // class matching this method's name — confirm that is the intended
    // driver class for shipping the application jar.
    JavaStreamingContext streamingContext = new JavaStreamingContext(
            master,
            "FlumeEventCount",
            new Duration(2000),
            System.getenv("SPARK_HOME"),
            JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));

    // Receive Flume events on host:port; received blocks are kept in memory only.
    JavaDStream<SparkFlumeEvent> events = FlumeUtils.createStream(
            streamingContext, host, port, StorageLevel.MEMORY_ONLY());

    // Per batch: count events, format a human-readable summary, print it.
    events.count().map(new Function<java.lang.Long, String>() {
        @Override
        public String call(java.lang.Long in) {
            return "Received " + in + " flume events.";
        }
    }).print();

    // Start the job and block the driver until the streaming job terminates.
    streamingContext.start();
    streamingContext.awaitTermination();
}
// Forum note (translated from Chinese): "OP, see the program above — the
// master can be specified via the StreamingContext (ssc)."