Reading guide:
1. Why does the same SQL over the same batch of data return different results?
2. How do we build a test case to reproduce this?
3. How do we make the same SQL over the same batch of data return the same result?
Background
A data product project in production ran the same code several times and got different results. After investigation, the problem was traced to the value returned by the first_value window function: the same SQL over the same batch of data did not always return the same result.
The SQL looks roughly like this:
select *, first_value(s_id) over (partition by c_id order by s_score) first_show from score;
first_value is a window function: it takes its value from the window defined by the over clause, and its own logic is trivial, it simply returns the first value of that window. So the most likely culprit is that order by does not return rows in a fixed order. The experiment below verifies this guess.
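As a quick sanity check before the full experiment, one minimal sketch (assuming the score table from the query above exists in the current session) is to run the same statement twice and diff the two results; if the window ordering were deterministic, the difference would always be empty:

val q = """select *, first_value(s_id) over (partition by c_id order by s_score) first_show
           from score"""
// Two separate runs of the same statement; with a deterministic order by, the diff is always empty.
spark.sql(q).except(spark.sql(q)).show()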
Experiment
All we need is a test case in which a column contains duplicate values; running order by on that column several times and checking whether the results match is enough to verify the guess.
1- Building the test case
The production data set is large and spread over many partitions, and a Spark job launches one task per underlying file. To approximate production, we create a table and upload two files to it. Table: orderbytest(name string, age int)
create table dataintel.orderbytest(name string, age int) row format delimited fields terminated by ',';
Upload two csv files into the table's data directory. File contents:

txt1:
张三,1
李四,2
王五,3
赵六,1

txt2:
张四,1
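One way to load the two files from spark-shell is a plain LOAD DATA statement (the local paths below are made up for illustration; copying the files into the table's HDFS directory with hdfs dfs -put achieves the same thing):

// Hypothetical local paths; adjust to wherever the two csv files actually live.
spark.sql("load data local inpath '/tmp/txt1.csv' into table dataintel.orderbytest")
spark.sql("load data local inpath '/tmp/txt2.csv' into table dataintel.orderbytest")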
With the test case in place, run a select statement to verify it.
select * from dataintel.orderbytest;

+----+---+
|name|age|
+----+---+
|张三|1|
|李四|2|
|王五|3|
|赵六|1|
|张四|1|
+----+---+
2- Running the test SQL
Run the order by statement several times and compare the results:
spark.sql("select * from dataintel.orderbytest order by age").show
# Result of the first run
+----+---+
|name|age|
+----+---+
| 张三| 1|
| 张四| 1|
| 赵六| 1|
| 李四| 2|
| 王五| 3|
+----+---+

# Result of the n-th run
+----+---+
|name|age|
+----+---+
| 赵六| 1|
| 张三| 1|
| 张四| 1|
| 李四| 2|
| 王五| 3|
+----+---+
Repeated runs show that order by's sort is not stable: when several rows tie on the sort column, every result is correctly sorted, yet the result is not unique.
A sorting algorithm is stable if it preserves the original relative order of equal elements. If a equals b and a precedes b in the unsorted input, then after sorting, wherever the pair ends up, a still precedes b.
The fact that Spark's order by returns different results for the same input proves that the relative positions of such a and b do change, so the sort is not stable.
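A minimal local sketch of the definition, using the same test rows: Scala's sortBy on a collection is documented as stable, so the three rows with age = 1 keep their original relative order on every run.

// The unsorted input: 张三, 赵六 and 张四 all have age = 1, in that relative order.
val rows = Seq(("张三", 1), ("李四", 2), ("王五", 3), ("赵六", 1), ("张四", 1))
// A stable sort by age always keeps the tied rows in that order:
// List((张三,1), (赵六,1), (张四,1), (李四,2), (王五,3))
println(rows.sortBy(_._2))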
Cause
Unstable SQL ordering exists in most database systems, but Spark is written in Scala, and Scala runs on the JVM, so its sorting rules should match Java's: by default, non-primitive types are sorted with a stable algorithm such as merge sort, while primitive types (int, double, and so on) are sorted with an unstable algorithm such as quicksort.
Scala boxes all of its data types into non-primitive types, so in principle a stable sort should be used here and this behavior should not appear. Time to look for the cause in the source code.
1- Locating the code and tracing the problem
Spark SQL's execution goes from the input SQL statement, through a series of wrappings and transformations (parsing, analysis, optimization, physical planning), to the final execution.
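A quick way to look at these stages from spark-shell is Dataset.explain(true), which prints the parsed, analyzed and optimized logical plans along with the physical plan (the sort and exchange operators we care about show up in the last one):

// Print all plan stages for the test query.
spark.sql("select * from dataintel.orderbytest order by age").explain(true)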
Based on experience, the concrete operations only become visible at the PhysicalPlan stage. Skipping the long code-tracing process, we end up at Spark's sort code:
All of the code below can be skimmed if it feels tedious; each block is summarized afterwards.
/**
 * A stable, adaptive, iterative mergesort that requires far fewer than
 * n lg(n) comparisons when running on partially sorted arrays, while
 * offering performance comparable to a traditional mergesort when run
 * on random arrays. Like all proper mergesorts, this sort is stable and
 * runs O(n log n) time (worst case). In the worst case, this sort requires
 * temporary storage space for n/2 object references; in the best case,
 * it requires only a small constant amount of space.
 * @author Josh Bloch
 */
public void sort(Buffer a, int lo, int hi, Comparator<? super K> c) {
  assert c != null;
  int nRemaining = hi - lo;
  if (nRemaining < 2)
    return;  // Arrays of size 0 and 1 are always sorted

  // If array is small, do a "mini-TimSort" with no merges
  if (nRemaining < MIN_MERGE) {
    int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
    binarySort(a, lo, hi, lo + initRunLen, c);
    return;
  }

  /**
   * March over the array once, left to right, finding natural runs,
   * extending short natural runs to minRun elements, and merging runs
   * to maintain stack invariant.
   */
  SortState sortState = new SortState(a, c, hi - lo);
  int minRun = minRunLength(nRemaining);
  do {
    // Identify next run
    int runLen = countRunAndMakeAscending(a, lo, hi, c);
    // If the run is short, extend to min(minRun, nRemaining)
    if (runLen < minRun) {
      int force = nRemaining <= minRun ? nRemaining : minRun;
      binarySort(a, lo, lo + force, lo + runLen, c);
      runLen = force;
    }

    // Push run onto pending-run stack, and maybe merge
    sortState.pushRun(lo, runLen);
    sortState.mergeCollapse();
    // Advance to find next run
    lo += runLen;
    nRemaining -= runLen;
  } while (nRemaining != 0);

  // Merge all remaining runs to complete sort
  assert lo == hi;
  sortState.mergeForceCollapse();
  assert sortState.stackSize == 1;
}


/**
 * Sorts the specified portion of the specified array using a binary
 * insertion sort. This is the best method for sorting small numbers
 * of elements. It requires O(n log n) compares, but O(n^2) data
 * movement (worst case).
 *
 * If the initial part of the specified range is already sorted,
 * this method can take advantage of it: the method assumes that the
 * elements from index {@code lo}, inclusive, to {@code start},
 * exclusive are already sorted.
 */
@SuppressWarnings("fallthrough")
private void binarySort(Buffer a, int lo, int hi, int start, Comparator<? super K> c) {
  assert lo <= start && start <= hi;
  if (start == lo)
    start++;
  K key0 = s.newKey();
  K key1 = s.newKey();
  Buffer pivotStore = s.allocate(1);
  for ( ; start < hi; start++) {
    s.copyElement(a, start, pivotStore, 0);
    K pivot = s.getKey(pivotStore, 0, key0);
    // Set left (and right) to the index where a[start] (pivot) belongs
    int left = lo;
    int right = start;
    assert left <= right;
    /*
     * Invariants:
     *   pivot >= all in [lo, left).
     *   pivot <  all in [right, start).
     */
    while (left < right) {
      int mid = (left + right) >>> 1;
      if (c.compare(pivot, s.getKey(a, mid, key1)) < 0)
        right = mid;
      else
        left = mid + 1;
    }

    assert left == right;

    /*
     * The invariants still hold: pivot >= all in [lo, left) and
     * pivot < all in [left, start), so pivot belongs at left. Note
     * that if there are elements equal to pivot, left points to the
     * first slot after them -- that's why this sort is stable.
     * Slide elements over to make room for pivot.
     */
    int n = start - left;  // The number of elements to move
    // Switch is just an optimization for arraycopy in default case
    switch (n) {
      case 2:  s.copyElement(a, left + 1, a, left + 2);
      case 1:  s.copyElement(a, left, a, left + 1);
               break;
      default: s.copyRange(a, left, a, left + 1, n);
    }
    s.copyElement(pivotStore, 0, a, left);
  }
}
The sort Spark uses here is TimSort, an optimized merge sort that switches to binarySort for small inputs to improve efficiency. Both TimSort and binarySort are stable, and a stable sort should not produce different results across runs. Continuing to trace the code, we finally land on the following spot in the ShuffleInMemorySorter class:
// final class ShuffleInMemorySorter

/**
 * Inserts a record to be sorted.
 *
 * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
 *                      certain pointer compression techniques used by the sorter, the sort can
 *                      only operate on pointers that point to locations in the first
 *                      {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
 * @param partitionId the partition id, which must be less than or equal to
 *                    {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
 */
public void insertRecord(long recordPointer, int partitionId) {
  if (!hasSpaceForAnotherRecord()) {
    throw new IllegalStateException("There is no space for new record");
  }
  array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
  pos++;
}
According to the trace, Spark stores the records in a LongArray structure (array in the code above is an instance of that class) and sorts them as they are inserted. At this point the data in the LongArray belongs to a single partition and is already ordered, but the data from all partitions still has to be merged.
The merge code traced down is shown below; Spark creates an iterator over the partitions to merge the data of the multiple partitions.
/**
 * Return an iterator over all the data written to this object, grouped by partition and
 * aggregated by the requested aggregator. For each partition we then have an iterator over its
 * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
 * partition without reading the previous one). Guaranteed to return a key-value pair for each
 * partition, in order of partition ID.
 *
 * For now, we just merge all the spilled files in once pass, but this can be modified to
 * support hierarchical merging.
 * Exposed for testing.
 */
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    if (ordering.isEmpty) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    merge(spills.toSeq, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}
This method returns an iterator over the partitions. Tracing further inward shows that the iterator's ordering rule is to sort first by partition ID and only then by the data's key. The partition ID is obtained through getPartition():
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
which is tied to the VertexId:
val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
VertexId is the ID of a vertex in Spark's directed acyclic graph; it is defined as follows:
/**
 * A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
 * to follow any ordering or any constraints other than uniqueness.
 */
type VertexId = Long
According to the comment, a VertexId only has to be unique; it does not need to follow any ordering, and from this we can infer that the partition ID is unordered as well. When the data is sorted as a whole, which partition gets traversed first is therefore not fixed. So if two rows that both rank first on the sort key sit in different partitions, either one may be written into the result file first, and that is exactly the unstable ordering we observed.
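A small local sketch of the effect (not Spark's actual merge code): model two partitions that are each already sorted on age, and note that the order in which the partitions are fed into the final merge decides which of the two age = 1 rows comes out first. Both outputs are correctly sorted, yet they differ.

// Each "partition" is already sorted by age, as Spark's per-partition sort guarantees.
val partA = Seq(("张三", 1), ("李四", 2))
val partB = Seq(("赵六", 1), ("王五", 3))
// Visiting A first vs. B first: the tied rows (age = 1) swap places between the two results.
println((partA ++ partB).sortBy(_._2))  // List((张三,1), (赵六,1), (李四,2), (王五,3))
println((partB ++ partA).sortBy(_._2))  // List((赵六,1), (张三,1), (李四,2), (王五,3))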
2- The single-partition hypothesis
The trace above explains why order by is unstable, but it naturally suggests a follow-up question: is a single-partition sort always stable, since it avoids the multi-partition traversal entirely?
Again, start from an experiment. Delete the smaller file from the dataintel.orderbytest table so that only one file remains, which means the Spark job also runs with a single partition. The remaining data:
scala> spark.sql("select * from dataintel.orderbytest order by age").show
+----+---+
|name|age|
+----+---+
| 张三| 1|
| 赵六| 1|
| 李四| 2|
| 王五| 3|
+----+---+
After many runs (somewhere around a hundred), the row order stays the same, which further confirms the earlier reasoning.
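To double-check that the job really did run with a single partition after deleting the file, one hedged sanity check is to look at the number of partitions of the scan; for a plain file-backed table this usually tracks the number of underlying splits:

// Expected to return 1 once only a single small file backs the table.
spark.sql("select * from dataintel.orderbytest").rdd.getNumPartitions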
Conclusion
So in a large, multi-partition sorting scenario, any job running with more than one partition carries the risk of returning differently ordered results across runs. Forcing a single partition is not an option either: it is far too slow and unrealistic. If you use order by in Spark SQL and expect identical results on every run, sort on the business key combined with a column that is unique per row, for example the actual business sort key plus a messageId.
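For the first_value query from the beginning of the article, a minimal sketch of the fix (assuming s_id is unique per row) is simply to append the unique column to the window's order by, so ties on s_score are broken deterministically:

// The extra sort key s_id makes the window ordering, and therefore first_value, deterministic.
spark.sql("""
  select *,
         first_value(s_id) over (partition by c_id order by s_score, s_id) first_show
  from score
""").show()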