AWS Glue is a fully managed, serverless data integration service provided by Amazon Web Services (AWS) that uses Apache Spark as one of its backend processing engines (as of this writing, you can use Python Shell, Spark, or Ray).

Data skew occurs when the data being processed is not evenly distributed across the Spark cluster, causing some tasks to take significantly longer to complete than others. This can lead to inefficient resource utilization, longer processing times, and ultimately, slower performance. Data skew can arise from various factors, including uneven data distribution, skewed join keys, or uneven data processing patterns. For example, in a dataset of customer transactions, if one customer has significantly more transactions than the others, it can cause a skew in the data distribution.

The most visible symptom is often nodes running out of disk during shuffling, which leads to nodes falling like dominoes and to job failures. However, data skew can also stay hidden. Monitoring tools might not flag an uneven distribution as a critical issue, and logs don't always make it evident, so it often goes undetected. As a result, a developer may observe that their AWS Glue jobs are completing without apparent errors, yet the system could be operating far from its optimal efficiency. This hidden inefficiency not only increases operational costs due to longer runtimes but can also lead to unpredictable performance issues that are difficult to diagnose without a deep dive into the data distribution and task run patterns.

Identifying and handling data skew issues is key to having good performance on Apache Spark and therefore on AWS Glue jobs that use Spark as a backend. In this post, we show how you can identify data skew and discuss the different techniques to mitigate it.
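To make the customer transactions example concrete, the following is a minimal sketch in plain Python (no Spark required) of how hash partitioning by key concentrates records when one key dominates. The customer names, record counts, number of partitions, and the use of CRC32 as the hash are all assumptions chosen for illustration; Spark uses its own partitioner and hash function.

```python
import zlib
from collections import Counter

def partition_sizes(keys, num_partitions):
    """Count how many records land in each partition under hash partitioning."""
    sizes = Counter({p: 0 for p in range(num_partitions)})
    for key in keys:
        # CRC32 stands in for the partitioner's hash (assumption for illustration)
        sizes[zlib.crc32(key.encode("utf-8")) % num_partitions] += 1
    return dict(sizes)

# Hypothetical data: 99 customers with 10 transactions each, one with 5,000
transactions = [f"customer_{i}" for i in range(99) for _ in range(10)]
transactions += ["customer_hot"] * 5000

sizes = partition_sizes(transactions, num_partitions=8)
largest = max(sizes.values())
average = sum(sizes.values()) / len(sizes)
print(f"largest partition: {largest} records, average: {average:.0f} records")
```

Because every record with the same key goes to the same partition, the partition that receives the dominant customer holds at least 5,000 records no matter how many partitions you add, which is why adding workers alone does not fix this kind of skew.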
How to detect data skew

When an AWS Glue job has issues with local disks (spill-to-disk issues), doesn't scale with the number of workers, or has low CPU usage (you can enable Amazon CloudWatch metrics for your job to be able to see this), you may have a data skew issue. You can detect data skew with data analysis or by using the Spark UI. In this section, we discuss how to use the Spark UI.

The Spark UI provides a comprehensive view of Spark applications, including the number of tasks, stages, and their duration. To use it, you need to enable Spark UI event logs for your job runs. This is enabled by default on the AWS Glue console. Once enabled, Spark event log files are created during the job run and stored in your S3 bucket. Those logs are then parsed, and you can use the AWS Glue serverless Spark UI to visualize them. You can refer to this blogpost for more details. For jobs whose event logs exceed the 512 MB limit of the AWS Glue serverless Spark UI, you can set up the Spark UI using an EC2 instance.

You can use the Spark UI to identify which tasks are taking longer to complete than others, and whether the data distribution among partitions is balanced (remember that in Spark, one partition is mapped to one task). If there is data skew, you will see that some partitions have significantly more data than others. The following figure shows an example of this. We can see that one task is taking a lot more time than the others, which can indicate data skew.

You can also use the summary metrics for each stage. The following screenshot shows another example of data skew. Each percentile in these metrics is the value below which that percentage of tasks fall. For example, the 75th percentile task duration indicates that 75% of tasks completed in less time than this value. When the tasks are evenly distributed, you will see similar numbers in all the percentiles.
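The percentile reading described above can be sketched in plain Python. This is a minimal illustration, not part of the Spark UI: the task durations are made up, and the rule of flagging a stage when the maximum exceeds five times the median is an assumed heuristic that you would tune for your own jobs.

```python
import statistics

def summarize(values):
    """Return min, quartiles, and max, similar to a summary metrics row."""
    q1, median, q3 = statistics.quantiles(values, n=4, method="inclusive")
    return {"min": min(values), "p25": q1, "median": median, "p75": q3, "max": max(values)}

def looks_skewed(values, ratio_threshold=5.0):
    """Flag a stage when max is more than ratio_threshold times the median.

    ratio_threshold is a hypothetical cutoff, not a Spark or AWS Glue setting.
    """
    summary = summarize(values)
    return summary["median"] > 0 and summary["max"] / summary["median"] > ratio_threshold

# Hypothetical task durations in seconds for two stages
skewed_stage = [2, 2, 2, 2, 2, 3, 3, 46]
balanced_stage = [2, 2, 3, 2, 3, 2, 3, 3]

print(summarize(skewed_stage), looks_skewed(skewed_stage))
print(summarize(balanced_stage), looks_skewed(balanced_stage))
```

The same comparison works for other per-task metrics, such as shuffle write size, if you export them from the event logs.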
When there is data skew, you will see very biased values in each percentile. In the preceding example, tasks didn't write many shuffle files (less than 50 MiB) in Min, 25th percentile, Median, and 75th percentile. However, in Max, a task wrote 460 MiB, roughly 10 times the 75th percentile. This means there was at least one task (or up to 25% of tasks) that wrote much bigger shuffle files than the rest of the tasks. You can also see that the duration of the task in Max is 46 seconds and the Median is 2 seconds. These are all indicators that your dataset may have data skew.

AWS Glue interactive sessions

You can use interactive sessions to load your data from the AWS Glue Data Catalog, or just use Spark methods to load the files, such as Parquet or CSV, that you want to analyze. You can use a script similar to the following to detect data skew from the partition size perspective. Note that the more important issue is data skew while shuffling, and this script does not detect that kind of skew:

```python
from pyspark.sql import functions as F

# input_dataframe is the DataFrame you want to check for data skew
partition_sizes_df = (
    input_dataframe
    .withColumn("partitionId", F.spark_partition_id())
    .groupBy("partitionId")
    .count()
    .withColumnRenamed("count", "partition_size")
    .orderBy(F.asc("partition_size"))
)

# Calculate the average and standard deviation of the partition sizes
stats = partition_sizes_df.agg(
    F.avg("partition_size").alias("avg_size"),
    F.stddev("partition_size").alias("std_dev_size"),
).collect()[0]
avg_size = stats["avg_size"]
# stddev is null when there is only one partition; treat that as zero spread
std_dev_size = stats["std_dev_size"] or 0.0

# Mark a partition as skewed when the absolute difference between its size and
# the average is greater than twice the standard deviation
skewed_partitions_df = partition_sizes_df.filter(
    F.abs(F.col("partition_size") - F.lit(avg_size)) > 2 * std_dev_size
)

skewed_partitions = [row["partitionId"] for row in skewed_partitions_df.collect()]
if skewed_partitions:
    print(f"The following partitions have significantly different sizes: {skewed_partitions}")
else:
    print("No data skew detected.")
```

The script calculates the average and standard deviation of partition sizes using the agg() function, identifies partitions with significantly different sizes using the filter() function, and prints their indexes if any skewed partitions are detected. Otherwise, it prints that no data skew is detected. This code assumes that your data is structured, and you may need to modify it if your data is of a different type.

How to handle data skew

You can use different techniques in AWS Glue to handle data skew; there is no single universal solution. The first thing to do is confirm that you're using the latest AWS Glue version. For example, AWS Glue 4.0, based on Spark 3.3, enables by default some configurations, such as Adaptive Query Execution (AQE), that can help improve performance when data skew is present.

The following are some of the techniques that you can employ to handle data skew:

Filter and perform – If you know which keys are causing the skew, you can filter them out, perform your operations on the non-skewed data, and then handle the skewed keys separately.

Implementing incremental aggregation – If you are performing a large aggregation operation, you can break it up into smaller stages, because in large datasets a single aggregation operation (like sum, average, or count) can be resource-intensive. In those cases, you can perform intermediate actions.
This could involve filtering, grouping, or additional aggregations. This can help distribute the workload across the nodes and reduce the size of intermediate data.

Using a custom partitioner – If your data has a specific structure or distribution, you can create a custom partitioner that partitions your data based on its characteristics. This can help make sure that data with similar characteristics is in the same partition and reduce the size of the largest partition.

Using broadcast join – If your dataset is small but exceeds the spark.sql.autoBroadcastJoinThreshold value (default is 10 MB), you can either provide a hint to use a broadcast join or adjust the threshold value to accommodate your dataset. This can be an effective strategy to optimize join operations and mitigate data skew issues resulting from shuffling large amounts of data across nodes.

Salting – This involves adding a random prefix or suffix (the salt) to the key of skewed data. By doing this, you distribute the data more evenly across the partitions. After processing, you can remove the salt to get the original key values.

These are just a few techniques to handle data skew in PySpark; the best approach will depend on the characteristics of your data and the operations you are performing.
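The incremental aggregation idea from the list above can be sketched as a two-stage sum. This is a plain Python stand-in for the Spark stages, written only to show the logic: the function name two_stage_sum, the sample records, and the choice of four salt values are all hypothetical. In Spark, stage 1 would correspond to a groupBy on the key plus a salt column, and stage 2 to a groupBy on the key alone.

```python
import random
from collections import defaultdict

def two_stage_sum(records, num_salts, seed=0):
    """Sum values per key in two stages: per (key, salt), then per key."""
    rng = random.Random(seed)  # fixed seed so runs are repeatable

    # Stage 1: partial sums per (key, salt); a hot key is split into
    # up to num_salts smaller groups that can be processed in parallel
    partial = defaultdict(int)
    for key, value in records:
        partial[(key, rng.randrange(num_salts))] += value

    # Stage 2: drop the salt and combine the few partial sums per key
    totals = defaultdict(int)
    for (key, _salt), subtotal in partial.items():
        totals[key] += subtotal
    return dict(totals), dict(partial)

# Hypothetical data: key2 is heavily skewed
records = [("key1", 1)] * 10 + [("key2", 1)] * 1000 + [("key3", 1)] * 10
totals, partial = two_stage_sum(records, num_salts=4)

print(totals)
print(f"largest stage 1 group: {max(partial.values())} records")
```

The final totals match a single-stage aggregation, but no stage 1 group has to process all the records of the hot key. This only works directly for aggregations that can be combined from partial results, such as sum, count, min, and max; an average has to be rebuilt from partial sums and counts.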
The following is an example of joining skewed data with the salting technique:

```python
from pyspark.sql.functions import col, concat, floor, lit, rand

# Assumed to exist already: spark (the SparkSession), skewed_data (the DataFrame
# with the skewed join key), non_skewed_data (the other side of the join), and
# skew_threshold (row count above which a key is treated as skewed)

# Define the number of salt values
num_salts = 3

# Function to identify skewed keys
def identify_skewed_keys(df, key_column, threshold):
    key_counts = df.groupBy(key_column).count()
    return key_counts.filter(col("count") > threshold).select(key_column)

# Identify skewed keys
skewed_keys = identify_skewed_keys(skewed_data, "key", skew_threshold)

# Split the dataset into rows with skewed keys and all other rows
skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "inner")
non_skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "left_anti")

# Apply salting to skewed data: an integer in [0, num_salts), with a fixed seed
skewed_data_subset = (
    skewed_data_subset
    .withColumn("salt", floor(rand(seed=42) * num_salts).cast("int"))
    .withColumn("salted_key", concat(col("key"), lit("_"), col("salt").cast("string")))
    .drop("salt")
)

# Replicate the rows with skewed keys in the non-skewed dataset, once per salt
def replicate_skewed_rows(df, keys, multiplier):
    return (
        df.join(keys, ["key"])
        .crossJoin(spark.range(multiplier).withColumnRenamed("id", "salt"))
        .withColumn("salted_key", concat(col("key"), lit("_"), col("salt").cast("string")))
        .drop("salt", "key")  # keep a single key column after the join
    )

replicated_non_skewed_data = replicate_skewed_rows(non_skewed_data, skewed_keys, num_salts)

# Perform the JOIN operation on the salted keys for skewed data
result_skewed = skewed_data_subset.join(replicated_non_skewed_data, "salted_key").drop("salted_key")

# Perform regular join on non-skewed data
result_non_skewed = non_skewed_data_subset.join(non_skewed_data, "key")

# Combine results, matching columns by name
final_result = result_skewed.unionByName(result_non_skewed)
```

In this code, we first define the number of salt values (num_salts). We then add a salt column to the skewed rows using the withColumn() function, where we set the value of the salt column to a random integer derived from the rand() function with a fixed seed.
The function replicate_skewed_rows is defined to replicate each row in the non-skewed dataset (non_skewed_data) that has a skewed key num_salts times. This ensures that every salted key in the skewed data finds its matching rows in the non-skewed data. Finally, a join operation is performed on the salted_key column between the skewed and non-skewed datasets. This join is more balanced compared to a direct join on the original key, because salting and replication have mitigated the data skew.

The rand() function used in this example generates a random number between 0 and 1 for each row, so it's important to use a fixed seed to achieve consistent results across different runs of the code. You can choose any fixed integer value for the seed.

The following figures illustrate the data distribution before (left) and after (right) salting. The heavily skewed key2 is identified and salted into key2_0, key2_1, and key2_2, balancing the data distribution and preventing any single node from being overloaded. After processing, the results can be aggregated back, so that the final output is consistent with the unsalted key values.

Other techniques to use on skewed data during the join operation

When you're performing skewed joins, you can use salting or broadcasting techniques, or divide your data into skewed and regular parts before joining the regular data and broadcasting the skewed data.

If you are using Spark 3, there are automatic optimizations that try to mitigate data skew issues on joins. You can tune them because they have dedicated configs on Apache Spark.

Conclusion

This post provided details on how to detect data skew in your data integration jobs using AWS Glue and different techniques for handling it. Having a good data distribution is key to achieving the best performance on distributed processing systems like Apache Spark.

Although this post focused on AWS Glue, the same concepts apply to jobs you may be running on Amazon EMR using Apache Spark or Amazon Athena for Apache Spark.
As always, AWS welcomes your feedback. Please leave your comments and questions in the comments section.

About the Authors

Salim Tutuncu is a Sr. PSA Specialist on Data & AI, based in Amsterdam with a focus on the EMEA North and EMEA Central regions. With a rich background in the technology sector that spans roles as a Data Engineer, Data Scientist, and Machine Learning Engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses leveraging the AWS Platform, particularly in Data and AI use cases.

Angel Conde Manjon is a Sr. PSA Specialist on Data & AI, based in Madrid, and focuses on EMEA South and Israel. He has previously worked on research related to Data Analytics and Artificial Intelligence in diverse European research projects. In his current role, Angel helps partners develop businesses centered on Data and AI.