分享

Apache Flink:开发经验总结


问题导读

1.本文的背景是什么?
2.本文介绍了哪些开发经验?
3.本地开发环境正常运行,部署环境出错,原因是什么?
4.本地开发环境正常运行,部署环境出错本文是如何解决的?
5.流处理中,如何更改需求,添加新的计算,该如何最大限度的减少修改?

关注最新经典文章,欢迎关注公众号


这篇文章分享Flink的开发经验 - 编写代码是什么样的。 然而,在介绍到五个关键要点之前,有一点背景


开发背景介绍

什么是Flink以及用它构建了什么?
Flink是一个开源流处理框架。 我们选择Flink是因为它在数据摄取方面非常准确,在保持状态的同时轻松地从故障中恢复,并且能够扩展以满足我们的需求,所有这些都在Flink自己的介绍中有更详细的介绍(https://flink.apache.org/introduction.html#features-why-flink)。 (请注意,这篇文章明确不是关于Flink的架构或操作问题。)

AX团队构建了两个现在在生产中运行的基于Flink的服务:Usage Calculator和Usage Stamper。 Usage Calculator是一个应用程序,它从Apache Kafka主题中读取,其中包含来自New Relic APM,New Relic Infrastructure和New Relic Synthetics代理的使用元数据; 应用程序聚合数据24小时,然后将该数据写入包含每日使用数据的Kafka主题。 Usage Stamper从该Kafka topic中读取,并将使用数据与其帐户层次结构相匹配,该层次结构来自单独的Kafka topic。

ax_flink_apps.jpg

快速查看Flink app—the Usage Calculator
实际上,每个Flink应用程序都从输入流中读取,并行运行少量操作以转换数据,并将数据写入数据存储区。 在大多数情况下,程序的独特之处在于它运行的操作。
编写代码以运行基本的Flink应用程序非常简单且相对简洁。 至少,AX团队感到惊讶和印象深刻。  Usage Calculator适用于读取,处理,写入模型:
1.app消费Kafka topics数据从客户端
2.该应用程序运行通过多个操作计算数据,以使用特定于产品的业务规则计算每个产品的每个帐户的每日使用情况。
3.该应用程序将结果写入由Usage Stamper应用程序读取的Kafka Topic。

Flink_1.jpg
Flink_2.jpg

当第一次开始编写应用程序时,花了一天时间从零开始到初稿版本。 总共大约有一百行代码 。

经验1:指导Flink概念
以下是我们需要了解的实现应用程序的第一个版本。


DataStream:这些是表示无界数据集合的Flink类。


Flink_3-768x345.jpg

Time Windows:流元素(elements)按其出现的时间分组。 时间窗口以处理时间,事件时间或摄取时间表示。 在Usage Calculator(使用计算),我们使用事件时间。
Flink_4-768x532.jpg

操作(Operators):运算符将DataStream转换为新的DataStream。 Usage Calculator使用map将消息转换为业务对象,使用reduce函数“计算”我们获取的消息数(作为使用代理)。

Flink_5-768x332.jpg

在部署的群集中,这些operators在不同的计算机上运行。

经验2.准备:Flink使用延迟计算来分离配置和执行
Flink操作并行运行。job manager设置和协调task managers.  Task manager对Flink传入的数据进行用户定义的操作。 Flink通过使用“延迟计算”将操作的配置与程序的执行分开。

main()方法确定应用程序的配置--Flink将如何设置操作。 也就是说,直到在环境中调用execute()方法才开始读取和处理数据。 execute()调用是Flink启动实际处理的时候。

例如,此代码设置graph对象:

Flink_6-768x293.jpg
Flink_71.jpg

如果在直接从main()方法调用的方法中监视或设置断点,将看到它们仅在启动时执行。 真正想要做的是监视操作本身,可在操作内设置断点。

此外,可以将操作分解为单独的类(这也使它们更容易测试),或者可以单独设置流作为保持main()方法简单的模式。


经验3:本地开发环境与部署环境不同
我们学习并重新学习了本课程。 正如我之前提到的,Flink允许您对数据运行大规模并行操作,这要归功于协调多个任务管理器的作业管理器,后者在Flink传入的数据上运行用户定义的操作。 在集群分布在多台计算机上的完全部署情况下,作业管理器(Job Manager)和每个任务管理器(Task Manager)可以在不同的计算机上运行,这实际上意味着两件事:
  • 任务管理器与作业管理器具有独立的生命周期和环境。
  • Flink必须能够序列化运算符(operators),以便它可以将它们发送到不同机器上的任务管理器。



那么,这对开发人员意味着什么呢? 在本地Flink环境中正确运行的代码可能无法在已部署的环境中无法运行。

当我们构建和测试我们的应用程序时,当我们想要将环境变量作为运算符(operators)的一部分传递时,我们遇到了这个问题。 我们的第一次尝试是在UsageCalculator类中初始化静态变量并将它们传递给运算符。 我们编写的代码检查了在初始化之前设置了环境变量。

Flink_8.jpg

这在我们的本地机器上完美运行,其中环境在作业管理器和任务管理器之间共享。 但是,只要我们部署到分布在多台计算机上的集群,任务管理器上就不会出现环境变量,并且检查它们的代码失败了。

因此,我们编写了一个单独的类,它在作业管理器上实例化并读取环境变量。 然后我们将该类传递给我们的运算符,这样他们就可以访问相同的变量。 只是这一次,我们没有使环境类可序列化。 那也行不通; 这是我们在启动时获得的堆栈跟踪:

  1. Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FilterFunction is not serializable. The object probably contains or references non serializable fields.
  2.      at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
  3.      at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
  4.      at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
  5.      at org.apache.flink.streaming.api.datastream.DataStream.filter(DataStream.java:627)
  6.      at com.newrelic.UsageCalculator.main(UsageCalculator.java:54)
  7. Caused by: java.io.NotSerializableException: com.newrelic.util.Env
  8.      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
  9.      at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
  10.      at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
  11.      at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream…
复制代码

我们的最终解决方案是仅将单个字段传递给operator,以限制operator 需要获取的内容。
Flink_9.jpg

Flink_10.jpg


经验4:修改需求,添加新的计算
我们有多个应用程序在生产中运行; 我们学会了利用Flink,并以闪电般的速度处理数据。 然后我们的产品经理告诉我们一个我们需要计算并添加字段到输出。

这项工作需要在Usage Calculator进行,所以让我们快速回顾一下我们看到calculator的第一个版本:



这项工作需要在使用计算器中进行,所以让我们快速回顾一下我们看到项目的第一个版本:
  • 从Kafka主题中读取记录
  • 将这些记录映射到模型类以用于我们的计算
  • 在特定时间段内汇总记录
  • 创建事件并将它们发送到我们的输出Kafka主题


我们需要做的就是添加新计算:
  • 向我们的模型类添加一个字段,并从传入的数据中填充它
  • 修改reduce操作以包括在创建事件之前计算数据



简单的代码编写。但问题是,将字段添加到我们的模型类中意味着我们必须更改所有操作(operators)正在使用的字段。

存在的问题
Flink存储某些操作的状态,以便可以关闭并重新启动它而不会丢失数据。添加新字段打破了Flink读取其存储在状态中的旧版本模型类的能力。结果?部署后,我们丢失了尚未写入数据库的所有使用数据。由于我们的聚合窗口(我们写出数据之前的时间)是24小时,这意味着我们丢失了一天的使用数据。

我们不想丢失数据。我们的PM不希望我们丢失数据。我们的客户真的不希望我们丢失数据。我们需要弄清楚如何在不丢失数据的情况下改进代码,以便我们可以继续快速迭代。我们通过最小化模型中的信息量来实现这一点,以便减少更改频率。

解决办法:
最初,我们将传入的消息映射到数据模型。此数据模型包含业务运营所需的所有字段,也是Flink中存储的内容。为了尽量减少会破坏我们存储和读取状态的能力的变化,我们需要尽量减少对该模型的更改;模型必须只包含识别单个数据(传递给keyBy方法)所需的字段,并能够聚合数据(即计算使用小时数)。我们更改了代码,以便使用所需的最少数据创建传入的模型类,并包含传入消息的有效负载。只有在聚合操作之后,传入模型才会转换为我们执行业务操作的业务模型,这可能会随着时间而变化。





以上内容翻译自早期flink开发者最新文章,供大家参考学习

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条