彻底明白Flink系统学习31-1:Table API和SQL之读取外部数据
本帖最后由 pig2 于 2019-2-18 18:50 编辑问题导读
1.Flink表的类型有哪几种?
2.API与SQL客户端分别什么方式指定连接器?
3.Scala和Java如何创建连接器?
4.YAML配置的作用是什么?
5.所有连接器都可用于批量和流式传输,是否正确?
上一篇文章彻底明白Flink系统学习30:查询配置
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26696
Flink的Table API和SQL程序可以连接到其他外部系统,以便读取和写入批处理表和流表。 表源(source)提供对存储在外部系统(例如数据库,键值存储,消息队列或文件系统)中的数据的访问。 表接收器(sink)将表发送到外部存储系统。 根据源和接收器的类型,它们支持不同的格式,如CSV,Parquet或ORC。
本文介绍如何声明内置表源和/或表接收器(sink),并在Flink中注册它们。 注册源或接收器后,可以通过Table API和SQL语句访问它。
注意如果要实现自己的自定义表源或接收器,请查看后面用户定义的源和接收器。
依赖
下表列出了所有可用的连接器和格式。 它们的相互兼容性在表连接器和表格格式的相应部分中标记。 下表提供了使用构建自动化工具(如Maven或SBT)和带有SQL JAR包的SQL Client的两个项目的依赖关系信息。
连接器Connectors
名称版本Maven 依赖SQL客户端JAR
Filesystem 内置Built-in
Elasticsearch6flink-connector-elasticsearch6Download
Apache Kafka0.8flink-connector-kafka-0.8无法使用
Apache Kafka0.9flink-connector-kafka-0.9Download
Apache Kafka0.10flink-connector-kafka-0.10Download
Apache Kafka0.11flink-connector-kafka-0.11Download
Apache Kafka0.11+ (universal)flink-connector-kafkaDownload
格式Formats
名称Maven依赖SQL 客户端JAR
CSV内置Built-in
JSONflink-jsonDownload
Apache Avroflink-avroDownload
概述
从Flink 1.6开始,与外部系统的连接声明与实际实现分离。
也可以指定连接
[*]以编程方式在表和SQL API的org.apache.flink.table.descriptors下使用Descriptor
[*]或声明性地通过SQL客户端的YAML配置文件。
这不仅可以更好地统一API和SQL Client,还可以在自定义实现的情况下实现更好的可扩展性,而无需更改实际声明。
每个声明都类似于SQL CREATE TABLE语句。可以预先定义表的名称,表的模式(schema),连接器以及用于连接到外部系统的数据格式。
连接器描述了存储表数据的外部系统。可以在此处声明Kafka或常规文件系统等存储系统。连接器可能已经提供了具有字段和架构的固定格式。
某些系统支持不同的数据格式。例如,存储在Kafka或文件中的表可以使用CSV,JSON或Avro对其行进行编码。数据库连接器可能需要此处的表模式。每个连接器都记录了存储系统是否需要定义格式。不同的系统还需要不同类型的格式(例如,面向列的格式与面向行的格式)。该文档说明了哪些格式类型和连接器兼容。
表模式定义了向SQL查询公开的表的模式。它描述了源(source)如何将数据格式映射到表模式,接收器(sink)同样也是。架构可以访问连接器或格式定义的字段。它可以使用一个或多个字段来提取或插入时间属性。如果输入字段没有确定性字段顺序,则模式清楚地定义列名称,它们的顺序和原点。
后续部分将更详细地介绍每个定义部分(连接器,格式和架构)。以下示例显示了如何传递它们:
Java或则Scala实现
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.registerTableSource("MyTable")
YAML:
name: MyTable
type: source
update-mode: append
connector: ...
format: ...
schema: ...
表的类型(源,接收器或两者)确定表的注册方式。 如果是表类型,则表源和表接收器都以相同的名称注册。 从逻辑上讲,这意味着我们可以读取和写入这样的表,类似于常规DBMS中的表。
对于流式查询,更新模式声明如何在动态表和存储系统之间进行通信以进行连续查询。
以下代码显示了如何连接到Kafka以读取Avro记录的完整示例。
Java或则Scala
tableEnvironment
// declare the external system to connect to
.connect(
new Kafka()
.version("0.10")
.topic("test-input")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
// declare a format for this system
.withFormat(
new Avro()
.avroSchema(
"{" +
"\"namespace\": \"org.myorganization\"," +
"\"type\": \"record\"," +
"\"name\": \"UserMessage\"," +
" \"fields\": [" +
" {\"name\": \"timestamp\", \"type\": \"string\"}," +
" {\"name\": \"user\", \"type\": \"long\"}," +
" {\"name\": \"message\", \"type\": [\"string\", \"null\"]}" +
" ]" +
"}" +
)
)
// declare the schema of the table
.withSchema(
new Schema()
.field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("timestamp")
.watermarksPeriodicBounded(60000)
)
.field("user", Types.LONG)
.field("message", Types.STRING)
)
// specify the update-mode for streaming tables
.inAppendMode()
// register as source, sink, or both and under a name
.registerTableSource("MyUserTable");
YAML
tables:
- name: MyUserTable # name the new table
type: source # declare if the table should be "source", "sink", or "both"
update-mode: append # specify the update-mode for streaming tables
# declare the external system to connect to
connector:
type: kafka
version: "0.10"
topic: test-input
startup-mode: earliest-offset
properties:
- key: zookeeper.connect
value: localhost:2181
- key: bootstrap.servers
value: localhost:9092
# declare a format for this system
format:
type: avro
avro-schema: >
{
"namespace": "org.myorganization",
"type": "record",
"name": "UserMessage",
"fields": [
{"name": "ts", "type": "string"},
{"name": "user", "type": "long"},
{"name": "message", "type": ["string", "null"]}
]
}
# declare the schema of the table
schema:
- name: rowtime
type: TIMESTAMP
rowtime:
timestamps:
type: from-field
from: ts
watermarks:
type: periodic-bounded
delay: "60000"
- name: user
type: BIGINT
- name: message
type: VARCHAR
在两种方式中,所需的连接器属性都转换为规范化的,基于字符串的键值对。 所谓的表工厂从键值对创建配置的表源,表接收器和相应的格式。 在搜索完全匹配的表工厂时,会考虑通过Java的服务提供程序接口(SPI)找到的所有表工厂。【表工厂则在后面自定义Sources & Sinks可以找到相关概念】
这里简单补充:
TableFactory允许从基于字符串的属性创建不同的表相关实例。 调用所有可用工厂以匹配给定的属性集和相应的工厂类。
工厂利用Java的服务提供商接口(SPI)进行发现。 这意味着每个依赖项和JAR文件都应包含META_INF / services资源目录中的文件org.apache.flink.table.factories.TableFactory,该文件列出了它提供的所有可用表工厂。
如果找不到工厂或多个工厂匹配给定的属性,则会抛出一个异常,其中包含有关已考虑的工厂和支持的属性的其他信息。
表模式
表模式即:Table Schema
表模式定义列的名称和类型,类似于SQL CREATE TABLE语句的列定义。 此外,可以指定列的表示方式和表格数据编码格式的字段。 如果列的名称应与输入/输出格式不同,则字段的来源可能很重要。 例如,列user_name应从JSON格式引用字段 $$-user-name。 此外,schema需要将类型从外部系统映射到Flink的表示。 对于表接收器,它确保仅将具有有效模式(schema)的数据写入外部系统。
以下示例显示了没有时间属性的简单模式以及输入/输出到表列的一对一字段映射。
Java或则Scala
.withSchema(
new Schema()
.field("MyField1", Types.INT) // required: specify the fields of the table (in this order)
.field("MyField2", Types.STRING)
.field("MyField3", Types.BOOLEAN)
)
YAML
schema:
- name: MyField1 # required: specify the fields of the table (in this order)
type: INT
- name: MyField2
type: VARCHAR
- name: MyField3
type: BOOLEAN
对于每个字段,除了列的名称和类型之外,还可以声明以下属性:
Java或则Scala实现
.withSchema(
new Schema()
.field("MyField1", Types.SQL_TIMESTAMP)
.proctime() // optional: declares this field as a processing-time attribute
.field("MyField2", Types.SQL_TIMESTAMP)
.rowtime(...) // optional: declares this field as a event-time attribute
.field("MyField3", Types.BOOLEAN)
.from("mf3") // optional: original field in the input that is referenced/aliased by this field
)
YAML
schema:
- name: MyField1
type: TIMESTAMP
proctime: true # optional: boolean flag whether this field should be a processing-time attribute
- name: MyField2
type: TIMESTAMP
rowtime: ... # optional: wether this field should be a event-time attribute
- name: MyField3
type: BOOLEAN
from: mf3 # optional: original field in the input that is referenced/aliased by this field
使用无界流表时,时间属性是必不可少的。 因此,处理时间和事件时间(也称为“rowtime”)属性都可以定义为模式的一部分。
有关Flink中时间处理的更多信息,特别是事件时间,我们建议使用常规事件时间部分。
Rowtime 属性
为了控制表的事件时间行为,Flink提供了预定义的时间戳提取器和水印策略。
支持以下时间戳提取器:
Java|Scala实现
// //将输入中的现有LONG或SQL_TIMESTAMP字段转换为rowtime属性。
.rowtime(
new Rowtime()
.timestampsFromField("ts_field") // required: original field name in the input
)
//将分配的时间戳从DataStream API记录转换为rowtime属性
//从而保留来自源的指定时间戳。
//这需要一个分配时间戳的来源(例如,Kafka 0.10+)。
.rowtime(
new Rowtime()
.timestampsFromSource()
)
//设置要用于rowtime属性的自定义时间戳提取器。
//提取器必须扩展`org.apache.flink.table.sources.tsextractors.TimestampExtractor`。
.rowtime(
new Rowtime()
.timestampsFromExtractor(...)
)
YAML
# Converts an existing BIGINT or TIMESTAMP field in the input into the rowtime attribute.
rowtime:
timestamps:
type: from-field
from: "ts_field" # required: original field name in the input
# Converts the assigned timestamps from a DataStream API record into the rowtime attribute
# and thus preserves the assigned timestamps from the source.
rowtime:
timestamps:
type: from-source
支持以下水印策略:
Java|Scala实现
//为升序行时间属性设置水印策略。 发出最大的水印//到目前为止观察到的时间戳减去1.时间戳等于最大时间戳的行
//不迟到
.rowtime(
new Rowtime()
.watermarksPeriodicAscending()
)
// Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
// Emits watermarks which are the maximum observed timestamp minus the specified delay.
.rowtime(
new Rowtime()
.watermarksPeriodicBounded(2000) // delay in milliseconds
)
// Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
// underlying DataStream API and thus preserves the assigned watermarks from the source.
.rowtime(
new Rowtime()
.watermarksFromSource()
)
确保始终声明时间戳和水印。 触发基于时间的操作需要水印。
类型字符串
由于类型信息仅在编程语言中可用,因此支持在YAML文件中定义以下类型字符串:
VARCHAR
BOOLEAN
TINYINT
SMALLINT
INT
BIGINT
FLOAT
DOUBLE
DECIMAL
DATE
TIME
TIMESTAMP
MAP<fieldtype, fieldtype> # generic map; e.g. MAP<VARCHAR, INT> that is mapped to Flink's MapTypeInfo
MULTISET<fieldtype> # multiset; e.g. MULTISET<VARCHAR> that is mapped to Flink's MultisetTypeInfo
PRIMITIVE_ARRAY<fieldtype> # primitive array; e.g. PRIMITIVE_ARRAY<INT> that is mapped to Flink's PrimitiveArrayTypeInfo
OBJECT_ARRAY<fieldtype> # object array; e.g. OBJECT_ARRAY<POJO(org.mycompany.MyPojoClass)> that is mapped to
# Flink's ObjectArrayTypeInfo
ROW<fieldtype, ...> # unnamed row; e.g. ROW<VARCHAR, INT> that is mapped to Flink's RowTypeInfo
# with indexed fields names f0, f1, ...
ROW<fieldname fieldtype, ...> # named row; e.g., ROW<myField VARCHAR, myOtherField INT> that
# is mapped to Flink's RowTypeInfo
POJO<class> # e.g., POJO<org.mycompany.MyPojoClass> that is mapped to Flink's PojoTypeInfo
ANY<class> # e.g., ANY<org.mycompany.MyClass> that is mapped to Flink's GenericTypeInfo
ANY<class, serialized> # used for type information that is not supported by Flink's Table & SQL API
更新模式
对于流式查询,需要声明如何在动态表和外部连接器之间执行转换。更新模式指定应与外部系统交换哪种消息:
追加模式:在追加模式下,动态表和外部连接器仅交换INSERT消息。
回退模式:在回退模式下,动态表和外部连接器交换ADD和RETRACT消息。 INSERT更改被编码为ADD消息,DELETE更改被编码为RETRACT消息,UPDATE更改被编码为更新(前一个)行的RETRACT消息和更新(新)行的ADD消息。在此模式下,不能定义key。但是,每次更新都包含两个效率较低的消息。
Upsert模式:在upsert模式下,动态表和外部连接器交换UPSERT和DELETE消息。此模式需要一个(可能是复合的)唯一Key,通过该Key可以传播更新。外部连接器需要知道唯一Key属性才能正确应用消息。 INSERT和UPDATE更改被编码为UPSERT消息。 DELETE更改为DELETE消息。与收回(retract)流的主要区别在于UPDATE更改使用单个消息进行编码,因此更有效。
注意每个连接器的文档都说明了支持哪些更新模式。
Java|Scala实现
.connect(...)
.inAppendMode() // otherwise: inUpsertMode() or inRetractMode()
YAML
tables:
- name: ...
update-mode: append # otherwise: "retract" or "upsert"
表连接器
Flink提供了一组用于连接外部系统的连接器。
请注意,并非所有连接器都可用于批量和流式传输。 此外,并非每个流连接器都支持每种流模式。 因此,相应地标记每个连接器。 格式标记表示连接器需要特定类型的格式。
文件系统连接器
Source: Batch Source: Streaming Append Mode Sink: Batch Sink: Streaming Append Mode Format: CSV-only
文件系统连接器允许从本地或分布式文件系统进行读写。 文件系统可以定义为:
Java|Scala实现
.connect(
new FileSystem()
.path("file:///path/to/whatever") // required: path to a file or directory
)
YAML
connector:
type: filesystem
path: "file:///path/to/whatever" # required: path to a file or directory
文件系统连接器本身包含在Flink中,不需要额外的依赖项。 需要指定相应的格式,以便从文件系统读取和写入行。
注意:确保包含特定于Flink文件系统的依赖项。
注意:用于流式传输的文件系统源和接收器仅是实验性的。 将来,我们将支持实际的流用例,即目录监控和桶(bucket)输出。
Kafka连接器
Source: Streaming Append Mode Sink: Streaming Append Mode Format: Serialization Schema Format: Deserialization Schema
Kafka连接器允许从Apache Kafka主题读取和写入。 它可以定义如下:
Java|Scala实现
.connect(
new Kafka()
.version("0.11") // required: valid connector versions are
// "0.8", "0.9", "0.10", "0.11", and "universal"
.topic("...") // required: topic name from which the table is read
// optional: connector specific properties
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "testGroup")
// optional: select a startup mode for Kafka offsets
.startFromEarliest()
.startFromLatest()
.startFromSpecificOffsets(...)
// optional: output partitioning from Flink's partitions into Kafka's partitions
.sinkPartitionerFixed() // each Flink partition ends up in at-most one Kafka partition (default)
.sinkPartitionerRoundRobin() // a Flink partition is distributed to Kafka partitions round-robin
.sinkPartitionerCustom(MyCustom.class) // use a custom FlinkKafkaPartitioner subclass
)
YAML
connector:
type: kafka
version: "0.11" # required: valid connector versions are
# "0.8", "0.9", "0.10", "0.11", and "universal"
topic: ... # required: topic name from which the table is read
properties: # optional: connector specific properties
- key: zookeeper.connect
value: localhost:2181
- key: bootstrap.servers
value: localhost:9092
- key: group.id
value: testGroup
startup-mode: ... # optional: valid modes are "earliest-offset", "latest-offset",
# "group-offsets", or "specific-offsets"
specific-offsets: # optional: used in case of startup mode with specific offsets
- partition: 0
offset: 42
- partition: 1
offset: 300
sink-partitioner: ... # optional: output partitioning from Flink's partitions into Kafka's partitions
# valid are "fixed" (each Flink partition ends up in at most one Kafka partition),
# "round-robin" (a Flink partition is distributed to Kafka partitions round-robin)
# "custom" (use a custom FlinkKafkaPartitioner subclass)
sink-partitioner-class: org.mycompany.MyPartitioner# optional: used in case of sink partitioner custom
指定开始读取位置:默认情况下,Kafka源将开始从Zookeeper或Kafka broker中的已提交组偏移量中读取数据。可以指定其他起始位置,这些位置对应于Kafka消费者起始位置配置部分中的配置。
Flink-Kafka Sink Partitioning:默认情况下,Kafka接收器最多写入与其自身并行性一样多的分区(接收器的每个并行实例只写入一个分区)。为了将写入分发到更多分区或控制行路由到分区,可以提供自定义接收器分区器。循环分区器可用于避免不平衡的分区。但是,它会在所有Flink实例和所有Kafka代理之间产生大量网络连接。
一致性保证:默认情况下,如果在启用了检查点的情况下执行查询,则Kafka接收器会使用至少一次保证将数据提取到Kafka主题中。
Kafka 0.10+时间戳:自Kafka 0.10起,Kafka消息的时间戳作为元数据,指定记录何时写入Kafka主题。通过选择时间戳,可以将这些时间戳用于行时属性:YAML中的from-source和Java / Scala中的timestampsFromSource()。
Kafka 0.11+版本控制:自Flink 1.7起,Kafka连接器定义应独立于硬编码(hard-coded)的Kafka版本。使用通用连接器版本作为Flink的Kafka连接器的通配符,该连接器与从0.11开始的所有Kafka版本兼容。
确保添加特定于版本的Kafka依赖项。此外,需要指定相应的格式以便从Kafka读取和写入行。
Elasticsearch连接器
Sink: Streaming Append Mode Sink: Streaming Upsert Mode Format: JSON-only
Elasticsearch连接器允许写入Elasticsearch搜索引擎的索引。
连接器可以在upsert模式下运行,以使用查询定义的密钥与外部系统交换UPSERT / DELETE消息。
对于仅追加查询,连接器也可以在追加模式下操作,以便仅与外部系统交换INSERT消息。 如果查询未定义任何键,则Elasticsearch会自动生成一个键。
连接器可以定义如下:
Java|Scala实现
.connect(
new Elasticsearch()
.version("6") // required: valid connector versions are "6"
.host("localhost", 9200, "http") // required: one or more Elasticsearch hosts to connect to
.index("MyUsers") // required: Elasticsearch index
.documentType("user") // required: Elasticsearch document type
.keyDelimiter("$") // optional: delimiter for composite keys ("_" by default)
// e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
.keyNullLiteral("n/a") // optional: representation for null fields in keys ("null" by default)
// optional: failure handling strategy in case a request to Elasticsearch fails (fail by default)
.failureHandlerFail() // optional: throws an exception if a request fails and causes a job failure
.failureHandlerIgnore() // or ignores failures and drops the request
.failureHandlerRetryRejected() // or re-adds requests that have failed due to queue capacity saturation
.failureHandlerCustom(...) // or custom failure handling with a ActionRequestFailureHandler subclass
// optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
.disableFlushOnCheckpoint() // optional: disables flushing on checkpoint (see notes below!)
.bulkFlushMaxActions(42) // optional: maximum number of actions to buffer for each bulk request
.bulkFlushMaxSize("42 mb") // optional: maximum size of buffered actions in bytes per bulk request
// (only MB granularity is supported)
.bulkFlushInterval(60000L) // optional: bulk flush interval (in milliseconds)
.bulkFlushBackoffConstant() // optional: use a constant backoff type
.bulkFlushBackoffExponential() // or use an exponential backoff type
.bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
.bulkFlushBackoffDelay(30000L) // optional: delay between each backoff attempt (in milliseconds)
// optional: connection properties to be used during REST communication to Elasticsearch
.connectionMaxRetryTimeout(3)// optional: maximum timeout (in milliseconds) between retries
.connectionPathPrefix("/v1") // optional: prefix string to be added to every REST communication
)
YAML
connector:
type: elasticsearch
version: 6 # required: valid connector versions are "6"
hosts: # required: one or more Elasticsearch hosts to connect to
- hostname: "localhost"
port: 9200
protocol: "http"
index: "MyUsers" # required: Elasticsearch index
document-type: "user" # required: Elasticsearch document type
key-delimiter: "$" # optional: delimiter for composite keys ("_" by default)
# e.g., "$" would result in IDs "KEY1$KEY2$KEY3"
key-null-literal: "n/a" # optional: representation for null fields in keys ("null" by default)
# optional: failure handling strategy in case a request to Elasticsearch fails ("fail" by default)
failure-handler: ... # valid strategies are "fail" (throws an exception if a request fails and
# thus causes a job failure), "ignore" (ignores failures and drops the request),
# "retry-rejected" (re-adds requests that have failed due to queue capacity
# saturation), or "custom" for failure handling with a
# ActionRequestFailureHandler subclass
# optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency
flush-on-checkpoint: true # optional: disables flushing on checkpoint (see notes below!) ("true" by default)
bulk-flush:
max-actions: 42 # optional: maximum number of actions to buffer for each bulk request
max-size: 42 mb # optional: maximum size of buffered actions in bytes per bulk request
# (only MB granularity is supported)
interval: 60000 # optional: bulk flush interval (in milliseconds)
back-off: # optional: backoff strategy ("disabled" by default)
type: ... # valid strategies are "disabled", "constant", or "exponential"
max-retries: 3 # optional: maximum number of retries
delay: 30000 # optional: delay between each backoff attempt (in milliseconds)
# optional: connection properties to be used during REST communication to Elasticsearch
connection-max-retry-timeout: 3 # optional: maximum timeout (in milliseconds) between retries
connection-path-prefix: "/v1" # optional: prefix string to be added to every REST communication
批量刷新(Bulk flushing):有关可选刷新参数特征的更多信息,请参阅相应的文档。
禁用检查点上的刷新(Disabling flushing on checkpoint):禁用时,接收器不会等待Elasticsearch在检查点上确认所有待处理的操作请求。 因此,接收器不会为至少一次传递动作请求提供任何强有力的保证。
密钥提取(Key extraction):Flink自动从查询中提取有效密钥。 例如,查询SELECT a,b,c FROM t GROUP BY a,b定义字段a和b的复合键。 Elasticsearch连接器通过使用键分隔符连接查询中定义的顺序中的所有键字段,为每一行生成文档ID字符串。 可以定义关键字段的空文字的自定义表示。
注意JSON格式定义了如何为外部系统编码文档,因此,必须将其添加为依赖项。
最新经典文章,欢迎关注公众号
http://www.aboutyun.com/data/attachment/forum/201406/15/084659qcxzzg8n59b6zejp.jpg
感谢分享 有没有怎么使用yaml方式的例子啊。没找到。。。 ld512870 发表于 2019-2-21 16:02
有没有怎么使用yaml方式的例子啊。没找到。。。
SQL客户端的YAML配置文件中,比如尝试在sql-client-defaults.yaml中设置
s060403072 发表于 2019-2-21 16:55
SQL客户端的YAML配置文件中,比如尝试在sql-client-defaults.yaml中设置
谢谢。SQL Client模块有。。。没看到。
页:
[1]