The difference between repartition and coalesce in Spark, and when to use each

Alex / 3-3 11:33

Both repartition and coalesce repartition an RDD. So what is the difference between them, and when is each one appropriate? Let's look at the source code below.


def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }
As you can see, repartition is implemented internally as a call to coalesce with shuffle = true.
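A minimal sketch (a hypothetical spark-shell session) showing this equivalence; the RDD contents and partition counts are made up for illustration:

val rdd = sc.parallelize(1 to 100, 4)

val a = rdd.repartition(8)              // ends up with 8 partitions, via a shuffle
val b = rdd.coalesce(8, shuffle = true) // identical to the line above

println(a.getNumPartitions)             // 8
println(b.getNumPartitions)             // 8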

The comment on repartition in the source says:

* If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
* which can avoid performing a shuffle.

For example, if the parent RDD has 1000 partitions and a filter transformation drops 50% of the data, and you want to redistribute the remaining data across 500 partitions, you are decreasing the number of partitions, so use coalesce(500), which avoids triggering a shuffle.
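A minimal sketch of this scenario; the input path and filter predicate are hypothetical:

val parent = sc.textFile("hdfs:///path/to/input", 1000)       // parent RDD with ~1000 partitions
val filtered = parent.filter(line => !line.contains("ERROR")) // drops roughly half of the data
val compacted = filtered.coalesce(500)                        // shuffle = false: narrow dependency, no shuffle
println(compacted.getNumPartitions)                           // 500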


When the number of partitions needs to grow, or when you want to repartition an RDD that is not a key-value RDD, you need to use repartition.
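A minimal sketch of growing the partition count (the sizes here are hypothetical); note that coalesce with the default shuffle = false cannot produce more partitions than its parent has:

val small = sc.parallelize(1 to 1000000, 100)
println(small.coalesce(400).getNumPartitions)    // still 100: without a shuffle the count cannot grow
println(small.repartition(400).getNumPartitions) // 400: repartition shuffles, so it can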


Below is the source implementation of the coalesce function:
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else {
      new CoalescedRDD(this, numPartitions)
    }
  }
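To see what the shuffle = true branch is doing, here is a standalone sketch (plain Scala, no Spark) of the key-assignment logic in distributePartition: each input partition starts at a random output position and then assigns consecutive keys, so the HashPartitioner spreads its elements evenly over the output partitions.

import scala.util.Random

val numPartitions = 4

def distribute[T](index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
  var position = new Random(index).nextInt(numPartitions)
  items.map { t =>
    position += 1
    (position % numPartitions, t) // HashPartitioner reduces the Int key modulo numPartitions
  }
}

// elements of input partition 0 land on consecutive output partitions
distribute(0, (1 to 8).iterator).foreach(println)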




