彻底明白Flink系统学习31-2:Table API和SQL之读取外部数据
问题导读1.Flink表格式都支持哪些格式?
2.Flink表格式都改如何定义和使用?
3.Flink提供的额外哪些TableSource和TableSinks?
上一篇:
彻底明白Flink系统学习31-1:Table API和SQL之读取外部数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26721
表格式
Flink提供了一组可与表连接器一起使用的表格式。
格式标记表示与连接器匹配的格式类型。
CSV格式
CSV格式允许读取和写入以逗号分隔的行。
Java|Scala实现
.withFormat(
new Csv()
.field("field1", Types.STRING) // required: ordered format fields
.field("field2", Types.TIMESTAMP)
.fieldDelimiter(",") // optional: string delimiter "," by default
.lineDelimiter("\n") // optional: string delimiter "\n" by default
.quoteCharacter('"') // optional: single character for string values, empty by default
.commentPrefix('#') // optional: string to indicate comments, empty by default
.ignoreFirstLine() // optional: ignore the first line, by default it is not skipped
.ignoreParseErrors() // optional: skip records with parse error instead of failing by default
)
YAML
format:
type: csv
fields: # required: ordered format fields
- name: field1
type: VARCHAR
- name: field2
type: TIMESTAMP
field-delimiter: "," # optional: string delimiter "," by default
line-delimiter: "\n" # optional: string delimiter "\n" by default
quote-character: '"' # optional: single character for string values, empty by default
comment-prefix: '#' # optional: string to indicate comments, empty by default
ignore-first-line: false # optional: boolean flag to ignore the first line, by default it is not skipped
ignore-parse-errors: true# optional: skip records with parse error instead of failing by default
CSV格式包含在Flink中,不需要其他依赖项。
注意:目前写入行的CSV格式有限。 仅支持自定义字段分隔符作为可选参数。
JSON格式
格式: Serialization Schema 格式: Deserialization Schema
JSON format 允许读取和写入给定格式schema相对应的JSON 数据。 格式schema 可以定义为Flink类型,JSON模式(schema),也可以从所需的表模式(schema)派生。 Flink类型支持更类似SQL的定义并映射到相应的SQL数据类型。 JSON模式允许更复杂和嵌套的结构。
如果格式架构( format schema)等于表模式(schema),则还可以自动派生模式(schema )。 这允许仅定义一次模式(schema )信息。 格式(format )的名称,类型和字段顺序由表的模式(schema)确定。 如果时间属性的来源不是字段,则会忽略它们。 表模式(schema )中的from定义被解释为格式中的字段重命名。
Java|Scala
.withFormat(
new Json()
.failOnMissingField(true) // optional: flag whether to fail if a field is missing or not, false by default
// required: define the schema either by using type information which parses numbers to corresponding types
.schema(Type.ROW(...))
// or by using a JSON schema which parses to DECIMAL and TIMESTAMP
.jsonSchema(
"{" +
"type: 'object'," +
"properties: {" +
" lon: {" +
" type: 'number'" +
" }," +
" rideTime: {" +
" type: 'string'," +
" format: 'date-time'" +
" }" +
"}" +
"}"
)
// or use the table's schema
.deriveSchema()
)
YAML
format:
type: json
fail-on-missing-field: true # optional: flag whether to fail if a field is missing or not, false by default
# required: define the schema either by using a type string which parses numbers to corresponding types
schema: "ROW(lon FLOAT, rideTime TIMESTAMP)"
# or by using a JSON schema which parses to DECIMAL and TIMESTAMP
json-schema: >
{
type: 'object',
properties: {
lon: {
type: 'number'
},
rideTime: {
type: 'string',
format: 'date-time'
}
}
}
# or use the table's schema
derive-schema: true
下表显示了JSON模式(schema )类型到Flink SQL类型的映射:
JSON schemaFlink SQL
objectROW
booleanBOOLEAN
arrayARRAY
numberDECIMAL
integerDECIMAL
stringVARCHAR
string with format: date-timeTIMESTAMP
string with format: dateDATE
string with format: timeTIME
string with encoding: base64ARRAY
nullNULL (unsupported yet)
目前,Flink仅支持JSON模式规范draft-07的子集。 Union 类型(以及allOf,anyOf,not)尚不支持。 oneOf和类型数组仅支持指定为nullability。
支持链接到文档中的通用定义的简单引用,如下面更复杂的示例所示:
{
"definitions": {
"address": {
"type": "object",
"properties": {
"street_address": {
"type": "string"
},
"city": {
"type": "string"
},
"state": {
"type": "string"
}
},
"required": [
"street_address",
"city",
"state"
]
}
},
"type": "object",
"properties": {
"billing_address": {
"$ref": "#/definitions/address"
},
"shipping_address": {
"$ref": "#/definitions/address"
},
"optional_address": {
"oneOf": [
{
"type": "null"
},
{
"$ref": "#/definitions/address"
}
]
}
}
}
缺少字段处理:默认情况下,缺少的JSON字段设置为null。 如果缺少字段,可以启用严格的JSON解析来取消源【source 】(和查询)。
确保将JSON格式添加为依赖项。
Avro 格式
格式: Serialization Schema 格式: Deserialization Schema
Apache Avro格式允许读取和写入相对应的给定格式模式(schema)的Avro数据。 格式模式(format schema)可以定义为Avro指定记录的完全qualified 类名,也可以定义为Avro模式(schema )字符串。 如果使用类名,则在运行时期间类必须在类路径中可用。
Java|Scala实现
.withFormat(
new Avro()
// required: define the schema either by using an Avro specific record class
.recordClass(User.class)
// or by using an Avro schema
.avroSchema(
"{" +
"\"type\": \"record\"," +
"\"name\": \"test\"," +
"\"fields\" : [" +
" {\"name\": \"a\", \"type\": \"long\"}," +
" {\"name\": \"b\", \"type\": \"string\"}" +
"]" +
"}"
)
)
YAML
format:
type: avro
# required: define the schema either by using an Avro specific record class
record-class: "org.organization.types.User"
# or by using an Avro schema
avro-schema: >
{
"type": "record",
"name": "test",
"fields" : [
{"name": "a", "type": "long"},
{"name": "b", "type": "string"}
]
}
Avro类型映射到相应的SQL数据类型。 仅支持Union类型以指定可为空(nullability ),否则它们将转换为ANY类型。 下表显示了映射:
Avro schemaFlink SQL
recordROW
enumVARCHAR
arrayARRAY
mapMAP
unionnon-null type or ANY
fixedARRAY
stringVARCHAR
bytesARRAY
intINT
longBIGINT
floatFLOAT
doubleDOUBLE
booleanBOOLEAN
int with logicalType: dateDATE
int with logicalType: time-millisTIME
int with logicalType: time-microsINT
long with logicalType: timestamp-millisTIMESTAMP
long with logicalType: timestamp-microsBIGINT
bytes with logicalType: decimalDECIMAL
fixed with logicalType: decimalDECIMAL
nullNULL (unsupported yet)
Avro使用Joda-Time来表示特定记录类中的逻辑日期和时间类型。 Joda-Time依赖不是Flink 分布式的一部分。 因此,确保Joda-Time在运行时期间与特定记录类一起位于类路径中。 通过模式(schema )字符串指定的Avro格式不需要Joda-Time。
确保添加Apache Avro依赖项。
进一步的TableSources和TableSinks
尚未将以下表源和接收器迁移(或尚未完全迁移)到新的统一接口。
这些是Flink提供的额外TableSource:
类名Maven 依赖批处理?流?描述
OrcTableSourceflink-orcYNORC文件的TableSource。
这些是Flink提供的附加TableSink:
类名Maven 依赖批处理?流? 描述
CsvTableSinkflink-tableYAppendCSV文件的简单sink 。
JDBCAppendTableSinkflink-jdbcYAppend将JDBC表写入Table sink
CassandraAppendTableSinkflink-connector-cassandraNAppend写表到 Cassandra 表.
OrcTableSource
OrcTableSource读取ORC文件。 ORC是结构化数据的文件格式,并以压缩的列式表示形式存储数据。 ORC非常高效,支持投影(projection )和滤波器下推(filter push-down)。
创建OrcTableSource,如下所示:
// create Hadoop Configuration
Configuration config = new Configuration();
OrcTableSource orcTableSource = OrcTableSource.builder()
// path to ORC file(s). NOTE: By default, directories are recursively scanned.
.path("file:///path/to/data")
// schema of ORC files
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
// Hadoop configuration
.withConfiguration(config)
// build OrcTableSource
.build();
// create Hadoop Configuration
val config = new Configuration()
val orcTableSource = OrcTableSource.builder()
// path to ORC file(s). NOTE: By default, directories are recursively scanned.
.path("file:///path/to/data")
// schema of ORC files
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
// Hadoop configuration
.withConfiguration(config)
// build OrcTableSource
.build()
注意:OrcTableSource尚不支持ORC的Union类型。
CsvTableSink
CsvTableSink向一个或多个CSV文件emits 表。
接收器仅支持仅追加流表。 它不能用于emit 不断更新的表。 有关详细信息,请参阅表到流转换的文档。 emit 流表时,行至少写入一次(如果启用了检查点),并且CsvTableSink不会将输出文件拆分为存储桶文件,而是连续写入相同的文件。
CsvTableSink sink = new CsvTableSink(
path, // output path
"|", // optional: delimit files by '|'
1, // optional: write to a single file
WriteMode.OVERWRITE);// optional: override existing files
tableEnv.registerTableSink(
"csvOutputTable",
// specify table schema
new String[]{"f0", "f1"},
new TypeInformation[]{Types.STRING, Types.INT},
sink);
Table table = ...
table.insertInto("csvOutputTable");
val sink: CsvTableSink = new CsvTableSink(
path, // output path
fieldDelim = "|", // optional: delimit files by '|'
numFiles = 1, // optional: write to a single file
writeMode = WriteMode.OVERWRITE)// optional: override existing files
tableEnv.registerTableSink(
"csvOutputTable",
// specify table schema
Array("f0", "f1"),
Array](Types.STRING, Types.INT),
sink)
val table: Table = ???
table.insertInto("csvOutputTable")
JDBCAppendTableSink
JDBCAppendTableSink将表emit到JDBC连接器。 接收器仅支持仅追加流表。 它不能用于emit不断更新的表。 有关详细信息,请参阅表到流转换的文档。
JDBCAppendTableSink将每个Table行至少插入一次数据库表(如果启用了检查点)。 但是,可以使用REPLACE或INSERT OVERWRITE指定插入查询(insertion query)以执行对数据库的upsert写入。
要使用JDBC接收器,必须将JDBC连接器依赖项(flink-jdbc)添加到项目中。 然后,可以使用JDBCAppendSinkBuilder创建接收器:
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:ebookshop")
.setQuery("INSERT INTO books (id) VALUES (?)")
.setParameterTypes(INT_TYPE_INFO)
.build();
tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
new String[]{"id"},
new TypeInformation[]{Types.INT},
sink);
Table table = ...
table.insertInto("jdbcOutputTable");
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:ebookshop")
.setQuery("INSERT INTO books (id) VALUES (?)")
.setParameterTypes(INT_TYPE_INFO)
.build()
tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
Array("id"),
Array](Types.INT),
sink)
val table: Table = ???
table.insertInto("jdbcOutputTable")
与使用JDBCOutputFormat类似,必须显式指定JDBC驱动程序的名称,JDBC URL,要执行的查询以及JDBC表的字段类型。
CassandraAppendTableSink
CassandraAppendTableSink向Cassandra表emit一个表。 接收器仅支持仅追加流表。 它不能用于emit不断更新的表。 有关详细信息,请参阅表到流转换的文档。
如果启用了检查点,CassandraAppendTableSink会将所有行至少插入一次Cassandra表中。 但是,可以将查询指定为upsert查询。
要使用CassandraAppendTableSink,必须将Cassandra连接器依赖项(flink-connector-cassandra)添加到项目中。 下面的示例显示了如何使用CassandraAppendTableSink。
ClusterBuilder builder = ... // configure Cassandra cluster connection
CassandraAppendTableSink sink = new CassandraAppendTableSink(
builder,
// the query must match the schema of the table
"INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");
tableEnv.registerTableSink(
"cassandraOutputTable",
// specify table schema
new String[]{"id", "name", "value"},
new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
sink);
Table table = ...
table.insertInto(cassandraOutputTable);
val builder: ClusterBuilder = ... // configure Cassandra cluster connection
val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
builder,
// the query must match the schema of the table
"INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")
tableEnv.registerTableSink(
"cassandraOutputTable",
// specify table schema
Array("id", "name", "value"),
Array](Types.INT, Types.STRING, Types.DOUBLE),
sink)
val table: Table = ???
table.insertInto(cassandraOutputTable)
最新经典文章,欢迎关注公众号
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
感谢分享
页:
[1]