pig2 发表于 2019-2-21 12:20:25

彻底明白Flink系统学习31-2:Table API和SQL之读取外部数据

问题导读
1.Flink表格式都支持哪些格式?
2.Flink表格式都改如何定义和使用?
3.Flink提供的额外哪些TableSource和TableSinks?


上一篇:
彻底明白Flink系统学习31-1:Table API和SQL之读取外部数据
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26721

表格式
Flink提供了一组可与表连接器一起使用的表格式。

格式标记表示与连接器匹配的格式类型。

CSV格式
CSV格式允许读取和写入以逗号分隔的行。

Java|Scala实现
.withFormat(
new Csv()
    .field("field1", Types.STRING)    // required: ordered format fields
    .field("field2", Types.TIMESTAMP)
    .fieldDelimiter(",")            // optional: string delimiter "," by default
    .lineDelimiter("\n")            // optional: string delimiter "\n" by default
    .quoteCharacter('"')            // optional: single character for string values, empty by default
    .commentPrefix('#')               // optional: string to indicate comments, empty by default
    .ignoreFirstLine()                // optional: ignore the first line, by default it is not skipped
    .ignoreParseErrors()            // optional: skip records with parse error instead of failing by default
)

YAML
format:
type: csv
fields:                  # required: ordered format fields
    - name: field1
      type: VARCHAR
    - name: field2
      type: TIMESTAMP
field-delimiter: ","       # optional: string delimiter "," by default
line-delimiter: "\n"       # optional: string delimiter "\n" by default
quote-character: '"'       # optional: single character for string values, empty by default
comment-prefix: '#'      # optional: string to indicate comments, empty by default
ignore-first-line: false   # optional: boolean flag to ignore the first line, by default it is not skipped
ignore-parse-errors: true# optional: skip records with parse error instead of failing by default
CSV格式包含在Flink中,不需要其他依赖项。

注意:目前写入行的CSV格式有限。 仅支持自定义字段分隔符作为可选参数。

JSON格式
格式: Serialization Schema 格式: Deserialization Schema

JSON format 允许读取和写入给定格式schema相对应的JSON 数据。 格式schema 可以定义为Flink类型,JSON模式(schema),也可以从所需的表模式(schema)派生。 Flink类型支持更类似SQL的定义并映射到相应的SQL数据类型。 JSON模式允许更复杂和嵌套的结构。

如果格式架构( format schema)等于表模式(schema),则还可以自动派生模式(schema )。 这允许仅定义一次模式(schema )信息。 格式(format )的名称,类型和字段顺序由表的模式(schema)确定。 如果时间属性的来源不是字段,则会忽略它们。 表模式(schema )中的from定义被解释为格式中的字段重命名。

Java|Scala
.withFormat(
new Json()
    .failOnMissingField(true)   // optional: flag whether to fail if a field is missing or not, false by default

    // required: define the schema either by using type information which parses numbers to corresponding types
    .schema(Type.ROW(...))

    // or by using a JSON schema which parses to DECIMAL and TIMESTAMP
    .jsonSchema(
      "{" +
      "type: 'object'," +
      "properties: {" +
      "    lon: {" +
      "      type: 'number'" +
      "    }," +
      "    rideTime: {" +
      "      type: 'string'," +
      "      format: 'date-time'" +
      "    }" +
      "}" +
      "}"
    )

    // or use the table's schema
    .deriveSchema()
)

YAML
format:
type: json
fail-on-missing-field: true   # optional: flag whether to fail if a field is missing or not, false by default

# required: define the schema either by using a type string which parses numbers to corresponding types
schema: "ROW(lon FLOAT, rideTime TIMESTAMP)"

# or by using a JSON schema which parses to DECIMAL and TIMESTAMP
json-schema: >
    {
      type: 'object',
      properties: {
      lon: {
          type: 'number'
      },
      rideTime: {
          type: 'string',
          format: 'date-time'
      }
      }
    }

# or use the table's schema
derive-schema: true

下表显示了JSON模式(schema )类型到Flink SQL类型的映射:

JSON schemaFlink SQL
objectROW
booleanBOOLEAN
arrayARRAY
numberDECIMAL
integerDECIMAL
stringVARCHAR
string with format: date-timeTIMESTAMP
string with format: dateDATE
string with format: timeTIME
string with encoding: base64ARRAY
nullNULL (unsupported yet)

目前,Flink仅支持JSON模式规范draft-07的子集。 Union 类型(以及allOf,anyOf,not)尚不支持。 oneOf和类型数组仅支持指定为nullability。

支持链接到文档中的通用定义的简单引用,如下面更复杂的示例所示:
{
"definitions": {
    "address": {
      "type": "object",
      "properties": {
      "street_address": {
          "type": "string"
      },
      "city": {
          "type": "string"
      },
      "state": {
          "type": "string"
      }
      },
      "required": [
      "street_address",
      "city",
      "state"
      ]
    }
},
"type": "object",
"properties": {
    "billing_address": {
      "$ref": "#/definitions/address"
    },
    "shipping_address": {
      "$ref": "#/definitions/address"
    },
    "optional_address": {
      "oneOf": [
      {
          "type": "null"
      },
      {
          "$ref": "#/definitions/address"
      }
      ]
    }
}
}
缺少字段处理:默认情况下,缺少的JSON字段设置为null。 如果缺少字段,可以启用严格的JSON解析来取消源【source 】(和查询)。

确保将JSON格式添加为依赖项。

Avro 格式
格式: Serialization Schema 格式: Deserialization Schema

Apache Avro格式允许读取和写入相对应的给定格式模式(schema)的Avro数据。 格式模式(format schema)可以定义为Avro指定记录的完全qualified 类名,也可以定义为Avro模式(schema )字符串。 如果使用类名,则在运行时期间类必须在类路径中可用。

Java|Scala实现
.withFormat(
new Avro()

    // required: define the schema either by using an Avro specific record class
    .recordClass(User.class)

    // or by using an Avro schema
    .avroSchema(
      "{" +
      "\"type\": \"record\"," +
      "\"name\": \"test\"," +
      "\"fields\" : [" +
      "    {\"name\": \"a\", \"type\": \"long\"}," +
      "    {\"name\": \"b\", \"type\": \"string\"}" +
      "]" +
      "}"
    )
)

YAML
format:
type: avro

# required: define the schema either by using an Avro specific record class
record-class: "org.organization.types.User"

# or by using an Avro schema
avro-schema: >
    {
      "type": "record",
      "name": "test",
      "fields" : [
      {"name": "a", "type": "long"},
      {"name": "b", "type": "string"}
      ]
    }

Avro类型映射到相应的SQL数据类型。 仅支持Union类型以指定可为空(nullability ),否则它们将转换为ANY类型。 下表显示了映射:


Avro schemaFlink SQL
recordROW
enumVARCHAR
arrayARRAY
mapMAP
unionnon-null type or ANY
fixedARRAY
stringVARCHAR
bytesARRAY
intINT
longBIGINT
floatFLOAT
doubleDOUBLE
booleanBOOLEAN
int with logicalType: dateDATE
int with logicalType: time-millisTIME
int with logicalType: time-microsINT
long with logicalType: timestamp-millisTIMESTAMP
long with logicalType: timestamp-microsBIGINT
bytes with logicalType: decimalDECIMAL
fixed with logicalType: decimalDECIMAL
nullNULL (unsupported yet)

Avro使用Joda-Time来表示特定记录类中的逻辑日期和时间类型。 Joda-Time依赖不是Flink 分布式的一部分。 因此,确保Joda-Time在运行时期间与特定记录类一起位于类路径中。 通过模式(schema )字符串指定的Avro格式不需要Joda-Time。

确保添加Apache Avro依赖项。

进一步的TableSources和TableSinks
尚未将以下表源和接收器迁移(或尚未完全迁移)到新的统一接口。

这些是Flink提供的额外TableSource:


类名Maven 依赖批处理?流?描述
OrcTableSourceflink-orcYNORC文件的TableSource。

这些是Flink提供的附加TableSink:
类名Maven 依赖批处理?流?      描述
CsvTableSinkflink-tableYAppendCSV文件的简单sink 。
JDBCAppendTableSinkflink-jdbcYAppend将JDBC表写入Table sink
CassandraAppendTableSinkflink-connector-cassandraNAppend写表到 Cassandra 表.


OrcTableSource

OrcTableSource读取ORC文件。 ORC是结构化数据的文件格式,并以压缩的列式表示形式存储数据。 ORC非常高效,支持投影(projection )和滤波器下推(filter push-down)。

创建OrcTableSource,如下所示:
// create Hadoop Configuration
Configuration config = new Configuration();

OrcTableSource orcTableSource = OrcTableSource.builder()
// path to ORC file(s). NOTE: By default, directories are recursively scanned.
.path("file:///path/to/data")
// schema of ORC files
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
// Hadoop configuration
.withConfiguration(config)
// build OrcTableSource
.build();

// create Hadoop Configuration
val config = new Configuration()

val orcTableSource = OrcTableSource.builder()
// path to ORC file(s). NOTE: By default, directories are recursively scanned.
.path("file:///path/to/data")
// schema of ORC files
.forOrcSchema("struct<name:string,addresses:array<struct<street:string,zip:smallint>>>")
// Hadoop configuration
.withConfiguration(config)
// build OrcTableSource
.build()
注意:OrcTableSource尚不支持ORC的Union类型。

CsvTableSink

CsvTableSink向一个或多个CSV文件emits 表。

接收器仅支持仅追加流表。 它不能用于emit 不断更新的表。 有关详细信息,请参阅表到流转换的文档。 emit 流表时,行至少写入一次(如果启用了检查点),并且CsvTableSink不会将输出文件拆分为存储桶文件,而是连续写入相同的文件。
CsvTableSink sink = new CsvTableSink(
    path,                  // output path
    "|",                   // optional: delimit files by '|'
    1,                     // optional: write to a single file
    WriteMode.OVERWRITE);// optional: override existing files

tableEnv.registerTableSink(
"csvOutputTable",
// specify table schema
new String[]{"f0", "f1"},
new TypeInformation[]{Types.STRING, Types.INT},
sink);

Table table = ...
table.insertInto("csvOutputTable");
val sink: CsvTableSink = new CsvTableSink(
    path,                           // output path
    fieldDelim = "|",               // optional: delimit files by '|'
    numFiles = 1,                     // optional: write to a single file
    writeMode = WriteMode.OVERWRITE)// optional: override existing files

tableEnv.registerTableSink(
"csvOutputTable",
// specify table schema
Array("f0", "f1"),
Array](Types.STRING, Types.INT),
sink)

val table: Table = ???
table.insertInto("csvOutputTable")


JDBCAppendTableSink
JDBCAppendTableSink将表emit到JDBC连接器。 接收器仅支持仅追加流表。 它不能用于emit不断更新的表。 有关详细信息,请参阅表到流转换的文档。

JDBCAppendTableSink将每个Table行至少插入一次数据库表(如果启用了检查点)。 但是,可以使用REPLACE或INSERT OVERWRITE指定插入查询(insertion query)以执行对数据库的upsert写入。

要使用JDBC接收器,必须将JDBC连接器依赖项(flink-jdbc)添加到项目中。 然后,可以使用JDBCAppendSinkBuilder创建接收器:
JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:ebookshop")
.setQuery("INSERT INTO books (id) VALUES (?)")
.setParameterTypes(INT_TYPE_INFO)
.build();

tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
new String[]{"id"},
new TypeInformation[]{Types.INT},
sink);

Table table = ...
table.insertInto("jdbcOutputTable");
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:ebookshop")
.setQuery("INSERT INTO books (id) VALUES (?)")
.setParameterTypes(INT_TYPE_INFO)
.build()

tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
Array("id"),
Array](Types.INT),
sink)

val table: Table = ???
table.insertInto("jdbcOutputTable")
与使用JDBCOutputFormat类似,必须显式指定JDBC驱动程序的名称,JDBC URL,要执行的查询以及JDBC表的字段类型。

CassandraAppendTableSink
CassandraAppendTableSink向Cassandra表emit一个表。 接收器仅支持仅追加流表。 它不能用于emit不断更新的表。 有关详细信息,请参阅表到流转换的文档。

如果启用了检查点,CassandraAppendTableSink会将所有行至少插入一次Cassandra表中。 但是,可以将查询指定为upsert查询。

要使用CassandraAppendTableSink,必须将Cassandra连接器依赖项(flink-connector-cassandra)添加到项目中。 下面的示例显示了如何使用CassandraAppendTableSink。

ClusterBuilder builder = ... // configure Cassandra cluster connection

CassandraAppendTableSink sink = new CassandraAppendTableSink(
builder,
// the query must match the schema of the table
"INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)");

tableEnv.registerTableSink(
"cassandraOutputTable",
// specify table schema
new String[]{"id", "name", "value"},
new TypeInformation[]{Types.INT, Types.STRING, Types.DOUBLE},
sink);

Table table = ...
table.insertInto(cassandraOutputTable);

val builder: ClusterBuilder = ... // configure Cassandra cluster connection

val sink: CassandraAppendTableSink = new CassandraAppendTableSink(
builder,
// the query must match the schema of the table
"INSERT INTO flink.myTable (id, name, value) VALUES (?, ?, ?)")

tableEnv.registerTableSink(
"cassandraOutputTable",
// specify table schema
Array("id", "name", "value"),
Array](Types.INT, Types.STRING, Types.DOUBLE),
sink)

val table: Table = ???
table.insertInto(cassandraOutputTable)

最新经典文章,欢迎关注公众号
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg

美丽天空 发表于 2019-2-23 12:22:44

感谢分享
页: [1]
查看完整版本: 彻底明白Flink系统学习31-2:Table API和SQL之读取外部数据