Search the Community
Showing results for tags 'aws glue'.
-
AWS Glue is a fully managed, serverless data integration service provided by Amazon Web Services (AWS) that uses Apache Spark as one of its backend processing engines (as of this writing, you can use Python Shell, Spark, or Ray).

Data skew occurs when the data being processed is not evenly distributed across the Spark cluster, causing some tasks to take significantly longer to complete than others. This can lead to inefficient resource utilization, longer processing times, and ultimately, slower performance. Data skew can arise from various factors, including uneven data distribution, skewed join keys, or uneven data processing patterns. Even though the biggest issue is often having nodes running out of disk during shuffling, which leads to nodes falling like dominoes and job failures, it's also important to mention that data skew is hidden. The stealthy nature of data skew means it can often go undetected, because monitoring tools might not flag an uneven distribution as a critical issue, and logs don't always make it evident. As a result, a developer may observe that their AWS Glue jobs are completing without apparent errors, yet the system could be operating far from its optimal efficiency. This hidden inefficiency not only increases operational costs due to longer runtimes, but can also lead to unpredictable performance issues that are difficult to diagnose without a deep dive into the data distribution and task run patterns.

For example, in a dataset of customer transactions, if one customer has significantly more transactions than the others, it can cause a skew in the data distribution. Identifying and handling data skew issues is key to having good performance on Apache Spark, and therefore on AWS Glue jobs that use Spark as a backend. In this post, we show how you can identify data skew and discuss the different techniques to mitigate it.

How to detect data skew

When an AWS Glue job has issues with local disks (disk spill issues), doesn't scale with the number of workers, or has low CPU usage (you can enable Amazon CloudWatch metrics for your job to be able to see this), you may have a data skew issue. You can detect data skew with data analysis or by using the Spark UI. In this section, we discuss how to use the Spark UI.

The Spark UI provides a comprehensive view of Spark applications, including the number of tasks, stages, and their duration. To use it, you need to enable Spark UI event logs for your job runs. This is enabled by default on the AWS Glue console, and once enabled, Spark event log files are created during the job run and stored in your S3 bucket. Those logs are then parsed, and you can use the AWS Glue serverless Spark UI to visualize them. You can refer to this blog post for more details. For jobs where the AWS Glue serverless Spark UI does not work because it has a 512 MB limit on log size, you can set up the Spark UI using an EC2 instance.

You can use the Spark UI to identify which tasks are taking longer to complete than others, and whether the data distribution among partitions is balanced or not (remember that in Spark, one partition is mapped to one task). If there is data skew, you will see that some partitions have significantly more data than others. The following figure shows an example of this. We can see that one task is taking a lot more time than the others, which can indicate data skew. Another thing that you can use is the summary metrics for each stage. The following screenshot shows another example of data skew.
These metrics represent the task-related metrics below which a certain percentage of tasks completed. For example, the 75th percentile task duration indicates that 75% of tasks completed in less time than this value. When the tasks are evenly distributed, you will see similar numbers in all the percentiles. When there is data skew, you will see very biased values in each percentile. In the preceding example, it didn't write many shuffle files (less than 50 MiB) in Min, 25th percentile, Median, and 75th percentile. However, in Max, it wrote 460 MiB, 10 times the 75th percentile. It means there was at least one task (or up to 25% of tasks) that wrote much bigger shuffle files than the rest of the tasks. You can also see that the duration of the task in Max is 46 seconds and the Median is 2 seconds. These are all indicators that your dataset may have data skew.

AWS Glue interactive sessions

You can use interactive sessions to load your data from the AWS Glue Data Catalog or just use Spark methods to load the files such as Parquet or CSV that you want to analyze. You can use a script similar to the following to detect data skew from the partition size perspective; the more important issue is related to data skew while shuffling, and this script does not detect that kind of skew:

from pyspark.sql.functions import spark_partition_id, asc, desc, abs

# input_dataframe being the dataframe where you want to check for data skew
partition_sizes_df = input_dataframe\
    .withColumn("partitionId", spark_partition_id())\
    .groupBy("partitionId")\
    .count()\
    .orderBy(asc("count"))\
    .withColumnRenamed("count", "partition_size")

# calculate average and standard deviation for the partition sizes
avg_size = partition_sizes_df.agg({"partition_size": "avg"}).collect()[0][0]
std_dev_size = partition_sizes_df.agg({"partition_size": "stddev"}).collect()[0][0]

"""
the code calculates the absolute difference between each value in the "partition_size" column
and the calculated average (avg_size).
then, it calculates twice the standard deviation (std_dev_size) and uses that as a boolean mask,
where the condition checks if the absolute difference is greater than twice the standard deviation,
in order to mark a partition 'skewed'
"""
skewed_partitions_df = partition_sizes_df.filter(abs(partition_sizes_df["partition_size"] - avg_size) > 2 * std_dev_size)

if skewed_partitions_df.count() > 0:
    skewed_partitions = [row["partitionId"] for row in skewed_partitions_df.collect()]
    print(f"The following partitions have significantly different sizes: {skewed_partitions}")
else:
    print("No data skew detected.")

You can calculate the average and standard deviation of partition sizes using the agg() function, identify partitions with significantly different sizes using the filter() function, and print their indexes if any skewed partitions are detected. Otherwise, the output prints that no data skew is detected. This code assumes that your data is structured, and you may need to modify it if your data is of a different type.

How to handle data skew

You can use different techniques in AWS Glue to handle data skew; there is no single universal solution. The first thing to do is confirm that you're using the latest AWS Glue version; for example, AWS Glue 4.0, based on Spark 3.3, enables by default some configurations, such as Adaptive Query Execution (AQE), that can help improve performance when data skew is present.
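The script above flags uneven partition sizes, but it doesn't tell you which key values are responsible for the skew. The following is an illustrative sketch (not part of the original script) that counts rows per key to surface heavily skewed values; input_dataframe, the column name key, and the 10x threshold are placeholders to adapt to your own dataset.

from pyspark.sql.functions import col, count

# Assumption: input_dataframe and the join/group key column name "key" are placeholders.
key_counts_df = (
    input_dataframe
    .groupBy("key")
    .agg(count("*").alias("rows_per_key"))
)

# Compare each key against the average number of rows per key.
avg_rows = key_counts_df.agg({"rows_per_key": "avg"}).collect()[0][0]

# Flag keys that hold, for example, 10 times more rows than the average (the factor is arbitrary).
skewed_keys_df = key_counts_df.filter(col("rows_per_key") > 10 * avg_rows)
skewed_keys_df.orderBy(col("rows_per_key").desc()).show(20, truncate=False)

Knowing the specific skewed keys is useful input for several of the mitigation techniques discussed next, such as filtering them out or salting only the skewed subset.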
The following are some of the techniques that you can employ to handle data skew:

- Filter and perform – If you know which keys are causing the skew, you can filter them out, perform your operations on the non-skewed data, and then handle the skewed keys separately.
- Implementing incremental aggregation – If you are performing a large aggregation operation, you can break it up into smaller stages, because in large datasets, a single aggregation operation (like sum, average, or count) can be resource-intensive. In those cases, you can perform intermediate actions. This could involve filtering, grouping, or additional aggregations. This can help distribute the workload across the nodes and reduce the size of intermediate data.
- Using a custom partitioner – If your data has a specific structure or distribution, you can create a custom partitioner that partitions your data based on its characteristics. This can help make sure that data with similar characteristics is in the same partition and reduce the size of the largest partition.
- Using broadcast join – If your dataset is small but exceeds the spark.sql.autoBroadcastJoinThreshold value (default is 10 MB), you have the option to either provide a hint to use broadcast join or adjust the threshold value to accommodate your dataset. This can be an effective strategy to optimize join operations and mitigate data skew issues resulting from shuffling large amounts of data across nodes.
- Salting – This involves adding a random prefix to the key of skewed data. By doing this, you distribute the data more evenly across the partitions. After processing, you can remove the prefix to get the original key values.

These are just a few techniques to handle data skew in PySpark; the best approach will depend on the characteristics of your data and the operations you are performing.
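To illustrate the broadcast join option, the following sketch shows both raising the threshold and hinting the join explicitly. The DataFrame names (large_df, small_dim_df), the S3 paths, the join column, and the 50 MB threshold are illustrative placeholders rather than values from this post.

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("broadcast-join-example").getOrCreate()

# Option 1: raise the auto-broadcast threshold (value in bytes; 50 MB here is arbitrary).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)

# Assumption: large_df is the big, possibly skewed table and small_dim_df is a small lookup table.
large_df = spark.read.parquet("s3://your-bucket/path/to/large/")      # placeholder path
small_dim_df = spark.read.parquet("s3://your-bucket/path/to/small/")  # placeholder path

# Option 2: explicitly hint Spark to broadcast the small side, avoiding a shuffle of large_df.
joined_df = large_df.join(broadcast(small_dim_df), on="key", how="inner")

Broadcasting only helps when the smaller side comfortably fits in each executor's memory; otherwise the other techniques in this list are more appropriate.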
The following is an example of joining skewed data with the salting technique:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, ceil, rand, concat, col

# Define the number of salt values
num_salts = 3

# Function to identify skewed keys
def identify_skewed_keys(df, key_column, threshold):
    key_counts = df.groupBy(key_column).count()
    return key_counts.filter(key_counts['count'] > threshold).select(key_column)

# Identify skewed keys
skewed_keys = identify_skewed_keys(skewed_data, "key", skew_threshold)

# Splitting the dataset
skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "inner")
non_skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "left_anti")

# Apply salting to skewed data
skewed_data_subset = skewed_data_subset.withColumn("salt", ceil((rand() * 10) % num_salts))
skewed_data_subset = skewed_data_subset.withColumn("salted_key", concat(col("key"), lit("_"), col("salt")))

# Replicate skewed rows in non-skewed dataset
def replicate_skewed_rows(df, keys, multiplier):
    replicated_df = df.join(keys, ["key"]).crossJoin(spark.range(multiplier).withColumnRenamed("id", "salt"))
    replicated_df = replicated_df.withColumn("salted_key", concat(col("key"), lit("_"), col("salt")))
    return replicated_df.drop("salt")

replicated_non_skewed_data = replicate_skewed_rows(non_skewed_data, skewed_keys, num_salts)

# Perform the JOIN operation on the salted keys for skewed data
result_skewed = skewed_data_subset.join(replicated_non_skewed_data, "salted_key")

# Perform regular join on non-skewed data
result_non_skewed = non_skewed_data_subset.join(non_skewed_data, "key")

# Combine results
final_result = result_skewed.union(result_non_skewed)

In this code, we first define the number of salt values (num_salts). We then add a salt column to our DataFrame using the withColumn() function, where we set the value of the salt column to a random number using the rand() function. The function replicate_skewed_rows is defined to replicate each row in the non-skewed dataset (non_skewed_data) num_salts times. This makes sure that each key in the non-skewed data has matching salted keys. Finally, a join operation is performed on the salted_key column between the skewed and non-skewed datasets. This join is more balanced compared to a direct join on the original key, because salting and replication have mitigated the data skew.

The rand() function used in this example generates a random number between 0–1 for each row, so it's important to use a fixed seed to achieve consistent results across different runs of the code. You can choose any fixed integer value for the seed.

The following figures illustrate the data distribution before (left) and after (right) salting. The heavily skewed key2 is identified and salted into key2_0, key2_1, and key2_2, balancing the data distribution and preventing any single node from being overloaded. After processing, the results can be aggregated back, so that the final output is consistent with the unsalted key values.

Other techniques to use on skewed data during the join operation

When you're performing skewed joins, you can use salting or broadcasting techniques, or divide your data into skewed and regular parts before joining the regular data and broadcasting the skewed data. If you are using Spark 3, there are automatic optimizations that try to mitigate data skew issues on joins. Those can be tuned because they have dedicated configs on Apache Spark.
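For reference, the following sketch shows the main Spark 3 Adaptive Query Execution settings that control automatic skew handling during joins. The factor and threshold values shown are the Spark defaults and are meant as a starting point to tune, not a recommendation from this post.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe-skew-join-example")
    # Enable Adaptive Query Execution (already on by default in recent Spark 3 releases and AWS Glue 4.0).
    .config("spark.sql.adaptive.enabled", "true")
    # Let AQE split oversized partitions during sort-merge joins.
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # A partition is treated as skewed if it is larger than this factor times the median partition size...
    .config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
    # ...and also larger than this absolute size threshold.
    .config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
    .getOrCreate()
)

In an AWS Glue job, where the Spark session is created for you, these values can also be supplied as Spark configuration through the job's configuration parameters rather than in the script itself.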
Conclusion

This post provided details on how to detect data skew in your data integration jobs using AWS Glue and different techniques for handling it. Having a good data distribution is key to achieving the best performance on distributed processing systems like Apache Spark.

Although this post focused on AWS Glue, the same concepts apply to jobs you may be running on Amazon EMR using Apache Spark or Amazon Athena for Apache Spark.

As always, AWS welcomes your feedback. Please leave your comments and questions in the comments section.

About the Authors

Salim Tutuncu is a Sr. PSA Specialist on Data & AI, based in Amsterdam with a focus on the EMEA North and EMEA Central regions. With a rich background in the technology sector that spans roles as a Data Engineer, Data Scientist, and Machine Learning Engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses leveraging the AWS Platform, particularly in Data and AI use cases.

Angel Conde Manjon is a Sr. PSA Specialist on Data & AI, based in Madrid, and focuses on EMEA South and Israel. He has previously worked on research related to Data Analytics and Artificial Intelligence in diverse European research projects. In his current role, Angel helps partners develop businesses centered on Data and AI.

View the full article
-
Today, we're excited to announce general availability of Amazon Q data integration in AWS Glue. Amazon Q data integration, a new generative AI-powered capability of Amazon Q Developer, enables you to build data integration pipelines using natural language. This reduces the time and effort you need to learn, build, and run data integration jobs using AWS Glue data integration engines. Tell Amazon Q Developer what you need in English, and it will return a complete job for you. For example, you can ask Amazon Q Developer to generate a complete extract, transform, and load (ETL) script or code snippet for individual ETL operations. You can troubleshoot your jobs by asking Amazon Q Developer to explain errors and propose solutions. Amazon Q Developer provides detailed guidance throughout the entire data integration workflow.

Amazon Q Developer helps you learn and build data integration jobs using AWS Glue efficiently by generating the required AWS Glue code based on your natural language descriptions. You can create jobs that extract, transform, and load data that is stored in Amazon Simple Storage Service (Amazon S3), Amazon Redshift, and Amazon DynamoDB. Amazon Q Developer can also help you connect to third-party, software as a service (SaaS), and custom sources.

With general availability, we added new capabilities for you to author jobs using natural language. Amazon Q Developer can now generate complex data integration jobs with multiple sources, destinations, and data transformations. It can generate data integration jobs for extracts and loads to S3 data lakes, including file formats like CSV, JSON, and Parquet, and ingestion into open table formats like Apache Hudi, Delta, and Apache Iceberg. It generates jobs for connecting to over 20 data sources, including relational databases like PostgreSQL, MySQL, and Oracle; data warehouses like Amazon Redshift, Snowflake, and Google BigQuery; NoSQL databases like DynamoDB, MongoDB, and OpenSearch; tables defined in the AWS Glue Data Catalog; and custom user-supplied JDBC and Spark connectors. Generated jobs can use a variety of data transformations, including filter, project, union, join, and custom user-supplied SQL.

Amazon Q data integration in AWS Glue helps you through two different experiences: the Amazon Q chat experience, and the AWS Glue Studio notebook experience. This post describes the end-to-end user experiences to demonstrate how Amazon Q data integration in AWS Glue simplifies your data integration and data engineering tasks.

Amazon Q chat experience

Amazon Q Developer provides a conversational Q&A capability and a code generation capability for data integration. To start using the conversational Q&A capability, choose the Amazon Q icon on the right side of the AWS Management Console. For example, you can ask, "How do I use AWS Glue for my ETL workloads?" and Amazon Q provides concise explanations along with references you can use to follow up on your questions and validate the guidance.

To start using the AWS Glue code generation capability, use the same window. On the AWS Glue console, start authoring a new job, and ask Amazon Q, "Please provide a Glue script that reads from Snowflake, renames the fields, and writes to Redshift." You will notice that the code is generated. With this response, you can learn and understand how you can author AWS Glue code for your purpose. You can copy and paste the generated code to the script editor and configure placeholders.
After you configure an AWS Identity and Access Management (IAM) role and AWS Glue connections on the job, save and run the job. When the job is complete, you can start querying the table exported from Snowflake in Amazon Redshift.

Let's try another prompt that reads data from two different sources, filters and projects them individually, joins on a common key, and writes the output to a third target. Ask Amazon Q: "I want to read data from S3 in Parquet format, and select some fields. I also want to read data from DynamoDB, select some fields, and filter some rows. I want to union these two datasets and write the results to OpenSearch." The code is generated. When the job is complete, your index is available in OpenSearch and can be used by your downstream workloads.

AWS Glue Studio notebook experience

Amazon Q data integration in AWS Glue helps you author code in an AWS Glue notebook to speed up development of new data integration applications. In this section, we walk you through how to set up the notebook and run a notebook job.

Prerequisites

Before going forward with this tutorial, complete the following prerequisites:

- Set up AWS Glue Studio.
- Configure an IAM role to interact with Amazon Q.
- Attach the following policy to your IAM role for the AWS Glue Studio notebook:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CodeWhispererPermissions",
            "Effect": "Allow",
            "Action": [
                "codewhisperer:GenerateRecommendations"
            ],
            "Resource": "*"
        }
    ]
}

Create a new AWS Glue Studio notebook job

Create a new AWS Glue Studio notebook job by completing the following steps:

- On the AWS Glue console, choose Notebooks under ETL jobs in the navigation pane.
- Under Create job, choose Notebook.
- For Engine, select Spark (Python).
- For Options, select Start fresh.
- For IAM role, choose the IAM role you configured as a prerequisite.
- Choose Create notebook.

A new notebook is created with sample cells. Let's try recommendations using Amazon Q data integration in AWS Glue to auto-generate code based on your intent. Amazon Q helps you with each step as you express an intent in a notebook cell. Add a new cell and enter your comment to describe what you want to achieve. After you press Tab and Enter, the recommended code is shown.

The first intent is to extract the data: "Give me code that reads a Glue Data Catalog table", followed by "Give me code to apply a filter transform with star_rating>3" and "Give me code that writes the frame into S3 as Parquet". Similar to the Amazon Q chat experience, the code is recommended (an illustrative sketch of what this kind of generated code can look like appears at the end of this article). If you press Tab, then the recommended code is chosen. You can learn more in User actions. You can run each cell by simply filling in the appropriate options for your sources in the generated code. At any point in the runs, you can also preview a sample of your dataset by simply using the show() method.

Let's now try to generate a full script with a single complex prompt: "I have JSON data in S3 and data in Oracle that needs combining. Please provide a Glue script that reads from both sources, does a join, and then writes results to Redshift." You may notice that, on the notebook, Amazon Q data integration in AWS Glue generated the same code snippet that was generated in the Amazon Q chat. You can also run the notebook as a job, either by choosing Run or programmatically.

Conclusion

With Amazon Q data integration, you have an artificial intelligence (AI) expert by your side to integrate data efficiently without deep data engineering expertise.
These capabilities simplify and accelerate data processing and integration on AWS. Amazon Q data integration in AWS Glue is available in every AWS Region where Amazon Q is available. To learn more, visit the product page, our documentation, and the Amazon Q pricing page.

A special thanks to everyone who contributed to the launch of Amazon Q data integration in AWS Glue: Alexandra Tello, Divya Gaitonde, Andrew Kim, Andrew King, Anshul Sharma, Anshi Shrivastava, Chuhan Liu, Daniel Obi, Hirva Patel, Henry Caballero Corzo, Jake Zych, Jeremy Samuel, Jessica Cheng, Keerthi Chadalavada, Layth Yassin, Maheedhar Reddy Chappidi, Maya Patwardhan, Neil Gupta, Raghavendhar Vidyasagar Thiruvoipadi, Rajendra Gujja, Rupak Ravi, Shaoying Dong, Vaibhav Naik, Wei Tang, William Jones, Daiyan Alamgir, Japson Jeyasekaran, Matt Sampson, Kartik Panjabi, Ranu Shah, Chuan Lei, Huzefa Rangwala, Jiani Zhang, Xiao Qin, Mukul Prasad, Alon Halevy, Brian Ross, Alona Nadler, Omer Zaki, Rick Sears, Bratin Saha, G2 Krishnamoorthy, Kinshuk Pahare, Nitin Bahadur, and Santosh Chandrachood.

About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.

Vishal Kajjam is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and using ML/AI for designing and building end-to-end solutions to address customers' data integration needs. In his spare time, he enjoys spending time with family and friends.

Bo Li is a Senior Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions to address customers' data analytic and processing needs with cloud-based, data-intensive technologies.

XiaoRun Yu is a Software Development Engineer on the AWS Glue team. He is working on building new features for AWS Glue to help customers. Outside of work, Xiaorun enjoys exploring new places in the Bay Area.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on distributed systems and new interfaces for data integration and efficiently managing data lakes on AWS.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple-to-use interfaces to efficiently manage and transform petabytes of data across data lakes on Amazon S3, and databases and data warehouses on the cloud.

View the full article
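As an addendum to the notebook walkthrough above, the following is an illustrative sketch of the shape of code such intents typically produce. It is written with standard AWS Glue PySpark APIs and is not actual Amazon Q output; the database, table, and S3 path names are placeholders.

from awsglue.context import GlueContext
from awsglue.transforms import Filter
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Intent 1: "Give me code that reads a Glue Data Catalog table" -- database and table names are placeholders.
dyf = glue_context.create_dynamic_frame.from_catalog(
    database="your_database",
    table_name="your_table",
)

# Intent 2: "Give me code to apply a filter transform with star_rating>3"
filtered_dyf = Filter.apply(frame=dyf, f=lambda row: row["star_rating"] > 3)

# Intent 3: "Give me code that writes the frame into S3 as Parquet" -- the output path is a placeholder.
glue_context.write_dynamic_frame.from_options(
    frame=filtered_dyf,
    connection_type="s3",
    connection_options={"path": "s3://your-bucket/output/"},
    format="parquet",
)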
-
Today, AWS announces general availability of Amazon Q data integration, a new generative AI–powered capability of Amazon Q Developer that enables you to build data integration pipelines using natural language. Amazon Q Developer is the AWS expert to assist you with all of your development tasks. Amazon Q data integration is a new chat experience specifically for AWS Glue, designed for authoring and troubleshooting data integration pipelines. View the full article
-
AWS Glue is a serverless ETL solution that helps organizations move data into enterprise-class data warehouses. It provides close integration with other AWS services, which appeals to businesses already invested significantly in AWS. If you are looking for a replacement for AWS Glue, this guide will walk you through the top 5 AWS Glue alternatives. […] View the full article
-
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.

By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.

This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.

Solution overview

For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). The data engineering team wants to separate the raw data into its own AWS account (Account B in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A in Account B, which is dedicated to data handling tasks. This makes sure the raw and processed data can be maintained securely separated across multiple accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless cluster in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.
The workflow consists of the following components:

- The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between S3 buckets in Account A and resources in Account B to be able to load and unload data.
- In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering.
- A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
- Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
- VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
- Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub actions, they are deployed to an S3 bucket in Account B (for this post, we upload the files into the S3 bucket directly).
- The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
- The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
- The Airflow logs are logged in Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:

- The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with hive style partitioning as s3://<bucket>/products/YYYY/MM/DD/.
- A sample dataset called products.csv, which we use in this post.

Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack template to create the following resources:

- The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
  - dags – The folder for DAG files.
  - plugins – The file for any custom or community Airflow plugins.
  - requirements – The requirements.txt file for any Python packages.
  - scripts – Any SQL scripts used in the DAG.
  - data – Any datasets used in the DAG.
- A Redshift Serverless environment. The name of the workgroup and namespace are prefixed with sample.
- An AWS Glue environment, which contains the following:
  - An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
  - A database called products_db in the AWS Glue Data Catalog.
  - An ETL job called sample_glue_job.
    This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
- A VPC gateway endpoint to Amazon S3.
- An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Create Amazon Redshift resources

Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file. In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.

Configure Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.

Set up the environment

This section outlines the steps to configure the environment. The process involves the following high-level steps:

- Update any necessary providers.
- Set up cross-account access.
- Establish a VPC peering connection between the Amazon MWAA VPC and the Amazon Redshift VPC.
- Configure Secrets Manager to integrate with Amazon MWAA.
- Define Airflow connections.

Update the providers

Follow the steps in this section if your version of Amazon MWAA is less than 2.8.1 (the latest version as of writing this post).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider. Although the version of Airflow supported in Amazon MWAA is 2.6.3, which comes bundled with the Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until the Amazon provider package version 8.4.0. Because the default bundled provider version is older than when Redshift Serverless support was introduced, the provider version must be upgraded in order to use that functionality.

The first step is to update the constraints file and requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package. Specify the requirements as follows:

--constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
apache-airflow-providers-amazon==8.4.0

- Update the version in the constraints file to 8.4.0 or higher.
- Add the constraints-3.11-updated.txt file to the /dags folder. Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for correct versions of the constraints file depending on the Airflow version.
- Navigate to the Amazon MWAA environment and choose Edit.
- Under DAG code in Amazon S3, for Requirements file, choose the latest version.
- Choose Save.

This will update the environment and new providers will be in effect. To verify the providers version, go to Providers under the Admin tab.
The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:

In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
                    "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                ]
            },
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                "arn:aws:s3:::sample-inp-bucket-etl-<username>"
            ]
        }
    ]
}

Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to Amazon MWAA roles in Account B to put objects in this bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
            },
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                "arn:aws:s3:::sample-opt-bucket-etl-<username>"
            ]
        }
    ]
}

In Account A, create an IAM policy called policy_for_roleA, which allows necessary Amazon S3 actions on the output bucket:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:Encrypt",
                "kms:GenerateDataKey"
            ],
            "Resource": [
                "<KMS_KEY_ARN_Used_for_S3_encryption>"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:GetBucketAcl",
                "s3:GetBucketCors",
                "s3:GetEncryptionConfiguration",
                "s3:GetBucketLocation",
                "s3:ListAllMyBuckets",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:ListBucketVersions",
                "s3:ListMultipartUploadParts"
            ],
            "Resource": [
                "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
            ]
        }
    ]
}

Create a new IAM role called RoleA with Account B as the trusted entity role and add this policy to the role. This allows Account B to assume RoleA to perform necessary Amazon S3 actions on the output bucket.

In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A. Add this policy to the AWS Glue role and Amazon MWAA role:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
        }
    ]
}

In Account B, create the IAM policy policy_for_roleB specifying Account A as a trusted entity.
The following policy grants permission to assume RoleA in Account A:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "CrossAccountPolicy",
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
        }
    ]
}

Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.

Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A. Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.

Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot. If any of the preceding steps are configured incorrectly, you are likely to encounter a "Connection Timeout" error in the DAG run.

Configure the Amazon MWAA connection with Secrets Manager

When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret. Complete the following steps:

- Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager). This allows Amazon MWAA to access credentials stored in Secrets Manager.
- To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
- To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings. This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}

To generate an Airflow connection URI string, go to AWS CloudShell and start a Python shell.
Run the following code to generate the connection URI string:

import urllib.parse

conn_type = 'redshift'
host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com' #Specify the Amazon Redshift workgroup endpoint
port = '5439'
login = 'admin' #Specify the username to use for authentication with Amazon Redshift
password = '<password>' #Specify the password to use for authentication with Amazon Redshift
role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
database = 'dev'
region = 'us-east-1' #YOUR_REGION

conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
print(conn_string)

The connection string should be generated as follows:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI). This can also be done from the Secrets Manager console. This will be added in Secrets Manager as plaintext.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it will look for this connection and retrieve the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.

You can also add secrets using the Secrets Manager console as key-value pairs. Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.

Create an Airflow connection through the metadata database

You can also create connections in the UI. In this case, the connection details will be stored in an Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI:

- For Connection Id, enter a name for the connection.
- For Connection Type, choose Amazon Redshift.
- For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless.
- For Database, enter dev.
- For User, enter your admin user name.
- For Password, enter your password.
- For Port, use port 5439.
- For Extra, set the region and timeout parameters.
- Test the connection, then save your settings.

Create and run a DAG

In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.

Create a DAG

In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:

- The DAG starts with looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket.
  For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derived the current date in the DAG and passed this to S3KeySensor, which looks for any new files in the current day folder. We also set wildcard_match as True, which enables searches on bucket_key to be interpreted as a Unix wildcard pattern. Set the mode to reschedule so that the sensor task frees the worker slot when the criteria is not met and it's rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on a scheduler.
- After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
- The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
- When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator along with the AWS Glue IAM role. Other parameters like GlueVersion, NumberofWorkers, and WorkerType are passed using the create_job_kwargs parameter.
- The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it's complete, the Redshift staging table products will be loaded with data from the S3 file.
- You can connect to Amazon Redshift from Airflow using three different operators:
  - PythonOperator
  - SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection
  - RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection
  In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.
- Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager. If the connection is not found in Secrets Manager, the values are retrieved from the default connections.
- In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed.
- In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.
- As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to the S3 bucket sample-opt-bucket-etl in Account A. airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data.
- TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
- The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.

The following is the complete DAG code.
The dag_id should match the DAG script name, otherwise it won't be synced into the Airflow UI.

from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule

dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
#Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)

    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )
    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )
    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )
    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Execute the Stored Procedure in Redshift Serverless using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )
    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
    )

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions.

When the DAG is complete, the status is updated in green, as shown in the following screenshot. In the Links section, there are options to view the code, graph, grid, log, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results

On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as the S3 files.

On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account A. The products_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Clean up

Clean up the resources created as part of this post to avoid incurring ongoing charges:

- Delete the CloudFormation stacks and S3 bucket that you created as prerequisites.
- Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.

Conclusion

With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.

About the Authors

Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS.
She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.

View the full article
-
- etl
- etl pipelines
-
In the era of data, organizations are increasingly using data lakes to store and analyze vast amounts of structured and unstructured data. Data lakes provide a centralized repository for data from various sources, enabling organizations to unlock valuable insights and drive data-driven decision-making. However, as data volumes continue to grow, optimizing data layout and organization becomes crucial for efficient querying and analysis.

One of the key challenges in data lakes is the potential for slow query performance, especially when dealing with large datasets. This can be attributed to factors such as inefficient data layout, resulting in excessive data scanning and inefficient use of compute resources. To address this challenge, common practices like partitioning and bucketing can significantly improve query performance and reduce computation costs.

Partitioning is a technique that divides a large dataset into smaller, more manageable parts based on specific criteria, such as date, region, or product category. By partitioning data, downstream analytical queries can skip irrelevant partitions, reducing the amount of data that needs to be scanned and processed. You can use partition columns in the WHERE clause in queries to scan only the specific partitions that your query needs. This can lead to faster query runtimes and more efficient resource utilization. It works especially well when columns with low cardinality are chosen as the key.

What if you have a high cardinality column that you sometimes need to filter by VIP customers? Each customer is usually identified with an ID, and there can be millions of them. Partitioning isn't suitable for such high cardinality columns because you end up with small files, slow partition filtering, and high Amazon Simple Storage Service (Amazon S3) API cost (one S3 prefix is created per value of the partition column). Although you can use partitioning with a natural key such as city or state to narrow down your dataset to some degree, it is still necessary to query across date-based partitions if your data is time series.

This is where bucketing comes into play. Bucketing makes sure that all rows with the same values of one or more columns end up in the same file. Instead of one file per value, like partitioning, a hash function is used to distribute values evenly across a fixed number of files. By organizing data this way, you can perform efficient filtering, because only the relevant buckets need to be processed, further reducing computational overhead.

There are multiple options for implementing bucketing on AWS. One approach is to use the Amazon Athena CREATE TABLE AS SELECT (CTAS) statement, which allows you to create a bucketed table directly from a query. Alternatively, you can use AWS Glue for Apache Spark, which provides built-in support for bucketing configurations during the data transformation process. AWS Glue allows you to define bucketing parameters, such as the number of buckets and the columns to bucket on, providing an optimized data layout for efficient querying with Athena.

In this post, we discuss how to implement bucketing on AWS data lakes, including using the Athena CTAS statement and AWS Glue for Apache Spark. We also cover bucketing for Apache Iceberg tables.

Example use case

In this post, you use a public dataset, the NOAA Integrated Surface Database. Data analysts run one-time queries for data during the past 5 years through Athena. Most of the queries are for specific stations with specific report types.
Example use case

In this post, you use a public dataset, the NOAA Integrated Surface Database. Data analysts run one-time queries through Athena against data from the past 5 years. Most of the queries are for specific stations with specific report types. The queries need to complete in 10 seconds, and the cost needs to be optimized carefully. In this scenario, you're a data engineer responsible for optimizing query performance and cost.

For example, if an analyst wants to retrieve data for a specific station (for example, station ID 123456) with a particular report type (for example, CRN01), the query might look like the following:

SELECT station, report_type, columnA, columnB, ... FROM table_name WHERE report_type = 'CRN01' AND station = '123456'

In the case of the NOAA Integrated Surface Database, the station_id column is likely to have a high cardinality, with numerous unique station identifiers. On the other hand, the report_type column may have a relatively low cardinality, with a limited set of report types. Given this scenario, it would be a good idea to partition the data by report_type and bucket it by station_id. With this partitioning and bucketing strategy, Athena can first eliminate partitions for irrelevant report types, and then scan only the buckets within the relevant partition that match the specified station ID, significantly reducing the amount of data processed and accelerating query runtimes. This approach not only meets the query performance requirement, but also helps optimize costs by minimizing the amount of data scanned and billed for each query.

In this post, we examine how query performance is affected by data layout, in particular, bucketing. We also compare three different ways to achieve bucketing. The following table represents the conditions for the tables to be created (columns: noaa_remote_original, athena_non_bucketed, athena_bucketed, glue_bucketed, athena_bucketed_iceberg):

Format: CSV | Parquet | Parquet | Parquet | Parquet
Compression: n/a | Snappy | Snappy | Snappy | Snappy
Created via: n/a | Athena CTAS | Athena CTAS | Glue ETL | Athena CTAS with Iceberg
Engine: n/a | Trino | Trino | Apache Spark | Apache Iceberg
Is partitioned?: Yes, but in a different way | Yes | Yes | Yes | Yes
Is bucketed?: No | No | Yes | Yes | Yes

noaa_remote_original is partitioned by the year column, but not by the report_type column. The "Is partitioned?" row indicates whether the table is partitioned by the columns that are actually used in the queries.

Baseline table

For this post, you create several tables with different conditions: some without bucketing and some with bucketing, to showcase the performance characteristics of bucketing. First, let's create an original table using the NOAA data. In subsequent steps, you ingest data from this table to create test tables. There are multiple ways to create a table definition: running DDL, an AWS Glue crawler, the AWS Glue Data Catalog API, and so on. In this step, you run DDL via the Athena console. Complete the following steps to create the "bucketing_blog"."noaa_remote_original" table in the Data Catalog: Open the Athena console. In the query editor, run the following DDL to create a new AWS Glue database: -- Create Glue database CREATE DATABASE bucketing_blog; For Database under Data, choose bucketing_blog to set the current database.
Run the following DDL to create the original table: -- Create original table CREATE EXTERNAL TABLE `bucketing_blog`.`noaa_remote_original`( `station` STRING, `date` STRING, `source` STRING, `latitude` STRING, `longitude` STRING, `elevation` STRING, `name` STRING, `report_type` STRING, `call_sign` STRING, `quality_control` STRING, `wnd` STRING, `cig` STRING, `vis` STRING, `tmp` STRING, `dew` STRING, `slp` STRING, `aj1` STRING, `gf1` STRING, `mw1` STRING) PARTITIONED BY ( year STRING) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( 'escapeChar'='\\', 'quoteChar'='\"', 'separatorChar'=',') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://noaa-global-hourly-pds/' TBLPROPERTIES ( 'skip.header.line.count'='1' ) Because the source data has quoted fields, we use OpenCSVSerde instead of the default LazySimpleSerde. These CSV files have a header row, which we tell Athena to skip by adding skip.header.line.count and setting the value to 1. For more details, refer to OpenCSVSerDe for processing CSV. Run the following DDL to add partitions. We add partitions only for 5 years out of 124 years based on the use case requirement: -- Load partitions ALTER TABLE `bucketing_blog`.`noaa_remote_original` ADD PARTITION (year = '2024') LOCATION 's3://noaa-global-hourly-pds/2024/' PARTITION (year = '2023') LOCATION 's3://noaa-global-hourly-pds/2023/' PARTITION (year = '2022') LOCATION 's3://noaa-global-hourly-pds/2022/' PARTITION (year = '2021') LOCATION 's3://noaa-global-hourly-pds/2021/' PARTITION (year = '2020') LOCATION 's3://noaa-global-hourly-pds/2020/'; Run the following DML to verify if you can successfully query the data: -- Check data SELECT * FROM "bucketing_blog"."noaa_remote_original" LIMIT 10; Now you’re ready to start querying the original table to examine the baseline performance. Run a query against the original table to evaluate the query performance as a baseline. The following query selects records for five specific stations with report type CRN05: -- Baseline SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp FROM "bucketing_blog"."noaa_remote_original" WHERE report_type = 'CRN05' AND ( station = '99999904237' OR station = '99999953132' OR station = '99999903061' OR station = '99999963856' OR station = '99999994644' ); We ran this query 10 times. The average query runtime for 10 queries is 27.6 seconds, which is far longer than our target of 10 seconds, and 155.75 GB data is scanned to return 1.65 million records. This is the baseline performance of the original raw table. It’s time to start optimizing data layout from this baseline. Next, you create tables with different conditions from the original: one without bucketing and one with bucketing, and compare them. Optimize data layout using Athena CTAS In this section, we use an Athena CTAS query to optimize data layout and its format. First, let’s create a table with partitioning but without bucketing. The new table is partitioned by the column report_type because most of expected queries use this column in the WHERE clause, and objects are stored as Parquet with Snappy compression. Open the Athena query editor. 
Run the following query, providing your own S3 bucket and prefix:

--CTAS, non-bucketed
CREATE TABLE "bucketing_blog"."athena_non_bucketed" WITH ( external_location = 's3://<your-s3-location>/athena-non-bucketed/', partitioned_by = ARRAY['report_type'], format = 'PARQUET', write_compression = 'SNAPPY' ) AS SELECT station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type FROM "bucketing_blog"."noaa_remote_original" ;

Your data should look like the following screenshots. There are 30 files under the partition. Next, you create a table with Hive-style bucketing. The number of buckets needs to be carefully tuned through experiments for your own use case. Generally speaking, the more buckets you have, the smaller the granularity, which might result in better performance. On the other hand, too many small files may introduce inefficiency in query planning and processing. Also, bucketing only works if you are querying a few values of the bucketing key. The more values you add to your query, the more likely it is that you will end up reading all buckets. The following is the baseline query to optimize:

-- Baseline
SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp FROM "bucketing_blog"."noaa_remote_original" WHERE report_type = 'CRN05' AND ( station = '99999904237' OR station = '99999953132' OR station = '99999903061' OR station = '99999963856' OR station = '99999994644' );

In this example, the table is going to be bucketed into 16 buckets by a high-cardinality column (station), which is expected to be used in the WHERE clause of the query. All other conditions remain the same. The baseline query filters on five station IDs, and you expect queries to use around that many values at most, which is comfortably fewer than the number of buckets, so 16 should work well. It is possible to specify a larger number of buckets, but CTAS can't be used if the total number of partitions exceeds 100. Run the following query:

-- CTAS, Hive-bucketed
CREATE TABLE "bucketing_blog"."athena_bucketed" WITH ( external_location = 's3://<your-s3-location>/athena-bucketed/', partitioned_by = ARRAY['report_type'], bucketed_by = ARRAY['station'], bucket_count = 16, format = 'PARQUET', write_compression = 'SNAPPY' ) AS SELECT station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type FROM "bucketing_blog"."noaa_remote_original" ;

The query creates S3 objects organized as shown in the following screenshots. The table-level layout looks exactly the same between athena_non_bucketed and athena_bucketed: there are 13 partitions in each table. The difference is the number of objects under the partitions. There are 16 objects (buckets) per partition, of roughly 10–25 MB each in this case. The number of buckets is constant at the specified value regardless of the amount of data, but the bucket size depends on the amount of data. Now you're ready to query against each table to evaluate query performance. The query will select records with five specific stations and report type CRN05 for the past 5 years. Although you can't directly see which bucket holds the data for a specific station, Athena computes the bucket assignment consistently when writing and reading the data.
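As a side note, the measurements reported in this post are averages over 10 query runs. If you want to reproduce that kind of measurement yourself, a small harness along the following lines can automate it with Boto3; this is a hedged sketch, and the Region, database, and query result location are placeholders you would replace with your own values.

import time
import boto3

athena = boto3.client("athena", region_name="us-east-1")  # assumed Region
RESULT_LOCATION = "s3://<your-s3-location>/athena-query-results/"  # placeholder

def run_once(sql, database="bucketing_blog"):
    # Start the query and poll until it reaches a terminal state, then return its statistics.
    qid = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": RESULT_LOCATION},
    )["QueryExecutionId"]
    while True:
        execution = athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]
        if execution["Status"]["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return execution["Statistics"]
        time.sleep(2)

def benchmark(sql, runs=10):
    stats = [run_once(sql) for _ in range(runs)]
    avg_seconds = sum(s["EngineExecutionTimeInMillis"] for s in stats) / runs / 1000
    avg_mb = sum(s["DataScannedInBytes"] for s in stats) / runs / 1024 / 1024
    print(f"average runtime: {avg_seconds:.2f} s, average data scanned: {avg_mb:.1f} MB")

predicate = ("WHERE report_type = 'CRN05' AND station IN "
             "('99999904237', '99999953132', '99999903061', '99999963856', '99999994644')")
for table in ("athena_non_bucketed", "athena_bucketed"):
    benchmark(f'SELECT station, report_type, date FROM "{table}" {predicate}')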
Query the non-bucketed table with the following statement: -- No bucketing SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp FROM "bucketing_blog"."athena_non_bucketed" WHERE report_type = 'CRN05' AND ( station = '99999904237' OR station = '99999953132' OR station = '99999903061' OR station = '99999963856' OR station = '99999994644' ); We ran this query 10 times. The average runtime of the 10 queries is 10.95 seconds, and 358 MB of data is scanned to return 2.21 million records. Both the runtime and scan size have been significantly decreased because you’ve partitioned the data, and can now read only one partition where 12 partitions of 13 are skipped. In addition, the amount of data scanned has gone down from 206 GB to 360 MB, which is a reduction of 99.8%. This is not just due to the partitioning, but also due to the change of its format to Parquet and compression with Snappy. Query the bucketed table with the following statement: -- Hive bucketing SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp FROM "bucketing_blog"."athena_bucketed" WHERE report_type = 'CRN05' AND ( station = '99999904237' OR station = '99999953132' OR station = '99999903061' OR station = '99999963856' OR station = '99999994644' ); We ran this query 10 times. The average runtime of the 10 queries is 7.82 seconds, and 69 MB of data is scanned to return 2.21 million records. This means a reduction of average runtime from 10.95 to 7.82 seconds (-29%), and a dramatic reduction of data scanned from 358 MB to 69 MB (-81%) to return the same number of records compared with the non-bucketed table. In this case, both runtime and data scanned were improved by bucketing. This means bucketing contributed not only to performance but also to cost reduction. Considerations As stated earlier, size your bucket carefully to maximize performance of your query. Bucketing only works if you are querying a few values of the bucketing key. Consider creating more buckets than the number of values expected in the actual query. Additionally, an Athena CTAS query is limited to create up to 100 partitions at one time. If you need a large number of partitions, you may want to use AWS Glue extract, transform, and load (ETL), although there is a workaround to split into multiple SQL statements. Optimize data layout using AWS Glue ETL Apache Spark is an open source distributed processing framework that enables flexible ETL with PySpark, Scala, and Spark SQL. It allows you to partition and bucket your data based on your requirements. Spark has several tuning options to accelerate jobs. You can effortlessly automate and monitor Spark jobs. In this section, we use AWS Glue ETL jobs to run Spark code to optimize data layout. Unlike Athena bucketing, AWS Glue ETL uses Spark-based bucketing as a bucketing algorithm. All you need to do is add the following table property onto the table: bucketing_format = 'spark'. For details about this table property, see Partitioning and bucketing in Athena. Complete the following steps to create a table with bucketing through AWS Glue ETL: On the AWS Glue console, choose ETL jobs in the navigation pane. Choose Create job and choose Visual ETL. Under Add nodes, choose AWS Glue Data Catalog for Sources. For Database, choose bucketing_blog. For Table, choose noaa_remote_original. Under Add nodes, choose Change Schema for Transforms. 
Under Add nodes, choose Custom Transform for Transforms. For Name, enter ToS3WithBucketing. For Node parents, choose Change Schema. For Code block, enter the following code snippet:

def ToS3WithBucketing(glueContext, dfc) -> DynamicFrameCollection:
    # Convert DynamicFrame to DataFrame
    df = dfc.select(list(dfc.keys())[0]).toDF()
    # Write to S3 with bucketing and partitioning
    df.repartition(1, "report_type") \
        .write.option("path", "s3://<your-s3-location>/glue-bucketed/") \
        .mode("overwrite") \
        .partitionBy("report_type") \
        .bucketBy(16, "station") \
        .format("parquet") \
        .option("compression", "snappy") \
        .saveAsTable("bucketing_blog.glue_bucketed")

The following screenshot shows the job created using AWS Glue Studio to generate a table and data. Each node represents the following: The AWS Glue Data Catalog node loads the noaa_remote_original table from the Data Catalog. The Change Schema node makes sure that it loads columns registered in the Data Catalog. The ToS3WithBucketing node writes data to Amazon S3 with both partitioning and Spark-based bucketing. The job has been successfully authored in the visual editor.

Under Job details, for IAM Role, choose your AWS Identity and Access Management (IAM) role for this job. For Worker type, choose G.8X. For Requested number of workers, enter 5. Choose Save, then choose Run. After these steps, the table glue_bucketed has been created. Choose Tables in the navigation pane, and choose the table glue_bucketed. On the Actions menu, choose Edit table under Manage. In the Table properties section, choose Add. Add a key pair with key bucketing_format and value spark. Choose Save. Now it's time to query the tables. Query the bucketed table with the following statement:

-- Spark bucketing
SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp FROM "bucketing_blog"."glue_bucketed" WHERE report_type = 'CRN05' AND ( station = '99999904237' OR station = '99999953132' OR station = '99999903061' OR station = '99999963856' OR station = '99999994644' );

We ran the query 10 times. The average runtime of the 10 queries is 7.09 seconds, and 88 MB of data is scanned to return 2.21 million records. In this case, both the runtime and data scanned were improved by bucketing. This means bucketing contributed not only to performance but also to cost reduction. The reason for the larger bytes scanned compared to the Athena CTAS example is that the values were distributed differently in this table. In the AWS Glue bucketed table, the values were distributed over five files. In the Athena CTAS bucketed table, the values were distributed over four files. Remember that rows are distributed into buckets using a hash function. The Spark bucketing algorithm uses a different hash function than Hive, and in this case, it resulted in a different distribution across the files.

Considerations

AWS Glue DynamicFrames do not support bucketing natively. You need to use a Spark DataFrame instead of a DynamicFrame to bucket tables. For information about fine-tuning AWS Glue ETL performance, refer to Best practices for performance tuning AWS Glue for Apache Spark jobs.
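As another side note, the bucketing_format table property that was added through the console above can also be set programmatically with the AWS Glue UpdateTable API. The following Boto3 sketch is one way to do it; the Region is a placeholder, and the set of read-only fields stripped from the GetTable response is an assumption that may need adjusting for your table.

import boto3

glue = boto3.client("glue", region_name="us-east-1")  # assumed Region

def set_table_property(database, table, key, value):
    current = glue.get_table(DatabaseName=database, Name=table)["Table"]
    # UpdateTable expects a TableInput, which accepts only a subset of the fields
    # returned by GetTable, so drop the read-only ones before resubmitting.
    read_only = {"DatabaseName", "CreateTime", "UpdateTime", "CreatedBy",
                 "IsRegisteredWithLakeFormation", "CatalogId", "VersionId"}
    table_input = {k: v for k, v in current.items() if k not in read_only}
    table_input.setdefault("Parameters", {})[key] = value
    glue.update_table(DatabaseName=database, TableInput=table_input)

set_table_property("bucketing_blog", "glue_bucketed", "bucketing_format", "spark")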
Optimize Iceberg data layout with hidden partitioning

Apache Iceberg is a high-performance open table format for huge analytic tables, bringing the reliability and simplicity of SQL tables to big data. Recently, there has been a huge demand to use Apache Iceberg tables to achieve advanced capabilities like ACID transactions, time travel queries, and more. In Iceberg, bucketing works differently than the Hive table method we've seen so far. In Iceberg, bucketing is a subset of partitioning, and can be applied using the bucket partition transform. The way you use it and the end result are similar to bucketing in Hive tables. For more details about Iceberg bucket transforms, refer to Bucket Transform Details. Complete the following steps: Open the Athena query editor. Run the following query to create an Iceberg table with hidden partitioning along with bucketing:

-- CTAS, Iceberg-bucketed
CREATE TABLE "bucketing_blog"."athena_bucketed_iceberg" WITH (table_type = 'ICEBERG', location = 's3://<your-s3-location>/athena-bucketed-iceberg/', is_external = false, partitioning = ARRAY['report_type', 'bucket(station, 16)'], format = 'PARQUET', write_compression = 'SNAPPY' ) AS SELECT station, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type FROM "bucketing_blog"."noaa_remote_original" ;

Your data should look like the following screenshot. There are two folders: data and metadata. Drill down to data. You see random prefixes under the data folder. Choose the first one to view its details. You see the top-level partition based on the report_type column. Drill down to the next level. You see the second-level partition, bucketed with the station column. The Parquet data files exist under these folders. Query the bucketed table with the following statement:

-- Iceberg bucketing
SELECT station, report_type, date, source, latitude, longitude, elevation, name, call_sign, quality_control, wnd, cig, tmp FROM "bucketing_blog"."athena_bucketed_iceberg" WHERE report_type = 'CRN05' AND ( station = '99999904237' OR station = '99999953132' OR station = '99999903061' OR station = '99999963856' OR station = '99999994644' );

With the Iceberg-bucketed table, the average runtime of the 10 queries is 8.03 seconds, and 148 MB of data is scanned to return 2.21 million records. This is less efficient than bucketing with AWS Glue or Athena, but considering the benefits of Iceberg's various features, it is within an acceptable range.

Results

The following table summarizes all the results (columns: noaa_remote_original, athena_non_bucketed, athena_bucketed, glue_bucketed, athena_bucketed_iceberg):

Format: CSV | Parquet | Parquet | Parquet | Iceberg (Parquet)
Compression: n/a | Snappy | Snappy | Snappy | Snappy
Created via: n/a | Athena CTAS | Athena CTAS | Glue ETL | Athena CTAS with Iceberg
Engine: n/a | Trino | Trino | Apache Spark | Apache Iceberg
Table size (GB): 155.8 | 5.0 | 5.0 | 5.8 | 5.0
Number of S3 objects: 53360 | 376 | 192 | 192 | 195
Is partitioned?: Yes, but in a different way | Yes | Yes | Yes | Yes
Is bucketed?: No | No | Yes | Yes | Yes
Bucketing format: n/a | n/a | Hive | Spark | Iceberg
Number of buckets: n/a | n/a | 16 | 16 | 16
Average runtime (sec): 29.178 | 10.950 | 7.815 | 7.089 | 8.030
Scanned size (MB): 206640.0 | 358.6 | 69.1 | 87.8 | 147.7

With athena_bucketed, glue_bucketed, and athena_bucketed_iceberg, you were able to meet the latency goal of 10 seconds. With bucketing, you saw a 25–40% reduction in runtime and a 60–85% reduction in scan size, which can contribute to both latency and cost optimization. As you can see from the results, although partitioning contributes significantly to reducing both runtime and scan size, bucketing can reduce them further. Athena CTAS is straightforward and fast enough to complete the bucketing process. AWS Glue ETL is more flexible and scalable to achieve advanced use cases.
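If you prefer to produce the Iceberg table from the AWS Glue ETL path instead of Athena CTAS, a job along the following lines could create an equivalent hidden-partitioned, bucketed layout. This is a hedged sketch: it assumes an AWS Glue for Apache Spark job that has Iceberg support enabled and a Glue Data Catalog-backed Iceberg catalog named glue_catalog configured, the warehouse location and target table name are placeholders, and this variant was not benchmarked in this post.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder.appName("IcebergBucketedWrite")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse",
            "s3://<your-s3-location>/iceberg-warehouse/")  # placeholder
    .getOrCreate()
)

# Read the original table and write it as an Iceberg table whose partition spec
# combines report_type with a 16-bucket transform on station.
df = spark.table("bucketing_blog.noaa_remote_original")
(
    df.writeTo("glue_catalog.bucketing_blog.glue_bucketed_iceberg")  # hypothetical table name
    .using("iceberg")
    .partitionedBy(F.col("report_type"), F.bucket(16, F.col("station")))
    .tableProperty("write.parquet.compression-codec", "snappy")
    .createOrReplace()
)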
You can choose either method based on your requirements and use case, because you can take advantage of bucketing through either option.

Conclusion

In this post, we demonstrated how to optimize your table data layout with partitioning and bucketing through Athena CTAS and AWS Glue ETL. We showed that bucketing contributes to reducing query latency and scan size to further optimize costs. We also discussed bucketing for Iceberg tables through hidden partitioning. Bucketing is just one technique for optimizing data layout by reducing data scanned. For optimizing your entire data layout, we recommend considering other options like partitioning, using a columnar file format, and compression in conjunction with bucketing. Together, these can further enhance query performance. Happy bucketing!

About the Authors

Takeshi Nakatani is a Principal Big Data Consultant on the Professional Services team in Tokyo. He has 26 years of experience in the IT industry, with expertise in architecting data infrastructure. On his days off, he can be a rock drummer or a motorcyclist. Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike. View the full article
-
AWS Glue Studio Notebooks provides interactive job authoring in AWS Glue, which helps simplify the process of developing data integration jobs. Starting today, Studio Notebooks is generally available in the following six AWS Regions: Middle East (UAE), Asia Pacific (Hyderabad), Asia Pacific (Melbourne), Israel (Tel Aviv), Europe (Spain), and Europe (Zurich). View the full article
-
Last week, we announced the general availability of the integration between Amazon DataZone and AWS Lake Formation hybrid access mode. In this post, we share how this new feature helps you simplify the way you use Amazon DataZone to enable secure and governed sharing of your data in the AWS Glue Data Catalog. We also delve into how data producers can share their AWS Glue tables through Amazon DataZone without needing to register them in Lake Formation first. Overview of the Amazon DataZone integration with Lake Formation hybrid access mode Amazon DataZone is a fully managed data management service to catalog, discover, analyze, share, and govern data between data producers and consumers in your organization. With Amazon DataZone, data producers populate the business data catalog with data assets from data sources such as the AWS Glue Data Catalog and Amazon Redshift. They also enrich their assets with business context to make it straightforward for data consumers to understand. After the data is available in the catalog, data consumers such as analysts and data scientists can search and access this data by requesting subscriptions. When the request is approved, Amazon DataZone can automatically provision access to the data by managing permissions in Lake Formation or Amazon Redshift so that the data consumer can start querying the data using tools such as Amazon Athena or Amazon Redshift. To manage the access to data in the AWS Glue Data Catalog, Amazon DataZone uses Lake Formation. Previously, if you wanted to use Amazon DataZone for managing access to your data in the AWS Glue Data Catalog, you had to onboard your data to Lake Formation first. Now, the integration of Amazon DataZone and Lake Formation hybrid access mode simplifies how you can get started with your Amazon DataZone journey by removing the need to onboard your data to Lake Formation first. Lake Formation hybrid access mode allows you to start managing permissions on your AWS Glue databases and tables through Lake Formation, while continuing to maintain any existing AWS Identity and Access Management (IAM) permissions on these tables and databases. Lake Formation hybrid access mode supports two permission pathways to the same Data Catalog databases and tables: In the first pathway, Lake Formation allows you to select specific principals (opt-in principals) and grant them Lake Formation permissions to access databases and tables by opting in The second pathway allows all other principals (that are not added as opt-in principals) to access these resources through the IAM principal policies for Amazon Simple Storage Service (Amazon S3) and AWS Glue actions With the integration between Amazon DataZone and Lake Formation hybrid access mode, if you have tables in the AWS Glue Data Catalog that are managed through IAM-based policies, you can publish these tables directly to Amazon DataZone, without registering them in Lake Formation. Amazon DataZone registers the location of these tables in Lake Formation using hybrid access mode, which allows managing permissions on AWS Glue tables through Lake Formation, while continuing to maintain any existing IAM permissions. Amazon DataZone enables you to publish any type of asset in the business data catalog. For some of these assets, Amazon DataZone can automatically manage access grants. These assets are called managed assets, and include Lake Formation-managed Data Catalog tables and Amazon Redshift tables and views. 
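The exact calls Amazon DataZone makes on your behalf are internal to the service, but conceptually the hybrid access mode fulfillment maps to Lake Formation operations like the ones sketched below with Boto3. This is only an illustration of hybrid access mode, not something you need to run yourself; the API names are an assumption based on the Lake Formation SDK, and every ARN, database, and table name is a placeholder.

import boto3

lakeformation = boto3.client("lakeformation", region_name="us-east-1")  # assumed Region

# 1. Register the table's S3 location in hybrid access mode, so existing
#    IAM-based access keeps working alongside Lake Formation permissions.
lakeformation.register_resource(
    ResourceArn="arn:aws:s3:::<your-data-bucket>/<prefix>/",        # placeholder
    RoleArn="arn:aws:iam::<account-id>:role/<registration-role>",   # placeholder
    HybridAccessEnabled=True,
)

table = {"Table": {"DatabaseName": "<glue-database>", "Name": "<glue-table>"}}
consumer = {"DataLakePrincipalIdentifier": "arn:aws:iam::<account-id>:role/<consumer-role>"}

# 2. Opt the consumer principal in to Lake Formation permissions for this table.
lakeformation.create_lake_formation_opt_in(Principal=consumer, Resource=table)

# 3. Grant the consumer SELECT on the table through Lake Formation.
lakeformation.grant_permissions(Principal=consumer, Resource=table, Permissions=["SELECT"])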
Prior to this integration, you had to complete the following steps before Amazon DataZone could treat the published Data Catalog table as a managed asset: Identity the Amazon S3 location associated with Data Catalog table. Register the Amazon S3 location with Lake Formation in hybrid access mode using a role with appropriate permissions. Publish the table metadata to the Amazon DataZone business data catalog. The following diagram illustrates this workflow. With the Amazon DataZone’s integration with Lake Formation hybrid access mode, you can simply publish your AWS Glue tables to Amazon DataZone without having to worry about registering the Amazon S3 location or adding an opt-in principal in Lake Formation by delegating these steps to Amazon DataZone. The administrator of an AWS account can enable the data location registration setting under the DefaultDataLake blueprint on the Amazon DataZone console. Now, a data owner or publisher can publish their AWS Glue table (managed through IAM permissions) to Amazon DataZone without the extra setup steps. When a data consumer subscribes to this table, Amazon DataZone registers the Amazon S3 locations of the table in hybrid access mode, adds the data consumer’s IAM role as an opt-in principal, and grants access to the same IAM role by managing permissions on the table through Lake Formation. This makes sure that IAM permissions on the table can coexist with newly granted Lake Formation permissions, without disrupting any existing workflows. The following diagram illustrates this workflow. Solution overview To demonstrate this new capability, we use a sample customer scenario where the finance team wants to access data owned by the sales team for financial analysis and reporting. The sales team has a pipeline that creates a dataset containing valuable information about ticket sales, popular events, venues, and seasons. We call it the tickit dataset. The sales team stores this dataset in Amazon S3 and registers it in a database in the Data Catalog. The access to this table is currently managed through IAM-based permissions. However, the sales team wants to publish this table to Amazon DataZone to facilitate secure and governed data sharing with the finance team. The steps to configure this solution are as follows: The Amazon DataZone administrator enables the data lake location registration setting in Amazon DataZone to automatically register the Amazon S3 location of the AWS Glue tables in Lake Formation hybrid access mode. After the hybrid access mode integration is enabled in Amazon DataZone, the finance team requests a subscription to the sales data asset. The asset shows up as a managed asset, which means Amazon DataZone can manage access to this asset even if the Amazon S3 location of this asset isn’t registered in Lake Formation. The sales team is notified of a subscription request raised by the finance team. They review and approve the access request. After the request is approved, Amazon DataZone fulfills the subscription request by managing permissions in the Lake Formation. It registers the Amazon S3 location of the subscribed table in Lake Formation hybrid mode. The finance team gains access to the sales dataset required for their financial reports. They can go to their DataZone environment and start running queries using Athena against their subscribed dataset. Prerequisites To follow the steps in this post, you need an AWS account. If you don’t have an account, you can create one. 
In addition, you must have the following resources configured in your account: An S3 bucket An AWS Glue database and crawler IAM roles for different personas and services An Amazon DataZone domain and project An Amazon DataZone environment profile and environment An Amazon DataZone data source If you don’t have these resources already configured, you can create them by deploying the following AWS CloudFormation stack: Choose Launch Stack to deploy a CloudFormation template. Complete the steps to deploy the template and leave all settings as default. Select I acknowledge that AWS CloudFormation might create IAM resources, then choose Submit. After the CloudFormation deployment is complete, you can log in to the Amazon DataZone portal and manually trigger a data source run. This pulls any new or modified metadata from the source and updates the associated assets in the inventory. This data source has been configured to automatically publish the data assets to the catalog. On the Amazon DataZone console, choose View domains. You should be logged in using the same role that is used to deploy CloudFormation and verify that you are in the same AWS Region. Find the domain blog_dz_domain, then choose Open data portal. Choose Browse all projects and choose Sales producer project. On the Data tab, choose Data sources in the navigation pane. Locate and choose the data source that you want to run. This opens the data source details page. Choose the options menu (three vertical dots) next to tickit_datasource and choose Run. The data source status changes to Running as Amazon DataZone updates the asset metadata. Enable hybrid mode integration in Amazon DataZone In this step, the Amazon DataZone administrator goes through the process of enabling the Amazon DataZone integration with Lake Formation hybrid access mode. Complete the following steps: On a separate browser tab, open the Amazon DataZone console. Verify that you are in the same Region where you deployed the CloudFormation template. Choose View domains. Choose the domain created by AWS CloudFormation, blog_dz_domain. Scroll down on the domain details page and choose the Blueprints tab. A blueprint defines what AWS tools and services can be used with the data assets published in Amazon DataZone. The DefaultDataLake blueprint is enabled as part of the CloudFormation stack deployment. This blueprint enables you to create and query AWS Glue tables using Athena. For the steps to enable this in your own deployments, refer to Enable built-in blueprints in the AWS account that owns the Amazon DataZone domain. Choose the DefaultDataLake blueprint. On the Provisioning tab, choose Edit. Select Enable Amazon DataZone to register S3 locations using AWS Lake Formation hybrid access mode. You have the option of excluding specific Amazon S3 locations if you don’t want Amazon DataZone to automatically register them to Lake Formation hybrid access mode. Choose Save changes. Request access In this step, you log in to Amazon DataZone as the finance team, search for the sales data asset, and subscribe to it. Complete the following steps: Return to your Amazon DataZone data portal browser tab. Switch to the finance consumer project by choosing the dropdown menu next to the project name and choosing Finance consumer project. From this step onwards, you take on the persona of a finance user looking to subscribe to a data asset published in the previous step. In the search bar, search for and choose the sales data asset. Choose Subscribe. 
The asset shows up as managed asset. This means that Amazon DataZone can grant access to this data asset to the finance team’s project by managing the permissions in Lake Formation. Enter a reason for the access request and choose Subscribe. Approve access request The sales team gets a notification that an access request from the finance team is submitted. To approve the request, complete the following steps: Choose the dropdown menu next to the project name and choose Sales producer project. You now assume the persona of the sales team, who are the owners and stewards of the sales data assets. Choose the notification icon at the top-right corner of the DataZone portal. Choose the Subscription Request Created task. Grant access to the sales data asset to the finance team and choose Approve. Analyze the data The finance team has now been granted access to the sales data, and this dataset has been to their Amazon DataZone environment. They can access the environment and query the sales dataset with Athena, along with any other datasets they currently own. Complete the following steps: On the dropdown menu, choose Finance consumer project. On the right pane of the project overview screen, you can find a list of active environments available for use. Choose the Amazon DataZone environment finance_dz_environment. In the navigation pane, under Data assets, choose Subscribed. Verify that your environment now has access to the sales data. It may take a few minutes for the data asset to be automatically added to your environment. Choose the new tab icon for Query data. A new tab opens with the Athena query editor. For Database, choose finance_consumer_db_tickitdb-<suffix>. This database will contain your subscribed data assets. Generate a preview of the sales table by choosing the options menu (three vertical dots) and choosing Preview table. Clean up To clean up your resources, complete the following steps: Switch back to the administrator role you used to deploy the CloudFormation stack. On the Amazon DataZone console, delete the projects used in this post. This will delete most project-related objects like data assets and environments. On the AWS CloudFormation console, delete the stack you deployed in the beginning of this post. On the Amazon S3 console, delete the S3 buckets containing the tickit dataset. On the Lake Formation console, delete the Lake Formation admins registered by Amazon DataZone. On the Lake Formation console, delete tables and databases created by Amazon DataZone. Conclusion In this post, we discussed how the integration between Amazon DataZone and Lake Formation hybrid access mode simplifies the process to start using Amazon DataZone for end-to-end governance of your data in the AWS Glue Data Catalog. This integration helps you bypass the manual steps of onboarding to Lake Formation before you can start using Amazon DataZone. For more information on how to get started with Amazon DataZone, refer to the Getting started guide. Check out the YouTube playlist for some of the latest demos of Amazon DataZone and short descriptions of the capabilities available. For more information about Amazon DataZone, see How Amazon DataZone helps customers find value in oceans of data. About the Authors Utkarsh Mittal is a Senior Technical Product Manager for Amazon DataZone at AWS. He is passionate about building innovative products that simplify customers’ end-to-end analytics journeys. Outside of the tech world, Utkarsh loves to play music, with drums being his latest endeavor. 
Praveen Kumar is a Principal Analytics Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-centered services. His areas of interest are serverless technology, modern cloud data warehouses, streaming, and generative AI applications. Paul Villena is a Senior Analytics Solutions Architect in AWS with expertise in building modern data and analytics solutions to drive business value. He works with customers to help them harness the power of the cloud. His areas of interest are infrastructure as code, serverless technologies, and coding in Python. View the full article
-
Tagged with: aws lake formation, aws glue (and 1 more)
-
Customers use Amazon DataZone to catalog, discover, analyze, share, and govern data at scale across organizational boundaries, with governance and access controls. Today, Amazon DataZone launches an integration with AWS Glue Data Quality and offers APIs to integrate data quality metrics from third-party data quality solutions. This integration helps Amazon DataZone customers gain trust in their data and make confident business decisions. View the full article
-
Tagged with: aws glue, aws glue data quality (and 1 more)
-
Today, we are pleased to announce that Amazon DataZone is now able to present data quality information for data assets. This information empowers end-users to make informed decisions as to whether or not to use specific assets. Many organizations already use AWS Glue Data Quality to define and enforce data quality rules on their data, validate data against predefined rules, track data quality metrics, and monitor data quality over time using artificial intelligence (AI). Other organizations monitor the quality of their data through third-party solutions. Amazon DataZone now integrates directly with AWS Glue to display data quality scores for AWS Glue Data Catalog assets. Additionally, Amazon DataZone now offers APIs for importing data quality scores from external systems. In this post, we discuss the latest features of Amazon DataZone for data quality, the integration between Amazon DataZone and AWS Glue Data Quality and how you can import data quality scores produced by external systems into Amazon DataZone via API. Challenges One of the most common questions we get from customers is related to displaying data quality scores in the Amazon DataZone business data catalog to let business users have visibility into the health and reliability of the datasets. As data becomes increasingly crucial for driving business decisions, Amazon DataZone users are keenly interested in providing the highest standards of data quality. They recognize the importance of accurate, complete, and timely data in enabling informed decision-making and fostering trust in their analytics and reporting processes. Amazon DataZone data assets can be updated at varying frequencies. As data is refreshed and updated, changes can happen through upstream processes that put it at risk of not maintaining the intended quality. Data quality scores help you understand if data has maintained the expected level of quality for data consumers to use (through analysis or downstream processes). From a producer’s perspective, data stewards can now set up Amazon DataZone to automatically import the data quality scores from AWS Glue Data Quality (scheduled or on demand) and include this information in the Amazon DataZone catalog to share with business users. Additionally, you can now use new Amazon DataZone APIs to import data quality scores produced by external systems into the data assets. With the latest enhancement, Amazon DataZone users can now accomplish the following: Access insights about data quality standards directly from the Amazon DataZone web portal View data quality scores on various KPIs, including data completeness, uniqueness, accuracy Make sure users have a holistic view of the quality and trustworthiness of their data. In the first part of this post, we walk through the integration between AWS Glue Data Quality and Amazon DataZone. We discuss how to visualize data quality scores in Amazon DataZone, enable AWS Glue Data Quality when creating a new Amazon DataZone data source, and enable data quality for an existing data asset. In the second part of this post, we discuss how you can import data quality scores produced by external systems into Amazon DataZone via API. In this example, we use Amazon EMR Serverless in combination with the open source library Pydeequ to act as an external system for data quality. 
Visualize AWS Glue Data Quality scores in Amazon DataZone You can now visualize AWS Glue Data Quality scores in data assets that have been published in the Amazon DataZone business catalog and that are searchable through the Amazon DataZone web portal. If the asset has AWS Glue Data Quality enabled, you can now quickly visualize the data quality score directly in the catalog search pane. By selecting the corresponding asset, you can understand its content through the readme, glossary terms, and technical and business metadata. Additionally, the overall quality score indicator is displayed in the Asset Details section. A data quality score serves as an overall indicator of a dataset’s quality, calculated based on the rules you define. On the Data quality tab, you can access the details of data quality overview indicators and the results of the data quality runs. The indicators shown on the Overview tab are calculated based on the results of the rulesets from the data quality runs. Each rule is assigned an attribute that contributes to the calculation of the indicator. For example, rules that have the Completeness attribute will contribute to the calculation of the corresponding indicator on the Overview tab. To filter data quality results, choose the Applicable column dropdown menu and choose your desired filter parameter. You can also visualize column-level data quality starting on the Schema tab. When data quality is enabled for the asset, the data quality results become available, providing insightful quality scores that reflect the integrity and reliability of each column within the dataset. When you choose one of the data quality result links, you’re redirected to the data quality detail page, filtered by the selected column. Data quality historical results in Amazon DataZone Data quality can change over time for many reasons: Data formats may change because of changes in the source systems As data accumulates over time, it may become outdated or inconsistent Data quality can be affected by human errors in data entry, data processing, or data manipulation In Amazon DataZone, you can now track data quality over time to confirm reliability and accuracy. By analyzing the historical report snapshot, you can identify areas for improvement, implement changes, and measure the effectiveness of those changes. Enable AWS Glue Data Quality when creating a new Amazon DataZone data source In this section, we walk through the steps to enable AWS Glue Data Quality when creating a new Amazon DataZone data source. Prerequisites To follow along, you should have a domain for Amazon DataZone, an Amazon DataZone project, and a new Amazon DataZone environment (with a DataLakeProfile). For instructions, refer to Amazon DataZone quickstart with AWS Glue data. You also need to define and run a ruleset against your data, which is a set of data quality rules in AWS Glue Data Quality. 
To set up the data quality rules and for more information on the topic, refer to the following posts: Part 1: Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog Part 2: Getting started with AWS Glue Data Quality for ETL Pipelines Part 3: Set up data quality rules across multiple datasets using AWS Glue Data Quality Part 4: Set up alerts and orchestrate data quality rules with AWS Glue Data Quality Part 5: Visualize data quality score and metrics generated by AWS Glue Data Quality Part 6: Measure performance of AWS Glue Data Quality for ETL pipelines After you create the data quality rules, make sure that Amazon DataZone has the permissions to access the AWS Glue database managed through AWS Lake Formation. For instructions, see Configure Lake Formation permissions for Amazon DataZone. In our example, we have configured a ruleset against a table containing patient data within a healthcare synthetic dataset generated using Synthea. Synthea is a synthetic patient generator that creates realistic patient data and associated medical records that can be used for testing healthcare software applications. The ruleset contains 27 individual rules (one of them failing), so the overall data quality score is 96%. If you use Amazon DataZone managed policies, there is no action needed because these will get automatically updated with the needed actions. Otherwise, you need to allow Amazon DataZone to have the required permissions to list and get AWS Glue Data Quality results, as shown in the Amazon DataZone user guide. Create a data source with data quality enabled In this section, we create a data source and enable data quality. You can also update an existing data source to enable data quality. We use this data source to import metadata information related to our datasets. Amazon DataZone will also import data quality information related to the (one or more) assets contained in the data source. On the Amazon DataZone console, choose Data sources in the navigation pane. Choose Create data source. For Name, enter a name for your data source. For Data source type, select AWS Glue. For Environment, choose your environment. For Database name, enter a name for the database. For Table selection criteria, choose your criteria. Choose Next. For Data quality, select Enable data quality for this data source. If data quality is enabled, Amazon DataZone will automatically fetch data quality scores from AWS Glue at each data source run. Choose Next. Now you can run the data source. While running the data source, Amazon DataZone imports the last 100 AWS Glue Data Quality run results. This information is now visible on the asset page and will be visible to all Amazon DataZone users after publishing the asset. Enable data quality for an existing data asset In this section, we enable data quality for an existing asset. This might be useful for users that already have data sources in place and want to enable the feature afterwards. Prerequisites To follow along, you should have already run the data source and produced an AWS Glue table data asset. Additionally, you should have defined a ruleset in AWS Glue Data Quality over the target table in the Data Catalog. For this example, we ran the data quality job multiple times against the table, producing the related AWS Glue Data Quality scores, as shown in the following screenshot. 
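The data quality runs in this example were started from the AWS Glue console, but the same ruleset evaluation can also be triggered programmatically. The following Boto3 sketch shows the general shape of that call; the Region, IAM role, database, table, and ruleset names are placeholders, and you may need to adjust worker settings for your data volume.

import time
import boto3

glue = boto3.client("glue", region_name="us-east-1")  # assumed Region

run_id = glue.start_data_quality_ruleset_evaluation_run(
    DataSource={"GlueTable": {"DatabaseName": "<database>", "TableName": "<patients-table>"}},
    Role="arn:aws:iam::<account-id>:role/<glue-data-quality-role>",  # placeholder
    RulesetNames=["<patients-ruleset>"],                             # placeholder
)["RunId"]

# Poll until the run finishes, then print the overall score and per-rule outcomes.
while True:
    run = glue.get_data_quality_ruleset_evaluation_run(RunId=run_id)
    if run["Status"] in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"):
        break
    time.sleep(15)

for result_id in run.get("ResultIds", []):
    result = glue.get_data_quality_result(ResultId=result_id)
    print("Overall score:", result.get("Score"))
    for rule in result.get("RuleResults", []):
        print(rule["Name"], rule["Result"])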
Import data quality scores into the data asset Complete the following steps to import the existing AWS Glue Data Quality scores into the data asset in Amazon DataZone: Within the Amazon DataZone project, navigate to the Inventory data pane and choose the data source. If you choose the Data quality tab, you can see that there’s still no information on data quality because AWS Glue Data Quality integration is not enabled for this data asset yet. On the Data quality tab, choose Enable data quality. In the Data quality section, select Enable data quality for this data source. Choose Save. Now, back on the Inventory data pane, you can see a new tab: Data quality. On the Data quality tab, you can see data quality scores imported from AWS Glue Data Quality. Ingest data quality scores from an external source using Amazon DataZone APIs Many organizations already use systems that calculate data quality by performing tests and assertions on their datasets. Amazon DataZone now supports importing third-party originated data quality scores via API, allowing users that navigate the web portal to view this information. In this section, we simulate a third-party system pushing data quality scores into Amazon DataZone via APIs through Boto3 (Python SDK for AWS). For this example, we use the same synthetic dataset as earlier, generated with Synthea. The following diagram illustrates the solution architecture. The workflow consists of the following steps: Read a dataset of patients in Amazon Simple Storage Service (Amazon S3) directly from Amazon EMR using Spark. The dataset is created as a generic S3 asset collection in Amazon DataZone. In Amazon EMR, perform data validation rules against the dataset. The metrics are saved in Amazon S3 to have a persistent output. Use Amazon DataZone APIs through Boto3 to push custom data quality metadata. End-users can see the data quality scores by navigating to the data portal. Prerequisites We use Amazon EMR Serverless and Pydeequ to run a fully managed Spark environment. To learn more about Pydeequ as a data testing framework, see Testing Data quality at scale with Pydeequ. To allow Amazon EMR to send data to the Amazon DataZone domain, make sure that the IAM role used by Amazon EMR has the permissions to do the following: Read from and write to the S3 buckets Call the post_time_series_data_points action for Amazon DataZone: { "Version": "2012-10-17", "Statement": [ { "Sid": "Statement1", "Effect": "Allow", "Action": [ "datazone:PostTimeSeriesDataPoints" ], "Resource": [ "<datazone_domain_arn>" ] } ] } Make sure that you added the EMR role as a project member in the Amazon DataZone project. On the Amazon DataZone console, navigate to the Project members page and choose Add members. Add the EMR role as a contributor. Ingest and analyze PySpark code In this section, we analyze the PySpark code that we use to perform data quality checks and send the results to Amazon DataZone. You can download the complete PySpark script. To run the script entirely, you can submit a job to EMR Serverless. The service will take care of scheduling the job and automatically allocating the resources needed, enabling you to track the job run statuses throughout the process. You can submit a job to EMR within the Amazon EMR console using EMR Studio or programmatically, using the AWS CLI or using one of the AWS SDKs. In Apache Spark, a SparkSession is the entry point for interacting with DataFrames and Spark’s built-in functions. 
The script will start initializing a SparkSession: with SparkSession.builder.appName("PatientsDataValidation") \ .config("spark.jars.packages", pydeequ.deequ_maven_coord) \ .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \ .getOrCreate() as spark: We read a dataset from Amazon S3. For increased modularity, you can use the script input to refer to the S3 path: s3inputFilepath = sys.argv[1] s3outputLocation = sys.argv[2] df = spark.read.format("csv") \ .option("header", "true") \ .option("inferSchema", "true") \ .load(s3inputFilepath) #s3://<bucket_name>/patients/patients.csv Next, we set up a metrics repository. This can be helpful to persist the run results in Amazon S3. metricsRepository = FileSystemMetricsRepository(spark, s3_write_path) Pydeequ allows you to create data quality rules using the builder pattern, which is a well-known software engineering design pattern, concatenating instruction to instantiate a VerificationSuite object: key_tags = {'tag': 'patient_df'} resultKey = ResultKey(spark, ResultKey.current_milli_time(), key_tags) check = Check(spark, CheckLevel.Error, "Integrity checks") checkResult = VerificationSuite(spark) \ .onData(df) \ .useRepository(metricsRepository) \ .addCheck( check.hasSize(lambda x: x >= 1000) \ .isComplete("birthdate") \ .isUnique("id") \ .isComplete("ssn") \ .isComplete("first") \ .isComplete("last") \ .hasMin("healthcare_coverage", lambda x: x == 1000.0)) \ .saveOrAppendResult(resultKey) \ .run() checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult) checkResult_df.show() The following is the output for the data validation rules: +----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+ |check |check_level|check_status|constraint |constraint_status|constraint_message | +----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+ |Integrity checks|Error |Error |SizeConstraint(Size(None)) |Success | | |Integrity checks|Error |Error |CompletenessConstraint(Completeness(birthdate,None))|Success | | |Integrity checks|Error |Error |UniquenessConstraint(Uniqueness(List(id),None)) |Success | | |Integrity checks|Error |Error |CompletenessConstraint(Completeness(ssn,None)) |Success | | |Integrity checks|Error |Error |CompletenessConstraint(Completeness(first,None)) |Success | | |Integrity checks|Error |Error |CompletenessConstraint(Completeness(last,None)) |Success | | |Integrity checks|Error |Error |MinimumConstraint(Minimum(healthcare_coverage,None))|Failure |Value: 0.0 does not meet the constraint requirement!| +----------------+-----------+------------+----------------------------------------------------+-----------------+----------------------------------------------------+ At this point, we want to insert these data quality values in Amazon DataZone. To do so, we use the post_time_series_data_points function in the Boto3 Amazon DataZone client. The PostTimeSeriesDataPoints DataZone API allows you to insert new time series data points for a given asset or listing, without creating a new revision. At this point, you might also want to have more information on which fields are sent as input for the API. You can use the APIs to obtain the specification for Amazon DataZone form types; in our case, it’s amazon.datazone.DataQualityResultFormType. 
You can also use the AWS CLI to invoke the API and display the form structure: aws datazone get-form-type --domain-identifier <your_domain_id> --form-type-identifier amazon.datazone.DataQualityResultFormType --region <domain_region> --output text --query 'model.smithy' This output helps identify the required API parameters, including fields and value limits: $version: "2.0" namespace amazon.datazone structure DataQualityResultFormType { @amazon.datazone#timeSeriesSummary @range(min: 0, max: 100) passingPercentage: Double @amazon.datazone#timeSeriesSummary evaluationsCount: Integer evaluations: EvaluationResults } @length(min: 0, max: 2000) list EvaluationResults { member: EvaluationResult } @length(min: 0, max: 20) list ApplicableFields { member: String } @length(min: 0, max: 20) list EvaluationTypes { member: String } enum EvaluationStatus { PASS, FAIL } string EvaluationDetailType map EvaluationDetails { key: EvaluationDetailType value: String } structure EvaluationResult { description: String types: EvaluationTypes applicableFields: ApplicableFields status: EvaluationStatus details: EvaluationDetails } To send the appropriate form data, we need to convert the Pydeequ output to match the DataQualityResultsFormType contract. This can be achieved with a Python function that processes the results. For each DataFrame row, we extract information from the constraint column. For example, take the following code: CompletenessConstraint(Completeness(birthdate,None)) We convert it to the following: { "constraint": "CompletenessConstraint", "statisticName": "Completeness_custom", "column": "birthdate" } Make sure to send an output that matches the KPIs that you want to track. In our case, we are appending _custom to the statistic name, resulting in the following format for KPIs: Completeness_custom Uniqueness_custom In a real-world scenario, you might want to set a value that matches with your data quality framework in relation to the KPIs that you want to track in Amazon DataZone. After applying a transformation function, we have a Python object for each rule evaluation: ..., { 'applicableFields': ["healthcare_coverage"], 'types': ["Minimum_custom"], 'status': 'FAIL', 'description': 'MinimumConstraint - Minimum - Value: 0.0 does not meet the constraint requirement!' },... We also use the constraint_status column to compute the overall score: (number of success / total number of evaluation) * 100 In our example, this results in a passing percentage of 85.71%. 
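The conversion itself can be a small Python helper. The following is a minimal, hypothetical sketch of such a function, with deliberately simplified parsing of the Pydeequ constraint strings; it is not the exact code used in this example, and the KPI naming convention (appending _custom) follows the one described above.

import re

def to_datazone_form(check_results_df):
    # Turn the Pydeequ check result DataFrame into the dict used as the content
    # of the amazon.datazone.DataQualityResultFormType form.
    evaluations, passed = [], 0
    for row in (r.asDict() for r in check_results_df.collect()):
        # e.g. "CompletenessConstraint(Completeness(birthdate,None))"
        match = re.match(r"(\w+)\((\w+)\(([^,)]*)", row["constraint"])
        constraint, statistic, column = match.groups() if match else ("", "", "")
        column = column.split("(")[-1]           # crude cleanup for values like "List(id"
        column = "" if column == "None" else column
        status = "PASS" if row["constraint_status"] == "Success" else "FAIL"
        passed += status == "PASS"
        evaluations.append({
            "description": f"{constraint} - {statistic} - {row.get('constraint_message') or 'OK'}",
            "applicableFields": [column] if column else [],
            "types": [f"{statistic}_custom"],
            "status": status,
        })
    return {
        "evaluationsCount": len(evaluations),
        "evaluations": evaluations,
        "passingPercentage": round(100.0 * passed / max(len(evaluations), 1), 2),
    }

# form_content = to_datazone_form(checkResult_df)  # checkResult_df from the Pydeequ step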
We set this value in the passingPercentage input field along with the other information related to the evaluations in the input of the Boto3 method post_time_series_data_points: import boto3 # Instantiate the client library to communicate with Amazon DataZone Service # datazone = boto3.client( service_name='datazone', region_name=<Region(String) example: us-east-1> ) # Perform the API operation to push the Data Quality information to Amazon DataZone # datazone.post_time_series_data_points( domainIdentifier=<DataZone domain ID>, entityIdentifier=<DataZone asset ID>, entityType='ASSET', forms=[ { "content": json.dumps({ "evaluationsCount":<Number of evaluations (number)>, "evaluations": [<List of objects { 'description': <Description (String)>, 'applicableFields': [<List of columns involved (String)>], 'types': [<List of KPIs (String)>], 'status': <FAIL/PASS (string)> }> ], "passingPercentage":<Score (number)> }), "formName": <Form name(String) example: PydeequRuleSet1>, "typeIdentifier": "amazon.datazone.DataQualityResultFormType", "timestamp": <Date (timestamp)> } ] ) Boto3 invokes the Amazon DataZone APIs. In these examples, we used Boto3 and Python, but you can choose one of the AWS SDKs developed in the language you prefer. After setting the appropriate domain and asset ID and running the method, we can check on the Amazon DataZone console that the asset data quality is now visible on the asset page. We can observe that the overall score matches with the API input value. We can also see that we were able to add customized KPIs on the overview tab through custom types parameter values. With the new Amazon DataZone APIs, you can load data quality rules from third-party systems into a specific data asset. With this capability, Amazon DataZone allows you to extend the types of indicators present in AWS Glue Data Quality (such as completeness, minimum, and uniqueness) with custom indicators. Clean up We recommend deleting any potentially unused resources to avoid incurring unexpected costs. For example, you can delete the Amazon DataZone domain and the EMR application you created during this process. Conclusion In this post, we highlighted the latest features of Amazon DataZone for data quality, empowering end-users with enhanced context and visibility into their data assets. Furthermore, we delved into the seamless integration between Amazon DataZone and AWS Glue Data Quality. You can also use the Amazon DataZone APIs to integrate with external data quality providers, enabling you to maintain a comprehensive and robust data strategy within your AWS environment. To learn more about Amazon DataZone, refer to the Amazon DataZone User Guide. About the Authors Andrea Filippo is a Partner Solutions Architect at AWS supporting Public Sector partners and customers in Italy. He focuses on modern data architectures and helping customers accelerate their cloud journey with serverless technologies. Emanuele is a Solutions Architect at AWS, based in Italy, after living and working for more than 5 years in Spain. He enjoys helping large companies with the adoption of cloud technologies, and his area of expertise is mainly focused on Data Analytics and Data Management. Outside of work, he enjoys traveling and collecting action figures. Varsha Velagapudi is a Senior Technical Product Manager with Amazon DataZone at AWS. She focuses on improving data discovery and curation required for data analytics. 
She is passionate about simplifying customers’ AI/ML and analytics journey to help them succeed in their day-to-day tasks. Outside of work, she enjoys nature and outdoor activities, reading, and traveling. View the full article
Tagged with: aws glue, aws glue data quality (and 1 more)
-
This post is co-written with Andries Engelbrecht and Scott Teal from Snowflake.

Businesses are constantly evolving, and data leaders are challenged every day to meet new requirements. For many enterprises and large organizations, it is not feasible to have one processing engine or tool to deal with the various business requirements. They understand that a one-size-fits-all approach no longer works, and recognize the value in adopting scalable, flexible tools and open data formats to support interoperability in a modern data architecture to accelerate the delivery of new solutions. Customers are using AWS and Snowflake to develop purpose-built data architectures that provide the performance required for modern analytics and artificial intelligence (AI) use cases. Implementing these solutions requires data sharing between purpose-built data stores. This is why Snowflake and AWS are delivering enhanced support for Apache Iceberg to enable and facilitate data interoperability between data services. Apache Iceberg is an open-source table format that provides reliability, simplicity, and high performance for large datasets with transactional integrity between various processing engines.

In this post, we discuss the following:
- Advantages of Iceberg tables for data lakes
- Two architectural patterns for sharing Iceberg tables between AWS and Snowflake:
  - Manage your Iceberg tables with AWS Glue Data Catalog
  - Manage your Iceberg tables with Snowflake
- The process of converting existing data lake tables to Iceberg tables without copying the data

Now that you have a high-level understanding of the topics, let's dive into each of them in detail.

Advantages of Apache Iceberg

Apache Iceberg is a distributed, community-driven, Apache 2.0-licensed, 100% open-source data table format that helps simplify data processing on large datasets stored in data lakes. Data engineers use Apache Iceberg because it's fast, efficient, and reliable at any scale and keeps records of how datasets change over time. Apache Iceberg offers integrations with popular data processing frameworks such as Apache Spark, Apache Flink, Apache Hive, Presto, and more. Iceberg tables maintain metadata to abstract large collections of files, providing data management features including time travel, rollback, data compaction, and full schema evolution, reducing management overhead. Originally developed at Netflix before being open sourced to the Apache Software Foundation, Apache Iceberg was a blank-slate design to solve common data lake challenges like user experience, reliability, and performance, and is now supported by a robust community of developers focused on continually improving and adding new features to the project, serving real user needs and providing them with optionality.

Transactional data lakes built on AWS and Snowflake

Snowflake provides various integrations for Iceberg tables with multiple storage options, including Amazon S3, and multiple catalog options, including AWS Glue Data Catalog and Snowflake. AWS provides integrations for various AWS services with Iceberg tables as well, including AWS Glue Data Catalog for tracking table metadata. Combining Snowflake and AWS gives you multiple options to build out a transactional data lake for analytical and other use cases such as data sharing and collaboration. By adding a metadata layer to data lakes, you get a better user experience, simplified management, and improved performance and reliability on very large datasets.
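To make this concrete before walking through the patterns, the following is a minimal PySpark sketch of writing an Iceberg table whose metadata lives in the AWS Glue Data Catalog, plus a time-travel read of the kind mentioned above. It is illustrative only: it assumes an AWS Glue 4.0 job started with the --datalake-formats=iceberg job parameter, and the bucket, database, and table names are examples, not values from this post.

from pyspark.sql import SparkSession

# Configure an Iceberg catalog (here named glue_catalog) backed by the
# AWS Glue Data Catalog and Amazon S3.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl",
            "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://example-bucket/warehouse/")
    .getOrCreate()
)

# Write a DataFrame as an Iceberg table; the table and its snapshots are
# registered in the AWS Glue Data Catalog, where Snowflake can discover them.
transactions = spark.read.parquet("s3://example-bucket/raw/transactions/")
transactions.writeTo("glue_catalog.analytics_db.transactions").using("iceberg").createOrReplace()

# Time travel: read the same table as of an earlier point in time (epoch milliseconds).
as_of_earlier = (
    spark.read.format("iceberg")
    .option("as-of-timestamp", "1712102400000")
    .load("glue_catalog.analytics_db.transactions")
)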
Manage your Iceberg table with AWS Glue

You can use AWS Glue to ingest, catalog, transform, and manage the data on Amazon Simple Storage Service (Amazon S3). AWS Glue is a serverless data integration service that allows you to visually create, run, and monitor extract, transform, and load (ETL) pipelines to load data into your data lakes in Iceberg format. With AWS Glue, you can discover and connect to more than 70 diverse data sources and manage your data in a centralized data catalog.

Snowflake integrates with AWS Glue Data Catalog to access the Iceberg table catalog and the files on Amazon S3 for analytical queries. This greatly improves performance and reduces compute cost in comparison to external tables on Snowflake, because the additional metadata improves pruning in query plans. You can use this same integration to take advantage of the data sharing and collaboration capabilities in Snowflake. This can be very powerful if you have data in Amazon S3 and need to enable Snowflake data sharing with other business units, partners, suppliers, or customers.

The following architecture diagram provides a high-level overview of this pattern. The workflow includes the following steps:

1. AWS Glue extracts data from applications, databases, and streaming sources. AWS Glue then transforms it and loads it into the data lake in Amazon S3 in Iceberg table format, while inserting and updating the metadata about the Iceberg table in AWS Glue Data Catalog.
2. The AWS Glue crawler generates and updates Iceberg table metadata and stores it in AWS Glue Data Catalog for existing Iceberg tables on an S3 data lake.
3. Snowflake integrates with AWS Glue Data Catalog to retrieve the snapshot location.
4. In the event of a query, Snowflake uses the snapshot location from AWS Glue Data Catalog to read Iceberg table data in Amazon S3.
5. Snowflake can query across Iceberg and Snowflake table formats. You can share data for collaboration with one or more accounts in the same Snowflake region. You can also use data in Snowflake for visualization using Amazon QuickSight, or use it for machine learning (ML) and artificial intelligence (AI) purposes with Amazon SageMaker.

Manage your Iceberg table with Snowflake

A second pattern also provides interoperability across AWS and Snowflake, but implements data engineering pipelines for ingestion and transformation to Snowflake. In this pattern, data is loaded to Iceberg tables by Snowflake through integrations with AWS services like AWS Glue or through other sources like Snowpipe. Snowflake then writes data directly to Amazon S3 in Iceberg format for downstream access by Snowflake and various AWS services, and Snowflake manages the Iceberg catalog that tracks snapshot locations across tables for AWS services to access.

Like the previous pattern, you can use Snowflake-managed Iceberg tables with Snowflake data sharing, but you can also use S3 to share datasets in cases where one party does not have access to Snowflake. The following architecture diagram provides an overview of this pattern with Snowflake-managed Iceberg tables. This workflow consists of the following steps:

1. In addition to loading data via the COPY command, Snowpipe, and the native Snowflake connector for AWS Glue, you can integrate data via Snowflake Data Sharing.
2. Snowflake writes Iceberg tables to Amazon S3 and updates metadata automatically with every transaction.
3. Iceberg tables in Amazon S3 are queried by Snowflake for analytical and ML workloads using services like QuickSight and SageMaker.
4. Apache Spark services on AWS can access snapshot locations from Snowflake via a Snowflake Iceberg Catalog SDK and directly scan the Iceberg table files in Amazon S3.

Comparing solutions

These two patterns highlight options available to data personas today to maximize their data interoperability between Snowflake and AWS using Apache Iceberg. But which pattern is ideal for your use case? If you're already using AWS Glue Data Catalog and only require Snowflake for read queries, then the first pattern can integrate Snowflake with AWS Glue and Amazon S3 to query Iceberg tables. If you're not already using AWS Glue Data Catalog and require Snowflake to perform reads and writes, then the second pattern is likely a good solution that allows for storing and accessing data from AWS. Considering that reads and writes will probably operate on a per-table basis rather than the entire data architecture, it is advisable to use a combination of both patterns.

Migrate existing data lakes to a transactional data lake using Apache Iceberg

You can convert existing Parquet, ORC, and Avro-based data lake tables on Amazon S3 to Iceberg format to reap the benefits of transactional integrity while improving performance and user experience. There are several Iceberg table migration options (SNAPSHOT, MIGRATE, and ADD_FILES) for migrating existing data lake tables in place to Iceberg format, which is preferable to rewriting all of the underlying data files, a costly and time-consuming effort with large datasets. In this section, we focus on ADD_FILES, because it's useful for custom migrations. With the ADD_FILES option, you can use AWS Glue to generate Iceberg metadata and statistics for an existing data lake table and create new Iceberg tables in AWS Glue Data Catalog for future use without needing to rewrite the underlying data. For instructions on generating Iceberg metadata and statistics using AWS Glue, refer to Migrate an existing data lake to a transactional data lake using Apache Iceberg or Convert existing Amazon S3 data lake tables to Snowflake Unmanaged Iceberg tables using AWS Glue. This option requires that you pause data pipelines while converting the files to Iceberg tables, which is a straightforward process in AWS Glue because the destination just needs to be changed to an Iceberg table.

Conclusion

In this post, you saw the two architecture patterns for implementing Apache Iceberg in a data lake for better interoperability across AWS and Snowflake. We also provided guidance on migrating existing data lake tables to Iceberg format. Sign up for AWS Dev Day on April 10 to get hands-on not only with Apache Iceberg, but also with streaming data pipelines with Amazon Data Firehose and Snowpipe Streaming, and generative AI applications with Streamlit in Snowflake and Amazon Bedrock.

About the Authors

Andries Engelbrecht is a Principal Partner Solutions Architect at Snowflake and works with strategic partners. He is actively engaged with strategic partners like AWS, supporting product and service integrations as well as the development of joint solutions with partners. Andries has over 20 years of experience in the field of data and analytics.

Deenbandhu Prasad is a Senior Analytics Specialist at AWS, specializing in big data services. He is passionate about helping customers build modern data architectures on the AWS Cloud. He has helped customers of all sizes implement data management, data warehouse, and data lake solutions.
Brian Dolan joined Amazon as a Military Relations Manager in 2012 after his first career as a Naval Aviator. In 2014, Brian joined Amazon Web Services, where he helped Canadian customers from startups to enterprises explore the AWS Cloud. Most recently, Brian was a member of the Non-Relational Business Development team as a Go-To-Market Specialist for Amazon DynamoDB and Amazon Keyspaces before joining the Analytics Worldwide Specialist Organization in 2022 as a Go-To-Market Specialist for AWS Glue. Nidhi Gupta is a Sr. Partner Solution Architect at AWS. She spends her days working with customers and partners, solving architectural challenges. She is passionate about data integration and orchestration, serverless and big data processing, and machine learning. Nidhi has extensive experience leading the architecture design and production release and deployments for data workloads. Scott Teal is a Product Marketing Lead at Snowflake and focuses on data lakes, storage, and governance. View the full article
Tagged with: data lakes, amazon s3 (and 2 more)
-
Data Engineering Tools in 2024

The data engineering landscape in 2024 is bustling with innovative tools and evolving trends. Here's an updated perspective on some of the key players and how they can empower your data pipelines:

Data Integration:
- Informatica Cloud: Still a leader for advanced data quality and governance, with enhanced cloud-native capabilities.
- MuleSoft Anypoint Platform: Continues to shine in building API-based integrations, now with deeper cloud support and security features.
- Fivetran: Expands its automated data pipeline creation with pre-built connectors and advanced transformations.
- Hevo Data: Remains a strong contender for ease of use and affordability, now offering serverless options for scalability.

Data Warehousing:
- Snowflake: Maintains its edge in cloud-based warehousing, with improved performance and broader integrations for analytics.
- Google BigQuery: Offers even more cost-effective options for variable workloads, while deepening its integration with other Google Cloud services.
- Amazon Redshift: Continues to be a powerful choice for AWS environments, now with increased focus on security and data governance.
- Microsoft Azure Synapse Analytics: Further integrates its data warehousing, lake, and analytics capabilities, providing a unified platform for diverse data needs.

Data Processing and Orchestration:
- Apache Spark: Remains the reigning champion for large-scale data processing, now with enhanced performance optimizations and broader ecosystem support.
- Apache Airflow: Maintains its popularity for workflow orchestration, with improved scalability and user-friendliness.
- Databricks: Expands its cloud-based platform for Spark with advanced features like AI integration and real-time streaming.
- AWS Glue: Simplifies data processing and ETL within the AWS ecosystem, now with serverless options for cost efficiency.

Emerging Trends:
- GitOps: Gaining traction for managing data pipelines with version control and collaboration, ensuring consistency and traceability.
- AI and Machine Learning: Increasingly integrated into data engineering tools for automation, anomaly detection, and data quality improvement.
- Serverless Data Processing: Offering cost-effective and scalable options for event-driven and real-time data processing.

Choosing the right tools:

With this diverse landscape, selecting the right tools depends on your specific needs. Consider factors like:
- Data volume and complexity: Match tool capabilities to your data size and structure.
- Cloud vs. on-premises: Choose based on your infrastructure preferences and security requirements.
- Budget: Evaluate pricing models and potential costs associated with each tool.
- Integration needs: Ensure seamless compatibility with your existing data sources and BI tools.
- Skillset: Consider the technical expertise required for each tool and available support resources.

By carefully evaluating your needs and exploring the strengths and limitations of these top contenders, you'll be well-equipped to choose the data engineering tools that empower your organization to unlock valuable insights from your data in 2024.

The post Data Engineering Tools in 2024 appeared first on DevOpsSchool.com. View the full article
- 1
Tagged with: snowflake, databricks (and 9 more)
-
With all the generative AI announcements at AWS re:Invent 2023, I've committed to diving deep into this technology and learning as much as I can. If you're doing the same, I'm happy that, among the other resources available, the AWS community also has a space I can access for generative AI tools and guides.

Last week's launches

Here are some launches that got my attention during the previous week.

Amazon Q data integration in AWS Glue (Preview) – Now you can use natural language to ask Amazon Q to author jobs, troubleshoot issues, and answer questions about AWS Glue and data integration. Amazon Q was launched in preview at AWS re:Invent 2023, and is a generative AI–powered assistant to help you solve problems, generate content, and take action.

General availability of CDK Migrate – CDK Migrate is a component of the AWS Cloud Development Kit (CDK) that enables you to migrate AWS CloudFormation templates, previously deployed CloudFormation stacks, or resources created outside of Infrastructure as Code (IaC) into a CDK application. This feature was launched alongside the CloudFormation IaC Generator to give you an end-to-end experience that enables you to create an IaC configuration based off a resource, as well as its relationships. You can expect the IaC Generator to have a huge impact for a common use case we've seen.

For a full list of AWS announcements, be sure to keep an eye on the What's New at AWS page.

Other AWS news

Here are some additional projects, programs, and news items that you might find interesting:

Amazon API Gateway processed over 100 trillion API requests in 2023, demonstrating the growing demand for API-driven applications. API Gateway is a fully managed API management service. Customers from all industry verticals told us they're adopting API Gateway for multiple reasons. First, its ability to scale to meet the demands of even the most high-traffic applications. Second, its fully managed, serverless architecture, which eliminates the need to manage any infrastructure and frees customers to focus on their core business needs.

Join the PartyRock Generative AI Hackathon by AWS. This is a challenge for you to get hands-on building generative AI-powered apps. You'll use Amazon PartyRock, an Amazon Bedrock playground, as a fast and fun way to learn about prompt engineering and foundation models (FMs) to build a functional app with generative AI.

AWS open source news and updates – My colleague Ricardo writes this weekly open source newsletter in which he highlights new open source projects, tools, and demos from the AWS Community.

Upcoming AWS events

Whether you're in the Americas, Asia Pacific & Japan, or EMEA region, there's an upcoming AWS Innovate Online event that fits your timezone. Innovate Online events are free, online, and designed to inspire and educate you about AWS.

AWS Summits are a series of free online and in-person events that bring the cloud computing community together to connect, collaborate, and learn about AWS. These events are designed to educate you about AWS products and services and help you develop the skills needed to build, deploy, and operate your infrastructure and applications. Find an AWS Summit near you and register or set a notification to know when registration opens for a Summit that interests you.

AWS Community re:Invent re:Caps – Join a Community re:Cap event organized by volunteers from AWS User Groups and AWS Cloud Clubs around the world to learn about the latest announcements from AWS re:Invent.
You can browse all upcoming in-person and virtual events. That’s all for this week. Check back next Monday for another Weekly Roundup! – Veliswa This post is part of our Weekly Roundup series. Check back each week for a quick roundup of interesting news and announcements from AWS! View the full article
-
Today we're previewing a new chat experience for AWS Glue that will let you use natural language to author and troubleshoot data integration jobs. Amazon Q data integration in AWS Glue will reduce the time and effort you need to learn, build, and run data integration jobs using AWS Glue data integration engines. You can author jobs, troubleshoot issues, and get instant answers to questions about AWS Glue and anything related to data integration. The chat experience is powered by Amazon Bedrock.

You can describe your data integration workload and Amazon Q will generate a complete ETL script. You can troubleshoot your jobs by asking Amazon Q to explain errors and propose solutions. Amazon Q provides detailed guidance throughout the entire data integration workflow. Amazon Q helps you learn and build data integration jobs using AWS Glue, and can help you connect to common AWS sources such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, and Amazon DynamoDB.

Let me show you some capabilities of Amazon Q data integration in AWS Glue.

1. Conversational Q&A capability

To start using this feature, I can select the Amazon Q icon on the right-hand side of the AWS Management Console. For example, I can ask, "What is AWS Glue?", and Amazon Q provides concise explanations along with references I can use to follow up on my questions and validate the guidance. With Amazon Q, I can elaborate on my use cases in more detail to provide context. For example, I can ask Amazon Q, "How do I create an AWS Glue job?" Next, let me ask Amazon Q, "How do I optimize memory management in my AWS Glue job?"

2. AWS Glue job creation

To use this feature, I can tell Amazon Q, "Write a Glue ETL job that reads from Redshift, drops null fields, and writes to S3 as parquet files." I can copy code into the script editor or notebook with a simple click on the Copy button. I can also tell Amazon Q, "Help me with a Glue job that reads my DynamoDB table, maps the fields, and writes the results to Amazon S3 in Parquet format."

Get started with Amazon Q today

With Amazon Q, you have an artificial intelligence (AI) expert by your side to answer questions, write code faster, troubleshoot issues, optimize workloads, and even help you code new features. These capabilities simplify every phase of building applications on AWS. Amazon Q data integration in AWS Glue is available in every Region where Amazon Q is supported. To learn more, see the Amazon Q pricing page.

Learn more
- Amazon Q main product page
- Amazon Q data integration
- Amazon Q details for IT pros and developers
- Get started with Amazon Q

– Irshad

View the full article
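For reference, here is a rough, hand-written sketch of the kind of ETL script the first prompt above describes. It is not actual Amazon Q output; the connection, table, and bucket names are placeholders.

import sys
from awsglue.transforms import DropNullFields
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read from Amazon Redshift through an existing Glue connection (placeholder names)
redshift_dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://example-bucket/temp/",
        "useConnectionProperties": "true",
        "connectionName": "redshift-connection",
        "dbtable": "public.sales",
    },
)

# Drop fields that contain only null values
cleaned_dyf = DropNullFields.apply(frame=redshift_dyf)

# Write the result to Amazon S3 as Parquet
glue_context.write_dynamic_frame.from_options(
    frame=cleaned_dyf,
    connection_type="s3",
    connection_options={"path": "s3://example-bucket/curated/sales/"},
    format="parquet",
)
job.commit()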
-
AWS Glue now supports GitLab and BitBucket, alongside GitHub and AWS CodeCommit, broadening your toolset for managing data integration pipeline deployments. AWS Glue is a serverless data integration service that makes it simpler to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. View the full article
-
AWS Lake Formation and the AWS Glue Data Catalog now extend data cataloging, data sharing, and fine-grained access control support for customers using a self-managed Apache Hive Metastore (HMS) as their data catalog. Previously, customers had to replicate their metadata into the AWS Glue Data Catalog in order to use Lake Formation permissions and data sharing capabilities. Now, customers can integrate their HMS metadata within AWS, allowing them to discover data alongside native tables in the AWS Glue Data Catalog, manage permissions and sharing from Lake Formation, and query data using AWS analytics services. View the full article
Tagged with: aws glue, aws glue data catalog (and 1 more)
-
AWS Glue crawlers now have enhanced support for Linux Foundation Delta Lake tables, increasing operational efficiency to extract meaningful insights from analytics services such as Amazon Athena, Amazon EMR, and AWS Glue. This feature enables analytics services to scan Delta Lake tables without requiring the creation of manifest files by AWS Glue crawlers. Newly cataloged data is now quickly made available for analysis using your preferred analytics and machine learning (ML) tools. View the full article
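For illustration only, a crawler that catalogs Delta Lake tables as native tables (no manifest files) could be created with Boto3 along the following lines; the crawler name, role ARN, database, and S3 path are placeholders.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Sketch: a crawler with a Delta Lake target that registers native Delta tables
# in the Data Catalog instead of writing manifest files.
glue.create_crawler(
    Name="delta-native-crawler-example",
    Role="arn:aws:iam::123456789012:role/ExampleGlueCrawlerRole",
    DatabaseName="analytics_db",
    Targets={
        "DeltaTargets": [
            {
                "DeltaTables": ["s3://example-bucket/delta/transactions/"],
                "WriteManifest": False,
                "CreateNativeDeltaTable": True,
            }
        ]
    },
)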
-
AWS Glue for Apache Spark now supports three open source data lake storage frameworks: Apache Hudi, Apache Iceberg, and Linux Foundation Delta Lake. These frameworks allow you to read and write data in Amazon Simple Storage Service (Amazon S3) in a transactionally consistent manner. AWS Glue is a serverless, scalable data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources. This feature removes the need to install a separate connector and reduces the configuration steps required to use these frameworks in AWS Glue for Apache Spark jobs. View the full article
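As a rough illustration of what "no separate connector" means in practice, a framework can be enabled on an AWS Glue 4.0 job through the --datalake-formats job parameter. The following Boto3 sketch uses placeholder names, a placeholder role ARN, and a placeholder script location.

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Sketch: enable the Hudi libraries for a job via a single job parameter.
glue.create_job(
    Name="hudi-ingest-example",
    Role="arn:aws:iam::123456789012:role/ExampleGlueJobRole",
    GlueVersion="4.0",
    WorkerType="G.1X",
    NumberOfWorkers=2,
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://example-bucket/scripts/hudi_ingest.py",
        "PythonVersion": "3",
    },
    DefaultArguments={"--datalake-formats": "hudi"},
)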
-
AWS Glue Studio now supports updating the AWS Glue Data Catalog during job runs. This feature makes it easy to keep your tables up to date as AWS Glue writes new data into Amazon S3, making the data immediately queryable from any analytics service compatible with the AWS Glue Data Catalog. View the full article
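As an illustrative sketch (the database, table, bucket, and partition key names are ours, not from this announcement), a job script can opt into catalog updates by writing through a Data Catalog-enabled sink:

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read an existing catalog table (placeholder names)
events_dyf = glue_context.create_dynamic_frame.from_catalog(
    database="analytics_db", table_name="raw_events"
)

# Write to Amazon S3 with a sink that also creates or updates the table and
# its partitions in the AWS Glue Data Catalog during the job run.
sink = glue_context.getSink(
    connection_type="s3",
    path="s3://example-bucket/curated/events/",
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["ingest_date"],
)
sink.setCatalogInfo(catalogDatabase="analytics_db", catalogTableName="curated_events")
sink.setFormat("glueparquet")
sink.writeFrame(events_dyf)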
- 2 replies
Tagged with: aws glue, aws glue data catalog (and 1 more)
-
Streaming extract, transform, and load (ETL) jobs in AWS Glue can now read data encoded in the Apache Avro format. Previously, streaming ETL jobs could read data in the JSON, CSV, Parquet, and XML formats. With the addition of Avro, streaming ETL jobs now support all the same formats as batch AWS Glue jobs. View the full article
-
Forum Statistics
Total Topics: 67.4k
Total Posts: 65.3k