Question guide:
1. What is Spark Streaming?
2. Which data sources can Spark Streaming receive?
3. Which two kinds of operations can we perform on a DStream?
Reference: Spark: An Efficient Distributed Computing System
Before looking at Spark Streaming, we first need to know what it is.
Spark Streaming is a framework built on Spark for processing stream data. The basic idea is to split the stream into small time slices (a few seconds) and process each small slice in a batch-like fashion. It is built on Spark for two reasons: on one hand, Spark's low-latency execution engine (100 ms and up) is fast enough for real-time computation; on the other hand, compared with record-at-a-time frameworks such as Storm, RDDs make efficient fault tolerance much easier. The micro-batch approach also lets the same logic and algorithms serve both batch and real-time processing, which is convenient for applications that need to analyze historical and live data together.
Overview
Spark Streaming is part of Spark's core API. It supports high-throughput, fault-tolerant processing of live data streams. It can ingest data from Kafka, Flume, Twitter, ZeroMQ, and TCP sockets, process it with simple API functions such as map, reduce, join, and window, and can even apply the built-in machine learning and graph algorithm libraries directly to the data.
Its workflow is simple: after receiving live data, it divides the data into batches, hands them to the Spark engine for processing, and produces the result for each batch.
The stream abstraction it supports is called a DStream, with direct support for Kafka and Flume data sources. A DStream is a continuous sequence of RDDs; the example below should help make it concrete.
A Quick Example
- // Create a StreamingContext with a 1-second batch interval
- val ssc = new StreamingContext(sparkConf, Seconds(1))
-
- // Create a DStream that connects to the given hostname:port
- val lines = ssc.socketTextStream(serverIP, serverPort)
-
- // Split each line into words
- val words = lines.flatMap(_.split(" "))
- // Count each word
- val pairs = words.map(word => (word, 1))
- val wordCounts = pairs.reduceByKey(_ + _)
-
- // Print the results
- wordCounts.print()
-
- ssc.start() // start the computation
- ssc.awaitTermination() // wait for the computation to terminate
The complete code is available at this page:
https://github.com/apache/incuba ... workWordCount.scala
Èç¹ûÒѾװºÃSparkµÄÅóÓÑ£¬ÎÒÃÇ¿ÉÒÔͨ¹ýÏÂÃæµÄÀý×ÓÊÔÊÔ¡£
First, start Netcat, a simple data server available on every Unix-like system.
ʹÓÃÏÂÃæÕâ¾äÃüÁîÀ´Æô¶¯Netcat£º
¸´ÖÆ´úÂë
Then launch the example:
- $ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
Type "hello world" on the Netcat side and watch the output on the Spark side:
- # TERMINAL 1:
- # Running Netcat
-
- $ nc -lk 9999
-
- hello world
-
- ...
- # TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount
-
- $ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999
- ...
- -------------------------------------------
- Time: 1357008430000 ms
- -------------------------------------------
- (hello,1)
- (world,1)
- ...
Basics
Now for how to actually write the code, wahaha! First, add the following to your SBT or Maven project:
- groupId = org.apache.spark
- artifactId = spark-streaming_2.10
- version = 0.9.0-incubating
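In an SBT build, for instance, that coordinate would look like the line below (a minimal sketch; the %% operator appends the Scala version, yielding spark-streaming_2.10):
- // build.sbt: %% appends the Scala version, giving spark-streaming_2.10
- libraryDependencies += "org.apache.spark" %% "spark-streaming" % "0.9.0-incubating"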
- //ÐèҪʹÓÃÒ»ÏÂÊý¾ÝÔ´µÄ£¬»¹ÒªÌí¼ÓÏàÓ¦µÄÒÀÀµ
- Source Artifact
- Kafka spark-streaming-kafka_2.10
- Flume spark-streaming-flume_2.10
- Twitter spark-streaming-twitter_2.10
- ZeroMQ spark-streaming-zeromq_2.10
- MQTT spark-streaming-mqtt_2.10
Next comes instantiating the context:
- new StreamingContext(master, appName, batchDuration, [sparkHome], [jars])
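As a concrete sketch (the master URL and application name here are illustrative), a context with 1-second batches on a local two-thread master would be:
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- // master, application name, batch interval; sparkHome and jars are optional
- val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))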
ÕâÊÇ֮ǰµÄÀý×Ó¶ÔDStreamµÄ²Ù×÷¡£
Input Sources
Besides sockets, we can also create a DStream from files:
- streamingContext.fileStream(dataDirectory)
There are three requirements here: (1) all files under dataDirectory must share the same format; (2) files must appear in the directory by being moved or renamed into it; (3) once a file is in, it must not be modified.
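For plain text files there is also the textFileStream shortcut; a minimal sketch, with a hypothetical HDFS path:
- // watch a directory for new text files (the path is hypothetical)
- val logLines = ssc.textFileStream("hdfs://namenode:8020/data/incoming")
- logLines.print()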
Suppose instead we want to create a Kafka DStream:
- import org.apache.spark.streaming.kafka._
- KafkaUtils.createStream(streamingContext, kafkaParams, ...)
Operations
On a DStream we can perform two kinds of operations: transformations and output operations.
Transformations
- Transformation  Meaning
- map(func)  Apply func to every element of the DStream.
- flatMap(func)  Like map, but each input item can be mapped to 0 or more outputs.
- filter(func)  Keep only the elements on which func returns true.
- repartition(numPartitions)  Change the number of partitions to raise (or lower) parallelism.
- union(otherStream)  Merge two streams into one.
- count()  Count the number of elements in each RDD of the stream.
- reduce(func)  Aggregate the elements of each RDD with func, which takes two arguments and returns one.
- countByValue()  On a DStream of elements of type K, return a new DStream of (K, Long) pairs, where the Long is the frequency of each K.
- reduceByKey(func, [numTasks])  On a (K, V) DStream, aggregate the values of each key with func. Defaults to 2 parallel tasks in local mode and 8 on a cluster; numTasks overrides this.
- join(otherStream, [numTasks])  Join a (K, V) and a (K, W) DStream into a new (K, (V, W)) DStream.
- cogroup(otherStream, [numTasks])  Combine a (K, V) and a (K, W) DStream into a new (K, Seq[V], Seq[W]) DStream.
- transform(func)  Apply an arbitrary RDD-to-RDD function func to each RDD of the stream.
- updateStateByKey(func)  Update the state of each key with func; the state can be of any type.
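To make a couple of these concrete, here is a small sketch chaining filter and countByValue on the words DStream from the quick example above:
- // drop empty strings, then count how often each distinct word appears per batch
- val nonEmptyWords = words.filter(_.nonEmpty)
- val wordFreqs = nonEmptyWords.countByValue() // DStream[(String, Long)]
- wordFreqs.print()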
UpdateStateByKey Operation
This operation lets us keep per-key state and update it continuously as new data arrives. Using it takes two steps: (1) define the state, which can be any data type; (2) define the state update function, which computes the new state from the previous state and the new values. An example:
- def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
-   // add the new values to the previous running count to get the new count
-   val newCount = runningCount.getOrElse(0) + newValues.sum
-   Some(newCount)
- }
We can apply it to the DStream of (word, 1) pairs from the example shown earlier:
- val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
The update function is invoked for every word: newValues holds the latest values and runningCount the previous count.
Transform Operation
Like transformWith, transform lets us apply an arbitrary RDD-to-RDD function to a DStream. For example, the DStream API does not directly expose joining each RDD in the stream with another dataset, but transform makes it possible:
- val spamInfoRDD = sparkContext.hadoopFile(...) // RDD containing spam information
-
- val cleanedDStream = inputDStream.transform(rdd => {
- rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
- ...
- })
ÁíÍ⣬ÎÒÃÇÒ²¿ÉÒÔÔÚÀïÃæʹÓûúÆ÷ѧϰËã·¨ºÍͼËã·¨¡£
Window Operations
An example first: extending the word count above, suppose we want to count the words over the last 30 seconds of data, every 10 seconds.
We can write:
- // Reduce last 30 seconds of data, every 10 seconds
- val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
Two window parameters appear here:
(1) window length: the duration of the window, 30 seconds here, i.e. the last 30 seconds of data
(2) slide interval: the interval at which the windowed computation runs, 10 seconds here
This example gives a good sense of what a window means: periodically computing over a sliding slice of the data.
Below are the window operation functions. (I still find the window concept a bit slippery, so the Meaning column is omitted here; a short sketch follows the list.)
- Transformation
- window(windowLength, slideInterval)
- countByWindow(windowLength, slideInterval)
- reduceByWindow(func, windowLength, slideInterval)
- reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
- reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
- countByValueAndWindow(windowLength, slideInterval, [numTasks])
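As a sketch of the most basic one, window() itself: this builds a windowed view of the words DStream from the earlier example, over the same 30-second window sliding every 10 seconds, and counts the words in it:
- // a DStream whose RDDs each cover the last 30 seconds, produced every 10 seconds
- val windowedWords = words.window(Seconds(30), Seconds(10))
- windowedWords.countByValue().print()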
Output Operations
- Output Operation  Meaning
- print()  Print to the console.
- foreachRDD(func)  Apply func to each RDD of the DStream, e.g. to save the data to an external system.
- saveAsObjectFiles(prefix, [suffix])  Save the stream's contents as SequenceFiles named "prefix-TIME_IN_MS[.suffix]".
- saveAsTextFiles(prefix, [suffix])  Save the stream's contents as text files named "prefix-TIME_IN_MS[.suffix]".
- saveAsHadoopFiles(prefix, [suffix])  Save the stream's contents as Hadoop files named "prefix-TIME_IN_MS[.suffix]".
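foreachRDD is the general-purpose escape hatch; here is a minimal sketch that just reports each batch's size (a real handler would push the records to an external store instead):
- // the function runs once per batch; operations on the RDD execute on the cluster
- wordCounts.foreachRDD { rdd =>
-   println("records in this batch: " + rdd.count())
- }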
Persistence
RDDs in a DStream can also be kept in memory by calling persist(). For window- and state-based operations (reduceByWindow, reduceByKeyAndWindow, updateStateByKey) this happens implicitly; the system persists them automatically. Data received over the network (from Kafka, Flume, sockets, etc.) is by default replicated to two nodes for fault tolerance and kept in memory in serialized form.
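When explicit persistence is wanted, it is a single call; a sketch using the serialized in-memory level mentioned above:
- import org.apache.spark.storage.StorageLevel
-
- // keep this stream's RDDs in memory in serialized form
- wordCounts.persist(StorageLevel.MEMORY_ONLY_SER)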
RDD Checkpointing
Stateful operations work across multiple batches of data; they include all window-based operations and updateStateByKey. Because each batch depends on previous batches, the dependency metadata keeps accumulating over time. To bound it, Spark Streaming supports periodic checkpointing, saving intermediate results to HDFS. Since checkpointing incurs the cost of writing to HDFS, the interval must be chosen carefully: for small batches (say, one second), checkpointing every batch would sharply reduce throughput, while an interval that is too long makes individual tasks grow too large. A checkpoint interval of 5-10 seconds is usually a good fit.
- ssc.checkpoint(hdfsPath) // set where checkpoints are stored
- dstream.checkpoint(checkpointInterval) // set the checkpoint interval
For DStreams that must be checkpointed, such as those created by updateStateByKey and reduceByKeyAndWindow, the default checkpoint interval is at least 10 seconds.
Performance Tuning
Tuning comes down to two things: (1) using cluster resources well, to reduce the time needed to process each batch; (2) setting a suitable batch size for the amount of data.
Level of Parallelism
Distributed operations such as reduceByKey and reduceByKeyAndWindow default to 8 concurrent tasks. You can raise this by passing a higher value to the corresponding function, or by raising the default through the spark.default.parallelism property.
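Both knobs look like this in code (the value 16 is illustrative):
- import org.apache.spark.SparkConf
-
- // per-operation: request 16 reduce tasks instead of the default
- val counts = pairs.reduceByKey(_ + _, 16)
- // or raise the global default before creating the context
- val conf = new SparkConf().set("spark.default.parallelism", "16")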
Task Launching Overheads
Launching too many tasks per second (say, 50) is also bad: the overhead of shipping tasks to the workers becomes significant, and sub-second latency becomes hard to achieve. The overhead can be lowered by compressing tasks to reduce their size.
Setting the Right Batch Size
For a streaming program to run stably on a cluster, it must process data as fast as the data arrives. A good way to find the right batch size is to start with a batch size of 5-10 seconds and a very low input rate. Once the system is keeping up, adjust the size empirically by inspecting the total delay in the logs: if the delay stays below the batch size, the system is stable; if the delay keeps growing, processing cannot keep up with the input rate.
24/7 Operation
By default Spark does not forget metadata such as generated RDDs and processed stages, but a Spark Streaming program runs 24/7 and needs its metadata cleaned up periodically, which spark.cleaner.ttl controls. Setting it to 600, for example, makes Spark clear all metadata more than 10 minutes old and then persist the RDDs. The property must be set before the SparkContext is created.
Note that this value interacts with any window operation: Spark requires input data to stay persisted in memory until it expires, so the ttl must be at least as large as the largest window in use; if it is set smaller, an error is reported.
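Since the property must be in place before the SparkContext exists, set it on the SparkConf used to build the context; a sketch with the 600-second value from above (master and app name are illustrative):
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- // clean up metadata older than 600 s; keep this >= the largest window used
- val conf = new SparkConf()
-   .setMaster("local[2]")
-   .setAppName("TtlDemo")
-   .set("spark.cleaner.ttl", "600")
- val ssc = new StreamingContext(conf, Seconds(1))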
Monitoring
Beyond Spark's built-in monitoring, the StreamingListener interface exposes batch processing times, queueing delays, and total end-to-end delays.
Memory Tuning
By default, persisted RDDs are evicted from memory by Spark's LRU policy; if spark.cleaner.ttl is set they are cleaned up periodically instead, but that parameter must be set with care. A better approach is to set spark.streaming.unpersist to true, letting Spark work out which RDDs no longer need to be persisted, which helps GC performance. The concurrent mark-and-sweep GC is also recommended: it lowers overall throughput a little, but makes batch processing times more consistent.
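In code the unpersist hint is one configuration entry; the collector itself is a JVM flag on the executors (in the 0.9-era deploy scripts this was typically passed through an environment variable such as SPARK_JAVA_OPTS, shown here only as a comment):
- import org.apache.spark.SparkConf
-
- // let Spark figure out which stream RDDs it can safely unpersist
- val conf = new SparkConf().set("spark.streaming.unpersist", "true")
- // CMS GC is enabled with the JVM flag -XX:+UseConcMarkSweepGC, e.g. via
- // SPARK_JAVA_OPTS="-XX:+UseConcMarkSweepGC" in the deploy environment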
Fault-tolerance Properties
Failure of a Worker Node
There are two cases:
1. If the input is files on HDFS, a reliable file system, no data is lost on failure.
2. If the data comes over the network, e.g. from Kafka or Flume, it is by default replicated to 2 nodes to guard against failure. There is still one risk: if the node receiving the data fails before it has copied the data to another node, that data may be lost.
Failure of the Driver Node
To support uninterrupted 24/7 processing, Spark can resume the computation after the driver node fails. Spark Streaming periodically writes data to HDFS, into the checkpoint directory set earlier; after a driver failure, the StreamingContext can be recovered from it.
ΪÁËÈÃÒ»¸öSpark Streaming³ÌÐòÄܹ»±»»Ø¸´£¬ËüÐèÒª×öÒÔϲÙ×÷£º £¨1£©µÚÒ»´ÎÆô¶¯µÄʱºò£¬´´½¨ StreamingContext£¬´´½¨ËùÓеÄstreams£¬È»ºóµ÷ÓÃstart()·½·¨¡£ £¨2£©»Ö¸´ºóÖØÆôµÄ£¬±ØÐëͨ¹ý¼ì²éµãµÄÊý¾ÝÖØд´½¨StreamingContext¡£
Here is a concrete example: constructing the StreamingContext via StreamingContext.getOrCreate achieves exactly this.
- // Function to create and setup a new StreamingContext
- def functionToCreateContext(): StreamingContext = {
- val ssc = new StreamingContext(...) // new context
- val lines = ssc.socketTextStream(...) // create DStreams
- ...
- ssc.checkpoint(checkpointDirectory) // set checkpoint directory
- ssc
- }
-
- // Get StreamingContext from checkpoint data or create a new one
- val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
-
- // Do additional setup on context that needs to be done,
- // irrespective of whether it is being started or restarted
- context. ...
-
- // Start the context
- context.start()
- context.awaitTermination()
In stand-alone deploy mode the driver can also recover automatically from failure, with another node taking over as the driver. You can test this locally: submit the application in supervise mode, find the process with jps, kill the one that looks like DriverWrapper, and watch it restart. Under YARN the driver has to be restarted through other means.
While we are at it, a quick note on submitting a program via the client, which I left out of my earlier write-up.
- ./bin/spark-class org.apache.spark.deploy.Client launch \
-    [client-options] \
-    <cluster-url> <application-jar-url> <main-class> \
-    [application-options]
-
- cluster-url: the address of the master.
- application-jar-url: the location of the jar, ideally on HDFS with an hdfs:// prefix; otherwise every node must have the jar at the same local path.
- main-class: the class containing the main method of the program to launch.
- Client Options:
- --memory <count> (memory for the driver, in MB)
- --cores <count> (number of cores to allocate to the driver)
- --supervise (whether to restart the application when a node fails)
- --verbose (print verbose log output)
ÔÚδÀ´µÄ°æ±¾£¬»áÖ§³ÖËùÓеÄÊý¾ÝÔ´µÄ¿É»Ö¸´ÐÔ¡£
ΪÁ˸üºÃµÄÀí½â»ùÓÚHDFSµÄÇý¶¯½ÚµãʧЧ»Ö¸´£¬ÏÂÃæÓÃÒ»¸ö¼òµ¥µÄÀý×ÓÀ´ËµÃ÷£º
- Time  Number of lines in input file  Output without driver failure  Output with driver failure
-   1          10                              10                      10
-   2          20                              20                      20
-   3          30                              30                      30
-   4          40                              40                      [DRIVER FAILS] no output
-   5          50                              50                      no output
-   6          60                              60                      no output
-   7          70                              70                      [DRIVER RECOVERS] 40, 50, 60, 70
-   8          80                              80                      80
-   9          90                              90                      90
-  10         100                             100                      100
The driver fails at time 4, so 40, 50, and 60 produce no output; at time 7 it recovers, and once recovered it emits all the missed output at once.
岑玉海