分享

Nebula Flink Connector 在实时 ETL 的实践



作者介绍
祝亚运,新奥新智研发,专注实时数据处理、OLAP 集群维护工作。在 NebulaGraph nMeetup 上海站作为讲师分享了 NebulaGraph Flink Connector 在 ETL 过程中的高效应用,本文从理论到实践,超详细讲解 NebulaGraph Flink Connector 使用思路。


Nebula Flink Connector 简介


NebulaGraph Flink Connector


NebulaGraph Flink Connector 是一款帮助 Flink 用户快速访问 NebulaGraph 的连接器,支持从 NebulaGraph 中读取数据,或者将其他外部数据源读取的数据写入 NebulaGraph.

适用于以下场景:

读取 NebulaGraph 数据进行分析计算;
分析计算完的数据写入 NebulaGraph;
迁移数据。


Apache Flink
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。



2.png

从上图中我们能发现 Flink 能从不同的第三方存储引擎中读取数据,并进行处理,再写入另外的存储引擎中。

那 Flink 又是如何读取外部系统的数据呢,其实是通过 Flink Connector,它的作用就相当于一个连接器,连接 Flink 计算引擎跟外界存储系统。

流计算中经常需要与外部存储系统交互,比如需要关联 MySQL 中的某个表,都需要通过连接器来读取外部系统的数据。Nebula Flink Connector 采用类似 Flink 提供的 Flink Connector 形式,支持 Flink 读写 NebulaGraph.


Nebula Flink Connector 中的 Source


Nebula Flink Connector 的 Source 即 NebulaGraph.通过轮训的方式来不断读取 NebulaGraph 中的数据,直至数据读取完毕。

Flink 提供了丰富的 Connector 组件允许用户自定义数据源来连接外部数据存储系统。Nebula Flink Connector 是基于 Flink 老版本的 API SourceFunction 实现的,我们知道一个完整的 Flink程序包含至少包含 Source、Sink .

那 Source 就是由下面的方式添加的:

StreamExecutionEnvironment.addSource(sourceFunction)

ExecutionEnvironment.createInput(inputFormat)

本文从 SourceFunction 开始介绍。

首先,让我们更详细地介绍 Source 组件中类的关系图,其中核心类或者接口包含下面 5 个:


NebulaSource 接口
NebulaSource 是一个定义了获取 NebulaGraph 数据的接口。这个接口在 Source 组件中扮演着核心的角色,因为它定义了如何从 NebulaGraph 获取数据的基本方法。

该接口有两个实现类:NebulaVertexSource 和 NebulaEdgeSource .这两个类分别负责从 NebulaGraph 中获取顶点(Vertex)数据和边(Edge)数据。

2NebulaVertexSource 和 NebulaEdgeSource:

这两个类实现了 NebulaSource 接口,并提供了获取点或边数据的具体实现。

例如,NebulaVertexSource 会包含特定于顶点的逻辑,而 NebulaEdgeSource 则包含特定于边的逻辑。

3RichParallelSourceFunction 抽象类:

它继承自 Flink 的 RichParallelSourceFunction 类,并实现了 ParallelSourceFunction 接口。
这个类提供了并行数据源的功能,并且可以在 Flink 任务中并行运行。它定义了一些基本的方法,如 open 和 close,这些方法可以在源的生命周期中进行初始化和清理。

4NebulaSourceFunction 类

它继承了 RichParallelSourceFunction 类,并重写了关键的方法,包括 open 、 run 和 cancel .

open 方法用于初始化资源,例如连接 NebulaGraph 的客户端。

run 方法是核心,在 run 方法中创建了 NebulaSource 对象,这个对象主要用来读取 NebulaGraph 中的数据。



2.png


在 Nebula Flink Connector 的 Source 组件中,最核心的逻辑是 NebulaSourceFunction 的 run 方法,在此方法中不断读取  NebulaGraph 中的数据。那会有不少看官疑惑这个 run 方法在什么时候运行呢,下面就来一一道来。

熟悉 Flink 的都知道,Flink 中的任务是执行的基本单元。它是执行 operator 的每个并行实例的地方。那 StreamTask 是 Flink 流处理引擎中所有不同任务子类型的基础。因为任务是执行 operator 并行实例的实体,它的生命周期与 operator 的生命周期紧密集成。因此,我们将浅浅的讨论 operator 生命周期的基本方法。

以下是按每个方法被调用的顺序列出的方法。

鉴于 operator 可以有用户定义函数(UDF),在每个 operator 方法下,我们还提供它调用的 UDF 生命周期中的方法。这些方法在您的 operator 扩展了 AbstractUdfStreamOperator 时可用,它是执行 UDF 的所有 operator 的基本类。


  1. // initialization phase
  2. OPERATOR::setup
  3.     UDF::setRuntimeContext
  4. OPERATOR::initializeState
  5. OPERATOR::open
  6.     UDF::open
  7. // processing phase (called on every element/watermark)
  8. OPERATOR::processElement
  9.     UDF::run
  10. OPERATOR::processWatermark
  11. // checkpointing phase (called asynchronously on every checkpoint)
  12. OPERATOR::snapshotState
  13. // notify the operator about the end of processing records
  14. OPERATOR::finish
  15. // termination phase
  16. OPERATOR::close
  17.     UDF::close
复制代码
本文介绍几个对于我们理解 Nebula Flink Connector 的方法。

open() 方法,open() 方法执行任何 operator 特定的初始化,例如在 AbstractUdfStreamOperator 的情况下打开用户定义的函数。其实会执行到我们自定义的 NebulaSourceFunction 中的 open() 方法。在 open() 方法中我们主要是初始化 storageClient 和 metaClient 等。

run() 方法,然后会调用到 NebulaSourceFunction 中的 run() 方法,在这个方法中创建了 NebulaSource 这个对象,这个对象负责读取 NebulaGraph 中的数据,以轮训的方式获取数据。

小结
Flink 是以 StreamTask 形式运行在 TaskManager 上,通过 StreamTask 的生命周期得知最终会先运行 operator 的 open() 方法和 run() 方法。

在 NebulaSourceFunction 中 open() 做了初始化赋值,只会运行一次(不考虑重视的情况下)。然后就会运行 run() 方法来读取 NebulaGraph 中的数据,在 run() 方法中创建了 NebulaSource 这个对象,这个对象负责读取 NebulaGraph 中的数据,以轮训的方式获取数据。

StreamTask 的执行流程图如下图,我们这里省略的 open() 方法。


2.png

Nebula Flink Connector 中的 Sink


Nebula Flink Connector 的 Sink 组件负责将 Flink 中 Source 组件获取的数据写入到 NebulaGraph 数据库中。以下是对该组件的详细介绍,包括其核心类和接口的作用与关系。在 Nebula Flink Connector 中,Sink 组件的作用是关键的一环,它确保了数据从 Flink 环境流向 NebulaGraph.

以下是内容的重组和详细说明。我们的 Nebula Flink Connector 是基于 Flink 的老版本 API SinkFunction 实现的。

在使用时,通过调用 Flink 算子的 addSink() 方法来添加一个 Sink 函数。

例如:operator.addSink(sinkFunction)


1.png

Nebula Flink Connector 中 Sink 组件的核心类关系图如上图,以下是三个核心的类和接口的详细介绍:



1NebulaSinkFunction 类:

这是一个继承了 RichSinkFunction 抽象类的实现类。它重写了父类的方法,并在其内部包含了一个 NebulaBatchOutputFormat 属性。

在构造 NebulaSinkFunction 时,会对 NebulaBatchOutputFormat 对象进行赋值。

该类有两个重要方法:open() 和 invoke()

open() 用于初始化参数,调用 NebulaBatchOutputFormat 对象的 open() 方法。
invoke() 负责调用 outputFormat.writeRecord(value) 将数据写入 NebulaGraph.

2NebulaBatchOutputFormat 抽象类:

NebulaBatchOutputFormat 继承自 RichOutputFormat 类,主要用于获取 RuntimeContext 对象。‍

它有四个实现类:

NebulaEdgeBatchOutputFormat

NebulaEdgeBatchTableOutputFormat

NebulaVertexBatchOutputFormat

NebulaVertexBatchTableOutputFormat

分别处理边和点的数据。

该类包含两个核心方法:

open() 方法:初始化了 Nebula 的 Session 对象,并创建了一个定时任务,用于执行 commit() 方法。commit() 方法通过调用 nebulaBatchExecutor.executeBatch(session) 将数据写入 NebulaGraph。

writeRecord(Trow) 方法:将数据添加到批处理中,最终通过 executeBatch 方法写入数据库。

3NebulaBatchExecutor 抽象类

NebulaBatchExecutor 是一个抽象类,有三个实现类:

NebulaEdgeBatchExecutor

NebulaTableBufferReducedExecutor

NebulaVertexBatchExecutor

分别负责将边和点的数据写入 NebulaGraph

该类有两个核心方法:

addToBatch(Row record) 方法:将数据缓存到内存中。

executeBatch(Session session) 方法:将缓存中的数据实际写入数据库。


当我们观察 Flink 中的数据如何写入到 NebulaGraph 时,可以概括为以下几个关键步骤:



数据流入与处理:

StreamTask 在 TaskManager 上运行,一旦有数据流入,它将触发 operator 的 invoke() 方法。这个调用链最终会传导至 NebulaSinkFunction 的 invoke() 方法内部。

在 NebulaSinkFunction 的 invoke() 方法中,数据通过调用 outputFormat.writeRecord(value) 方法被处理,这是数据写入流程的起点。

批处理与条件触发:

随后, numPendingRow (待处理记录数)会进行累加。当累加的记录数达到配置的批量大小(即 executionOptions.getBatchSize() > 0 )而且达到批处理的最大容量时,将触发 nebulaBatchExecutor.executeBatch(session) 方法的调用。

执行批量写入:

NebulaBatchExecutor 中的 executeBatch(Session session) 方法是一个抽象方法,具体的实现由其子类提供。

在子类的实现中,会将缓存在内存中的数据封装成可执行的 SQL 批量语句,然后通过调用自身的 executeStatement() 方法执行这些语句。

实际上, executeStatement() 方法内部是通过 session.execute(statement) 调用,将缓存的数据批量写入到 NebulaGraph 数据库中。

通过这个过程,可看到 Nebula Flink Connector 的 Sink 组件如何将数据从 Flink 转到 NebulaGraph. 这一流程的每个环节都由特定的类和方法负责,确保了数据传输的高效和可靠。

为了更直观地理解这一流程,可以参考下图所示的流程图,它形象地展示了数据写入到 NebulaGraph.


1.png

Nebula Flink Connector 的实践


Nebula Flink Connector 是一款基于 Apache Flink API 开发的连接器,旨在为 Flink 提供与 NebulaGraph 数据库的集成方案。该连接器使得数据可以在 Flink 和 NebulaGraph 之间无缝流动,支持实时的数据处理和分析,具有下面 3 个特性。1.数据读取:Nebula Flink Connector 提供了 Source 连接器,允许 Flink 程序从 NebulaGraph 中读取数据。

2.数据写入:同时,它也提供了 Sink 连接器,用于将 Flink 处理后的数据写入到 NebulaGraph.

3.独立性:作为一个独立的 jar 包,Nebula Flink Connector 本身不包含 Flink 运行时环境,因此无法独立运行。



为了使用 Nebula Flink Connector 需要和 Flink 进行集成,我们需要遵循以下步骤:



1、构建 Flink 程序:

需要利用 Flink 的 API 编写一个 Flink 程序,确保程序中至少包含 Source 和 Sink 组件。

在程序中, Source 组件可以从 NebulaGraph 读取数据,而 Sink 组件则负责将数据写入NebulaGraph.

2、配置 Nebula Flink Connector:
在构建 Flink 程序时,需要将 Nebula Flink Connector 的 jar 包添加到项目的依赖中。

根据需要配置连接 NebulaGraph 的参数,如数据库地址、认证信息、数据模型等。

3、提交 Flink 程序:
开发完成后,使用 Flink 客户端将 Flink 程序打包并提交到资源调度系统上执行。支持的资源调度系统包括但不限于 YARN(Yet Another Resource Negotiator)和 K8S(Kubernetes)

提交过程中,可以指定资源需求、并行度等运行时参数,以确保程序的高效执行。


Nebula Flink Connector 为 Apache Flink 和 NebulaGraph 之间的数据交互提供了一种简洁而高效的解决方案。通过将此连接器集成到 Flink 程序中,我们可以轻松地构建出能够实时处理和分析 NebulaGraph 数据的应用,进一步拓宽了大数据处理和分析的应用场景。下面我们分别看下 Source 和 Sink 的案例。
NebulaGraph Source 应用实践


  1. // 构造 NebulaGraph 客户端连接需要的参数
  2. NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
  3.                 .NebulaClientOptionsBuilder()
  4.                 .setAddress("127.0.0.1:45500")
  5.                 .build();
  6. // 创建 connectionProvider
  7. NebulaConnectionProvider metaConnectionProvider = new
  8. NebulaMetaConnectionProvider(nebulaClientOptions);
  9. // 构造 NebulaGraph 数据读取需要的参数
  10. List<String> cols = Arrays.asList("name", "age");
  11. VertexExecutionOptions sourceExecutionOptions = new
  12. VertexExecutionOptions.ExecutionOptionBuilder()
  13.                 .setGraphSpace("flinkSource")
  14.                 .setTag(tag)
  15.                 .setFields(cols)
  16.                 .setLimit(100)
  17.                 .builder();
  18. // 构造 NebulaInputFormat
  19. NebulaInputFormat inputFormat = new NebulaInputFormat(metaConnectionProvider)
  20.                 .setExecutionOptions(sourceExecutionOptions);
  21. // 方式 1 使用 createInput 方式注册 NebulaGraph 数据源
  22. DataSource<Row> dataSource1 = ExecutionEnvironment.getExecutionEnvironment()
  23.                            .createInput(inputFormat);
  24. // 方式 2 使用 addSource 方式注册 NebulaGraph 数据源
  25. NebulaSourceFunction sourceFunction = new
  26. NebulaSourceFunction(metaConnectionProvider)
  27.                 .setExecutionOptions(sourceExecutionOptions);
  28. DataStreamSource<Row> dataSource2 =
  29. StreamExecutionEnvironment.getExecutionEnvironment()
  30.                             .addSource(sourceFunction);
复制代码
NebulaGraph Sink 应用实践
  1. /// 构造 NebulaGraphd 客户端连接需要的参数
  2. NebulaClientOptions nebulaClientOptions = new NebulaClientOptions
  3.                 .NebulaClientOptionsBuilder()
  4.                 .setAddress("127.0.0.1:3699")
  5.                 .build();
  6. NebulaConnectionProvider graphConnectionProvider = new
  7. NebulaGraphConnectionProvider(nebulaClientOptions);
  8. // 构造 NebulaGraph 写入操作参数
  9. List<String> cols = Arrays.asList("name", "age")
  10. ExecutionOptions sinkExecutionOptions = new
  11. VertexExecutionOptions.ExecutionOptionBuilder()
  12.                 .setGraphSpace("flinkSink")
  13.                 .setTag(tag)
  14.                 .setFields(cols)
  15.                 .setIdIndex(0)
  16.                 .setBatch(20)
  17.                 .builder();
  18. // 写入 NebulaGraph
  19. dataSource.addSink(nebulaSinkFunction);
复制代码
遇到的问题



通过 FlinkSQL 将数据写入到 Nebula 的边和点的问题


我们是用 Nebula Flink Connector 写 FlinkSQL 将数据写入到 NebulaGraph 中会出现下面两个问题。

1、写入到 NebulaGraph 的边中,创建表的时候,源点索引和目标点索引必须在第一位和第二位


  1. CREATE TABLE `friend` (
  2.     sid BIGINT,-- 第一位必须是源点的id
  3.     did BIGINT,-- 第二位必须是目标点的id
  4.     rid BIGINT,
  5.     col1 STRING,
  6.     col2 STRING
  7. ) WITH (
  8.     'connector' = 'nebula',
  9.     'meta-address' = '127.0.0.1:9559',
  10.     'graph-address' = '127.0.0.1:9669',
  11.     'username' = 'root',
  12.     'password' = 'nebula',
  13.     'graph-space' = 'flink_test',
  14.     'label-name' = 'friend',
  15.     'data-type'='edge',
  16.     'src-id-index'='0',
  17.     'dst-id-index'='1',
  18.     'rank-id-index'='2'
  19. )
复制代码


我们看到上面的SQL, src-id-index 是源点的索引位置对应的是 sid , dst-id-index 是目标点的索引位置对应的是 did ,这俩值必须是 0 和 1 .

我们去代码中发现,在获取属性列表时发现这里的下面从 2 开始,也就是把 0 和 1 作为源点和目标点。

  1. for (int i = 2; i < columns.size(); i++) {
  2.    if (config.get(RANK_ID_INDEX) != i) {
  3.      positions.add(i);
  4.      fields.add(columns.get(i).getName());
  5. }
  6. }
复制代码
我们进行了下面的改造就支持了 src-id-index 和 dst-id-index 可以放在任意位置。
  1. for (int i = 2; i < columns.size(); i++) {
  2.    if (config.get(RANK_ID_INDEX) != i) {
  3.      if(i == srcIndex || i == dstIndex){
  4.         continue;
  5.      }
  6.      positions.add(i);
  7.      fields.add(columns.get(i).getName());
  8. }
  9. }
复制代码
2、写入到 NebulaGraph 的点中,创建表的时候,点 id 必须放在首位


  1. CREATE TABLE `person` (
  2.        vid BIGINT,-- 这个必须放在首位
  3.        col1 STRING,
  4.        col2 STRING,
  5.        col3 BIGNT
  6. ) WITH ("
  7.     'connector' = 'nebula',
  8.     'meta-address' = '127.0.0.1:9559',
  9.     'graph-address' = '127.0.0.1:9669',
  10.     'username' = 'root',
  11.     'password' = 'nebula',
  12.     'data-type' = 'vertex',
  13.     'graph-space' = 'flink_test',
  14.     'label-name' = 'person'
  15. );
复制代码


我们从上面的 SQL 中发现,这个源点 id 是 vid 必须放在首位。我们在代码中发现,这个封装属性时下标是从 1 开始,默认把首位作为源 id。
  1. if (config.get(DATA_TYPE).isVertex()) {
  2.    for (int i = 1; i < columns.size(); i++) {
  3.       positions.add(i);
  4.       fields.add(columns.get(i).getName());
  5.    }
  6. }
复制代码
那我们改造成下面的方式,这样就支持源 id 放到任意位置了。


  1. if (config.get(DATA_TYPE).isVertex()) {
  2.    for (int i = 1; i < columns.size(); i++) {
  3.       if(idIndex == i){
  4.           continue;
  5.       }
  6.       positions.add(i);
  7.       fields.add(columns.get(i).getName());
  8.    }
  9. }
复制代码


接入FlinkCDC的Source时,无法处理删除的数据

我们知道通过 FlinkCDC 采集到的数据流入 Flink,通过 Nebula Flink Connector 写入到 NebulaGraph 中,我们知道 FlinkCDC 集到的数据的 RowKind 有可能是 DELETE .

然而在构建 NebulaVertexBatchOutputFormat 对象时传入 executionOptions 对象,在构建 executionOptions 对象时,我们设置了 WriteMode , 这时 WriteMode 已经固定了,也就是说 NebulaVertexBatchOutputFormat 对象只能处理一种流。

基于上面的问题我们采用 Flink 的侧输出来解决此问题,我们把采集到的数据流通过 Side Output Tag 来进行分流。



  1. final int DELETE_FLAG = 0;
  2. final int INSERT_FLAG = 1;
  3. GraphConfig graphConfig = buildGraphConfig();
  4. SinkFunction sinkFunction = SinkFunctionLoader.load(graphConfig);
  5. NebulaSinkFunction<Row> deleteFunction =
  6. sinkFunction.getSinkFunction(DELETE_FLAG);
  7. NebulaSinkFunction<Row> insertFunction =
  8. sinkFunction.getSinkFunction(INSERT_FLAG);
  9. SingleOutputStreamOperator<Row> changeOperator = dataStream.process(new
  10. OutputProcessFunction());
  11. DataStream<Row> deleteOperator =
  12. changeOperator.getSideOutput(Descriptors.DELETE_TAG);
  13. deleteOperator.addSink(deleteFunction);
  14. changeOperator.addSink(insertFunction);
复制代码
侧输出流函数的内容如下:

  1. @Override
  2. public void processElement(Row record,
  3.                            ProcessFunction<Row, Row>.Context ctx, Collector<Row>
  4. out) throws Exception {
  5.         if (record.getKind() == RowKind.DELETE) {
  6.             log.info("NebulaGraph's sink received deleted data, the data is:{}",
  7. record);
  8.             ctx.output(delegateOutputTag, record);
  9.        } else {
  10. out.collect(record);
  11.        }
  12. }
复制代码


这样就解决了上面说的问题了。



最新经典文章,欢迎关注公众号

社区创作者介绍

2013年创办About云社区,成为大数据垂直领域NO1。2017年最早提出并发起系统帮助IT人面试和就业,帮助1万+Learner拿到offer,积累了大量的行业经验和资料。
2020年成立北京梭伦科技有限公司。2023年上线中文本Chat GPT智能星。通过智能星+面试提高我们面试、职场竞争力。


个人微信:w3aboutyun














没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条