分享

如何从 0 到 1 开发 PyFlink API 作业

fc013 2021-4-30 15:48:15 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 3715


问题导读:

1、如何开发 PyFlink Table API 作业?
2、如何开发 PyFlink DataStream API 作业?
3、Flink 提供了哪几种作业部署方式?





一、环境准备

第一步:安装 Python

PyFlink 仅支持 Python 3.5+,您首先需要确认您的开发环境是否已安装了 Python 3.5+,如果没有的话,需要先安装 Python 3.5+。

第二步:安装 JDK

我们知道 Flink 的运行是使用 Java 语言开发的,所以为了执行 Flink 作业,您还需要安装 JDK。Flink 提供了对于 JDK 8 以及 JDK 11 的全面支持,您需要确认您的开发环境中是否已经安装了上述版本的 JDK,如果没有的话,需要先安装 JDK。

第三步:安装 PyFlink

接下来需要安装 PyFlink,可以通过以下命令进行安装:

  1. # 创建Python虚拟环境
  2. python3 -m pip install virtualenv
  3. virtualenv -p `which python3` venv
  4. # 使用上述创建的Python虚拟环境
  5. ./venv/bin/activate
  6. # 安装PyFlink 1.12
  7. python3 -m pip install apache-flink==1.12.2
复制代码

二、作业开发

PyFlink Table API 作业

我们首先介绍一下如何开发 PyFlink Table API 作业。

1)创建 TableEnvironment 对象

对于 Table API 作业来说,用户首先需要创建一个 TableEnvironment 对象。以下示例定义了一个 TableEnvironment 对象,使用该对象的定义的作业,运行在流模式,且使用 blink planner 执行。

  1. env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
  2. t_env = StreamTableEnvironment.create(environment_settings=env_settings)
复制代码

■ 2)配置作业的执行参数

可以通过以下方式,配置作业的执行参数。以下示例将作业的默认并发度设置为 4。

  1. t_env.get_config().get_configuration().set_string('parallelism.default', '4')
复制代码

■ 3)创建数据源表

接下来,需要为作业创建一个数据源表。PyFlink 中提供了多种方式来定义数据源表。

方式一:from_elements

PyFlink 支持用户从一个给定列表,创建源表。以下示例定义了包含了 3 行数据的表:[("hello", 1), ("world", 2), ("flink", 3)],该表有 2 列,列名分别为 a 和 b,类型分别为 VARCHAR 和 BIGINT。

  1. tab = t_env.from_elements([("hello", 1), ("world", 2), ("flink", 3)], ['a', 'b'])
复制代码

说明:

  • 这种方式通常用于测试阶段,可以快速地创建一个数据源表,验证作业逻辑。

  • from_elements 方法可以接收多个参数,其中第一个参数用于指定数据列表,列表中的每一个元素必须为 tuple 类型;第二个参数用于指定表的 schema。

方式二:DDL

除此之外,数据也可以来自于一个外部的数据源。以下示例定义了一个名字为 my_source,类型为 datagen 的表,表中有两个类型为 VARCHAR 的字段。

  1. t_env.execute_sql("""
  2.         CREATE TABLE my_source (
  3.           a VARCHAR,
  4.           b VARCHAR
  5.         ) WITH (
  6.           'connector' = 'datagen',
  7.           'number-of-rows' = '10'
  8.         )
  9.     """)
  10. tab = t_env.from_path('my_source')
复制代码

说明:

  • 通过 DDL 的方式来定义数据源表是目前最推荐的方式,且所有 Java Table API & SQL 中支持的 connector,都可以通过 DDL 的方式,在 PyFlink Table API 作业中使用,详细的 connector 列表请参见 Flink 官方文档 [1]。

  • 当前仅有部分 connector 的实现包含在 Flink 官方提供的发行包中,比如 FileSystem,DataGen、Print、BlackHole 等,大部分 connector 的实现当前没有包含在 Flink 官方提供的发行包中,比如 Kafka、ES 等。针对没有包含在 Flink 官方提供的发行包中的 connector,如果需要在 PyFlink 作业中使用,用户需要显式地指定相应 FAT JAR,比如针对 Kafka,需要使用 JAR 包 [2],JAR 包可以通过如下方式指定:

  1. # 注意:file:///前缀不能省略
  2. t_env.get_config().get_configuration().set_string("pipeline.jars", "file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
复制代码

方式三:catalog
  1. hive_catalog = HiveCatalog("hive_catalog")
  2. t_env.register_catalog("hive_catalog", hive_catalog)
  3. t_env.use_catalog("hive_catalog")
  4. # 假设hive catalog中已经定义了一个名字为source_table的表
  5. tab = t_env.from_path('source_table')
复制代码

这种方式和 DDL 的方式类似,只不过表的定义事先已经注册到了 catalog 中了,不需要在作业中重新再定义一遍。

■ 4)定义作业的计算逻辑

方式一:通过 Table API

得到 source 表之后,接下来就可以使用 Table API 中提供的各种操作,定义作业的计算逻辑,对表进行各种变换,比如:

  1. @udf(result_type=DataTypes.STRING())
  2. def sub_string(s: str, begin: int, end: int):
  3.    return s[begin:end]
  4. transformed_tab = tab.select(sub_string(col('a'), 2, 4))
复制代码

方式二:通过 SQL 语句

除了可以使用 Table API 中提供的各种操作之外,也可以直接通过 SQL 语句来对表进行变换,比如上述逻辑,也可以通过 SQL 语句来实现:

  1. t_env.create_temporary_function("sub_string", sub_string)
  2. transformed_tab = t_env.sql_query("SELECT sub_string(a, 2, 4) FROM %s" % tab)
复制代码

说明:

  • TableEnvironment 中提供了多种方式用于执行 SQL 语句,其用途略有不同:


方法名 使用说明
sql_query 用来执行 SELECT 语句
sql_update 用来执行 INSERT 语句 / CREATE TABLE 语句。该方法已经被 deprecate,推荐使用 execute_sql 或者create_statement_set 替代。
create_statement_set 用来执行多条 SQL 语句,可以通过该方法编写 multi-sink 的作业。
execute_sql 用来执行单条 SQL 语句。execute_sql VS create_statement_set: 前者只能执行单条 SQL 语句,后者可用于执行多条 SQL 语句 execute_sql VS sql_query:前者可用于执行各种类型的 SQL 语句,比如 DDL、 DML、DQL、SHOW、DESCRIBE、EXPLAIN、USE 等,后者只能执行 DQL 语句即使是 DQL 语句,两者的行为也不一样。前者会生成 Flink 作业,触发表数据的计算,返回 TableResult 类型,后者并不触发计算,仅对表进行逻辑变换,返回 Table 类型

■ 5)查看执行计划

用户在开发或者调试作业的过程中,可能需要查看作业的执行计划,可以通过如下方式。

方式一:Table.explain

比如,当我们需要知道 transformed_tab 当前的执行计划时,可以执行:print(transformed_tab.explain()),得到如下输出:

  1. == Abstract Syntax Tree ==
  2. LogicalProject(EXPR$0=[sub_string($0, 2, 4)])
  3. +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]])
  4. == Optimized Logical Plan ==
  5. PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])
  6. +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]]], fields=[a])
  7. == Physical Execution Plan ==
  8. Stage 1 : Data Source
  9.     content : Source: PythonInputFormatTableSource(a)
  10.     Stage 2 : Operator
  11.         content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_582508460, source: [PythonInputFormatTableSource(a)]], fields=[a])
  12.         ship_strategy : FORWARD
  13.         Stage 3 : Operator
  14.             content : StreamExecPythonCalc
  15.             ship_strategy : FORWARD
复制代码

方式二:TableEnvironment.explain_sql

方式一适用于查看某一个 table 的执行计划,有时候并没有一个现成的 table 对象可用,比如:

  1. print(t_env.explain_sql("INSERT INTO my_sink SELECT * FROM %s " % transformed_tab))
复制代码

其执行计划如下所示:

  1. == Abstract Syntax Tree ==
  2. LogicalSink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
  3. +- LogicalProject(EXPR$0=[sub_string($0, 2, 4)])
  4.    +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]])
  5. == Optimized Logical Plan ==
  6. Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
  7. +- PythonCalc(select=[sub_string(a, 2, 4) AS EXPR$0])
  8.    +- LegacyTableSourceScan(table=[[default_catalog, default_database, Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]]], fields=[a])
  9. == Physical Execution Plan ==
  10. Stage 1 : Data Source
  11.     content : Source: PythonInputFormatTableSource(a)
  12.     Stage 2 : Operator
  13.         content : SourceConversion(table=[default_catalog.default_database.Unregistered_TableSource_1143388267, source: [PythonInputFormatTableSource(a)]], fields=[a])
  14.         ship_strategy : FORWARD
  15.         Stage 3 : Operator
  16.             content : StreamExecPythonCalc
  17.             ship_strategy : FORWARD
  18.             Stage 4 : Data Sink
  19.                 content : Sink: Sink(table=[default_catalog.default_database.my_sink], fields=[EXPR$0])
  20.                 ship_strategy : FORWARD
复制代码

■ 6)写出结果数据

方式一:通过 DDL

和创建数据源表类似,也可以通过 DDL 的方式来创建结果表。

  1. t_env.execute_sql("""
  2.         CREATE TABLE my_sink (
  3.           `sum` VARCHAR
  4.         ) WITH (
  5.           'connector' = 'print'
  6.         )
  7.     """)
  8. table_result = transformed_tab.execute_insert('my_sink')
复制代码

说明:

  • 当使用 print 作为 sink 时,作业结果会打印到标准输出中。如果不需要查看输出,也可以使用 blackhole 作为 sink。

方式二:collect

也可以通过 collect 方法,将 table 的结果收集到客户端,并逐条查看。

  1. table_result = transformed_tab.execute()
  2. with table_result.collect() as results:
  3.     for result in results:
  4.         print(result)
复制代码
说明:

  • 该方式可以方便地将 table 的结果收集到客户端并查看。
  • 由于数据最终会收集到客户端,所以最好限制一下数据条数,比如:
transformed_tab.limit(10).execute(),限制只收集 10 条数据到客户端。

方式三:to_pandas

也可以通过 to_pandas 方法,将 table 的结果转换成 pandas.DataFrame 并查看。

  1. result = transformed_tab.to_pandas()
  2. print(result)
复制代码

可以看到如下输出:

  1.   _c0
  2. 0  32
  3. 1  e6
  4. 2  8b
  5. 3  be
  6. 4  4f
  7. 5  b4
  8. 6  a6
  9. 7  49
  10. 8  35
  11. 9  6b
复制代码

说明:

  • 该方式与 collect 类似,也会将 table 的结果收集到客户端,所以最好限制一下结果数据的条数。

■ 7)总结

完整的作业示例如下:

  1. from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
  2. from pyflink.table.expressions import col
  3. from pyflink.table.udf import udf
  4. def table_api_demo():
  5.     env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
  6.     t_env = StreamTableEnvironment.create(environment_settings=env_settings)
  7.     t_env.get_config().get_configuration().set_string('parallelism.default', '4')
  8.     t_env.execute_sql("""
  9.             CREATE TABLE my_source (
  10.               a VARCHAR,
  11.               b VARCHAR
  12.             ) WITH (
  13.               'connector' = 'datagen',
  14.               'number-of-rows' = '10'
  15.             )
  16.         """)
  17.     tab = t_env.from_path('my_source')
  18.     @udf(result_type=DataTypes.STRING())
  19.     def sub_string(s: str, begin: int, end: int):
  20.         return s[begin:end]
  21.     transformed_tab = tab.select(sub_string(col('a'), 2, 4))
  22.     t_env.execute_sql("""
  23.             CREATE TABLE my_sink (
  24.               `sum` VARCHAR
  25.             ) WITH (
  26.               'connector' = 'print'
  27.             )
  28.         """)
  29.     table_result = transformed_tab.execute_insert('my_sink')
  30.     # 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出
  31.     # 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法
  32.     table_result.wait()
  33. if __name__ == '__main__':
  34.     table_api_demo()
复制代码

执行结果如下:

  1. 4> +I(a1)
  2. 3> +I(b0)
  3. 2> +I(b1)
  4. 1> +I(37)
  5. 3> +I(74)
  6. 4> +I(3d)
  7. 1> +I(07)
  8. 2> +I(f4)
  9. 1> +I(7f)
  10. 2> +I(da)
复制代码

PyFlink DataStream API 作业

■ 1)创建 StreamExecutionEnvironment 对象

对于 DataStream API 作业来说,用户首先需要定义一个 StreamExecutionEnvironment 对象。

  1. env = StreamExecutionEnvironment.get_execution_environment()
复制代码

■ 2)配置作业的执行参数

可以通过以下方式,配置作业的执行参数。以下示例将作业的默认并发度设置为4。

  1. env.set_parallelism(4)
复制代码

■ 3)创建数据源

接下来,需要为作业创建一个数据源。PyFlink 中提供了多种方式来定义数据源。

方式一:from_collection

PyFlink 支持用户从一个列表创建源表。以下示例定义了包含了 3 行数据的表:[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],该表有 2 列,列名分别为 a 和 b,类型分别为 VARCHAR 和 BIGINT。

  1. ds = env.from_collection(
  2.         collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
  3.         type_info=Types.ROW([Types.INT(), Types.STRING()]))
复制代码

说明:

  • 这种方式通常用于测试阶段,可以方便地创建一个数据源。
  • from_collection 方法可以接收两个参数,其中第一个参数用于指定数据列表;第二个参数用于指定数据的类型。

方式二:使用 PyFlink DataStream API 中定义的 connector

此外,也可以使用 PyFlink DataStream API 中已经支持的 connector,需要注意的是,1.12 中仅提供了 Kafka connector 的支持。

  1. deserialization_schema = JsonRowDeserializationSchema.builder() \
  2.     .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
  3. kafka_consumer = FlinkKafkaConsumer(
  4.     topics='test_source_topic',
  5.     deserialization_schema=deserialization_schema,
  6.     properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
  7. ds = env.add_source(kafka_consumer)
复制代码

说明:

  • Kafka connector 当前没有包含在 Flink 官方提供的发行包中,如果需要在PyFlink 作业中使用,用户需要显式地指定相应 FAT JAR [2],JAR 包可以通过如下方式指定:

  1. # 注意:file:///前缀不能省略
  2. env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
复制代码

即使是 PyFlink DataStream API 作业,也推荐使用 Table & SQL connector 中打包出来的 FAT JAR,可以避免递归依赖的问题。

方式三:使用 PyFlink Table API 中定义的 connector

以下示例定义了如何将 Table & SQL 中支持的 connector 用于 PyFlink DataStream API 作业。

  1. t_env = StreamTableEnvironment.create(stream_execution_environment=env)
  2. t_env.execute_sql("""
  3.         CREATE TABLE my_source (
  4.           a INT,
  5.           b VARCHAR
  6.         ) WITH (
  7.           'connector' = 'datagen',
  8.           'number-of-rows' = '10'
  9.         )
  10.     """)
  11. ds = t_env.to_append_stream(
  12.     t_env.from_path('my_source'),
  13.     Types.ROW([Types.INT(), Types.STRING()]))
复制代码

说明:

  • 由于当前 PyFlink DataStream API 中 built-in 支持的 connector 种类还比较少,推荐通过这种方式来创建 PyFlink DataStream API 作业中使用的数据源表,这样的话,所有 PyFlink Table API 中可以使用的 connector,都可以在 PyFlink DataStream API 作业中使用。

  • 需要注意的是,TableEnvironment 需要通过以下方式创建 StreamTableEnvironment.create(stream_execution_environment=env),以使得 PyFlink DataStream API 与 PyFlink Table API 共享同一个 StreamExecutionEnvironment 对象。

■ 4)定义计算逻辑

生成数据源对应的 DataStream 对象之后,接下来就可以使用 PyFlink DataStream API 中定义的各种操作,定义计算逻辑,对 DataStream 对象进行变换了,比如:

  1. def split(s):
  2.     splits = s[1].split("|")
  3.     for sp in splits:
  4.        yield s[0], sp
  5. ds = ds.map(lambda i: (i[0] + 1, i[1])) \
  6.        .flat_map(split) \
  7.        .key_by(lambda i: i[1]) \
  8.        .reduce(lambda i, j: (i[0] + j[0], i[1]))
复制代码

■ 5)写出结果数据

方式一:print

可以调用 DataStream 对象上的 print 方法,将 DataStream 的结果打印到标准输出中,比如:

  1. ds.print()
复制代码

方式二:使用 PyFlink DataStream API 中定义的 connector

可以直接使用 PyFlink DataStream API 中已经支持的 connector,需要注意的是,1.12 中提供了对于 FileSystem、JDBC、Kafka connector 的支持,以 Kafka 为例:

  1. serialization_schema = JsonRowSerializationSchema.builder() \
  2.     .with_type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()
  3. kafka_producer = FlinkKafkaProducer(
  4.     topic='test_sink_topic',
  5.     serialization_schema=serialization_schema,
  6.     producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})
  7. ds.add_sink(kafka_producer)
复制代码

说明:

  • JDBC、Kafka connector 当前没有包含在 Flink 官方提供的发行包中,如果需要在 PyFlink 作业中使用,用户需要显式地指定相应 FAT JAR,比如 Kafka connector 可以使用 JAR 包 [2],JAR 包可以通过如下方式指定:

  1. # 注意:file:///前缀不能省略
  2. env.add_jars("file:///my/jar/path/flink-sql-connector-kafka_2.11-1.12.0.jar")
复制代码

推荐使用 Table & SQL connector 中打包出来的 FAT JAR,可以避免递归依赖的问题。

方式三:使用 PyFlink Table API 中定义的 connector

以下示例展示了如何将 Table & SQL 中支持的 connector,用作 PyFlink DataStream API 作业的 sink。

  1. # 写法一:ds类型为Types.ROW
  2. def split(s):
  3.     splits = s[1].split("|")
  4.     for sp in splits:
  5.         yield Row(s[0], sp)
  6. ds = ds.map(lambda i: (i[0] + 1, i[1])) \
  7.        .flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
  8.        .key_by(lambda i: i[1]) \
  9.        .reduce(lambda i, j: Row(i[0] + j[0], i[1]))
  10. # 写法二:ds类型为Types.TUPLE
  11. def split(s):
  12.     splits = s[1].split("|")
  13.     for sp in splits:
  14.         yield s[0], sp
  15. ds = ds.map(lambda i: (i[0] + 1, i[1])) \
  16.        .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
  17.        .key_by(lambda i: i[1]) \
  18.        .reduce(lambda i, j: (i[0] + j[0], i[1]))
  19. # 将ds写出到sink
  20. t_env.execute_sql("""
  21.         CREATE TABLE my_sink (
  22.           a INT,
  23.           b VARCHAR
  24.         ) WITH (
  25.           'connector' = 'print'
  26.         )
  27.     """)
  28. table = t_env.from_data_stream(ds)
  29. table_result = table.execute_insert("my_sink")
复制代码

说明:

  • 需要注意的是,t_env.from_data_stream(ds) 中的 ds 对象的 result type 类型必须是复合类型 Types.ROW 或者 Types.TUPLE,这也就是为什么需要显式声明作业计算逻辑中 flat_map 操作的 result 类型

  • 作业的提交,需要通过 PyFlink Table API 中提供的作业提交方式进行提交

  • 由于当前 PyFlink DataStream API 中支持的 connector 种类还比较少,推荐通过这种方式来定义 PyFlink DataStream API 作业中使用的数据源表,这样的话,所有 PyFlink Table API 中可以使用的 connector,都可以作为 PyFlink DataStream API 作业的 sink。

■ 7)总结

完整的作业示例如下:

方式一(适合调试):

  1. from pyflink.common.typeinfo import Types
  2. from pyflink.datastream import StreamExecutionEnvironment
  3. def data_stream_api_demo():
  4.     env = StreamExecutionEnvironment.get_execution_environment()
  5.     env.set_parallelism(4)
  6.     ds = env.from_collection(
  7.         collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
  8.         type_info=Types.ROW([Types.INT(), Types.STRING()]))
  9.     def split(s):
  10.         splits = s[1].split("|")
  11.         for sp in splits:
  12.             yield s[0], sp
  13.     ds = ds.map(lambda i: (i[0] + 1, i[1])) \
  14.            .flat_map(split) \
  15.            .key_by(lambda i: i[1]) \
  16.            .reduce(lambda i, j: (i[0] + j[0], i[1]))
  17.     ds.print()
  18.     env.execute()
  19. if __name__ == '__main__':
  20.     data_stream_api_demo()
复制代码

执行结果如下:

  1. 3> (2, 'aaa')
  2. 3> (2, 'bb')
  3. 3> (6, 'aaa')
  4. 3> (4, 'a')
  5. 3> (5, 'bb')
  6. 3> (7, 'a')
复制代码

方式二(适合线上作业):

  1. from pyflink.common.typeinfo import Types
  2. from pyflink.datastream import StreamExecutionEnvironment
  3. from pyflink.table import StreamTableEnvironment
  4. def data_stream_api_demo():
  5.     env = StreamExecutionEnvironment.get_execution_environment()
  6.     t_env = StreamTableEnvironment.create(stream_execution_environment=env)
  7.     env.set_parallelism(4)
  8.     t_env.execute_sql("""
  9.             CREATE TABLE my_source (
  10.               a INT,
  11.               b VARCHAR
  12.             ) WITH (
  13.               'connector' = 'datagen',
  14.               'number-of-rows' = '10'
  15.             )
  16.         """)
  17.     ds = t_env.to_append_stream(
  18.         t_env.from_path('my_source'),
  19.         Types.ROW([Types.INT(), Types.STRING()]))
  20.     def split(s):
  21.         splits = s[1].split("|")
  22.         for sp in splits:
  23.             yield s[0], sp
  24.     ds = ds.map(lambda i: (i[0] + 1, i[1])) \
  25.            .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
  26.            .key_by(lambda i: i[1]) \
  27.            .reduce(lambda i, j: (i[0] + j[0], i[1]))
  28.     t_env.execute_sql("""
  29.             CREATE TABLE my_sink (
  30.               a INT,
  31.               b VARCHAR
  32.             ) WITH (
  33.               'connector' = 'print'
  34.             )
  35.         """)
  36.     table = t_env.from_data_stream(ds)
  37.     table_result = table.execute_insert("my_sink")
  38.     # 1)等待作业执行结束,用于local执行,否则可能作业尚未执行结束,该脚本已退出,会导致minicluster过早退出
  39.     # 2)当作业通过detach模式往remote集群提交时,比如YARN/Standalone/K8s等,需要移除该方法
  40.     table_result.wait()
  41. if __name__ == '__main__':
  42.     data_stream_api_demo()
复制代码

三、作业提交

Flink 提供了多种作业部署方式,比如 local、standalone、YARN、K8s 等,PyFlink 也支持上述作业部署方式,请参考 Flink 官方文档 [3],了解更多详细信息。

local

说明:使用该方式执行作业时,会启动一个 minicluster,作业会提交到 minicluster 中执行,该方式适合作业开发阶段。

示例:python3 table_api_demo.py

standalone

说明:使用该方式执行作业时,作业会提交到一个远端的 standalone 集群。

示例:

./bin/flink run --jobmanager localhost:8081 --python table_api_demo.py

YARN Per-Job

说明:使用该方式执行作业时,作业会提交到一个远端的 YARN 集群。

示例:

./bin/flink run --target yarn-per-job --python table_api_demo.py

K8s application mode

说明:使用该方式执行作业时,作业会提交到 K8s 集群,以 application mode 的方式执行。

示例:

./bin/flink run-application \    --target kubernetes-application \    --parallelism 8 \    -Dkubernetes.cluster-id=<ClusterId> \    -Dtaskmanager.memory.process.size=4096m \    -Dkubernetes.taskmanager.cpu=2 \    -Dtaskmanager.numberOfTaskSlots=4 \    -Dkubernetes.container.image=<PyFlinkImageName> \
--pyModule table_api_demo \    --pyFiles file:///path/to/table_api_demo.py

参数说明

除了上面提到的参数之外,通过 flink run 提交的时候,还有其它一些和 PyFlink 作业相关的参数。

参数名 用途描述 示例
-py / --python 指定作业的入口文件 -py file:///path/to/table_api_demo.py
-pym / --pyModule 指定作业的 entry module,功能和--python类似,可用于当作业的 Python 文件为 zip 包,无法通过--python 指定时,相比--python 来说,更通用-pym table_api_demo -pyfs file:///path/to/table_api_demo.py
-pyfs / --pyFiles 指定一个到多个 Python 文件(.py/.zip等,逗号分割),这些 Python 文件在作业执行的时候,会放到 Python 进程的 PYTHONPATH 中,可以在 Python 自定义函数中访问到-pyfs file:///path/to/table_api_demo.py,file:///path/to/deps.zip
-pyarch / --pyArchives指定一个到多个存档文件(逗号分割),这些存档文件,在作业执行的时候,会被解压之后,放到 Python 进程的 workspace 目录,可以通过相对路径的方式进行访问 -pyarch file:///path/to/venv.zip
-pyexec / --pyExecutable 指定作业执行的时候,Python 进程的路径-pyarch file:///path/to/venv.zip -pyexec venv.zip/venv/bin/python3
-pyreq / --pyRequirements 指定 requirements 文件,requirements 文件中定义了作业的依赖-pyreq requirements.txt

四、问题排查

当我们刚刚上手 PyFlink 作业开发的时候,难免会遇到各种各样的问题,学会如何排查问题是非常重要的。接下来,我们介绍一些常见的问题排查手段。

client 端异常输出

PyFlink 作业也遵循 Flink 作业的提交方式,作业首先会在 client 端编译成 JobGraph,然后提交到 Flink 集群执行。如果作业编译有问题,会导致在 client 端提交作业的时候就抛出异常,此时可以在 client 端看到类似这样的输出:

  1. Traceback (most recent call last):
  2.   File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 50, in <module>
  3.     data_stream_api_demo()
  4.   File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 45, in data_stream_api_demo
  5.     table_result = table.execute_insert("my_")
  6.   File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/table/table.py", line 864, in execute_insert
  7.     return TableResult(self._j_table.executeInsert(table_path, overwrite))
  8.   File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
  9.     return_value = get_return_value(
  10.   File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 162, in deco
  11.     raise java_exception
  12. pyflink.util.exceptions.TableException: Sink `default_catalog`.`default_database`.`my_` does not exists
  13.      at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:247)
  14.      at org.apache.flink.table.planner.delegation.PlannerBase$anonfun$1.apply(PlannerBase.scala:159)
  15.      at org.apache.flink.table.planner.delegation.PlannerBase$anonfun$1.apply(PlannerBase.scala:159)
  16.      at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:234)
  17.      at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:234)
  18.      at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  19.      at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  20.      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  21.      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  22.      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  23.      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  24.      at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:159)
  25.      at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1329)
  26.      at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:676)
  27.      at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
  28.      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  29.      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  30.      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  31.      at java.lang.reflect.Method.invoke(Method.java:498)
  32.      at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  33.      at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  34.      at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
  35.      at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  36.      at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
  37.      at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
  38.      at java.lang.Thread.run(Thread.java:748)
  39. Process finished with exit code 1
复制代码

比如上述报错说明作业中使用的名字为"my_"的表不存在。

TaskManager 日志文件

有些错误直到作业运行的过程中才会发生,比如脏数据或者 Python 自定义函数的实现问题等,针对这种错误,通常需要查看 TaskManager 的日志文件,比如以下错误反映用户在 Python 自定义函数中访问的 opencv 库不存在。

  1. Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 2: Traceback (most recent call last):
  2.   File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
  3.     response = task()
  4.   File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
  5.     lambda: self.create_worker().do_instruction(request), request)
  6.   File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 479, in do_instruction
  7.     return getattr(self, request_type)(
  8.   File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
  9.     bundle_processor.process_bundle(instruction_id))
  10.   File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 977, in process_bundle
  11.     input_op_by_transform_id[element.transform_id].process_encoded(
  12.   File "/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
  13.     self.output(decoded_value)
  14.   File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  15.   File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  16.   File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  17.   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  18.   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 85, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
  19.   File "pyflink/fn_execution/coder_impl_fast.pyx", line 83, in pyflink.fn_execution.coder_impl_fast.DataStreamFlatMapCoderImpl.encode_to_stream
  20.   File "/Users/dianfu/code/src/github/pyflink-usecases/datastream_api_demo.py", line 26, in split
  21.     import cv2
  22. ModuleNotFoundError: No module named 'cv2'
  23.     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
  24.     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
  25.     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
  26.     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
  27.     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
  28.     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
  29.     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
  30.     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
  31.     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
  32.     at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
  33.     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  34.     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  35.     ... 1 more
复制代码

说明:

  • local 模式下,TaskManager 的 log 位于 PyFlink 的安装目录下:site-packages/pyflink/log/,也可以通过如下命令找到:

>>> import pyflink

>>> print(pyflink.path)

['/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink'],则log文件位于/Users/dianfu/venv/pyflink-usecases/lib/python3.8/site-packages/pyflink/log目录下

自定义日志

有时候,异常日志的内容并不足以帮助我们定位问题,此时可以考虑在 Python 自定义函数中打印一些日志信息。PyFlink 支持用户在 Python 自定义函数中通过 logging 的方式输出 log,比如:

  1. def split(s):
  2.     import logging
  3.     logging.info("s: " + str(s))
  4.     splits = s[1].split("|")
  5.     for sp in splits:
  6.         yield s[0], sp
复制代码

通过上述方式,split 函数的输入参数,会打印到 TaskManager 的日志文件中。

远程调试

PyFlink 作业,在运行过程中,会启动一个独立的 Python 进程执行 Python 自定义函数,所以如果需要调试 Python 自定义函数,需要通过远程调试的方式进行,可以参见[4],了解如何在 Pycharm 中进行 Python 远程调试。

1)在 Python 环境中安装 pydevd-pycharm:

pip install pydevd-pycharm~=203.7717.65

2)在 Python 自定义函数中设置远程调试参数:

  1. def split(s):
  2.     import pydevd_pycharm
  3.     pydevd_pycharm.settrace('localhost', port=6789, stdoutToServer=True, stderrToServer=True)
  4.     splits = s[1].split("|")
  5.     for sp in splits:
  6.         yield s[0], sp
复制代码

3)按照 Pycharm 中远程调试的步骤,进行操作即可,可以参见[4],也可以参考博客[5]中“代码调试”部分的介绍。

说明:Python 远程调试功能只在 Pycharm 的 professional 版才支持。

社区用户邮件列表

如果通过以上步骤之后,问题还未解决,也可以订阅 Flink 用户邮件列表 [6],将问题发送到 Flink 用户邮件列表。需要注意的是,将问题发送到邮件列表时,尽量将问题描述清楚,最好有可复现的代码及数据,可以参考一下这个邮件[7]。






最新经典文章,欢迎关注公众号



---------------------

作者:Flink 中文社区
来源:weixin
原文:如何从 0 到 1 开发 PyFlink API 作业


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条