Performance Tuning on Apache Spark
By Prabeesh Keezhathra
Performance tuning is an important aspect of working with Apache Spark: it helps ensure that your data processing jobs run efficiently and reliably. In this blog post, we will look at the most common problems to watch for when tuning Apache Spark performance: spill, skew, shuffle, storage, and serialization.
Spill
One problem that can occur is spill: the writing of temporary data to disk because a partition does not fit in executor memory. This can happen when the data being processed by a task is too large to fit in memory, and the extra disk I/O can significantly slow down your jobs.
Spill is often a symptom of a few oversized partitions, so you can address it with techniques such as salted joins or adaptive query execution. For example, you can salt a join key in PySpark so that a hot key is split across several smaller partitions (the large table gets a random salt, and the smaller table is duplicated once per salt value):
# Salted join: split a hot key across num_salts partitions instead of one huge partition
from pyspark.sql import functions as F

num_salts = 8
df1 = df1.withColumn("salt", (F.rand() * num_salts).cast("int"))
df2 = df2.withColumn("salt", F.explode(F.array([F.lit(i) for i in range(num_salts)])))
joined = df1.join(df2, on=["key", "salt"], how="inner").drop("salt")
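Adaptive query execution (AQE), mentioned above, can also help: on Spark 3.x it coalesces small shuffle partitions and splits skewed ones at runtime. The relevant configuration flags look like this:
# Enable Adaptive Query Execution (Spark 3.x) so partition sizes are adjusted at runtime
spark.conf.set("spark.sql.adaptive.enabled", "true")
# Let AQE split skewed join partitions automatically
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")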
It is also important to ensure that executors have enough memory to avoid spills. One way to do this is by increasing the executor memory. Note that executor memory is a launch-time setting: it must be supplied when the application starts (for example via spark-submit or the SparkSession builder) and cannot be changed on a running session:
# Set executor memory when building the SparkSession (a launch-time setting)
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.executor.memory", "16g").getOrCreate()
Skew
Another issue that can arise is skew, which refers to an imbalance in the size of the partitions. When partition sizes are uneven, a few tasks end up processing far more data than the rest and take much longer to finish, dragging down the whole stage. This can be mitigated by ensuring that the data is evenly distributed across the partitions, for example by using the repartition() function.
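For example (the column name "key" and the partition count of 200 are placeholders to adapt to your data):
# Redistribute rows across 200 partitions by hashing the key column
df = df.repartition(200, "key")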
You can also try manually redistributing the data or using techniques like bucketing or sampling. For example, you can use bucketing in PySpark as follows:
# Bucketing is applied through the DataFrameWriter when saving as a table
df.write.bucketBy(10, "key").sortBy("key").saveAsTable("bucketed_table")
If you do encounter skew, check the per-task shuffle read and write sizes for each stage in the Spark UI and make sure they are roughly equal across tasks. A small amount of skew, say less than 20% difference between tasks, is usually safe to ignore.
Shuffle
Shuffle is another problem that can arise during performance tuning. A shuffle moves data between executors, and it is resource-intensive: data must be serialized, written to disk, and sent over the network, which leads to slower processing times. To minimize the impact of shuffle, avoid wide transformations, such as joins, distinct, groupBy, and orderBy, whenever possible. Prefer narrow transformations, which operate on each partition independently and do not involve shuffling; examples include map(), filter(), and union(). These operations are typically much faster than wide transformations, which require a shuffle.
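To make the distinction concrete, here is a small sketch (the name and age columns are illustrative):
# Narrow transformations: each output partition depends on a single input partition, no shuffle
adults = df.filter(df["age"] > 30).select("name", "age")
# Wide transformation: groupBy needs rows with the same key to meet, so data is shuffled
counts = df.groupBy("age").count()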
You can also reduce the amount of data that needs to be shuffled by pre-shuffling (repartitioning both sides of a join by the join key) and by broadcasting small tables (roughly 10 MB or less), which copies them to every executor instead of shuffling them.
For example, you can use pre-shuffling in PySpark as follows:
# Use pre-shuffling to reduce shuffle
df1 = df1.repartition(10, "key")
df2 = df2.repartition(10, "key")
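Broadcasting the smaller table can be sketched as follows (here df2 is assumed to be the small side):
# Broadcast the small table to every executor so the large table is not shuffled
from pyspark.sql.functions import broadcast
joined = df1.join(broadcast(df2), on="key", how="inner")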
Storage
Storage is another area that can impact performance, and it covers a set of problems related to how data is laid out on disk. Issues like the tiny file problem, directory scanning, and schema evolution can all hurt performance and should be addressed during tuning.
One issue to be aware of is the tiny file problem, where many small files cause excessive overhead when reading and processing data. It's important to ensure that your part-files are large enough to avoid this issue; a general rule of thumb is to aim for part-files between 128 MB and 1 GB in size. One way to address the tiny file problem is by compacting small files into larger ones.
For example, you can use manual compaction in PySpark as follows:
# Compact many small files into fewer, larger part-files
# (8 is an example; choose the count so each output file lands near the 128MB-1GB range)
df.coalesce(8).write.mode("overwrite").parquet("output_path")
One tip is to always specify the schema when reading data. This can reduce reading time, as Spark won't have to make an extra pass over the data to infer the schema on its own. For example, in PySpark you can specify the schema as follows:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

df = spark.read.format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("data.csv")
Serialization
Serialization is the process of converting data and code (such as the closures inside your transformations) into a byte stream so they can be shipped to executors across the cluster, and it is a common source of overhead. It's important to use efficient serialization and execution formats, such as Tungsten. Tungsten is Spark's optimized execution engine for DataFrames and Spark SQL: it stores data in a compact binary format and manages memory off-heap, which can significantly improve the performance of your data processing tasks. Tungsten has been enabled by default since Spark 2.0; on older versions (1.5/1.6) it was controlled by a flag:
# Tungsten is on by default in Spark 2.0+; Spark 1.5/1.6 exposed this flag to toggle it
spark.conf.set("spark.sql.tungsten.enabled", "true")
One issue with serialization is Python overhead, which occurs when using Python User Defined Functions (UDFs) in Spark. Python UDFs can be slower than their Scala or Java counterparts due to the overhead of serializing and deserializing the data between the JVM and Python. This overhead can significantly impact the performance of your data processing tasks, especially if you are using a large number of UDFs.
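For reference, this is what a row-at-a-time Python UDF looks like, using the same doubling example as the rest of this section; every value is serialized between the JVM and Python one row at a time:
# A row-at-a-time Python UDF: each value crosses the JVM/Python boundary individually
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

double_udf = udf(lambda x: x * 2.0, DoubleType())
df = df.withColumn("doubled_col", double_udf(df["col"]))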
To mitigate this overhead, it's recommended to replace Python UDFs with functions that execute inside the JVM wherever possible, such as Spark's built-in column expressions and higher-order functions. Higher-order functions are functions that take other functions as arguments. Python's map() is a familiar example: it applies a given function to each element of a list. Here it is in plain (driver-side) Python:
# Plain Python: map() applies the lambda to each element of a driver-side list
numbers = [1, 2, 3, 4, 5]
doubled_numbers = list(map(lambda x: x * 2, numbers))
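Inside Spark, the same doubling is better expressed as a built-in column expression, which runs entirely in the JVM and avoids the Python serialization overhead (column names match the Pandas UDF example below):
# A built-in column expression: no Python UDF, no JVM-to-Python serialization
from pyspark.sql import functions as F
df = df.withColumn("doubled_col", F.col("col") * 2)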
Another option is to use Pandas UDFs (also known as vectorized UDFs), which are usually much faster than traditional row-at-a-time UDFs. A Pandas UDF operates on batches of data as Pandas Series or DataFrames rather than one row at a time, and Spark uses Apache Arrow to transfer those batches between the JVM and Python efficiently, so the per-row serialization overhead largely disappears.
To use a Pandas UDF in PySpark, you can use the following code:
# Define a Pandas (vectorized) UDF
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(returnType=DoubleType())
def double(x: pd.Series) -> pd.Series:
    return x * 2

# Apply the Pandas UDF to a Spark DataFrame
df = df.withColumn("doubled_col", double(df["col"]))
Another option is to use Spark's built-in SQL functions, which are robust, efficient, and run entirely inside the JVM, so they can often replace UDFs altogether. These include true higher-order functions such as transform(), which applies a lambda to each element of an array column, as well as ordinary aggregates. For example, AVG() calculates the average value of a column. Here's how you can use it in a Spark SQL query:
# Use the AVG() function to calculate the average value of a column
df.createOrReplaceTempView("data")
spark.sql("SELECT AVG(col) FROM data").show()
Overall, it's important to consider serialization when performance tuning on Apache Spark. By keeping work inside the JVM and minimizing data movement between the JVM and Python, for example by preferring built-in functions, Pandas (vectorized) UDFs, and Spark SQL functions over plain Python UDFs, you can significantly improve the performance of your data processing tasks.
It is also a good idea to use the sc.setJobDescription() function in your code (sc is the SparkContext, available as spark.sparkContext). This sets a named description for the current job in the Spark UI, which makes it easier to find and debug specific jobs. For example:
sc.setJobDescription("Processing data for analysis")
df = df.filter(df.age > 30)
In summary, performance tuning on Apache Spark involves addressing a variety of issues, including spill, skew, shuffle, storage, and serialization. By working through these problems methodically, you can ensure that your data processing tasks are efficient and run smoothly.