Showing results for tags 'apache airflow'.


  1. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security. By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals. This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA. Solution overview For this post, we consider a use case where a data engineering team wants to build an ETL process and give the best experience to their end-users when they want to query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). 
The data engineering team wants to separate the raw data into its own AWS account (Account A in the diagram) for increased security and control. They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data present in the central account (Account A). This approach allows the team to process the raw data extracted from Account A in Account B, which is dedicated to data handling tasks. This keeps the raw and processed data securely separated across multiple accounts, if required, for enhanced data governance and security. Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. This is done by invoking AWS Glue ETL jobs and writing to data objects in a Redshift Serverless cluster in Account B. The pipeline then starts running stored procedures and SQL commands on Redshift Serverless. As the queries finish running, an UNLOAD operation is invoked from the Redshift data warehouse to the S3 bucket in Account A. Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables. The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow. The workflow consists of the following components: The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B to be able to load and unload data. 
In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones. Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager. VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources. Usually, data engineers create an Airflow Directed Acyclic Graph (DAG) and commit their changes to GitHub. With GitHub actions, they are deployed to an S3 bucket in Account B (for this post, we upload the files into S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation helps maintain organization and avoid confusion. The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed. The Airflow logs are logged in Amazon CloudWatch, and alerts can be configured for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA. Prerequisites Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA. Deploy resources in Account A using AWS CloudFormation In Account A, launch the provided AWS CloudFormation stack to create the following resources: The source and target S3 buckets and folders. As a best practice, the input and output bucket structures are formatted with hive style partitioning as s3://<bucket>/products/YYYY/MM/DD/. A sample dataset called products.csv, which we use in this post. 
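The partitioned layout described above, s3://<bucket>/products/YYYY/MM/DD/, can be derived from a date in a few lines. The following is a minimal sketch rather than part of the original solution; the function name partition_prefix and the bucket name my-example-bucket are hypothetical.

```python
from datetime import date


def partition_prefix(bucket: str, dataset: str, day: date) -> str:
    """Build a date-partitioned S3 prefix of the form s3://<bucket>/<dataset>/YYYY/MM/DD/."""
    # date objects accept strftime codes in format specs, so the
    # month and day are zero-padded automatically.
    return f"s3://{bucket}/{dataset}/{day:%Y/%m/%d}/"


# Hypothetical bucket name, used only for illustration.
example = partition_prefix("my-example-bucket", "products", date(2024, 5, 7))
print(example)
```

Zero-padding the month and day keeps the prefixes sortable as plain strings, which is convenient when listing a bucket by prefix.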
Upload the AWS Glue job to Amazon S3 in Account B In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location. Deploy resources in Account B using AWS CloudFormation In Account B, launch the provided CloudFormation stack template to create the following resources: The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure: dags – The folder for DAG files. plugins – The file for any custom or community Airflow plugins. requirements – The requirements.txt file for any Python packages. scripts – Any SQL scripts used in the DAG. data – Any datasets used in the DAG. A Redshift Serverless environment. The names of the workgroup and namespace are prefixed with sample. An AWS Glue environment, which contains the following: An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A. A database called products_db in the AWS Glue Data Catalog. An ETL job called sample_glue_job. This job can read files from the products table in the Data Catalog and load data into the Redshift table products. A VPC gateway endpoint to Amazon S3. An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA). Create Amazon Redshift resources Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file. In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products. Configure Airflow permissions After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. 
DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder. Add the customer managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, and attach this policy to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment. The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle. Set up the environment This section outlines the steps to configure the environment. The process involves the following high-level steps: Update any necessary providers. Set up cross-account access. Establish a VPC peering connection between the Amazon MWAA VPC and Amazon Redshift VPC. Configure Secrets Manager to integrate with Amazon MWAA. Define Airflow connections. Update the providers Follow the steps in this section if your version of Amazon MWAA is earlier than 2.8.1 (the latest version as of writing this post). Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider. Although the version of Airflow supported in Amazon MWAA is 2.6.3, which comes bundled with the Amazon provider package version 8.2.0, support for Amazon Redshift Serverless was not added until the Amazon provider package version 8.4.0. Because the bundled provider version predates Redshift Serverless support, the provider must be upgraded in order to use that functionality. The first step is to update the constraints file and requirements.txt file with the correct versions. 
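The version reasoning above (bundled 8.2.0 versus the required 8.4.0) amounts to a simple comparison. The sketch below is illustrative only: the helper names parse_version and needs_upgrade are hypothetical, and it assumes plain numeric dotted versions such as 8.4.0 rather than pre-release strings.

```python
# Minimum Amazon provider version with Redshift Serverless support, per the text above.
MIN_PROVIDER_VERSION = "8.4.0"


def parse_version(version: str) -> tuple:
    """Turn a dotted numeric version such as '8.4.0' into a comparable tuple of ints."""
    return tuple(int(part) for part in version.split("."))


def needs_upgrade(installed: str, minimum: str = MIN_PROVIDER_VERSION) -> bool:
    """Return True when the installed provider is older than the required minimum."""
    return parse_version(installed) < parse_version(minimum)


print(needs_upgrade("8.2.0"))   # bundled version mentioned in the text
print(needs_upgrade("8.4.0"))   # minimum version mentioned in the text
```

Comparing integer tuples instead of raw strings avoids the classic mistake where "8.10.0" sorts before "8.4.0".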
Refer to Specifying newer provider packages for steps to update the Amazon provider package. Specify the requirements as follows: --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt" apache-airflow-providers-amazon==8.4.0 Update the version in the constraints file to 8.4.0 or higher. Add the constraints-3.11-updated.txt file to the /dags folder. Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for correct versions of the constraints file depending on the Airflow version. Navigate to the Amazon MWAA environment and choose Edit. Under DAG code in Amazon S3, for Requirements file, choose the latest version. Choose Save. This will update the environment and new providers will be in effect. To verify the providers version, go to Providers under the Admin table. The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details. Set up cross-account access You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. 
Complete the following steps:

In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": [
              "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
              "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
            ]
          },
          "Action": [
            "s3:GetObject",
            "s3:PutObject",
            "s3:PutObjectAcl",
            "s3:ListBucket"
          ],
          "Resource": [
            "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
            "arn:aws:s3:::sample-inp-bucket-etl-<username>"
          ]
        }
      ]
    }

Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant permissions to Amazon MWAA roles in Account B to put objects in this bucket:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
          },
          "Action": [
            "s3:GetObject",
            "s3:PutObject",
            "s3:PutObjectAcl",
            "s3:ListBucket"
          ],
          "Resource": [
            "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
            "arn:aws:s3:::sample-opt-bucket-etl-<username>"
          ]
        }
      ]
    }

In Account A, create an IAM policy called policy_for_roleA, which allows necessary Amazon S3 actions on the output bucket:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "VisualEditor0",
          "Effect": "Allow",
          "Action": [
            "kms:Decrypt",
            "kms:Encrypt",
            "kms:GenerateDataKey"
          ],
          "Resource": [
            "<KMS_KEY_ARN_Used_for_S3_encryption>"
          ]
        },
        {
          "Sid": "VisualEditor1",
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject",
            "s3:GetBucketAcl",
            "s3:GetBucketCors",
            "s3:GetEncryptionConfiguration",
            "s3:GetBucketLocation",
            "s3:ListAllMyBuckets",
            "s3:ListBucket",
            "s3:ListBucketMultipartUploads",
            "s3:ListBucketVersions",
            "s3:ListMultipartUploadParts"
          ],
          "Resource": [
            "arn:aws:s3:::sample-opt-bucket-etl-<username>",
            "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
          ]
        }
      ]
    }

Create a new IAM role called RoleA with Account B as the trusted entity role and add this policy to the role. This allows Account B to assume RoleA to perform necessary Amazon S3 actions on the output bucket.

In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A. Add this policy to the AWS Glue role and Amazon MWAA role:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:GetObject",
            "s3:PutObject",
            "s3:PutObjectAcl"
          ],
          "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
        }
      ]
    }

In Account B, create the IAM policy policy_for_roleB specifying Account A as a trusted entity. The following is the trust policy to assume RoleA in Account A:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "CrossAccountPolicy",
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
        }
      ]
    }

Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.

Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.

Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. 
You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering. Make sure that CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot. If any of the preceding steps are configured incorrectly, you are likely to encounter a “Connection Timeout” error in the DAG run. Configure the Amazon MWAA connection with Secrets Manager When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret. Complete the following steps: Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager). This allows Amazon MWAA to access credentials stored in Secrets Manager. To provide Amazon MWAA with permission to access Secrets Manager secret keys, add the policy called SecretsManagerReadWrite to the IAM role of the environment. To create the Secrets Manager backend as an Apache Airflow configuration option, go to the Airflow configuration options, add the following key-value pairs, and save your settings. This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths: secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"} To generate an Airflow connection URI string, go to AWS CloudShell and enter into a Python shell. 
Run the following code to generate the connection URI string:

    import urllib.parse

    conn_type = 'redshift'
    # Specify the Amazon Redshift workgroup endpoint
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com'
    port = '5439'
    # Specify the username to use for authentication with Amazon Redshift
    login = 'admin'
    # Specify the password to use for authentication with Amazon Redshift
    password = '<password>'
    role_arn = urllib.parse.quote_plus('arn:aws:iam::<account_id>:role/service-role/<MWAA-role>')
    database = 'dev'
    region = 'us-east-1'  # YOUR_REGION

    conn_string = '{0}://{1}:{2}@{3}:{4}?role_arn={5}&database={6}&region={7}'.format(
        conn_type, login, password, host, port, role_arn, database, region
    )
    print(conn_string)

The connection string should be generated as follows:

    redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>

Add the connection in Secrets Manager using the following command in the AWS Command Line Interface (AWS CLI). This can also be done from the Secrets Manager console. This will be added in Secrets Manager as plaintext.

    aws secretsmanager create-secret \
        --name airflow/connections/secrets_redshift_connection \
        --description "Apache Airflow to Redshift Cluster" \
        --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" \
        --region=us-east-1

Use the connection airflow/connections/secrets_redshift_connection in the DAG. When the DAG is run, it will look for this connection and retrieve the secrets from Secrets Manager. In the case of RedshiftDataOperator, pass the secret_arn as a parameter instead of the connection name. You can also add secrets using the Secrets Manager console as key-value pairs. Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test. 
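One detail worth checking when generating a connection URI is that special characters in the password survive the round trip. The following is a minimal sketch that uses only the Python standard library: the helper build_conn_uri, the sample password, and the account ID 111122223333 are hypothetical placeholders, and the role_arn parameter is left out for brevity.

```python
from urllib.parse import parse_qs, quote_plus, unquote_plus, urlencode, urlsplit


def build_conn_uri(login, password, host, port, database, region, conn_type="redshift"):
    """Return a connection URI with the credentials percent-encoded."""
    query = urlencode({"database": database, "region": region})
    return f"{conn_type}://{quote_plus(login)}:{quote_plus(password)}@{host}:{port}?{query}"


# Placeholder values for illustration only.
uri = build_conn_uri(
    login="admin",
    password="p@ss/w:rd",
    host="sample-workgroup.111122223333.us-east-1.redshift-serverless.amazonaws.com",
    port=5439,
    database="dev",
    region="us-east-1",
)

# Parse the URI back to confirm that every component is recoverable.
parts = urlsplit(uri)
params = parse_qs(parts.query)
print(parts.hostname, parts.port, unquote_plus(parts.password), params["database"][0])
```

Without the encoding step, a password containing @, / or : would be split at the wrong place when the URI is parsed.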
Create an Airflow connection through the metadata database You can also create connections in the UI. In this case, the connection details will be stored in an Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI. For Connection Id, enter a name for the connection. For Connection Type, choose Amazon Redshift. For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless. For Database, enter dev. For User, enter your admin user name. For Password, enter your password. For Port, use port 5439. For Extra, set the region and timeout parameters. Test the connection, then save your settings. Create and run a DAG In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets. Create a DAG In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules: The DAG starts with looking for source files in the S3 bucket sample-inp-bucket-etl-<username> under Account A for the current day using S3KeySensor. S3KeySensor is used to wait for one or multiple keys to be present in an S3 bucket. For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derived the current date in the DAG and passed this to S3KeySensor, which looks for any new files in the current day folder. We also set wildcard_match as True, which enables searches on bucket_key to be interpreted as a Unix wildcard pattern. 
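To see what a Unix wildcard pattern on a key means in practice, the sketch below matches a few object keys against a pattern using Python's fnmatch module. It runs without Airflow; the sample keys and the helper matching_keys are hypothetical, and treating the sensor's matching as equivalent to fnmatch is an assumption made for illustration.

```python
from fnmatch import fnmatchcase


def matching_keys(keys, pattern):
    """Return the keys that match a Unix-style wildcard pattern (case-sensitive)."""
    return [key for key in keys if fnmatchcase(key, pattern)]


# Hypothetical object keys for one day's folder.
keys = [
    "products/2024/05/07/products.csv",
    "products/2024/05/07/_SUCCESS",
    "products/2024/05/06/products.csv",
]

found = matching_keys(keys, "products/2024/05/07/*.csv")
print(found)
```

Note that in fnmatch the * wildcard also matches the / character, so a pattern such as products/*.csv would match files at any depth under products/.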
Set the mode to reschedule so that the sensor task frees the worker slot when the criteria are not met and is rescheduled at a later time. As a best practice, use this mode when poke_interval is more than 1 minute to prevent too much load on the scheduler. After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps. The DAG uses GlueCrawlerSensor to wait for the crawler to complete. When the crawler job is complete, GlueJobOperator is used to run the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator along with the AWS Glue IAM role. Other parameters like GlueVersion, NumberOfWorkers, and WorkerType are passed using the create_job_kwargs parameter. The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it’s complete, the Redshift staging table products will be loaded with data from the S3 file. You can connect to Amazon Redshift from Airflow using three different operators: PythonOperator. SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection. RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection. In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator. Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager. 
If the connection is not found in Secrets Manager, the values are retrieved from the default connections. In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed. In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter. As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to the S3 bucket sample-opt-bucket-etl in Account A. airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data. TriggerRule is set to ALL_DONE, which enables the next step to run after all upstream tasks are complete. The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence. The following is the complete DAG code. The dag_id should match the DAG script name, otherwise it won’t be synced into the Airflow UI. 
    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow.models.baseoperator import chain
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
    from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
    from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
    from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
    from airflow.utils.trigger_rule import TriggerRule

    dag_id = "data_pipeline"
    vYear = datetime.today().strftime("%Y")
    vMonth = datetime.today().strftime("%m")
    vDay = datetime.today().strftime("%d")
    src_bucket_name = "sample-inp-bucket-etl-<username>"
    tgt_bucket_name = "sample-opt-bucket-etl-<username>"
    s3_folder = "products"
    # Please replace the variable with the glue_role_arn
    glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
    glue_crawler_name = "products"
    glue_db_name = "products_db"
    glue_job_name = "sample_glue_job"
    glue_script_location = "s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
    workgroup_name = "sample-workgroup"
    redshift_table = "products_f"
    redshift_conn_id_name = "secrets_redshift_connection"
    db_name = "dev"
    secret_arn = "arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
    poll_interval = 10


    @task
    def get_role_name(arn: str) -> str:
        return arn.split("/")[-1]


    @task
    def get_s3_loc(s3_folder: str) -> str:
        s3_loc = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
        return s3_loc


    with DAG(
        dag_id=dag_id,
        schedule="@once",
        start_date=datetime(2021, 1, 1),
        tags=["example"],
        catchup=False,
    ) as dag:
        role_arn = glue_role_arn_key
        glue_role_name = get_role_name(role_arn)
        s3_loc = get_s3_loc(s3_folder)

        # Check for new incremental files in S3 source/input bucket
        sensor_key = S3KeySensor(
            task_id="sensor_key",
            bucket_key=s3_loc,
            bucket_name=src_bucket_name,
            wildcard_match=True,
            #timeout=18*60*60,
            #poke_interval=120,
            timeout=60,
            poke_interval=30,
            mode="reschedule"
        )

        # Run Glue crawler
        glue_crawler_config = {
            "Name": glue_crawler_name,
            "Role": role_arn,
            "DatabaseName": glue_db_name,
        }

        crawl_s3 = GlueCrawlerOperator(
            task_id="crawl_s3",
            config=glue_crawler_config,
        )

        # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
        crawl_s3.wait_for_completion = False

        # Wait for Glue crawler to complete
        wait_for_crawl = GlueCrawlerSensor(
            task_id="wait_for_crawl",
            crawler_name=glue_crawler_name,
        )

        # Run Glue Job
        submit_glue_job = GlueJobOperator(
            task_id="submit_glue_job",
            job_name=glue_job_name,
            script_location=glue_script_location,
            iam_role_name=glue_role_name,
            create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
        )

        # GlueJobOperator waits by default, setting as False to test the Sensor below.
        submit_glue_job.wait_for_completion = False

        # Wait for Glue Job to complete
        wait_for_job = GlueJobSensor(
            task_id="wait_for_job",
            job_name=glue_job_name,
            # Job ID extracted from previous Glue Job Operator task
            run_id=submit_glue_job.output,
            verbose=True,  # prints glue job logs in airflow logs
        )

        wait_for_job.poke_interval = 5

        # Execute the Stored Procedure in Redshift Serverless using Data Operator
        execute_redshift_stored_proc = RedshiftDataOperator(
            task_id="execute_redshift_stored_proc",
            database=db_name,
            workgroup_name=workgroup_name,
            secret_arn=secret_arn,
            sql="""CALL sp_products();""",
            poll_interval=poll_interval,
            wait_for_completion=True,
        )

        # Delete the data from the staging table in Redshift Serverless using SQL Operator
        delete_from_table = SQLExecuteQueryOperator(
            task_id="delete_from_table",
            conn_id=redshift_conn_id_name,
            sql="DELETE FROM products;",
            trigger_rule=TriggerRule.ALL_DONE,
        )

        # Unload the data from Redshift table to S3
        transfer_redshift_to_s3 = RedshiftToS3Operator(
            task_id="transfer_redshift_to_s3",
            s3_bucket=tgt_bucket_name,
            s3_key=s3_loc,
            schema="PUBLIC",
            table=redshift_table,
            redshift_conn_id=redshift_conn_id_name,
        )

        transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

        # Chain the tasks to be executed
        chain(
            sensor_key,
            crawl_s3,
            wait_for_crawl,
            submit_glue_job,
            wait_for_job,
            execute_redshift_stored_proc,
            delete_from_table,
            transfer_redshift_to_s3
        )

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot. In the Links section, there are options to view the code, graph, grid, log, and more. 
Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status. Verify the results On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as the S3 files. On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account A. The products_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/. Clean up Clean up the resources created as part of this post to avoid incurring ongoing charges: Delete the CloudFormation stacks and S3 bucket that you created as prerequisites. Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager. Conclusion With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples. About the Authors Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture. Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. 
He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well. View the full article
  2. Amazon Managed Workflows for Apache Airflow (MWAA) now offers larger environment sizes, giving customers of the managed service the ability to define a greater number of workflows in each Apache Airflow environment, supporting more complex tasks that can utilize increased resources. View the full article
  3. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service for Apache Airflow that streamlines the setup and operation of the infrastructure to orchestrate data pipelines in the cloud. Customers use Amazon MWAA to manage the scalability, availability, and security of their Apache Airflow environments. As they design more intensive, complex, and ever-growing data processing pipelines, customers have asked us for additional underlying resources to provide greater concurrency and capacity for their tasks and workflows. To address this, today we are announcing the availability of larger environment classes in Amazon MWAA. In this post, we dive into the capabilities of these new XL and 2XL environments, the scenarios they are well suited for, and how you can set up or upgrade your existing Amazon MWAA environment to take advantage of the increased resources.

Current challenges

When you create an Amazon MWAA environment, a set of managed Amazon Elastic Container Service (Amazon ECS) containers on AWS Fargate is provisioned with defined virtual CPUs and RAM. As you work with larger, complex, resource-intensive workloads, or run thousands of Directed Acyclic Graphs (DAGs) per day, you may start exhausting CPU availability on schedulers and workers, or reaching memory limits in workers. Running Apache Airflow at scale puts proportionally greater load on the Airflow metadata database, sometimes leading to CPU and memory issues on the underlying Amazon Relational Database Service (Amazon RDS) cluster. A resource-starved metadata database may drop connections from your workers, causing tasks to fail prematurely.

To improve the performance and resiliency of your tasks, consider following Apache Airflow best practices to author DAGs. As an alternative, you can create multiple Amazon MWAA environments to distribute workloads. However, this requires additional engineering and management effort.
New environment classes

With today’s release, you can now create XL and 2XL environments in Amazon MWAA in addition to the existing environment classes. They have two and four times the compute, and three and six times the memory, respectively, of the current large Amazon MWAA environment instance class. These instances add compute and RAM linearly to directly improve capacity and performance of all Apache Airflow components. The following table summarizes the environment capabilities.

| Environment class | Scheduler and Worker CPU / RAM | Web Server CPU / RAM | Concurrent Tasks | DAG Capacity |
|---|---|---|---|---|
| mw1.xlarge | 8 vCPUs / 24 GB | 4 vCPUs / 12 GB | 40 tasks (default) | Up to 2000 |
| mw1.2xlarge | 16 vCPUs / 48 GB | 8 vCPUs / 24 GB | 80 tasks (default) | Up to 4000 |

With the introduction of these larger environments, your Amazon Aurora metadata database will now use larger, memory-optimized instances powered by AWS Graviton2. With the Graviton2 family of processors, you get compute, storage, and networking improvements, and the reduction of your carbon footprint offered by the AWS family of processors.

Pricing

Amazon MWAA pricing dimensions remain unchanged, and you only pay for what you use:

• The environment class
• Additional worker instances
• Additional scheduler instances
• Metadata database storage consumed

You now get two additional options in the first three dimensions: XL and 2XL for environment class, additional worker instances, and additional scheduler instances. Metadata database storage pricing remains the same. Refer to Amazon Managed Workflows for Apache Airflow Pricing for rates and more details.

Observe Amazon MWAA performance to plan scaling to larger environments

Before you start using the new environment classes, it’s important to understand whether you are in a scenario that relates to capacity issues, such as the metadata database running out of memory, or workers or schedulers running at high CPU usage. Understanding the performance of your environment resources is key to troubleshooting issues related to capacity.
We recommend following the guidance described in Introducing container, database, and queue utilization metrics for the Amazon MWAA environment to better understand the state of Amazon MWAA environments, and get insights to right-size your instances. In the following test, we simulate a high load scenario, use the CloudWatch observability metrics to identify common problems, and make an informed decision to plan scaling to larger environments to mitigate the issues. During our tests, we ran a complex DAG that dynamically creates over 500 tasks and uses external sensors to wait for a task completion in a different DAG. After running on an Amazon MWAA large environment class with auto scaling set up to a maximum of 10 worker nodes, we noticed the following metrics and values in the CloudWatch dashboard. The worker nodes have reached maximum CPU capacity, causing the number of queued tasks to keep increasing. The metadata database CPU utilization has peaked at over 65% capacity, and the available database free memory has been reduced. In this situation, we could further increase the worker nodes to scale, but that would put additional load on the metadata database CPU. This might lead to a drop in the number of worker database connections and available free database memory. With new environment classes, you can vertically scale to increase available resources by editing the environment and selecting a higher class of environment, as shown in the following screenshot. From the list of environments, we select the one in use for this test. Choose Edit to navigate to the Configure advanced settings page, and select the appropriate xlarge or 2xlarge environment as required. After you save the change, the environment upgrade will take 20–30 minutes to complete. Any running DAG that got interrupted during the upgrade is scheduled for a retry, depending on the way you configured the retries for your DAGs. 
You can now choose to invoke them manually or wait for the next scheduled run.

After we upgraded the environment class, we tested the same DAG and observed that the metrics showed improved values because more resources were available. With this XL environment, you can run more tasks on fewer worker nodes, and therefore the number of queued tasks kept decreasing. Alternatively, if you have tasks that require more memory and/or CPU, you can reduce the tasks per worker, but still achieve a high number of tasks per worker with a larger environment size. For example, if you have a large environment where the worker node CPU is maxed out with celery.worker_autoscale (the Airflow configuration that defines the number of tasks per worker) set at 20,20, you can move to an XL environment and keep celery.worker_autoscale at 20,20 on the XL, rather than the default 40 tasks per worker on an XL environment, and the CPU load should reduce significantly.

Set up a new XL environment in Amazon MWAA

You can get started with Amazon MWAA in your account and preferred AWS Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts. Amazon MWAA XL and 2XL environment classes are available today in all Regions where Amazon MWAA is currently available.

Conclusion

Today, we are announcing the availability of two new environment classes in Amazon MWAA. With XL and 2XL environment classes, you can orchestrate larger volumes of complex or resource-intensive workflows. If you are running DAGs with a high number of dependencies, running thousands of DAGs across multiple environments, or in a scenario that requires you to heavily use workers for compute, you can now overcome the related capacity issues by increasing your environment resources in a few straightforward steps. In this post, we discussed the capabilities of the two new environment classes, including pricing and some common resource constraint problems they solve. We provided guidance and an example of how to observe your existing environments to plan scaling to XL or 2XL, and we described how you can upgrade existing environments to use the increased resources.

For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.

About the Authors

Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.

Jeetendra Vaidya is a Senior Solutions Architect at AWS, bringing his expertise to the AI/ML, serverless, and data analytics domains. He is passionate about assisting customers in architecting secure, scalable, reliable, and cost-effective solutions.

Sriharsh Adari is a Senior Solutions Architect at AWS, where he helps customers work backward from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers with data platform transformations across industry verticals. His core areas of expertise include technology strategy, data analytics, and data science. In his spare time, he enjoys playing sports, watching TV shows, and playing Tabla.

View the full article
  4. What is Apache Airflow? Apache Airflow addresses the need for a robust, scalable, and flexible solution for orchestrating data workflows. View the full article
  5. Data Engineering Tools in 2024

The data engineering landscape in 2024 is bustling with innovative tools and evolving trends. Here’s an updated perspective on some of the key players and how they can empower your data pipelines:

Data Integration:
• Informatica Cloud: Still a leader for advanced data quality and governance, with enhanced cloud-native capabilities.
• MuleSoft Anypoint Platform: Continues to shine in building API-based integrations, now with deeper cloud support and security features.
• Fivetran: Expands its automated data pipeline creation with pre-built connectors and advanced transformations.
• Hevo Data: Remains a strong contender for ease of use and affordability, now offering serverless options for scalability.

Data Warehousing:
• Snowflake: Maintains its edge in cloud-based warehousing, with improved performance and broader integrations for analytics.
• Google BigQuery: Offers even more cost-effective options for variable workloads, while deepening its integration with other Google Cloud services.
• Amazon Redshift: Continues to be a powerful choice for AWS environments, now with increased focus on security and data governance.
• Microsoft Azure Synapse Analytics: Further integrates its data warehousing, lake, and analytics capabilities, providing a unified platform for diverse data needs.

Data Processing and Orchestration:
• Apache Spark: Remains the reigning champion for large-scale data processing, now with enhanced performance optimizations and broader ecosystem support.
• Apache Airflow: Maintains its popularity for workflow orchestration, with improved scalability and user-friendliness.
• Databricks: Expands its cloud-based platform for Spark with advanced features like AI integration and real-time streaming.
• AWS Glue: Simplifies data processing and ETL within the AWS ecosystem, now with serverless options for cost efficiency.

Emerging Trends:
• GitOps: Gaining traction for managing data pipelines with version control and collaboration, ensuring consistency and traceability.
• AI and Machine Learning: Increasingly integrated into data engineering tools for automation, anomaly detection, and data quality improvement.
• Serverless Data Processing: Offering cost-effective and scalable options for event-driven and real-time data processing.

Choosing the right tools:

With this diverse landscape, selecting the right tools depends on your specific needs. Consider factors like:
• Data volume and complexity: Match tool capabilities to your data size and structure.
• Cloud vs. on-premises: Choose based on your infrastructure preferences and security requirements.
• Budget: Evaluate pricing models and potential costs associated with each tool.
• Integration needs: Ensure seamless compatibility with your existing data sources and BI tools.
• Skillset: Consider the technical expertise required for each tool and available support resources.

By carefully evaluating your needs and exploring the strengths and limitations of these top contenders, you’ll be well-equipped to choose the data engineering tools that empower your organization to unlock valuable insights from your data in 2024.

The post Data Engineering Tools in 2024 appeared first on DevOpsSchool.com. View the full article
  6. You can now create Apache Airflow version 2.8 environments on Amazon Managed Workflows for Apache Airflow (MWAA). Apache Airflow 2.8 is the latest minor release of the popular open-source tool that helps customers author, schedule, and monitor workflows. View the full article
  7. Top list of open-source tools for building and managing workflows. View the full article
  8. This article delves into the integration of Airbyte with some of the most popular data orchestrators in the industry – Apache Airflow, Dagster, and Prefect. We'll not only guide you through the process of integrating Airbyte with these orchestrators but also provide a comparative insight into how each one can uniquely enhance your data workflows. We also provide links to working code examples for each of these integrations. These resources are designed for quick deployment, allowing you to seamlessly integrate Airbyte with your orchestrator of choice. View the full article
  9. This article is part of a project that’s split into two main phases. The first phase focuses on building a data pipeline. This involves getting data from an API and storing it in a PostgreSQL database. In the second phase, we’ll develop an application that uses a language model to interact with this database.

Ideal for those new to data systems or language model applications, this project is structured into two segments:

• This initial article guides you through constructing a data pipeline utilizing Kafka for streaming, Airflow for orchestration, Spark for data transformation, and PostgreSQL for storage. To set up and run these tools we will use Docker.
• The second article, which will come later, will delve into creating agents using tools like LangChain to communicate with external databases.

This first part of the project is ideal for beginners in data engineering, as well as for data scientists and machine learning engineers looking to deepen their knowledge of the entire data handling process. Using these data engineering tools firsthand is beneficial. It helps in refining the creation and expansion of machine learning models, ensuring they perform effectively in practical settings.

This article focuses on practical application rather than on the theoretical aspects of the tools discussed. For a detailed understanding of how these tools work internally, there are many excellent resources available online.

Overview

Let’s break down the data pipeline process step by step:

• Data Streaming: Initially, data is streamed from the API into a Kafka topic.
• Data Processing: A Spark job then takes over, consuming the data from the Kafka topic and transferring it to a PostgreSQL database.
• Scheduling with Airflow: Both the streaming task and the Spark job are orchestrated using Airflow. While in a real-world scenario the Kafka producer would constantly listen to the API, for demonstration purposes we’ll schedule the Kafka streaming task to run daily. Once the streaming is complete, the Spark job processes the data, making it ready for use by the LLM application.

All of these tools will be built and run using Docker, and more specifically docker-compose.

Overview of the data pipeline. Image by the author.

Now that we have a blueprint of our pipeline, let’s dive into the technical details!

Local setup

First, clone the GitHub repo on your local machine using the following command:

    git clone https://github.com/HamzaG737/data-engineering-project.git

Here is the overall structure of the project:

    ├── LICENSE
    ├── README.md
    ├── airflow
    │   ├── Dockerfile
    │   ├── __init__.py
    │   └── dags
    │       ├── __init__.py
    │       └── dag_kafka_spark.py
    ├── data
    │   └── last_processed.json
    ├── docker-compose-airflow.yaml
    ├── docker-compose.yml
    ├── kafka
    ├── requirements.txt
    ├── spark
    │   └── Dockerfile
    └── src
        ├── __init__.py
        ├── constants.py
        ├── kafka_client
        │   ├── __init__.py
        │   └── kafka_stream_data.py
        └── spark_pgsql
            └── spark_streaming.py

• The airflow directory contains a custom Dockerfile for setting up Airflow and a dags directory to create and schedule the tasks.
• The data directory contains the last_processed.json file, which is crucial for the Kafka streaming task. Further details on its role will be provided in the Kafka section.
• The docker-compose-airflow.yaml file defines all the services required to run Airflow.
• The docker-compose.yml file specifies the Kafka services and includes a docker-proxy. This proxy is essential for executing Spark jobs through a DockerOperator in Airflow, a concept that will be elaborated on later.
• The spark directory contains a custom Dockerfile for the Spark setup.
• src contains the Python modules needed to run the application.

To set up your local development environment, start by installing the required Python packages. The only essential package is psycopg2-binary. You have the option to install just this package or all the packages listed in the requirements.txt file.
To install all packages, use the following command:

    pip install -r requirements.txt

Next, let’s dive step by step into the project details.

About the API

The API is RappelConso from the French public services. It gives access to data relating to recalls of products declared by professionals in France. The data is in French and initially contains 31 columns (or fields). Some of the most important are:

• reference_fiche (reference sheet): Unique identifier of the recalled product. It will act as the primary key of our Postgres database later.
• categorie_de_produit (product category): For instance food, electrical appliances, tools, means of transport, etc.
• sous_categorie_de_produit (product sub-category): For instance, meat, dairy products, and cereals are sub-categories of the food category.
• motif_de_rappel (reason for recall): Self-explanatory and one of the most important fields.
• date_de_publication: The publication date.
• risques_encourus_par_le_consommateur: The risks that the consumer may encounter when using the product.

There are also several fields that correspond to different links, such as a link to the product image, a link to the distributors list, etc.

You can see some examples and manually query the dataset records using this link.

We refined the data columns in a few key ways:

• Columns like ndeg_de_version and rappelguid, which were part of a versioning system, have been removed as they aren’t needed for our project.
• We combined the columns that deal with consumer risks (risques_encourus_par_le_consommateur and description_complementaire_du_risque) for a clearer overview of product risks.
• The date_debut_fin_de_commercialisation column, which indicates the marketing period, has been divided into two separate columns. This split allows for easier queries about the start or end of a product’s marketing.
• We’ve removed accents from all columns except for links, reference numbers, and dates. This is important because some text processing tools struggle with accented characters.

For a detailed look at these changes, check out our transformation script at src/kafka_client/transformations.py. The updated list of columns is available in src/constants.py under DB_FIELDS.

Kafka streaming

To avoid sending all the data from the API each time we run the streaming task, we define a local JSON file that contains the last publication date of the latest streaming run. We then use this date as the starting date for the next streaming task.

To give an example, suppose that the latest recalled product has a publication date of 22 November 2023. If we assume that all recalled-product records published before this date are already persisted in our Postgres database, we can stream the data starting from 22 November. Note that there is a deliberate overlap, because we may not have handled all of the data published on 22 November in the previous run.

The file is saved in ./data/last_processed.json and has this format:

    {"last_processed": "2023-11-22"}

By default the file contains an empty JSON object, which means that our first streaming task will process all of the API records, approximately 10,000.

Note that in a production setting, storing the last processed date in a local file is not viable, and other approaches involving an external database or an object storage service may be more suitable.

The code for the Kafka streaming can be found in ./src/kafka_client/kafka_stream_data.py. It primarily involves querying the data from the API, applying the transformations, removing potential duplicates, updating the last publication date, and serving the data using the Kafka producer.
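The bookkeeping around last_processed.json can be sketched as follows. This is a minimal illustration and not the code from kafka_stream_data.py: the function names are hypothetical, and it assumes that date_de_publication is available as an ISO-formatted string (YYYY-MM-DD), so that string comparison matches chronological order.

```python
import json
from pathlib import Path
from typing import Optional


def load_last_processed(path: Path) -> Optional[str]:
    """Return the stored publication date, or None if the file is missing or empty."""
    if not path.exists():
        return None
    content = json.loads(path.read_text().strip() or "{}")
    return content.get("last_processed")


def select_new_records(records: list, last_processed: Optional[str]) -> list:
    """Keep records published on or after the stored date.

    The comparison is inclusive on purpose: it reproduces the one-day
    overlap described in the text, so duplicates must be handled downstream.
    """
    if last_processed is None:
        return list(records)
    return [r for r in records if r["date_de_publication"] >= last_processed]


def save_last_processed(path: Path, records: list) -> None:
    """Persist the most recent publication date seen in this run."""
    if not records:
        return
    latest = max(r["date_de_publication"] for r in records)
    path.write_text(json.dumps({"last_processed": latest}))
```

Because the filter is inclusive, records from the boundary day are sent again on the next run; the pipeline relies on a later deduplication step to discard them.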
The next step is to run the Kafka service defined in the docker-compose file below:

    version: '3'

    services:
      kafka:
        image: 'bitnami/kafka:latest'
        ports:
          - '9094:9094'
        networks:
          - airflow-kafka
        environment:
          - KAFKA_CFG_NODE_ID=0
          - KAFKA_CFG_PROCESS_ROLES=controller,broker
          - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
          - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
          - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
          - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
          - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
        volumes:
          - ./kafka:/bitnami/kafka

      kafka-ui:
        container_name: kafka-ui-1
        image: provectuslabs/kafka-ui:latest
        ports:
          - 8800:8080
        depends_on:
          - kafka
        environment:
          KAFKA_CLUSTERS_0_NAME: local
          KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: PLAINTEXT://kafka:9092
          DYNAMIC_CONFIG_ENABLED: 'true'
        networks:
          - airflow-kafka

    networks:
      airflow-kafka:
        external: true

The key highlights from this file are:

• The kafka service uses the base image bitnami/kafka.
• We configure the service with only one broker, which is enough for our small project. A Kafka broker is responsible for receiving messages from producers (the sources of data), storing these messages, and delivering them to consumers (the sinks or end users of the data). The broker listens on port 9092 for internal communication within the cluster and on port 9094 for external communication, allowing clients outside the Docker network to connect to the Kafka broker.
• In the volumes part, we map the local directory kafka to the container directory /bitnami/kafka to ensure data persistence and to allow inspection of Kafka’s data from the host system.
• We set up the kafka-ui service, which uses the Docker image provectuslabs/kafka-ui:latest. This provides a user interface to interact with the Kafka cluster. This is especially useful for monitoring and managing Kafka topics and messages.
• To ensure communication between Kafka and Airflow, which will be run as an external service, we use an external network, airflow-kafka.

Before running the Kafka service, let’s create the airflow-kafka network using the following command:

    docker network create airflow-kafka

Now everything is set to finally start our Kafka service:

    docker-compose up

After the services start, visit the kafka-ui at http://localhost:8800/. Normally you should get something like this:

Overview of the Kafka UI. Image by the author.

Next we will create the topic that will contain the API messages. Click on Topics on the left and then Add a topic at the top left. Our topic will be called rappel_conso, and since we have only one broker we set the replication factor to 1. We also set the number of partitions to 1, since we will have only one consumer thread at a time and therefore won’t need any parallelism. Finally, we can set the data retention time to a small value like one hour, since we will run the Spark job right after the Kafka streaming task and won’t need to retain the data in the Kafka topic for long.

Postgres setup

Before setting up our Spark and Airflow configurations, let’s create the Postgres database that will persist our API data. I used the pgAdmin 4 tool for this task; however, any other Postgres development platform can do the job.

To install Postgres and pgAdmin, visit https://www.postgresql.org/download/ and get the packages for your operating system. When installing Postgres, you need to set a password, which we will need later to connect to the database from the Spark environment. You can leave the port at 5432.

If your installation has succeeded, you can start pgAdmin and you should observe something like this window:

Overview of pgAdmin interface.
Image by the author.

Since the table we want to create has a lot of columns, we chose to create the table and add its columns with a script using psycopg2, a PostgreSQL database adapter for Python.

You can run the script with the command:

    python scripts/create_table.py

Note that in the script I read the Postgres password from an environment variable named POSTGRES_PASSWORD. If you use another method to access the password, you need to modify the script accordingly.

Spark setup

Having set up our Postgres database, let’s delve into the details of the Spark job. The goal is to stream the data from the Kafka topic rappel_conso to the Postgres table rappel_conso_table.

    from pyspark.sql import SparkSession
    from pyspark.sql.types import (
        StructType,
        StructField,
        StringType,
    )
    from pyspark.sql.functions import from_json, col
    from src.constants import POSTGRES_URL, POSTGRES_PROPERTIES, DB_FIELDS
    import logging

    logging.basicConfig(
        level=logging.INFO, format="%(asctime)s:%(funcName)s:%(levelname)s:%(message)s"
    )


    def create_spark_session() -> SparkSession:
        spark = (
            SparkSession.builder.appName("PostgreSQL Connection with PySpark")
            .config(
                "spark.jars.packages",
                "org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
            )
            .getOrCreate()
        )
        logging.info("Spark session created successfully")
        return spark


    def create_initial_dataframe(spark_session):
        """
        Reads the streaming data and creates the initial dataframe accordingly.
        """
        try:
            # Gets the streaming data from the topic rappel_conso
            df = (
                spark_session.readStream.format("kafka")
                .option("kafka.bootstrap.servers", "kafka:9092")
                .option("subscribe", "rappel_conso")
                .option("startingOffsets", "earliest")
                .load()
            )
            logging.info("Initial dataframe created successfully")
        except Exception as e:
            logging.warning(f"Initial dataframe couldn't be created due to exception: {e}")
            raise
        return df


    def create_final_dataframe(df):
        """
        Modifies the initial dataframe, and creates the final dataframe.
        """
        schema = StructType(
            [StructField(field_name, StringType(), True) for field_name in DB_FIELDS]
        )
        df_out = (
            df.selectExpr("CAST(value AS STRING)")
            .select(from_json(col("value"), schema).alias("data"))
            .select("data.*")
        )
        return df_out


    def start_streaming(df_parsed, spark):
        """
        Starts the streaming to table spark_streaming.rappel_conso in postgres
        """
        # Read existing data from PostgreSQL
        existing_data_df = spark.read.jdbc(
            POSTGRES_URL, "rappel_conso", properties=POSTGRES_PROPERTIES
        )
        unique_column = "reference_fiche"
        logging.info("Start streaming ...")
        query = df_parsed.writeStream.foreachBatch(
            lambda batch_df, _: (
                batch_df.join(
                    existing_data_df,
                    batch_df[unique_column] == existing_data_df[unique_column],
                    "leftanti",
                )
                .write.jdbc(
                    POSTGRES_URL, "rappel_conso", "append", properties=POSTGRES_PROPERTIES
                )
            )
        ).trigger(once=True) \
            .start()
        return query.awaitTermination()


    def write_to_postgres():
        spark = create_spark_session()
        df = create_initial_dataframe(spark)
        df_final = create_final_dataframe(df)
        start_streaming(df_final, spark=spark)


    if __name__ == "__main__":
        write_to_postgres()

Let’s break down the key highlights and functionalities of the Spark job:

1. First we create the Spark session.

    def create_spark_session() -> SparkSession:
        spark = (
            SparkSession.builder.appName("PostgreSQL Connection with PySpark")
            .config(
                "spark.jars.packages",
                "org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
            )
            .getOrCreate()
        )
        logging.info("Spark session created successfully")
        return spark

2. The create_initial_dataframe function ingests streaming data from the Kafka topic using Spark's structured streaming.

    def create_initial_dataframe(spark_session):
        """
        Reads the streaming data and creates the initial dataframe accordingly.
        """
        try:
            # Gets the streaming data from the topic rappel_conso
            df = (
                spark_session.readStream.format("kafka")
                .option("kafka.bootstrap.servers", "kafka:9092")
                .option("subscribe", "rappel_conso")
                .option("startingOffsets", "earliest")
                .load()
            )
            logging.info("Initial dataframe created successfully")
        except Exception as e:
            logging.warning(f"Initial dataframe couldn't be created due to exception: {e}")
            raise
        return df

3. Once the data is ingested, create_final_dataframe transforms it. It applies a schema (defined by the columns in DB_FIELDS) to the incoming JSON data, ensuring that the data is structured and ready for further processing.

    def create_final_dataframe(df):
        """
        Modifies the initial dataframe, and creates the final dataframe.
        """
        schema = StructType(
            [StructField(field_name, StringType(), True) for field_name in DB_FIELDS]
        )
        df_out = (
            df.selectExpr("CAST(value AS STRING)")
            .select(from_json(col("value"), schema).alias("data"))
            .select("data.*")
        )
        return df_out

4. The start_streaming function reads existing data from the database, compares it with the incoming stream, and appends new records.

    def start_streaming(df_parsed, spark):
        """
        Starts the streaming to table spark_streaming.rappel_conso in postgres
        """
        # Read existing data from PostgreSQL
        existing_data_df = spark.read.jdbc(
            POSTGRES_URL, "rappel_conso", properties=POSTGRES_PROPERTIES
        )
        unique_column = "reference_fiche"
        logging.info("Start streaming ...")
        query = df_parsed.writeStream.foreachBatch(
            lambda batch_df, _: (
                batch_df.join(
                    existing_data_df,
                    batch_df[unique_column] == existing_data_df[unique_column],
                    "leftanti",
                )
                .write.jdbc(
                    POSTGRES_URL, "rappel_conso", "append", properties=POSTGRES_PROPERTIES
                )
            )
        ).trigger(once=True) \
            .start()
        return query.awaitTermination()

The complete code for the Spark job is in the file src/spark_pgsql/spark_streaming.py. We will use the Airflow DockerOperator to run this job, as explained in the upcoming section.
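The deduplication in start_streaming relies on left-anti join semantics: only the rows of the incoming batch whose reference_fiche has no match in the existing table are kept. The plain-Python sketch below mirrors that logic on lists of dictionaries to make the behavior concrete. It does not use Spark, and the helper name left_anti_join and the sample reference values are made up for illustration.

```python
def left_anti_join(batch, existing, key):
    """Return the rows of `batch` whose `key` value does not appear in `existing`."""
    existing_keys = {row[key] for row in existing}
    return [row for row in batch if row[key] not in existing_keys]


# Rows already stored in the table (hypothetical reference values).
existing_rows = [
    {"reference_fiche": "2023-11-0001"},
    {"reference_fiche": "2023-11-0002"},
]

# Rows arriving from the Kafka topic in the current batch.
incoming_batch = [
    {"reference_fiche": "2023-11-0002"},  # already stored, so it is dropped
    {"reference_fiche": "2023-11-0003"},  # not stored yet, so it is kept
]

new_rows = left_anti_join(incoming_batch, existing_rows, "reference_fiche")
print(new_rows)
```

As with the Spark join, this only filters against rows that were already stored; two identical records inside the same batch would both pass through, which is one reason the Kafka producer removes duplicates before publishing.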
Let’s go through the process of creating the Docker image we need to run our Spark job. Here’s the Dockerfile for reference:

    FROM bitnami/spark:latest

    WORKDIR /opt/bitnami/spark

    RUN pip install py4j

    COPY ./src/spark_pgsql/spark_streaming.py ./spark_streaming.py
    COPY ./src/constants.py ./src/constants.py

    ENV POSTGRES_DOCKER_USER=host.docker.internal
    ARG POSTGRES_PASSWORD
    ENV POSTGRES_PASSWORD=$POSTGRES_PASSWORD

In this Dockerfile, we start with the bitnami/spark image as our base. It's a ready-to-use Spark image. We then install py4j, a tool needed for Spark to work with Python.

The environment variables POSTGRES_DOCKER_USER and POSTGRES_PASSWORD are set up for connecting to a PostgreSQL database. Since our database runs on the host machine, POSTGRES_DOCKER_USER is set to host.docker.internal. Despite the variable's name, this value is a hostname: it allows our Docker container to reach services on the host, in this case the PostgreSQL database. The password for PostgreSQL is passed as a build argument, so it's not hard-coded into the Dockerfile.

It’s important to note that this approach, especially passing the database password at build time, might not be secure for production environments, because the value can be recovered from the built image. In such cases, more secure methods such as Docker BuildKit secrets should be considered.

Now, let’s build the Docker image for Spark:

    docker build -f spark/Dockerfile -t rappel-conso/spark:latest --build-arg POSTGRES_PASSWORD=$POSTGRES_PASSWORD .

This command builds the image rappel-conso/spark:latest. This image includes everything needed to run our Spark job and will be used by Airflow’s DockerOperator to execute the job. If the POSTGRES_PASSWORD variable is not set in your shell, replace $POSTGRES_PASSWORD with your actual PostgreSQL password when running this command.

Airflow

As said earlier, Apache Airflow serves as the orchestration tool in the data pipeline. It is responsible for scheduling and managing the workflow of the tasks, ensuring they are executed in a specified order and under defined conditions.
In our system, Airflow is used to automate the data flow from streaming with Kafka to processing with Spark.

Airflow DAG

Let’s take a look at the Directed Acyclic Graph (DAG) that will outline the sequence and dependencies of tasks, enabling Airflow to manage their execution.

    start_date = datetime.today() - timedelta(days=1)

    default_args = {
        "owner": "airflow",
        "start_date": start_date,
        "retries": 1,  # number of retries before failing the task
        "retry_delay": timedelta(seconds=5),
    }

    with DAG(
        dag_id="kafka_spark_dag",
        default_args=default_args,
        schedule_interval=timedelta(days=1),
        catchup=False,
    ) as dag:

        kafka_stream_task = PythonOperator(
            task_id="kafka_data_stream",
            python_callable=stream,
            dag=dag,
        )

        spark_stream_task = DockerOperator(
            task_id="pyspark_consumer",
            image="rappel-conso/spark:latest",
            api_version="auto",
            auto_remove=True,
            command="./bin/spark-submit --master local[*] --packages org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 ./spark_streaming.py",
            docker_url='tcp://docker-proxy:2375',
            environment={'SPARK_LOCAL_HOSTNAME': 'localhost'},
            network_mode="airflow-kafka",
            dag=dag,
        )

        kafka_stream_task >> spark_stream_task

Here are the key elements from this configuration:

  • The tasks are set to execute daily.
  • The first task is the Kafka Stream Task. It is implemented using the PythonOperator to run the Kafka streaming function. This task streams data from the RappelConso API into a Kafka topic, initiating the data processing workflow.
  • The downstream task is the Spark Stream Task. It uses the DockerOperator for execution. It runs a Docker container with our custom Spark image, tasked with processing the data received from Kafka.
  • The tasks are arranged sequentially, where the Kafka streaming task precedes the Spark processing task.
This order is crucial to ensure that data is first streamed and loaded into Kafka before being processed by Spark.

About the DockerOperator

Using the DockerOperator allows us to run Docker containers that correspond to our tasks. The main advantages of this approach are easier package management, better isolation, and enhanced testability. We will demonstrate the use of this operator with the Spark streaming task.

Here are some key details about the DockerOperator for the Spark streaming task:

  • We will use the image rappel-conso/spark:latest specified in the Spark Set-up section.
  • The command will run the Spark submit command inside the container, specifying the master as local, including necessary packages for PostgreSQL and Kafka integration, and pointing to the spark_streaming.py script that contains the logic for the Spark job.
  • docker_url represents the URL of the host running the Docker daemon. The natural solution is to set it as unix://var/run/docker.sock and to mount /var/run/docker.sock in the Airflow Docker container. One problem we had with this approach was a permission error when using the socket file inside the Airflow container. A common workaround, changing permissions with chmod 777 /var/run/docker.sock, poses significant security risks. To circumvent this, we implemented a more secure solution using bobrik/socat as a docker-proxy. This proxy, defined in a Docker Compose service, listens on TCP port 2375 and forwards requests to the Docker socket:

    docker-proxy:
      image: bobrik/socat
      command: "TCP4-LISTEN:2375,fork,reuseaddr UNIX-CONNECT:/var/run/docker.sock"
      ports:
        - "2376:2375"
      volumes:
        - /var/run/docker.sock:/var/run/docker.sock
      networks:
        - airflow-kafka

In the DockerOperator, we can access the host Docker socket /var/run/docker.sock via the tcp://docker-proxy:2375 URL, as described here and here.

  • Finally, we set the network mode to airflow-kafka. This allows us to use the same network as the proxy and the Docker container running Kafka.
This is crucial since the Spark job will consume the data from the Kafka topic, so we must ensure that both containers are able to communicate.

After defining the logic of our DAG, let’s now look at the Airflow services configuration in the docker-compose-airflow.yaml file.

Airflow Configuration

The compose file for Airflow was adapted from the official Apache Airflow docker-compose file. You can have a look at the original file by visiting this link.

As pointed out by this article, this proposed version of Airflow is highly resource-intensive, mainly because the core executor is set to CeleryExecutor, which is better suited to distributed and large-scale data processing tasks. Since we have a small workload, using a single-node LocalExecutor is enough.

Here is an overview of the changes we made to the docker-compose configuration of Airflow:

  • We set the environment variable AIRFLOW__CORE__EXECUTOR to LocalExecutor.
  • We removed the services airflow-worker and flower because they only work with the Celery executor. We also removed the redis caching service since it works as a backend for Celery. We won’t use the airflow-triggerer either, so we removed it too.
  • We replaced the base image ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.3} for the remaining services, mainly the scheduler and the webserver, with a custom image that we will build when running docker-compose.

    version: '3.8'
    x-airflow-common:
      &airflow-common
      build:
        context: .
        dockerfile: ./airflow_resources/Dockerfile
      image: de-project/airflow:latest

  • We mounted the necessary volumes that are needed by Airflow. AIRFLOW_PROJ_DIR designates the Airflow project directory that we will define later.
We also set the network as airflow-kafka to be able to communicate with the Kafka bootstrap servers.

    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
      - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
      - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
      - ./src:/opt/airflow/dags/src
      - ./data/last_processed.json:/opt/airflow/data/last_processed.json
    user: "${AIRFLOW_UID:-50000}:0"
    networks:
      - airflow-kafka

Next, we need to create some environment variables that will be used by docker-compose:

    echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_PROJ_DIR=\"./airflow_resources\"" > .env

Here, AIRFLOW_UID represents the user ID in the Airflow containers and AIRFLOW_PROJ_DIR represents the Airflow project directory.

Now everything is set up to run your Airflow service. You can start it with this command:

    docker compose -f docker-compose-airflow.yaml up

Then, to access the Airflow user interface, you can visit this URL: http://localhost:8080.

Sign-in window on Airflow. Image by the author.

By default, the username and password are both airflow. After signing in, you will see a list of DAGs that come with Airflow. Look for the DAG of our project, kafka_spark_dag, and click on it.

Overview of the task window in airflow. Image by the author.

You can start the task by clicking on the button next to DAG: kafka_spark_dag.

Next, you can check the status of your tasks in the Graph tab. A task is done when it turns green. So, when everything is finished, it should look something like this:

Image by the author.

To verify that the rappel_conso_table is filled with data, use the following SQL query in the pgAdmin Query Tool:

    SELECT count(*) FROM rappel_conso_table

When I ran this in January 2024, the query returned a total of 10022 rows. Your results should be around this number as well.

Conclusion

This article has successfully demonstrated the steps to build a basic yet functional data engineering pipeline using Kafka, Airflow, Spark, PostgreSQL, and Docker.
Aimed primarily at beginners and those new to the field of data engineering, it provides a hands-on approach to understanding and implementing key concepts in data streaming, processing, and storage.

Throughout this guide, we’ve covered each component of the pipeline in detail, from setting up Kafka for data streaming to using Airflow for task orchestration, and from processing data with Spark to storing it in PostgreSQL. The use of Docker throughout the project simplifies the setup and ensures consistency across different environments.

It’s important to note that while this setup is ideal for learning and small-scale projects, scaling it for production use would require additional considerations, especially in terms of security and performance optimization. Future enhancements could include integrating more advanced data processing techniques, exploring real-time analytics, or even expanding the pipeline to incorporate more complex data sources.

In essence, this project serves as a practical starting point for those looking to get their hands dirty with data engineering. It lays the groundwork for understanding the basics, providing a solid foundation for further exploration in the field.

In the second part, we’ll explore how to effectively use the data stored in our PostgreSQL database. We’ll introduce agents powered by Large Language Models (LLMs) and a variety of tools that enable us to interact with the database using natural language queries. So, stay tuned!

To reach out

LinkedIn: https://www.linkedin.com/in/hamza-gharbi-043045151/
Twitter: https://twitter.com/HamzaGh25079790

End-to-End Data Engineering System on Real Data with Kafka, Spark, Airflow, Postgres, and Docker was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story. View the full article
  10. The way you retrieve variables from Airflow can impact the performance of your DAGs Continue reading on Towards Data Science » View the full article
  11. This post series is about mastering offline data pipeline best practices, focusing on the potent combination of Apache Airflow and data processing engines like Hive and Spark. Part 1 of our series explored strategies for enhancing Airflow data pipelines using Apache Hive on AWS EMR. Our primary objective was to attain cost efficiency and establish effective job configurations. In this concluding Part 2, we explore Apache Spark, another pivotal element in our data engineering toolkit. Optimizing the Airflow job parameters specifically for Spark can improve performance and yield substantial cost savings. Why Apache Spark in Airflow? Apache Spark is an important data processing framework in data-driven companies. It processes massive amounts of data quickly and efficiently, and it is especially well suited to complex data analytics thanks to its fast query performance and advanced analytics capabilities. This makes Spark a preferred choice for enterprises handling vast amounts of data and requiring real-time analytics. View the full article
  12. Welcome to the first post in our series on mastering offline data pipeline best practices, focusing on the potent combination of Apache Airflow and data processing engines like Hive and Spark. This post focuses on elevating your data engineering game, streamlining your data workflows, and significantly cutting computing costs. Optimizing offline data pipelines has become a necessity with the growing complexity and scale of modern data pipelines. In this kickoff post, we delve into the intricacies of Apache Airflow and AWS EMR, a managed cluster platform for big data processing. Working together, they form the backbone of many modern data engineering solutions. However, they can become a source of increased costs and inefficiencies without the right optimization strategies. Let's dive into the journey to transform your data workflows and embrace cost-efficiency in your data engineering environment. View the full article
  13. Using Airflow sensors to control the execution of DAGs on a different schedule Continue reading on Towards Data Science » View the full article
  14. You can now create Apache Airflow version 2.7 environments and execute deferrable operators on Amazon Managed Workflows for Apache Airflow (MWAA). Apache Airflow 2.7 is the latest minor release of the popular open-source tool that helps customers author, schedule, and monitor workflows. View the full article
  15. You can now create Apache Airflow version 2.6 environments on Amazon Managed Workflows for Apache Airflow (MWAA). Apache Airflow 2.6 is the latest minor release of the popular open-source tool that helps customers author, schedule, and monitor workflows. View the full article
  16. This comprehensive blog presents various approaches for monitoring, troubleshooting, and minimizing DAG parse times, leading to notable performance improvements in Cloud Composer / Airflow: Increase environment scalability by efficiently handling larger workloads and accommodating more DAGs. Improve environment stability by limiting the chance of task overlaps and resource contention. Enhance productivity and overall efficiency for developers through faster feedback loops and reduced processing time. A low DAG parse time serves as a reliable indicator of a healthy Cloud Composer / Airflow environment Getting startedWhat is an Airflow DAG?An Airflow DAG (Directed Acyclic Graph) is a collection of tasks that are organized in a way that reflects their relationships and dependencies. DAGs are defined in Python scripts, and they are the core concept of Airflow. A DAG defines four things: The tasks that need to be run The order in which the tasks need to be run The dependencies between the tasks The schedule for running the tasks DAGs are a powerful way to define and manage complex workflows. They can be used to automate tasks, schedule tasks, and monitor the execution of tasks. What is the Airflow Scheduler?The Airflow Scheduler monitors all tasks and DAGs, then triggers the task instances once dependent tasks are complete. Once every 30 seconds by default, the Scheduler collects DAG parsing results and checks whether any active tasks can be triggered. What is the DAG Processor? As of Airflow 2.3.0, the DAG Processor is separate from the Airflow Scheduler. For more information about this change, check out AIP-43 DAG Processor separation. Monitoring and alertingMonitoring DAG parse timesIn Google Cloud console you can use the Monitoring page and the Logs tab to inspect DAG parse times. 
On Cloud Composer environment

Run the following command to check DAG parse times on the Cloud Composer environment:

    gcloud composer environments run $ENVIRONMENT_NAME \
        --location $LOCATION \
        dags report

Locally using time command

    time python airflow/example_dags/example.py

Make sure to run it several times in succession to account for caching effects. Compare the results before and after the optimization (in the same conditions, using the same machine, environment, etc.) in order to assess the impact of any optimization.

Sample output:

    real 0m0.699s
    user 0m0.590s
    sys 0m0.108s

The important metric is the "real" time, which tells you how long it took to process the DAG. Note that when loading the file this way, you are starting a new interpreter, so there is an initial loading time that is not present when Airflow parses the DAG. You can assess the time of initialization by running:

    time python -c ''

Result:

    real 0m0.073s
    user 0m0.037s
    sys 0m0.039s

In this case the initial interpreter startup time is about 0.07 s, which is about 10% of the time needed to parse the example_python_operator.py above, so the actual parsing time is about 0.62 s for the example DAG.
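The subtraction described above can be scripted. The following is a minimal sketch, assuming a standard-library-only approach: it times a fresh interpreter and subtracts that startup cost from a total run time. The helper names time_command and net_parse_time are hypothetical, and measured timings depend on the machine.

```python
import subprocess
import sys
import time


def time_command(args, repeats=3):
    """Run a command several times and return the fastest wall-clock time in seconds."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        subprocess.run(args, check=True, capture_output=True)
        timings.append(time.perf_counter() - start)
    return min(timings)


def net_parse_time(total_seconds, startup_seconds):
    """Subtract interpreter startup from the total to estimate the parse time alone."""
    return max(total_seconds - startup_seconds, 0.0)


# Interpreter startup alone: a fresh Python process that runs an empty program.
startup = time_command([sys.executable, "-c", ""])

# With the figures quoted in the text: 0.699 s total, 0.073 s startup.
print(round(net_parse_time(0.699, 0.073), 3))  # 0.626
```

To measure a real DAG file, pass its path to time_command (for example time_command([sys.executable, "path/to/dag.py"]), where the path is a placeholder) and feed that result and startup to net_parse_time.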
What is an ideal parse time metric?

On the Monitoring dashboard, in the DAG Statistics section, observe graphs for the total DAG parse time. If the number exceeds about 10 seconds, your Schedulers might be overloaded with DAG parsing and cannot run DAGs effectively.

How can I receive alerts for long parse times?

You can create alerting policies to monitor the values of metrics and to notify you when those metrics violate a condition. This can also be done through the Composer Monitoring Dashboard.

DAG code optimization

Generalized DAG code improvements

Check out Optimize Cloud Composer via Better Airflow DAGs to view a generalized checklist of activities when authoring Apache Airflow DAGs. These items follow best practices determined by Google Cloud and the open source community. A collection of performant DAGs will enable Cloud Composer to work optimally, and standardized authoring will help developers manage hundreds or thousands of DAGs. Each item will benefit your Cloud Composer environment and your development process. The two highest priorities should be limiting top-level code and avoiding the use of variables/xcoms in top-level code.

Limit top-level code

Follow established best practices. You should avoid writing top-level code that is not necessary to create Operators and build DAG relations between them. This is because of the design of the Airflow Scheduler and the impact that top-level code parsing speed has on both the performance and scalability of Airflow. One important factor affecting DAG loading time, which Python developers might overlook, is that top-level imports can take a surprisingly long time (on the order of seconds) and generate a lot of overhead. This can easily be avoided by converting them to local imports inside Python callables, for example.
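As an illustration of the local-import advice, the sketch below moves an import out of module scope and into the callable, so its cost is paid when the task runs rather than every time the file is parsed. It uses the standard-library statistics module purely as a stand-in for a heavy dependency, and the function name summarize is hypothetical. In a real DAG, a function like this would typically be passed as python_callable to a PythonOperator.

```python
def summarize(values):
    """Task callable: the import below runs at task execution time, not at DAG parse time."""
    # Local import: a stand-in for a heavy library such as pandas or numpy.
    import statistics

    return {"mean": statistics.mean(values), "max": max(values)}


result = summarize([2, 4, 6])
print(result)
```

The module-level code of the file now contains only the function definition, which is cheap to parse; nothing is imported until summarize is actually called.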
Avoid the use of Variables and XComs in top-level code

If you are using Variable.get() in top-level code, every time the .py file is parsed, Airflow executes a Variable.get(), which opens a session to the DB. This can dramatically slow down parse times. Use JSON dictionaries or Jinja templates as values if absolutely necessary (one connection for many values inside a dict).

DAG folder cleanup

Remove unused DAGs, unnecessary files from the DAGs folder

The Airflow Scheduler wastes time and resources parsing files in the DAGs folder that aren’t used.

Use .airflowignore

An .airflowignore file specifies the directories or files in DAG_FOLDER or PLUGINS_FOLDER that Airflow should intentionally ignore. Airflow supports two syntax flavors for patterns in the file, as specified by the DAG_IGNORE_FILE_SYNTAX configuration parameter (added in Airflow 2.3): regexp and glob. The more files are ignored, the fewer files the Airflow Scheduler has to parse.

Review paused DAGs

Paused DAGs are still continuously parsed by the Airflow Scheduler. Determine why each DAG is paused and whether it should be removed, ignored, or unpaused.

Airflow configurations

min_file_process_interval

The Scheduler parses your DAG files every min_file_process_interval number of seconds. Airflow starts using your updated DAG code only after this interval ends. Consider increasing this interval when you have a high number of DAGs that do not change too often, or observe a high Scheduler load in general. Consider decreasing this interval to parse your DAGs faster. Updates to DAGs are reflected after this interval. Keeping this number low will increase CPU usage. For example, if you have >1000 DAG files, raise the min_file_process_interval to 600 (10 minutes), 6000 (100 minutes), or a higher value.

dag_dir_list_interval

dag_dir_list_interval determines how often, in seconds, Airflow should scan the DAGs directory. A lower value here means that new DAGs will be processed faster, but this comes at the cost of CPU usage.
Increasing the DAG directory listing interval reduces the Scheduler load associated with discovery of new DAGs in the environment's bucket. Consider increasing this interval if you deploy new DAGs infrequently. Consider decreasing this interval if you want Airflow to react faster to newly deployed DAG files.

parsing_processes

The DAG Processor can run multiple processes in parallel to parse DAGs, and parsing_processes (formerly max_threads) determines how many of those processes can run in parallel. Increasing this value can help to serialize DAGs if you have a large number of them. By default, this is set to 2.

    [scheduler]
    parsing_processes = <NUMBER_OF_CORES_IN_MACHINE - 1>

file_parsing_sort_mode

Evaluate the following file_parsing_sort_mode options if you are running more than one Airflow Scheduler. The Scheduler will list and sort the DAG files to decide the parsing order.

  • modified_time: Sort by modified time of the files. This is useful on a large scale to parse the recently modified DAGs first. (default)
  • random_seeded_by_host: Sort randomly across multiple Schedulers but with the same order on the same host. This is useful when running with Scheduler in HA mode where each Scheduler can parse different DAG files.
  • alphabetical: Sort by filename

When there are a lot (>1000) of DAG files, you can prioritize parsing of new files by changing the file_parsing_sort_mode to modified_time.

Cloud Composer upgrades

If you’ve gotten this far and still observe long DAG parse times, you’ll need to consider adding more resources to your Cloud Composer Environment. Note: this will add to the overall cost of your Cloud Composer environment.

Change/Increase the number of Airflow Schedulers

Adjusting the number of Schedulers improves the Scheduler capacity and resilience of Airflow scheduling.
Caution: Don't configure more than three Airflow Schedulers in your Cloud Composer environment without special consideration. If you increase the number of Schedulers, this increases the traffic to and from the Airflow database. We recommend using two Airflow Schedulers in most scenarios. Increase CPU/Memory of Airflow SchedulersYou can specify the amount of CPUs, memory, and disk space used by your environment. In this way, you can increase performance of your environment, in addition to horizontal scaling provided by using multiple workers and Schedulers. ConclusionBy following these next steps, you can maximize the benefits of Cloud Composer / Airflow, enhance the performance of your environment, and create a smoother development experience.
  17. Introduction Data scientists and engineers have made Apache Airflow a leading open-source tool to create data pipelines due to its active open-source community, familiar Python development as Directed Acyclic Graph (DAG) workflows, and an extensive library of pre-built integrations. Amazon Managed Workflows for Apache Airflow (MWAA) is a managed service for Apache Airflow that makes it easy to run Airflow on AWS without the operational burden of having to manage the underlying infrastructure. While business needs demand scalability, availability, and security, Airflow development often doesn’t require full production-ready infrastructure. Many DAGs are written locally, and when doing so, developers need to be assured that these workflows function correctly when they’re deployed to their production environment. To that end, the MWAA team created an open-source local-runner that uses many of the same library versions and runtimes as MWAA in a container that can run in a local Docker instance, along with utilities that can test and package Python requirements. There are times when a full MWAA environment isn’t required, but a local Docker container doesn’t have access to the AWS resources needed to properly develop and test end-to-end workflows. As such, the answer may be to run local-runner on a container on AWS, and by running on the same configuration as MWAA you can closely replicate your production MWAA environment in a light-weight development container. This post covers the topic of launching MWAA local-runner containers on Amazon Elastic Container Service (ECS) Fargate. Prerequisites This tutorial assumes you have an existing Amazon MWAA environment and wish to create a development container with a similar configuration. If you don’t already have an MWAA environment, then you can follow the quick start documentation here to get started. Docker on your local desktop. AWS Command Line Interface (AWS CLI). Terraform CLI (only if using Terraform). 
Walkthrough

1. Clone the local-runner repository, set the environment variables, and build the image

We’ll start by pulling the latest Airflow version of the Amazon MWAA local-runner to our local machine.

Note: Replace <your_region> with your region and <airflow_version> with the version specified here.

    git clone https://github.com/aws/aws-mwaa-local-runner.git
    cd aws-mwaa-local-runner
    export ACCOUNT_ID=$(aws sts get-caller-identity --query "Account" --output text)
    export REGION=<your_region>
    export AIRFLOW_VERSION=<airflow_version>
    ./mwaa-local-env build-image

Note: We’re expressly using the latest version of the Amazon MWAA local-runner as it supports the functionality needed for this tutorial.

2. Push your local-runner image to Amazon ECR

    aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin $ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com
    aws ecr create-repository --repository-name mwaa-local-runner --region $REGION
    export AIRFLOW_IMAGE=$(docker image ls | grep amazon/mwaa-local | grep $AIRFLOW_VERSION | awk '{ print $3 }')
    docker tag $AIRFLOW_IMAGE $ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/mwaa-local-runner
    docker push $ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/mwaa-local-runner

3. Modify the MWAA execution role

For this example, we enable an existing MWAA role to work with Amazon ECS Fargate. As an alternative, you may also create a new task execution role.

1. From the Amazon MWAA console, select the link of the environment whose role you wish to use for your Amazon ECS Fargate local-runner instance.
2. Scroll down to Permissions and select the link to open the Execution role.
3. Select the Trust relationships tab.
4. Choose Edit trust policy.
5. Under Statement -> Principal -> Service add ecs-tasks.amazonaws.com.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "ecs-tasks.amazonaws.com",
                        "airflow.amazonaws.com",
                        "airflow-env.amazonaws.com"
                    ]
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

6. Select Update policy.
7.
Choose the Permissions tab.
8. Select the link to the MWAA-Execution-Policy.
9. Choose Edit policy.
10. Choose the JSON tab.
11. In the Statement section describing logs permissions, under Resource, add arn:aws:logs:us-east-1:012345678910:log-group:/ecs/mwaa-local-runner-task-definition:*, where 012345678910 is replaced with your account number and us-east-1 is replaced with your region.

    {
        "Effect": "Allow",
        "Action": [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents",
            "logs:GetLogEvents",
            "logs:GetLogRecord",
            "logs:GetLogGroupFields",
            "logs:GetQueryResults"
        ],
        "Resource": [
            "arn:aws:logs:us-east-1:012345678910:log-group:/ecs/mwaa-local-runner-task-definition:*",
            "arn:aws:logs:us-east-1:012345678910:log-group:airflow-MWAA-Demo-IAD-1-*"
        ]
    },

12. We also want to add permissions that allow us to execute commands on the container and pull the image from Amazon ECR.

    {
        "Effect": "Allow",
        "Action": [
            "ssmmessages:CreateControlChannel",
            "ssmmessages:CreateDataChannel",
            "ssmmessages:OpenControlChannel",
            "ssmmessages:OpenDataChannel"
        ],
        "Resource": "*"
    },
    {
        "Effect": "Allow",
        "Action": [
            "ecr:GetAuthorizationToken",
            "ecr:BatchCheckLayerAvailability",
            "ecr:GetDownloadUrlForLayer",
            "ecr:BatchGetImage",
            "logs:CreateLogStream",
            "logs:PutLogEvents"
        ],
        "Resource": "*"
    }

Note: Ensure that your private subnets have access to AWS Systems Manager (SSM) via Internet Gateway or PrivateLink to “com.amazonaws.us-east-1.ssmmessages” in order to enable command execution.

13. Choose Review policy.
14. Choose Save changes.

The Aurora PostgreSQL Serverless instance and the Amazon ECS resources can be created using either AWS CloudFormation or Terraform, as per the following sections. To create the resources required, clone the aws-samples/amazon-mwaa-examples repository.
git clone https://github.com/aws-samples/amazon-mwaa-examples.git Take note of the variables from the existing MWAA environment needed to create the Amazon ECS environment (i.e., security groups, subnet IDs, Virtual Private Cloud (VPC) ID, and execution role). $ export MWAAENV=test-MwaaEnvironment $ aws mwaa get-environment --name $MWAAENV --query 'Environment.NetworkConfiguration' --region $REGION { "SecurityGroupIds": [ "sg-12345" ], "SubnetIds": [ "subnet-12345", "subnet-56789" ] } $ aws mwaa get-environment --name $MWAAENV --query 'Environment.ExecutionRoleArn' "arn:aws:iam::123456789:role/service-role/MwaaExecutionRole" AWS CloudFormation Navigate to the ECS CloudFormation directory: $ cd amazon-mwaa-examples/usecases/local-runner-on-ecs-fargate/cloudformation Update the AWS CloudFormation template input parameters file parameter-values.json in your favorite code editor (e.g., vscode). { "Parameters": { "ECSClusterName": "mwaa-local-runner-cluster", "VpcId": "your-mwaa-vpc-id", "ECRImageURI" : "123456789.dkr.ecr.us-east-1.amazonaws.com/mwaa-local-runner:latest", "SecurityGroups" : "sg-security-group-id", "PrivateSubnetIds" : "subnet-mwaapvtsubnetid1,subnet-mwaapvtsubnetid2", "PublicSubnetIds" : "subnet-mwaapublicsubnetid1,subnet-mwaapublicsubnetid2", "S3BucketURI" : "s3://your-mwaa-bucket-path", "ECSTaskExecutionRoleArn": "arn:aws:iam::123456789:role/service-role/mwaaExecutionRoleName", "AssignPublicIpToTask" : "yes" } } [Optional] Additional AWS CloudFormation template input parameter values can be overridden in either template directly (mwaa-ecs-on-fargate.yml) or supplied in input parameter file in step # 2. Deploy the AWS CloudFormation template. 
$ aws cloudformation deploy \ --stack-name mwaa-ecs-sandbox \ --region $REGION --template-file mwaa-on-ecs-fargate.yml \ --parameter-overrides file://parameter-values.json \ --capabilities CAPABILITY_IAM Where … Stack-name – AWS CloudFormation Stack name is e.g., mwaa-ecs-sandbox Region – where you want to install the stack. It can be sourced from env variable or replaced with the value e.g., ap-east-2, us-west-2 Template-file – CF template name in subfolder mwaa-on-ecs-fargate.yml Parameter – overrides is updated input parameter file with your environment values in step 2 It takes time (up to 40 minutes) to create required Amazon ECS and Amazon Relational Database Service (RDS) resources before showing output on successful completion as … Waiting for changeset to be created.. Waiting for stack create/update to complete Successfully created/updated stack - mwaa-ecs-sandbox To test validate the deployed environment, lets get the output parameters AWS CloudFormation template generated including Load Balancer with AWS CloudFormation describe command as: $ aws cloudformation describe-stacks --stack-name mwaa-ecs-sandbox --query 'Stacks[0].Outputs[*]' [ { "OutputKey": "LoadBalancerURL", "OutputValue": "mwaa-LoadB-S3WM6Y7GE1WA-18678459101.us-east-1.elb.amazonaws.com", "Description": "Load Balancer URL" }, { "OutputKey": "DBClusterEP", "OutputValue": "database-mwaa-local-runner.cluster-ckxppcrgfesp.us-east-1.rds.amazonaws.com", "Description": "RDS Cluster end point" } ] To test validate the local runner on Amazon ECS Fargate, go to Access Airflow Interface Step below after the Terraform steps. Terraform Navigate to the ECS Terraform directory: $ cd amazon-mwaa-examples/usecases/local-runner-on-ecs-fargate/terraform/ecs Create the tfvars file that contains all the required parameters. Replace all the parameters with the required parameters for your configuration. 
$ cat <<EOT>> terraform.tfvars assign_public_ip_to_task = true ecs_task_execution_role_arn = "arn:aws:iam::123456789:role/ecsTaskExecutionRole" elb_subnets = ["subnet-b06911ed", "subnet-f3bf01dd"] image_uri = "123456789.dkr.ecr.us-east-1.amazonaws.com/mwaa-local-runner:latest" mwaa_subnet_ids = ["subnet-b06911ed", "subnet-f3bf01dd"] region = "us-east-1" s3_dags_path = "s3://airflow-mwaa-test/DAG/" s3_plugins_path = "s3://airflow-mwaa-test/plugins.zip" s3_requirements_path = "s3://airflow-mwaa-test/requirements.txt" vpc_id = "vpc-e4678d9f" vpc_security_group_ids = ["sg-ad76c8e5"] EOT Initialize the Terraform modules and plan the environment to create the RDS Aurora Serverless database. The subnet IDs and security group IDs of your environment can be retrieved from the previous step. Note: Make use of the existing MWAA Environment subnets, VPC, and security groups. The security group also needs to allow traffic to itself. The security group needs allow traffic from your local machine on port 80 to access the loadbalancer URL. $ terraform init $ terraform plan Once the plan has succeeded, create the resources using the variables used in the previous step. $ terraform apply -auto-approve ... ... Outputs: database_name = "AirflowMetadata" db_passsword = <sensitive> loadbalancer_url = "mwaa-local-runner-alb-552640779.us-east-1.elb.amazonaws.com" rds_endpoint = "database-mwaa-local-runner.cluster-cqvb75x52nu8.us-east-1.rds.amazonaws.com" Note: you may face the error create: ExpiredToken: The security token included in the request is expired │ status code: 403. If you do face this error, untaint the RDS resource and re-apply. Access the Airflow user interface Direct your browser to the Application Load Balancer (ALB) URL from the AWS Cloudformation/Terraform output, being sure to preface with http (mwaa-local-runner-alb-552640779.us-east-1.elb.amazonaws.com/home). Note: If you chose an internal ALB, you’ll need to be on your VPC private subnet via VPN or similar. 
When presented with the Airflow user interface, provide the username admin and the default password, test1234. You are now in a standard Airflow deployment that closely resembles the configuration of MWAA using local-runner.

Updating the environment

When you stop and restart the Amazon ECS Fargate task, the DAGs, plugins, and requirements are re-initialized. This can be done through a forced update:

    $ aws ecs update-service \
        --service mwaa-local-runner-service \
        --cluster mwaa-local-runner-cluster \
        --region $REGION \
        --force-new-deployment

If you wish to update without restarting the task, you can run commands directly via execute-command. If this is your first time running execute-command, first update the service to allow this functionality:

    $ aws ecs update-service \
        --service mwaa-local-runner-service \
        --cluster mwaa-local-runner-cluster \
        --region $REGION \
        --enable-execute-command \
        --force-new-deployment

When the AWS Fargate task resumes availability, look up the task ID:

    $ aws ecs list-tasks \
        --cluster mwaa-local-runner-cluster \
        --region $REGION

This returns a JSON string that contains an ARN with the unique task ID in the format:

    {
      "taskArns": [
        "arn:aws:ecs:us-east-1:012345678910:task/mwaa-local-runner-cluster/11aa22bb33cc44dd55ee66ff77889900"
      ]
    }

In this case the task ID is 11aa22bb33cc44dd55ee66ff77889900, which we'll use in the next command:

    $ aws ecs execute-command \
        --region $REGION \
        --cluster mwaa-local-runner-cluster \
        --task 11aa22bb33cc44dd55ee66ff77889900 \
        --command "/bin/bash" \
        --interactive

Note: You may need to install the Session Manager plugin in order to execute commands via the AWS CLI.

At this point you can run any activities you wish, such as executing the s3 sync command to update your DAGs:

    $ aws s3 sync --exact-timestamps --delete $S3_DAGS_PATH /usr/local/airflow/dags

Or view your scheduler logs:

    $ cd /usr/local/airflow/logs/scheduler/latest; cat *

When complete, type exit to return to your terminal.
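The task-ID lookup in the steps above lends itself to scripting. The following is a minimal Python sketch, assuming the JSON shape shown in the list-tasks output earlier; the helper name task_ids_from_list_tasks is hypothetical and not part of the AWS CLI or the local-runner project.

```python
import json


def task_ids_from_list_tasks(output: str) -> list[str]:
    """Extract task IDs from the JSON printed by `aws ecs list-tasks`."""
    task_arns = json.loads(output)["taskArns"]
    # The ARN resource has the form "task/<cluster-name>/<task-id>",
    # so the task ID is the final path segment.
    return [arn.rsplit("/", 1)[-1] for arn in task_arns]


# Sample output copied from the walkthrough above.
sample_output = """
{
  "taskArns": [
    "arn:aws:ecs:us-east-1:012345678910:task/mwaa-local-runner-cluster/11aa22bb33cc44dd55ee66ff77889900"
  ]
}
"""

print(task_ids_from_list_tasks(sample_output))
```

The returned ID can then be passed to the --task option of aws ecs execute-command.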
Cleaning up

If no longer needed, be sure to delete your AWS Fargate cluster, task definitions, ALB, Amazon ECR repository, Aurora RDS instance, and any other items you do not wish to retain.

With AWS CloudFormation, delete the stack:

    $ aws cloudformation delete-stack --stack-name mwaa-ecs-sandbox

With Terraform, run:

    $ terraform destroy

Important: Terminating resources that aren't actively being used reduces costs and is a best practice. Not terminating your resources can result in additional charges.

Conclusion

In this post, we showed you how to configure the Amazon MWAA open-source local-runner container image on Amazon ECS Fargate to provide a development and testing environment, using Amazon Aurora Serverless v2 as the database backend and execute-command on the AWS Fargate task to interact with the system. To learn more about Amazon MWAA, visit the Amazon MWAA documentation. For more blog posts about Amazon MWAA, visit the Amazon MWAA resources page.

View the full article
  18. You can now create Apache Airflow version 2.5 environments on Amazon Managed Workflows for Apache Airflow (MWAA). Apache Airflow 2.5 is the latest minor release of the popular open-source tool that helps customers author, schedule, and monitor workflows. View the full article
  19. We are thrilled to introduce Data on EKS (DoEKS), a new open-source project aimed at streamlining and accelerating the process of building, deploying, and scaling data workloads on Amazon Elastic Kubernetes Service (Amazon EKS). With DoEKS, customers get access to a comprehensive range of resources including Infrastructure as Code (IaC) templates, performance benchmark reports, deployment examples, and architectures optimized for data-centric workloads aligned with AWS best practices and industry expertise. This means that customers can quickly and easily provision popular open-source data frameworks (e.g., Apache Spark, Ray, Apache Airflow, Argo Workflows, and Kubeflow) to run on Amazon EKS. Additionally, DoEKS areas of focus include distributed streaming platforms, query engines, and databases to meet the growing demands of data processing. DoEKS blueprints are made with managed AWS services and popular open-source tools to provide customers flexibility to choose the right combination of managed and self-managed components to suit their needs. For example, DoEKS includes several blueprints with Amazon EMR on EKS so customers can take advantage of optimized features like automated provisioning, scaling, faster runtimes, and debugging tools that Amazon EMR provides for running Spark applications... View the full article
  20. Hosting, orchestrating, and managing data pipelines is a complex process for any business. Google Cloud offers Cloud Composer, a fully managed workflow orchestration service that enables businesses to create, schedule, monitor, and manage workflows that span clouds and on-premises data centers. Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language. Apache Airflow allows users to create directed acyclic graphs (DAGs) of tasks, which can be scheduled to run at specific intervals or triggered by external events.

This guide contains a generalized checklist of activities for authoring Apache Airflow DAGs. These items follow best practices determined by Google Cloud and the open source community. A collection of performant DAGs enables Cloud Composer to work optimally, and standardized authoring helps developers manage hundreds or even thousands of DAGs. Each item will benefit your Cloud Composer environment and your development process.

Get Started

1. Standardize file names. Help other developers browse your collection of DAG files.
   a. Example: team_project_workflow_version.py
2. DAGs should be deterministic.
   a. A given input will always produce the same output.
3. DAGs should be idempotent.
   a. Triggering the DAG multiple times has the same effect/outcome.
4. Tasks should be atomic and idempotent.
   a. Each task should be responsible for one operation that can be re-run independently of the others. In an atomized task, a success in part of the task means a success of the entire task.
5. Simplify DAGs as much as possible.
   a. Simpler DAGs with fewer dependencies between tasks tend to have better scheduling performance because they have less overhead. A linear structure (e.g. A -> B -> C) is generally more efficient than a deeply nested tree structure with many dependencies.

Standardize DAG Creation

6. Add an owner to your default_args.
   a. Determine whether you'd prefer the email address / id of a developer, or a distribution list / team name.
7. Use with DAG() as dag: instead of dag = DAG()
   a. This removes the need to pass the dag object to every operator or task group.
8. Set a version in the DAG ID.
   a. Update the version after any code change in the DAG.
   b. This prevents the logs of deleted tasks from vanishing from the UI, no-status tasks being generated for old DAG runs, and general confusion about when DAGs have changed.
   c. Airflow open-source has plans to implement versioning in the future.
9. Add tags to your DAGs.
   a. Help developers navigate the Airflow UI via tag filtering.
   b. Group DAGs by organization, team, project, application, etc.
10. Add a DAG description.
   a. Help other developers understand your DAG.
11. Pause your DAGs on creation.
   a. This will help avoid accidental DAG runs that add load to the Cloud Composer environment.
12. Set catchup=False to avoid automatic catch-ups overloading your Cloud Composer environment.
13. Set a dagrun_timeout to avoid DAGs not finishing and holding Cloud Composer environment resources or introducing collisions on retries.
14. Set SLAs to receive alerts for long-running DAGs.
   a. Airflow SLAs are always defined relative to the start time of the DAG run, not to the start of individual tasks.
   b. Ensure that the SLA (the sla argument, applied to tasks through default_args) is less than the dagrun_timeout.
   c. Example: If your DAG usually takes 5 minutes to successfully finish, set the sla to 7 minutes and the dagrun_timeout to 10 minutes. Determine these thresholds based on the priority of your DAGs.
15. Ensure all tasks have the same start_date by default by passing it as an argument to the DAG during instantiation.
16. Use a static start_date with your DAGs.
   a. A dynamic start_date is misleading, and can cause failures when clearing out failed task instances and missing DAG runs.
17. Set retries as a default_arg applied at the DAG level and get more granular for specific tasks only where necessary.
   a. A good range is 1-4 retries. Too many retries will add unnecessary load to the Cloud Composer environment.

Example putting all the above together:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    # Define the default_args dictionary: default parameters applied to every task in the DAG
    default_args = {
        'owner': 'me',
        'retries': 2,  # 1-4 retries max
        'retry_delay': timedelta(minutes=5),
        'sla': timedelta(minutes=7),  # SLA shorter than dagrun_timeout
    }

    # Use the `with` statement to define the DAG object and specify the unique DAG ID and default_args dictionary
    with DAG(
        'dag_id_v1_0_0',  # versioned ID
        default_args=default_args,
        description='This is a detailed description of the DAG',  # detailed description
        start_date=datetime(2022, 1, 1),  # static start date
        dagrun_timeout=timedelta(minutes=10),  # timeout specific to this DAG
        is_paused_upon_creation=True,  # DAG-level argument, not a task default
        catchup=False,  # DAG-level argument, not a task default
        tags=['example', 'versioned_dag_id'],  # tags specific to this DAG
        schedule_interval=None,
    ) as dag:
        # Define a task using the BashOperator
        task = BashOperator(
            task_id='bash_task',
            bash_command='echo "Hello World"',
        )

18. Define what should occur for each callback function (send an email, log a context, message a Slack channel, etc.). Depending on the DAG, you may be comfortable doing nothing.
   a. success
   b. failure
   c. sla_miss
   d. retry

Example:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    def on_success_callback(context):
        # Called when the DAG run succeeds
        print(f"Task {context['task_instance_key_str']} succeeded!")

    def sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis):
        # Called when tasks in the DAG miss their SLA
        print(f"Tasks in {dag.dag_id} missed their SLA: {task_list}")

    def on_retry_callback(context):
        # Called when a task is about to be retried
        print(f"Task {context['task_instance_key_str']} retrying...")

    def on_failure_callback(context):
        # Called when the DAG run fails
        print(f"Task {context['task_instance_key_str']} failed!")

    default_args = {
        'owner': 'me',
        'retries': 2,  # 1-4 retries max
        'retry_delay': timedelta(minutes=5),
        'sla': timedelta(minutes=7),
        'on_retry_callback': on_retry_callback,  # retry callbacks are task-level, so pass them via default_args
    }

    # Create a DAG and set the callbacks
    with DAG(
        'dag_id_v1_0_0',
        default_args=default_args,
        description='This is a detailed description of the DAG',
        start_date=datetime(2022, 1, 1),
        dagrun_timeout=timedelta(minutes=10),
        is_paused_upon_creation=True,
        catchup=False,
        tags=['example', 'versioned_dag_id'],
        schedule_interval=None,
        on_success_callback=on_success_callback,  # what to do on success
        sla_miss_callback=sla_miss_callback,  # what to do on SLA miss
        on_failure_callback=on_failure_callback,  # what to do on failure
    ) as dag:

        def example_task(**kwargs):
            # This is an example task that will be part of the DAG
            print(f"Running example task with context: {kwargs}")

        # Create a task and add it to the DAG
        task = PythonOperator(
            task_id="example_task",
            python_callable=example_task,
        )

19. Use Task Groups to organize Tasks.

Example:

    from airflow.utils.task_group import TaskGroup

    # Reuses DAG, BashOperator, timedelta, and default_args from the examples above.
    # Use the `with` statement to define the DAG object and specify the unique DAG ID and default_args dictionary
    with DAG(
        'example_dag',
        default_args=default_args,
        schedule_interval=timedelta(hours=1),
    ) as dag:
        # Define the first task group
        with TaskGroup(group_id='task_group_1') as tg1:
            # Define the first task in the first task group
            task_1_1 = BashOperator(
                task_id='task_1_1',
                bash_command='echo "Task 1.1"',
            )

Reduce the Load on Your Composer Environment

20. Use Jinja Templating / Macros instead of Python functions.
   a. Airflow's template fields allow you to incorporate values from environment variables and Jinja templates into your DAGs. This helps make your DAGs idempotent (meaning multiple invocations do not change the result) and prevents unnecessary function execution during Scheduler heartbeats.
   b. The Airflow engine passes a few variables by default that are accessible in all templates.

Contrary to best practices, the following example defines variables based on datetime Python functions:

    # Variables used by tasks
    # Bad example - Define today's and yesterday's date using datetime module
    today = datetime.today()
    yesterday = datetime.today() - timedelta(1)

If this code is in a DAG file, these functions execute on every Scheduler heartbeat, which may not be performant. Even more importantly, this doesn't produce an idempotent DAG. You can't rerun a previously failed DAG run for a past date because datetime.today() is relative to the current date, not the DAG execution date.
A better way of implementing this is to use one of Airflow's built-in template variables, which is rendered when the value is passed to a templated operator field:

    # Variables used by tasks
    # Good example - Define yesterday's date with an Airflow template variable
    yesterday = "{{ yesterday_ds_nodash }}"

21. Avoid creating your own additional Airflow Variables.
   a. The metadata database stores these variables and requires database connections to retrieve them. This can affect the performance of the Cloud Composer environment. Use environment variables or Google Cloud Secret Manager instead.
22. Avoid running all DAGs on the exact same schedules (disperse workload as much as possible).
   a. Prefer cron expressions for schedule intervals over Airflow macros or time_deltas. This allows a more rigid schedule and makes it easier to spread out workloads throughout the day, which is easier on your Cloud Composer environment.
   b. Crontab.guru can help with generating specific cron expression schedules. Check out the examples here.

Examples:

    schedule_interval="*/5 * * * *",  # every 5 minutes.

    schedule_interval="0 */6 * * *",  # at minute 0 of every 6th hour.

23. Avoid XComs except for small amounts of data.
   a. These add storage and introduce more connections to the database.
   b. Use JSON dicts as values if absolutely necessary (one connection for many values inside the dict).
24. Avoid adding unnecessary objects in the dags/ Google Cloud Storage path.
   a. If you must, add an .airflowignore file to GCS paths that the Airflow Scheduler does not need to parse (sql, plug-ins, etc.).
25. Set execution timeouts for tasks.
Example:

    # Use the `PythonOperator` to define the task
    task = PythonOperator(
        task_id='my_task',
        python_callable=my_task_function,
        execution_timeout=timedelta(minutes=30),  # Set the execution timeout to 30 minutes
        dag=dag,
    )

26. Use Deferrable Operators over Sensors when possible.
   a. A deferrable operator can suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot, and your cluster wastes fewer resources on idle Operators or Sensors.

Example:

    from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

    PYSPARK_JOB = {
        "reference": { "project_id": "PROJECT_ID" },
        "placement": { "cluster_name": "PYSPARK_CLUSTER_NAME" },
        "pyspark_job": {
            "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
        },
    }

    DataprocSubmitJobOperator(
        task_id="dataproc-deferrable-example",
        job=PYSPARK_JOB,
        region="REGION",  # placeholder: the region of the Dataproc cluster
        deferrable=True,
    )

27. When using Sensors, always define mode, poke_interval, and timeout.
   a. Sensors require Airflow workers to run.
   b. Sensor checking every n seconds (i.e. poke_interval < 60)? Use mode=poke. A sensor in mode=poke will continuously poll every n seconds and hold Airflow worker resources.
   c. Sensor checking every n minutes (i.e. poke_interval >= 60)? Use mode=reschedule. A sensor in mode=reschedule will free up Airflow worker resources between poke intervals.
Example:

    from airflow.providers.google.cloud.sensors.bigquery import BigQueryTablePartitionExistenceSensor

    table_partition_sensor = BigQueryTablePartitionExistenceSensor(
        project_id="{{ project_id }}",
        task_id="bq_check_table_partition",
        dataset_id="{{ dataset }}",
        table_id="comments_partitioned",
        partition_id="{{ ds_nodash }}",
        mode="reschedule",
        poke_interval=60,
        timeout=60 * 5,
    )

28. Offload processing to external services (BigQuery, Dataproc, Cloud Functions, etc.) to minimize load on the Cloud Composer environment.
   a. These services usually have their own Airflow Operators for you to utilize.
29. Do not use sub-DAGs.
   a. Sub-DAGs were a feature in older versions of Airflow that allowed users to create reusable groups of tasks within DAGs. However, Airflow 2.0 deprecated sub-DAGs because they caused performance and functional issues.
30. Use Pub/Sub for DAG-to-DAG dependencies.
   a. Here is an example for multi-cluster / DAG-to-DAG dependencies.
31. Make DAGs load faster.
   a. Avoid unnecessary "top-level" Python code. DAGs with many imports, variables, and functions outside of the DAG introduce greater parse times for the Airflow Scheduler and in turn reduce the performance and scalability of Cloud Composer / Airflow.
   b. Moving expensive imports into the functions that tasks call, so that they run at task execution instead of at parse time, can reduce parse time (on the order of seconds).
   c. Ensure that developed DAGs do not increase DAG parse times too much.
Example:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    # Define default_args dictionary
    default_args = {
        'owner': 'me',
        'start_date': datetime(2022, 11, 17),
    }

    # Use with statement and DAG context manager to instantiate the DAG
    with DAG(
        'my_dag_id',
        default_args=default_args,
        schedule_interval=timedelta(days=1),
    ) as dag:
        # Define function within DAG block
        def greet():  # DO THIS
            # Import the module inside the callable so it loads only when the
            # task runs, not every time the Scheduler parses this file
            import my_module  # DO THIS

            greeting = my_module.generate_greeting()
            print(greeting)

        # Use the PythonOperator to execute the function
        greet_task = PythonOperator(
            task_id='greet_task',
            python_callable=greet,
        )

Improve Development and Testing

32. Implement "self-checks" (via Sensors or Deferrable Operators).
   a. To ensure that tasks are functioning as expected, you can add checks to your DAG. For example, if a task pushes data to a BigQuery partition, you can add a check in the next task to verify that the partition was generated and that the data is correct.
Example:

    # Fragment from inside a `with DAG(...)` block; assumes BigQueryInsertJobOperator,
    # BigQueryTablePartitionExistenceSensor, and dummy_operator are already imported.

    # ------------------------------------------------------------
    # Transform source data and transfer to partitioned table
    # ------------------------------------------------------------

    create_or_replace_partitioned_table_job = BigQueryInsertJobOperator(
        task_id="create_or_replace_comments_partitioned_query_job",
        configuration={
            "query": {
                "query": 'sql/create_or_replace_comments_partitioned.sql',
                "useLegacySql": False,
            }
        },
        location="US",
    )

    create_or_replace_partitioned_table_job_error = dummy_operator.DummyOperator(
        task_id="create_or_replace_partitioned_table_job_error",
        trigger_rule="one_failed",
    )

    create_or_replace_partitioned_table_job_ok = dummy_operator.DummyOperator(
        task_id="create_or_replace_partitioned_table_job_ok", trigger_rule="one_success"
    )

    # ------------------------------------------------------------
    # Determine if today's partition exists in comments_partitioned
    # ------------------------------------------------------------

    table_partition_sensor = BigQueryTablePartitionExistenceSensor(
        project_id="{{ project_id }}",
        task_id="bq_check_table_partition",
        dataset_id="{{ dataset }}",
        table_id="comments_partitioned",
        partition_id="{{ ds_nodash }}",
        mode="reschedule",
        poke_interval=60,
        timeout=60 * 5,
    )

    create_or_replace_partitioned_table_job >> [
        create_or_replace_partitioned_table_job_error,
        create_or_replace_partitioned_table_job_ok,
    ]
    create_or_replace_partitioned_table_job_ok >> table_partition_sensor

33. Look for opportunities to dynamically generate similar tasks/task groups/DAGs via Python code.
   a. This can simplify and standardize the development process for DAGs.
Example:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    def create_dag(dag_id, default_args, task_1_func, task_2_func):
        with DAG(dag_id, default_args=default_args) as dag:
            task_1 = PythonOperator(
                task_id='task_1',
                python_callable=task_1_func,
            )
            task_2 = PythonOperator(
                task_id='task_2',
                python_callable=task_2_func,
            )
            task_1 >> task_2
        return dag

    def task_1_func():
        print("Executing task 1")

    def task_2_func():
        print("Executing task 2")

    default_args = {
        'owner': 'me',
        'start_date': datetime(2022, 1, 1),  # static start date, as recommended in item 16
    }

    my_dag_id = create_dag(
        dag_id='my_dag_id',
        default_args=default_args,
        task_1_func=task_1_func,
        task_2_func=task_2_func,
    )

34. Implement unit-testing for your DAGs.

Example:

    from airflow import models
    # check_cycle is the name used in recent Airflow 2 releases; older releases call it test_cycle
    from airflow.utils.dag_cycle_tester import check_cycle


    def assert_has_valid_dag(module):
        """Assert that a module contains a valid DAG."""

        no_dag_found = True

        for dag in vars(module).values():
            if isinstance(dag, models.DAG):
                no_dag_found = False
                check_cycle(dag)  # Throws if a task cycle is found.

        if no_dag_found:
            raise AssertionError('module does not contain a valid DAG')

35. Perform local development via the Composer Local Development CLI Tool.
   a. The Composer Local Development CLI tool streamlines Apache Airflow DAG development for Cloud Composer 2 by running an Airflow environment locally. This local Airflow environment uses an image of a specific Cloud Composer version.
36. If possible, keep a staging Cloud Composer environment to fully test the complete DAG run before deploying to production.
   a. Parameterize your DAG to change the variables, e.g., the output path of a Google Cloud Storage operation or the database used to read the configuration. Do not hard-code values inside the DAG and then change them manually according to the environment.
37. Use a Python linting tool such as Pylint or Flake8 for standardized code.
38. Use a Python formatting tool such as Black or YAPF for standardized code.

Next Steps

In summary, this blog provides a comprehensive checklist of best practices for developing Airflow DAGs for use in Google Cloud Composer. By following these best practices, developers can help ensure that Cloud Composer is working optimally and that their DAGs are well-organized and easy to manage.

For more information about Cloud Composer, check out the following related blog posts and documentation pages:

  • What is Cloud Composer?
  • Deutsche Bank uses Cloud Composer workload automation
  • Using Cloud Build to keep Airflow Operators up-to-date in your Composer environment
  • Writing DAGs (workflows) | Cloud Composer
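As an illustration of items 21 and 36 in the checklist above (prefer environment variables, and parameterize DAGs per environment instead of hard-coding values), here is a minimal Python sketch. The variable names DEPLOY_ENV and OUTPUT_BUCKET, the default bucket naming, and the helper load_dag_settings are all assumptions made for this example; they are not Cloud Composer or Airflow conventions.

```python
import os


def load_dag_settings(environ=None):
    """Build per-environment DAG settings from environment variables."""
    environ = os.environ if environ is None else environ
    # DEPLOY_ENV and OUTPUT_BUCKET are hypothetical names chosen for this sketch.
    env_name = environ.get("DEPLOY_ENV", "staging")
    bucket = environ.get("OUTPUT_BUCKET", f"example-{env_name}-bucket")
    return {"env_name": env_name, "output_path": f"gs://{bucket}/output/"}


# Pass a plain dict to preview what a production environment would resolve to.
print(load_dag_settings({"DEPLOY_ENV": "production", "OUTPUT_BUCKET": "example-prod-bucket"}))
```

Reading plain environment variables keeps the lookup out of the metadata database, and the same DAG file can then run unchanged in staging and production.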
  21. You can now create Apache Airflow version 2.4 environments on Amazon Managed Workflows for Apache Airflow (MWAA) with Python 3.10 support. View the full article
  22. Amazon Managed Workflows for Apache Airflow (MWAA) now provides Amazon CloudWatch metrics for container, database, and queue utilization. View the full article
  23. Today, Amazon Web Services (AWS) announced that Amazon Managed Workflows for Apache Airflow (MWAA) is now HIPAA (Health Insurance Portability and Accountability Act) eligible. View the full article
  24. Get insights into the day-to-day challenges of builders. In this issue, Peter Reitz from our partner tecRacer talks about how to build serverless ETL (extract, transform, and load) pipelines with the help of Amazon Managed Workflows for Apache Airflow (MWAA) and Amazon Athena.

What sparked your interest in cloud computing?

Computers have always held a great fascination for me. I taught myself how to program. That's how I ended up working as a web developer during my economics studies. When I first stumbled upon Amazon Web Services, I was intrigued by the technology and wide variety of services.

How did you grow into the role of a cloud consultant?

After completing my economics degree, I was looking for a job. By chance, a job ad drew my attention to a vacancy for a cloud consultant at tecRacer. To be honest, my skills didn't match the requirements very well. But because I found the topic exciting, I applied anyway. Right from the job interview, I felt right at home at tecRacer in Duisburg. Since I had no experience with AWS, there was a lot to learn within the first months. My first goal was to achieve the AWS Certified Solutions Architect - Associate certification. The entire team supported and motivated me during this intensive learning phase. After that, I joined a small team working on a project for one of our consulting clients. This allowed me to gain practical experience at a very early stage.

What does your day-to-day work as a cloud consultant at tecRacer look like?

As a cloud consultant, I work on projects for our clients. I specialize in machine learning and data analytics. Since tecRacer has a 100% focus on AWS, I invest in my knowledge of related AWS services like S3, Athena, EMR, SageMaker, and more.
I work remotely or at our office in Hamburg and am at the customer's site every now and then, for example, to analyze the requirements for a project in workshops.

What project are you currently working on?

I'm currently working on building an ETL pipeline. Several data providers upload CSV files to an S3 bucket. My client's challenge is to extract and transform 3 billion data points and store them in a way that allows efficient data analytics. This process can be roughly described as follows.

1. Fetch CSV files from S3.
2. Parse CSV files.
3. Filter, transform, and enrich columns.
4. Partition data to enable efficient queries in the future.
5. Transform to a file format optimized for data analytics.
6. Upload data to S3.

I've implemented similar data pipelines in the past. My preferred solution consists of the following building blocks:

  • S3 storing the input data.
  • Apache Airflow to orchestrate the ETL pipeline.
  • Athena to extract, transform, and load the data.
  • S3 storing the output data.

How do you build an ETL pipeline based on Athena?

Amazon Athena enables me to query data stored on S3 on-demand using SQL. The remarkable thing about Athena is that the service is serverless, which means we only have to pay for the processed data when running a query. There are no idle costs except the S3 storage costs.

As mentioned before, in my current project, the challenge is to extract data from CSV files and store the data in a way that is optimized for data analytics. My approach is transforming the CSV files into more efficient formats such as Parquet. The Parquet file format is designed for efficient data analysis and organizes data in columns, not in rows as CSV does. Therefore, Athena skips fetching and processing all other columns when querying only a subset of the available columns. Also, Parquet compresses the data to minimize storage and network consumption.
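To make the benefit of a columnar layout concrete, here is a deliberately simplified Python sketch. It is a toy model, not Parquet's actual on-disk format, and the records, column names, and function names are invented for illustration. It contrasts summing one field when data is stored record by record with summing it when each column is stored on its own.

```python
# Made-up sample records; column names are hypothetical.
rows = [
    {"product_code": "a", "year": "2022", "estimated_revenue": 10.0},
    {"product_code": "b", "year": "2022", "estimated_revenue": 5.5},
    {"product_code": "a", "year": "2023", "estimated_revenue": 4.5},
]


def sum_revenue_row_oriented(records):
    # Row layout (CSV-like): every full record is visited, even though
    # only one of its fields is needed.
    return sum(record["estimated_revenue"] for record in records)


# Column layout (Parquet-like): the values of each column are stored together.
columns = {name: [record[name] for record in rows] for name in rows[0]}


def sum_revenue_column_oriented(cols):
    # Only the single requested column is read; the others are never touched.
    return sum(cols["estimated_revenue"])


print(sum_revenue_row_oriented(rows), sum_revenue_column_oriented(columns))
```

Both functions return the same total; the difference is how much data has to be read to get there, which is what a per-data-scanned pricing model rewards.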
I like using Athena for ETL jobs because of its simplicity and pay-per-use pricing model. The CREATE TABLE AS SELECT (CTAS) statement implements ETL as follows:

  • Extract: Load data from CSV files stored on S3 (SELECT FROM "awsmp"."cas_daily_business_usage_by_instance_type").
  • Transform: Filter and enrich columns (SELECT product_code, SUM(estimated_revenue) AS revenue, concat(year, '-', month, '-01') as date).
  • Load: Store results in Parquet file format on S3 (CREATE TABLE monthly_recurring_revenue).

    CREATE TABLE monthly_recurring_revenue WITH (
      format = 'Parquet',
      external_location = 's3://demo-datalake/monthly_recurring_revenue/',
      partitioned_by = ARRAY['date']
    ) AS
    SELECT
      product_code,
      SUM(estimated_revenue) AS revenue,
      concat(year, '-', month, '-01') as date
    FROM (
      SELECT year, month, day, product_code, estimated_revenue
      FROM "awsmp"."cas_daily_business_usage_by_instance_type"
      ORDER BY year, month, day
    )
    GROUP BY year, month, product_code
    ORDER BY year, month, product_code

Besides converting the data into the Parquet file format, the statement also partitions the data. This means the keys of the objects start with something like date=2022-08-01, which allows Athena to only fetch relevant files from S3 when querying by date.

Why did you choose Athena instead of Amazon EMR to build an ETL pipeline?

I've used EMR for some projects in the past. But nowadays, I prefer adding Athena to the mix wherever feasible. That's because, compared to EMR, Athena is a lightweight solution. Using Athena is less complex than running jobs on EMR. For example, I prefer using SQL to transform data instead of writing Python code. It takes me less time to build an ETL pipeline with Athena compared to EMR. Also, accessing Athena is much more convenient, as all functionality is available via the AWS Management Console and API. In contrast, it requires a VPN connection to interact efficiently with EMR when developing a pipeline.
I prefer a serverless solution due to its cost implications. With Athena, our customer only pays for the processed data. There are no idling costs. As an example, I migrated a workload from EMR to Athena, which reduced costs from $3,000 to $100.

What is Apache Airflow?

Apache Airflow is a popular open-source project providing a workflow management platform for data engineering pipelines. As a data engineer, I describe an ETL pipeline in Python as a directed acyclic graph (DAG).

Here is a straightforward DAG. The workflow consists of two steps:

1. Creating an Athena query.
2. Awaiting results from the Athena query.

    from airflow.models import DAG
    from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator
    from airflow.providers.amazon.aws.sensors.athena import AthenaSensor

    with DAG(dag_id='demo') as dag:
        # Step 1: start the Athena query
        read_table = AWSAthenaOperator(
            task_id='read_table',
            query='SELECT * FROM "sampledb"."elb_logs" limit 10;',
            output_location='s3://aws-athena-query-results-486555357186-eu-west-1/airflow/',
            database='sampledb'
        )

        # Step 2: wait for the query to finish
        await_query = AthenaSensor(
            task_id='await_query',
            query_execution_id=read_table.output,
        )

Airflow allows you to run a DAG manually, via an API, or based on a schedule.

Airflow consists of multiple components:

  • Scheduler
  • Worker
  • Web Server
  • PostgreSQL database
  • Redis in-memory database

Operating such a distributed system is complex. Luckily, AWS provides a managed service called Amazon Managed Workflows for Apache Airflow (MWAA), which we use in my current project.

What does your development workflow for Airflow DAGs look like?

We built a deployment pipeline for the project I'm currently involved in. So you can think of developing the ETL pipeline like any other software delivery process.

1. The engineer pushes changes of DAGs to a Git repository.
2. The deployment pipeline validates the Python code.
3. The deployment pipeline spins up a container based on aws-mwaa-local-runner and verifies whether all dependencies are working as expected.
4. The deployment pipeline runs an integration test.
5. The deployment pipeline uploads the DAGs to S3.
6. Airflow refreshes the DAGs.

The deployment pipeline significantly speeds up the ETL pipeline's development process, as many issues are spotted before deploying to AWS.

Why do you use Airflow instead of AWS Step Functions?

In general, Airflow is similar to AWS Step Functions. However, there are two crucial differences. First, Airflow is a popular choice for building ETL pipelines. Therefore, many engineers in the field of data analytics have already gained experience with the tool. Besides, the open-source community creates many integrations that help build ETL pipelines. Second, unlike Step Functions, Airflow is not only available on AWS. Being able to move ETL pipelines to another cloud vendor or on-premises is a plus.

Why do you specialize in machine learning and data analytics?

I enjoy working with data. Being able to answer questions by analyzing huge amounts of data and enabling better decisions backed by data motivates me. Also, I'm a huge fan of Athena. It's one of the most powerful services offered by AWS. On top of that, machine learning in general, and reinforcement learning in particular, fascinate me, as they allow us to recognize correlations that were not visible before.

Would you like to join Peter's team to implement solutions with the help of machine learning and data analytics? tecRacer is hiring Cloud Consultants focusing on machine learning and data analytics. Apply now!

View the full article
  25. You can now launch Apache Airflow 2.0 environments on Amazon Managed Workflows for Apache Airflow (MWAA). Apache Airflow 2.0 is the latest version of the popular open-source tool that helps customers author, schedule, and monitor workflows.