分享

Flink环境配置实战详解

本帖最后由 hanyunsong 于 2020-8-7 11:30 编辑

批注 2020-08-07 112219.png

链接: https://pan.baidu.com/s/1NbCSBcwXKhZDOr0-gL61mw 提取码: d9zp




一.Windows开发环境构建

1. JDK 说明

安装(略)—版本在1.8及之上(如果有多版本的话,在path中将JAVA_HOME放在最前面)

2. Scala安装

①下载地址:https://www.scala-lang.org/download/ 在页面的最底部

图片12.png

安装过程几乎没什么注意事项,网上说的安装路径不能有空格(如: Program Files),否则安装后使用会报错 图片13.png ;仅参考,此处安装未使用含有空格的文件名称

② 环境变量的配置

新增环境变量: SCALA_HOME值:E:\soft\dev\install\scala
图片14.png

Path地址配置:

图片15.png

3. Flink的下载(JDK1.8及之上)

① 下载地址: https://flink.apache.org/downloads.html

图片16.png

此处使用版本的是1.9.0
下载后直接解压可使用;
进入解压目录的bin目录,运行start-cluster.bat,启动成功后本地访问http://localhost:8081

图片17.png

4. idea开发配置

① 创建MAVEN项目(略)

Pom.xml文件:

图片18.png

只需要依赖flink-java和flink-streaming-java_${sclac.version};其他的暂时不需要;

② Scala插件安装

a>在线安装:

File->Settings->Plugins->Install JetBrains plugins  然后输入Scala;然后安装(时间可能持久较长,视网络情况而定)—;

b>离线安装:

下载插件:https://plugins.jetbrains.com/plugin/1347-scala  选择相应的版本(idea 、JDK) –>可用在线安装的方式查看版本;下载后将其解压,将解压的文件夹Scala放入idea安装目录的plugins目录下

安装完成后(在线安装):

图片19.png

③ 项目依赖配置

选择项目结构:(如下图)

图片20.png

图片21.png

图片22.png

(如果没有Ivy则表示Scala插件安装不成功)

然后直接点击OK即可;最后会多一个scala-sdk

图片23.png
不要以为到此就可用运行了,我就在此处掉坑里了,获取运行环境的适合总是出错:

图片24.png

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

避免上面的代码出错,还相应运行环境的配置

④ Flink在Idea的运行环境配置

选择项目结构:

图片25.png

图片26.png

点击OK,然后依赖中会多一个lib依赖

图片27.png

到此,idea可用运行flink了;

⑤运行验证

Java代码例子(网上找的),网上是监听端口读取;偷懒一下直接读取文本

图片28.png

图片34.png

图片30.png

代码:
[mw_shl_code=text,true]public static void main(String[] args) throws Exception {
        //获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //从文件中读取
        DataStreamSource<String> text = env.readTextFile("D:\\source\\idea\\test\\project\\flink\\flink-parent\\flink-demo1\\src\\main\\resources\\demo.txt");
        //计算数据
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })//打平操作,把每行的单词转为<word,count>类型的数据
                .keyBy("word")//针对相同的word数据进行分组
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小
                .sum("count");

        //把数据打印到控制台
        windowCount.print().setParallelism(1);//使用一个并行度
        //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("streaming word count");

    }

    /**
     * 主要为了存储单词以及单词出现的次数
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }

Demo.txt[/mw_shl_code] 图片31.png

二.Flink 第一个demo

Pom.xml文件中只需要两个依赖包

图片32.png

Java代码

图片33.png

图片29.png

输出结果: 图片35.png
如果能够成功,则开发环境能够操作flink

三.Flink-kafka整合(处理OGG数据)

1.ogg数据处理,大概流程图:

图片36.png

2.开发依赖说明

flink和kafka的整合需要一个关键依赖包:org.apache.flink

图片37.png

目前使用flink-connector-kafka-0.10_2.11版本,不同版本会影响flink与kafka的操作工具类

图片38.png

部分配置说明

1.  Flink web端口修改

图片39.png

修改conf文件下的flink-conf.yaml文件
Yaml文件中描述了端口(rest.port:)修改此属性则可用修改web的端口;默认8081


作者:liujun168
来源:https://github.com/liujun168/flink-parent







没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条