[mw_shl_code=bash,true]bin/flink run examples/streaming/SocketTextStreamWordCount.jar \
--hostname 10.218.130.9 \
--port 9000[/mw_shl_code]
Type some words on the netcat side and watch the taskmanager's output to see the word-count results.
The code of SocketTextStreamWordCount is as follows:
[mw_shl_code=java,true]public static void main(String[] args) throws Exception {
    // check the input arguments
    final ParameterTool params = ParameterTool.fromArgs(args);
    ...
    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // get input data: read lines from the socket ('\n' is the line delimiter, 0 is the max retry setting)
    DataStream<String> text =
            env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);

    DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .keyBy(0)
                    .sum(1);

    // print the running counts to the taskmanager's stdout
    counts.print();

    // execute program
    env.execute("WordCount from SocketTextStream Example");
}[/mw_shl_code]
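To make the flatMap / keyBy(0) / sum(1) chain easier to picture, here is a minimal plain-Java sketch that imitates its semantics without using Flink at all. The class name RunningWordCount and the methods tokenize and process are hypothetical and exist only for illustration. The tokenizing rule (lower-case the line, split on non-word characters) is an assumption about what Tokenizer does, since its source is not shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningWordCount {

    // Hypothetical stand-in for the example's Tokenizer (assumed behavior):
    // lower-case the line, split on non-word characters, drop empty tokens.
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }

    // Imitates flatMap -> keyBy(0) -> sum(1): each incoming word emits the
    // updated running total for that word, not a single final count.
    static List<String> process(List<String> lines) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : tokenize(line)) {
                int total = totals.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + total + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Two lines, as if typed one after the other into netcat.
        for (String record : process(Arrays.asList("hello flink", "hello world"))) {
            System.out.println(record);
        }
    }
}
```

Running the sketch prints (hello,1), (flink,1), (hello,2), (world,1), one record per line. In a real Flink job the same kind of running totals should show up in the taskmanager output, although the ordering across different words and any prefix that print() adds may differ.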