同一套api,比如你接受的是流数据,那直接使用datastream,如果是批处理,直接初始化dataset,相当简单。可以看看相关程序,一目了然。
[mw_shl_code=scala,true]object WordCount {
def main(args: Array[String]) {
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
val dataset= env.fromElements("To be, or not to be,--that is the question:--",
"Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune",
"Or to take arms against a sea of troubles,")
val counts = dataset.flatMap { _.toLowerCase.split("\\W+") }
.map { (_, (1,2)) }
.groupBy(0)
.sum(1)
// execute and print result
counts.print()
}
}[/mw_shl_code]
下面为datastream
[mw_shl_code=scala,true]1 使用并行度为1的source
public class MyNoParalleSource implements SourceFunction<Long>{
private long count = 1L;
private boolean isRunning = true;
/**
* 主要的方法
* 启动一个source
* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Long> ctx) throws Exception {
while(isRunning){
ctx.collect(count);
count++;
//每秒产生一条数据
Thread.sleep(1000);
}
}
* 取消一个cancel的时候会调用的方法
@Override
public void cancel() {
isRunning = false;
}
}
2 Main方法执行
public class StreamingDemoWithMyNoPralalleSource {
public static void main(String[] args) throws Exception {
//获取Flink的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1); //注意:针对此source,并行度只能设置为1
DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
@Override
public Long map(Long value) throws Exception {
System.out.println("接收到数据:" + value);
return value;
}
});
//每2秒钟处理一次数据
DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);
//打印结果
sum.print().setParallelism(1);
String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
env.execute(jobName);
}
} [/mw_shl_code]
|