Showing results for tags 'deep dives'.
-
xz is a widely distributed package that provides lossless compression for both users and developers, and is included by default in most, if not all, Linux distributions. Created in 2009, it has since released numerous versions. As an open-source project, it is available on GitHub. However, as of the time of writing this article, attempting […] The post A Deep Dive on the xz Compromise appeared first on TuxCare. The post A Deep Dive on the xz Compromise appeared first on Security Boulevard. View the full article
-
Widely adopted by both developers and organisations, Kubeflow is an MLOps platform that runs on Kubernetes and automates machine learning (ML) workloads. It covers the entire ML lifecycle, enabling data scientists and machine learning engineers to develop and deploy ML models. Kubeflow is designed as a suite of leading open source projects that enable different capabilities such as model serving, training or hyperparameter tuning.

At Canonical, we deliver Charmed Kubeflow (an official distribution of the upstream solution with additional security maintenance, tool integrations, and enterprise support and managed services), so we know a thing or two about the project. In our experience, one of the most important concepts to understand with respect to both Kubeflow itself and the broader ML lifecycle is machine learning pipelines. Taking advantage of pipelines is the best way to effectively deploy models at scale in production, so let’s break down this critical component in the MLOps landscape.

What is an ML pipeline?

A machine learning pipeline is an important component of ML systems: it simplifies experimentation and makes it possible to take models to production. A pipeline is a series of steps that automates how ML models are created, streamlining workflow, development and deployment. ML pipelines reduce the complexity of the end-to-end ML lifecycle, helping professionals to develop and deploy models. Among their benefits, ML pipelines scale to large volumes of data while supporting collaboration and reproducibility. A core value of MLOps platforms such as Kubeflow is that they enable professionals to build and maintain ML pipelines.

What is Kubeflow Pipelines?

Kubeflow Pipelines, or KFP, is the heart of Kubeflow. It is the Kubeflow component that enables the creation of ML pipelines, helping you build and deploy container-based ML workflows that are portable and scalable.
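To make the idea of a pipeline as a series of dependent steps more concrete, here is a minimal sketch in plain Python. It deliberately does not use the Kubeflow Pipelines SDK: the step functions (load_data, train, evaluate), the PIPELINE table and the run_pipeline helper are all hypothetical names chosen for illustration. The only point is that each step declares which earlier outputs it needs, and a runner executes the steps in dependency order.

```python
from graphlib import TopologicalSorter


def load_data(_inputs):
    # Hypothetical step: pretend to load a tiny dataset.
    return [1.0, 2.0, 3.0, 4.0]


def train(inputs):
    # Hypothetical "model": simply the mean of the training data.
    data = inputs["load_data"]
    return sum(data) / len(data)


def evaluate(inputs):
    # Mean absolute error of the constant model on the same data.
    data, model = inputs["load_data"], inputs["train"]
    return sum(abs(x - model) for x in data) / len(data)


# Each step is paired with the names of the steps whose outputs it consumes.
PIPELINE = {
    "load_data": (load_data, []),
    "train": (train, ["load_data"]),
    "evaluate": (evaluate, ["load_data", "train"]),
}


def run_pipeline(pipeline):
    # Build a {step: predecessors} graph and run steps in topological order.
    graph = {name: set(deps) for name, (_, deps) in pipeline.items()}
    outputs = {}
    for name in TopologicalSorter(graph).static_order():
        func, deps = pipeline[name]
        outputs[name] = func({dep: outputs[dep] for dep in deps})
    return outputs


if __name__ == "__main__":
    results = run_pipeline(PIPELINE)
    print(results["train"], results["evaluate"])
```

In a real KFP deployment each step would run in its own container and the outputs would be passed as parameters or artefacts rather than in-memory values; the sketch only illustrates the dependency-ordering idea.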
The main goals of Kubeflow Pipelines are to simplify the following processes:

- Orchestration of end-to-end ML pipelines
- Experimentation with various ideas and techniques
- Experiment management
- Reuse of components and pipelines, so that users can quickly put together end-to-end solutions without having to rebuild each time

Components of Kubeflow Pipelines

Kubeflow Pipelines is part of the Kubeflow project. It can be used as part of the project or as an independent tool. It is made of three main components:

- User interface (UI) for managing and tracking experiments, jobs, and runs
- Engine for scheduling multi-step ML workflows
- SDK for defining and manipulating pipelines and components

Kubeflow Pipelines use cases

Kubeflow Pipelines is typically most useful for advanced users of Kubeflow or professionals who already have experience with machine learning. You don’t necessarily need KFP in the experimentation phase of the ML journey, but it becomes useful when you want to take your models to production. The main use cases for KFP include:

- Workflow automation: Data scientists and machine learning engineers often perform a lot of the initial experimentation phase manually to better understand optimisation possibilities and quickly iterate. But once they have defined their workflow, they can use KFP to automate the process and save time.
- Model deployment to production: Models are usually saved as a binary file. Traditionally, for the model to be loaded onto a server where the requirements for inference are met, this file would be manually copied to the machine that hosts the application. KFP simplifies this process by enabling you to build automated pipelines that deploy to multiple applications or servers.
- Model maintenance and updates: The ML lifecycle is an iterative process and models need to be updated periodically. KFP helps users run updates and rollbacks across multiple applications or servers. Once the model is updated in one place and the update transaction is complete, KFP ensures the update is quickly applied to all client applications.
- Multi-tenant ML environment: Organisations often have large data and ML teams that need to share their resources. KFP enables simple and effective sharing of the environment, where each collaborator gets an isolated environment. The Kubernetes cluster and tools such as Volcano then schedule resources and manage containers for these environments. This helps professionals isolate workflows and keep track of pending and running jobs for each collaborator.

Benefits of KFP

Among machine learning specialists, Kubeflow Pipelines is widely adopted for a number of reasons. The most important benefits of KFP include:

- Streamlined workflow automation: Kubeflow Pipelines allows users to define machine learning pipelines as a sequence of steps, each with its own inputs, outputs, and dependencies. This streamlines machine learning workflows and reduces the overhead and complexity of managing and executing your pipelines.
- Improved collaboration: Kubeflow Pipelines provides a central and shared platform for data scientists, machine learning engineers, and IT operations teams to collaborate on machine learning projects. It allows them to share pipelines and artefacts with others, and enables the tracking and monitoring of pipelines across the entire organisation.
- Enhanced performance and scalability: Kubeflow Pipelines runs on Kubernetes, which provides a scalable and flexible infrastructure for running machine learning pipelines and models. This allows you to easily scale pipelines up and down, and helps keep them performant and reliable.
- Resource optimisation: KFP is a cloud native application, so it can leverage the resource schedulers that Kubernetes platforms provide. This leads to optimised usage of the existing resources and faster project delivery.
- Extensive support for popular machine learning frameworks: KFP provides built-in support for popular machine learning frameworks like TensorFlow, PyTorch, and XGBoost, as well as a rich ecosystem of integrations and plugins for other tools and services. Charmed Kubeflow goes a step further and enables additional integrations with tools and frameworks such as NVIDIA NGC Containers, Triton Inference Server and MLflow.

Although Kubeflow Pipelines is a feature-rich tool, it still raises some challenges for beginners. It comes with a steep learning curve and there is limited documentation available. Because it is fully open source, there is a large community that can help beginners, but relying on community help can be frustrating at times. You can alleviate these challenges by taking advantage of enterprise support or managed services from organisations which distribute Kubeflow.

Architecture of Kubeflow Pipelines

Kubeflow Pipelines is a complex component with capabilities that unblock users and enable them to automate their workflows and reduce the time spent on manual tasks. The following architecture depicts these capabilities:

[Architecture diagram. Source: Kubeflow community]

As the diagram illustrates, users can interact with KFP either through the user interface or through development tools such as Notebooks. Initially, users create components or specify a pipeline using the Kubeflow Pipelines domain-specific language (DSL). Once the pipeline is defined, the compiler transforms the Python code into a static YAML configuration. Then, the Pipeline Service creates a pipeline run from the static configuration. It calls the Kubernetes API server to create the Kubernetes resources (CRDs) needed to run the pipeline. If you have a resource scheduler integrated, you can use it to run the pipeline when resources are available or at a desired time. To complete the pipeline, the containers are executed within Kubernetes pods, using orchestration controllers.

Two types of data can be stored. The first type is metadata, which includes experiments, jobs, pipeline runs, and single scalar metrics. The second type is artefacts, which include pipeline packages, views, and large-scale metrics (time series). Metadata is stored in a MySQL database, whereas artefacts are stored in MinIO. Storing them in an external component also enables portability, so artefacts can be migrated to different clusters or environments.

Kubernetes resources created by the Pipeline Service are monitored by the Persistence Agent. To enable reproducibility, the inputs and outputs of the containers are recorded. They consist of parameters or data artefact URIs and are treated as metadata. With them, professionals can reuse the configurations to replicate tasks and check whether the results match.

The Pipeline web server gives users a visual understanding of the steps in a Kubeflow pipeline. It presents various information, including the list of pipelines currently running, the history of pipeline executions, data artefacts, and logs for debugging.

Get started with Kubeflow Pipelines

To access Kubeflow Pipelines, users can deploy it either independently or as part of the Kubeflow project. For simplified deployment, we recommend using Charmed Kubeflow.

- Deploy Charmed Kubeflow following the tutorial. You can do it in any environment, including public cloud or on-prem. Ensure that you have enough resources available, so you do not run into problems along the way.
- Access the Kubeflow dashboard. If you are accessing it from a VM or from a public cloud, please ensure that you change the SOCKS proxy settings. There you will have different options, including uploading an existing pipeline or creating a new one.
- Clone this repository from GitHub, which contains a simple example of how to use some of the components of Kubeflow.
- Access the examples from the Notebook. There are several pipelines already created which you can run, edit or play with.

Of course, they are just examples. To build your own pipeline, check the official documentation of the Kubeflow project.

Further reading

- Kubeflow vs MLflow
- Launch NGC containers with Kubeflow
- MLOps pipelines with Kubeflow, MLflow and Seldon

View the full article
-
In the rapidly evolving landscape of container orchestration, Kubernetes has emerged as the de facto standard, offering a robust framework for deploying, managing, and scaling containerized applications. One of the cornerstone features of Kubernetes is its powerful and flexible scheduling system, which efficiently allocates workloads across a cluster of machines, known as nodes. This article delves deep into the mechanics of Kubernetes scheduling, focusing on the pivotal roles of pods and nodes, to equip technology professionals with the knowledge to harness the full potential of Kubernetes in their projects. Understanding Kubernetes Pods A pod is the smallest deployable unit in Kubernetes and serves as a wrapper for one or more containers that share the same context and resources. Pods encapsulate application containers, storage resources, a unique network IP, and options that govern how the container(s) should run. A key concept to grasp is that pods are ephemeral by nature; they are created and destroyed to match the state of your application as defined in deployments. View the full article
-
This post is the second part of our two-part series on the latest performance improvements of stateful pipelines. The first part of this... View the full article
-
Introduction

Amazon Elastic Container Service (Amazon ECS) is a container orchestration service that manages the lifecycle of billions of application containers on AWS every week. One of the core goals of Amazon ECS is to remove overhead burden from human operators. Amazon ECS watches over your application containers 24/7, and can respond to unexpected changes faster and better than any human can. Amazon ECS reacts to undesired changes, such as application crashes and hardware failures, by continuously attempting to self-heal your application container deployments back to your desired state. There are also external factors, such as traffic spikes, that can cause an application brownout. This can be more challenging to handle. This post dives deep into recent changes to how Amazon ECS handles task health issues and task replacement, and how these changes increase the availability of your Amazon ECS orchestrated applications.

Task health evaluation

Amazon ECS evaluates the health of a task based on a few criteria:

- First, for a task to be healthy all containers that are marked as essential must be running. Every Amazon ECS task must have at least one essential container. As a best practice, containers run a single application process, and if that process ends because of a critical runtime exception, then the container stops. If that stopped container was marked as essential, then the entire task is considered to be unhealthy and the task must be replaced.
- You can use the Amazon ECS Task Definition to configure an optional internal health check command that the Amazon ECS agent runs inside the container periodically. This command is expected to return a zero exit code that indicates success. If it returns a non-zero exit code, then that indicates failure. The container is considered unhealthy and an unhealthy essential container causes the task to be considered unhealthy, which causes Amazon ECS to replace the task.
- You can use the Amazon ECS service to configure attachments between your application container and other AWS services. For example, you can connect your container deployment to an Amazon Elastic Load Balancer (ELB) or AWS Cloud Map. These services perform their own external health checks. For example, ELB periodically attempts to open a connection to your container and send a test request. If it isn’t possible to open that connection, your container returns an unexpected response, or your container takes too long to respond, then the ELB considers the target container to be unhealthy. Amazon ECS also considers this external health status when deciding whether an Amazon ECS task is healthy or unhealthy. An unhealthy ELB health check causes the task to be replaced.

For a task to be healthy, all sources of health status must evaluate as healthy. If any of the sources return an unhealthy status, then the Amazon ECS task is considered unhealthy and it will be replaced.

Task replacement behavior

Replacing an Amazon ECS task is something that happens in two main circumstances:

- During a fresh deployment triggered by the UpdateService API call. Any existing tasks that are part of the previous deployment must be replaced by new tasks that are part of the new deployment.
- When an existing task inside an active deployment becomes unhealthy. Unhealthy tasks must be replaced in order to maintain the desired count of healthy tasks.

From early on in the history of Amazon ECS, the behavior of task replacement during rolling deployments has been configurable using two properties of the Amazon ECS service:

- maximumPercent – This controls how many additional tasks Amazon ECS can launch above the service’s desired count. For example, if the maximumPercent is 200% and the desired count for the service is eight tasks, then Amazon ECS can launch additional tasks up to a total of 16 tasks.
- minimumHealthyPercent – This controls the percentage that an Amazon ECS service is allowed to go below the desired count during a deployment. For example, if minimumHealthyPercent is 75% and the desired count for the service is eight tasks, then Amazon ECS can stop two tasks, reducing the service deployment down to six running tasks.

The maximumPercent and minimumHealthyPercent have functioned for many years as efficient controls for fine tuning the behavior of rolling deployments when running Amazon ECS tasks on Amazon Elastic Compute Cloud (Amazon EC2) capacity. However, these deployment controls don’t make as much sense in a world where more and more Amazon ECS users are choosing serverless AWS Fargate capacity. In most cases, modern applications don’t require Amazon ECS to go below the desired count of running tasks during a rolling deployment or reduce the number of additional tasks being launched during a rolling deployment, because AWS Fargate utilization isn’t constrained by how many underlying Amazon EC2 instances you have registered into your cluster.

Additionally, the maximumPercent and minimumHealthyPercent controls were originally ignored when it came to replacing unhealthy tasks. If tasks became unhealthy, then the number of running tasks in your service could dip well below the threshold defined by minimumHealthyPercent. For example, if you were running eight tasks and four of them became unhealthy, then Amazon ECS would terminate the four unhealthy tasks and launch four replacement tasks. The number of running tasks would temporarily dip to 50% of the desired count.

Updates to how Amazon ECS replaces unhealthy tasks

As of October 20, 2023, Amazon ECS now uses your maximumPercent whenever possible when replacing unhealthy tasks. Let’s look at a few scenarios to understand how this works:

Crashing tasks

You’re running a service with a desired count of eight tasks and a maximum percent of 200%. Four of your eight tasks encounter critical runtime exceptions. Their processes crash and exit, which causes an essential container to exit. Amazon ECS observes that four of the eight tasks have gone unhealthy because their essential container exited. Unfortunately, Amazon ECS can’t avoid the healthy percentage dipping below 100% because the unhealthy containers have already crashed. The running task count dips to 50% of the desired count briefly, but Amazon ECS launches four replacement tasks as quickly as possible to bring the number of running tasks back up to the desired count of eight tasks.

Frozen tasks

You’re running a service with a desired count of eight tasks and a maximum percent of 200%. Because of an endless loop in your code, four of your eight tasks freeze up, but the processes stay running. The attached load balancer that is sending health check requests to the service observes that the target container is no longer responsive to health check requests, so it marks the target as unhealthy. Amazon ECS considers those four frozen tasks to be unhealthy. The maximum percent for the service allows it to go up to 16 tasks. Amazon ECS launches four additional replacement tasks for the four unhealthy tasks, making a total of 12 running tasks. Once the four additional tasks have become healthy, Amazon ECS stops the four unhealthy tasks, which brings the running task count back down to the desired count of eight tasks.

Overburdened tasks

You’re running a service with a desired count of eight tasks and a maximum percent of 150%. The service has autoscaling rules attached to it. It also has a load balancer attached to it, and a large spike of traffic arrives via the load balancer. The spike of traffic is so large that response time from the tasks rises dramatically. As a result of high response time, the load balancer health check fails and the ELB marks all eight targets as unhealthy. The ELB fails open and continues distributing traffic to all the targets as there are no healthy targets in the load balancer. Amazon ECS observes that all eight tasks are unhealthy. As a result, Amazon ECS wants to replace these unhealthy tasks. The maximum percent of 150% allows the service to go up to 12 running tasks. Therefore, Amazon ECS avoids stopping the unhealthy running tasks immediately. Instead, it launches four replacement tasks in parallel with the existing eight unhealthy tasks. Fortunately these four additional tasks give the ELB more targets to distribute traffic across, and all 12 of the running tasks stabilize in health as they are now able to handle the incoming traffic without timing out. Amazon ECS observes that there are now 12 healthy running tasks. At the same time, an Application Auto Scaling rule has kicked in based on seeing high CPU utilization by the original eight running tasks. The rule has updated the desired count for the Amazon ECS service from eight running tasks to 10 running tasks. Therefore, Amazon ECS only stops two of the 12 healthy running tasks, which reduces the task count back down to its current desired count of 10 running tasks.

Limited maximum percent

You’re running a service with a desired count of eight tasks and because of downstream limits or infrastructure constraints you have set a maximum percent of 100%. This doesn’t allow Amazon ECS to launch any additional tasks in parallel with your eight running tasks. If a task from this deployment freezes, or becomes overburdened and starts failing health checks, then Amazon ECS needs to replace it. Amazon ECS stops the unhealthy task first, then launches a replacement task after the unhealthy task has been stopped. This means the running task count still temporarily dips below the desired count.

Task fails health checks during a rolling deployment

You’re running a service with a desired count of eight tasks and a maximum percent of 150%. You’re doing a rolling deployment to update your running tasks to be based off of a new task definition. Because the maximum percent is 150%, this allows Amazon ECS to launch additional tasks in parallel with your currently running tasks. The rolling deployment has already triggered four additional task launches. The service currently has 12 running tasks: eight old tasks and four new tasks. During this rolling deployment, some of the old tasks begin failing a health check due to an unexpected bug. Because there’s an active rolling deployment occurring, Amazon ECS resorts to terminating unhealthy tasks immediately and replacing them with instances of the new task as quickly as possible. During a rolling deployment, Amazon ECS always tries to replace failing tasks with tasks from the new active deployment.

Health checks and responsive absorption of workload spikes

Previously, Amazon ECS always stopped unhealthy tasks first, then launched a replacement task. This behavior made sense in a world where tasks were binpacked densely onto a statically sized cluster of Amazon EC2 instances that had no room to launch a replacement task without stopping an existing task. But more modern container workloads are now running using serverless AWS Fargate capacity. There’s no need to stop an unhealthy running task to make room for its replacement, as AWS Fargate can supply as much on-demand container capacity as needed. Additionally, many customers of Amazon ECS on Amazon EC2 are now using Amazon ECS capacity providers to launch additional Amazon EC2 instances on demand, rather than deploying to statically sized clusters of Amazon EC2 instances. Therefore, Amazon ECS now prioritizes using the maximumPercent for a service, and whenever possible it keeps unhealthy tasks running until after their replacements have become healthy.

Additionally, the new Amazon ECS task replacement behavior helps prevent runaway task termination. In some cases, a large workload spike could cause a few tasks from the deployment to become unhealthy, which triggered their replacement. However, when Amazon ECS stopped unhealthy tasks in order to launch a replacement, the load balancer would shift more workload onto the remaining healthy tasks, which caused them to go unhealthy. In quick succession, all healthy tasks would be overwhelmed with workload, causing a cascade of runaway health check failures until every task had gone unhealthy. Eventually, Application Auto Scaling rules would kick in and scale up the deployment to a large enough size to handle the workload. But in most cases, a traffic spike causes the load balancer health checks to fail before it triggers aggregate resource consumption-based autoscaling. Auto scaling rules need to observe at least one minute of high average resource utilization before they react by scaling out the container deployment. However, an overburdened task may begin failing load balancer health checks immediately.

In the scenario where your tasks are unhealthy because they are dealing with a large spike of incoming workload, the new task replacement behavior of Amazon ECS dramatically improves availability and reliability of your service. Amazon ECS catches health check failures and proactively launches a parallel replacement task that can help absorb the incoming workload spike before autoscaling rules even trigger. Once autoscaling rules trigger, the replacement task and the original task are both retained, if they are both healthy and if they fulfill the current desired task count of the service.

Conclusion

In this post, we explained new Amazon ECS behavior when handling unhealthy tasks. As more customers adopt Amazon ECS for their mission critical applications, we are always happy to tackle challenging new orchestration problems at scale. This updated task replacement behavior is designed to help serve the needs of customers both small and large. It helps keep your container deployments online and available, even in adverse circumstances such as application failure or traffic spikes.
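The arithmetic behind the maximumPercent and minimumHealthyPercent examples in this post can be sketched in a few lines of Python. This is a simplified model for illustration only, not the Amazon ECS scheduler: the function names are hypothetical, and the rounding directions (maximum rounded down, minimum rounded up) are an assumption based on how the ECS documentation describes these two parameters.

```python
def max_running_tasks(desired_count, maximum_percent):
    # Upper bound on running tasks (assumed to round down).
    return (desired_count * maximum_percent) // 100


def min_healthy_tasks(desired_count, minimum_healthy_percent):
    # Lower bound on running tasks during a deployment (assumed to round up).
    return -((-desired_count * minimum_healthy_percent) // 100)


def parallel_replacements(running, unhealthy, desired_count, maximum_percent):
    # How many replacements can start while the unhealthy tasks keep running.
    # Zero means unhealthy tasks must be stopped before replacements launch.
    headroom = max(0, max_running_tasks(desired_count, maximum_percent) - running)
    return min(unhealthy, headroom)


if __name__ == "__main__":
    print(max_running_tasks(8, 200))            # deployment example: 16 tasks
    print(min_healthy_tasks(8, 75))             # deployment example: 6 tasks
    print(parallel_replacements(8, 4, 8, 200))  # "frozen tasks" scenario
    print(parallel_replacements(8, 8, 8, 150))  # "overburdened tasks" scenario
    print(parallel_replacements(8, 1, 8, 100))  # "limited maximum percent"
```

The model only covers tasks that are unhealthy but still running. In the crashing-tasks scenario the containers have already exited, so there is nothing to keep alive while replacements start.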
Please visit the Amazon ECS public roadmap for more info on additional upcoming features for Amazon ECS or to create your own issue to request a change or new feature. For more info on Amazon ECS scheduler behavior, see the official documentation, under Service Scheduler Concepts. View the full article
-
How Bad is Bad Code: The ROI of Fixing Broken Spark Code

Once in a while I stumble upon Spark code that looks like it has been written by a Java developer and it never fails to make me wince because it is a missed opportunity to write elegant and efficient code: it is verbose, difficult to read, and full of distributed processing anti-patterns. One such occurrence happened a few weeks ago when one of my colleagues was trying to make some churn analysis code downloaded from GitHub work. I was looking for some broken code to add a workshop to our Spark Performance Tuning class and write a blog post about, and this fitted the bill perfectly.

For convenience purposes I chose to limit the scope of this exercise to a specific function that prepares the data prior to the churn analysis. Here it is in all its glorious juiciness:

    from pyspark.sql.functions import udf,col
    from pyspark.sql.types import IntegerType

    def prepare_data_baseline(df):
        '''
        Function to prepare the given dataframe and divid into groups of churn and non churn
        users while returnng the original datafrme with a new label column into a spark dataframe.
        Args:
            df- the original dataframe
        Returns:
            df - dataframe of the dataset with new column of churn added
            stayed - dataframe of the non -churn user's activities only.
            all_cancelled - dataframe of the churn user's activities only.
        '''

        #Define a udf for cancelled
        canceled = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0)

        #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
        df = df.withColumn('Churn', canceled(df.page))

        #Dataframe of all that cancelled
        cancelled_df = df.select('page', 'userId','Churn').where(col('churn')==1)
        #List of cancelled
        list_cancelled = cancelled_df.select('userId').distinct().collect()#list of cancelled users

        #Put in a list format
        gb = []#temporary variable to store lists
        for row in list_cancelled:
            gb.append(row[0])
        canc_list = [x for x in gb if x != '']#remove the invalid users
        #Total number of users who canceled
        print(f"The number of churned users is: {len(canc_list)}")

        #List of staying users
        all_users = df.select('userId').distinct().collect()
        gh = []#a temporary variable to store all users
        for row in all_users:
            gh.append(row[0])
        stayed_list = set(gh)-set(gb)#list of users staying
        stayed_list = [x for x in stayed_list if x != '']#remove the invalid users

        #Total number of users who did not cancel
        print(f"The number of staying users is: {len(stayed_list)}")

        #Store both canceled and staying users in new dataframes containng all actions they undertook
        all_cancelled = df.select("*").where(col('userId').isin(canc_list))
        stayed = df.select('*').where(col('userId').isin(stayed_list))

        #Redefine a udf for churn
        churned = udf(lambda x: 0 if x in stayed_list else 1, IntegerType())
        #Creat new column which will be our label column to track all users that eventually cancelled their subscription
        df = df.withColumn('label', churned(col('userId')))

        return df, stayed, all_cancelled

In this blog post, I will outline the steps I took to fix this code, and then measure the resulting difference in execution performance. In the process, I will explicitly state the best practices I will implement. Let’s jump in this rabbit hole!

Define a non-regression test harness

Stop! Resist the temptation to start tweaking the code right away!
You want to be able to:

- Make sure that you do not introduce a regression by fixing the code
- Measure the improvements in terms of performance

This is where limiting the scope of the analysis to a function came in handy: it allowed me to use ad hoc and simple tooling:

- I isolated the original function in a prepare_data_baseline function in a separate prepareData_baseline.py file
- I created a new file called prepareData.py with the new version of the prepare_data function
- I measured the time to perform the processing using the time library
- And I compared the resulting DataFrames with subtract

Because lazy evaluation defers the time when the code is actually executed, I added code that saves the DataFrames to files, thus forcing the materialization of the DataFrames via the execution of the code. I also added these lines in the scope of the time measurement. And this is what it looks like:

    from pyspark.sql import SparkSession
    import time, datetime
    from prepareData import prepare_data
    from prepareData_baseline import prepare_data_baseline

    spark = SparkSession \
        .builder \
        .appName("Churn Analysis Data Preparation Test Harness") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    spark.conf.set('spark.sql.adaptive.enabled','false')
    print(f"AQE enabled: {spark.conf.get('spark.sql.adaptive.enabled')}")

    df = spark.read.json('data/mini_sparkify_event_data.json')

    #Baseline version
    process_time_start = time.perf_counter()  # Start timer: begin processing
    df_baseline, stayed_baseline, all_cancelled_baseline = prepare_data_baseline(df)
    df_baseline.write.mode("overwrite").json('data/df_baseline')
    stayed_baseline.write.mode("overwrite").json('data/stayed_baseline')
    all_cancelled_baseline.write.mode("overwrite").json('data/all_cancelled_baseline')
    process_time_end = time.perf_counter()  # Stop timer: end processing
    process_time = process_time_end - process_time_start  # Elapsed time for processing
    totalTime = datetime.timedelta(seconds = process_time)
    print(f"Preparing data with the baseline version took {totalTime}")

    #New version
    process_time_start = time.perf_counter()  # Start timer: begin processing
    df, stayed, all_cancelled = prepare_data(df)
    df.write.mode("overwrite").json('data/df')
    stayed.write.mode("overwrite").json('data/stayed')
    all_cancelled.write.mode("overwrite").json('data/all_cancelled')
    process_time_end = time.perf_counter()  # Stop timer: end processing
    process_time = process_time_end - process_time_start  # Elapsed time for processing
    totalTime = datetime.timedelta(seconds = process_time)
    print(f"Preparing data with the new version took {totalTime}")

    # Regression Testing
    def diffDataFrame(df1,df2):
        return df1.subtract(df2).count()

    print(f"New processing introduced {diffDataFrame(df,df_baseline)} differences in df.")
    print(f"New processing introduced {diffDataFrame(all_cancelled,all_cancelled_baseline)} differences in all_cancelled.")
    print(f"New processing introduced {diffDataFrame(stayed,stayed_baseline)} differences in stayed.")

    spark.stop()

Retro document the requirements

This step was quite easy because of the comments that were present in the initial code. This function:

- Takes a DataFrame containing activities from users,
- splits it into two groups of activities: activities from users who eventually churned and activities from users who did not, and
- adds a “label” column to the input DataFrame to tag activities that belong to users that eventually churned (1 if user churned, 0 otherwise).

If that sounds suspiciously redundant to you, I agree. But let’s table that issue for now; we will revisit it once we are satisfied with our new version of the code.

Refactor the code

The main problem of the code is the use of Python lists to achieve the required results. Those lists are created by collecting the DataFrames onto the Spark driver where the for loops will be processed, making this code not scalable: above a certain number of users the driver memory might become overwhelmed and the program will crash.
Also, this choice prevents the code from leveraging all the optimizations that come with DataFrame operations.

Then the code uses plain PySpark UDFs, for which you incur a performance penalty because of the need to:

- Deserialize the Spark DataFrame to its Java representation
- Transfer the resulting Java object to the Python process where the UDF will be executed
- Serialize back the output of the function to Spark format

Beware of the cost of PySpark UDFs

There are ways to mitigate those issues by using PyArrow and vectorized UDFs when you really need to use them, but this is not one of those times.

First, the function creates a “Churn” column, which I guess is for convenience purposes. A user is identified as “churned” if they have been to the “Cancellation Confirmation” page. This is achieved with a withColumn call and a UDF.

    #Define a udf for cancelled
    canceled = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0)

    #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
    df = df.withColumn('Churn', canceled(df.page))

There is no need for a UDF in that case; those lines of code can be replaced by a simple column expression like so:

    #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
    df = df.withColumn('Churn', (df.page == 'Cancellation Confirmation').cast('integer').cast('string'))

I believe the correct type for that new column would be boolean, but for non-regression purposes I had to cast it to a string of 0 or 1.

Then the author proceeds to create two lists: one for the users that churned and one for the users that stayed.
Since my goal is to avoid those lists, I am going to create the corresponding DataFrames instead:

    all_users = df.select(df.userId).distinct().where(df.userId != '')
    churned_users = df.where(df.Churn == '1').select(df.userId).distinct().where(df.userId != '')
    stayed_users = all_users.subtract(churned_users)

First I create a DataFrame of all the non-empty users, then the DataFrame of users that churned, and define the users that stayed as the difference between the two.

The author uses the awkwardly created lists together with UDFs to create the all_cancelled and stayed DataFrames. Here is the code for the first one:

    #List of cancelled
    list_cancelled = cancelled_df.select('userId').distinct().collect()#list of cancelled users

    #Put in a list format
    gb = []#temporary variable to store lists
    for row in list_cancelled:
        gb.append(row[0])
    canc_list = [x for x in gb if x != '']#remove the invalid users
    …
    all_cancelled = df.select("*").where(col('userId').isin(canc_list))

I realize now that the “Put in a list format” loop is probably unnecessary. To create the same DataFrame I just do the following:

    all_cancelled = df.join(churned_users,'userId')

The same technique is applied to create the stayed DataFrame:

    stayed = df.join(stayed_users,'userId')

Last, the author adds the “label” column to the main DataFrame by using a UDF:

    #Redefine a udf for churn
    churned = udf(lambda x: 0 if x in stayed_list else 1, IntegerType())
    #Create new column which will be our label column to track all users that eventually cancelled their subscription
    df = df.withColumn('label', churned(col('userId')))

Instead I just use a union:

    df_label = all_cancelled.withColumn('label',lit(1)).union(stayed.withColumn('label',lit(0)))

That triggered a regression because I did not include the null users.
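The reason those records matter for the comparison follows from the original labelling lambda quoted above: any userId that is not in stayed_list gets label 1, and that includes records with no user at all. A minimal plain-Python sketch of that rule, using a hypothetical stayed_list and made-up user ids (no Spark involved):

```python
# Hypothetical stand-in for the list of staying users built by the original code.
stayed_list = ["10", "20"]

def churned_label(user_id):
    """Same rule as the original UDF lambda: 0 if the user stayed, 1 otherwise."""
    return 0 if user_id in stayed_list else 1

# Made-up sample of userId values, including a missing one and an empty one.
sample_user_ids = ["10", "30", None, ""]
labels = [(user_id, churned_label(user_id)) for user_id in sample_user_ids]
print(labels)
```

An inner join on userId, by contrast, never matches a null key, so rows without a user drop out of both all_cancelled and stayed unless they are added back explicitly.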
I wonder what use could be made of records with null users for training a model to predict churn from users’ behavior, but for non-regression purposes I added those too:

    empty_users = df.where(df.userId.isNull())
    …
    #Add empty users for non regression purposes
    df_label = df_label.union(empty_users.withColumn('label',lit(1)))

Last, I also had to reorder the columns of my DataFrames for my simple non-regression tests to be successful:

    # Sort the columns
    columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn','label']
    df_label_sorted = df_label.select(columns)
    columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn']
    all_cancelled_sorted = all_cancelled.select(columns)
    stayed_sorted = stayed.select(columns)

This is my full version of the function:

    from pyspark.sql.functions import lit

    def prepare_data(df):
        '''
        Function to prepare the given dataframe and divide into groups of churn and non churn
        users while returning the original DataFrame with a new label column into a spark dataframe.
        Args:
            df- the original dataframe
        Returns:
            df - dataframe of the dataset with new column of churn added
            stayed - dataframe of the non-churn user's activities only.
            all_cancelled - dataframe of the churn user's activities only.
        '''

        #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
        df = df.withColumn('Churn', (df.page == 'Cancellation Confirmation').cast('integer').cast('string'))

        all_users = df.select(df.userId).distinct().where(df.userId != '')
        churned_users = df.where(df.Churn == '1').select(df.userId).distinct().where(df.userId != '')
        stayed_users = all_users.subtract(churned_users)
        empty_users = df.where(df.userId.isNull())

        #Store both canceled and staying users in new DataFrames containing all actions they undertook
        all_cancelled = df.join(churned_users,'userId')
        stayed = df.join(stayed_users,'userId')
        df_label = all_cancelled.withColumn('label',lit(1)).union(stayed.withColumn('label',lit(0)))

        #Add empty users for non regression purposes
        df_label = df_label.union(empty_users.withColumn('label',lit(1)))

        # Sort the columns
        columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn','label']
        df_label_sorted = df_label.select(columns)
        columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn']
        all_cancelled_sorted = all_cancelled.select(columns)
        stayed_sorted = stayed.select(columns)

        #Total number of users who canceled
        print(f"The number of churned users is: {churned_users.count()}")
        #Total number of users who did not cancel
        print(f"The number of staying users is: {stayed_users.count()}")

        return df_label_sorted, stayed_sorted, all_cancelled_sorted

Non regression and performance

I was able to verify that I had not introduced any regression in my version of the function on my desktop with Spark 3.3.

In order to get meaningful performance measurements I needed to use the full 12 GB JSON dataset. Otherwise, with small data, most of the time is spent on overhead and results vary wildly.
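Since timings on small data vary so much, a single run says little. One way to see the spread is to repeat the measurement and look at the fastest and median runs rather than one number. The sketch below is plain Python; time_runs, summarize and the toy workload are hypothetical names, not part of the original harness, and in the Spark case the timed callable would have to include the write that forces materialization.

```python
import datetime
import statistics
import time

def time_runs(func, repeats=5):
    """Call func() `repeats` times and return the elapsed seconds of each call."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        func()
        durations.append(time.perf_counter() - start)
    return durations

def summarize(durations):
    """Return the fastest and the median duration as timedelta objects."""
    return (
        datetime.timedelta(seconds=min(durations)),
        datetime.timedelta(seconds=statistics.median(durations)),
    )

def toy_workload():
    # Stand-in for the real processing; any zero-argument callable works.
    sum(i * i for i in range(100_000))

durations = time_runs(toy_workload, repeats=5)
fastest, median = summarize(durations)
print(f"fastest: {fastest}, median: {median}")
```

If the fastest and median runs are far apart, the measurement is dominated by noise and a larger dataset or a more stable environment is needed before comparing versions.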
So I switched to our CML data service using Spark 3.2 and adapted the code accordingly. CML uses Spark on Kubernetes and the default is dynamic allocation of executors. I had to disable that to get a stable environment and thus meaningful measurements:

    import time, datetime
    from pyspark.sql import SparkSession
    from prepareData import prepare_data
    from prepareData_baseline import prepare_data_baseline
    from prepareData_improved import prepare_data_improved
    import cml.data_v1 as cmldata
    from env import S3_ROOT, S3_HOME, CONNECTION_NAME

    conn = cmldata.get_connection(CONNECTION_NAME)
    spark = (
        SparkSession.builder.appName(conn.app_name)
        .config("spark.sql.hive.hwc.execution.mode", "spark")
        .config("spark.dynamicAllocation.enabled","false")
        .config("spark.executor.instances", 3)
        .config("spark.executor.memory","32g")
        .config("spark.executor.cores",4)
        .config("spark.yarn.access.hadoopFileSystems", conn.hive_external_dir)
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("ERROR")
    spark.conf.set('spark.sql.adaptive.enabled','true')
    print(f"AQE enabled: {spark.conf.get('spark.sql.adaptive.enabled')}")

That got me the desired result:

I then found out that the full 12 GB data set contained a corrupt record that I had to deal with, and while I was at it I converted the file to Parquet format to save me some time:

Convert early to compressed columnar formats (Parquet, ORC)

To avoid repetitive code, I created a function that performs the tests, in which I also added calls to setJobGroup and setJobDescription to improve the readability of the Spark UI:

    def measureDataPreparation(df,f,versionName):
        spark.sparkContext.setJobGroup(versionName,"")
        # Start timer: begin processing
        process_time_start = time.perf_counter()
        df, stayed, all_cancelled = f(df)
        spark.sparkContext.setJobDescription("Write /data/df")
        df.write.mode("overwrite").json(S3_HOME + '/data/df')
        spark.sparkContext.setJobDescription("Write /data/stayed")
        stayed.write.mode("overwrite").json(S3_HOME + '/data/stayed')
        spark.sparkContext.setJobDescription("Write /data/all_cancelled")
        all_cancelled.write.mode("overwrite").json(S3_HOME + '/data/all_cancelled')
        # Stop timer: end processing
        process_time_end = time.perf_counter()
        # Elapsed time for processing
        process_time = process_time_end - process_time_start
        totalTime = datetime.timedelta(seconds = process_time)
        print(f"Preparing data with the {versionName} took {totalTime}")

Use setJobGroup and setJobDescription to improve readability of the Spark UI

And this is how the Spark UI looks as a result:

Since I had established that I had not introduced any regression, I also removed the regression tests. Here is the relevant part of the session’s output:

    measureDataPreparation(df,prepare_data_baseline,"baseline version")
    The number of churned users is: 4982
    The number of staying users is: 17282
    Preparing data with the baseline version took 0:09:11.799036

    measureDataPreparation(df,prepare_data,"no regression version")
    The number of churned users is: 4982
    The number of staying users is: 17282
    Preparing data with the no regression version took 0:01:48.224514

Great success! The new version is about five times faster!

Further improvements

Since I no longer need to test for non regression, I can remove the sorting of the columns.

I can also remove the code that prints the counts of the churned and stayed users. This code does not belong in a function that very likely will run unattended in a data pipeline. It triggers distributed execution to compute results that nobody will see. It should be left to the code that calls the function to log that kind of information or not. This is also an instance of breaking the following rule:

Remove code that helped debugging with count(), take() or show() in production

I checked the rest of the initial code, and after exhaustive data exploration and right before splitting the data set for training purposes, the author does remove the rows with null users. There is no point in carrying around this extra baggage all this time.
In fact this breaks another rule of big data processing:

Filter early

Finally, I removed the casting of the “Churn” column and left it as a boolean. I also checked that it was not used outside of this function and renamed it “churn” because I hated that uppercase “C” with all the passion of a thousand white hot blazing suns. This is the final version of the code:

    from pyspark.sql.functions import lit

    def prepare_data_improved(df):
        '''
        Function to prepare the given DataFrame and divide into groups of churn and non churn
        users while returning the original DataFrame with a new label column into a Spark DataFrame.
        Args:
            df- the original DataFrame
        Returns:
            df - DataFrame of the dataset with new column of churn added
            stayed - DataFrame of the non-churn user's activities only.
            all_cancelled - DataFrame of the churn user's activities only.
        '''

        #Filter out the records without a user first, then
        #define a new boolean column 'churn' that is True for a cancellation of subscription
        df = df.where(df.userId != '').withColumn('churn', (df.page == 'Cancellation Confirmation'))

        all_users = df.select(df.userId).distinct()
        churned_users = df.where(df.churn).select(df.userId).distinct()
        stayed_users = all_users.subtract(churned_users)

        #Store both canceled and staying users in new DataFrames containing all actions they undertook
        all_cancelled = df.join(churned_users,'userId')
        stayed = df.join(stayed_users,'userId')
        df_label = all_cancelled.withColumn('label',lit(1)).union(stayed.withColumn('label',lit(0)))

        return df_label, stayed, all_cancelled

Conclusion

Now that I have achieved non regression using DataFrames exclusively, and that I also have an improved version, I should be able to measure the benefits of using the Spark cache and of the Adaptive Query Execution engine.

Here are the full results:

In this limited experiment, the number one factor that influences the performance of the execution is the refactoring of the Spark code to remove the distributed processing anti-patterns.
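To put a number on that first factor, the two timings from the session output earlier in this post can be compared directly. This is a small sketch, assuming the elapsed times are exactly as printed there; parse_elapsed is a hypothetical helper, not part of the original code.

```python
import datetime

def parse_elapsed(text):
    """Parse an 'H:MM:SS.ffffff' string, as printed by str(timedelta), into a timedelta."""
    hours, minutes, seconds = text.split(":")
    return datetime.timedelta(hours=int(hours), minutes=int(minutes), seconds=float(seconds))

# Elapsed times copied from the session output (baseline vs. "no regression" version).
baseline = parse_elapsed("0:09:11.799036")
refactored = parse_elapsed("0:01:48.224514")

speedup = baseline / refactored      # dividing two timedeltas yields a float
time_saved = baseline - refactored   # subtracting them yields a timedelta
print(f"speedup: {speedup:.1f}x, time saved: {time_saved}")
```

That works out to a speedup of about 5.1x, or a little over seven minutes saved per run on this data set.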
Caching the data, improving the code further, or using AQE all bring marginal improvements compared to the elimination of the technical debt.

The return on investment of training is always a thorny issue because of the difficulty of measuring it conveniently in a spreadsheet, but with this experiment I hope I have shown that the lack of skills should be a major concern for any organization running Spark workloads.

If you’d like to get hands-on experience with Spark 3.2, as well as other tools and techniques for making your Spark jobs run at peak performance, sign up for Cloudera’s Apache Spark Performance Tuning course.

If you need an introduction to AQE, kindly refer to my previous blog post.

The post Spark Technical Debt Deep Dive appeared first on Cloudera Blog. View the full article
-