分享

彻底明白Flink系统学习17:【Flink1.7】DataSet 编程之如何读取外部文件

pig2 2019-1-2 16:50:50 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 13914
本帖最后由 pig2 于 2019-1-2 16:51 编辑

问题导读

1.Flink内置哪些文件系统?
2.Flink都支持哪些Hadoop版本?
3.Flink对Avro的支持,需要哪些工作?

上一篇
彻底明白Flink系统学习16:【Flink1.7】DataSet 编程之Data Sinks详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26546


1.读取文件
Flink内置支持以下文件系统:
文件系统
Scheme
注意
Hadoop Distributed File System (HDFS)  hdfs://支持所有HDFS版本
Amazon S3s3://支持通过Hadoop文件系统实现(见下文)
MapR file systemmaprfs://用户必须手动将所需的jar文件放在lib / dir中
Alluxioalluxio://  支持通过Hadoop文件系统实现(见下文)

2.使用Hadoop文件系统实现
Apache Flink允许用户使用任何实现org.apache.hadoop.fs.FileSystem接口的文件系统。 继承Hadoop文件系统的有

还有更多。

Flink为了使用Hadoop文件系统,需要确保:
1.flink-conf.yaml将fs.hdfs.hadoopconf属性设置为Hadoop配置目录。 对于自动测试或从IDE运行,需要包含flink-conf.yaml,文件中设置FLINK_CONF_DIR环境变量。
2.Hadoop配置(在该目录中)在文件core-site.xml中具有所需文件系统的配置项。如下面S3和Alluxio
3.Flink安装的lib /文件夹中提供使用文件系统所需的类(在运行Flink的所有计算机上)。如果无法将文件放入目录,Flink还会按照HADOOP_CLASSPATH环境变量,以将Hadoop jar文件添加到classpath中。

Amazon S3

S3配置和所需类库,可参考
Deployment & Operations - Deployment - AWS - S3: Simple Storage Service

Alluxio
对于Alluxio支持,将以下内容添加到core-site.xml文件中:
[mw_shl_code=xml,true]<property>
  <name>fs.alluxio.impl</name>
  <value>alluxio.hadoop.FileSystem</value>
</property>[/mw_shl_code]

使用Hadoop Input/OutputFormat 封装连接到其它系统

Flink允许用户访问许多不同的系统作为数据源或接收器。 该系统的设计非常容易扩展。 与Apache Hadoop类似,Flink具有InputFormats和OutputFormats的概念。

这些InputFormats的一个实现是HadoopInputFormat。 这是一个封装,允许用户使用Flink的现有的Hadoop输入格式。


Flink链接其它系统,可参考有关Flink中Hadoop兼容性的更多信息。

Flink对Avro的支持
Flink对Apache Avro提供了广泛的内置支持。 这样可以使用Flink轻松读取Avro文件。 此外,Flink的序列化框架能够处理从Avro架构生成的类。 确保将Flink Avro依赖项包含在项目的pom.xml中。
[mw_shl_code=scala,true]<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-avro</artifactId>
  <version>1.7.0</version>
</dependency>[/mw_shl_code]
要从Avro文件中读取数据,必须指定AvroInputFormat。
例子:
[mw_shl_code=scala,true]AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataSet<User> usersDS = env.createInput(users);[/mw_shl_code]
注意,User是Avro生成的POJO。 Flink还允许执行这些POJO的基于字符串的key selection 。 例如:
[mw_shl_code=scala,true]usersDS.groupBy("name")
[/mw_shl_code]
注意,使用Flink可以使用GenericData.Record类型,但不推荐使用。 由于记录包含完整的模式,因此其数据密集,因此可能使用起来很慢。

Flink的POJO字段选择也适用于Avro生成的POJO。 但是,只有在将字段类型正确写入生成的类时才可以使用。 如果字段的类型为Object,则不能将该字段用作连接或分组键。 在Avro中指定一个字段,如{“name”:“type_double_test”,“type”:“double”},工作正常,但是将其指定为只包含一个字段的UNION类型({“name”:“type_double_test”, “type”:[“double”]},)将生成一个Object类型的字段。 请注意,指定可空类型({“name”:“type_double_test”,“type”:[“null”,“double”]},)是可能的!

最新经典文章,欢迎关注公众号


已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条