
Pitfall: Spark SQL's ORDER BY sort is not stable



Guiding questions:

1. Why does the same SQL return different results on the same batch of data?
2. How do we build a test case?
3. How do we make the same SQL return the same result on the same batch of data?




Problem background

In a production data-product project, the same code produced different results on different runs. After investigation, the problem was traced to the value returned by the first_value window function: the same SQL, run on the same batch of data, did not always return the same value.

The SQL looked roughly like this:

    select *, first_value(s_id) over (partition by c_id order by s_score) first_show from score;

As you can see, first_value is a window function: it takes its value from the result of the OVER clause, and its own logic is simple, it just returns the first value in the window. So the most likely cause is that the ordering produced by ORDER BY is not fixed. The following experiment was set up to verify this guess.
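
To make the hypothesis concrete, here is a minimal Scala sketch of the same kind of query (the column names s_id, c_id, s_score follow the SQL above; the tiny in-memory table, the temp view and the local session are illustrative assumptions, not the production data). On such a small local table the tie will usually resolve the same way on every run; on the multi-file production table it did not:

    import org.apache.spark.sql.SparkSession

    // Hypothetical local session, only so the sketch is runnable on its own.
    val spark = SparkSession.builder().appName("firstValueTie").master("local[*]").getOrCreate()
    import spark.implicits._

    // Two students in the same class share the lowest s_score, so first_show
    // depends entirely on how ORDER BY breaks the tie inside the window.
    Seq(("s1", "c1", 60), ("s2", "c1", 60), ("s3", "c1", 80))
      .toDF("s_id", "c_id", "s_score")
      .createOrReplaceTempView("score")

    spark.sql(
      """select *,
        |       first_value(s_id) over (partition by c_id order by s_score) as first_show
        |from score""".stripMargin).show()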

Experiment

To verify the hypothesis, we only need a test case in which several rows share the same value in one field; then we run ORDER BY on that field several times and check whether the results match.

1- Building the test case

The production data set is large and split across many partitions, and Spark launches tasks according to the number of underlying files. To come reasonably close to production, we create a table and upload two files to it. Table: orderbytest(name string, age int)


    create table dataintel.orderbytest(name string, age int) row format delimited fields terminated by ',';


Upload two csv files into the table's data directory; their contents are as follows (txt2 is the smaller file, which is removed again later in the single-partition test):

txt1:
张三,1
李四,2
王五,3
赵六,1

txt2:
张四,1
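
The article does not show how the files were uploaded; one possible way (the local paths are assumptions) is to LOAD them, which keeps each source file as its own file under the table location, so the table ends up backed by two files:

    // Hypothetical paths; each LOAD keeps its file as a separate file in the table directory.
    spark.sql("load data local inpath '/tmp/txt1.csv' into table dataintel.orderbytest")
    spark.sql("load data local inpath '/tmp/txt2.csv' into table dataintel.orderbytest")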

The test case is now ready; run a SELECT to verify it.

    select * from dataintel.orderbytest;
    +----+---+
    |name|age|
    +----+---+
    |张三|1|
    |李四|2|
    |王五|3|
    |赵六|1|
    |张四|1|
    +----+---+
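
As an optional sanity check (not in the original article), you can confirm that the two files translate into two read partitions, since one task per underlying file is what the experiment relies on:

    // With two small files backing the table, the scan is usually split into two partitions.
    spark.table("dataintel.orderbytest").rdd.getNumPartitions
    // expected: 2 (assuming one split per file)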

2- Running the test SQL

Run the ORDER BY statement several times and compare the results.

    spark.sql("select * from dataintel.orderbytest order by age").show
    # result of the first run
    +----+---+
    |name|age|
    +----+---+
    | 张三| 1|
    | 张四| 1|
    | 赵六| 1|
    | 李四| 2|
    | 王五| 3|
    +----+---+

    # result of the n-th run
    +----+---+
    |name|age|
    +----+---+
    | 赵六| 1|
    | 张三| 1|
    | 张四| 1|
    | 李四| 2|
    | 王五| 3|
    +----+---+

Repeated runs show that ORDER BY's sort is not stable: when several rows tie on the sort field, every result is correctly sorted, but the result is not unique.

A sorting algorithm is stable if sorting does not change the relative order of equal elements. For example, if a and b compare equal and a comes before b in the unsorted input, then after sorting, wherever a and b end up in the result, a is still before b.

The fact that Spark's ORDER BY returns non-unique results proves that the relative positions of such an a and b must have changed, so the sort is not stable.
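
As a small illustration of the stability definition (plain driver-side Scala, not Spark's shuffle sort): sortBy on a Scala sequence is stable, so rows that tie on age keep their original relative order on every run:

    val rows = Seq(("张三", 1), ("李四", 2), ("王五", 3), ("赵六", 1), ("张四", 1))
    // Ties on age keep their input order: 张三 stays ahead of 赵六, which stays ahead of 张四.
    rows.sortBy(_._2)
    // always: List((张三,1), (赵六,1), (张四,1), (李四,2), (王五,3))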

Cause

Unstable SQL sorting exists in almost every database system, but Spark is written in Scala, which runs on the JVM, so one would expect its sorting rules to match Java's: by default, non-primitive (object) types are sorted with a stable algorithm such as merge sort, while primitive types (int, double, and so on) are sorted with an unstable algorithm such as quicksort.

Since Scala wraps all its data types as non-primitive types, a stable sort should in principle be used and this behaviour should not occur. Let's look at the source code for the reason.

1- Locating the code and tracing the problem

Spark SQL's parsing and execution process is as follows:

[Figure: Spark SQL parsing and execution flow, from the SQL text to the physical plan]

From the input SQL statement to the final execution, the query goes through a series of wrappers and transformations. Experience says the concrete operations only become visible at the PhysicalPlan stage. Skipping the long code-tracing process, we end up at Spark's sorting code.

(Feel free to skim the code below if it feels tedious; each block is summarised afterwards.)

    /**
     * A stable, adaptive, iterative mergesort that requires far fewer than
     * n lg(n) comparisons when running on partially sorted arrays, while
     * offering performance comparable to a traditional mergesort when run
     * on random arrays. Like all proper mergesorts, this sort is stable and
     * runs O(n log n) time (worst case). In the worst case, this sort requires
     * temporary storage space for n/2 object references; in the best case,
     * it requires only a small constant amount of space.
     * @author Josh Bloch
     */
    public void sort(Buffer a, int lo, int hi, Comparator<? super K> c) {
      assert c != null;

      int nRemaining = hi - lo;
      if (nRemaining < 2)
        return;  // Arrays of size 0 and 1 are always sorted

      // If array is small, do a "mini-TimSort" with no merges
      if (nRemaining < MIN_MERGE) {
        int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
        binarySort(a, lo, hi, lo + initRunLen, c);
        return;
      }

      /**
       * March over the array once, left to right, finding natural runs,
       * extending short natural runs to minRun elements, and merging runs
       * to maintain stack invariant.
       */
      SortState sortState = new SortState(a, c, hi - lo);
      int minRun = minRunLength(nRemaining);
      do {
        // Identify next run
        int runLen = countRunAndMakeAscending(a, lo, hi, c);

        // If the run is short, extend to min(minRun, nRemaining)
        if (runLen < minRun) {
          int force = nRemaining <= minRun ? nRemaining : minRun;
          binarySort(a, lo, lo + force, lo + runLen, c);
          runLen = force;
        }

        // Push run onto pending-run stack, and maybe merge
        sortState.pushRun(lo, runLen);
        sortState.mergeCollapse();

        // Advance to find next run
        lo += runLen;
        nRemaining -= runLen;
      } while (nRemaining != 0);

      // Merge all remaining runs to complete sort
      assert lo == hi;
      sortState.mergeForceCollapse();
      assert sortState.stackSize == 1;
    }

    /**
     * Sorts the specified portion of the specified array using a binary
     * insertion sort. This is the best method for sorting small numbers
     * of elements. It requires O(n log n) compares, but O(n^2) data
     * movement (worst case).
     *
     * If the initial part of the specified range is already sorted,
     * this method can take advantage of it: the method assumes that the
     * elements from index {@code lo}, inclusive, to {@code start},
     * exclusive are already sorted.
     */
    @SuppressWarnings("fallthrough")
    private void binarySort(Buffer a, int lo, int hi, int start, Comparator<? super K> c) {
      assert lo <= start && start <= hi;
      if (start == lo)
        start++;

      K key0 = s.newKey();
      K key1 = s.newKey();

      Buffer pivotStore = s.allocate(1);

      for ( ; start < hi; start++) {
        s.copyElement(a, start, pivotStore, 0);
        K pivot = s.getKey(pivotStore, 0, key0);

        // Set left (and right) to the index where a[start] (pivot) belongs
        int left = lo;
        int right = start;
        assert left <= right;
        /*
         * Invariants:
         *   pivot >= all in [lo, left).
         *   pivot <  all in [right, start).
         */
        while (left < right) {
          int mid = (left + right) >>> 1;
          if (c.compare(pivot, s.getKey(a, mid, key1)) < 0)
            right = mid;
          else
            left = mid + 1;
        }
        assert left == right;

        /*
         * The invariants still hold: pivot >= all in [lo, left) and
         * pivot < all in [left, start), so pivot belongs at left. Note
         * that if there are elements equal to pivot, left points to the
         * first slot after them -- that's why this sort is stable.
         * Slide elements over to make room for pivot.
         */
        int n = start - left;  // The number of elements to move
        // Switch is just an optimization for arraycopy in default case
        switch (n) {
          case 2:  s.copyElement(a, left + 1, a, left + 2);
          case 1:  s.copyElement(a, left, a, left + 1);
                   break;
          default: s.copyRange(a, left, a, left + 1, n);
        }
        s.copyElement(pivotStore, 0, a, left);
      }
    }

The sort Spark uses is TimSort, an optimized merge sort that switches to binarySort (a binary insertion sort) on small inputs for efficiency. Both TimSort and binarySort are stable, and a stable sort should not return different results across runs. Tracing the code further, we end up at the following spot in the ShuffleInMemorySorter class:

    // final class ShuffleInMemorySorter

    /**
     * Inserts a record to be sorted.
     *
     * @param recordPointer a pointer to the record, encoded by the task memory manager. Due to
     *                      certain pointer compression techniques used by the sorter, the sort can
     *                      only operate on pointers that point to locations in the first
     *                      {@link PackedRecordPointer#MAXIMUM_PAGE_SIZE_BYTES} bytes of a data page.
     * @param partitionId the partition id, which must be less than or equal to
     *                    {@link PackedRecordPointer#MAXIMUM_PARTITION_ID}.
     */
    public void insertRecord(long recordPointer, int partitionId) {
      if (!hasSpaceForAnotherRecord()) {
        throw new IllegalStateException("There is no space for new record");
      }
      array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
      pos++;
    }

According to the trace, Spark stores the data in a LongArray structure; the array in the code above is an instance of that class, and records are kept sorted as they are inserted. At this point the LongArray only holds the data of a single partition, which is ordered, but the data of all partitions still has to be merged.

The merge code we traced to is below: Spark creates an iterator over the partitions to merge the data of multiple partitions.

    /**
     * Return an iterator over all the data written to this object, grouped by partition and
     * aggregated by the requested aggregator. For each partition we then have an iterator over its
     * contents, and these are expected to be accessed in order (you can't "skip ahead" to one
     * partition without reading the previous one). Guaranteed to return a key-value pair for each
     * partition, in order of partition ID.
     *
     * For now, we just merge all the spilled files in once pass, but this can be modified to
     * support hierarchical merging.
     * Exposed for testing.
     */
    def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
      val usingMap = aggregator.isDefined
      val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
      if (spills.isEmpty) {
        // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
        // we don't even need to sort by anything other than partition ID
        if (ordering.isEmpty) {
          // The user hasn't requested sorted keys, so only sort by partition ID, not key
          groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
        } else {
          // We do need to sort by both partition ID and key
          groupByPartition(destructiveIterator(
            collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
        }
      } else {
        // Merge spilled and in-memory data
        merge(spills.toSeq, destructiveIterator(
          collection.partitionedDestructiveSortedIterator(comparator)))
      }
    }

This method returns an iterator over the partitions. Tracing further in, we can see that the iterator sorts first by partition ID and only then by the data's key. The partition ID is obtained via getPartition():

    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID

It is related to VertexId:

    val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
    val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt

And VertexId is the ID of a vertex in Spark's directed acyclic graph; it is defined as follows:

    /**
     * A 64-bit vertex identifier that uniquely identifies a vertex within a graph. It does not need
     * to follow any ordering or any constraints other than uniqueness.
     */
    type VertexId = Long

According to the comment, a VertexId only has to be unique; it does not have to follow any ordering. From this we can infer that partition IDs are unordered as well: when the data is finally sorted, which partition gets traversed first is not fixed. So when Spark sorts the data globally, if two rows that both rank 1 sit in different partitions, it is not determined which of the tied rows gets written to the result file on disk first, and the sort is therefore unstable.
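
A driver-side sketch of this effect (plain Scala, not Spark's actual merge code): each "partition" below is already sorted by age, but the order in which the partitions happen to be visited decides which of the tied rows comes out first, even though both outputs are correctly sorted:

    // Two "partitions", each already sorted by age (the second tuple element).
    val partA = Seq(("张三", 1), ("李四", 2), ("王五", 3), ("赵六", 1)).sortBy(_._2)
    val partB = Seq(("张四", 1)).sortBy(_._2)

    // Visiting A before B vs. B before A: both results are sorted by age,
    // but the rows tied on age == 1 appear in a different order.
    (partA ++ partB).sortBy(_._2)  // 张三, 赵六, 张四, 李四, 王五
    (partB ++ partA).sortBy(_._2)  // 张四, 张三, 赵六, 李四, 王五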

2- The single-partition hypothesis

The trace above explains why ORDER BY is unstable, but it naturally raises the question: is a single-partition sort always stable, since it sidesteps the multi-partition traversal problem?

Again we start with an experiment: delete the small file from the dataintel.orderbytest table, leaving the table backed by a single file, so the Spark job also runs with a single partition. The remaining data is:

    scala> spark.sql("select * from dataintel.orderbytest order by age").show
    +----+---+
    |name|age|
    +----+---+
    | 张三| 1|
    | 赵六| 1|
    | 李四| 2|
    | 王五| 3|
    +----+---+

After many runs (somewhere around a hundred), the row order stays the same, which further confirms the earlier finding.

Conclusion

So in large-data, multi-partition sorting scenarios, any partition count greater than one carries the risk that repeated sorts return different results, and we cannot simply force a single partition: that is far too slow and unrealistic. In Spark SQL, if you use ORDER BY and expect the result to be identical on every run, sort by the business key combined with a unique identifier, for example the actual business sort key plus a message ID.
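
A hedged sketch of that workaround, reusing the tables from the sketches above (s_id and name are assumed to be unique per row; the article's "messageId" stands for whatever unique identifier your records carry):

    // Window case: break s_score ties with the unique s_id, so first_show is deterministic.
    spark.sql(
      """select *,
        |       first_value(s_id) over (partition by c_id order by s_score, s_id) as first_show
        |from score""".stripMargin).show()

    // Plain ORDER BY case: add a unique column as the last sort key.
    spark.sql("select * from dataintel.orderbytest order by age, name").show()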







---------------------

Author: winrar_setup.rar
Source: CSDN
Original post: 避坑:Spark Sql的Order By排序是不稳定的 (Pitfall: Spark SQL's ORDER BY sort is not stable)


Comments (1)

若无梦何远方, posted 2021-11-08 10:32:44:
Keeping up my one read a day; to summarize: with a multi-partition Hive table, Spark does not have to read the partitions in any fixed order, so this run's read order can differ from the last one's and the results naturally differ. One question left open: could you explain the scenarios (and the corresponding implementation) for keeping results consistent across multiple partitions? (He mentions key + messageId; how should the messageId here be understood?)
