Showing results for tags 'etl'.
-
AWS Glue is a serverless ETL solution that helps organizations move data into enterprise-class data warehouses. It provides close integration with other AWS services, which appeals to businesses already invested significantly in AWS. If you are looking for a replacement for AWS Glue, this guide will walk you through the top 5 AWS Glue alternatives. […]View the full article
-
In today’s data-driven era, you have more raw data than ever before. However, to leverage the power of big data, you need to convert raw data into valuable insights for informed decision-making. When it comes to preparing data for analysis, you will always come across the terms “data wrangling” and “ETL.” While they may sound […]View the full article
-
Organizations use ETL (Extract, Transform, and Load) to obtain quality data for expediting decision-making. But, the myriad of available ETL tools makes it challenging for organizations to evaluate and embrace the right tool. Today, ETL tools are divided into various types, making it even more difficult for companies to find the right fit. In this […]View the full article
-
Amazon Redshift is a serverless, fully managed leading data warehouse in the market, and many organizations are migrating their legacy data to Redshift for better analytics. In this blog, we will discuss the best Redshift ETL tools that you can use to load data into Redshift. 8 Best Redshift ETL Tools Let’s have a detailed […]View the full article
-
Today, companies have access to a broad spectrum of big data gathered from various sources. These sources include web crawlers, sensors, server logs, marketing tools, spreadsheets, and APIs. To gain a competitive advantage in the business, it is crucial to gain proficiency in using data to improve business operations. However, the information from different sources […]View the full article
-
According to a research report* by MarketsandMarkets, the data integration market is expected to grow from USD 11.6 Billion in 2021 to USD 19.6 Billion by 2026. This implies the huge potential of data integration and the two approaches to data management– ETL and ELT. However, in the battle of ETL vs ELT, choosing one over […]View the full article
-
The importance of using data in sectors like Data Science, Machine Learning, etc. grows as the amount of data sources, and data types in an organization expand. Converting raw data into a clean and reliable form is a key step for extracting meaningful insights from it. ETL (Extract, Transform, and Load) is a Data Engineering […]View the full article
-
Today, businesses all around the world are driven by data. This has led to companies exploiting every available online application, service, and social platform to extract data to better understand the changing market trends. Now, this data requires numerous complex transformations to get ready for Data Analytics. Moreover, companies require technologies that can transfer and […]View the full article
-
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security. By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals. This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA. Solution overview For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). 
The data engineering team wants to keep the raw data in its own AWS account (Account A in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A in Account B, which is dedicated to data handling tasks. This makes sure the raw and processed data can be kept securely separated across multiple accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. When new files arrive, the pipeline invokes AWS Glue ETL jobs that write to data objects in Redshift Serverless in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.

The workflow consists of the following components:
- The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B to be able to load and unload data.
- In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
- Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
- VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
- Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub Actions, the changes are deployed to an S3 bucket in Account B (for this post, we upload the files to the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
- The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
- The Airflow logs are written to Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:
- The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with hive style partitioning as s3://<bucket>/products/YYYY/MM/DD/.
- A sample dataset called products.csv, which we use in this post.
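To make the date-based bucket layout concrete, here is a minimal Python sketch that builds the daily key prefix for a given date. The function name daily_prefix and the hive_keys flag are hypothetical and exist only for illustration. The key=value form mirrors the year=/month=/day= keys that the DAG code later in this post builds, while the default form follows the YYYY/MM/DD layout quoted above.

```python
from datetime import date


def daily_prefix(folder: str, day: date, hive_keys: bool = False) -> str:
    """Return the S3 key prefix for one day's files under `folder`.

    hive_keys=False gives folder/YYYY/MM/DD/
    hive_keys=True  gives folder/year=YYYY/month=MM/day=DD/
    """
    if hive_keys:
        return f"{folder}/year={day:%Y}/month={day:%m}/day={day:%d}/"
    return f"{folder}/{day:%Y}/{day:%m}/{day:%d}/"


if __name__ == "__main__":
    # Prefix for today's files, in both layouts.
    today = date.today()
    print(daily_prefix("products", today))
    print(daily_prefix("products", today, hive_keys=True))
```

Whichever layout you choose, the prefix the pipeline polls for has to match the layout the files are actually uploaded with, otherwise the sensor never finds them.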
Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack template to create the following resources:
- The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
  - dags – The folder for DAG files.
  - plugins – The file for any custom or community Airflow plugins.
  - requirements – The requirements.txt file for any Python packages.
  - scripts – Any SQL scripts used in the DAG.
  - data – Any datasets used in the DAG.
- A Redshift Serverless environment. The names of the workgroup and namespace are prefixed with sample.
- An AWS Glue environment, which contains the following:
  - An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
  - A database called products_db in the AWS Glue Data Catalog.
  - An ETL job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
- A VPC gateway endpoint to Amazon S3.
- An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Create Amazon Redshift resources

Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file. In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.

Configure Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI.
DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.

Set up the environment

This section outlines the steps to configure the environment. The process involves the following high-level steps:
- Update any necessary providers.
- Set up cross-account access.
- Establish a VPC peering connection between the Amazon MWAA VPC and Amazon Redshift VPC.
- Configure Secrets Manager to integrate with Amazon MWAA.
- Define Airflow connections.

Update the providers

Follow the steps in this section if the Airflow version of your Amazon MWAA environment is earlier than 2.8.1 (the latest version as of writing this post).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.

Although the version of Airflow supported in Amazon MWAA is 2.6.3, which comes bundled with the Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until the Amazon provider package version 8.4.0. Because the bundled provider version predates Redshift Serverless support, the provider must be upgraded in order to use that functionality.

The first step is to update the constraints file and requirements.txt file with the correct versions.
Refer to Specifying newer provider packages for steps to update the Amazon provider package.

Specify the requirements as follows:

--constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
apache-airflow-providers-amazon==8.4.0

Update the version in the constraints file to 8.4.0 or higher.

Add the constraints-3.11-updated.txt file to the /dags folder. The file name referenced by --constraint in requirements.txt must match the name of the constraints file you upload. Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for correct versions of the constraints file depending on the Airflow version.

Navigate to the Amazon MWAA environment and choose Edit. Under DAG code in Amazon S3, for Requirements file, choose the latest version. Choose Save.

This updates the environment, and the new providers take effect. To verify the provider version, go to Providers under the Admin menu. The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data.
Complete the following steps:

In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": [
          "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
          "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
        ]
      },
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:PutObjectAcl",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
        "arn:aws:s3:::sample-inp-bucket-etl-<username>"
      ]
    }
  ]
}

Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to Amazon MWAA roles in Account B to put objects in this bucket:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
      },
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:PutObjectAcl",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
        "arn:aws:s3:::sample-opt-bucket-etl-<username>"
      ]
    }
  ]
}

In Account A, create an IAM policy called policy_for_roleA, which allows necessary Amazon S3 actions on the output bucket:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt",
        "kms:Encrypt",
        "kms:GenerateDataKey"
      ],
      "Resource": [
        "<KMS_KEY_ARN_Used_for_S3_encryption>"
      ]
    },
    {
      "Sid": "VisualEditor1",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetBucketAcl",
        "s3:GetBucketCors",
        "s3:GetEncryptionConfiguration",
        "s3:GetBucketLocation",
        "s3:ListAllMyBuckets",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:ListBucketVersions",
        "s3:ListMultipartUploadParts"
      ],
      "Resource": [
        "arn:aws:s3:::sample-opt-bucket-etl-<username>",
        "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
      ]
    }
  ]
}

Create a new IAM role called RoleA with Account B as the trusted entity and add this policy to the role. This allows Account B to assume RoleA to perform necessary Amazon S3 actions on the output bucket.

In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A. Add this policy to the AWS Glue role and Amazon MWAA role:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:PutObjectAcl"
      ],
      "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
    }
  ]
}

In Account B, create the IAM policy policy_for_roleB. The following policy grants permission to assume RoleA in Account A:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "CrossAccountPolicy",
      "Effect": "Allow",
      "Action": "sts:AssumeRole",
      "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
    }
  ]
}

Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.

Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.

Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them.
You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering. Make sure that CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot. If any of the preceding steps are configured incorrectly, you are likely to encounter a “Connection Timeout” error in the DAG run. Configure the Amazon MWAA connection with Secrets Manager When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret. Complete the following steps: Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager). This allows Amazon MWAA to access credentials stored in Secrets Manager. To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment. To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings. This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths: secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"} To generate an Airflow connection URI string, go to AWS CloudShell and enter into a Python shell. 
Run the following code to generate the connection URI string:

import urllib.parse

conn_type = 'redshift'
host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com'  # Specify the Amazon Redshift workgroup endpoint
port = '5439'
login = 'admin'  # Specify the username to use for authentication with Amazon Redshift
password = '<password>'  # Specify the password to use for authentication with Amazon Redshift
role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
database = 'dev'
region = 'us-east-1'  # YOUR_REGION
conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
print(conn_string)

The connection string should be generated as follows:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI). This can also be done from the Secrets Manager console. This will be added in Secrets Manager as plaintext.

aws secretsmanager create-secret \
  --name airflow/connections/secrets_redshift_connection \
  --description "Apache Airflow to Redshift Cluster" \
  --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" \
  --region=us-east-1

Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it will look for this connection and retrieve the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.

You can also add secrets using the Secrets Manager console as key-value pairs. Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.
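If the password or user name contains characters that are special in a URI (such as @, /, or :), they need to be URL-encoded before being placed in the connection string. The following is a minimal sketch of one way to do that with the standard library. The helper name build_redshift_conn_uri and the sample values in the usage section are hypothetical and only illustrate the idea; the URI layout itself is the one shown in this section.

```python
from urllib.parse import parse_qs, quote_plus, unquote_plus, urlencode, urlsplit


def build_redshift_conn_uri(login, password, host, port, database, region, role_arn):
    """Build a redshift:// connection URI with every component URL-encoded."""
    # urlencode() applies quote_plus to each query value, so the role ARN's
    # colons and slashes are escaped as well.
    query = urlencode({"role_arn": role_arn, "database": database, "region": region})
    return f"redshift://{quote_plus(login)}:{quote_plus(password)}@{host}:{port}?{query}"


if __name__ == "__main__":
    # Placeholder values for illustration only.
    uri = build_redshift_conn_uri(
        login="admin",
        password="p@ss/word",
        host="example-workgroup.example.test",
        port=5439,
        database="dev",
        region="us-east-1",
        role_arn="arn:aws:iam::123456789012:role/example-role",
    )
    parts = urlsplit(uri)
    # Round-trip check: the decoded values match what went in.
    print(unquote_plus(parts.password))
    print(parse_qs(parts.query)["region"])
```

Parsing the generated string back, as the usage section does, is a quick way to confirm that nothing in the credentials broke the URI structure before you store it as a secret.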
Create an Airflow connection through the metadata database You can also create connections in the UI. In this case, the connection details will be stored in an Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI. For Connection Id, enter a name for the connection. For Connection Type, choose Amazon Redshift. For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless. For Database, enter dev. For User, enter your admin user name. For Password, enter your password. For Port, use port 5439. For Extra, set the region and timeout parameters. Test the connection, then save your settings. Create and run a DAG In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets. Create a DAG In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules: The DAG starts with looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket. For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derived the current date in the DAG and passed this to S3KeySensor, which looks for any new files in the current day folder. We also set wildcard_match as True, which enables searches on bucket_key to be interpreted as a Unix wildcard pattern. 
Set the mode to reschedule so that the sensor task frees the worker slot when the criteria are not met and is rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on a scheduler.

After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.

The DAG uses GlueCrawlerSensor to wait for the crawler to complete.

When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator along with the AWS Glue IAM role. Other parameters like GlueVersion, NumberOfWorkers, and WorkerType are passed using the create_job_kwargs parameter.

The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it's complete, the Redshift staging table products will be loaded with data from the S3 file.

You can connect to Amazon Redshift from Airflow using three different operators:
- PythonOperator.
- SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
- RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.

In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.

Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager.
If the connection is not found in Secrets Manager, the values are retrieved from the default connections.

In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed.

In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.

As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to the S3 bucket sample-opt-bucket-etl in Account A. The connection airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data.

TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks have finished, whether they succeeded or failed.

The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.

The following is the complete DAG code. The dag_id should match the DAG script name, otherwise it won’t be synced into the Airflow UI.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule

dag_id = "data_pipeline"

# Date parts used to build the S3 key for the current day
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")

src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder = "products"

# Please replace the variable with the glue_role_arn
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location = "s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"

workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name = "secrets_redshift_connection"
db_name = "dev"
secret_arn = "arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10


@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]


@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc


with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)

    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        # timeout=18*60*60,
        # poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule",
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )
    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )
    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )
    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Delete the data from the staging table in Redshift Serverless using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )
    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    # Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3,
    )

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.

In the Links section, there are options to view the code, graph, grid, log, and more.
Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results
On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as S3 files. On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account B. The product_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Clean up
Clean up the resources created as part of this post to avoid incurring ongoing charges:
Delete the CloudFormation stacks and S3 bucket that you created as prerequisites.
Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.

Conclusion
With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.

About the Authors
Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.
Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.
View the full article
-
- etl
- etl pipelines
- (and 5 more)
-
ETL is like doing a jigsaw puzzle with no picture on the box. You don’t know what the final result should look like, but you must make all the pieces fit together. Extract, transform, and load (ETL) processes play a critical role in any data-driven organization. It is likely that your ETL project will not […]View the full article
-
A fundamental requirement for any data-driven organization is to have a streamlined data delivery mechanism. With organizations collecting data at a rate like never before, devising data pipelines for adequate flow of information for analytics and Machine Learning tasks becomes crucial for businesses. As organizations gather information from multiple sources and data can come in […]View the full article
-
- etl
- data ingestion
-
(and 5 more)
Tagged with:
-
ETL processes often involve aggregating data from various sources into a data warehouse or data lake. Bucketing can be used during the transformation phase to aggregate data into predefined buckets or intervals. For example, you might want to aggregate daily sales data into monthly buckets or hourly sensor readings into daily buckets. It plays a […]View the full article
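The monthly bucketing mentioned in the excerpt above can be sketched in a few lines of Python. This is a minimal illustration, not the article's own code: the function name `bucket_by_month` and the sample sales figures are assumptions made up for the example.

```python
from collections import defaultdict
from datetime import date


def bucket_by_month(daily_sales):
    """Sum (date, amount) pairs into 'YYYY-MM' buckets."""
    buckets = defaultdict(float)
    for day, amount in daily_sales:
        # The bucket key is the year and month of each daily record.
        buckets[day.strftime("%Y-%m")] += amount
    return dict(buckets)


# Hypothetical sample data, for illustration only.
daily_sales = [
    (date(2024, 1, 5), 100.0),
    (date(2024, 1, 20), 50.0),
    (date(2024, 2, 1), 75.0),
]

monthly = bucket_by_month(daily_sales)
print(monthly)
```

The same pattern applies to hourly sensor readings rolled up into daily buckets; only the key format changes.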
-
According to Expert Market Research, the global big data & Analytics market is expected to grow at a CAGR of 10%. This report also forecasts that the global investment in big data and analytics will reach $450 Billion by 2026. Hence, considering the massive scope of the analytics and the amount of data you deal […]View the full article
-
etl Salesforce ETL: 2 Easy Methods
Hevo Data posted a topic in Databases, Data Engineering & Data Science
Organizations are constantly on the lookout for simple solutions to integrate their company data from several sources into a centralized location, then analyze it to make informed decisions. This process is termed Data Integration. One of the most popular Data Integration techniques is ETL (Extract, Transform and Load). You will get to know more about […]View the full article
-
Learn about how configuring your warehouse as a data source can fully unlock your data’s value, how Reverse ETL works, and how to set it up in RudderStackView the full article
-
- data warehousing
- etl
-
(and 1 more)
Tagged with:
-
When it comes to Reverse ETL, the business use cases usually get all the attention. Here, we focus on how it makes data engineering easier. View the full article
-
- data engineering
- event streaming
-
(and 1 more)
Tagged with:
-
ETL, or Extract, Transform, Load, serves as the backbone for data-driven decision-making in today's rapidly evolving business landscape. However, traditional ETL processes often suffer from challenges like high operational costs, error-prone execution, and difficulty scaling. Enter automation—a strategy not merely as a facilitator but a necessity to alleviate these burdens. So, let's dive into the transformative impact of automating ETL workflows, the tools that make it possible, and methodologies that ensure robustness. The Evolution of ETL Gone are the days when ETL processes were relegated to batch jobs that ran in isolation, churning through records in an overnight slog. The advent of big data and real-time analytics has fundamentally altered the expectations from ETL processes. As Doug Cutting, the co-creator of Hadoop, aptly said, "The world is one big data problem." This statement resonates more than ever as we are bombarded with diverse, voluminous, and fast-moving data from myriad sources. View the full article
-
- etl
- automation
-
(and 1 more)
Tagged with:
-
Get insights into the day-to-day challenges of builders. In this issue, Peter Reitz from our partner tecRacer talks about how to build Serverless ETL (extract, transform and load) pipelines with the help of Amazon Managed Workflows for Apache Airflow (MWAA) and Amazon Athena.

What sparked your interest in cloud computing?
Computers have always held a great fascination for me. I taught myself how to program. That’s how I ended up working as a web developer during my economics studies. When I first stumbled upon Amazon Web Services, I was intrigued by the technology and wide variety of services.

How did you grow into the role of a cloud consultant?
After completing my economics degree, I was looking for a job. By chance, a job ad drew my attention to a vacancy for a cloud consultant at tecRacer. To be honest, my skills didn’t match the requirements very well. But because I found the topic exciting, I applied anyway. Right from the job interview, I felt right at home at tecRacer in Duisburg. Since I had no experience with AWS, there was a lot to learn within the first months. My first goal was to achieve the AWS Certified Solutions Architect - Associate certification. The entire team supported and motivated me during this intensive learning phase. After that, I joined a small team working on a project for one of our consulting clients. This allowed me to gain practical experience at a very early stage.

What does your day-to-day work as a cloud consultant at tecRacer look like?
As a cloud consultant, I work on projects for our clients. I specialize in machine learning and data analytics. Since tecRacer has a 100% focus on AWS, I invest in my knowledge of related AWS services like S3, Athena, EMR, SageMaker, and more.
I work remotely or at our office in Hamburg and am at the customer’s site every now and then, for example, to analyze the requirements for a project in workshops.

What project are you currently working on?
I’m currently working on building an ETL pipeline. Several data providers upload CSV files to an S3 bucket. My client’s challenge is to extract and transform 3 billion data points and store them in a way that allows efficient data analytics. This process can be roughly described as follows:
Fetch CSV files from S3.
Parse CSV files.
Filter, transform, and enrich columns.
Partition data to enable efficient queries in the future.
Transform to a file format optimized for data analytics.
Upload data to S3.

I’ve implemented similar data pipelines in the past. My preferred solution consists of the following building blocks:
S3 storing the input data.
Apache Airflow to orchestrate the ETL pipeline.
Athena to extract, transform, and load the data.
S3 storing the output data.

How do you build an ETL pipeline based on Athena?
Amazon Athena enables me to query data stored on S3 on demand using SQL. The remarkable thing about Athena is that the service is serverless, which means we only have to pay for the processed data when running a query. There are no idle costs except the S3 storage costs. As mentioned before, in my current project, the challenge is to extract data from CSV files and store the data in a way that is optimized for data analytics. My approach is transforming the CSV files into more efficient formats such as Parquet. The Parquet file format is designed for efficient data analysis and organizes data in columns, not rows, as CSV does. Therefore, Athena skips fetching and processing all other columns when querying only a subset of the available columns. Also, Parquet compresses the data to minimize storage and network consumption.
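The column-skipping behaviour described above can be illustrated with a small, purely in-memory Python sketch. It does not read or write real Parquet files and says nothing about Parquet's actual on-disk encoding; the records and the helper `to_columns` are assumptions invented for the example.

```python
# Hypothetical records, for illustration only.
rows = [
    {"product_code": "a1", "region": "eu", "estimated_revenue": 10.0},
    {"product_code": "b2", "region": "us", "estimated_revenue": 20.0},
    {"product_code": "a1", "region": "us", "estimated_revenue": 5.0},
]


def to_columns(records):
    """Pivot a list of row dicts into one list of values per column."""
    columns = {}
    for record in records:
        for name, value in record.items():
            columns.setdefault(name, []).append(value)
    return columns


columns = to_columns(rows)

# Row layout (as in CSV): every whole record is visited to read one field.
total_from_rows = sum(record["estimated_revenue"] for record in rows)

# Column layout: only the single column the query needs is visited;
# "product_code" and "region" are never touched.
total_from_columns = sum(columns["estimated_revenue"])

print(total_from_rows, total_from_columns)
```

Both layouts give the same answer; the difference is how much data has to be read to get it, which is what drives Athena's cost per query.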
I like using Athena for ETL jobs because of its simplicity and pay-per-use pricing model. The CREATE TABLE AS SELECT (CTAS) statement implements ETL as follows:
Extract: Load data from CSV files stored on S3 (SELECT FROM "awsmp"."cas_daily_business_usage_by_instance_type").
Transform: Filter and enrich columns (SELECT product_code, SUM(estimated_revenue) AS revenue, concat(year, '-', month, '-01') as date).
Load: Store results in Parquet file format on S3 (CREATE TABLE monthly_recurring_revenue).

CREATE TABLE monthly_recurring_revenue WITH (
  format = 'Parquet',
  external_location = 's3://demo-datalake/monthly_recurring_revenue/',
  partitioned_by = ARRAY['date']
) AS
SELECT
  product_code,
  SUM(estimated_revenue) AS revenue,
  concat(year, '-', month, '-01') as date
FROM (
  SELECT year, month, day, product_code, estimated_revenue
  FROM "awsmp"."cas_daily_business_usage_by_instance_type"
  ORDER BY year, month, day
)
GROUP BY year, month, product_code
ORDER BY year, month, product_code

Besides converting the data into the Parquet file format, the statement also partitions the data. This means the keys of the objects start with something like date=2022-08-01, which allows Athena to only fetch relevant files from S3 when querying by date.

Why did you choose Athena instead of Amazon EMR to build an ETL pipeline?
I’ve been using EMR for some projects in the past. But nowadays, I prefer adding Athena to the mix, wherever feasible. That’s because compared to EMR, Athena is a lightweight solution. Using Athena is less complex than running jobs on EMR. For example, I prefer using SQL to transform data instead of writing Python code. It takes me less time to build an ETL pipeline with Athena compared to EMR. Also, accessing Athena is much more convenient as all functionality is available via the AWS Management Console and API. In contrast, it requires a VPN connection to interact efficiently with EMR when developing a pipeline.
I prefer a serverless solution due to its cost implications. With Athena, our customer only pays for the processed data. There are no idling costs. As an example, I migrated a workload from EMR to Athena, which reduced costs from $3,000 to $100.

What is Apache Airflow?
Apache Airflow is a popular open-source project providing a workflow management platform for data engineering pipelines. As a data engineer, I describe an ETL pipeline in Python as a directed acyclic graph (DAG). Here is a straightforward DAG. The workflow consists of two steps:
Creating an Athena query.
Awaiting results from the Athena query.

from airflow.models import DAG
# AthenaOperator is the current name of the operator formerly called AWSAthenaOperator.
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

with DAG(dag_id='demo') as dag:
    # Step 1: submit the query to Athena.
    read_table = AthenaOperator(
        task_id='read_table',
        query='SELECT * FROM "sampledb"."elb_logs" limit 10;',
        output_location='s3://aws-athena-query-results-486555357186-eu-west-1/airflow/',
        database='sampledb'
    )
    # Step 2: wait until the query execution started above has finished.
    await_query = AthenaSensor(
        task_id='await_query',
        query_execution_id=read_table.output,
    )

Airflow allows you to run a DAG manually, via an API, or based on a schedule.

Airflow consists of multiple components:
Scheduler
Worker
Web Server
PostgreSQL database
Redis in-memory database

Operating such a distributed system is complex. Luckily, AWS provides a managed service called Amazon Managed Workflows for Apache Airflow (MWAA), which we use in my current project.

What does your development workflow for Airflow DAGs look like?
We built a deployment pipeline for the project I’m currently involved in. So you can think of developing the ETL pipeline like any other software delivery process.
The engineer pushes changes of DAGs to a Git repository.
The deployment pipeline validates the Python code.
The deployment pipeline spins up a container based on aws-mwaa-local-runner and verifies whether all dependencies are working as expected.
The deployment pipeline runs an integration test.
The deployment pipeline uploads the DAGs to S3.
Airflow refreshes the DAGs.

The deployment pipeline significantly speeds up the ETL pipeline’s development process, as many issues are spotted before deploying to AWS.

Why do you use Airflow instead of AWS Step Functions?
In general, Airflow is similar to AWS Step Functions. However, there are two crucial differences. First, Airflow is a popular choice for building ETL pipelines. Therefore, many engineers in the field of data analytics have already gained experience with the tool. And besides, the open-source community creates many integrations that help build ETL pipelines. Second, unlike Step Functions, Airflow is not only available on AWS. Being able to move ETL pipelines to another cloud vendor or on-premises is a plus.

Why do you specialize in machine learning and data analytics?
I enjoy working with data. Being able to answer questions by analyzing huge amounts of data and enabling better decisions backed by data motivates me. Also, I’m a huge fan of Athena. It’s one of the most powerful services offered by AWS. On top of that, machine learning, in general, and reinforcement learning, in particular, fascinate me, as they allow us to recognize correlations that were not visible before.

Would you like to join Peter’s team to implement solutions with the help of machine learning and data analytics? tecRacer is hiring Cloud Consultants focusing on machine learning and data analytics. Apply now! View the full article
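The "validates the Python code" step of the deployment pipeline described in the interview above could look roughly like the following sketch. The interview does not say how validation is done, so this is an assumption: it only checks that each DAG file parses, using the standard-library `ast` module. The function name `find_syntax_errors` and the sample sources are hypothetical.

```python
import ast


def find_syntax_errors(sources):
    """Return {filename: message} for every source that fails to parse."""
    errors = {}
    for filename, code in sources.items():
        try:
            # ast.parse only checks syntax; it does not import or run the DAG.
            ast.parse(code, filename=filename)
        except SyntaxError as exc:
            errors[filename] = f"line {exc.lineno}: {exc.msg}"
    return errors


# Hypothetical DAG sources; a real pipeline would read the files
# from the Git checkout instead of using inline strings.
sources = {
    "dags/ok.py": "x = 1\n",
    "dags/broken.py": "def f(:\n    pass\n",
}

errors = find_syntax_errors(sources)
print(errors)
```

A syntax check like this is cheap and catches typos early; missing dependencies and runtime errors are what the later container and integration-test steps are for.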
-
- athena
- serverless
-
(and 1 more)
Tagged with:
-
Auto Scaling in AWS Glue Streaming ETL is now generally available. AWS Glue Streaming ETL jobs can now dynamically scale resources up and down based on the input stream. Auto Scaling helps customers reduce the cost and manual effort required to optimize resources by allocating the right resources necessary for Streaming ETL jobs. View the full article
-