本帖最后由 pig2 于 2019-1-2 16:51 编辑
问题导读
1.Flink内置哪些文件系统?
2.Flink都支持哪些Hadoop版本?
3.Flink对Avro的支持,需要哪些工作?
上一篇
彻底明白Flink系统学习16:【Flink1.7】DataSet 编程之Data Sinks详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26546
1.读取文件
Flink内置支持以下文件系统:文件系统 | Scheme | 注意 | Hadoop Distributed File System (HDFS) | hdfs:// | 支持所有HDFS版本 | Amazon S3 | s3:// | 支持通过Hadoop文件系统实现(见下文) | MapR file system | maprfs:// | 用户必须手动将所需的jar文件放在lib / dir中 | Alluxio | alluxio:// | 支持通过Hadoop文件系统实现(见下文) |
2.使用Hadoop文件系统实现
Apache Flink允许用户使用任何实现org.apache.hadoop.fs.FileSystem接口的文件系统。 继承Hadoop文件系统的有
还有更多。
Flink为了使用Hadoop文件系统,需要确保:
1.flink-conf.yaml将fs.hdfs.hadoopconf属性设置为Hadoop配置目录。 对于自动测试或从IDE运行,需要包含flink-conf.yaml,文件中设置FLINK_CONF_DIR环境变量。
2.Hadoop配置(在该目录中)在文件core-site.xml中具有所需文件系统的配置项。如下面S3和Alluxio
3.Flink安装的lib /文件夹中提供使用文件系统所需的类(在运行Flink的所有计算机上)。如果无法将文件放入目录,Flink还会按照HADOOP_CLASSPATH环境变量,以将Hadoop jar文件添加到classpath中。
Amazon S3
S3配置和所需类库,可参考
Deployment & Operations - Deployment - AWS - S3: Simple Storage Service
Alluxio
对于Alluxio支持,将以下内容添加到core-site.xml文件中:
[mw_shl_code=xml,true]<property>
<name>fs.alluxio.impl</name>
<value>alluxio.hadoop.FileSystem</value>
</property>[/mw_shl_code]
使用Hadoop Input/OutputFormat 封装连接到其它系统
Flink允许用户访问许多不同的系统作为数据源或接收器。 该系统的设计非常容易扩展。 与Apache Hadoop类似,Flink具有InputFormats和OutputFormats的概念。
这些InputFormats的一个实现是HadoopInputFormat。 这是一个封装,允许用户使用Flink的现有的Hadoop输入格式。
Flink链接其它系统,可参考有关Flink中Hadoop兼容性的更多信息。
Flink对Avro的支持
Flink对Apache Avro提供了广泛的内置支持。 这样可以使用Flink轻松读取Avro文件。 此外,Flink的序列化框架能够处理从Avro架构生成的类。 确保将Flink Avro依赖项包含在项目的pom.xml中。
[mw_shl_code=scala,true]<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>1.7.0</version>
</dependency>[/mw_shl_code]
要从Avro文件中读取数据,必须指定AvroInputFormat。
例子:
[mw_shl_code=scala,true]AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataSet<User> usersDS = env.createInput(users);[/mw_shl_code]
注意,User是Avro生成的POJO。 Flink还允许执行这些POJO的基于字符串的key selection 。 例如:
[mw_shl_code=scala,true]usersDS.groupBy("name")
[/mw_shl_code]
注意,使用Flink可以使用GenericData.Record类型,但不推荐使用。 由于记录包含完整的模式,因此其数据密集,因此可能使用起来很慢。
Flink的POJO字段选择也适用于Avro生成的POJO。 但是,只有在将字段类型正确写入生成的类时才可以使用。 如果字段的类型为Object,则不能将该字段用作连接或分组键。 在Avro中指定一个字段,如{“name”:“type_double_test”,“type”:“double”},工作正常,但是将其指定为只包含一个字段的UNION类型({“name”:“type_double_test”, “type”:[“double”]},)将生成一个Object类型的字段。 请注意,指定可空类型({“name”:“type_double_test”,“type”:[“null”,“double”]},)是可能的!
最新经典文章,欢迎关注公众号
|
|