
Writing and Running Spark Applications in Java

Posted by xioaxu790 on 2014-12-24 20:03:58 in 实操演练 (Hands-on Practice)
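Guiding questions:
1. Which features does the log statistics program need to implement?
2. How should we approach analyzing a website's access logs?
3. How can the status of a running application be checked in the web console?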






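Let's start with a simple requirement:
Analyze a website's access logs and count the number of visits from each IP address, then use GeoIP data to find out which countries and regions the visitors come from. Here are a few log lines from my own website as an example: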
  121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
  121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
  121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html/ HTTP/1.1" 301 26 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
  121.205.198.92 - - [21/Feb/2014:00:00:12 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
  .205.241.229 - - [21/Feb/2014:00:00:13 +0800] "GET /archives/526.html HTTP/1.1" 200 12080 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
  .205.241.229 - - [21/Feb/2014:00:00:15 +0800] "POST /wp-comments-post.php HTTP/1.1" 302 26 "http://shiyanjun.cn/archives/526.html/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 Firefox/23.0"
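The job in this article simply takes the first space-separated token of each line as the IP address. As a sketch of a more defensive alternative, assuming the logs follow the Apache/Nginx "combined" format that the sample lines appear to use, a regular expression can pull out the IP, timestamp, request line and status code, and reject lines that do not fit. The class name LogLineParser and its methods are hypothetical, not part of the original program.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Sketch (hypothetical helper, not part of the original job): parses one line
 * in the assumed "combined" access-log format:
 * host ident user [time] "request" status bytes "referer" "user-agent"
 */
final class LogLineParser {
    private static final Pattern COMBINED = Pattern.compile(
            "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"([^\"]*)\" (\\d{3}) (\\S+) \"([^\"]*)\" \"([^\"]*)\"$");

    /** Returns {ip, time, request, status}, or null if the line is not in the expected format. */
    static String[] parse(String line) {
        Matcher m = COMBINED.matcher(line);
        if (!m.matches()) {
            return null; // malformed line: the caller can skip it instead of counting a bogus "IP"
        }
        return new String[] {m.group(1), m.group(2), m.group(3), m.group(4)};
    }

    public static void main(String[] args) {
        String line = "121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "
                + "\"GET /archives/417.html HTTP/1.1\" 200 11465 "
                + "\"http://shiyanjun.cn/archives/417.html/\" "
                + "\"Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0\"";
        String[] fields = parse(line);
        System.out.println("ip=" + fields[0] + " status=" + fields[3] + " request=" + fields[2]);
    }
}
```

Compared with splitting on spaces, the regex costs more per line but makes malformed records visible instead of silently counting them under a wrong key.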


Implementing the Spark Application in Java
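The statistics program we implement has the following features:
  • Read the log data file from HDFS
  • Extract the first field (the IP address) of each line
  • Count the number of occurrences of each IP address
  • Sort the IP addresses by occurrence count in descending order
  • Look up the country of each IP address with the GeoIP library
  • Print the results, one per line, in the format: [country code] IP address count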
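Before the Spark version, it may help to see the extract, count and sort steps in a single JVM. The following is a minimal plain-Java sketch (no Spark and no GeoIP lookup; the class name LocalIpCount is hypothetical); the comments map each step to the Spark operation that plays the same role in the program.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical single-JVM version of the pipeline, for illustration only. */
final class LocalIpCount {
    static List<Map.Entry<String, Integer>> countAndSort(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            String ip = line.split(" ", 2)[0];   // like flatMap(... SPACE.split(s)[0])
            counts.merge(ip, 1, Integer::sum);    // like map to (ip, 1) + reduceByKey(i1 + i2)
        }
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>(counts.entrySet());
        // Descending by count, like the Comparator in the job; ties broken by IP
        // so the order is deterministic.
        sorted.sort(Map.Entry.<String, Integer>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Integer>comparingByKey()));
        return sorted;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] \"GET /archives/417.html HTTP/1.1\" 200 11465",
                "121.205.198.92 - - [21/Feb/2014:00:00:11 +0800] \"POST /wp-comments-post.php HTTP/1.1\" 302 26");
        for (Map.Entry<String, Integer> e : countAndSort(lines)) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

This keeps every count in one HashMap, so it only suits logs that fit on one machine; the Spark job distributes the same counting across the cluster.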

Here is the statistics application implemented in Java:
  package org.shirdrn.spark.job;

  import java.io.File;
  import java.io.IOException;
  import java.io.Serializable;
  import java.util.Arrays;
  import java.util.Collections;
  import java.util.Comparator;
  import java.util.List;
  import java.util.regex.Pattern;

  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.spark.api.java.JavaPairRDD;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import org.apache.spark.api.java.function.FlatMapFunction;
  import org.apache.spark.api.java.function.Function2;
  import org.apache.spark.api.java.function.PairFunction;
  import org.shirdrn.spark.job.maxmind.Country;
  import org.shirdrn.spark.job.maxmind.LookupService;

  import scala.Tuple2;

  public class IPAddressStats implements Serializable {

      private static final long serialVersionUID = 8533489548835413763L;
      private static final Log LOG = LogFactory.getLog(IPAddressStats.class);
      private static final Pattern SPACE = Pattern.compile(" ");

      // Not serializable, and only used on the driver: excluded from serialization
      private transient LookupService lookupService;
      private transient final String geoIPFile;

      public IPAddressStats(String geoIPFile) {
          this.geoIPFile = geoIPFile;
          try {
              // lookupService: gets the country code for an IP address
              File file = new File(this.geoIPFile);
              LOG.info("GeoIP file: " + file.getAbsolutePath());
              // AdvancedLookupService is a LookupService implementation from the
              // author's own project; its source is not shown in this article
              lookupService = new AdvancedLookupService(file, LookupService.GEOIP_MEMORY_CACHE);
          } catch (IOException e) {
              throw new RuntimeException(e);
          }
      }

      @SuppressWarnings("serial")
      public void stat(String[] args) {
          JavaSparkContext ctx = new JavaSparkContext(args[0], "IPAddressStats",
                  System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(IPAddressStats.class));
          JavaRDD<String> lines = ctx.textFile(args[1], 1);

          // split each line and extract the IP address field
          JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
              @Override
              public Iterable<String> call(String s) {
                  // 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" "Mozilla/5.0 (Windows NT 5.1; rv:11.0) Gecko/20100101 Firefox/11.0"
                  // the IP address is the first space-separated field
                  return Arrays.asList(SPACE.split(s)[0]);
              }
          });

          // map: emit (ip, 1); in Spark 0.9 map() accepts a PairFunction
          // (Spark 1.0 and later name this overload mapToPair)
          JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
              @Override
              public Tuple2<String, Integer> call(String s) {
                  return new Tuple2<String, Integer>(s, 1);
              }
          });

          // reduce: sum the counts of each IP address
          JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
              @Override
              public Integer call(Integer i1, Integer i2) {
                  return i1 + i2;
              }
          });

          // bring the (ip, count) pairs back to the driver
          List<Tuple2<String, Integer>> output = counts.collect();

          // sort the statistics by count, in descending order
          Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
              @Override
              public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
                  return t2._2().compareTo(t1._2());
              }
          });

          writeTo(output);
          ctx.stop();
      }

      // runs on the driver: look up the country of each IP and log "[code] ip count"
      private void writeTo(List<Tuple2<String, Integer>> output) {
          for (Tuple2<String, Integer> tuple : output) {
              Country country = lookupService.getCountry(tuple._1());
              LOG.info("[" + country.getCode() + "] " + tuple._1() + "\t" + tuple._2());
          }
      }

      public static void main(String[] args) {
          // ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
          if (args.length < 3) {
              System.err.println("Usage: IPAddressStats <master> <inFile> <GeoIPFile>");
              System.err.println("    Example: org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat");
              System.exit(1);
          }
          String geoIPFile = args[2];
          IPAddressStats stats = new IPAddressStats(geoIPFile);
          stats.stat(args);
          System.exit(0);
      }
  }
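The reduceByKey step deserves a word. Conceptually, Spark first combines the values for each key inside every partition, then shuffles those partial results and merges them per key; the run log later in this article shows this as two stages (a ShuffleMapTask stage followed by a ResultTask stage). The plain-Java sketch below (hypothetical class ReduceByKeySketch, not Spark's actual implementation) imitates that two-step shape, with lists standing in for partitions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Illustrative model of reduceByKey's two phases; not Spark code. */
final class ReduceByKeySketch {
    static <K, V> Map<K, V> reduceByKey(List<List<Map.Entry<K, V>>> partitions, BinaryOperator<V> f) {
        // Map side: combine values per key within each partition independently.
        List<Map<K, V>> partials = new ArrayList<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            Map<K, V> local = new HashMap<>();
            for (Map.Entry<K, V> e : partition) {
                local.merge(e.getKey(), e.getValue(), f);
            }
            partials.add(local);
        }
        // Reduce side: merge the partial results of all partitions per key.
        Map<K, V> result = new HashMap<>();
        for (Map<K, V> local : partials) {
            local.forEach((k, v) -> result.merge(k, v, f));
        }
        return result;
    }
}
```

Because partial results can be merged in any grouping and order, the function passed to reduceByKey (i1 + i2 in the job) should be associative and commutative; integer addition is both.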



See the comments in the code for the implementation details. The program is built with Maven; here are the dependencies declared in my pom.xml:
  <dependencies>
      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>0.9.0-incubating</version>
      </dependency>
      <dependency>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
          <version>1.2.16</version>
      </dependency>
      <dependency>
          <groupId>dnsjava</groupId>
          <artifactId>dnsjava</artifactId>
          <version>2.1.1</version>
      </dependency>
      <dependency>
          <groupId>commons-net</groupId>
          <artifactId>commons-net</artifactId>
          <version>3.1</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>1.2.1</version>
      </dependency>
  </dependencies>



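Note that when the program runs on a Spark cluster, the functions we pass to Spark must be serializable. The anonymous inner classes created in stat() (the FlatMapFunction, PairFunction and Function2) hold a reference to the enclosing IPAddressStats instance, so that instance is serialized along with them; this is why IPAddressStats implements Serializable. Fields that do not need to be, or cannot be, serialized can simply be marked transient. For example, lookupService does not implement Serializable, so it is declared transient and skipped; this is safe because it is only used on the driver, in writeTo() after collect(). Otherwise you may see an error like the following: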
  14/03/10 22:34:06 INFO scheduler.DAGScheduler: Failed to run collect at IPAddressStats.java:76
  Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: org.shirdrn.spark.job.IPAddressStats
       at org.apache.spark.scheduler.DAGScheduler$anonfun$org$apache$spark$scheduler$DAGScheduler$abortStage$1.apply(DAGScheduler.scala:1028)
       at org.apache.spark.scheduler.DAGScheduler$anonfun$org$apache$spark$scheduler$DAGScheduler$abortStage$1.apply(DAGScheduler.scala:1026)
       at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$abortStage(DAGScheduler.scala:1026)
       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$submitMissingTasks(DAGScheduler.scala:794)
       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$submitStage(DAGScheduler.scala:737)
       at org.apache.spark.scheduler.DAGScheduler$anonfun$org$apache$spark$scheduler$DAGScheduler$submitStage$4.apply(DAGScheduler.scala:741)
       at org.apache.spark.scheduler.DAGScheduler$anonfun$org$apache$spark$scheduler$DAGScheduler$submitStage$4.apply(DAGScheduler.scala:740)
       at scala.collection.immutable.List.foreach(List.scala:318)
       at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$submitStage(DAGScheduler.scala:740)
       at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
       at org.apache.spark.scheduler.DAGScheduler$anonfun$start$1$anon$2$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
       at akka.actor.ActorCell.invoke(ActorCell.scala:456)
       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
       at akka.dispatch.Mailbox.run(Mailbox.scala:219)
       at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
       at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
       at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
       at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
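The effect of transient can be reproduced with nothing but java.io. In this sketch, Lookup is a hypothetical stand-in for a non-serializable helper such as the GeoIP LookupService, and BadJob/GoodJob are hypothetical job-like classes. Serializing an object that holds the helper in an ordinary field fails with NotSerializableException, while the same field marked transient is skipped. Spark serializes task closures with standard Java serialization, so the same rule applies there.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class TransientDemo {
    /** Hypothetical non-serializable helper (plays the role of LookupService). */
    static final class Lookup {
        String countryOf(String ip) { return "--"; }
    }

    /** Helper held in a normal field: serialization fails. */
    static final class BadJob implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Lookup lookup = new Lookup();
    }

    /** Helper held in a transient field: skipped during serialization. */
    static final class GoodJob implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient Lookup lookup = new Lookup();
        boolean hasLookup() { return lookup != null; }
    }

    /** True if obj can be written with standard Java serialization. */
    static boolean isSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Serializes and deserializes obj, as happens when a task is shipped to an executor. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

After a round trip, the transient field comes back as null (field initializers are not re-run on deserialization). That is why a transient helper must only be used where the object was created, as lookupService is used only on the driver.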


Running the Java Program on a Spark Cluster

Since the program is built with Maven, once the code above is written it can be packaged with the maven-assembly-plugin, configured as follows:
  <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
          <archive>
              <manifest>
                  <mainClass>org.shirdrn.spark.job.IPAddressStats</mainClass>
              </manifest>
          </archive>
          <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <excludes>
              <exclude>*.properties</exclude>
              <exclude>*.xml</exclude>
          </excludes>
      </configuration>
      <executions>
          <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                  <goal>single</goal>
              </goals>
          </execution>
      </executions>
  </plugin>



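This bundles all dependent libraries into the application JAR. Copy the JAR to a Linux machine (it does not have to be the Spark cluster's Master node) and make sure Spark's environment variables are configured correctly on that machine. The unpacked Spark distribution contains the script bin/run-example. Make a copy of it (here named bin/run-my-java-example, the name used in the run command) and point it at our Java application package by changing the EXAMPLES_DIR variable and the lines that locate our JAR file; the program can then be launched with that script. The script is as follows: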
  #!/usr/bin/env bash

  cygwin=false
  case "`uname`" in
      CYGWIN*) cygwin=true;;
  esac

  SCALA_VERSION=2.10

  # Figure out where the Scala framework is installed
  FWDIR="$(cd `dirname $0`/..; pwd)"

  # Export this as SPARK_HOME
  export SPARK_HOME="$FWDIR"

  # Load environment variables from conf/spark-env.sh, if it exists
  if [ -e "$FWDIR/conf/spark-env.sh" ] ; then
    . $FWDIR/conf/spark-env.sh
  fi

  if [ -z "$1" ]; then
    echo "Usage: run-my-java-example <example-class> [<args>]" >&2
    exit 1
  fi

  # Locate our application JAR (built with jar-with-dependencies), which we
  # copied into $FWDIR/java-examples. Exactly one JAR is expected there.
  EXAMPLES_DIR="$FWDIR"/java-examples
  SPARK_EXAMPLES_JAR=""
  if [ -e "$EXAMPLES_DIR"/*.jar ]; then
    export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/*.jar`
  fi
  if [[ -z $SPARK_EXAMPLES_JAR ]]; then
    echo "Failed to find the application JAR in $EXAMPLES_DIR" >&2
    echo "Copy the JAR built by 'mvn package' into $EXAMPLES_DIR before running this program" >&2
    exit 1
  fi

  # Since the examples JAR ideally shouldn't include spark-core (that dependency should be
  # "provided"), also add our standard Spark classpath, built using compute-classpath.sh.
  CLASSPATH=`$FWDIR/bin/compute-classpath.sh`
  CLASSPATH="$SPARK_EXAMPLES_JAR:$CLASSPATH"

  if $cygwin; then
      CLASSPATH=`cygpath -wp $CLASSPATH`
      export SPARK_EXAMPLES_JAR=`cygpath -w $SPARK_EXAMPLES_JAR`
  fi

  # Find java binary
  if [ -n "${JAVA_HOME}" ]; then
    RUNNER="${JAVA_HOME}/bin/java"
  else
    if [ `command -v java` ]; then
      RUNNER="java"
    else
      echo "JAVA_HOME is not set" >&2
      exit 1
    fi
  fi

  # Set JAVA_OPTS to be able to load native libraries and to set heap size
  JAVA_OPTS="$SPARK_JAVA_OPTS"
  JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
  # Load extra JAVA_OPTS from conf/java-opts, if it exists
  if [ -e "$FWDIR/conf/java-opts" ] ; then
    JAVA_OPTS="$JAVA_OPTS `cat $FWDIR/conf/java-opts`"
  fi
  export JAVA_OPTS

  if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
    echo -n "Spark Command: "
    echo "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
    echo "========================================"
    echo
  fi

  # Launch the JVM: the first argument is the main class, the rest are its arguments
  exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
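What the modified script does before launching the JVM comes down to two steps: find the application JAR in EXAMPLES_DIR, then put it in front of the classpath printed by bin/compute-classpath.sh. The following Java sketch (hypothetical class LauncherClasspath, for illustration only) mirrors that logic, including the script's implicit requirement that exactly one JAR sit in the directory; File.pathSeparator plays the role of the ':' used in the script.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Illustrative re-implementation of the script's JAR lookup and classpath assembly. */
final class LauncherClasspath {
    static String buildClasspath(Path examplesDir, String sparkClasspath) throws IOException {
        List<Path> jars = new ArrayList<>();
        // Same glob as "$EXAMPLES_DIR"/*.jar in the script
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(examplesDir, "*.jar")) {
            for (Path p : stream) {
                jars.add(p);
            }
        }
        if (jars.size() != 1) {
            throw new IllegalStateException(
                    "expected exactly one JAR in " + examplesDir + ", found " + jars.size());
        }
        String appJar = jars.get(0).toAbsolutePath().toString();
        // Application JAR first, so its classes take precedence, then Spark's classpath
        return sparkClasspath.isEmpty() ? appJar : appJar + File.pathSeparator + sparkClasspath;
    }
}
```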


To run our Java program on Spark, execute the following commands:
  cd /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1
  ./bin/run-my-java-example org.shirdrn.spark.job.IPAddressStats spark://m1:7077 hdfs://m1:9000/user/shirdrn/wwwlog20140222.log /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat


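The class org.shirdrn.spark.job.IPAddressStats takes 3 arguments:
  • Spark cluster master URL: in my case spark://m1:7077
  • Input file path: application-specific; here the file hdfs://m1:9000/user/shirdrn/wwwlog20140222.log is read from HDFS
  • GeoIP database file: application-specific; an external file used to determine the country an IP address belongs to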
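The first two arguments are URLs: the master URL names the host and port of the standalone Master (m1:7077), and the input path names the HDFS NameNode (m1:9000) followed by the file path. As a small illustration, not part of the original program, java.net.URI splits them into those parts; the hypothetical JobArgs helper below could be used to reject a malformed argument before the SparkContext is created.

```java
import java.net.URI;

/** Hypothetical argument helper; the original program does no such validation. */
final class JobArgs {
    /** Returns {scheme, host, port} for a URL such as spark://m1:7077 or hdfs://m1:9000/... */
    static String[] endpoint(String url) {
        URI uri = URI.create(url);
        if (uri.getHost() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException("expected scheme://host:port[/path], got " + url);
        }
        return new String[] {uri.getScheme(), uri.getHost(), Integer.toString(uri.getPort())};
    }

    /** Path part of an HDFS URL, e.g. /user/shirdrn/wwwlog20140222.log */
    static String path(String url) {
        return URI.create(url).getPath();
    }
}
```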
If the program has no errors and runs normally, the console prints the run log, for example:
  14/03/10 22:17:24 INFO job.IPAddressStats: GeoIP file: /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/GeoIP_DATABASE.dat
  SLF4J: Class path contains multiple SLF4J bindings.
  SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: Found binding in [jar:file:/home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
  SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
  SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
  14/03/10 22:17:25 INFO slf4j.Slf4jLogger: Slf4jLogger started
  14/03/10 22:17:25 INFO Remoting: Starting remoting
  14/03/10 22:17:25 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@m1:57379]
  14/03/10 22:17:25 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@m1:57379]
  14/03/10 22:17:25 INFO spark.SparkEnv: Registering BlockManagerMaster
  14/03/10 22:17:25 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140310221725-c1cb
  14/03/10 22:17:25 INFO storage.MemoryStore: MemoryStore started with capacity 143.8 MB.
  14/03/10 22:17:25 INFO network.ConnectionManager: Bound socket to port 45189 with id = ConnectionManagerId(m1,45189)
  14/03/10 22:17:25 INFO storage.BlockManagerMaster: Trying to register BlockManager
  14/03/10 22:17:25 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager m1:45189 with 143.8 MB RAM
  14/03/10 22:17:25 INFO storage.BlockManagerMaster: Registered BlockManager
  14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
  14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
  14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:49186
  14/03/10 22:17:25 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.95.3.56:49186
  14/03/10 22:17:25 INFO spark.SparkEnv: Registering MapOutputTracker
  14/03/10 22:17:25 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-56c3e30d-a01b-4752-83d1-af1609ab2370
  14/03/10 22:17:25 INFO spark.HttpServer: Starting HTTP Server
  14/03/10 22:17:25 INFO server.Server: jetty-7.x.y-SNAPSHOT
  14/03/10 22:17:25 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52073
  14/03/10 22:17:26 INFO server.Server: jetty-7.x.y-SNAPSHOT
  14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
  14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
  14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
  14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
  14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
  14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
  14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
  14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
  14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
  14/03/10 22:17:26 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
  14/03/10 22:17:26 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
  14/03/10 22:17:26 INFO ui.SparkUI: Started Spark Web UI at http://m1:4040
  14/03/10 22:17:26 INFO spark.SparkContext: Added JAR /home/shirdrn/cloud/programs/spark-0.9.0-incubating-bin-hadoop1/java-examples/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.95.3.56:52073/jars/spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1394515046396
  14/03/10 22:17:26 INFO client.AppClient$ClientActor: Connecting to master spark://m1:7077...
  14/03/10 22:17:26 INFO storage.MemoryStore: ensureFreeSpace(60341) called with curMem=0, maxMem=150837657
  14/03/10 22:17:26 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 58.9 KB, free 143.8 MB)
  14/03/10 22:17:26 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140310221726-0000
  14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor added: app-20140310221726-0000/0 on worker-20140310221648-s1-52544 (s1:52544) with 1 cores
  14/03/10 22:17:27 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140310221726-0000/0 on hostPort s1:52544 with 1 cores, 512.0 MB RAM
  14/03/10 22:17:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  14/03/10 22:17:27 WARN snappy.LoadSnappy: Snappy native library not loaded
  14/03/10 22:17:27 INFO client.AppClient$ClientActor: Executor updated: app-20140310221726-0000/0 is now RUNNING
  14/03/10 22:17:27 INFO mapred.FileInputFormat: Total input paths to process : 1
  14/03/10 22:17:27 INFO spark.SparkContext: Starting job: collect at IPAddressStats.java:77
  14/03/10 22:17:27 INFO scheduler.DAGScheduler: Registering RDD 4 (reduceByKey at IPAddressStats.java:70)
  14/03/10 22:17:27 INFO scheduler.DAGScheduler: Got job 0 (collect at IPAddressStats.java:77) with 1 output partitions (allowLocal=false)
  14/03/10 22:17:27 INFO scheduler.DAGScheduler: Final stage: Stage 0 (collect at IPAddressStats.java:77)
  14/03/10 22:17:27 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
  14/03/10 22:17:27 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
  14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70), which has no missing parents
  14/03/10 22:17:27 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at IPAddressStats.java:70)
  14/03/10 22:17:27 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
  14/03/10 22:17:28 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@s1:59233/user/Executor#-671170811] with ID 0
  14/03/10 22:17:28 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 0: s1 (PROCESS_LOCAL)
  14/03/10 22:17:28 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 2396 bytes in 5 ms
  14/03/10 22:17:29 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager s1:47282 with 297.0 MB RAM
  14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 0 in 3376 ms on s1 (progress: 0/1)
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 1 (reduceByKey at IPAddressStats.java:70) finished in 4.420 s
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: looking for newly runnable stages
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: running: Set()
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: failed: Set()
  14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70), which is now runnable
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at IPAddressStats.java:70)
  14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
  14/03/10 22:17:32 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 1 on executor 0: s1 (PROCESS_LOCAL)
  14/03/10 22:17:32 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 2255 bytes in 1 ms
  14/03/10 22:17:32 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@s1:33534
  14/03/10 22:17:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 120 bytes
  14/03/10 22:17:32 INFO scheduler.TaskSetManager: Finished TID 1 in 282 ms on s1 (progress: 0/1)
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
  14/03/10 22:17:32 INFO scheduler.DAGScheduler: Stage 0 (collect at IPAddressStats.java:77) finished in 0.314 s
  14/03/10 22:17:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
  14/03/10 22:17:32 INFO spark.SparkContext: Job finished: collect at IPAddressStats.java:77, took 4.870958309 s
  14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 58.246.49.218     312
  14/03/10 22:17:32 INFO job.IPAddressStats: [KR] 1.234.83.77     300
  14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.16     212
  14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 110.85.72.254     207
  14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 27.150.229.134     185
  14/03/10 22:17:32 INFO job.IPAddressStats: [HK] 180.178.52.181     181
  14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.37.210.212     180
  14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 222.77.226.83     176
  14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.11.205     169
  14/03/10 22:17:32 INFO job.IPAddressStats: [CN] 120.43.9.19     165
  ...
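The program stops at per-IP counts, while the original goal was the distribution of visitors by country. One way to get there, sketched below under the assumption that the result lines keep the "[code] ip count" format shown in the log, is to parse them and sum the counts per country code. CountryTotals is a hypothetical helper; inside the job itself the same sums could be accumulated in writeTo(), where each country code is already at hand.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical post-processing helper: per-country totals from the job's result lines. */
final class CountryTotals {
    // Matches "[CN] 58.246.49.218     312", also when preceded by a log prefix
    private static final Pattern RESULT = Pattern.compile("\\[(\\S+)\\]\\s+(\\S+)\\s+(\\d+)");

    static Map<String, Integer> totalsByCountry(List<String> resultLines) {
        Map<String, Integer> totals = new TreeMap<>(); // sorted by country code
        for (String line : resultLines) {
            Matcher m = RESULT.matcher(line);
            if (m.find()) {
                totals.merge(m.group(1), Integer.parseInt(m.group(3)), Integer::sum);
            }
        }
        return totals;
    }
}
```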



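We can also check the status of the running application in the web console: port 8080 on the Master node (e.g. http://m1:8080/) shows the status of the applications on the cluster. While an application runs, its driver also serves its own web UI on port 4040 (the log above shows "Started Spark Web UI at http://m1:4040").
Also note that if you develop Spark applications in Java with Eclipse on a Unix system, you can connect to the Spark cluster directly from Eclipse, submit the application, and let the cluster run it.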

