Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.

By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.

This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.

Solution overview

For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram).
The data engineering team wants to separate the raw data into its own AWS account (Account A in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A in Account B, which is dedicated to data handling tasks. This makes sure the raw and processed data can be kept securely separated across multiple accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless cluster in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.

The workflow consists of the following components:

- The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B to be able to load and unload data.
- In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
- Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
- VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
- Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub Actions, the changes are deployed to an S3 bucket in Account B (for this post, we upload the files into the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion.
- The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
- The Airflow logs are written to Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:

- The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are partitioned by date as s3://<bucket>/products/YYYY/MM/DD/.
- A sample dataset called products.csv, which we use in this post.
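The date-partitioned bucket layout described above can be sketched in a few lines of Python. This is a minimal illustration, not part of the provided CloudFormation stack: the helper name partition_prefix and its hive_keys flag are hypothetical, and the year=/month=/day= variant is shown only as the common key=value form of Hive-style partitioning.

```python
from datetime import date


def partition_prefix(folder: str, day: date, hive_keys: bool = False) -> str:
    """Build a date-partitioned S3 key prefix for the given day.

    hive_keys=False gives <folder>/YYYY/MM/DD/
    hive_keys=True  gives <folder>/year=YYYY/month=MM/day=DD/
    """
    if hive_keys:
        parts = [f"year={day:%Y}", f"month={day:%m}", f"day={day:%d}"]
    else:
        parts = [f"{day:%Y}", f"{day:%m}", f"{day:%d}"]
    # Strip stray slashes from the folder so the prefix never contains "//".
    return "/".join([folder.strip("/"), *parts]) + "/"


if __name__ == "__main__":
    example_day = date(2024, 3, 7)  # arbitrary example date
    print(partition_prefix("products", example_day))
    print(partition_prefix("products", example_day, hive_keys=True))
```

Zero-padding the month and day keeps the prefixes sortable as plain strings, which is why the sketch formats them with %m and %d rather than using the raw integers.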
Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack template to create the following resources:

- The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
  - dags – The folder for DAG files.
  - plugins – The file for any custom or community Airflow plugins.
  - requirements – The requirements.txt file for any Python packages.
  - scripts – Any SQL scripts used in the DAG.
  - data – Any datasets used in the DAG.
- A Redshift Serverless environment. The names of the workgroup and namespace are prefixed with sample.
- An AWS Glue environment, which contains the following:
  - An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
  - A database called products_db in the AWS Glue Data Catalog.
  - An ETL job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products.
- A VPC gateway endpoint to Amazon S3.
- An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Create Amazon Redshift resources

Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file. In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.

Configure Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI.
DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.

Set up the environment

This section outlines the steps to configure the environment. The process involves the following high-level steps:

- Update any necessary providers.
- Set up cross-account access.
- Establish a VPC peering connection between the Amazon MWAA VPC and Amazon Redshift VPC.
- Configure Secrets Manager to integrate with Amazon MWAA.
- Define Airflow connections.

Update the providers

Follow the steps in this section if your Amazon MWAA environment runs an Airflow version earlier than 2.8.1 (the latest version as of writing this post).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.

Although the version of Airflow supported in Amazon MWAA is 2.6.3, which comes bundled with the Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until the Amazon provider package version 8.4.0. Because the bundled provider version predates Redshift Serverless support, the provider must be upgraded in order to use that functionality.

The first step is to update the constraints file and requirements.txt file with the correct versions.
Refer to Specifying newer provider packages for steps to update the Amazon provider package.

- Specify the requirements as follows:

    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

- Update the version in the constraints file to 8.4.0 or higher.
- Add the constraints-3.11-updated.txt file to the /dags folder. The file name in the --constraint line must match the constraints file you upload. Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for correct versions of the constraints file depending on the Airflow version.
- Navigate to the Amazon MWAA environment and choose Edit.
- Under DAG code in Amazon S3, for Requirements file, choose the latest version.
- Choose Save.

This will update the environment and new providers will be in effect.

To verify the providers version, go to Providers under the Admin menu. The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data.
Complete the following steps:

- In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": [
              "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
              "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
            ]
          },
          "Action": ["s3:GetObject", "s3:PutObject", "s3:PutObjectAcl", "s3:ListBucket"],
          "Resource": [
            "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
            "arn:aws:s3:::sample-inp-bucket-etl-<username>"
          ]
        }
      ]
    }

- Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to the Amazon MWAA role in Account B to put objects in this bucket:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
          },
          "Action": ["s3:GetObject", "s3:PutObject", "s3:PutObjectAcl", "s3:ListBucket"],
          "Resource": [
            "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
            "arn:aws:s3:::sample-opt-bucket-etl-<username>"
          ]
        }
      ]
    }

- In Account A, create an IAM policy called policy_for_roleA, which allows necessary Amazon S3 actions on the output bucket:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "VisualEditor0",
          "Effect": "Allow",
          "Action": ["kms:Decrypt", "kms:Encrypt", "kms:GenerateDataKey"],
          "Resource": ["<KMS_KEY_ARN_Used_for_S3_encryption>"]
        },
        {
          "Sid": "VisualEditor1",
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject",
            "s3:GetBucketAcl",
            "s3:GetBucketCors",
            "s3:GetEncryptionConfiguration",
            "s3:GetBucketLocation",
            "s3:ListAllMyBuckets",
            "s3:ListBucket",
            "s3:ListBucketMultipartUploads",
            "s3:ListBucketVersions",
            "s3:ListMultipartUploadParts"
          ],
          "Resource": [
            "arn:aws:s3:::sample-opt-bucket-etl-<username>",
            "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
          ]
        }
      ]
    }

- Create a new IAM role called RoleA with Account B as the trusted entity and add this policy to the role. This allows Account B to assume RoleA to perform necessary Amazon S3 actions on the output bucket.
- In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A. Add this policy to the AWS Glue role and Amazon MWAA role:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject", "s3:PutObjectAcl"],
          "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
        }
      ]
    }

- In Account B, create the IAM policy policy_for_roleB. The following policy grants permission to assume RoleA in Account A:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "CrossAccountPolicy",
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
        }
      ]
    }

- Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.
- Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
- Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them.
You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.

Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.

If any of the preceding steps are configured incorrectly, you are likely to encounter a “Connection Timeout” error in the DAG run.

Configure the Amazon MWAA connection with Secrets Manager

When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.

Complete the following steps:

- Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager). This allows Amazon MWAA to access credentials stored in Secrets Manager.
- To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment.
- To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings. This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:

    secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
    secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}

- To generate an Airflow connection URI string, go to AWS CloudShell and start a Python shell.
- Run the following code to generate the connection URI string:

    import urllib.parse

    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com'  # Specify the Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin'  # Specify the username to use for authentication with Amazon Redshift
    password = '<password>'  # Specify the password to use for authentication with Amazon Redshift
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1'  # YOUR_REGION
    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(conn_type, login, password, host, port, role_arn, database, region)
    print(conn_string)

  The connection string should be generated as follows:

    redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

- Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI). This can also be done from the Secrets Manager console. The connection is stored in Secrets Manager as plaintext.

    aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it will look for this connection and retrieve the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name.

You can also add secrets using the Secrets Manager console as key-value pairs.

- Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.
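The connection URI generated earlier in this section inserts the user name and password without encoding, so a password that contains characters such as @, / or : would produce an ambiguous URI. The following is a minimal sketch, assuming the same fields as that snippet, that percent-encodes every component with the standard library. The function name build_redshift_uri and all example values are hypothetical and are not part of the original post.

```python
from urllib.parse import parse_qs, quote, unquote, urlencode, urlsplit


def build_redshift_uri(login: str, password: str, host: str, port: int, **extras: str) -> str:
    """Return a redshift:// connection URI with URL-encoded components.

    Extra keyword arguments (for example role_arn, database, region)
    become query parameters, encoded the same way quote_plus would.
    """
    userinfo = f"{quote(login, safe='')}:{quote(password, safe='')}"
    query = urlencode(extras)
    return f"redshift://{userinfo}@{host}:{port}?{query}"


if __name__ == "__main__":
    # All values below are placeholders for illustration only.
    uri = build_redshift_uri(
        login="admin",
        password="p@ss/word",
        host="example-workgroup.123456789012.us-east-1.redshift-serverless.amazonaws.com",
        port=5439,
        role_arn="arn:aws:iam::123456789012:role/service-role/example-role",
        database="dev",
        region="us-east-1",
    )
    print(uri)

    # Round-trip check: the parsed URI yields the original values again.
    parts = urlsplit(uri)
    print(unquote(parts.password), parts.port, parse_qs(parts.query)["region"])
```

Encoding the user name and password separately with safe='' matters here because the default quote() leaves / untouched, which would otherwise end the host portion of the URI early.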
Create an Airflow connection through the metadata database

You can also create connections in the UI. In this case, the connection details will be stored in an Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI.

- For Connection Id, enter a name for the connection.
- For Connection Type, choose Amazon Redshift.
- For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless.
- For Database, enter dev.
- For User, enter your admin user name.
- For Password, enter your password.
- For Port, use port 5439.
- For Extra, set the region and timeout parameters.
- Test the connection, then save your settings.

Create and run a DAG

In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.

Create a DAG

In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:

- The DAG starts with looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket. For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derived the current date in the DAG and passed this to S3KeySensor, which looks for any new files in the current day folder. We also set wildcard_match as True, which enables searches on bucket_key to be interpreted as a Unix wildcard pattern.
  Set the mode to reschedule so that the sensor task frees the worker slot when the criteria are not met and is rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on the scheduler.
- After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
- The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
- When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator along with the AWS Glue IAM role. Other parameters like GlueVersion, NumberOfWorkers, and WorkerType are passed using the create_job_kwargs parameter.
- The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it’s complete, the Redshift staging table products will be loaded with data from the S3 file.
- You can connect to Amazon Redshift from Airflow using three different operators:
  - PythonOperator.
  - SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
  - RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.

  In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.

  Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager.
  If the connection is not found in Secrets Manager, the values are retrieved from the default connections.

  In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed.

  In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.
- As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to the S3 bucket sample-opt-bucket-etl in Account A. airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data.
- TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete.
- The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.

The following is the complete DAG code. The dag_id should match the DAG script name, otherwise it won’t be synced into the Airflow UI.
    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow.models.baseoperator import chain
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
    from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
    from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
    from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
    from airflow.utils.trigger_rule import TriggerRule

    dag_id = "data_pipeline"
    vYear = datetime.today().strftime("%Y")
    vMonth = datetime.today().strftime("%m")
    vDay = datetime.today().strftime("%d")
    src_bucket_name = "sample-inp-bucket-etl-<username>"
    tgt_bucket_name = "sample-opt-bucket-etl-<username>"
    s3_folder = "products"
    # Please replace the variable with the glue_role_arn
    glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
    glue_crawler_name = "products"
    glue_db_name = "products_db"
    glue_job_name = "sample_glue_job"
    glue_script_location = "s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
    workgroup_name = "sample-workgroup"
    redshift_table = "products_f"
    redshift_conn_id_name = "secrets_redshift_connection"
    db_name = "dev"
    secret_arn = "arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
    poll_interval = 10


    @task
    def get_role_name(arn: str) -> str:
        return arn.split("/")[-1]


    @task
    def get_s3_loc(s3_folder: str) -> str:
        s3_loc = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
        return s3_loc


    with DAG(
        dag_id=dag_id,
        schedule="@once",
        start_date=datetime(2021, 1, 1),
        tags=["example"],
        catchup=False,
    ) as dag:
        role_arn = glue_role_arn_key
        glue_role_name = get_role_name(role_arn)
        s3_loc = get_s3_loc(s3_folder)

        # Check for new incremental files in S3 source/input bucket
        sensor_key = S3KeySensor(
            task_id="sensor_key",
            bucket_key=s3_loc,
            bucket_name=src_bucket_name,
            wildcard_match=True,
            # timeout=18*60*60,
            # poke_interval=120,
            timeout=60,
            poke_interval=30,
            mode="reschedule",
        )

        # Run Glue crawler
        glue_crawler_config = {
            "Name": glue_crawler_name,
            "Role": role_arn,
            "DatabaseName": glue_db_name,
        }

        crawl_s3 = GlueCrawlerOperator(
            task_id="crawl_s3",
            config=glue_crawler_config,
        )
        # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
        crawl_s3.wait_for_completion = False

        # Wait for Glue crawler to complete
        wait_for_crawl = GlueCrawlerSensor(
            task_id="wait_for_crawl",
            crawler_name=glue_crawler_name,
        )

        # Run Glue Job
        submit_glue_job = GlueJobOperator(
            task_id="submit_glue_job",
            job_name=glue_job_name,
            script_location=glue_script_location,
            iam_role_name=glue_role_name,
            create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
        )
        # GlueJobOperator waits by default, setting as False to test the Sensor below.
        submit_glue_job.wait_for_completion = False

        # Wait for Glue Job to complete
        wait_for_job = GlueJobSensor(
            task_id="wait_for_job",
            job_name=glue_job_name,
            # Job ID extracted from previous Glue Job Operator task
            run_id=submit_glue_job.output,
            verbose=True,  # prints glue job logs in airflow logs
        )
        wait_for_job.poke_interval = 5

        # Execute the Stored Procedure in Redshift Serverless using Data Operator
        execute_redshift_stored_proc = RedshiftDataOperator(
            task_id="execute_redshift_stored_proc",
            database=db_name,
            workgroup_name=workgroup_name,
            secret_arn=secret_arn,
            sql="""CALL sp_products();""",
            poll_interval=poll_interval,
            wait_for_completion=True,
        )

        # Delete the data from the staging table in Redshift Serverless using SQL Operator
        delete_from_table = SQLExecuteQueryOperator(
            task_id="delete_from_table",
            conn_id=redshift_conn_id_name,
            sql="DELETE FROM products;",
            trigger_rule=TriggerRule.ALL_DONE,
        )

        # Unload the data from Redshift table to S3
        transfer_redshift_to_s3 = RedshiftToS3Operator(
            task_id="transfer_redshift_to_s3",
            s3_bucket=tgt_bucket_name,
            s3_key=s3_loc,
            schema="PUBLIC",
            table=redshift_table,
            redshift_conn_id=redshift_conn_id_name,
        )
        transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

        # Chain the tasks to be executed
        chain(
            sensor_key,
            crawl_s3,
            wait_for_crawl,
            submit_glue_job,
            wait_for_job,
            execute_redshift_stored_proc,
            delete_from_table,
            transfer_redshift_to_s3,
        )

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.

In the Links section, there are options to view the code, graph, grid, log, and more.
Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results

On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as the source S3 files.

On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account A. The products_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Clean up

Clean up the resources created as part of this post to avoid incurring ongoing charges:

- Delete the CloudFormation stacks and S3 bucket that you created as prerequisites.
- Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.

Conclusion

With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.

About the Authors

Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS.
He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that allows you to use a familiar Apache Airflow environment with improved scalability, availability, and security to enhance and scale your business workflows without the operational burden of managing the underlying infrastructure.

In Airflow, Directed Acyclic Graphs (DAGs) are defined as Python code. Dynamic DAGs refer to the ability to generate DAGs on the fly during runtime, typically based on some external conditions, configurations, or parameters. Dynamic DAGs help you to create, schedule, and run tasks within a DAG based on data and configurations that may change over time.

There are various ways to introduce dynamism in Airflow DAGs (dynamic DAG generation) using environment variables and external files. One of the approaches is to use the DAG Factory YAML-based configuration file method. This library aims to facilitate the creation and configuration of new DAGs by using declarative parameters in YAML. It allows default customizations and is open source, making it simple to create and customize new functionalities.

In this post, we explore the process of creating dynamic DAGs with YAML files, using the DAG Factory library. Dynamic DAGs offer several benefits:

- Enhanced code reusability – By structuring DAGs through YAML files, we promote reusable components, reducing redundancy in your workflow definitions.
- Streamlined maintenance – YAML-based DAG generation simplifies the process of modifying and updating workflows, ensuring smoother maintenance procedures.
- Flexible parameterization – With YAML, you can parameterize DAG configurations, facilitating dynamic adjustments to workflows based on varying requirements.
Improved scheduler efficiency – Dynamic DAGs enable more efficient scheduling, optimizing resource allocation and enhancing overall workflow runs Enhanced scalability – YAML-driven DAGs allow for parallel runs, enabling scalable workflows capable of handling increased workloads efficiently. By harnessing the power of YAML files and the DAG Factory library, we unleash a versatile approach to building and managing DAGs, empowering you to create robust, scalable, and maintainable data pipelines. Overview of solution In this post, we will use an example DAG file that is designed to process a COVID-19 data set. The workflow process involves processing an open source data set offered by WHO-COVID-19-Global. After we install the DAG-Factory Python package, we create a YAML file that has definitions of various tasks. We process the country-specific death count by passing Country as a variable, which creates individual country-based DAGs. The following diagram illustrates the overall solution along with data flows within logical blocks. Prerequisites For this walkthrough, you should have the following prerequisites: An AWS account If you don’t already have an AWS account, you can sign up for one.. Python 3.6.0+ and Amazon MWAA 2.0+ Environment in order to operate the dag-factory library Additionally, complete the following steps (run the setup in an AWS Region where Amazon MWAA is available): Create an Amazon MWAA environment (if you don’t have one already). If this is your first time using Amazon MWAA, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA). Make sure the AWS Identity and Access Management (IAM) user or role used for setting up the environment has IAM policies attached for the following permissions: Read and write access to Amazon Simple Storage Service (Amazon S3). For details, refer to Amazon S3: Allows read and write access to objects in an S3 Bucket, programmatically and in the console. Full access to the Amazon MWAA console. 
The access policies mentioned here are just for the example in this post. In a production environment, provide only the needed granular permissions by exercising least privilege principles.

- Create an Amazon S3 bucket with a unique name while creating your Amazon MWAA environment, and create folders called dags and requirements.
- Create and upload a requirements.txt file with the following content to the requirements folder. Replace {Airflow-version} with your environment’s Apache Airflow version number, and {Python-version} with the version of Python that’s compatible with your environment:

    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

  Pandas is needed just for the example use case described in this post, and dag-factory is the only required package. It is recommended to check the compatibility of the latest version of dag-factory with Amazon MWAA. The boto and psycopg2-binary libraries are included with the Apache Airflow v2 base install and don’t need to be specified in your requirements.txt file.
- Download the WHO-COVID-19-global data file to your local machine and upload it under the dags prefix of your S3 bucket.
- Make sure that your environment points to the latest S3 version of your requirements.txt file so that the additional packages are installed. The update typically takes 15–20 minutes, depending on your environment configuration.

Validate the DAGs

When your Amazon MWAA environment shows as Available on the Amazon MWAA console, navigate to the Airflow UI by choosing Open Airflow UI next to your environment. Verify the existing DAGs by navigating to the DAGs tab.

Configure your DAGs

Complete the following steps:

- Create empty files named dynamic_dags.yml, example_dag_factory.py, and process_s3_data.py on your local machine.
- Edit the process_s3_data.py file and save it with the following code content, then upload the file to the dags folder of your Amazon S3 bucket. The code does some basic data processing:
  - Reads the file from an Amazon S3 location.
  - Renames the Country_code column to country.
  - Filters the data by the given country.
  - Writes the processed data in CSV format and uploads it back to the S3 prefix.

    import io

    import boto3
    import pandas as pd


    def process_s3_data(COUNTRY):
        # Top-level variables: replace S3_BUCKET with your bucket name.
        s3 = boto3.client('s3')
        S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
        INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
        OUTPUT_KEY = "dags/count_death"

        # Get the CSV file from Amazon S3.
        response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
        status = response['ResponseMetadata']['HTTPStatusCode']

        if status == 200:
            # Read the CSV file and keep only the rows whose renamed
            # column matches the value passed in as COUNTRY.
            df = pd.read_csv(response.get("Body"))
            df.rename(columns={"Country_code": "country"}, inplace=True)
            filtered_df = df[df['country'] == COUNTRY]

            # Write the filtered rows back to Amazon S3 as a CSV object.
            with io.StringIO() as csv_buffer:
                filtered_df.to_csv(csv_buffer, index=False)
                response = s3.put_object(
                    Bucket=S3_BUCKET,
                    Key=OUTPUT_KEY + '_' + COUNTRY + '.csv',
                    Body=csv_buffer.getvalue()
                )
                status = response['ResponseMetadata']['HTTPStatusCode']
                if status == 200:
                    print(f"Successful S3 put_object response. Status - {status}")
                else:
                    print(f"Unsuccessful S3 put_object response. Status - {status}")
        else:
            print(f"Unsuccessful S3 get_object response. Status - {status}")

- Edit the dynamic_dags.yml file and save it with the following code content, then upload the file to the dags folder. The file stitches together various DAGs based on the country as follows:
  - Define the default arguments that are passed to all DAGs.
  - Create a DAG definition for individual countries by passing op_args.
  - Map the process_s3_data function with python_callable_name.
  - Use the PythonOperator to process the CSV file data stored in the Amazon S3 bucket.
  - We have set schedule_interval to every 10 minutes, but feel free to adjust this value as needed.

    default:
      default_args:
        owner: "airflow"
        start_date: "2024-03-01"
        retries: 1
        retry_delay_sec: 300
      concurrency: 1
      max_active_runs: 1
      dagrun_timeout_sec: 600
      default_view: "tree"
      orientation: "LR"
      schedule_interval: "*/10 * * * *"

    module3_dynamic_dag_Australia:
      tasks:
        task_process_s3_data:
          task_id: process_s3_data
          operator: airflow.operators.python.PythonOperator
          python_callable_name: process_s3_data
          python_callable_file: /usr/local/airflow/dags/process_s3_data.py
          op_args:
            - "Australia"

    module3_dynamic_dag_Brazil:
      tasks:
        task_process_s3_data:
          task_id: process_s3_data
          operator: airflow.operators.python.PythonOperator
          python_callable_name: process_s3_data
          python_callable_file: /usr/local/airflow/dags/process_s3_data.py
          op_args:
            - "Brazil"

    module3_dynamic_dag_India:
      tasks:
        task_process_s3_data:
          task_id: process_s3_data
          operator: airflow.operators.python.PythonOperator
          python_callable_name: process_s3_data
          python_callable_file: /usr/local/airflow/dags/process_s3_data.py
          op_args:
            - "India"

    module3_dynamic_dag_Japan:
      tasks:
        task_process_s3_data:
          task_id: process_s3_data
          operator: airflow.operators.python.PythonOperator
          python_callable_name: process_s3_data
          python_callable_file: /usr/local/airflow/dags/process_s3_data.py
          op_args:
            - "Japan"

    module3_dynamic_dag_Mexico:
      tasks:
        task_process_s3_data:
          task_id: process_s3_data
          operator: airflow.operators.python.PythonOperator
          python_callable_name: process_s3_data
          python_callable_file: /usr/local/airflow/dags/process_s3_data.py
          op_args:
            - "Mexico"

    module3_dynamic_dag_Russia:
      tasks:
        task_process_s3_data:
          task_id: process_s3_data
          operator: airflow.operators.python.PythonOperator
          python_callable_name: process_s3_data
          python_callable_file: /usr/local/airflow/dags/process_s3_data.py
          op_args:
            - "Russia"

    module3_dynamic_dag_Spain:
      tasks:
        task_process_s3_data:
          task_id: process_s3_data
          operator: airflow.operators.python.PythonOperator
          python_callable_name: process_s3_data
          python_callable_file: /usr/local/airflow/dags/process_s3_data.py
          op_args:
            - "Spain"

- Edit the example_dag_factory.py file and save it with the following code content, then upload the file to the dags folder. The code removes any existing DAGs with the clean_dags() method and creates new DAGs with the generate_dags() method of the DagFactory instance.

    # This import lets the Airflow DAG parser pick up the file.
    from airflow import DAG
    import dagfactory

    config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
    example_dag_factory = dagfactory.DagFactory(config_file)

    # Clean up or delete any existing DAGs.
    example_dag_factory.clean_dags(globals())
    # Generate and create new DAGs.
    example_dag_factory.generate_dags(globals())

- After you upload the files, go back to the Airflow UI and navigate to the DAGs tab, where you will find the new DAGs.
- You can enable DAGs by making them active and testing them individually. Upon activation, an additional CSV file named count_death_{COUNTRY}.csv is generated in the dags folder.

Cleaning up

There may be costs associated with using the various AWS services discussed in this post. To prevent incurring future charges, delete the Amazon MWAA environment after you have completed the tasks outlined in this post, and empty and delete the S3 bucket.

Conclusion

In this blog post, we demonstrated how to use the dag-factory library to create dynamic DAGs. Dynamic DAGs are characterized by their ability to generate results with each parsing of the DAG file based on configurations.
Consider using dynamic DAGs in the following scenarios:

- Automating migration from a legacy system to Airflow, where flexibility in DAG generation is crucial
- Situations where only a parameter changes between different DAGs, streamlining the workflow management process
- Managing DAGs that are reliant on the evolving structure of a source system, providing adaptability to changes
- Establishing standardized practices for DAGs across your team or organization by creating these blueprints, promoting consistency and efficiency
- Embracing YAML-based declarations over complex Python coding, simplifying DAG configuration and maintenance processes
- Creating data-driven workflows that adapt and evolve based on the data inputs, enabling efficient automation

By incorporating dynamic DAGs into your workflow, you can enhance automation, adaptability, and standardization, ultimately improving the efficiency and effectiveness of your data pipeline management.

To learn more about Amazon MWAA DAG Factory, visit Amazon MWAA for Analytics Workshop: DAG Factory. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repository.

About the Authors

Jayesh Shinde is Sr. Application Architect with AWS ProServe India. He specializes in creating various solutions that are cloud centered using modern software development practices like serverless, DevOps, and analytics.

Harshd Yeola is Sr. Cloud Architect with AWS ProServe India helping customers to migrate and modernize their infrastructure into AWS. He specializes in building DevSecOps and scalable infrastructure using containers, AIOPs, and AWS Developer Tools and services.

View the full article
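The dynamic_dags.yml file in the DAG Factory post repeats the same task block for every country, with only the op_args value changing. As a minimal sketch of one way to avoid hand-writing those blocks, the following Python builds the same structure as a dictionary from a list of countries. The function names build_country_dag and build_config are hypothetical and not part of dag-factory; the keys and values are copied from the YAML in the post. Writing the result to dynamic_dags.yml is left out on purpose, because it assumes a YAML library of your choice is available.

```python
from typing import Any, Dict, List

# Path of the callable on the Amazon MWAA worker, as used in the post.
CALLABLE_FILE = "/usr/local/airflow/dags/process_s3_data.py"


def build_country_dag(country: str) -> Dict[str, Any]:
    """Return one DAG entry that mirrors a per-country block of the post's YAML."""
    return {
        "tasks": {
            "task_process_s3_data": {
                "task_id": "process_s3_data",
                "operator": "airflow.operators.python.PythonOperator",
                "python_callable_name": "process_s3_data",
                "python_callable_file": CALLABLE_FILE,
                "op_args": [country],
            }
        }
    }


def build_config(countries: List[str],
                 dag_prefix: str = "module3_dynamic_dag_") -> Dict[str, Any]:
    """Build the whole configuration: the default block plus one DAG per country."""
    config: Dict[str, Any] = {
        "default": {
            "default_args": {
                "owner": "airflow",
                "start_date": "2024-03-01",
                "retries": 1,
                "retry_delay_sec": 300,
            },
            "concurrency": 1,
            "max_active_runs": 1,
            "dagrun_timeout_sec": 600,
            "default_view": "tree",
            "orientation": "LR",
            "schedule_interval": "*/10 * * * *",
        }
    }
    for country in countries:
        config[f"{dag_prefix}{country}"] = build_country_dag(country)
    return config


if __name__ == "__main__":
    generated = build_config(["Australia", "Brazil", "India", "Japan"])
    for dag_name in generated:
        print(dag_name)
```

Adding a country then becomes a one-word change to the list instead of a copied block, which is the "only a parameter changes between different DAGs" scenario the post describes.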
-
Amazon Managed Workflows for Apache Airflow (MWAA) now offers larger environment sizes, giving customers of the managed service the ability to define a greater number of workflows in each Apache Airflow environment, supporting more complex tasks that can utilize increased resources. View the full article
-
Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service for Apache Airflow that streamlines the setup and operation of the infrastructure to orchestrate data pipelines in the cloud. Customers use Amazon MWAA to manage the scalability, availability, and security of their Apache Airflow environments. As they design more intensive, complex, and ever-growing data processing pipelines, customers have asked us for additional underlying resources to provide greater concurrency and capacity for their tasks and workflows.

To address this, today, we are announcing the availability of larger environment classes in Amazon MWAA. In this post, we dive into the capabilities of these new XL and 2XL environments, the scenarios they are well suited for, and how you can set up or upgrade your existing Amazon MWAA environment to take advantage of the increased resources.

Current challenges

When you create an Amazon MWAA environment, a set of managed Amazon Elastic Container Service (Amazon ECS) containers on AWS Fargate is provisioned with defined virtual CPUs and RAM. As you work with larger, complex, resource-intensive workloads, or run thousands of Directed Acyclic Graphs (DAGs) per day, you may start exhausting CPU availability on schedulers and workers, or reaching memory limits in workers. Running Apache Airflow at scale puts proportionally greater load on the Airflow metadata database, sometimes leading to CPU and memory issues on the underlying Amazon Relational Database Service (Amazon RDS) cluster. A resource-starved metadata database may lead to dropped connections from your workers, causing tasks to fail prematurely.

To improve performance and resiliency of your tasks, consider following Apache Airflow best practices to author DAGs. As an alternative, you can create multiple Amazon MWAA environments to distribute workloads. However, this requires additional engineering and management effort.
New environment classes

With today’s release, you can now create XL and 2XL environments in Amazon MWAA in addition to the existing environment classes. They have two and four times the compute, and three and six times the memory, respectively, of the current large Amazon MWAA environment instance class. These instances add compute and RAM linearly to directly improve capacity and performance of all Apache Airflow components. The following table summarizes the environment capabilities.

Environment class | Scheduler and Worker CPU / RAM | Web Server CPU / RAM | Concurrent Tasks | DAG Capacity
mw1.xlarge | 8 vCPUs / 24 GB | 4 vCPUs / 12 GB | 40 tasks (default) | Up to 2000
mw1.2xlarge | 16 vCPUs / 48 GB | 8 vCPUs / 24 GB | 80 tasks (default) | Up to 4000

With the introduction of these larger environments, your Amazon Aurora metadata database will now use larger, memory-optimized instances powered by AWS Graviton2. With the Graviton2 family of processors, you get compute, storage, and networking improvements, and the reduction of your carbon footprint offered by the AWS family of processors.

Pricing

Amazon MWAA pricing dimensions remain unchanged, and you only pay for what you use:

- The environment class
- Additional worker instances
- Additional scheduler instances
- Metadata database storage consumed

You now get two additional options in the first three dimensions: XL and 2XL for environment class, additional worker instances, and additional scheduler instances. Metadata database storage pricing remains the same. Refer to Amazon Managed Workflows for Apache Airflow Pricing for rates and more details.

Observe Amazon MWAA performance to plan scaling to larger environments

Before you start using the new environment classes, it’s important to understand if you are in a scenario that relates to capacity issues, such as metadata database out of memory, or workers or schedulers running at high CPU usage. Understanding the performance of your environment resources is key to troubleshooting issues related to capacity.
We recommend following the guidance described in Introducing container, database, and queue utilization metrics for the Amazon MWAA environment to better understand the state of Amazon MWAA environments, and get insights to right-size your instances. In the following test, we simulate a high load scenario, use the CloudWatch observability metrics to identify common problems, and make an informed decision to plan scaling to larger environments to mitigate the issues. During our tests, we ran a complex DAG that dynamically creates over 500 tasks and uses external sensors to wait for a task completion in a different DAG. After running on an Amazon MWAA large environment class with auto scaling set up to a maximum of 10 worker nodes, we noticed the following metrics and values in the CloudWatch dashboard. The worker nodes have reached maximum CPU capacity, causing the number of queued tasks to keep increasing. The metadata database CPU utilization has peaked at over 65% capacity, and the available database free memory has been reduced. In this situation, we could further increase the worker nodes to scale, but that would put additional load on the metadata database CPU. This might lead to a drop in the number of worker database connections and available free database memory. With new environment classes, you can vertically scale to increase available resources by editing the environment and selecting a higher class of environment, as shown in the following screenshot. From the list of environments, we select the one in use for this test. Choose Edit to navigate to the Configure advanced settings page, and select the appropriate xlarge or 2xlarge environment as required. After you save the change, the environment upgrade will take 20–30 minutes to complete. Any running DAG that got interrupted during the upgrade is scheduled for a retry, depending on the way you configured the retries for your DAGs. 
You can now choose to invoke them manually or wait for the next scheduled run.

After we upgraded the environment class, we tested the same DAG and observed that the metrics showed improved values because more resources were available. With this XL environment, you can run more tasks on fewer worker nodes, and therefore the number of queued tasks kept decreasing.

Alternatively, if you have tasks that require more memory or CPU, you can reduce the tasks per worker, but still achieve a high number of tasks per worker with a larger environment size. For example, if you have a large environment where the worker node CPU is maxed out with celery.worker_autoscale (the Airflow configuration that defines the number of tasks per worker) set at 20,20, you can move to an XL environment and set celery.worker_autoscale to 20,20 on the XL, rather than the default 40 tasks per worker on an XL environment, and the CPU load should reduce significantly.

Set up a new XL environment in Amazon MWAA

You can get started with Amazon MWAA in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.

Amazon MWAA XL and 2XL environment classes are available today in all Regions where Amazon MWAA is currently available.

Conclusion

Today, we are announcing the availability of two new environment classes in Amazon MWAA. With XL and 2XL environment classes, you can orchestrate larger volumes of complex or resource-intensive workflows. If you are running DAGs with a high number of dependencies, running thousands of DAGs across multiple environments, or in a scenario that requires you to heavily use workers for compute, you can now overcome the related capacity issues by increasing your environment resources in a few straightforward steps.
In this post, we discussed the capabilities of the two new environment classes, including pricing and some common resource constraint problems they solve. We provided guidance and an example of how to observe your existing environments to plan scaling to XL or 2XL, and we described how you can upgrade existing environments to use the increased resources. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo. Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. About the Authors Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines. Jeetendra Vaidya is a Senior Solutions Architect at AWS, bringing his expertise to the realms of AI/ML, serverless, and data analytics domains. He is passionate about assisting customers in architecting secure, scalable, reliable, and cost-effective solutions. Sriharsh Adari is a Senior Solutions Architect at AWS, where he helps customers work backward from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core area of expertise includes technology strategy, data analytics, and data science. In his spare time, he enjoys playing sports, watching TV shows, and playing Tabla. View the full article
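The larger-environments post describes moving to an XL class while keeping celery.worker_autoscale at 20,20 instead of the XL default of 40 tasks per worker. The following is a minimal sketch, under stated assumptions, of how that change could be prepared with the AWS SDK for Python. The helper names build_update_request and apply_update and the environment name my-mwaa-env are hypothetical; update_environment is the boto3 MWAA client call, and boto3 with valid credentials is assumed for the second function only. The sketch does not merge with configuration options already set on an environment, so check how your existing options are handled before applying it.

```python
from typing import Any, Dict

# Default tasks per worker for the new classes, taken from the table in the post.
DEFAULT_TASKS_PER_WORKER = {"mw1.xlarge": 40, "mw1.2xlarge": 80}


def build_update_request(name: str, environment_class: str,
                         max_tasks: int, min_tasks: int) -> Dict[str, Any]:
    """Build the keyword arguments for an environment class change.

    celery.worker_autoscale takes the form "max,min" tasks per worker.
    """
    if environment_class not in DEFAULT_TASKS_PER_WORKER:
        raise ValueError(f"Unknown environment class: {environment_class}")
    if min_tasks < 1 or max_tasks < min_tasks:
        raise ValueError("Expected 1 <= min_tasks <= max_tasks")
    return {
        "Name": name,
        "EnvironmentClass": environment_class,
        "AirflowConfigurationOptions": {
            "celery.worker_autoscale": f"{max_tasks},{min_tasks}",
        },
    }


def apply_update(request: Dict[str, Any]) -> Dict[str, Any]:
    """Send the request to Amazon MWAA (assumes boto3 and AWS credentials)."""
    import boto3  # imported here so the builder above has no AWS dependency

    return boto3.client("mwaa").update_environment(**request)


if __name__ == "__main__":
    # Placeholder environment name; replace it with your own.
    print(build_update_request("my-mwaa-env", "mw1.xlarge", 20, 20))
```

Keeping the request builder separate from the API call makes it possible to review or log the intended change before the environment upgrade, which the post notes can take 20–30 minutes.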
-
You can now create Apache Airflow version 2.7 environments and execute deferrable operators on Amazon Managed Workflows for Apache Airflow (MWAA). Apache Airflow 2.7 is the latest minor release of the popular open-source tool that helps customers author, schedule, and monitor workflows. View the full article
-
Introduction Data scientists and engineers have made Apache Airflow a leading open-source tool to create data pipelines due to its active open-source community, familiar Python development as Directed Acyclic Graph (DAG) workflows, and an extensive library of pre-built integrations. Amazon Managed Workflows for Apache Airflow (MWAA) is a managed service for Apache Airflow that makes it easy to run Airflow on AWS without the operational burden of having to manage the underlying infrastructure. While business needs demand scalability, availability, and security, Airflow development often doesn’t require full production-ready infrastructure. Many DAGs are written locally, and when doing so, developers need to be assured that these workflows function correctly when they’re deployed to their production environment. To that end, the MWAA team created an open-source local-runner that uses many of the same library versions and runtimes as MWAA in a container that can run in a local Docker instance, along with utilities that can test and package Python requirements. There are times when a full MWAA environment isn’t required, but a local Docker container doesn’t have access to the AWS resources needed to properly develop and test end-to-end workflows. As such, the answer may be to run local-runner on a container on AWS, and by running on the same configuration as MWAA you can closely replicate your production MWAA environment in a light-weight development container. This post covers the topic of launching MWAA local-runner containers on Amazon Elastic Container Service (ECS) Fargate. Prerequisites This tutorial assumes you have an existing Amazon MWAA environment and wish to create a development container with a similar configuration. If you don’t already have an MWAA environment, then you can follow the quick start documentation here to get started. Docker on your local desktop. AWS Command Line Interface (AWS CLI). Terraform CLI (only if using Terraform). 
Walkthrough

Clone the local-runner repository, set the environment variables, and build the image

We’ll start by pulling the latest Airflow version of the Amazon MWAA local-runner to our local machine.

Note: Replace <your_region> with your region and <airflow_version> with the version specified here.

    git clone https://github.com/aws/aws-mwaa-local-runner.git
    cd aws-mwaa-local-runner
    export ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
    export REGION=<your_region>
    export AIRFLOW_VERSION=<airflow_version>
    ./mwaa-local-env build-image

Note: We’re expressly using the latest version of the Amazon MWAA local-runner as it supports the functionality needed for this tutorial.

Push your local-runner image to Amazon ECR

    aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin $ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com
    aws ecr create-repository --repository-name mwaa-local-runner --region $REGION
    export AIRFLOW_IMAGE=$(docker image ls | grep amazon/mwaa-local | grep $AIRFLOW_VERSION | awk '{ print $3 }')
    docker tag $AIRFLOW_IMAGE $ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/mwaa-local-runner
    docker push $ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/mwaa-local-runner

Modify the MWAA execution role

For this example, we enable an existing MWAA role to work with Amazon ECS Fargate. As an alternative, you may also create a new task execution role.

1. From the Amazon MWAA console, select the link of the environment whose role you wish to use for your Amazon ECS Fargate local-runner instance.
2. Scroll down to Permissions and select the link to open the Execution role.
3. Select the Trust relationships tab.
4. Choose Edit trust policy.
5. Under Statement -> Principal -> Service add ecs-tasks.amazonaws.com.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "ecs-tasks.amazonaws.com",
                        "airflow.amazonaws.com",
                        "airflow-env.amazonaws.com"
                    ]
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

6. Select Update policy.
7. Choose the Permissions tab.
8. Select the link to the MWAA-Execution-Policy.
9. Choose Edit policy.
10. Choose the JSON tab.
11. In the Statement section describing logs permissions, under Resource, add arn:aws:logs:us-east-1:012345678910:log-group:/ecs/mwaa-local-runner-task-definition:*, where 012345678910 is replaced with your account number and us-east-1 is replaced with your region.

    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents",
            "logs:GetLogEvents",
            "logs:GetLogRecord",
            "logs:GetLogGroupFields",
            "logs:GetQueryResults"
        ],
        "Resource": [
            "arn:aws:logs:us-east-1:012345678910:log-group:/ecs/mwaa-local-runner-task-definition:*",
            "arn:aws:logs:us-east-1:012345678910:log-group:airflow-MWAA-Demo-IAD-1-*"
        ]
    },

12. We also want to add permissions that allow us to execute commands on the container and pull the image from Amazon ECR.

    {
        "Effect": "Allow",
        "Action": [
            "ssmmessages:CreateControlChannel",
            "ssmmessages:CreateDataChannel",
            "ssmmessages:OpenControlChannel",
            "ssmmessages:OpenDataChannel"
        ],
        "Resource": "*"
    },
    {
        "Effect": "Allow",
        "Action": [
            "ecr:GetAuthorizationToken",
            "ecr:BatchCheckLayerAvailability",
            "ecr:GetDownloadUrlForLayer",
            "ecr:BatchGetImage",
            "logs:CreateLogStream",
            "logs:PutLogEvents"
        ],
        "Resource": "*"
    }

Note: Ensure that your private subnets have access to AWS Systems Manager (SSM) via Internet Gateway or PrivateLink to “com.amazonaws.us-east-1.ssmmessages” in order to enable command execution.

13. Choose Review policy.
14. Choose Save changes.

The Aurora PostgreSQL Serverless instance and Amazon ECS resources can be created using either AWS CloudFormation or Terraform, as per the following sections. To create the required resources, clone the aws-samples/amazon-mwaa-examples repository.
    git clone https://github.com/aws-samples/amazon-mwaa-examples.git

Take note of the variables from the existing MWAA environment needed to create the Amazon ECS environment (i.e., security groups, subnet IDs, Virtual Private Cloud (VPC) ID, and execution role).

    $ export MWAAENV=test-MwaaEnvironment
    $ aws mwaa get-environment --name $MWAAENV --query 'Environment.NetworkConfiguration' --region $REGION
    {
        "SecurityGroupIds": [
            "sg-12345"
        ],
        "SubnetIds": [
            "subnet-12345",
            "subnet-56789"
        ]
    }
    $ aws mwaa get-environment --name $MWAAENV --query 'Environment.ExecutionRoleArn'
    "arn:aws:iam::123456789:role/service-role/MwaaExecutionRole"

AWS CloudFormation

1. Navigate to the ECS CloudFormation directory:

    $ cd amazon-mwaa-examples/usecases/local-runner-on-ecs-fargate/cloudformation

2. Update the AWS CloudFormation template input parameters file parameter-values.json in your favorite code editor (e.g., vscode).

    {
        "Parameters": {
            "ECSClusterName": "mwaa-local-runner-cluster",
            "VpcId": "your-mwaa-vpc-id",
            "ECRImageURI" : "123456789.dkr.ecr.us-east-1.amazonaws.com/mwaa-local-runner:latest",
            "SecurityGroups" : "sg-security-group-id",
            "PrivateSubnetIds" : "subnet-mwaapvtsubnetid1,subnet-mwaapvtsubnetid2",
            "PublicSubnetIds" : "subnet-mwaapublicsubnetid1,subnet-mwaapublicsubnetid2",
            "S3BucketURI" : "s3://your-mwaa-bucket-path",
            "ECSTaskExecutionRoleArn": "arn:aws:iam::123456789:role/service-role/mwaaExecutionRoleName",
            "AssignPublicIpToTask" : "yes"
        }
    }

3. [Optional] Additional AWS CloudFormation template input parameter values can be overridden either in the template directly (mwaa-ecs-on-fargate.yml) or in the input parameter file from step 2.
4. Deploy the AWS CloudFormation template.
    $ aws cloudformation deploy \
        --stack-name mwaa-ecs-sandbox \
        --region $REGION \
        --template-file mwaa-on-ecs-fargate.yml \
        --parameter-overrides file://parameter-values.json \
        --capabilities CAPABILITY_IAM

Where:

- --stack-name: the AWS CloudFormation stack name, e.g., mwaa-ecs-sandbox
- --region: the Region where you want to install the stack. It can be sourced from an environment variable or replaced with a value, e.g., ap-east-2, us-west-2
- --template-file: the CloudFormation template name in the subfolder, mwaa-on-ecs-fargate.yml
- --parameter-overrides: the input parameter file updated with your environment values in step 2

It takes time (up to 40 minutes) to create the required Amazon ECS and Amazon Relational Database Service (RDS) resources before the output of a successful completion is shown:

    Waiting for changeset to be created..
    Waiting for stack create/update to complete
    Successfully created/updated stack - mwaa-ecs-sandbox

To validate the deployed environment, let’s get the output parameters that the AWS CloudFormation template generated, including the load balancer, with the AWS CloudFormation describe command:

    $ aws cloudformation describe-stacks --stack-name mwaa-ecs-sandbox --query 'Stacks[0].Outputs[*]'
    [
        {
            "OutputKey": "LoadBalancerURL",
            "OutputValue": "mwaa-LoadB-S3WM6Y7GE1WA-18678459101.us-east-1.elb.amazonaws.com",
            "Description": "Load Balancer URL"
        },
        {
            "OutputKey": "DBClusterEP",
            "OutputValue": "database-mwaa-local-runner.cluster-ckxppcrgfesp.us-east-1.rds.amazonaws.com",
            "Description": "RDS Cluster end point"
        }
    ]

To validate the local runner on Amazon ECS Fargate, go to the Access the Airflow user interface section after the Terraform steps.

Terraform

Navigate to the ECS Terraform directory:

    $ cd amazon-mwaa-examples/usecases/local-runner-on-ecs-fargate/terraform/ecs

Create the tfvars file that contains all the required parameters. Replace all the parameters with the required parameters for your configuration.
    $ cat <<EOT >> terraform.tfvars
    assign_public_ip_to_task = true
    ecs_task_execution_role_arn = "arn:aws:iam::123456789:role/ecsTaskExecutionRole"
    elb_subnets = ["subnet-b06911ed", "subnet-f3bf01dd"]
    image_uri = "123456789.dkr.ecr.us-east-1.amazonaws.com/mwaa-local-runner:latest"
    mwaa_subnet_ids = ["subnet-b06911ed", "subnet-f3bf01dd"]
    region = "us-east-1"
    s3_dags_path = "s3://airflow-mwaa-test/DAG/"
    s3_plugins_path = "s3://airflow-mwaa-test/plugins.zip"
    s3_requirements_path = "s3://airflow-mwaa-test/requirements.txt"
    vpc_id = "vpc-e4678d9f"
    vpc_security_group_ids = ["sg-ad76c8e5"]
    EOT

Initialize the Terraform modules and plan the environment to create the RDS Aurora Serverless database. The subnet IDs and security group IDs of your environment can be retrieved from the previous step.

Note: Make use of the existing MWAA environment subnets, VPC, and security groups. The security group also needs to allow traffic to itself. The security group needs to allow traffic from your local machine on port 80 to access the load balancer URL.

    $ terraform init
    $ terraform plan

Once the plan has succeeded, create the resources using the variables used in the previous step.

    $ terraform apply -auto-approve
    ...
    ...
    Outputs:
    database_name = "AirflowMetadata"
    db_passsword = <sensitive>
    loadbalancer_url = "mwaa-local-runner-alb-552640779.us-east-1.elb.amazonaws.com"
    rds_endpoint = "database-mwaa-local-runner.cluster-cqvb75x52nu8.us-east-1.rds.amazonaws.com"

Note: You may face the error create: ExpiredToken: The security token included in the request is expired │ status code: 403. If you do face this error, untaint the RDS resource and re-apply.

Access the Airflow user interface

Direct your browser to the Application Load Balancer (ALB) URL from the AWS CloudFormation or Terraform output, being sure to prefix it with http:// (for example, http://mwaa-local-runner-alb-552640779.us-east-1.elb.amazonaws.com/home).

Note: If you chose an internal ALB, you’ll need to be on your VPC private subnet via VPN or similar.
When presented with the Airflow user interface, provide the username admin and the default password, test1234. You are now in a standard Airflow deployment that closely resembles the configuration of MWAA using local-runner.

Updating the environment

When you stop and restart the Amazon ECS Fargate task, the DAGs, plugins, and requirements are re-initialized. This can be done through a forced update:

$ aws ecs update-service \
    --service mwaa-local-runner-service \
    --cluster mwaa-local-runner-cluster \
    --region $REGION \
    --force-new-deployment

If you wish to update without restarting the task, you can run commands directly in the container via execute-command. If this is your first time running execute-command, you first need to update the service to allow this functionality:

$ aws ecs update-service \
    --service mwaa-local-runner-service \
    --cluster mwaa-local-runner-cluster \
    --region $REGION \
    --enable-execute-command \
    --force-new-deployment

When the AWS Fargate task becomes available again, you need to know the task ID:

$ aws ecs list-tasks \
    --cluster mwaa-local-runner-cluster \
    --region $REGION

This returns a JSON string that contains an ARN with the unique task ID in the format:

{
    "taskArns": [
        "arn:aws:ecs:us-east-1:012345678910:task/mwaa-local-runner-cluster/11aa22bb33cc44dd55ee66ff77889900"
    ]
}

In this case the task ID is 11aa22bb33cc44dd55ee66ff77889900, which we’ll use in the next command:

$ aws ecs execute-command \
    --region $REGION \
    --cluster mwaa-local-runner-cluster \
    --task 11aa22bb33cc44dd55ee66ff77889900 \
    --command "/bin/bash" \
    --interactive

Note: You may need to install the Session Manager plugin in order to execute commands via the AWS CLI.

At this point you can run any activities you wish, such as executing the s3 sync command to update your DAGs:

$ aws s3 sync --exact-timestamps --delete $S3_DAGS_PATH /usr/local/airflow/dags

Or view your scheduler logs:

$ cd /usr/local/airflow/logs/scheduler/latest; cat *

When complete, type exit to return to your terminal.
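To avoid copying the task ID by hand before running execute-command, the ID can be derived from the task ARN on your local machine. The following is a minimal sketch, not part of the original walkthrough: it uses the sample ARN from the list-tasks output, and the variable names task_arn and task_id are illustrative.

```shell
#!/usr/bin/env bash
# Sketch: derive the ECS task ID from a task ARN with shell parameter expansion.
# The ARN is the sample value from the list-tasks output in this post.
# Assumption: in a live environment you would populate task_arn yourself, e.g.
#   task_arn=$(aws ecs list-tasks --cluster mwaa-local-runner-cluster \
#       --region "$REGION" --query 'taskArns[0]' --output text)
task_arn="arn:aws:ecs:us-east-1:012345678910:task/mwaa-local-runner-cluster/11aa22bb33cc44dd55ee66ff77889900"

# Remove everything up to and including the last "/" to leave only the task ID.
task_id="${task_arn##*/}"

echo "$task_id"
```

The resulting value can then be passed to the --task option of aws ecs execute-command in place of the hard-coded ID.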
Cleaning up

If no longer needed, be sure to delete your AWS Fargate cluster, task definitions, ALB, Amazon ECR repository, Aurora RDS instance, and any other items you do not wish to retain.

With AWS CloudFormation, delete the stack:

$ aws cloudformation delete-stack --stack-name mwaa-ecs-sandbox

With Terraform, run:

$ terraform destroy

Important: Terminating resources that aren’t actively being used reduces costs and is a best practice. Not terminating your resources can result in additional charges.

Conclusion

In this post, we showed you how to configure the Amazon MWAA open-source local-runner container image on Amazon ECS Fargate containers to provide a development and testing environment, using Amazon Aurora Serverless v2 as the database backend and execute-command on the AWS Fargate task to interact with the system. To learn more about Amazon MWAA, visit the Amazon MWAA documentation. For more blog posts about Amazon MWAA, please visit the Amazon MWAA resources page. View the full article
-
Amazon MWAA is now SOC Compliant
Amazon Web Services posted a topic in Amazon Web Services
You can now use Amazon Managed Workflows for Apache Airflow (MWAA) for use cases that are subject to Service Organization Control (SOC) requirements. MWAA is now SOC 1, 2 and 3 compliant, allowing you to get deep insight into the security processes and controls that protect customer data. AWS maintains SOC compliance through extensive third-party audits of AWS controls. These audits ensure that the appropriate safeguards and procedures are in place to protect against security risks that may affect the confidentiality, integrity, and availability of customer and company data. AWS SOC reports are independent third-party examination reports that you can download in AWS Artifact. View the full article
-
Amazon Managed Workflows for Apache Airflow (MWAA) is now Payment Card Industry Data Security Standard (PCI DSS) compliant. Amazon MWAA is a managed orchestration service for Apache Airflow that makes it easier to set up and operate end-to-end data pipelines in the cloud. Customers can now use Amazon MWAA to manage workflows that store, process, and transmit information for use cases such as payment processing that are subject to PCI DSS. View the full article
-
You can now create Apache Airflow version 2.4 environments on Amazon Managed Workflows for Apache Airflow (MWAA) with Python 3.10 support. View the full article
-
Amazon Managed Workflows is a new managed orchestration service for Apache Airflow that makes it easier to set up and operate end-to-end data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as “workflows”. View the full article
-