本帖最后由 helianthus 于 2015-12-17 23:42 编辑
问题导读:
1.tez DAG是怎么创建的?
2.Tez DAG中如何确定数据流向?
1.TEZ中的常用术语:
(1)DAG:有向无环图,代表一个数据流处理工作流,数据按照边的方向流动
(2)Vertex:数据处理中的一个逻辑步骤,由应用程序代码使用,比如过滤/修改数据等等
(3)Logical DAG:由一组Vertex组成,其中每个vertex代表一个具体的计算步骤
(4)Task:代表一个vertex中的一个工作单位。每个task都是vertex的实例化,处理该节点vertex上的一部分数据
(5)Physical DAG:由逻辑DAG在运行时扩展为一组tasks产生
(6)Edge:代表上游生产节点和下游消费节点之间的数据迁移
logical DAG to Physical DAG
2.通过一个OrderedWordCount实例体现DAG APIs:
(1)设置DAG数据源即最终结果类型:
[mw_shl_code=java,true]DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf),
TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping).build();
DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf),
TextOutputFormat.class, outputPath).build();[/mw_shl_code]
(2)创建第一个处理逻辑对应的节点:
[mw_shl_code=java,true]Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
TokenProcessor.class.getName()));
tokenizerVertex.addDataSource(INPUT, dataSource);[/mw_shl_code]
(3)为第一个定点和第二个顶点间的边设置i数据传输格式:
[mw_shl_code=java,true]OrderedPartitionedKVEdgeConfig summationEdgeConf = OrderedPartitionedKVEdgeConfig
.newBuilder(Text.class.getName(), IntWritable.class.getName(),
HashPartitioner.class.getName())
.setFromConfiguration(tezConf)
.build();[/mw_shl_code]
(4)创建第二个顶点(这是一个处理中间结果的节点)
[mw_shl_code=applescript,true] Vertex summationVertex = Vertex.create(SUMMATION, ProcessorDescriptor.create(
SumProcessor.class.getName()), numPartitions);[/mw_shl_code]
(5)为第二个顶点和第三个顶点之间设置数据传输格式:
[mw_shl_code=applescript,true] OrderedPartitionedKVEdgeConfig sorterEdgeConf = OrderedPartitionedKVEdgeConfig
.newBuilder(IntWritable.class.getName(), Text.class.getName(),
HashPartitioner.class.getName())
.setFromConfiguration(tezConf)
.build();[/mw_shl_code]
(6)创建第三个顶点:
[mw_shl_code=java,true] Vertex sorterVertex = Vertex.create(SORTER, ProcessorDescriptor.create(
NoOpSorter.class.getName()), 1);
sorterVertex.addDataSink(OUTPUT, dataSink);[/mw_shl_code]
(7)组合出DAG
[mw_shl_code=java,true] DAG dag = DAG.create(dagName);
dag.addVertex(tokenizerVertex)
.addVertex(summationVertex)
.addVertex(sorterVertex)
.addEdge(
Edge.create(tokenizerVertex, summationVertex,
summationEdgeConf.createDefaultEdgeProperty()))
.addEdge(
Edge.create(summationVertex, sorterVertex, sorterEdgeConf.createDefaultEdgeProperty()));[/mw_shl_code]
3.创建DAG时边的一些重要属性
(1)数据迁移:定义不同tasks之间的数据路由
①one-to-one:即一对一,上游一个节点一个task结果只能发送给下游某个节点的一个task
②broadcast:广播形式
③scatter-gether:相当于MR中的shuffle
(2)调度属性:
①顺序调度
②并发执行
(3)数据源属性:
持久的:当前task退出后,该数据集还可用
持久可靠地:数据会可靠存储(磁盘)一直有效
暂时的:当前task使用之后便丢弃
data movement
|
|