

  1. 如果读取的是hdfs,那么有多少个block,就有多少个partition
1.拿出1-5号5个小文件(也就是5个partition) 分别给3个executor读取(spark调度会以vcore为单位,实际就是5个executor,10个task读10个partition)
2.如果3个executor执行速度相同,再拿6-10号文件 依次给这3个executor读取
  1. repartition的底层调用


  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)}



/*** Return a new RDD that is reduced into `numPartitions` partitions.** This results in a narrow dependency, e.g. if you go from 1000 partitions* to 100 partitions, there will not be a shuffle, instead each of the 100* new partitions will claim 10 of the current partitions.** However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,* this may result in your computation taking place on fewer nodes than* you like (e.g. one node in the case of numPartitions = 1). To avoid this,* you can pass shuffle = true. This will add a shuffle step, but means the* current upstream partitions will be executed in parallel (per whatever* the current partitioning is).** Note: With shuffle = true, you can actually coalesce to a larger number* of partitions. This is useful if you have a small number of partitions,* say 100, potentially with a few partitions being abnormally large. Calling* coalesce(1000, shuffle = true) will result in 1000 partitions with the* data distributed using a hash partitioner.*/def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null): RDD[T] = withScope {if (shuffle) {/** Distributes elements evenly across output partitions, starting from a random partition. 注意,键的哈希代码就是键本身。HashPartitioner将用分区的总数对它进行修改。*/val distributePartition = (index: Int, items: Iterator[T]) => {var position = (new Random(index)).nextInt(numPartitions) { t =>// Note that the hash code of the key will just be the key itself. The HashPartitioner// will mod it with the number of total partitions.position = position + 1(position, t)}} : Iterator[(Int, T)]// include a shuffle step so that our upstream tasks are still distributed 包含一个shuffle步骤,以便我们的上游任务仍然是分布式的。new CoalescedRDD(new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),new HashPartitioner(numPartitions)),numPartitions).values} else {new CoalescedRDD(this, numPartitions)}}
  1. coalesce默认不会产生shuffle
def coalesce(numPartitions: Int): DataFrame = withPlan {Repartition(numPartitions, shuffle = false, logicalPlan)}



  • coleasce
  • repartition


