2) Determine the data skew for a candidate partition key. In other words, if the amount of data in certain partitions is significantly larger than the data in other partitions, then it is not a good partitioning column. For example, in the image below, note that the left column contains many more circles than stars, whereas the distribution of data in the right column is even:
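One quick way to check a candidate column for skew is a simple group-by count; the sketch below assumes a hypothetical DataFrame df with a hypothetical candidate column candidate_key:
# Count rows per candidate key; a few keys with far larger counts than the rest indicate skew.
df.groupBy("candidate_key").count().orderBy("count", ascending=False).show()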
Next, we look into our options for partitioning our data by specifying the number of partitions.
An existing RDD can be repartitioned into a given number of partitions using the repartition() method. Note that this triggers an expensive shuffle operation. Therefore, when you are repartitioning only to reduce the number of partitions, use coalesce() instead, which runs without performing a full shuffle.
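As a minimal sketch (the element and partition counts are arbitrary), the difference between the two looks like this:
rdd = sc.parallelize(range(100), 8)      # start with 8 partitions
rdd.repartition(16).getNumPartitions()   # 16 (performs a full shuffle)
rdd.coalesce(4).getNumPartitions()       # 4 (merges partitions without a full shuffle)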
One example of where you would want to partition your RDD into a specific number of partitions is when you do not have an evenly distributed partitioning column (as discussed above) and you do not want to use other techniques, such as salting, to distribute your data across partitions.
The number of partitions used during Spark's shuffle operations can be specified with the spark.sql.shuffle.partitions parameter. The default value for this parameter is 200.
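For example, the parameter can be changed on an existing session (the value 64 here is just an illustration, not a recommendation):
spark.conf.set("spark.sql.shuffle.partitions", "64")
spark.conf.get("spark.sql.shuffle.partitions") #returns: '64'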
The number of partitions can also be specified when creating Spark's distributed datasets from collections. If it is not specified, Spark automatically splits the generated dataset into partitions based on the value of sc.defaultParallelism. For example, on a cluster with 12 workers, parallelize() behaves as follows:
sc.parallelize(list(range(24))).glom().collect() #returns:
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14, 15], [16, 17], [18, 19], [20, 21], [22, 23]]
In the case where the size of our collection is smaller than the number of workers, we get:
sc.parallelize(list(range(6))).glom().collect() #returns:
[[], [0], [], [1], [], [2], [], [3], [], [4], [], [5]]
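The number of partitions can also be passed explicitly as the second argument of parallelize(); for example, splitting the same six elements into three partitions:
sc.parallelize(list(range(6)), 3).glom().collect() #returns:
[[0, 1], [2, 3], [4, 5]]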
The number of partitions can also be specified when reading from a JDBC data source. Spark calculates partition strides (or intervals) when the number of partitions is specified along with the lowerBound and upperBound values. The size of the stride is calculated as:
strideSize = (upperBound - lowerBound) / numPartitions
When using bounds, all four options (upperBound, lowerBound, partitionColumn, and numPartitions) must be specified. For example, the following command:
df = (spark.read.format("jdbc")
    .option("url", "jdbc:mysql://servername")
    .option("dbtable", "yourschema.tablename")
    .option("user", "yourusername")
    .option("password", "yourpassword")
    .option("lowerBound", 1)
    .option("upperBound", 10)
    .option("partitionColumn", "id")
    .option("numPartitions", 5)
    .option("driver", "com.mysql.jdbc.Driver")
    .load())
returns the following result when the id column contains values 1 to 10:
[
[Row(id=1, name='a'), Row(id=2, name='b')],
[Row(id=3, name='c'), Row(id=4, name='d')],
[Row(id=5, name='e'), Row(id=6, name='f')],
[Row(id=7, name='g'), Row(id=8, name='h')],
[Row(id=9, name='i'), Row(id=10, name='j')]
]
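A result in this per-partition form can be obtained by collecting the partitions of the DataFrame's underlying RDD, for example:
df.rdd.glom().collect()  # one inner list of Rows per JDBC partition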
Here the database table being read contains ten rows, with id values 1 through 10 and name values 'a' through 'j'. This works well when our Spark cluster has five workers, since we get five equal partitions that each worker can process.
Now let us look at the things we need to take into consideration when writing our DataFrame.
Let us look at an example where our write will fail unless we configure partitioning correctly. If you write your DataFrame without partitioning it, it will be partitioned according to Spark's default split size configuration of 128 MB. Now say you have a DataFrame whose size is 1.28 GB; the write could fail with an Out of Memory error because the Spark stage may consume more memory than is available on that executor. For example, if your executor runs 10 workers, then you have to ensure that you provide at least 10 * 128 MB = 1280 MB of memory to the Spark executor to execute the job that writes your 1.28 GB DataFrame.
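One common mitigation, sketched below under the assumption that the data is written as Parquet to a hypothetical path, is to explicitly repartition the DataFrame before writing it:
# "df" stands for the 1.28 GB DataFrame; the partition count and output path are illustrative.
df.repartition(100).write.mode("overwrite").parquet("/tmp/output_path")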
The general rule, irrespective of the type of operation, is that the number of partitions should be high enough to keep all of the workers in all of your executors busy for the duration of the job, while making sure that each executor has enough memory for all of its workers.
Partitioning plays an important role when performing joins on your dataset. A Spark DataFrame is joined to another DataFrame by specifying the join columns and a join type, which could be inner, outer, full, left, right, etc. Spark considers various characteristics of the data, including its partitioning information, in order to pick an efficient join algorithm, which could be SortMergeJoin, BroadcastHashJoin, ShuffleHashJoin, or BroadcastNestedLoopJoin. I will discuss join algorithms in more detail in another article, but I will provide an example here to give a basic idea of how partitioning plays a role when a join is performed. If you are performing an inner join and Spark chooses SortMergeJoin to perform it, then the join columns must be partitioned by the same partitioning algorithm, which could be hash-based partitioning built on Java's Object.hashCode() method.
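As a minimal sketch (vegetables_df and vegetable_days_df are hypothetical DataFrames sharing an id column), you can check which join algorithm Spark picked by inspecting the physical plan:
joined = vegetables_df.join(vegetable_days_df, on="id", how="inner")
joined.explain()  # the physical plan shows the chosen strategy, e.g. SortMergeJoin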
Notice that in the image above, if we want to find out which vegetable was consumed on which day, we have to perform an inner join between the "Vegetables" and "VegetableDays" tables. If there is no partitioning key, our join algorithm has to scan through all thirty records of "VegetableDays" for each vegetable to find the days on which it was consumed. On the other hand, if we use "Vegetable Id" as the partitioning key and "id % 3" as our partitioning hash function, then all the records with "id % 3 == 0" land in one bucket, which we denote as "View 1" in our example. In that case, we only have to scan through nine records per vegetable (or twelve records when id % 3 == 1), reducing our search space to N/3, where N is the number of records in "VegetableDays".
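A minimal sketch of this kind of hash bucketing on an RDD of (vegetable id, day) pairs (the data here is made up for illustration) could look like this:
vegetable_days = sc.parallelize([(1, "Mon"), (2, "Tue"), (3, "Wed"), (4, "Thu"), (5, "Fri"), (6, "Sat")])
# Hash-partition into 3 buckets using id % 3 as the partitioning function.
bucketed = vegetable_days.partitionBy(3, lambda vegetable_id: vegetable_id % 3)
bucketed.glom().collect()  # each inner list is one bucket, e.g. ids 3 and 6 land together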
You can find the various default values for partitioning in the Spark documentation available here.