分享

Flink1.12支持对接Atlas【使用Atlas收集Flink元数据】

本帖最后由 pig2 于 2021-3-22 18:58 编辑

问题导读

1.Atlas中实体具体指什么?
2.如何为Flink创建Atlas实体类型定义?
3.如何验证元数据收集?


在Cloudera Streaming Analytics中,可以将Flink与Apache Atlas一起使用,以跟踪Flink作业的输入和输出数据。

Atlas是沿袭和元数据管理解决方案,在Cloudera Data Platform上受支持。这意味着可以查找,组织和管理有关Flink应用程序以及它们如何相互关联的数据的不同资产。这实现了一系列数据管理和法规遵从性用例。

有关Atlas的更多信息,请参阅Cloudera Runtime文档

Flink元数据集合中的Atlas实体
在Atlas中,表示Flink应用程序,Kafka主题,HBase表等的核心概念称为实体。需要了解Flink设置中实体的关系和定义,以增强元数据收集。

为Flink创建Atlas实体类型定义
在提交Flink作业以收集其元数据之前,需要为Flink创建Atlas实体类型定义。在命令行中,需要连接到Atlas服务器并添加预定义的类型定义。还需要在Cloudera Manager中为Flink启用Atlas。

验证元数据收集
启用Atlas元数据收集后,群集上新提交的Flink作业也将其元数据提交给Atlas。可以通过请求有关Atlas挂钩的信息来在命令行中使用消息验证元数据收集。


Flink元数据集合中的Atlas实体

在Atlas中,表示Flink应用程序,Kafka主题,HBase表等的核心概念称为实体。 需要了解Flink设置中实体的关系和定义,以增强元数据收集。

在向Atlas提交更新时,Flink应用程序会描述自身以及用作源和接收器的实体。 Atlas创建并更新相应的实体,并从收集到的和已经可用的实体创建沿袭。 在内部,Flink客户端和Atlas服务器之间的通信是使用Kafka主题实现的。 该解决方案被Atlas社区称为Flink挂钩。


csa-atlas-flink.png

为Flink创建Atlas实体类型定义
在提交Flink作业以收集其元数据之前,需要为Flink创建Atlas实体类型定义。 在命令行中,需要连接到Atlas服务器并添加预定义的类型定义。 还需要在Cloudera Manager中为Flink启用Atlas。

默认情况下,Atlas不包括Flink的元数据源。 管理员必须手动将实体类型定义上载到群集,才能启动Flink元数据收集。

注意:
启用或禁用TLS时,Atlas管理服务器的默认端口分别为31433和31000。

步骤
1.使用Atlas REST API将设计的实体类型定义上载到集群。
  1. curl -k -u <atlas_admin>:<atlas_admin_pwd> --location --request POST 'https://<atlas_server_host>:<atlas_server_port>/api/atlas/v2/types/typedefs' \
  2. --header 'Content-Type: application/json' \
  3. --data-raw '{
  4.     "enumDefs": [],
  5.     "structDefs": [],
  6.     "classificationDefs": [],
  7.     "entityDefs": [
  8.         {
  9.             "name": "flink_application",
  10.             "superTypes": [
  11.                 "Process"
  12.             ],
  13.             "serviceType": "flink",
  14.             "typeVersion": "1.0",
  15.             "attributeDefs": [
  16.                 {
  17.                     "name": "id",
  18.                     "typeName": "string",
  19.                     "cardinality": "SINGLE",
  20.                     "isIndexable": true,
  21.                     "isOptional": false,
  22.                     "isUnique": true
  23.                 },
  24.                 {
  25.                     "name": "startTime",
  26.                     "typeName": "date",
  27.                     "cardinality": "SINGLE",
  28.                     "isIndexable": false,
  29.                     "isOptional": true,
  30.                     "isUnique": false
  31.                 },
  32.                 {
  33.                     "name": "endTime",
  34.                     "typeName": "date",
  35.                     "cardinality": "SINGLE",
  36.                     "isIndexable": false,
  37.                     "isOptional": true,
  38.                     "isUnique": false
  39.                 },
  40.                 {
  41.                     "name": "conf",
  42.                     "typeName": "map<string,string>",
  43.                     "cardinality": "SINGLE",
  44.                     "isIndexable": false,
  45.                     "isOptional": true,
  46.                     "isUnique": false
  47.                 },
  48.                 {
  49.                     "name": "inputs",
  50.                     "typeName": "array<string>",
  51.                     "cardinality": "LIST",
  52.                     "isIndexable": false,
  53.                     "isOptional": false,
  54.                     "isUnique": false
  55.                 },
  56.                 {
  57.                     "name": "outputs",
  58.                     "typeName": "array<string>",
  59.                     "cardinality": "LIST",
  60.                     "isIndexable": false,
  61.                     "isOptional": false,
  62.                     "isUnique": false
  63.                 }
  64.             ]
  65.         }
  66.     ],
  67.     "relationshipDefs": []
  68. }'
复制代码
2.登录到Cloudera Manager。
3.转到Flink>配置。
4.在搜索栏中搜索“启用图集”。
5.启用Atlas元数据收集。


csa-atlas-flink-config.png


成功提交后,Flink客户端会通知Atlas有关作业的元数据。

验证元数据收集
启用Atlas元数据收集后,群集上新提交的Flink作业也将其元数据提交给Atlas。 可以通过请求有关Atlas挂钩的信息来在命令行中使用消息验证元数据收集。

要验证元数据集合,可以从“运行Flink作业”中运行“流式WordCount”示例。

在日志中,出现以下新行:

  1. ...
  2. 20/05/13 06:28:12 INFO hook.FlinkAtlasHook: Collecting metadata for a new Flink Application: Streaming WordCount
  3. ...
  4. 20/05/13 06:30:35 INFO hook.AtlasHook: <== Shutdown of Atlas Hook
复制代码
Flink通过Kafka主题与Atlas通信,默认情况下,该主题名为ATLAS_HOOK。


转载注明本文链接https://www.aboutyun.com/forum.php?mod=viewthread&tid=30521

大数据爱好者可加微信w3aboutyun




已有(3)人评论

跳转到指定楼层
若无梦何远方 发表于 2021-3-23 17:56:04
问题: 不是很理解 Atlas 它是用来做元数据管理的吗?实体是什么概念,本篇文章没看懂到底在实现什么功能??
架构图:
Atlas在Hadoop环境中作为独立服务运行。许多Hadoop数据处理和存储服务都包含Atlas附加组件,这些附加组件将服务活动的元数据发布到Kafka消息主题中。 Atlas读取消息并将其存储在JanusGraph中以对实体之间的关系建模。 JanusGraph背后的数据存储区是HBase。 Atlas将搜索索引存储在Solr中,以利用Solr的搜索功能。
回复

使用道具 举报

hyj 发表于 2021-3-23 19:32:13
若无梦何远方 发表于 2021-3-23 17:56
问题: 不是很理解 Atlas 它是用来做元数据管理的吗?实体是什么概念,本篇文章没看懂到底在实现什么功能??
...

Atlas 它是用来做元数据管理的实体指的是比如kafka的topic,hbase的表等
本篇文章介绍的是如何让Atlas 实现管理Flink的元数据,因为以前是不支持管理Flink的元数据的。


回复

使用道具 举报

若无梦何远方 发表于 2021-3-23 23:23:01
hyj 发表于 2021-3-23 19:32
Atlas 它是用来做元数据管理的实体指的是比如kafka的topic,hbase的表等
本篇文章介绍的是如何让Atlas  ...

好的 我大概了解了 后期我会调整性的补充下 数据治理元数据管理 这方面的一些知识
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条