Showing results for tags 'pyspark'.

Found 12 results

  1. Fabric Madness part 2

Image by author and ChatGPT. “Design an illustration, focusing on a basketball player in action, this time the theme is on using pyspark to generate features for machine leaning models in a graphic novel style” prompt. ChatGPT, 4, OpenAI, 4 April. 2024. https://chat.openai.com.

A huge thanks to Martim Chaves, who co-authored this post and developed the example scripts.

In our previous post we took a high-level view of how to train a machine learning model in Microsoft Fabric. In this post we dive deeper into the process of feature engineering.

Feature engineering is a crucial part of the development lifecycle for any Machine Learning (ML) system. It is the step in which raw data is processed to better represent its underlying structure and to provide additional information that enhances our ML models. Feature engineering is both an art and a science. Even though there are specific steps that we can take to create good features, sometimes it is only through experimentation that good results are achieved. Good features are crucial to good system performance.

As datasets grow, traditional feature engineering may struggle with their size. This is where PySpark can help, as it is a scalable and efficient processing platform for massive datasets. A great thing about Fabric is that it makes using PySpark easy!

In this post, we'll be going over:

  • How does PySpark work?
  • Basics of PySpark
  • Feature Engineering in Action

By the end of this post, hopefully you'll feel comfortable carrying out feature engineering with PySpark in Fabric. Let's get started!

How does PySpark work?

Spark is a distributed computing system that allows large datasets to be processed quickly and efficiently across a cluster of machines. It is built around the concept of a Resilient Distributed Dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel.
RDDs are the fundamental data structure of Spark, and they allow data to be distributed across a cluster of machines.

PySpark is the Python API for Spark. It allows for the creation of Spark DataFrames, which are similar to Pandas DataFrames but are distributed across a cluster of machines. PySpark DataFrames are the core data structure in PySpark, and they allow large datasets to be manipulated in a distributed manner.

At the core of PySpark is the SparkSession object, the entry point for interacting with Spark. The SparkSession is what allows for the creation of DataFrames, among other functionality. Note that when running a Notebook in Fabric, a SparkSession is created for you automatically, so you don't have to worry about that.

Having a rough idea of how PySpark works, let's get to the basics.

Basics of PySpark

Although Spark DataFrames may remind us of Pandas DataFrames, the PySpark syntax can be a bit different. In this section, we'll go over some of the basics of PySpark, such as reading data, combining DataFrames, selecting columns, grouping data, joining DataFrames, and using functions.

The Data

The data we are looking at is from the 2024 US college basketball tournaments. It was obtained from the ongoing March Machine Learning Mania 2024 Kaggle competition, the details of which can be found here, and is licensed under CC BY 4.0 [1].

Reading data

As mentioned in the previous post of this series, the first step is usually to create a Lakehouse and upload some data. Then, when creating a Notebook, we can attach it to the created Lakehouse, and we'll have access to the data stored there.

PySpark DataFrames can read various data formats, such as CSV, JSON, Parquet, and others.
Our data is stored in CSV format, so we'll be using that, as in the following code snippet:

    # Read women's data
    w_data = (
        spark.read.option("header", True)
        .option("inferSchema", True)
        .csv("Files/WNCAATourneyDetailedResults.csv")
        .cache()
    )

In this code snippet, we're reading the detailed results data set of the final women's basketball college tournament matches. Setting the "header" option to True means that the column names are taken from the first row of the CSV file. The inferSchema option tells Spark to guess the data types of the columns; otherwise they would all be read as strings. .cache() marks the DataFrame to be kept in memory once it has been computed, so that later operations can reuse it.

If you're coming from Pandas, you may be wondering what the equivalent of df.head() is for PySpark: it's df.show(5). The default for .show() is the top 20 rows, hence the need to specifically select 5.

Combining DataFrames

Combining DataFrames can be done in multiple ways. The first we will look at is a union, where the columns are the same for both DataFrames:

    # Read women's data
    ...

    # Read men's data
    m_data = (
        spark.read.option("header", True)
        .option("inferSchema", True)
        .csv("Files/MNCAATourneyDetailedResults.csv")
        .cache()
    )

    # Combine (union) the DataFrames
    combined_results = m_data.unionByName(w_data)

Here, unionByName stacks the rows of the two DataFrames, matching columns by name. Since both the women's and the men's detailed match results have the same columns, this is a good approach. Alternatively, there's also union, which combines two DataFrames by matching column positions.

Selecting Columns

Selecting columns from a DataFrame in PySpark can be done using the .select() method. We just have to pass the name or names of the relevant columns as parameters.

    # Selecting a single column
    w_scores = w_data.select("WScore")

    # Selecting multiple columns
    teamid_w_scores = w_data.select("WTeamID", "WScore")

Here's the output of a single-column selection, in this case w_data.select("Season").show(5):

    +------+
    |Season|
    +------+
    |  2010|
    |  2010|
    |  2010|
    |  2010|
    |  2010|
    +------+
    only showing top 5 rows

The columns can also be renamed when being selected, using the .alias() method:

    winners = w_data.select(
        w_data.WTeamID.alias("TeamID"),
        w_data.WScore.alias("Score")
    )

Grouping Data

Grouping allows us to carry out certain operations on the groups that exist within the data, and is usually combined with aggregation functions. We can use .groupBy() for this:

    # Grouping and aggregating
    winners_average_scores = winners.groupBy("TeamID").avg("Score")

In this example, we are grouping by "TeamID", meaning that all rows sharing the same "TeamID" value form one group. For each of those groups, we're calculating the average of the "Score". This way, we get the average score for each team.

Here's the output of winners_average_scores.show(5), showing the average score of each team:

    +------+-----------------+
    |TeamID|       avg(Score)|
    +------+-----------------+
    |  3125|             68.5|
    |  3345|             74.2|
    |  3346|79.66666666666667|
    |  3376|73.58333333333333|
    |  3107|             61.0|
    +------+-----------------+

Joining Data

Joining two DataFrames can be done using the .join() method. Joining essentially extends a DataFrame by adding the columns of another DataFrame to it, matching rows on one or more key columns.

    # Joining on Season and TeamID
    final_df = matches_df.join(stats_df, on=['Season', 'TeamID'], how='left')

In this example, both stats_df and matches_df use Season and TeamID as unique identifiers for each row. Besides Season and TeamID, stats_df has other columns, such as statistics for each team during each season, whereas matches_df has information about the matches, such as date and location. This operation allows us to add those interesting statistics to the matches information!

Functions

PySpark provides several functions that help us transform DataFrames. You can find the full list here.

Here's an example of a simple function, applied to the winners DataFrame created earlier (which has a "Score" column):

    from pyspark.sql import functions as F

    winners = winners.withColumn("HighScore", F.when(F.col("Score") > 80, "Yes").otherwise("No"))

In the code snippet above, a "HighScore" column is created that flags scores higher than 80. For each row, the "Score" column (referenced with the .col() function) is checked by the .when() function: if its value is larger than 80, "HighScore" is set to "Yes". Otherwise, as specified by .otherwise(), it is set to "No".

Feature Engineering in Action

Now that we have a basic understanding of PySpark and how it can be used, let's go over how the regular season statistics features were created. These features were then used as inputs to our machine learning model to try to predict the outcome of the final tournament games.

The starting point was a DataFrame, regular_data, that contained match-by-match statistics for the regular seasons. The regular season is the United States college basketball season, which runs from November to March each year.

Each row in this DataFrame contained the season, the day the match was held, the ID of team 1, the ID of team 2, and other information such as the location of the match. Importantly, it also contained statistics for each team for that specific match, such as "T1_FGM", meaning the Field Goals Made (FGM) for team 1, or "T2_OR", meaning the Offensive Rebounds (OR) of team 2.

The first step was selecting which columns would be used. These were columns that strictly contained in-game statistics.

    # Columns that we'll want to get statistics from
    boxscore_cols = [
        'T1_FGM', 'T1_FGA', 'T1_FGM3', 'T1_FGA3', 'T1_OR', 'T1_DR', 'T1_Ast', 'T1_Stl', 'T1_PF',
        'T2_FGM', 'T2_FGA', 'T2_FGM3', 'T2_FGA3', 'T2_OR', 'T2_DR', 'T2_Ast', 'T2_Stl', 'T2_PF'
    ]

If you're interested, here's what each statistic's code means:

  • FGM: Field Goals Made
  • FGA: Field Goals Attempted
  • FGM3: Field Goals Made from the 3-point line
  • FGA3: Field Goals Attempted from the 3-point line
  • OR: Offensive Rebounds. A rebound is when the ball bounces off the board after an attempted goal without going in the net. If the team that attempted the goal gets possession of the ball, it's called an "Offensive" rebound. Otherwise, it's called a "Defensive" rebound.
  • DR: Defensive Rebounds
  • Ast: Assist, a pass that led directly to a goal
  • Stl: Steal, when possession of the ball is stolen
  • PF: Personal Foul, when a player commits a foul

From there, a dictionary of aggregation expressions was created. For each column name in the previous list of columns, an expression was stored that calculates the mean of the column and renames it by adding the suffix "mean".

    from pyspark.sql import functions as F

    # Map each column to an expression that computes its mean and adds the "mean" suffix
    agg_exprs = {col: F.mean(col).alias(col + 'mean') for col in boxscore_cols}

Then, the data was grouped by "Season" and "T1_TeamID", and the aggregation expressions of the previously created dictionary were passed as arguments to .agg().

    season_statistics = regular_data.groupBy(["Season", "T1_TeamID"]).agg(*agg_exprs.values())

Note that the grouping was done by season and the ID of team 1. This means that "T2_FGAmean", for example, will actually be the mean of the Field Goals Attempted by the opponents of T1, not of one specific team. So, we need to rename columns such as "T2_FGAmean" to something like "T1_opponent_FGAmean".

    # Rename columns for T1: T1 columns keep their name, T2 columns become T1's opponent columns
    for col in boxscore_cols:
        new_prefix = 'T1_' if col.startswith('T1_') else 'T1_opponent_'
        season_statistics = season_statistics.withColumnRenamed(col + 'mean', new_prefix + col[3:] + 'mean')

At this point, it's important to mention that the regular_data DataFrame actually has two rows for each match that occurred, so that each team appears once as "T1" and once as "T2". This little "trick" is what makes these statistics useful.

Note that we "only" have the statistics for "T1". We "need" the statistics for "T2" as well. "Need" is in quotation marks because no new statistics are being calculated: we just need the same data with different column names, so that for a match with "T1" and "T2" we have statistics for both teams. So, we created a mirror DataFrame where, instead of "T1…mean" and "T1_opponent_…mean", we have "T2…mean" and "T2_opponent_…mean". This is important because, later on, when we join these regular season statistics to tournament matches, we'll be able to have statistics for both team 1 and team 2.

    season_statistics_T2 = season_statistics.select(
        *[
            F.col(col).alias(col.replace('T1_opponent_', 'T2_opponent_').replace('T1_', 'T2_'))
            if col not in ['Season']
            else F.col(col)
            for col in season_statistics.columns
        ]
    )

Now there are two DataFrames, with season statistics for "both" T1 and T2. Since the final DataFrame will contain the "Season", the "T1_TeamID" and the "T2_TeamID", we can add these newly created features with a join!

    tourney_df = tourney_df.join(season_statistics, on=['Season', 'T1_TeamID'], how='left')
    tourney_df = tourney_df.join(season_statistics_T2, on=['Season', 'T2_TeamID'], how='left')

Elo Ratings

First created by Arpad Elo, Elo is a rating system for zero-sum games (games where one player wins and the other loses), like basketball. With the Elo rating system, each team has an Elo rating, a value that generally conveys the team's quality.
At first, every team has the same Elo. Whenever a team wins, its Elo increases, and when it loses, its Elo decreases. A key characteristic of this system is that the rating increases more with a win against a strong opponent than with a win against a weak opponent. Thus, it can be a very useful feature to have!

We wanted to capture the Elo rating of a team at the end of the regular season and use that as a feature for the tournament. To do this, we calculated the Elo for each team on a per-match basis. For this feature, we found it more straightforward to use Pandas.

Central to Elo is calculating the expected score for each team. It can be described in code like so:

    # Function to calculate expected score
    def expected_score(ra, rb):
        # ra = rating (Elo) team A
        # rb = rating (Elo) team B
        # Elo function
        return 1 / (1 + 10 ** ((rb - ra) / 400))

Considering a team A and a team B, this function computes the expected score of team A against team B.

For each match, we would update the teams' Elos. Note that the location of the match also played a part: winning at home was considered less impressive than winning away.

    # Function to update Elo ratings, keeping T1 and T2 terminology
    def update_elo(t1_elo, t2_elo, location, T1_Score, T2_Score):
        expected_t1 = expected_score(t1_elo, t2_elo)
        expected_t2 = expected_score(t2_elo, t1_elo)

        actual_t1 = 1 if T1_Score > T2_Score else 0
        actual_t2 = 1 - actual_t1

        # Determine K based on game location
        # The larger the K, the bigger the impact
        # team1 winning at home (location=1) less impressive than winning away (location = -1)
        if actual_t1 == 1:  # team1 won
            if location == 1:
                k = 20
            elif location == 0:
                k = 30
            else:  # location = -1
                k = 40
        else:  # team2 won
            if location == 1:
                k = 40
            elif location == 0:
                k = 30
            else:  # location = -1
                k = 20

        new_t1_elo = t1_elo + k * (actual_t1 - expected_t1)
        new_t2_elo = t2_elo + k * (actual_t2 - expected_t2)

        return new_t1_elo, new_t2_elo

To apply the Elo rating system, we iterated through each season's matches, initializing teams with a base rating and updating their ratings match by match. The final Elo available for each team in each season will, hopefully, be a good descriptor of the team's quality.

    import pandas as pd

    def calculate_elo_through_seasons(regular_data):
        # For this feature, using Pandas
        regular_data = regular_data.toPandas()

        # Set value of initial elo
        initial_elo = 1500

        # List to collect final Elo ratings
        final_elo_list = []

        for season in sorted(regular_data['Season'].unique()):
            print(f"Processing Season: {season}")

            # Initialize elo ratings dictionary
            elo_ratings = {}

            # Get the games and the teams that played in the season
            season_games = regular_data[regular_data['Season'] == season]
            season_teams = set(season_games['T1_TeamID']).union(set(season_games['T2_TeamID']))

            # Initialize season teams' Elo ratings
            for team in season_teams:
                if (season, team) not in elo_ratings:
                    elo_ratings[(season, team)] = initial_elo

            # Update Elo ratings per game
            for _, row in season_games.iterrows():
                t1_elo = elo_ratings[(season, row['T1_TeamID'])]
                t2_elo = elo_ratings[(season, row['T2_TeamID'])]

                new_t1_elo, new_t2_elo = update_elo(t1_elo, t2_elo, row['location'], row['T1_Score'], row['T2_Score'])

                # Only keep the last season rating
                elo_ratings[(season, row['T1_TeamID'])] = new_t1_elo
                elo_ratings[(season, row['T2_TeamID'])] = new_t2_elo

            # Collect final Elo ratings for the season
            for team in season_teams:
                final_elo_list.append({'Season': season, 'TeamID': team, 'Elo': elo_ratings[(season, team)]})

        # Convert list to DataFrame
        final_elo_df = pd.DataFrame(final_elo_list)

        # Separate DataFrames for T1 and T2
        final_elo_t1_df = final_elo_df.copy().rename(columns={'TeamID': 'T1_TeamID', 'Elo': 'T1_Elo'})
        final_elo_t2_df = final_elo_df.copy().rename(columns={'TeamID': 'T2_TeamID', 'Elo': 'T2_Elo'})

        # Convert the pandas DataFrames back to Spark DataFrames
        final_elo_t1_df = spark.createDataFrame(final_elo_t1_df)
        final_elo_t2_df = spark.createDataFrame(final_elo_t2_df)

        return final_elo_t1_df, final_elo_t2_df

Ideally, we wouldn't calculate Elo changes on a match-by-match basis to determine each team's final Elo for the season. However, we couldn't come up with a better approach. Do you have any ideas? If so, let us know!

Value Added

The feature engineering steps demonstrated show how we can transform raw data (regular season statistics) into valuable information with predictive power. It is reasonable to assume that a team's performance during the regular season is indicative of its potential performance in the final tournaments. By calculating the mean of observed match-by-match statistics for both the teams and their opponents, along with each team's Elo rating after its final match, we were able to create a dataset suitable for modelling. Then, models were trained to predict the outcome of tournament matches using these features, among others developed in a similar way. With these models, we only need the two team IDs to look up the mean of their regular season statistics and their Elos to feed into the model and predict a score!

Conclusion

In this post, we looked at some of the theory behind Spark and PySpark, how it can be applied, and a concrete practical example. We explored how feature engineering can be done in the case of sports data, creating regular season statistics to use as features for final tournament games. Hopefully you've found this interesting and helpful. Happy feature engineering!

The full source code for this post and others in the series can be found here.

Originally published at https://nobledynamic.com on April 8, 2024.

References

[1] Jeff Sonas, Ryan Holbrook, Addison Howard, Anju Kandru. (2024). March Machine Learning Mania 2024. Kaggle. https://kaggle.com/competitions/march-machine-learning-mania-2024

Feature Engineering with Microsoft Fabric and PySpark was originally published in Towards Data Science on Medium. View the full article
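
To make the Elo update in result 1 concrete, the sketch below applies the expected-score formula and a single rating update to two hypothetical teams that both start at 1500. The helper names `k_for` and `update` and the example ratings are assumptions made for illustration; the K values (20, 30, 40) and the location coding (1 = home, 0 = neutral, -1 = away, from team 1's point of view) follow the post's update_elo function.

```python
def expected_score(ra, rb):
    """Expected score of team A (rating ra) against team B (rating rb)."""
    return 1 / (1 + 10 ** ((rb - ra) / 400))


def k_for(t1_won, location):
    """Hypothetical helper: K factor following the post's location rule."""
    if location == 0:
        return 30
    # The winner played at home when team 1 won at home or team 2 won while
    # team 1 was away. A home win is worth less (20) than an away win (40).
    winner_at_home = (location == 1) == t1_won
    return 20 if winner_at_home else 40


def update(t1_elo, t2_elo, t1_won, location):
    """Return the new (team 1, team 2) ratings after one match."""
    k = k_for(t1_won, location)
    actual_t1 = 1 if t1_won else 0
    new_t1 = t1_elo + k * (actual_t1 - expected_score(t1_elo, t2_elo))
    new_t2 = t2_elo + k * ((1 - actual_t1) - expected_score(t2_elo, t1_elo))
    return new_t1, new_t2


# Two equally rated teams: the expected score is 0.5 for each side.
home_win = update(1500, 1500, t1_won=True, location=1)
away_win = update(1500, 1500, t1_won=True, location=-1)
print(home_win)  # (1510.0, 1490.0)
print(away_win)  # (1520.0, 1480.0)
```

Because the two expected scores always sum to 1 and both teams share the same K, whatever one team gains the other loses, so the total rating in the pool stays constant after each update.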
  2. With the releases of Apache Spark 3.4 and 3.5 in 2023, we focused heavily on improving PySpark performance, flexibility, and ease of use... View the full article
  3. The DataFrame equality test functions were introduced in Apache Spark™ 3.5 and Databricks Runtime 14.2 to simplify PySpark unit testing. The full set o... View the full article
  4. Apache Spark revolutionized big data processing with its distributed computing capabilities, which enabled efficient data processing at scale. It offers the flexibility to run on traditional Central Processing Units (CPUs) as well as specialized Graphics Processing Units (GPUs), which provide distinct advantages for various workloads. As the demand for faster and more efficient machine learning (ML) workloads grows, specialized hardware acceleration becomes crucial. This is where NVIDIA GPUs and Compute Unified Device Architecture (CUDA) come into the picture. To further enhance the capabilities of NVIDIA GPUs within the Spark ecosystem, NVIDIA developed Spark-RAPIDS. Spark-RAPIDS is an extension library that uses RAPIDS libraries built on CUDA to enable high-performance data processing and ML training on GPUs. By combining the distributed computing framework of Spark with the parallel processing power of GPUs, Spark-RAPIDS significantly improves the speed and efficiency of analytics and ML workloads... View the full article
  5. radians() Function

The radians() function in PySpark converts the values in a DataFrame column from degrees to radians. It can be used inside the select() method, which returns a new DataFrame containing the converted column; show() then displays it.

Syntax

    dataframe_obj.select(radians(dataframe_obj.column))

Parameter: the column whose values are to be converted to radians.

Example 1

Let's create a PySpark DataFrame with 3 rows and 4 numeric columns, and return the radians for the value1 column.

    import pyspark
    import math
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import radians

    spark_app = SparkSession.builder.appName('_').getOrCreate()

    #create math values
    values = [(math.pi, 0, 7.8, 120),
              (math.pi/2, 1, 0.5, 180),
              (math.pi/3, -5, -12.9, 360)]

    #assign columns by creating the PySpark DataFrame
    dataframe_obj = spark_app.createDataFrame(values, ['value1', 'value2', 'value3', 'value4'])
    dataframe_obj.show()

    #get the radians of value1 column
    dataframe_obj.select(radians(dataframe_obj.value1)).show()

Output:

For the column value1, each value is treated as degrees and converted to radians:

  • 3.141592653589793 is equal to 0.05483113556160755 radians.
  • 1.5707963267948966 is equal to 0.027415567780803774 radians.
  • 1.0471975511965976 is equal to 0.018277045187202513 radians.

Example 2

Now, we will return radians for the value2 and value3 columns, using the same dataframe_obj as in Example 1.

    #get the radians values of value2 and value3 column
    dataframe_obj.select(radians(dataframe_obj.value2), radians(dataframe_obj.value3)).show()

Output:

Column value2:

  • 0 is equal to 0 radians.
  • 1 is equal to 0.017453292519943295 radians.
  • -5 is equal to -0.08726646259971647 radians.

Column value3:

  • 7.8 is equal to 0.1361356816555577 radians.
  • 0.5 is equal to 0.008726646259971648 radians.
  • -12.9 is equal to -0.22514747350726852 radians.

degrees() Function

The degrees() function in PySpark converts the values in a DataFrame column from radians to degrees. Like radians(), it can be used inside the select() method, which returns a new DataFrame containing the converted column.

Syntax

    dataframe_obj.select(degrees(dataframe_obj.column))

Parameter: the column whose values are to be converted to degrees.

Example 1

Let's create a PySpark DataFrame with 3 rows and 4 numeric columns, and return the degrees for the value1 column.

    import pyspark
    import math
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import degrees

    spark_app = SparkSession.builder.appName('_').getOrCreate()

    #create math values
    values = [(math.pi, 0, 7.8, 120),
              (math.pi/2, 1, 0.5, 180),
              (math.pi/3, -5, -12.9, 360)]

    #assign columns by creating the PySpark DataFrame
    dataframe_obj = spark_app.createDataFrame(values, ['value1', 'value2', 'value3', 'value4'])
    dataframe_obj.show()

    #get the degrees of value1 column
    dataframe_obj.select(degrees(dataframe_obj.value1)).show()

Output:

For the column value1, each value is treated as radians and converted to degrees:

  • 3.141592653589793 is equal to 180.0 degrees.
  • 1.5707963267948966 is equal to 90.0 degrees.
  • 1.0471975511965976 is equal to 59.99999999999999 degrees.

Example 2

Now, we will return degrees for the value2 and value3 columns, using the same dataframe_obj as in Example 1.

    #get the degrees values of value2 and value3 column
    dataframe_obj.select(degrees(dataframe_obj.value2), degrees(dataframe_obj.value3)).show()

Output:

Column value2:

  • 0 is equal to 0.0 degrees.
  • 1 is equal to 57.29577951308232 degrees.
  • -5 is equal to -286.4788975654116 degrees.

Column value3:

  • 7.8 is equal to 446.9070802020421 degrees.
  • 0.5 is equal to 28.64788975654116 degrees.
  • -12.9 is equal to -739.115555718762 degrees.

Conclusion

In this PySpark tutorial, we discussed the radians() and degrees() functions. The radians() function converts the values in a DataFrame column from degrees to radians, and degrees() converts the values in a DataFrame column from radians to degrees. We discussed two examples for each function. View the full article
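
The conversions in this result can be cross-checked without a Spark session. The sketch below uses Python's standard math module, on the assumption that PySpark's radians() and degrees() apply the same degree/radian conversion as math.radians() and math.degrees(). It is a sanity check on the arithmetic, not a substitute for running the PySpark examples.

```python
import math

# The value1 column from the tutorial's sample data
value1 = [math.pi, math.pi / 2, math.pi / 3]

# radians(): treat each input as degrees and convert to radians (x * pi / 180)
as_radians = [math.radians(v) for v in value1]

# degrees(): treat each input as radians and convert to degrees (x * 180 / pi)
as_degrees = [math.degrees(v) for v in value1]

for v, r, d in zip(value1, as_radians, as_degrees):
    print(f"{v} -> as radians: {r}, as degrees: {d}")
```

Since pi radians is half a turn, the degrees() result for the first value should come out at 180, which is a quick way to spot a conversion applied in the wrong direction.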
  6. If you want to sort the values of a PySpark DataFrame column that contains nulls in descending order, you can use the desc_nulls_first() and desc_nulls_last() functions.

Before discussing these functions, we will create a sample PySpark DataFrame.

Data

    import pyspark
    from pyspark.sql import SparkSession

    spark_app = SparkSession.builder.appName('_').getOrCreate()

    students = [(4, 'sravan', 23, None, None),
                (4, 'chandana', 23, 'CSS', 'PySpark'),
                (46, 'mounika', 22, None, '.NET'),
                (4, 'deepika', 21, 'HTML', None)]

    dataframe_obj = spark_app.createDataFrame(students, ['subject_id', 'name', 'age', 'technology1', 'technology2'])
    dataframe_obj.show()

Output:

The DataFrame has 5 columns and 4 rows.

desc_nulls_first() Function

The desc_nulls_first() function sorts the values in a column in descending order and places the null values in that column first. It can be used with the select() method to select the ordered columns. orderBy() is essential here because it performs the sort; it takes the result of desc_nulls_first() as its argument.

Syntax

    dataframe_obj.select(dataframe_obj.column).orderBy(dataframe_obj.column.desc_nulls_first())

Here, dataframe_obj is the DataFrame and column is the name of the column whose values are sorted; all the null values are placed first.

So, our DataFrame is ready. Let's demonstrate the desc_nulls_first() function.

Example 1

Now, we will sort the values in the technology1 column, which has None/null values, in descending order using the desc_nulls_first() function.

    #sort the technology1 column in descending order and get the null values first.
    dataframe_obj.select(dataframe_obj.technology1).orderBy(dataframe_obj.technology1.desc_nulls_first()).show()

Output:

There are two null values. They are placed first, followed by HTML and CSS in descending order.

Example 2

Now, we will sort the values in the technology2 column, which has None/null values, in descending order using the desc_nulls_first() function.

    #sort the technology2 column in descending order and get the null values first.
    dataframe_obj.select(dataframe_obj.technology2).orderBy(dataframe_obj.technology2.desc_nulls_first()).show()

Output:

There are two null values. They are placed first, followed by PySpark and .NET in descending order.

desc_nulls_last() Function

The desc_nulls_last() function sorts the values in a column in descending order and places the null values in that column last. It can be used with the select() method to select the ordered columns. orderBy() is essential here because it performs the sort; it takes the result of desc_nulls_last() as its argument.

Syntax

    dataframe_obj.select(dataframe_obj.column).orderBy(dataframe_obj.column.desc_nulls_last())

Here, dataframe_obj is the DataFrame and column is the name of the column whose values are sorted; all the null values are placed last.

So, our DataFrame is ready. Let's demonstrate the desc_nulls_last() function.

Example 1

Now, we will sort the values in the technology1 column, which has None/null values, in descending order using the desc_nulls_last() function.

    #sort the technology1 column in descending order and get the null values last.
    dataframe_obj.select(dataframe_obj.technology1).orderBy(dataframe_obj.technology1.desc_nulls_last()).show()

Output:

There are two null values. HTML and CSS are sorted in descending order first, and the two null values are placed last.

Example 2

Now, we will sort the values in the technology2 column, which has None/null values, in descending order using the desc_nulls_last() function.

    #sort the technology2 column in descending order and get the null values last.
    dataframe_obj.select(dataframe_obj.technology2).orderBy(dataframe_obj.technology2.desc_nulls_last()).show()

Output:

There are two null values. PySpark and .NET are sorted in descending order first, and the two null values are placed last.

Overall Code

    import pyspark
    from pyspark.sql import SparkSession

    spark_app = SparkSession.builder.appName('_').getOrCreate()

    students = [(4, 'sravan', 23, None, None),
                (4, 'chandana', 23, 'CSS', 'PySpark'),
                (46, 'mounika', 22, None, '.NET'),
                (4, 'deepika', 21, 'HTML', None)]

    dataframe_obj = spark_app.createDataFrame(students, ['subject_id', 'name', 'age', 'technology1', 'technology2'])
    dataframe_obj.show()

    #sort the technology1 column in descending order and get the null values first.
    dataframe_obj.select(dataframe_obj.technology1).orderBy(dataframe_obj.technology1.desc_nulls_first()).show()

    #sort the technology2 column in descending order and get the null values first.
    dataframe_obj.select(dataframe_obj.technology2).orderBy(dataframe_obj.technology2.desc_nulls_first()).show()

    #sort the technology1 column in descending order and get the null values last.
    dataframe_obj.select(dataframe_obj.technology1).orderBy(dataframe_obj.technology1.desc_nulls_last()).show()

    #sort the technology2 column in descending order and get the null values last.
    dataframe_obj.select(dataframe_obj.technology2).orderBy(dataframe_obj.technology2.desc_nulls_last()).show()

Conclusion

In this PySpark tutorial, we learned that nulls can be handled while sorting the values in a DataFrame by using the desc_nulls_first() and desc_nulls_last() functions. The desc_nulls_first() function sorts the values in a column in descending order and places the null values in that column first. The desc_nulls_last() function sorts the values in a column in descending order and places the null values in that column last. You can run the entire code given in the last part of the tutorial. View the full article
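
The ordering that desc_nulls_first() and desc_nulls_last() produce can be imitated in plain Python, which may help when reasoning about expected results without a Spark session. This is only a sketch of the semantics: the helper `sort_desc` is hypothetical, None stands in for null, and Python's string comparison is assumed to order these simple ASCII values the same way Spark does.

```python
def sort_desc(values, nulls_first):
    """Sort values in descending order, placing None entries first or last."""
    nulls = [v for v in values if v is None]
    present = sorted((v for v in values if v is not None), reverse=True)
    return nulls + present if nulls_first else present + nulls


# The technology1 and technology2 columns from the tutorial's sample data
technology1 = [None, "CSS", None, "HTML"]
technology2 = [None, "PySpark", ".NET", None]

print(sort_desc(technology1, nulls_first=True))   # [None, None, 'HTML', 'CSS']
print(sort_desc(technology1, nulls_first=False))  # ['HTML', 'CSS', None, None]
print(sort_desc(technology2, nulls_first=False))  # ['PySpark', '.NET', None, None]
```

Separating the nulls from the non-null values before sorting mirrors the idea that null placement is chosen independently of the sort direction.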
  7. In Python, PySpark is a Spark module that provides Spark-style processing using DataFrames, which store data in rows and columns. A PySpark pandas DataFrame exposes the pandas DataFrame API but holds a PySpark DataFrame internally. The pandas API is imported from the pyspark module, so you have to install the pyspark module first.

Command

    pip install pyspark

Syntax to import:

    from pyspark import pandas

After that, we can create or use a DataFrame from the pandas module.

Syntax to create a pandas DataFrame:

    pyspark.pandas.DataFrame()

We can pass a dictionary or a list of lists with values. Let's create a pandas DataFrame through pyspark that has four columns and five rows.

    # import pandas from the pyspark module
    from pyspark import pandas

    # create the DataFrame from pyspark pandas
    pyspark_pandas = pandas.DataFrame({'student_lastname': ['manasa', 'trisha', 'lehara', 'kapila', 'hyna'],
                                       'mark1': [90, 56, 78, 54, 67],
                                       'mark2': [100, 67, 96, 89, 32],
                                       'mark3': [91, 92, 98, 97, 87]})
    print(pyspark_pandas)

There are several ways to return the top and last rows from the pyspark pandas DataFrame. Let's see them one by one.

pyspark.pandas.DataFrame.head

head() returns rows from the top of the pyspark pandas DataFrame. It takes n as a parameter that specifies the number of rows to return from the top. By default, it returns the top 5 rows.

Syntax:

    pyspark_pandas.head(n)

Where pyspark_pandas is the pyspark pandas DataFrame.

Parameter: n is an integer that specifies the number of rows to return from the top of the pyspark pandas DataFrame.

We can also use the head() function on a specific column.

Syntax:

    pyspark_pandas.column.head(n)

Example 1

In this example, we will return the top 2 and 4 rows in the mark1 column.
    # import pandas from the pyspark module
    from pyspark import pandas

    # create the DataFrame from pyspark pandas
    pyspark_pandas = pandas.DataFrame({'student_lastname': ['manasa', 'trisha', 'lehara', 'kapila', 'hyna'],
                                       'mark1': [90, 56, 78, 54, 67],
                                       'mark2': [100, 67, 96, 89, 32],
                                       'mark3': [91, 92, 98, 97, 87]})

    # display the top 2 rows in the mark1 column
    print(pyspark_pandas.mark1.head(2))
    print()

    # display the top 4 rows in the mark1 column
    print(pyspark_pandas.mark1.head(4))

Output:

    0    90
    1    56
    Name: mark1, dtype: int64

    0    90
    1    56
    2    78
    3    54
    Name: mark1, dtype: int64

We can see that the top 2 and 4 rows were selected from the mark1 column.

Example 2

In this example, we will return the top 2 and 4 rows in the student_lastname column.

    # import pandas from the pyspark module
    from pyspark import pandas

    # create the DataFrame from pyspark pandas
    pyspark_pandas = pandas.DataFrame({'student_lastname': ['manasa', 'trisha', 'lehara', 'kapila', 'hyna'],
                                       'mark1': [90, 56, 78, 54, 67],
                                       'mark2': [100, 67, 96, 89, 32],
                                       'mark3': [91, 92, 98, 97, 87]})

    # display the top 2 rows in the student_lastname column
    print(pyspark_pandas.student_lastname.head(2))
    print()

    # display the top 4 rows in the student_lastname column
    print(pyspark_pandas.student_lastname.head(4))

Output:

    0    manasa
    1    trisha
    Name: student_lastname, dtype: object

    0    manasa
    1    trisha
    2    lehara
    3    kapila
    Name: student_lastname, dtype: object

We can see that the top 2 and 4 rows were selected from the student_lastname column.

Example 3

In this example, we will return the top 2 and 4 rows from the entire DataFrame.
    # import pandas from the pyspark module
    from pyspark import pandas

    # create the DataFrame from pyspark pandas
    pyspark_pandas = pandas.DataFrame({'student_lastname': ['manasa', 'trisha', 'lehara', 'kapila', 'hyna'],
                                       'mark1': [90, 56, 78, 54, 67],
                                       'mark2': [100, 67, 96, 89, 32],
                                       'mark3': [91, 92, 98, 97, 87]})

    # display the top 2 rows
    print(pyspark_pandas.head(2))
    print()

    # display the top 4 rows
    print(pyspark_pandas.head(4))

Output:

      student_lastname  mark1  mark2  mark3
    0           manasa     90    100     91
    1           trisha     56     67     92

      student_lastname  mark1  mark2  mark3
    0           manasa     90    100     91
    1           trisha     56     67     92
    2           lehara     78     96     98
    3           kapila     54     89     97

We can see that the top 2 and 4 rows are returned with all columns of the DataFrame.

pyspark.pandas.DataFrame.tail

tail() returns rows from the end of the pyspark pandas DataFrame. It takes n as a parameter that specifies the number of rows to return from the end. By default, it returns the last 5 rows.

Syntax:

    pyspark_pandas.tail(n)

Where pyspark_pandas is the pyspark pandas DataFrame.

Parameter: n is an integer that specifies the number of rows to return from the end of the pyspark pandas DataFrame.

We can also use the tail() function on a specific column.

Syntax:

    pyspark_pandas.column.tail(n)

Example 1

In this example, we will return the last 2 and 4 rows in the mark1 column.

    # import pandas from the pyspark module
    from pyspark import pandas

    # create the DataFrame from pyspark pandas
    pyspark_pandas = pandas.DataFrame({'student_lastname': ['manasa', 'trisha', 'lehara', 'kapila', 'hyna'],
                                       'mark1': [90, 56, 78, 54, 67],
                                       'mark2': [100, 67, 96, 89, 32],
                                       'mark3': [91, 92, 98, 97, 87]})

    # display the last 2 rows in the mark1 column
    print(pyspark_pandas.mark1.tail(2))
    print()

    # display the last 4 rows in the mark1 column
    print(pyspark_pandas.mark1.tail(4))

Output:

    3    54
    4    67
    Name: mark1, dtype: int64

    1    56
    2    78
    3    54
    4    67
    Name: mark1, dtype: int64

We can see that the last 2 and 4 rows were selected from the mark1 column.

Example 2

In this example, we will return the last 2 and 4 rows in the student_lastname column.
    # import pandas from the pyspark module
    from pyspark import pandas

    # create the DataFrame from pyspark pandas
    pyspark_pandas = pandas.DataFrame({'student_lastname': ['manasa', 'trisha', 'lehara', 'kapila', 'hyna'],
                                       'mark1': [90, 56, 78, 54, 67],
                                       'mark2': [100, 67, 96, 89, 32],
                                       'mark3': [91, 92, 98, 97, 87]})

    # display the last 2 rows in the student_lastname column
    print(pyspark_pandas.student_lastname.tail(2))
    print()

    # display the last 4 rows in the student_lastname column
    print(pyspark_pandas.student_lastname.tail(4))

Output:

    3    kapila
    4      hyna
    Name: student_lastname, dtype: object

    1    trisha
    2    lehara
    3    kapila
    4      hyna
    Name: student_lastname, dtype: object

We can see that the last 2 and 4 rows were selected from the student_lastname column.

Example 3

In this example, we will return the last 2 and 4 rows from the entire DataFrame.

    # import pandas from the pyspark module
    from pyspark import pandas

    # create the DataFrame from pyspark pandas
    pyspark_pandas = pandas.DataFrame({'student_lastname': ['manasa', 'trisha', 'lehara', 'kapila', 'hyna'],
                                       'mark1': [90, 56, 78, 54, 67],
                                       'mark2': [100, 67, 96, 89, 32],
                                       'mark3': [91, 92, 98, 97, 87]})

    # display the last 2 rows
    print(pyspark_pandas.tail(2))
    print()

    # display the last 4 rows
    print(pyspark_pandas.tail(4))

Output:

      student_lastname  mark1  mark2  mark3
    3           kapila     54     89     97
    4             hyna     67     32     87

      student_lastname  mark1  mark2  mark3
    1           trisha     56     67     92
    2           lehara     78     96     98
    3           kapila     54     89     97
    4             hyna     67     32     87

We can see that the last 2 and 4 rows are returned with all columns of the DataFrame.

Conclusion

We saw how to display the top and last rows from the pyspark pandas DataFrame using the head() and tail() functions. By default, they return 5 rows. The head() and tail() functions can also be applied to specific columns to get their top and last rows.

View the full article
  8. In Python, PySpark is a Spark module that provides Spark-style processing using DataFrames. In this article, we will discuss several ways to create a PySpark DataFrame.

Method 1: Using Dictionary

A dictionary is a data structure that stores data in key-value pairs. Each key acts as a column name and each value acts as the row value for that column in the PySpark DataFrame. The dictionary has to be passed inside a list.

Structure:

    [{'key': value}]

We can also provide multiple dictionaries.

Structure:

    [{'key': value}, {'key': value}, ..., {'key': value}]

Example:

Here, we are going to create a PySpark DataFrame with 5 rows and 6 columns through the dictionary. Finally, we display the DataFrame using the show() method.

    # import SparkSession for creating a session
    from pyspark.sql import SparkSession

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = [{'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
                {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
                {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
                {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
                {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'}]

    # create the dataframe
    df = spark_app.createDataFrame(students)

    # display the dataframe
    df.show()

Method 2: Using list of tuples

A tuple is a data structure that stores data in (). We can pass each row as a tuple of comma-separated values, with the tuples surrounded by a list.

Structure:

    [(value1, value2, ..., valuen)]

We can also provide multiple tuples in a list.

Structure:

    [(value1, value2, ..., valuen), (value1, value2, ..., valuen), ..., (value1, value2, ..., valuen)]

We need to provide the column names through a list while creating the DataFrame.
Syntax:

    column_names = ['column1', 'column2', ..., 'column']
    spark_app.createDataFrame(list_of_tuple, column_names)

Example:

Here, we are going to create a PySpark DataFrame with 5 rows and 6 columns through a list of tuples. Finally, we display the DataFrame using the show() method.

    # import SparkSession for creating a session
    from pyspark.sql import SparkSession

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = [('001', 'sravan', 23, 5.79, 67, 'guntur'),
                ('002', 'ojaswi', 16, 3.79, 34, 'hyd'),
                ('003', 'gnanesh chowdary', 7, 2.79, 17, 'patna'),
                ('004', 'rohith', 9, 3.69, 28, 'hyd'),
                ('005', 'sridevi', 37, 5.59, 54, 'hyd')]

    # assign the column names
    column_names = ['rollno', 'name', 'age', 'height', 'weight', 'address']

    # create the dataframe
    df = spark_app.createDataFrame(students, column_names)

    # display the dataframe
    df.show()

Method 3: Using tuple of lists

A list is a data structure that stores data in []. We can pass each row as a list of comma-separated values, with the lists surrounded by a tuple.

Structure (note the trailing comma, which Python requires for a one-element tuple):

    ([value1, value2, ..., valuen],)

We can also provide multiple lists in a tuple.

Structure:

    ([value1, value2, ..., valuen], [value1, value2, ..., valuen], ..., [value1, value2, ..., valuen])

We need to provide the column names through a list while creating the DataFrame.

Syntax:

    column_names = ['column1', 'column2', ..., 'column']
    spark_app.createDataFrame(tuple_of_list, column_names)

Example:

Here, we are going to create a PySpark DataFrame with 5 rows and 6 columns through a tuple of lists. Finally, we display the DataFrame using the show() method.
    # import SparkSession for creating a session
    from pyspark.sql import SparkSession

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = (['001', 'sravan', 23, 5.79, 67, 'guntur'],
                ['002', 'ojaswi', 16, 3.79, 34, 'hyd'],
                ['003', 'gnanesh chowdary', 7, 2.79, 17, 'patna'],
                ['004', 'rohith', 9, 3.69, 28, 'hyd'],
                ['005', 'sridevi', 37, 5.59, 54, 'hyd'])

    # assign the column names
    column_names = ['rollno', 'name', 'age', 'height', 'weight', 'address']

    # create the dataframe
    df = spark_app.createDataFrame(students, column_names)

    # display the dataframe
    df.show()

Method 4: Using nested list

A list is a data structure that stores data in []. We can pass each row as a list of comma-separated values, with the lists surrounded by another list.

Structure:

    [[value1, value2, ..., valuen]]

We can also provide multiple lists in a list.

Structure:

    [[value1, value2, ..., valuen], [value1, value2, ..., valuen], ..., [value1, value2, ..., valuen]]

We need to provide the column names through a list while creating the DataFrame.

Syntax:

    column_names = ['column1', 'column2', ..., 'column']
    spark_app.createDataFrame(nested_list, column_names)

Example:

Here, we are going to create a PySpark DataFrame with 5 rows and 6 columns through a nested list. Finally, we display the DataFrame using the show() method.
    # import SparkSession for creating a session
    from pyspark.sql import SparkSession

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = [['001', 'sravan', 23, 5.79, 67, 'guntur'],
                ['002', 'ojaswi', 16, 3.79, 34, 'hyd'],
                ['003', 'gnanesh chowdary', 7, 2.79, 17, 'patna'],
                ['004', 'rohith', 9, 3.69, 28, 'hyd'],
                ['005', 'sridevi', 37, 5.59, 54, 'hyd']]

    # assign the column names
    column_names = ['rollno', 'name', 'age', 'height', 'weight', 'address']

    # create the dataframe
    df = spark_app.createDataFrame(students, column_names)

    # display the dataframe
    df.show()

Method 5: Using nested tuple

We can pass each row as a tuple, with the tuples surrounded by another tuple.

Structure (note the trailing comma, which Python requires for a one-element tuple):

    ((value1, value2, ..., valuen),)

We can also provide multiple tuples in a tuple.

Structure:

    ((value1, value2, ..., valuen), (value1, value2, ..., valuen), ..., (value1, value2, ..., valuen))

We need to provide the column names through a list while creating the DataFrame.

Syntax:

    column_names = ['column1', 'column2', ..., 'column']
    spark_app.createDataFrame(nested_tuple, column_names)

Example:

Here, we are going to create a PySpark DataFrame with 5 rows and 6 columns through a nested tuple. Finally, we display the DataFrame using the show() method.
    # import SparkSession for creating a session
    from pyspark.sql import SparkSession

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = (('001', 'sravan', 23, 5.79, 67, 'guntur'),
                ('002', 'ojaswi', 16, 3.79, 34, 'hyd'),
                ('003', 'gnanesh chowdary', 7, 2.79, 17, 'patna'),
                ('004', 'rohith', 9, 3.69, 28, 'hyd'),
                ('005', 'sridevi', 37, 5.59, 54, 'hyd'))

    # assign the column names
    column_names = ['rollno', 'name', 'age', 'height', 'weight', 'address']

    # create the dataframe
    df = spark_app.createDataFrame(students, column_names)

    # display the dataframe
    df.show()

Conclusion

In this tutorial, we discussed five methods to create a PySpark DataFrame: a list of dictionaries, a list of tuples, a tuple of lists, a nested list, and a nested tuple. Every method except the dictionary one needs a list of column names. With dictionaries, the keys supply the column names, so no column names list is needed.

View the full article
  9. In Python, PySpark is a Spark module that provides Spark-style processing. RDD stands for Resilient Distributed Dataset. The RDD is a fundamental data structure in Apache Spark.

Syntax:

    spark_app.sparkContext.parallelize(data)

We can also hold the data in a tabular format, which means it is stored in rows and columns. The data structure used for this is the DataFrame. In PySpark, we can create a DataFrame from the Spark app with the createDataFrame() method.

Syntax:

    spark_app.createDataFrame(input_data, columns)

Where input_data may be a list of dictionaries or a nested list. If input_data is a list of dictionaries, then the columns are not needed. If it is a nested list, we have to provide the column names.

Now, let's discuss how to check whether the given data is a PySpark RDD or a DataFrame.

Creation of PySpark RDD:

In this example, we will create an RDD named students and display it using the collect() action.

    # import SparkSession for creating a session
    from pyspark.sql import SparkSession

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = spark_app.sparkContext.parallelize([
        {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
        {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
        {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
        {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
        {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'}])

    # display the RDD using collect()
    print(students.collect())

Output:

    [{'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
     {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
     {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
     {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
     {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'}]

Creation of PySpark DataFrame:

In this example, we will create a DataFrame named df from the students' data and display it using the show() method.

    # import SparkSession for creating a session
    from pyspark.sql import SparkSession

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = [
        {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
        {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
        {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
        {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
        {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'}]

    # create the dataframe
    df = spark_app.createDataFrame(students)

    # display the dataframe
    df.show()

Method 1: isinstance()

In Python, the isinstance() function checks whether the given object (data) is an instance of the given type (RDD/DataFrame).

Syntax:

    isinstance(object, RDD/DataFrame)

It takes two parameters:

    object refers to the data.
    RDD is the type available in the pyspark.rdd module and DataFrame is the type available in the pyspark.sql module.

It returns a Boolean value (True/False). If the data is an RDD and the type is also RDD, it returns True; otherwise it returns False.
Similarly, if the data is a DataFrame and the type is also DataFrame, it returns True; otherwise it returns False.

Example 1: Check for RDD object

In this example, we will apply isinstance() to an RDD object.

    # import SparkSession and DataFrame
    from pyspark.sql import SparkSession, DataFrame

    # import RDD from pyspark.rdd
    from pyspark.rdd import RDD

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = spark_app.sparkContext.parallelize([
        {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
        {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
        {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
        {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
        {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'}])

    # check if the students object is an RDD
    print(isinstance(students, RDD))

    # check if the students object is a DataFrame
    print(isinstance(students, DataFrame))

Output:

    True
    False

First, we compared students with RDD; it returned True because students is an RDD. Then we compared students with DataFrame; it returned False because students is an RDD (not a DataFrame).

Example 2: Check for DataFrame object

In this example, we will apply isinstance() to a DataFrame object.
    # import SparkSession and DataFrame
    from pyspark.sql import SparkSession, DataFrame

    # import RDD from pyspark.rdd
    from pyspark.rdd import RDD

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = [
        {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
        {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
        {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
        {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
        {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'}]

    # create the dataframe
    df = spark_app.createDataFrame(students)

    # check if df is an RDD
    print(isinstance(df, RDD))

    # check if df is a DataFrame
    print(isinstance(df, DataFrame))

Output:

    False
    True

First, we compared df with RDD; it returned False because df is a DataFrame (not an RDD). Then we compared df with DataFrame; it returned True because df is a DataFrame.

Method 2: type()

In Python, the type() function returns the class of the specified object. It takes the object as a parameter.

Syntax:

    type(object)

Example 1: Check for an RDD object

We will apply type() to the RDD object.
    # import SparkSession for creating a session
    from pyspark.sql import SparkSession

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = spark_app.sparkContext.parallelize([
        {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
        {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
        {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
        {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
        {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'}])

    # check the type of students
    print(type(students))

Output:

    <class 'pyspark.rdd.RDD'>

We can see that the class RDD is returned.

Example 2: Check for DataFrame object

We will apply type() to the DataFrame object.
    # import SparkSession for creating a session
    from pyspark.sql import SparkSession

    # create an app named linuxhint
    spark_app = SparkSession.builder.appName('linuxhint').getOrCreate()

    # create student data with 5 rows and 6 attributes
    students = [
        {'rollno': '001', 'name': 'sravan', 'age': 23, 'height': 5.79, 'weight': 67, 'address': 'guntur'},
        {'rollno': '002', 'name': 'ojaswi', 'age': 16, 'height': 3.79, 'weight': 34, 'address': 'hyd'},
        {'rollno': '003', 'name': 'gnanesh chowdary', 'age': 7, 'height': 2.79, 'weight': 17, 'address': 'patna'},
        {'rollno': '004', 'name': 'rohith', 'age': 9, 'height': 3.69, 'weight': 28, 'address': 'hyd'},
        {'rollno': '005', 'name': 'sridevi', 'age': 37, 'height': 5.59, 'weight': 54, 'address': 'hyd'}]

    # create the dataframe
    df = spark_app.createDataFrame(students)

    # check the type of df
    print(type(df))

Output:

    <class 'pyspark.sql.dataframe.DataFrame'>

We can see that the class DataFrame is returned.

Conclusion

In this article, we saw two ways to check whether the given data or object is an RDD or a DataFrame: isinstance() and type(). Note that isinstance() returns a Boolean value: True if the object is an instance of the given type, and False otherwise. type() returns the class of the given data or object.

View the full article
  10. We will discuss PySpark, a data processing technology that can handle data on a petabyte scale, along with PySpark when() and otherwise() and the SQL CASE WHEN expression in PySpark.

What is PySpark?

Spark is a general-purpose, in-memory, distributed processing engine that allows you to handle data across several machines efficiently. Using PySpark, you can develop Spark applications to process data and run them on the Spark platform. AWS offers EMR, a managed platform that runs Spark, so you can launch an EMR cluster on AWS and process data on it with PySpark. PySpark can read data from various file formats, including CSV, Parquet, and JSON, as well as from databases.

Because Spark is primarily implemented in Scala, creating Spark apps in Scala or Java allows you to access more of its features than writing Spark programs in Python or R. PySpark, for example, does not currently support Dataset. If you are doing data science, PySpark is a better option than Scala because many popular data science libraries, such as NumPy, TensorFlow, and Scikit-learn, are written in Python.

PySpark "When" and "Otherwise"

PySpark, like SQL and other programming languages, has a mechanism for checking multiple conditions in order and returning a value when the first condition is met. It is available as an SQL-like CASE WHEN expression and as the when() and otherwise() functions. These expressions are similar to "switch" and "if-then-else" statements in their functionality.

PySpark When Otherwise: when() is a SQL function that returns a Column, and otherwise() is a Column method that supplies the default value. If otherwise() is not used, rows that match no condition get None/NULL.

SQL Case in PySpark When: this mirrors the SQL CASE WHEN expression. If condition 1 is true, its result is returned; otherwise the next condition is checked, and so on.
Example 1 (written against Spark's Scala API)

    import org.apache.spark.sql.functions.when
    import spark.implicits._  // enables toDF and the $"col" syntax outside spark-shell

    val df = Seq(
      ("A B", "2019-01-19"),
      ("A A", "2019-01-10"),
      ("B F", "2019-01-15"),
      ("B E", "2019-01-30"),
      ("C B", "2019-01-22"),
      ("D O", "2019-01-30"),
      ("E U", "2019-01-22")
    ).toDF("word", "date")  // the original snippet omits toDF; "date" is an assumed column name

    df.withColumn("ends_with_B", when($"word".endsWith("B"), true).otherwise(false))

Example 2 (written against Spark's Scala API)

    import org.apache.spark.sql.functions.{element_at, split, when}
    import spark.implicits._  // enables toDF and the $"col" syntax outside spark-shell

    val df = Seq(
      ("BA", "human"),
      ("AB", "human"),
      ("E_bot", "bot"),
      ("D_bot", "bot"),
      ("TT", "human"),
      ("A_bot", "bot"),
      ("C_bot", "bot")
    ).toDF("user", "type")

    // no otherwise(): users that do not end with "bot" get NULL in isBot
    df.withColumn("isBot", when($"user".endsWith("bot"), element_at(split($"user", "_"), 1)))

Conclusion

We discussed PySpark, when(), otherwise(), and the SQL CASE WHEN expression, which are used to check multiple conditions in order and return the value for the first condition that is met, along with some examples.

View the full article
  11. We'll learn about the PySpark library in this session. Spark is a general-purpose, in-memory, distributed processing engine that lets you manage data effectively across several machines. We'll also learn about the PySpark fillna() method, which is used to fill the null values in a DataFrame with a custom value, along with its examples.

What is PySpark?

PySpark is the Python API for Spark, one of several languages that Spark supports. Spark is a large-scale data processing technology that can handle data on a petabyte scale. PySpark combines Apache Spark and Python. Python is a modern high-level programming language, whereas Apache Spark is an open-source engine for cluster computing that focuses on speed, ease of use, and streaming analytics. Because Spark is mostly built in Scala, creating Spark apps in Scala or Java allows you to access more of its capabilities than writing Spark programs in Python or R. PySpark, for example, does not currently support Dataset.

Using PySpark, you can develop Spark applications to process data and launch them on the Spark platform. AWS offers EMR, a managed platform that runs Spark, so you can launch an EMR cluster on AWS and process data on it with PySpark. If you are doing data science, PySpark is a better option than Scala because many popular data science libraries, such as NumPy, TensorFlow, and Scikit-learn, are written in Python. PySpark can read data from a variety of file formats, including CSV, Parquet, and JSON, as well as from databases.

Pandas is typically used for smaller datasets, whereas PySpark is used for bigger ones. For data that fits in memory on a single machine, Pandas usually gives quicker results than PySpark. Depending on memory availability and data size, you may switch between PySpark and Pandas to improve performance, preferring Pandas when the data fits in memory.

Spark has quickly become the industry's preferred technology for data processing. It is, however, not the first. Before Spark, the common processing engine was MapReduce.

What is PySpark Fillna()?

fillna() is a PySpark method used to replace the null values in one or many columns of a PySpark DataFrame. Depending on the business requirements, the replacement value might be anything: 0, an empty string, or any other constant literal. The fillna() method is useful for data analysis since it eliminates null values, which can cause difficulties in analysis.

Example of Using Fillna()

    from pyspark.sql import SparkSession

    spark_session = SparkSession.builder \
        .master('local[1]') \
        .appName('Example') \
        .getOrCreate()

    df = spark_session.createDataFrame(
        [
            (1, 'Canada', 'Toronto', None),
            (2, 'Japan', 'Tokyo', 8000000),
            (3, 'India', 'Amritsar', None),
            (4, 'Turkey', 'Ankara', 550000),
        ],
        ['id', 'country', 'city', 'population']
    )
    df.show()

Output:

    +---+-------+--------+----------+
    | id|country|    city|population|
    +---+-------+--------+----------+
    |  1| Canada| Toronto|      null|
    |  2|  Japan|   Tokyo|   8000000|
    |  3|  India|Amritsar|      null|
    |  4| Turkey|  Ankara|    550000|
    +---+-------+--------+----------+

We can now use just the value argument to replace all the null values in the DataFrame. The three calls below give the same result for this DataFrame:

    df.na.fill(value=0).show()
    df.na.fill(value=0, subset=["population"]).show()
    df.fillna(value=0).show()

    +---+-------+--------+----------+
    | id|country|    city|population|
    +---+-------+--------+----------+
    |  1| Canada| Toronto|         0|
    |  2|  Japan|   Tokyo|   8000000|
    |  3|  India|Amritsar|         0|
    |  4| Turkey|  Ankara|    550000|
    +---+-------+--------+----------+

The above operation replaces all the null values in the integer columns with 0.

Conclusion

We discussed PySpark, the PySpark fillna() method, and its examples in this session. The fillna() method replaces the null values in the DataFrame with our custom value in the columns whose type matches that value.
View the full article
  12. Today, we are making it faster and easier to prepare and visualize data using PySpark and Altair with support for code snippets in Amazon SageMaker Data Wrangler. Amazon SageMaker Data Wrangler reduces the time it takes to aggregate and prepare data for machine learning (ML) from weeks to minutes. With SageMaker Data Wrangler, you can simplify the process of data preparation and feature engineering, and complete each step of the data preparation workflow, including data selection, cleansing, exploration, and visualization from a single visual interface. With SageMaker Data Wrangler’s data selection tool, you can quickly select data from multiple data sources, such as Amazon S3, Amazon Athena, Amazon Redshift, AWS Lake Formation, Amazon SageMaker Feature Store, Databricks, and Snowflake. View the full article