Search the Community
Showing results for tags 'spark'.
-
Fabric Madness part 2

Image by author and ChatGPT. "Design an illustration, focusing on a basketball player in action, this time the theme is on using pyspark to generate features for machine learning models in a graphic novel style" prompt. ChatGPT, 4, OpenAI, 4 April 2024. https://chat.openai.com.

A huge thanks to Martim Chaves, who co-authored this post and developed the example scripts.

In our previous post we took a high-level view of how to train a machine learning model in Microsoft Fabric. In this post we want to dive deeper into the process of feature engineering.

Feature engineering is a crucial part of the development lifecycle for any Machine Learning (ML) system. It is the step in the development cycle where raw data is processed to better represent its underlying structure and to provide additional information that enhances our ML models. Feature engineering is both an art and a science. Even though there are specific steps that we can take to create good features, sometimes it is only through experimentation that good results are achieved. Good features are crucial in guaranteeing good system performance.

As datasets grow exponentially, traditional feature engineering may struggle with the size of very large datasets. This is where PySpark can help, as it is a scalable and efficient processing platform for massive datasets. A great thing about Fabric is that it makes using PySpark easy!

In this post, we'll be going over:

- How does PySpark work?
- Basics of PySpark
- Feature Engineering in Action

By the end of this post, hopefully you'll feel comfortable carrying out feature engineering with PySpark in Fabric. Let's get started!

How does PySpark work?

Spark is a distributed computing system that allows for the processing of large datasets with speed and efficiency across a cluster of machines. It is built around the concept of a Resilient Distributed Dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. RDDs are the fundamental data structure of Spark, and they allow for the distribution of data across a cluster of machines.

PySpark is the Python API for Spark. It allows for the creation of Spark DataFrames, which are similar to Pandas DataFrames, but with the added benefit of being distributed across a cluster of machines. PySpark DataFrames are the core data structure in PySpark, and they allow for the manipulation of large datasets in a distributed manner.

At the core of PySpark is the SparkSession object, which is what fundamentally interacts with Spark. This SparkSession is what allows for the creation of DataFrames, among other functionality. Note that, when running a Notebook in Fabric, a SparkSession is automatically created for you, so you don't have to worry about that.

Having a rough idea of how PySpark works, let's get to the basics.

Basics of PySpark

Although Spark DataFrames may remind us of Pandas DataFrames due to their similarities, the syntax when using PySpark can be a bit different. In this section, we'll go over some of the basics of PySpark, such as reading data, combining DataFrames, selecting columns, grouping data, joining DataFrames, and using functions.
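To make the SparkSession point concrete before we get to the real data: in a Fabric notebook the automatically created session is exposed as the `spark` variable, and it is the entry point for building DataFrames. The snippet below is a minimal sketch, not taken from the original pipeline, and the `TeamID`/`Score` values are made up purely for illustration.

```
# Minimal sketch: the Fabric notebook already provides `spark` (a SparkSession).
from pyspark.sql import Row

# Create a small DataFrame directly from Python objects (illustrative values)
demo_df = spark.createDataFrame([
    Row(TeamID=1101, Score=73),
    Row(TeamID=1102, Score=68),
])

# Inspect the inferred schema and the rows
demo_df.printSchema()
demo_df.show()
```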
The Data

The data we are looking at is from the 2024 US college basketball tournaments, which was obtained from the on-going March Machine Learning Mania 2024 Kaggle competition, the details of which can be found here, and is licensed under CC BY 4.0 [1].

Reading data

As mentioned in the previous post of this series, the first step is usually to create a Lakehouse and upload some data. Then, when creating a Notebook, we can attach it to the created Lakehouse, and we'll have access to the data stored there. PySpark DataFrames can read various data formats, such as CSV, JSON, Parquet, and others. Our data is stored in CSV format, so we'll be using that, as in the following code snippet:

```
# Read women's data
w_data = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"Files/WNCAATourneyDetailedResults.csv")
    .cache()
)
```

In this code snippet, we're reading the detailed results data set of the final women's basketball college tournament matches. Note that the "header" option being true means that the names of the columns will be derived from the first row of the CSV file. The inferSchema option tells Spark to guess the data types of the columns - otherwise they would all be read as strings. .cache() is used to keep the DataFrame in memory.

If you're coming from Pandas, you may be wondering what the equivalent of df.head() is for PySpark - it's df.show(5). The default for .show() is the top 20 rows, hence the need to specifically select 5.

Combining DataFrames

Combining DataFrames can be done in multiple ways. The first we will look at is a union, where the columns are the same for both DataFrames:

```
# Read women's data
...

# Read men's data
m_data = (
    spark.read.option("header", True)
    .option("inferSchema", True)
    .csv(f"Files/MNCAATourneyDetailedResults.csv")
    .cache()
)

# Combine (union) the DataFrames
combined_results = m_data.unionByName(w_data)
```

Here, unionByName joins the two DataFrames by matching the names of the columns. Since both the women's and the men's detailed match results have the same columns, this is a good approach. Alternatively, there's also union, which combines two DataFrames by matching column positions.

Selecting Columns

Selecting columns from a DataFrame in PySpark can be done using the .select() method. We just have to indicate the name or names of the columns that are relevant as a parameter.

```
# Selecting a single column
w_scores = w_data.select("WScore")

# Selecting multiple columns
teamid_w_scores = w_data.select("WTeamID", "WScore")
```

Here's the output for `w_scores.show(5)`:

```
+------+
|Season|
+------+
|  2010|
|  2010|
|  2010|
|  2010|
|  2010|
+------+
only showing top 5 rows
```

The columns can also be renamed when being selected, using the .alias() method:

```
winners = w_data.select(
    w_data.WTeamID.alias("TeamID"),
    w_data.WScore.alias("Score")
)
```

Grouping Data

Grouping allows us to carry out certain operations for the groups that exist within the data, and is usually combined with aggregation functions. We can use .groupBy() for this:

```
# Grouping and aggregating
winners_average_scores = winners.groupBy("TeamID").avg("Score")
```

In this example, we are grouping by "TeamID", meaning we're considering the groups of rows that have a distinct value for "TeamID". For each of those groups, we're calculating the average of the "Score". This way, we get the average score for each team.

Here's the output of winners_average_scores.show(5), showing the average score of each team:

```
+------+-----------------+
|TeamID|       avg(Score)|
+------+-----------------+
|  3125|             68.5|
|  3345|             74.2|
|  3346|79.66666666666667|
|  3376|73.58333333333333|
|  3107|             61.0|
+------+-----------------+
```
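The .avg() call above is a shortcut. When several aggregations are needed at once, the more general .agg() method takes aggregation expressions from pyspark.sql.functions, which is the pattern used later in this post for the season statistics. A minimal sketch, reusing the winners DataFrame (the alias names here are just for illustration):

```
# Minimal sketch: several aggregations in one pass with .agg()
from pyspark.sql import functions as F

team_summary = winners.groupBy("TeamID").agg(
    F.avg("Score").alias("AvgScore"),   # mean score per team
    F.max("Score").alias("MaxScore"),   # best single-game score
    F.count("*").alias("GamesWon"),     # number of winning rows per team
)
team_summary.show(5)
```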
Joining Data

Joining two DataFrames can be done using the .join() method. Joining is essentially extending the DataFrame by adding the columns of one DataFrame to another.

```
# Joining on Season and TeamID
final_df = matches_df.join(stats_df, on=['Season', 'TeamID'], how='left')
```

In this example, both stats_df and matches_df were using Season and TeamID as unique identifiers for each row. Besides Season and TeamID, stats_df has other columns, such as statistics for each team during each season, whereas matches_df has information about the matches, such as date and location. This operation allows us to add those interesting statistics to the matches information!

Functions

PySpark provides several functions that help us transform DataFrames. You can find the full list here. Here's an example of a simple function:

```
from pyspark.sql import functions as F

w_data = w_data.withColumn("HighScore", F.when(F.col("Score") > 80, "Yes").otherwise("No"))
```

In the code snippet above, a "HighScore" column is created based on whether the score is higher than 80. For each row, the "Score" column (referenced with the .col() function) is checked by the .when() function: if its value is larger than 80, the "HighScore" value is "Yes"; .otherwise(), the value is "No".

Feature Engineering in Action

Now that we have a basic understanding of PySpark and how it can be used, let's go over how the regular season statistics features were created. These features were then used as inputs into our machine learning model to try to predict the outcome of the final tournament games.

The starting point was a DataFrame, regular_data, that contained match-by-match statistics for the regular seasons, which is the United States college basketball season that happens from November to March each year. Each row in this DataFrame contained the season, the day the match was held, the ID of team 1, the ID of team 2, and other information such as the location of the match. Importantly, it also contained statistics for each team for that specific match, such as "T1_FGM", meaning the Field Goals Made (FGM) for team 1, or "T2_OR", meaning the Offensive Rebounds (OR) of team 2.

The first step was selecting which columns would be used. These were columns that strictly contained in-game statistics.

```
# Columns that we'll want to get statistics from
boxscore_cols = [
    'T1_FGM', 'T1_FGA', 'T1_FGM3', 'T1_FGA3', 'T1_OR', 'T1_DR', 'T1_Ast', 'T1_Stl', 'T1_PF',
    'T2_FGM', 'T2_FGA', 'T2_FGM3', 'T2_FGA3', 'T2_OR', 'T2_DR', 'T2_Ast', 'T2_Stl', 'T2_PF'
]
```

If you're interested, here's what each statistic's code means:

- FGM: Field Goals Made
- FGA: Field Goals Attempted
- FGM3: Field Goals Made from the 3-point line
- FGA3: Field Goals Attempted from the 3-point line
- OR: Offensive Rebounds. A rebound happens when the ball bounces back off the board after a missed shot. If the team that attempted the shot regains possession of the ball, it's called an "Offensive" rebound; otherwise, it's called a "Defensive" rebound.
- DR: Defensive Rebounds
- Ast: Assist, a pass that led directly to a goal
- Stl: Steal, when the possession of the ball is stolen
- PF: Personal Foul, when a player commits a foul
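These raw box-score columns can also be combined into derived per-match features before any aggregation. This is not a step from the pipeline above, just a small sketch of the idea, using the column names listed earlier to compute field goal and three-point percentages for team 1 (the new column names are made up for illustration):

```
# Sketch only: deriving per-match shooting percentages from the box-score columns
from pyspark.sql import functions as F

regular_data = (
    regular_data
    # field goal percentage for team 1; guard against division by zero
    .withColumn(
        "T1_FGPct",
        F.when(F.col("T1_FGA") > 0, F.col("T1_FGM") / F.col("T1_FGA")).otherwise(None),
    )
    # three-point percentage for team 1
    .withColumn(
        "T1_FG3Pct",
        F.when(F.col("T1_FGA3") > 0, F.col("T1_FGM3") / F.col("T1_FGA3")).otherwise(None),
    )
)
```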
From that list of columns, a dictionary of aggregation expressions was created. Basically, for each column name in the list, a function was stored that would calculate the mean of the column and rename it by adding a "mean" suffix.

```
from pyspark.sql import functions as F
from pyspark.sql.functions import col

agg_exprs = {col: F.mean(col).alias(col + 'mean') for col in boxscore_cols}
```

Then, the data was grouped by "Season" and "T1_TeamID", and the aggregation functions of the previously created dictionary were used as the argument for .agg():

```
season_statistics = regular_data.groupBy(["Season", "T1_TeamID"]).agg(*agg_exprs.values())
```

Note that the grouping was done by season and the ID of team 1. This means that "T2_FGAmean", for example, will actually be the mean of the Field Goals Attempted by the opponents of T1, not necessarily of a specific team. So we actually need to rename columns like "T2_FGAmean" to something like "T1_opponent_FGAmean".

```
# Rename columns for T1
for col in boxscore_cols:
    season_statistics = (
        season_statistics.withColumnRenamed(col + 'mean', 'T1_' + col[3:] + 'mean') if 'T1_' in col
        else season_statistics.withColumnRenamed(col + 'mean', 'T1_opponent_' + col[3:] + 'mean')
    )
```

At this point, it's important to mention that the regular_data DataFrame actually has two rows per match, so that both teams can appear as "T1" and "T2" for each match. This little "trick" is what makes these statistics useful.

Note that we "only" have the statistics for "T1". We "need" the statistics for "T2" as well; "need" in quotation marks because there are no new statistics being calculated. We just need the same data, but with the columns having different names, so that for a match with "T1" and "T2", we have statistics for both T1 and T2. So we created a mirror DataFrame where, instead of "T1...mean" and "T1_opponent_...mean", we have "T2...mean" and "T2_opponent_...mean". This is important because, later on, when we're joining these regular season statistics to tournament matches, we'll be able to have statistics for both team 1 and team 2.

```
season_statistics_T2 = season_statistics.select(
    *[F.col(col).alias(col.replace('T1_opponent_', 'T2_opponent_').replace('T1_', 'T2_'))
      if col not in ['Season'] else F.col(col)
      for col in season_statistics.columns]
)
```

Now there are two DataFrames, with season statistics for "both" T1 and T2. Since the final DataFrame will contain the "Season", the "T1_TeamID" and the "T2_TeamID", we can add these newly created features with a join!

```
tourney_df = tourney_df.join(season_statistics, on=['Season', 'T1_TeamID'], how='left')
tourney_df = tourney_df.join(season_statistics_T2, on=['Season', 'T2_TeamID'], how='left')
```

Elo Ratings

First created by Arpad Elo, Elo is a rating system for zero-sum games (games where one player wins and the other loses), like basketball. With the Elo rating system, each team has an Elo rating, a value that generally conveys the team's quality. At first, every team has the same Elo; whenever they win, their Elo increases, and when they lose, their Elo decreases. A key characteristic of this system is that this value increases more with a win against a strong opponent than with a win against a weak opponent. Thus, it can be a very useful feature to have!

We wanted to capture the Elo rating of a team at the end of the regular season and use that as a feature for the tournament. To do this, we calculated the Elo for each team on a per-match basis. To calculate Elo for this feature, we found it more straightforward to use Pandas.
Central to Elo is calculating the expected score for each team. It can be described in code like so:

```
# Function to calculate expected score
def expected_score(ra, rb):
    # ra = rating (Elo) of team A
    # rb = rating (Elo) of team B
    # Elo expectation function
    return 1 / (1 + 10 ** ((rb - ra) / 400))
```

Considering a team A and a team B, this function computes the expected score of team A against team B. For each match, we would update the teams' Elos. Note that the location of the match also played a part: winning at home was considered less impressive than winning away.

```
# Function to update Elo ratings, keeping T1 and T2 terminology
def update_elo(t1_elo, t2_elo, location, T1_Score, T2_Score):
    expected_t1 = expected_score(t1_elo, t2_elo)
    expected_t2 = expected_score(t2_elo, t1_elo)

    actual_t1 = 1 if T1_Score > T2_Score else 0
    actual_t2 = 1 - actual_t1

    # Determine K based on game location
    # The larger the K, the bigger the impact
    # team1 winning at home (location=1) is less impressive than winning away (location=-1)
    if actual_t1 == 1:  # team1 won
        if location == 1:
            k = 20
        elif location == 0:
            k = 30
        else:  # location = -1
            k = 40
    else:  # team2 won
        if location == 1:
            k = 40
        elif location == 0:
            k = 30
        else:  # location = -1
            k = 20

    new_t1_elo = t1_elo + k * (actual_t1 - expected_t1)
    new_t2_elo = t2_elo + k * (actual_t2 - expected_t2)

    return new_t1_elo, new_t2_elo
```

To apply the Elo rating system, we iterated through each season's matches, initializing teams with a base rating and updating their ratings match by match. The final Elo available for each team in each season will, hopefully, be a good descriptor of the team's quality.

```
import pandas as pd

def calculate_elo_through_seasons(regular_data):

    # For this feature, using Pandas
    regular_data = regular_data.toPandas()

    # Set value of initial elo
    initial_elo = 1500

    # List to collect final Elo ratings
    final_elo_list = []

    for season in sorted(regular_data['Season'].unique()):
        print(f"Processing Season: {season}")

        # Initialize elo ratings dictionary
        elo_ratings = {}

        # Get the teams that played in the season
        season_teams = set(regular_data[regular_data['Season'] == season]['T1_TeamID']).union(
            set(regular_data[regular_data['Season'] == season]['T2_TeamID'])
        )

        # Initialize season teams' Elo ratings
        for team in season_teams:
            if (season, team) not in elo_ratings:
                elo_ratings[(season, team)] = initial_elo

        # Update Elo ratings per game
        season_games = regular_data[regular_data['Season'] == season]
        for _, row in season_games.iterrows():
            t1_elo = elo_ratings[(season, row['T1_TeamID'])]
            t2_elo = elo_ratings[(season, row['T2_TeamID'])]

            new_t1_elo, new_t2_elo = update_elo(t1_elo, t2_elo, row['location'], row['T1_Score'], row['T2_Score'])

            # Only keep the last season rating
            elo_ratings[(season, row['T1_TeamID'])] = new_t1_elo
            elo_ratings[(season, row['T2_TeamID'])] = new_t2_elo

        # Collect final Elo ratings for the season
        for team in season_teams:
            final_elo_list.append({'Season': season, 'TeamID': team, 'Elo': elo_ratings[(season, team)]})

    # Convert list to DataFrame
    final_elo_df = pd.DataFrame(final_elo_list)

    # Separate DataFrames for T1 and T2
    final_elo_t1_df = final_elo_df.copy().rename(columns={'TeamID': 'T1_TeamID', 'Elo': 'T1_Elo'})
    final_elo_t2_df = final_elo_df.copy().rename(columns={'TeamID': 'T2_TeamID', 'Elo': 'T2_Elo'})

    # Convert the pandas DataFrames back to Spark DataFrames
    final_elo_t1_df = spark.createDataFrame(final_elo_t1_df)
    final_elo_t2_df = spark.createDataFrame(final_elo_t2_df)

    return final_elo_t1_df, final_elo_t2_df
```
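The returned Spark DataFrames can then be attached to the tournament matches in the same way as the season statistics. The exact join isn't shown in the post, so treat this as a plausible sketch using the column names defined above:

```
# Plausible sketch (not shown in the original post): joining the end-of-season
# Elo ratings onto the tournament DataFrame, mirroring the earlier joins.
final_elo_t1_df, final_elo_t2_df = calculate_elo_through_seasons(regular_data)

tourney_df = tourney_df.join(final_elo_t1_df, on=['Season', 'T1_TeamID'], how='left')
tourney_df = tourney_df.join(final_elo_t2_df, on=['Season', 'T2_TeamID'], how='left')
```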
Ideally, we wouldn't calculate Elo changes on a match-by-match basis to determine each team's final Elo for the season; however, we couldn't come up with a better approach. Do you have any ideas? If so, let us know!

Value Added

The feature engineering steps demonstrated show how we can transform raw data (regular season statistics) into valuable information with predictive power. It is reasonable to assume that a team's performance during the regular season is indicative of its potential performance in the final tournaments. By calculating the mean of observed match-by-match statistics for both the teams and their opponents, along with each team's Elo rating in their final match, we were able to create a dataset suitable for modelling. Then, models were trained to predict the outcome of tournament matches using these features, among others developed in a similar way. With these models, we only need the two team IDs to look up the mean of their regular season statistics and their Elos to feed into the model and predict a score!

Conclusion

In this post, we looked at some of the theory behind Spark and PySpark, how it can be applied, and a concrete practical example. We explored how feature engineering can be done in the case of sports data, creating regular season statistics to use as features for final tournament games. Hopefully you've found this interesting and helpful. Happy feature engineering!

The full source code for this post and others in the series can be found here.

Originally published at https://nobledynamic.com on April 8, 2024.

References

[1] Jeff Sonas, Ryan Holbrook, Addison Howard, Anju Kandru. (2024). March Machine Learning Mania 2024. Kaggle. https://kaggle.com/competitions/march-machine-learning-mania-2024

Feature Engineering with Microsoft Fabric and PySpark was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.

View the full article
Tagged with: microsoft fabric, pyspark (and 1 more)
-
This post is the second part of our two-part series on the latest performance improvements of stateful pipelines. The first part of this... View the full article
Tagged with: databricks, pipelines (and 4 more)
-
Data Engineering Tools in 2024

The data engineering landscape in 2024 is bustling with innovative tools and evolving trends. Here's an updated perspective on some of the key players and how they can empower your data pipelines:

Data Integration:
- Informatica Cloud: Still a leader for advanced data quality and governance, with enhanced cloud-native capabilities.
- MuleSoft Anypoint Platform: Continues to shine in building API-based integrations, now with deeper cloud support and security features.
- Fivetran: Expands its automated data pipeline creation with pre-built connectors and advanced transformations.
- Hevo Data: Remains a strong contender for ease of use and affordability, now offering serverless options for scalability.

Data Warehousing:
- Snowflake: Maintains its edge in cloud-based warehousing, with improved performance and broader integrations for analytics.
- Google BigQuery: Offers even more cost-effective options for variable workloads, while deepening its integration with other Google Cloud services.
- Amazon Redshift: Continues to be a powerful choice for AWS environments, now with increased focus on security and data governance.
- Microsoft Azure Synapse Analytics: Further integrates its data warehousing, lake, and analytics capabilities, providing a unified platform for diverse data needs.

Data Processing and Orchestration:
- Apache Spark: Remains the reigning champion for large-scale data processing, now with enhanced performance optimizations and broader ecosystem support.
- Apache Airflow: Maintains its popularity for workflow orchestration, with improved scalability and user-friendliness.
- Databricks: Expands its cloud-based platform for Spark with advanced features like AI integration and real-time streaming.
- AWS Glue: Simplifies data processing and ETL within the AWS ecosystem, now with serverless options for cost efficiency.

Emerging Trends:
- GitOps: Gaining traction for managing data pipelines with version control and collaboration, ensuring consistency and traceability.
- AI and Machine Learning: Increasingly integrated into data engineering tools for automation, anomaly detection, and data quality improvement.
- Serverless Data Processing: Offering cost-effective and scalable options for event-driven and real-time data processing.

Choosing the right tools:

With this diverse landscape, selecting the right tools depends on your specific needs. Consider factors like:
- Data volume and complexity: Match tool capabilities to your data size and structure.
- Cloud vs. on-premises: Choose based on your infrastructure preferences and security requirements.
- Budget: Evaluate pricing models and potential costs associated with each tool.
- Integration needs: Ensure seamless compatibility with your existing data sources and BI tools.
- Skillset: Consider the technical expertise required for each tool and the available support resources.

By carefully evaluating your needs and exploring the strengths and limitations of these top contenders, you'll be well-equipped to choose the data engineering tools that empower your organization to unlock valuable insights from your data in 2024.

The post Data Engineering Tools in 2024 appeared first on DevOpsSchool.com.

View the full article
Tagged with: snowflake, databricks (and 9 more)
-
We’re super excited to announce that we have shipped the first release of our solution for big data – Charmed Spark. Charmed Spark packages a supported distribution of Apache Spark and optimises it for deployment to Kubernetes, which is where most of the industry is moving these days... View the full article
-
Apache Spark revolutionized big data processing with its distributed computing capabilities, which enabled efficient data processing at scale. It offers the flexibility to run on traditional Central Processing Units (CPUs) as well as specialized Graphics Processing Units (GPUs), which provides distinct advantages for various workloads. As the demand for faster and more efficient machine learning (ML) workloads grows, specialized hardware acceleration becomes crucial. This is where NVIDIA GPUs and Compute Unified Device Architecture (CUDA) come into the picture. To further enhance the capabilities of NVIDIA GPUs within the Spark ecosystem, NVIDIA developed Spark-RAPIDS. Spark-RAPIDS is an extension library that uses RAPIDS libraries built on CUDA to enable high-performance data processing and ML training on GPUs. By combining the distributed computing framework of Spark with the parallel processing power of GPUs, Spark-RAPIDS significantly improves the speed and efficiency of analytics and ML workloads... View the full article
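As a rough illustration, enabling the RAPIDS Accelerator in a Spark application is largely a matter of configuration: the plugin jar is put on the classpath and the SQL plugin is switched on. The sketch below shows this in PySpark; the jar distribution, GPU resource amounts, and app name are placeholders, and the exact settings depend on your Spark, CUDA, and plugin versions, so treat this as a sketch rather than a tested setup.

```
# Sketch: turning on the RAPIDS Accelerator for Apache Spark (values are placeholders).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("gpu-accelerated-etl")
    # register the RAPIDS SQL plugin so supported operators run on the GPU
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    # explicitly enable GPU acceleration of SQL/DataFrame operations
    .config("spark.rapids.sql.enabled", "true")
    # request GPU resources (cluster must expose GPUs to Spark)
    .config("spark.executor.resource.gpu.amount", "1")
    .config("spark.task.resource.gpu.amount", "0.25")
    .getOrCreate()
)

# From here on, DataFrame/SQL workloads can be offloaded to the GPU where supported
df = spark.range(0, 10_000_000).selectExpr("id % 100 AS key", "id AS value")
df.groupBy("key").count().show(5)
```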
-
I've read and watched more than a few articles about ChatGPT in the last couple of months. It seems the large language model AI hype machine just can't stop. As somebody with a passion for music production, some of the more interesting things I've seen included a guy using ChatGPT to build a virtual effect plugin for his DAW (digital audio workstation) that emulates an Ibanez Tube Screamer guitar effects pedal, and this video about getting ChatGPT to write MIDI music scores using Python notebooks. As I'm working on bringing to market a solution for running Spark on Kubernetes, it got me thinking...

May the prompt be with you

Can I get ChatGPT to output a Spark job? Well, there's only one way to find out, so I signed up for a ChatGPT account over at OpenAI and fired up a prompt. Feeling a bit like a naughty hacker, I was in. I typed in my command:

Write a pyspark job that ranks Linux distributions by popularity based on issues reported on stackoverflow

And the output immediately began spewing down my screen. But now the $64k question: will it work? Examining the output, it won't work, because ChatGPT hasn't provided us with code to scrape StackOverflow.com for the information we need. Let's see:

Write a pyspark job to scrape Stackoverflow for the Linux distribution issue report data used as input to the previous job

ChatGPT comes back with a Python script (not a PySpark job, but OK) to scrape StackOverflow.com. So I fired up an editor and pasted it in. Perhaps needless to say, StackOverflow seems to have changed its HTML layout template since the last time ChatGPT was trained, because the Python script didn't work out of the box, and tweaks were needed.

When I was a kid in the early 1980s, publishers would sell computer magazines and books with code listings for games in BASIC that you could program into your ZX Spectrum yourself. Alas, they were always full of bugs and would never run first time, and due to the unusual way code had to be input on a Spectrum, this usually meant spending a fair few hours inputting the commands before finding out. I'm getting the feeling that ChatGPT might be going the same way. Better get a cup of tea and a biscuit, I feel this is going to be a session.

ChatGPT vs hand edited script

OK, nice try ChatGPT, but this is going to need a bit of tweaking. I needed to change the target HTML entities and CSS classes that the script needs to find and process (and lightly restructure things). With that done, I'm able to scrape the data I need from StackOverflow. Here are the original and the adapted code listings.

[Original web scraper listing from ChatGPT]
[Corrected web scraper listing]

Time to make some parallel, distributed sparks fly

Alright, so now we have the data we need. Will that PySpark job that ChatGPT made us actually work? Let's give it a whirl. Well, immediately, it won't work, because the fields in the CSV have different names from what the job expects. But that's an easy tweak. Here's ChatGPT's listing, adapted for my needs.

[Adapted ChatGPT output PySpark job]

The result

Drumroll please, time to find out which is the most popular distro:

Of course, no surprises: it's Ubuntu that gets the most questions, because it's Ubuntu that gets the most use. In the end there were quite a few changes that I needed to make to get a working job, but clearly there's potential for this technology, especially if you're new to Spark and data engineering in general. It can give you a starter job quite quickly, but expect to make changes.
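The adapted listing above only survives as an image caption, so here is an illustrative sketch of what a job along those lines might look like. It is not the script from the post: the input path, column names, and ranking logic are assumptions.

```
# Illustrative sketch (not the post's script): rank Linux distributions by the
# number of StackOverflow questions mentioning them. Path and columns are assumed.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("rank-linux-distros").getOrCreate()

# Scraped question data, e.g. one row per question with a 'distro' column
questions = spark.read.option("header", True).csv("stackoverflow_linux_questions.csv")

ranking = (
    questions
    .groupBy("distro")
    .agg(F.count("*").alias("question_count"))
    .orderBy(F.desc("question_count"))
)

ranking.show()
```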
If you’re interested in Spark… You might like to check out our Charmed Spark solution for running Spark on Kubernetes. We recently shipped the Beta and are looking for feedback. To get started, visit the Charmed Spark documentation pages and install the spark-client snap. Let us know what you think at https://chat.charmhub.io/charmhub/channels/data-platform or file bug reports and feature requests in Github. View the full article
-
Amazon Athena for Apache Spark now supports open-source data lake storage frameworks Apache Hudi 0.13, Apache Iceberg 1.2.1, and Linux Foundation Delta Lake 2.0.2. These frameworks simplify incremental data processing of large data sets using ACID (atomicity, consistency, isolation, durability) transactions and make it simpler to store and process large data sets in your data lakes. View the full article
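To give a feel for what this looks like in practice: in an Athena for Apache Spark notebook, the preconfigured `spark` session can work with these table formats through standard Spark SQL. The sketch below uses Iceberg; the database and table names are made up, and the session is assumed to already be configured for the Iceberg format.

```
# Sketch: querying and upserting an Iceberg table from an Athena for Apache Spark
# notebook, where `spark` is already provided. Names are placeholders.

# Query the current state of the table
spark.sql("""
    SELECT order_id, status, updated_at
    FROM sales_db.orders
    WHERE status = 'PENDING'
""").show(10)

# ACID upsert of late-arriving changes using MERGE INTO
spark.sql("""
    MERGE INTO sales_db.orders AS t
    USING sales_db.order_updates AS u
    ON t.order_id = u.order_id
    WHEN MATCHED THEN UPDATE SET t.status = u.status, t.updated_at = u.updated_at
    WHEN NOT MATCHED THEN INSERT *
""")
```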
Tagged with: delta lake, athena (and 4 more)
-
We are thrilled to introduce Data on EKS (DoEKS), a new open-source project aimed at streamlining and accelerating the process of building, deploying, and scaling data workloads on Amazon Elastic Kubernetes Service (Amazon EKS). With DoEKS, customers get access to a comprehensive range of resources including Infrastructure as Code (IaC) templates, performance benchmark reports, deployment examples, and architectures optimized for data-centric workloads aligned with AWS best practices and industry expertise. This means that customers can quickly and easily provision popular open-source data frameworks (e.g., Apache Spark, Ray, Apache Airflow, Argo Workflows, and Kubeflow) to run on Amazon EKS. Additionally, DoEKS areas of focus include distributed streaming platforms, query engines, and databases to meet the growing demands of data processing. DoEKS blueprints are made with managed AWS services and popular open-source tools to provide customers flexibility to choose the right combination of managed and self-managed components to suit their needs. For example, DoEKS includes several blueprints with Amazon EMR on EKS so customers can take advantage of optimized features like automated provisioning, scaling, faster runtimes, and debugging tools that Amazon EMR provides for running Spark applications... View the full article
Tagged with: eks, data on eks (and 6 more)
-
AWS Glue for Apache Spark now supports three open source data lake storage frameworks: Apache Hudi, Apache Iceberg, and Linux Foundation Delta Lake. These frameworks allow you to read and write data in Amazon Simple Storage Service (Amazon S3) in a transactionally consistent manner. AWS Glue is a serverless, scalable data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources. This feature removes the need to install a separate connector and reduces the configuration steps required to use these frameworks in AWS Glue for Apache Spark jobs. View the full article
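In practice, once the framework is enabled for the job (Glue exposes this through a job parameter, commonly `--datalake-formats`), reading and writing these tables from a Glue Spark job looks like regular Spark code. A sketch for Delta Lake, with made-up S3 paths:

```
# Sketch: reading and writing a Delta Lake table from an AWS Glue for Apache Spark
# job. S3 paths are placeholders; the Delta framework is assumed to be enabled.
from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Read an existing Delta table stored on Amazon S3
orders = spark.read.format("delta").load("s3://my-bucket/lakehouse/orders/")

# Write a filtered copy back out as a Delta table, transactionally
(
    orders.filter("order_status = 'SHIPPED'")
    .write.format("delta")
    .mode("overwrite")
    .save("s3://my-bucket/lakehouse/orders_shipped/")
)
```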
-
We are excited to launch two new features that help enforce access controls with Amazon EMR on EC2 clusters (EMR Clusters). These features are supported with jobs that are submitted to the cluster using the EMR Steps API. First is Runtime Role with EMR Steps. A Runtime Role is an AWS Identity and Access Management (IAM) role that you associate with an EMR Step. An EMR Step uses this role to access AWS resources. The second is integration with AWS Lake Formation to apply table and column-level access controls for Apache Spark and Apache Hive jobs with EMR Steps. View the full article
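To illustrate the first feature, submitting a Spark step with a runtime role from Python might look roughly like the sketch below. The cluster ID, role ARN, and script location are placeholders; the ExecutionRoleArn field is the piece that attaches the runtime role to the step.

```
# Rough sketch: submitting an EMR Step with a runtime role attached (placeholder IDs/ARNs).
import boto3

emr = boto3.client("emr", region_name="us-east-1")

response = emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",              # the EMR on EC2 cluster ID
    ExecutionRoleArn="arn:aws:iam::111122223333:role/my-step-runtime-role",
    Steps=[
        {
            "Name": "spark-job-with-runtime-role",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    "--deploy-mode", "cluster",
                    "s3://my-bucket/scripts/etl_job.py",
                ],
            },
        }
    ],
)
print(response["StepIds"])
```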
Tagged with: iam, lake formation (and 7 more)
-
Amazon EMR release 6.6 now supports Apache Spark 3.2, Apache Spark RAPIDS 22.02, CUDA 11, Apache Hudi 0.10.1, Apache Iceberg 0.13, Trino 0.367, and PrestoDB 0.267. You can use the performance-optimized version of Apache Spark 3.2 on EMR on EC2, EKS, and the recently released EMR Serverless. In addition, Apache Hudi 0.10.1 and Apache Iceberg 0.13 are available on EC2, EKS, and Serverless. Apache Hive 3.1.2 is available on EMR on EC2 and EMR Serverless. Trino 0.367 and PrestoDB 0.267 are only available on EMR on EC2. View the full article
-
Introduction

Have you ever wondered if there are low-hanging optimization opportunities to improve the performance of a Spark app? Profiling can help you gain visibility into the runtime characteristics of the Spark app to identify its bottlenecks and inefficiencies. We're excited to announce the release of a new Spark plugin that enables profiling for JVM-based Spark apps via Amazon CodeGuru. The plugin is open sourced on GitHub and published to Maven.

Walkthrough

This post shows how you can onboard this plugin with two steps in under 10 minutes.

Step 1: Create a profiling group in Amazon CodeGuru Profiler and grant permission to your Amazon EMR on EC2 role, so that profiler agents can emit metrics to CodeGuru. Detailed instructions can be found here.

Step 2: Reference codeguru-profiler-for-spark when submitting your Spark job, along with PROFILING_CONTEXT and ENABLE_AMAZON_PROFILER defined. A rough sketch follows below.

View the full article
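The excerpt doesn't show what Step 2 looks like in practice. As a rough sketch only: the plugin jar is added to the job and the profiling settings named above are passed as Spark configuration. The jar coordinates, plugin class, and property keys below are assumptions based on the step description, not verified settings; check the codeguru-profiler-for-spark documentation for the exact values.

```
# Rough sketch of Step 2. Jar coordinates and property names are assumptions; the
# profiling group name is a placeholder.
from pyspark.sql import SparkSession

profiling_context = '{"profilingGroupName": "MySparkProfilingGroup"}'  # placeholder

spark = (
    SparkSession.builder
    .appName("profiled-spark-app")
    # ship the plugin jar with the application (coordinates are illustrative)
    .config("spark.jars.packages", "software.amazon.profiler:codeguru-profiler-for-spark:1.0")
    # settings referenced in the walkthrough; exact keys may differ
    .config("spark.executorEnv.PROFILING_CONTEXT", profiling_context)
    .config("spark.executorEnv.ENABLE_AMAZON_PROFILER", "true")
    .getOrCreate()
)
```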
-
Amazon EMR on Amazon EKS provides a new deployment option for Amazon EMR that allows you to run Apache Spark on Amazon Elastic Kubernetes Service (Amazon EKS). If you already use Amazon EMR, you can now run Amazon EMR based applications with other types of applications on the same Amazon EKS cluster to improve resource utilization and simplify infrastructure management across multiple AWS Availability Zones. If you already run big data frameworks on Amazon EKS, you can now use Amazon EMR to automate provisioning and management, and run Apache Spark up to 3x faster. With this deployment option, you can focus on running analytics workloads while Amazon EMR on Amazon EKS builds, configures, and manages containers. View the full article
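To give a feel for this deployment option: a Spark job is submitted to an EMR on EKS virtual cluster through the job run API rather than by running spark-submit against the cluster directly. A sketch using boto3, where the IDs, role ARN, release label, and script path are placeholders:

```
# Sketch: submitting a Spark job to EMR on EKS (all identifiers and paths are placeholders).
import boto3

emr_containers = boto3.client("emr-containers", region_name="us-east-1")

response = emr_containers.start_job_run(
    name="sample-spark-job",
    virtualClusterId="abcdef1234567890",                 # EMR on EKS virtual cluster
    executionRoleArn="arn:aws:iam::111122223333:role/emr-eks-job-role",
    releaseLabel="emr-6.6.0-latest",
    jobDriver={
        "sparkSubmitJobDriver": {
            "entryPoint": "s3://my-bucket/scripts/etl_job.py",
            "sparkSubmitParameters": "--conf spark.executor.instances=2 "
                                     "--conf spark.executor.memory=4G",
        }
    },
)
print(response["id"])
```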
-
We’re excited to announce Amazon SageMaker now supports Apache Spark as a pre-built big data processing container. You can now use this container with Amazon SageMaker Processing and take advantage of a fully managed Spark environment for data processing or feature engineering workloads. View the full article
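With the SageMaker Python SDK, the pre-built Spark container is driven through the PySparkProcessor class. A brief sketch, where the role ARN, paths, and instance settings are placeholders and the framework version should match what is available in your region:

```
# Brief sketch: running a PySpark script with SageMaker Processing using the
# pre-built Spark container. Role ARN, paths, and instance settings are placeholders.
from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="spark-feature-engineering",
    framework_version="3.1",                  # pre-built Spark container version
    role="arn:aws:iam::111122223333:role/SageMakerExecutionRole",
    instance_count=2,
    instance_type="ml.m5.xlarge",
)

# Run a local PySpark script; inputs and outputs typically live on S3
spark_processor.run(
    submit_app="./preprocess.py",
    arguments=["--input", "s3://my-bucket/raw/", "--output", "s3://my-bucket/features/"],
)
```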
-
Amazon EMR now supports Amazon EC2 M6g, C6g and R6g instances with EMR versions 6.1.0, 5.31.0 and later. These instances are powered by AWS Graviton2 processors, which are custom designed by AWS using 64-bit Arm Neoverse cores to deliver the best price performance for cloud workloads running in Amazon EC2. Please read our blog for more information. View the full article
-
Errors in Spark applications commonly arise from inefficient Spark scripts, distributed in-memory execution of large-scale transformations, and dataset abnormalities. AWS Glue workload partitioning is the newest offering from AWS Glue to address these issues and improve the reliability of Spark applications and the consistency of their run times. Workload partitioning enables you to specify how much data to process in each job run and, using AWS Glue job bookmarks, track how much of the data AWS Glue has processed. View the full article
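Concretely, bounded execution is switched on per read by passing limits through additional_options, with a transformation_ctx so job bookmarks can track progress between runs. A sketch with made-up catalog names and an illustrative files-per-run bound:

```
# Sketch: AWS Glue workload partitioning (bounded execution). Database/table names
# and the bound value are placeholders.
from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Process at most 500 input files per job run; job bookmarks record what was read
orders = glue_context.create_dynamic_frame.from_catalog(
    database="sales_db",
    table_name="raw_orders",
    additional_options={"boundedFiles": "500"},
    transformation_ctx="orders_bounded_read",
)

print(f"Records in this bounded batch: {orders.count()}")
```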
-
Apache Hive supports transactional tables, which provide ACID guarantees. A significant amount of work has gone into Hive to make these transactional tables highly performant. Apache Spark provides some capabilities to access Hive external tables, but it cannot access Hive managed tables; to access Hive managed tables from Spark, the Hive Warehouse Connector needs to be used. We are happy to announce Spark Direct Reader mode in Hive Warehouse Connector, which can read Hive transactional tables directly from the filesystem. This feature has been available from the CDP-Public-Cloud-2.0 (7.2.0.0) and CDP-DC-7.1 (7.1.1.0) releases onwards.

Hive Warehouse Connector (HWC) was available to provide access to managed tables in Hive from Spark; however, since this involved communication with LLAP, there was an additional hop to get the data and process it in Spark, compared with Spark's ability to read external tables directly from the filesystem. This leads to performance degradation when accessing data from managed tables versus external tables. Additionally, a lot of use cases for HWC were associated with ETL jobs where a super user was running these jobs to update data in multiple tables, hence authorization was not a strong business need for this case. HWC Spark Direct Reader is an additional mode available in HWC which tries to address the above concerns.

This article describes the usage of Spark Direct Reader to consume Hive transactional table data in a Spark application. It also introduces the methods and APIs to read Hive transactional tables into Spark DataFrames. Finally, it demonstrates the transaction handling and semantics while using this reader. HWC Spark Direct Reader is derived from the Qubole Spark Acid Connector.

Prerequisites

Following are the prerequisites to be able to query Hive managed tables from Spark Direct Reader:

- Connectivity to HMS (Hive Metastore), which means the Spark application should be able to access the Hive metastore using its thrift URI. This URI is determined by the Hive config hive.metastore.uris.
- The user launching the Spark application must have Read and Execute permissions on the Hive warehouse location on the filesystem. The location is determined by the Hive config hive.metastore.warehouse.dir.

Use cases

This section focuses on the usage of the Spark Direct Reader to read transactional tables. Consider that we have a Hive transactional table emp_acid which contains information about the employees.

With auto-translate extension enabled

The connector has an auto-translate rule, which is a Spark extension rule that automatically instructs Spark to use Spark Direct Reader in the case of managed tables, so that the user does not need to specify it explicitly.

See employee data in table emp_acid:

```
scala> spark.sql("select * from emp_acid").show

+------+----------+--------------------+-------------+--------------+-----+-----+-------+
|emp_id|first_name|              e_mail|date_of_birth|          city|state|  zip|dept_id|
+------+----------+--------------------+-------------+--------------+-----+-----+-------+
|677509|      Lois| lois.walker@hotma… |    3/29/1981|       Denver |   CO|80224|      4|
|940761|    Brenda|brenda.robinson@g...|    7/31/1970|    Stonewall |   LA|71078|      5|
|428945|       Joe| joe.robinson@gmai… |    6/16/1963| Michigantown |   IN|46057|      3|
……….
……….
……….
```

Using transactional tables in conjunction with other data sources

Spark Direct Reader works seamlessly with other data sources as well. For example, in the query below we are joining the emp_acid table with an external table dept_ext to find out the corresponding departments of employees.
scala> sql("select e.emp_id, e.first_name, d.name department from emp_acid e join dept_ext d on e.dept_id = d.id").show +------+----------+-----------+ |emp_id|first_name| department| +------+----------+-----------+ |677509| Lois | HR | |940761| Brenda| FINANCE| |428945| Joe | ADMIN | Here direct reader is used to fetch the data of emp_acid table since it’s transactional table, the data of dept_ext table is fetched by spark’s native reader scala> sql("select e.emp_id, e.first_name, d.name department from emp_acid e join dept_ext d on e.dept_id = d.id").explain == Physical Plan == *(2) Project [emp_id#288, first_name#289, name#287 AS department#255] +- *(2) BroadcastHashJoin [dept_id#295], [id#286], Inner, BuildRight :- *(2) Filter isnotnull(dept_id#295) : +- *(2) Scan HiveAcidRelation(org.apache.spark.sql.SparkSession@1444fa42,default.emp_acid,Map(transactional -> true, numFilesErasureCoded -> 0, bucketing_version -> 2, transient_lastDdlTime -> 1594830632, transactional_properties -> default, table -> default.emp_acid)) [emp_id#288,first_name#289,dept_id#295] PushedFilters: [IsNotNull(dept_id)], ReadSchema: struct<emp_id:int,first_name:string,dept_id:int> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#169] +- *(1) Project [id#286, name#287] +- *(1) Filter isnotnull(id#286) +- *(1) FileScan orc default.dept_ext[id#286,name#287] Batched: true, Format: ORC, Location: InMemoryFileIndex[hdfs://anurag-hwc-1.anurag-hwc.root.hwx.site:8020/warehouse/tablespace/external..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:int,name:string> Configurations to enable auto-translate To turn on auto translate feature, we need to specify spark sql extension like spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension Using Hive Warehouse Connector executeQuery() API If you are already using Hive Warehouse Connector in your spark application then you can continue to use executeQuery() API and switch to Spark Direct Reader just by adding some configurations. Code is similar to what we need to use with Hive Warehouse Connector. Queries shown in sections 3.1.1 and 3.1.2 can be done like the following. scala> val hive = com.hortonworks.hwc.HiveWarehouseSession.session(spark).build() scala> hive.executeQuery("select * from emp_acid").show scala> hive.executeQuery("select e.emp_id, e.first_name, d.name department from emp_acid e join dept_ext d on e.dept_id = d.id").show Configurations to use Spark Direct Reader via Hive Warehouse Connector API spark.datasource.hive.warehouse.read.via.llap=false spark.sql.hive.hwc.execution.mode=spark spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension Behind the scenes – Read Architecture Spark Direct Reader is built on top DataSource V1 APIs exposed by spark which allows us to plug in custom data sources to spark. In our case the custom data source is the layer which enables us to read hive ACID tables. Following diagram depicts the high level read process and transaction management in connector Spark Driver parses the query and for each ACID table starts a read txn. Transaction snapshot for each table is stored separately and is used for generating the split. Spark driver serializes and sends the partition info and txn snapshot to executors. Executors read the specific split using the transaction snapshot. Processed and transformed data is sent back to the driver. Driver commits the read transactions started. 
Note on transactions

Currently the connector only supports single-table transaction consistency, i.e. one new transaction is opened for each table involved in the query. This means that if multiple tables are involved in the query, they may not all use the same snapshot of data.

Transactions when a single table t1 is involved:

```
scala> spark.sql("select * from t1").show

20/07/08 05:41:39 INFO transaction.HiveAcidTxn: Begin transaction {"id":"174","validTxns":"174:9223372036854775807::"}
20/07/08 05:41:39 INFO transaction.HiveAcidTxn: Lock taken for lockInfo com.qubole.spark.hiveacid.transaction.LockInfo@37f5c5fd in transaction with id 174
....
....
20/07/08 05:41:47 INFO transaction.HiveAcidTxn: End transaction {"id":"174","validTxns":"174:9223372036854775807::"} abort = false
```

Transactions when multiple tables t3 and t4 are involved:

```
scala> spark.sql("select * from default.t3 join default.t4 on default.t3.a = default.t4.a").show

20/07/08 05:43:36 INFO transaction.HiveAcidTxn: Begin transaction {"id":"175","validTxns":"175:9223372036854775807::"}
20/07/08 05:43:36 INFO transaction.HiveAcidTxn: Lock taken for lockInfo com.qubole.spark.hiveacid.transaction.LockInfo@67cc7aee in transaction with id 175
....
....
20/07/08 05:43:36 INFO transaction.HiveAcidTxn: Begin transaction {"id":"176","validTxns":"176:175:175:"}
20/07/08 05:43:36 INFO transaction.HiveAcidTxn: Lock taken for lockInfo com.qubole.spark.hiveacid.transaction.LockInfo@a8c0c6d in transaction with id 176
....
....
20/07/08 05:43:53 INFO transaction.HiveAcidTxn: End transaction {"id":"175","validTxns":"175:9223372036854775807::"} abort = false
20/07/08 05:43:53 INFO transaction.HiveAcidTxn: End transaction {"id":"176","validTxns":"176:175:175:"} abort = false
```

Notice the different transactions 175 and 176 when two different tables t3 and t4 are present in the query.

API to close transactions explicitly

To commit or abort transactions, we have a SQL listener which does it whenever a DataFrame operation or Spark SQL query finishes. In some cases, when .explain() / .rdd() / .cache() is invoked on a DataFrame, a transaction is opened and never closed, since technically these are not Spark SQL queries, so the SQL listener does not kick in. To handle this scenario and to be able to close the transactions manually, an explicit API is exposed which can be invoked like the following:

```
scala> com.qubole.spark.hiveacid.transaction.HiveAcidTxnManagerObject.commitTxn(spark)
```

Or, if you are using Hive Warehouse Connector's session (say 'hive' is the instance):

```
scala> hive.commitTxn
```

Configuration Summary

To use Spark Direct Reader, we need the following configurations:

| Property | Value | Description |
|---|---|---|
| spark.hadoop.hive.metastore.uris | thrift://<host>:<port> | Hive metastore URI |
| spark.sql.extensions | com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension | Extension needed for auto-translate to work |
| spark.kryo.registrator | com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator | For using kryo serialization |
| spark.sql.hive.hwc.execution.mode | spark | |
| spark.datasource.hive.warehouse.read.via.llap | false | |

The Hive Warehouse Connector jar should be supplied to spark-shell or spark-submit using the --jars option while launching the application. For instance, spark-shell can be launched like the following:
```
spark-shell --jars /opt/cloudera/parcels/CDH/lib/hive_warehouse_connector/hive-warehouse-connector-assembly-<version>.jar \
  --conf "spark.sql.extensions=com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension" \
  --conf "spark.datasource.hive.warehouse.read.via.llap=false" \
  --conf "spark.sql.hive.hwc.execution.mode=spark" \
  --conf "spark.kryo.registrator=com.qubole.spark.hiveacid.util.HiveAcidKyroRegistrator" \
  --conf "spark.hadoop.hive.metastore.uris=<metastore_uri>"
```

Further Information and Resources

- Cloudera Data Warehouse
- HWC Spark Direct Reader for accessing Hive data
- Integrating Apache Hive with Apache Spark

The post Enabling high-speed Spark direct reader for Apache Hive ACID tables appeared first on Cloudera Blog.

View the full article
-
Forum Statistics
67.4k Total Topics
65.3k Total Posts