Good morning everyone, I have a question I'd like to ask. I'm building a real-time data pipeline: Flume collects the data and pushes it into Kafka, and Spark Streaming consumes it from there. While testing I noticed that kafka-console-consumer.sh prints the incoming records right away, but Spark Streaming receives nothing at first; only after some time does it receive and process data, so there is a noticeable lag in between. In theory Spark Streaming should receive the data in real time, and even with some latency it should be at the millisecond level, yet here I have to wait several minutes. What could be causing this?
val conf = new SparkConf().setAppName("KafkaCustomerManager Demo").setMaster(master)
val ssc = new StreamingContext(conf, Seconds(timeInterval.toInt))
ssc.checkpoint(CHECKPOINT_PATH)
val kafkaStream: InputDStream[(String, String)] =
  kafkaHelper.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topic.split(",").toSet)
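After creating the stream, the rest of the job registers an output action and starts the context. A simplified sketch of that part (the real job runs my own processing logic instead of the `println`; this requires a Spark runtime to execute):

```scala
// Without at least one output action (foreachRDD, print, saveAs...), Spark
// Streaming schedules no jobs at all, and the first results can only appear
// after a full batch interval has elapsed.
kafkaStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    println(s"batch contains ${rdd.count()} records")
  }
}

ssc.start()            // begin scheduling batches every `timeInterval` seconds
ssc.awaitTermination() // block the driver until the job is stopped
```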
Also, about setting the master: when I set it to spark://***:7077, the program just keeps performing checkpoint operations and never executes my subsequent logic, but with local[*] it carries on normally. Why would that be?
This is the command I use to submit the job:
spark-submit --master spark://master:7077 --jars lib/metrics-core-2.2.0.jar --class com.ztsmart.search.etl.realtime.KafkaCustomerManager zt_smart_search_etl.jar 4 master:9092 master:2181 gis_map zsmart spark://***:7077
conf.set("spark.streaming.blockInterval", SPARK_STREAMING_BLOCKINTERVAL)
conf.set("spark.serializer", SPARK_SERIALIZER)
conf.set("spark.storage.memoryFraction", SPARK_STORAGE_MEMORYFRACTION) // fraction of executor memory reserved for caching; default 0.6 (60%), leaving 40% for task execution, which is actually on the small side
conf.set("spark.locality.wait", SPARK_LOCALITY_WAIT) // 6000 ms
conf.set("spark.streaming.kafka.maxRatePerPartition", SPARK_STREAMING_KAFKA_MAXRATEPERPARTITION) // caps the number of records consumed per second from each partition of the topic
conf.set("spark.driver.allowMultipleContexts", SPARK_DRIVER_ALLOWMULTIPLECONTEXTS)
// shuffle tuning
conf.set("spark.shuffle.consolidateFiles", SPARK_SHUFFLE_CONSOLIDATEFILES)
conf.set("spark.reducer.maxSizeInFlight", SPARK_REDUCER_MAXSIZEINFLIGHT)
conf.set("spark.shuffle.file.buffer", SPARK_SHUFFLE_FILE_BUFFER)
conf.set("spark.shuffle.io.maxRetries", SPARK_SHUFFLE_IO_MAXRETRIES)
conf.set("spark.shuffle.io.retryWait", SPARK_SHUFFLE_IO_RETRYWAIT)
conf.set("spark.shuffle.memoryFraction", SPARK_SHUFFLE_MEMORYFRACTION)
########## Spark parameters ##########
spark.streaming.blockInterval=50ms
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.storage.memoryFraction=0.4
spark.locality.wait=6000
spark.streaming.kafka.maxRatePerPartition=35000
spark.driver.allowMultipleContexts=true
########## shuffle ##########
spark.shuffle.consolidateFiles=true
spark.reducer.maxSizeInFlight=150m
spark.shuffle.file.buffer=128k
spark.shuffle.io.maxRetries=8
spark.shuffle.io.retryWait=6s
spark.shuffle.memoryFraction=0.3
warehouseLocation=hdfs://master:9000/usr/hive/warehouse
auto.offset.reset=largest
group.id=gis_map
checkpoint.path=.
These are the parameters I have configured.
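The uppercase constants in the code above are read from this properties file. A minimal, self-contained sketch of the loading step using plain java.util.Properties (the file contents are inlined here just for illustration; in the real job they come from a file on disk):

```scala
import java.io.StringReader
import java.util.Properties

// Parse Spark tuning parameters from .properties-format text.
val text =
  """spark.serializer=org.apache.spark.serializer.KryoSerializer
    |spark.locality.wait=6000
    |spark.streaming.kafka.maxRatePerPartition=35000
    |""".stripMargin

val props = new Properties()
props.load(new StringReader(text))

// Bind each property to the constant name used when building the SparkConf.
val SPARK_SERIALIZER    = props.getProperty("spark.serializer")
val SPARK_LOCALITY_WAIT = props.getProperty("spark.locality.wait")

println(SPARK_SERIALIZER)    // org.apache.spark.serializer.KryoSerializer
println(SPARK_LOCALITY_WAIT) // 6000
```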