o13674976542 posted on 2019-5-20 16:12:00

Help: I iterate over the results of a Spark SQL query and put them into a HashMap, but nothing ends up in the map. Why?

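import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class GetRank {
    // list holds "uuid:rate" strings, e.g. "00000000001:0.58"
    public static void rank(String stuID, List<String> list) {
        HashMap<String, Double> result = new HashMap<>();
        SparkConf sparkConf = new SparkConf().setAppName(stuID + "Rank").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(sc);
        JavaRDD<String> dataRDD = sc.parallelize(list);
        // Split each "uuid:rate" line into a (String, Float) row
        JavaRDD<Row> datasRDD = dataRDD.map(new Function<String, Row>() {
            @Override
            public Row call(String line) throws Exception {
                String[] infos = line.split(":");
                return RowFactory.create(infos[0], Float.parseFloat(infos[1]));
            }
        });
        List<StructField> structFields = new ArrayList<>();
        structFields.add(DataTypes.createStructField("uuid", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("rate", DataTypes.FloatType, true));
        StructType structType = DataTypes.createStructType(structFields);
        Dataset<Row> tableRDD = sqlContext.createDataFrame(datasRDD, structType);
        tableRDD.createOrReplaceTempView("rank");
        Dataset<Row> sql = sqlContext.sql("select * from rank order by rate desc");
        // This lambda is serialized and executed inside Spark tasks; each task works on
        // its own deserialized copy of result, so the driver's result is never modified.
        sql.foreach((ForeachFunction<Row>) row -> {
            String id = row.getString(0);
            float rate = row.getFloat(1);
            System.out.println(id + ":" + rate);
            result.put(id, Double.valueOf(rate));
            System.out.println("已放入"); // "added"
        });
        System.out.println(result.size());             // printed 0 in the log below
        System.out.println(result.get("00000000001")); // printed null in the log below
    }
}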


The log:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/05/20 16:04:20 INFO SparkContext: Running Spark version 2.2.2
19/05/20 16:04:23 INFO SparkContext: Submitted application: 201560240130Rank
19/05/20 16:04:23 INFO SecurityManager: Changing view acls to: vivi
19/05/20 16:04:23 INFO SecurityManager: Changing modify acls to: vivi
19/05/20 16:04:23 INFO SecurityManager: Changing view acls groups to:
19/05/20 16:04:23 INFO SecurityManager: Changing modify acls groups to:
19/05/20 16:04:23 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(vivi); groups with view permissions: Set(); users with modify permissions: Set(vivi); groups with modify permissions: Set()
19/05/20 16:04:24 INFO Utils: Successfully started service 'sparkDriver' on port 58013.
19/05/20 16:04:24 INFO SparkEnv: Registering MapOutputTracker
19/05/20 16:04:24 INFO SparkEnv: Registering BlockManagerMaster
19/05/20 16:04:24 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/05/20 16:04:24 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/05/20 16:04:24 INFO DiskBlockManager: Created local directory at C:\Users\vivi\AppData\Local\Temp\blockmgr-826e7bed-5785-495f-9af5-8417bb6a0baf
19/05/20 16:04:24 INFO MemoryStore: MemoryStore started with capacity 1995.0 MB
19/05/20 16:04:24 INFO SparkEnv: Registering OutputCommitCoordinator
19/05/20 16:04:24 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/05/20 16:04:25 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://169.254.2.162:4040
19/05/20 16:04:25 INFO Executor: Starting executor ID driver on host localhost
19/05/20 16:04:25 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 58028.
19/05/20 16:04:25 INFO NettyBlockTransferService: Server created on 169.254.2.162:58028
19/05/20 16:04:25 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/05/20 16:04:25 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 169.254.2.162, 58028, None)
19/05/20 16:04:25 INFO BlockManagerMasterEndpoint: Registering block manager 169.254.2.162:58028 with 1995.0 MB RAM, BlockManagerId(driver, 169.254.2.162, 58028, None)
19/05/20 16:04:25 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 169.254.2.162, 58028, None)
19/05/20 16:04:25 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 169.254.2.162, 58028, None)
19/05/20 16:04:28 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/Users/vivi/Desktop/EndWork/spark-warehouse/').
19/05/20 16:04:28 INFO SharedState: Warehouse path is 'file:/C:/Users/vivi/Desktop/EndWork/spark-warehouse/'.
19/05/20 16:04:29 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
19/05/20 16:04:29 INFO SparkSqlParser: Parsing command: rank
19/05/20 16:04:30 INFO SparkSqlParser: Parsing command: select * from rank order by rate desc
19/05/20 16:04:31 INFO CodeGenerator: Code generated in 558.471922 ms
19/05/20 16:04:31 INFO CodeGenerator: Code generated in 40.682776 ms
19/05/20 16:04:31 INFO SparkContext: Starting job: toJavaRDD at GetRank.java:40
19/05/20 16:04:31 INFO DAGScheduler: Got job 0 (toJavaRDD at GetRank.java:40) with 2 output partitions
19/05/20 16:04:31 INFO DAGScheduler: Final stage: ResultStage 0 (toJavaRDD at GetRank.java:40)
19/05/20 16:04:31 INFO DAGScheduler: Parents of final stage: List()
19/05/20 16:04:31 INFO DAGScheduler: Missing parents: List()
19/05/20 16:04:31 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD at toJavaRDD at GetRank.java:40), which has no missing parents
19/05/20 16:04:31 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 9.6 KB, free 1995.0 MB)
19/05/20 16:04:31 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.1 KB, free 1995.0 MB)
19/05/20 16:04:31 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 169.254.2.162:58028 (size: 5.1 KB, free: 1995.0 MB)
19/05/20 16:04:31 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1015
19/05/20 16:04:31 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD at toJavaRDD at GetRank.java:40) (first 15 tasks are for partitions Vector(0, 1))
19/05/20 16:04:31 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
19/05/20 16:04:31 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 4884 bytes)
19/05/20 16:04:31 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 4903 bytes)
19/05/20 16:04:31 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/05/20 16:04:31 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
19/05/20 16:04:31 INFO CodeGenerator: Code generated in 24.181298 ms
19/05/20 16:04:32 INFO CodeGenerator: Code generated in 73.807185 ms
19/05/20 16:04:32 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 1626 bytes result sent to driver
19/05/20 16:04:32 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1569 bytes result sent to driver
19/05/20 16:04:32 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 451 ms on localhost (executor driver) (1/2)
19/05/20 16:04:32 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 425 ms on localhost (executor driver) (2/2)
19/05/20 16:04:32 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/05/20 16:04:32 INFO DAGScheduler: ResultStage 0 (toJavaRDD at GetRank.java:40) finished in 0.478 s
19/05/20 16:04:32 INFO DAGScheduler: Job 0 finished: toJavaRDD at GetRank.java:40, took 0.739246 s
19/05/20 16:04:32 INFO SparkContext: Starting job: foreach at GetRank.java:41
19/05/20 16:04:32 INFO DAGScheduler: Registering RDD 7 (toJavaRDD at GetRank.java:40)
19/05/20 16:04:32 INFO DAGScheduler: Got job 1 (foreach at GetRank.java:41) with 7 output partitions
19/05/20 16:04:32 INFO DAGScheduler: Final stage: ResultStage 2 (foreach at GetRank.java:41)
19/05/20 16:04:32 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 1)
19/05/20 16:04:32 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 1)
19/05/20 16:04:32 INFO DAGScheduler: Submitting ShuffleMapStage 1 (MapPartitionsRDD at toJavaRDD at GetRank.java:40), which has no missing parents
19/05/20 16:04:32 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 11.8 KB, free 1995.0 MB)
19/05/20 16:04:32 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 6.2 KB, free 1995.0 MB)
19/05/20 16:04:32 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 169.254.2.162:58028 (size: 6.2 KB, free: 1995.0 MB)
19/05/20 16:04:32 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1015
19/05/20 16:04:32 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 1 (MapPartitionsRDD at toJavaRDD at GetRank.java:40) (first 15 tasks are for partitions Vector(0, 1))
19/05/20 16:04:32 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
19/05/20 16:04:32 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, executor driver, partition 0, PROCESS_LOCAL, 4873 bytes)
19/05/20 16:04:32 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 3, localhost, executor driver, partition 1, PROCESS_LOCAL, 4892 bytes)
19/05/20 16:04:32 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
19/05/20 16:04:32 INFO Executor: Running task 1.0 in stage 1.0 (TID 3)
19/05/20 16:04:32 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1300 bytes result sent to driver
19/05/20 16:04:32 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1214 bytes result sent to driver
19/05/20 16:04:32 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 93 ms on localhost (executor driver) (1/2)
19/05/20 16:04:32 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 94 ms on localhost (executor driver) (2/2)
19/05/20 16:04:32 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/05/20 16:04:32 INFO DAGScheduler: ShuffleMapStage 1 (toJavaRDD at GetRank.java:40) finished in 0.097 s
19/05/20 16:04:32 INFO DAGScheduler: looking for newly runnable stages
19/05/20 16:04:32 INFO DAGScheduler: running: Set()
19/05/20 16:04:32 INFO DAGScheduler: waiting: Set(ResultStage 2)
19/05/20 16:04:32 INFO DAGScheduler: failed: Set()
19/05/20 16:04:32 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD at toJavaRDD at GetRank.java:40), which has no missing parents
19/05/20 16:04:32 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 16.6 KB, free 1995.0 MB)
19/05/20 16:04:32 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 8.5 KB, free 1994.9 MB)
19/05/20 16:04:32 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 169.254.2.162:58028 (size: 8.5 KB, free: 1995.0 MB)
19/05/20 16:04:32 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1015
19/05/20 16:04:32 INFO DAGScheduler: Submitting 7 missing tasks from ResultStage 2 (MapPartitionsRDD at toJavaRDD at GetRank.java:40) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6))
19/05/20 16:04:32 INFO TaskSchedulerImpl: Adding task set 2.0 with 7 tasks
19/05/20 16:04:32 INFO TaskSetManager: Starting task 6.0 in stage 2.0 (TID 4, localhost, executor driver, partition 6, PROCESS_LOCAL, 4726 bytes)
19/05/20 16:04:32 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 5, localhost, executor driver, partition 0, ANY, 4726 bytes)
19/05/20 16:04:32 INFO Executor: Running task 6.0 in stage 2.0 (TID 4)
19/05/20 16:04:32 INFO Executor: Running task 0.0 in stage 2.0 (TID 5)
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 2 blocks
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 6 ms
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 6 ms
19/05/20 16:04:32 INFO CodeGenerator: Code generated in 28.274807 ms
19/05/20 16:04:32 INFO CodeGenerator: Code generated in 56.112124 ms
19/05/20 16:04:32 INFO Executor: Finished task 6.0 in stage 2.0 (TID 4). 1691 bytes result sent to driver
19/05/20 16:04:32 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 6, localhost, executor driver, partition 1, ANY, 4726 bytes)
19/05/20 16:04:32 INFO TaskSetManager: Finished task 6.0 in stage 2.0 (TID 4) in 222 ms on localhost (executor driver) (1/7)
19/05/20 16:04:32 INFO Executor: Running task 1.0 in stage 2.0 (TID 6)
19/05/20 16:04:32 INFO Executor: Finished task 0.0 in stage 2.0 (TID 5). 1691 bytes result sent to driver
00000000005:0.94
已放入
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
19/05/20 16:04:32 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID 7, localhost, executor driver, partition 2, ANY, 4726 bytes)
19/05/20 16:04:32 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 5) in 242 ms on localhost (executor driver) (2/7)
19/05/20 16:04:32 INFO Executor: Running task 2.0 in stage 2.0 (TID 7)
00000000003:0.91
已放入
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
00000000004:0.91
已放入
19/05/20 16:04:32 INFO Executor: Finished task 1.0 in stage 2.0 (TID 6). 1691 bytes result sent to driver
19/05/20 16:04:32 INFO TaskSetManager: Starting task 3.0 in stage 2.0 (TID 8, localhost, executor driver, partition 3, ANY, 4726 bytes)
19/05/20 16:04:32 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 6) in 78 ms on localhost (executor driver) (3/7)
19/05/20 16:04:32 INFO Executor: Running task 3.0 in stage 2.0 (TID 8)
00000000002:0.78
已放入
19/05/20 16:04:32 INFO Executor: Finished task 2.0 in stage 2.0 (TID 7). 1691 bytes result sent to driver
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
19/05/20 16:04:32 INFO TaskSetManager: Starting task 4.0 in stage 2.0 (TID 9, localhost, executor driver, partition 4, ANY, 4726 bytes)
19/05/20 16:04:32 INFO TaskSetManager: Finished task 2.0 in stage 2.0 (TID 7) in 78 ms on localhost (executor driver) (4/7)
19/05/20 16:04:32 INFO Executor: Finished task 3.0 in stage 2.0 (TID 8). 1691 bytes result sent to driver
00000000007:0.77
已放入
19/05/20 16:04:32 INFO Executor: Running task 4.0 in stage 2.0 (TID 9)
19/05/20 16:04:32 INFO TaskSetManager: Starting task 5.0 in stage 2.0 (TID 10, localhost, executor driver, partition 5, ANY, 4726 bytes)
19/05/20 16:04:32 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 8) in 63 ms on localhost (executor driver) (5/7)
19/05/20 16:04:32 INFO Executor: Running task 5.0 in stage 2.0 (TID 10)
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 2 ms
00000000006:0.35
已放入
19/05/20 16:04:32 INFO Executor: Finished task 5.0 in stage 2.0 (TID 10). 1648 bytes result sent to driver
19/05/20 16:04:32 INFO TaskSetManager: Finished task 5.0 in stage 2.0 (TID 10) in 87 ms on localhost (executor driver) (6/7)
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 2 blocks
19/05/20 16:04:32 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
00000000001:0.58
已放入
19/05/20 16:04:32 INFO Executor: Finished task 4.0 in stage 2.0 (TID 9). 1648 bytes result sent to driver
19/05/20 16:04:32 INFO TaskSetManager: Finished task 4.0 in stage 2.0 (TID 9) in 131 ms on localhost (executor driver) (7/7)
19/05/20 16:04:32 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
19/05/20 16:04:32 INFO DAGScheduler: ResultStage 2 (foreach at GetRank.java:41) finished in 0.444 s
19/05/20 16:04:32 INFO DAGScheduler: Job 1 finished: foreach at GetRank.java:41, took 0.601323 s
0
null
Mon May 20 16:04:32 CST 2019
19/05/20 16:04:32 INFO SparkContext: Invoking stop() from shutdown hook
19/05/20 16:04:32 INFO SparkUI: Stopped Spark web UI at http://169.254.2.162:4040
19/05/20 16:04:33 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/05/20 16:04:33 INFO MemoryStore: MemoryStore cleared
19/05/20 16:04:33 INFO BlockManager: BlockManager stopped
19/05/20 16:04:33 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/20 16:04:33 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/05/20 16:04:33 INFO SparkContext: Successfully stopped SparkContext
19/05/20 16:04:33 INFO ShutdownHookManager: Shutdown hook called
19/05/20 16:04:33 INFO ShutdownHookManager: Deleting directory C:\Users\vivi\AppData\Local\Temp\spark-a01e90f8-654a-4d04-89dc-2b1121d3c22f

Process finished with exit code 0

Why is that? I can't figure it out.

hyj posted on 2019-5-20 20:16:33

Put the code where the screenshot shows and see what happens. My guess is that the value isn't being picked up outside.

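o13674976542 posted on 2019-5-20 20:21:08

hyj posted on 2019-5-20 20:16
Put the code where the screenshot shows and see what happens. My guess is that the value isn't being picked up outside.

Hi, when I put it inside, the size is 1 every time, so the entries do go in. Why does that happen?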

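hyj posted on 2019-5-20 20:47:16

o13674976542 posted on 2019-5-20 20:21
Hi, when I put it inside, the size is 1 every time, so the entries do go in. Why does that happen?

Possibly your 00000.1 is being treated as an integer, so it becomes 1 rather than the string 0000001.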

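o13674976542 posted on 2019-5-21 08:54:28

hyj posted on 2019-5-20 20:47
Possibly your 00000.1 is being treated as an integer, so it becomes 1 rather than the string 0000001.

Hi, thanks for the reply. So is my code correct? Also, why is the size 0 every time?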

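s060403072 posted on 2019-5-21 09:36:46

o13674976542 posted on 2019-5-21 08:54
Hi, thanks for the reply. So is my code correct? Also, why is the size 0 every time?

Just make sure the results are correct.
You need to understand that this is not a traditional program: the same piece of code may run on several machines. So your result is not the single map you have in mind; what ends up in it comes from code executed on several machines.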
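To make that point concrete: Spark serializes the function passed to foreach, including the captured `result` map, and each task deserializes its own copy. This happens even in local mode, where everything runs in one JVM. The plain-Java sketch below imitates that with Java serialization; `ClosureCopyDemo`, `PutTask`, and `runTasks` are hypothetical names, and the round trip only approximates what Spark does when it ships a task. It reproduces both symptoms from this thread: each task copy ends up with 1 entry, while the driver's map stays at size 0.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

class ClosureCopyDemo {
    // Stand-in for the foreach lambda: a capturing lambda keeps the captured
    // HashMap as a field, so serializing the closure serializes the map too.
    static class PutTask implements Serializable {
        final HashMap<String, Double> result;

        PutTask(HashMap<String, Double> result) {
            this.result = result;
        }

        void call(String id, double rate) {
            result.put(id, rate);
        }
    }

    // Serialize, then deserialize: roughly what happens when a task is shipped to an executor.
    static PutTask roundTrip(PutTask task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (PutTask) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Runs one "task" per partition and returns the map size each task copy ended up with.
    static List<Integer> runTasks(HashMap<String, Double> driverMap, List<List<String[]>> partitions) {
        PutTask closure = new PutTask(driverMap);
        List<Integer> taskSizes = new ArrayList<>();
        for (List<String[]> partition : partitions) {
            PutTask taskCopy = roundTrip(closure); // each task gets its own copy of the map
            for (String[] row : partition) {
                taskCopy.call(row[0], Double.parseDouble(row[1]));
            }
            taskSizes.add(taskCopy.result.size());
        }
        return taskSizes;
    }

    public static void main(String[] args) {
        HashMap<String, Double> result = new HashMap<>(); // the "driver-side" map
        List<List<String[]>> partitions = Arrays.asList(
                Collections.singletonList(new String[] {"00000000005", "0.94"}),
                Collections.singletonList(new String[] {"00000000001", "0.58"}));
        System.out.println(runTasks(result, partitions)); // [1, 1]
        System.out.println(result.size());                // 0
    }
}
```

In the log above, each partition of the sorted result holds at most one row, which fits the "size is 1 every time" observation.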

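o13674976542 posted on 2019-5-21 09:44:05

s060403072 posted on 2019-5-21 09:36
Just make sure the results are correct.
You need to understand that this is not a traditional program: the same piece of code may run on several machines. So your result ...

Hi, in that case, if I want to get the results out, is saving them to a database or a file my only option?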

o13674976542 posted on 2019-5-21 09:55:58

Thanks for the answers. Solved: I moved the HashMap out to the top level of the class and made it static.
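Why the static field appears to work: static fields belong to the class, not to the serialized task object, so a task that runs inside the driver's JVM (as it does in local mode) writes straight into the driver's map. On a real cluster each executor is a separate JVM with its own copy of the static field, so the driver's map would stay empty again. The plain-Java sketch below uses hypothetical names (`StaticFieldDemo`, `PutTask`, `runTask`) and shows that a task which has been through a serialization round trip still reaches the static map. It uses `ConcurrentHashMap` because several task threads can call `put` at the same time, and a plain `HashMap` is not safe for concurrent writes; neither map keeps the sorted order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StaticFieldDemo {
    // Static state lives in the class inside this JVM; it is never written into a serialized task.
    // ConcurrentHashMap because several task threads may put at the same time.
    static final Map<String, Double> SHARED = new ConcurrentHashMap<>();

    // Stand-in for the foreach closure; it has no instance fields, so nothing gets copied.
    static class PutTask implements Serializable {
        void call(String id, double rate) {
            SHARED.put(id, rate);
        }
    }

    // Ships one task through Java serialization and runs it in the same JVM, as local mode does.
    static void runTask(String id, double rate) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new PutTask());
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                PutTask copy = (PutTask) in.readObject();
                copy.call(id, rate);
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        runTask("00000000001", 0.58);
        System.out.println(SHARED); // {00000000001=0.58}
    }
}
```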

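o13674976542 posted on 2019-5-21 10:39:32

o13674976542 posted on 2019-5-21 09:55
Thanks for the answers. Solved: I moved the HashMap out to the top level of the class and made it static.

Still doesn't work. It's a multithreading problem: the entries come out unordered again....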
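Both remaining problems in this thread, getting the values back to the driver and keeping the `order by rate desc` order, can usually be handled by collecting the sorted rows to the driver, e.g. `List<Row> rows = sql.collectAsList();` (`Dataset.collectAsList()` is part of the Spark 2.x Java API), and then filling the map in a single driver thread. `HashMap` never guarantees iteration order, so a `LinkedHashMap` is needed to keep the rows in the order they were inserted. The sketch below assumes the rows have already been collected and substitutes `String[]` pairs for `Row` so it runs without Spark; `OrderedRanks` and `toOrderedMap` are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class OrderedRanks {
    // Builds an insertion-ordered map from rows already sorted by rate desc.
    // Each String[] stands in for a collected Row: row[0] = uuid, row[1] = rate.
    static Map<String, Double> toOrderedMap(List<String[]> sortedRows) {
        Map<String, Double> ranks = new LinkedHashMap<>(); // keeps insertion order, unlike HashMap
        for (String[] row : sortedRows) {
            ranks.put(row[0], Double.parseDouble(row[1]));
        }
        return ranks;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"00000000005", "0.94"},
                new String[] {"00000000003", "0.91"},
                new String[] {"00000000001", "0.58"});
        Map<String, Double> ranks = toOrderedMap(rows);
        System.out.println(ranks.keySet());           // [00000000005, 00000000003, 00000000001]
        System.out.println(ranks.get("00000000001")); // 0.58
    }
}
```

Collecting is only reasonable when the result fits in driver memory; for large results, writing to a file or a database, as asked earlier in the thread, is the safer route.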