分享

泛型

zstu 2018-5-29 20:03:40 发表于 疑问解答 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 9 6381
有一个json字符串,想将它转换成一个自定义的类,但不确定json字符串对应的schema是对应哪个类,所以想用泛型来实现,
[mw_shl_code=java,true]def genericsTest[T:ClassTag](rdd: RDD[(String, String)]) = {
    val res = rdd.mapPartitions(partition => {
      val gson: Gson = GsonUtil.getGson
      partition.map(x => gson.fromJson(x._2, new TypeToken[T]() {}.getType).asInstanceOf[T])
    })
    println(res.count())
    res
  }[/mw_shl_code]上面代码中T如果是class A,那函数结果就想变成RDD[A],如果是B,那函数结果就想变成RDD[B],但我这个函数的泛型写得不对,想请教一下怎么写?

已有(9)人评论

跳转到指定楼层
hello2018 发表于 2018-5-30 07:26:06
既然json,转换为类,然后类在转换为RDD?
楼主是否考虑json直接转为RDD
回复

使用道具 举报

hello2018 发表于 2018-5-30 07:30:39
spark2 DataFrame可以直接读取json,如果非要是RDD,DF可以转换为RDD
回复

使用道具 举报

zstu 发表于 2018-5-30 11:13:49
hello2018 发表于 2018-5-30 07:30
spark2 DataFrame可以直接读取json,如果非要是RDD,DF可以转换为RDD

你好,我用的spark Streaming 接收到的是RDD
回复

使用道具 举报

hello2018 发表于 2018-5-30 12:05:58
本帖最后由 hello2018 于 2018-5-30 12:21 编辑
zstu 发表于 2018-5-30 11:13
你好,我用的spark Streaming 接收到的是RDD

楼主是说上面方法不行?还是什么情况?

回复

使用道具 举报

zstu 发表于 2018-5-31 09:47:34
hello2018 发表于 2018-5-30 12:05
楼主是说上面方法不行?还是什么情况?

不好意思,有点忙,回复的晚了。因为我们使用的是spark Streaming ,所以读出来的时候就是RDD[String],不是DataFrame,除非用StructuredStreaming,但我们还没用这个。
回复

使用道具 举报

desehawk 发表于 2018-5-31 17:43:17
zstu 发表于 2018-5-31 09:47
不好意思,有点忙,回复的晚了。因为我们使用的是spark Streaming ,所以读出来的时候就是RDD[String], ...

上面输出的是什么?差别在什么地方?gson.fromJson,这里楼主封装了吧。
回复

使用道具 举报

zstu 发表于 2018-5-31 19:49:11
desehawk 发表于 2018-5-31 17:43
上面输出的是什么?差别在什么地方?gson.fromJson,这里楼主封装了吧。

上面的输出就是用不同的Java Bean 构建不同的schem的DataFrame,差别就是我现在是RDD[String]想用泛型转换成DataFrame,Structured Streaming是直接转,但我们还没用,gson.fromJson没有封装,这个是Gson包的API
回复

使用道具 举报

desehawk 发表于 2018-5-31 20:06:47
zstu 发表于 2018-5-31 19:49
上面的输出就是用不同的Java Bean 构建不同的schem的DataFrame,差别就是我现在是RDD[String]想用泛型转 ...




[mw_shl_code=scala,true]def genericsTest[T:ClassTag](rdd: RDD[(String, String)]) = {
    val res = rdd.mapPartitions(partition => {
      val gson: Gson = GsonUtil.getGson
      partition.map(x => gson.fromJson(x._2, new TypeToken[T]() {}.getType).asInstanceOf[T])
    })
    println(res.count())
    res
  }[/mw_shl_code]
这段代码应该失败了吧,和你预期有什么区别?
输出的是什么?
你想要什么结果。

回复

使用道具 举报

zstu 发表于 2018-6-1 10:25:03
desehawk 发表于 2018-5-31 20:06
[mw_shl_code=scala,true]def genericsTest[T:ClassTag](rdd: RDD[(String, String)]) = {
    va ...


我想要的是假如T是Person类,那返回的就是RDD[Person],如果是Student类,那返回的就是RDD[Student],就是想把这个函数写成泛型的形式,函数中new TypeToken[Person]这个Person类不想写死。
[mw_shl_code=applescript,true]def genericsTest(rdd: RDD[(String, String)]) = {
    val res = rdd.mapPartitions(partition => {
      val gson: Gson = GsonUtil.getGson
      partition.map(x => gson.fromJson(x._2, new TypeToken[Person]() {}.getType).asInstanceOf[Person])
    })
    println(res.count())
    res
  }[/mw_shl_code]
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条