避坑:Spark Sql的Order By排序是不稳定的
问题导读:
1、为什么同一批数据执行相同sql返回值不相同?
2、怎样构建一个测试用例?
3、怎样使同一批数据执行相同sql返回值相同?
问题背景
生产环境数据产品项目出现相同代码多次执行结果不同的问题,经过排查,发现了问题出现在first value的窗口函数返回的结果上.同一批数据执行相同sql返回值不相同.
sql类似如下形式:
select * ,first_value(s_id) over (partition by c_id order by s_score)first_show from score;
可以看到,first_value是窗口函数,它是基于over子句的结果进行取值,而first_value本身的逻辑比较简单,就是取窗口函数的第一个值,所以大概率是因为order by返回的结果不固定导致的.为了验证猜测,进行如下实验.
实验
只需要构建一个测试用例,用例中有字段相同的值,然后对该字段多次执行order by语句查看结果是否相同即可验证推测.
1- 构建测试用例
生产环境的数据量较大,数据为多分区,而spark任务执行时是根据底层文件数来启动task,为了尽量模拟生产环境,我们创建一个表,给它上传两个文件.表信息: orderbytest(name string, age int)
create table dataintel.orderbytest(name string, age int) row format delimited fields terminated by ',';
上传两个csv文件到表数据目录,文件内容如下
txt1 txt2
张三,1
李四,2
王五,3
赵六,1
张四,1
测试用例构建完成,执行select语句验证测试用例.
select * from dataintel.orderbytest;
+----+---+
|name|age|
+----+---+
|张三|1|
|李四|2|
|王五|3|
|赵六|1|
|张四|1|
+----+---+
2- 执行测试sql
多次执行order by语句,对比结果
spark.sql("select * from dataintel.orderbytest order by age").show#第一次执行结果
+----+---+
|name|age|
+----+---+
| 张三| 1|
| 张四| 1|
| 赵六| 1|
| 李四| 2|
| 王五| 3|
+----+---+
#第n次执行结果
+----+---+
|name|age|
+----+---+
| 赵六| 1|
| 张三| 1|
| 张四| 1|
| 李四| 2|
| 王五| 3|
+----+---+
经过多次实验可以看到,order by的排序是不稳定的.当排序字段发生多个值排名相同时,虽然排序结果都正确,但结果不唯一.
排序算法的稳定性指对数据集进行排序时,不改变它原来的相对位置.比如a和b相等,a在未排序数组中在b的前边.在排序结束后,不管a、b处于排序结果的什么位置,他俩的相对位置一定还是a在前,b在后.
Spark order by语句执行结果不唯一,证明a和b的位置一定发生过变动,所以是不稳定的.
原因
虽然sql排序不稳定在绝大多数的数据库软件中都存在,但我们知道,spark是用scala写的,而scala是运行在JVM上的,所以它的排序规则应该等同Java: 默认对所有非基本类型数据排序使用具备稳定性的,比如归并;对所有基本类型(int,double等)采用快排等不稳定排序.
scala的所有数据类型都封装为了非基本类型,按道理应该是使用稳定性排序,不应该出现这种情况.看源码找原因.
1- 代码定位及问题追踪
Spark sql的执行解析过程如下
从输入sql语句到最终执行动作,经过了一系列的封装和转化.根据经验,只有到PhysicalPlan这一步才能看到具体的操作.省略漫长的代码追踪过程,最终得到spark的排序代码:
下面所有的代码如果觉得繁琐可以略过,后边都有概述
/**
* A stable, adaptive, iterative mergesort that requires far fewer than
* n lg(n) comparisons when running on partially sorted arrays, while
* offering performance comparable to a traditional mergesort when run
* on random arrays. Like all proper mergesorts, this sort is stable and
* runs O(n log n) time (worst case). In the worst case, this sort requires
* temporary storage space for n/2 object references; in the best case,
* it requires only a small constant amount of space.
* @author Josh Bloch
*/
public void sort(Buffer a, int lo, int hi, Comparator<? super K> c) {
assert c != null;
int nRemaining = hi - lo;
if (nRemaining < 2)
return; // Arrays of size 0 and 1 are always sorted
// If array is small, do a "mini-TimSort" with no merges
if (nRemaining < MIN_MERGE) {
int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
binarySort(a, lo, hi, lo + initRunLen, c);
return;
}
/**
* March over the array once, left to right, finding natural runs,
* extending short natural runs to minRun elements, and merging runs
* to maintain stack invariant.
*/
SortState sortState = new SortState(a, c, hi - lo);
int minRun = minRunLength(nRemaining);
do {
// Identify next run
int runLen = countRunAndMakeAscending(a, lo, hi, c);
// If the run is short, extend to min(minRun, nRemaining)
if (runLen < minRun) {
int force = nRemaining <= minRun ? nRemaining : minRun;
binarySort(a, lo, lo + force, lo + runLen, c);
runLen = force;
}
// Push run onto pending-run stack, and maybe merge
sortState.pushRun(lo, runLen);
sortState.mergeCollapse();
// Advance to find next run
lo += runLen;
nRemaining -= runLen;
} while (nRemaining != 0);
// Merge all remaining runs to complete sort
assert lo == hi;
sortState.mergeForceCollapse();
assert sortState.stackSize == 1;
}
/**
* Sorts the specified portion of the specified array using a binary
* insertion sort. This is the best method for sorting small numbers
* of elements. It requires O(n log n) compares, but O(n^2) data
* movement (worst case).
*
* If the initial part of the specified range is already sorted,
* this method can take advantage of it: the method assumes that the
* elements from index {@code lo}, inclusive, to {@code start},
* exclusive are already sorted.
*/
@SuppressWarnings("fallthrough")
private void binarySort(Buffer a, int lo, int hi, int start, Comparator<? super K> c) {
assert lo <= start && start <= hi;
if (start == lo)
start++;
K key0 = s.newKey();
K key1 = s.newKey();
Buffer pivotStore = s.allocate(1);
for ( ; start < hi; start++) {
s.copyElement(a, start, pivotStore, 0);
K pivot = s.getKey(pivotStore, 0, key0);
// Set left (and right) to the index where a (pivot) belongs
int left = lo;
int right = start;
assert left <= right;
/*
* Invariants:
* pivot >= all in [lo, left).
* pivot < all in [right, start).
*/
while (left < right) {
int mid = (left + right) >>> 1;
if (c.compare(pivot, s.getKey(a, mid, key1)) < 0)
right = mid;
else
left = mid + 1;
}
assert left == right;
/*
* The invariants still hold: pivot >= all in [lo, left) and
* pivot < all in [left, start), so pivot belongs at left. Note
* that if there are elements equal to pivot, left points to the
* first slot after them -- that's why this sort is stable.
* Slide elements over to make room for pivot.
*/
int n = start - left; // The number of elements to move
// Switch is just an optimization for arraycopy in default case
switch (n) {
case 2: s.copyElement(a, left + 1, a, left + 2);
case 1: s.copyElement(a, left, a, left + 1);
break;
default: s.copyRange(a, left, a, left + 1, n);
}
s.copyElement(pivotStore, 0, a, left);
}
}
spark采用的排序方法是TIMSort,它是归并排序的优化版,并且在小数据量时切换为binarySort来提升效率.但不管是TimSort还是binarySort都是排序稳定的.既然是稳定的就不应该出现多次结果不一致.继续追踪代码,最终在ShuffleInMemorySorter类中定位到如下位置
//final class ShuffleInMemorySorter
/**
* Inserts a record to be sorted.
*
* @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
* certain pointer compression techniques used by the sorter, the sort can
* only operate on pointers that point to locations in the first
* {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
* @param partitionId the partition id, which must be less than or equal to
* {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
*/
public void insertRecord(long recordPointer, int partitionId) {
if (!hasSpaceForAnotherRecord()) {
throw new IllegalStateException("There is no space for new record");
}
array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
pos++;
}
根据代码追踪记录得到的信息,Spark会将数据存储在一个LongArray结构中,代码中的array就是该类的对象,在插入数据时会对数据进行排序.此时LongArray中的数据是单个partition的数据,是有序的.但还需要将全部的partition数据进行合并.
追踪到的合并代码如下,spark会创建partition的迭代器进行多分区数据合并.
/**
* Return an iterator over all the data written to this object, grouped by partition and
* aggregated by the requested aggregator. For each partition we then have an iterator over its
* contents, and these are expected to be accessed in order (you can't "skip ahead" to one
* partition without reading the previous one). Guaranteed to return a key-value pair for each
* partition, in order of partition ID.
*
* For now, we just merge all the spilled files in once pass, but this can be modified to
* support hierarchical merging.
* Exposed for testing.
*/
def partitionedIterator: Iterator[(Int, Iterator])] = {
val usingMap = aggregator.isDefined
val collection: WritablePartitionedPairCollection = if (usingMap) map else buffer
if (spills.isEmpty) {
// Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
// we don't even need to sort by anything other than partition ID
if (ordering.isEmpty) {
// The user hasn't requested sorted keys, so only sort by partition ID, not key
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
} else {
// We do need to sort by both partition ID and key
groupByPartition(destructiveIterator(
collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
}
} else {
// Merge spilled and in-memory data
merge(spills.toSeq, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator)))
}
}
该方法返回了一个partition的迭代器.向内继续追踪代码可以发现,迭代器的排序规则是首先按照partitionID进行排序,然后才是数据的key. partitionID的获取方法为getPartition()
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
它与VertexId相关.
val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
而VertexId是spark有向无环图中的点的ID,它的定义如下
/**
* A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
* to follow any ordering or any constraints other than uniqueness.
*/
type VertexId = Long
根据注释,vertexID的值只需要遵循唯一性,不需要遵循有序性.由此推断出partitionID也是无序的.在最终数据排序时,先遍历哪个partition是不固定的,造成了spark对数据整体排序时,如果有两个排名都为1的数据分布在不同的partition中,排名相同的数据不一定哪一个先被写入磁盘结果文件中,造成了排序不稳定.
2- 单分区设想
从上边追踪过程已经知道了order by排序不稳定的原因,但我们很容易联想到,单分区的排序是不是绝对稳定的,因为它避开了多分区遍历的问题.
还是先从实验入手,将dataintel.testorderby表中的小文件删除,此时表格中只剩下了一个文件,所以spark任务也是单分区执行的.剩余数据如下:
scala> spark.sql("select * from dataintel.orderbytest order by age").show
+----+---+
|name|age|
+----+---+
| 张三| 1|
| 赵六| 1|
| 李四| 2|
| 王五| 3|
+----+---+
经过多次执行(百八十次吧),发现数据顺序是稳定的.进一步印证了之前的结果.
结论
所以,在大数据量多分区的排序场景下,分区数大于1个会有多次排序结果不同的隐患,但我们不可能去限定分区数为1,这样效率太低而且不符合实际.在spark sql中,如果使用了order by而且对结果的期望是每次结果都相同,可以使用业务排序字段组合唯一编码字段排序.比如实际业务排序的key+messageId进行组合.
最新经典文章,欢迎关注公众号http://www.aboutyun.com/data/attachment/forum/201903/18/215536lzpn7n3u7m7u90vm.jpg
---------------------
作者:winrar_setup.rar
来源:csdn
原文:避坑:Spark Sql的Order By排序是不稳定的
坚持每日一读,总结下:hive 多分区下 spark partition 读数据的时候不需要遵循有序性,所以会出现这次跟上次读的顺序不一致,结果自然就不一致了;留个疑问:多分区下保持数据一致的场景以及对应的实现可以解答下吗? (他这里说 key+messageId 这里的 messageid 怎么理解)
页:
[1]