Search the Community
Showing results for tags 'cloud composer'.
-
Customers can have numerous data analytics teams within a single organization that each require workflow or data pipeline orchestration. It is important to evaluate the tenancy design of your implementation to improve the efficiency, scalability, and security of your organization. Google Cloud offers Cloud Composer, a fully managed workflow orchestration service built on Apache Airflow that offers end-to-end integration with Google Cloud products including BigQuery, Dataflow, Dataproc, Datastore, Cloud Storage, Pub/Sub, and Vertex AI. This guide compares the pros and cons of different tenancy strategies for Cloud Composer, evaluating the differences between a multi-tenant and a single-tenant Composer strategy. In other words, a single shared Composer environment for all data teams vs. a Composer environment for each data team... View the full article
-
This comprehensive blog presents various approaches for monitoring, troubleshooting, and minimizing DAG parse times, leading to notable performance improvements in Cloud Composer / Airflow:

- Increase environment scalability by efficiently handling larger workloads and accommodating more DAGs.
- Improve environment stability by limiting the chance of task overlaps and resource contention.
- Enhance productivity and overall efficiency for developers through faster feedback loops and reduced processing time.

A low DAG parse time serves as a reliable indicator of a healthy Cloud Composer / Airflow environment.

Getting started

What is an Airflow DAG?
An Airflow DAG (Directed Acyclic Graph) is a collection of tasks organized in a way that reflects their relationships and dependencies. DAGs are defined in Python scripts, and they are the core concept of Airflow. A DAG defines four things:

- The tasks that need to be run
- The order in which the tasks need to be run
- The dependencies between the tasks
- The schedule for running the tasks

DAGs are a powerful way to define and manage complex workflows. They can be used to automate tasks, schedule tasks, and monitor the execution of tasks.

What is the Airflow Scheduler?
The Airflow Scheduler monitors all tasks and DAGs, then triggers the task instances once dependent tasks are complete. Once every 30 seconds by default, the Scheduler collects DAG parsing results and checks whether any active tasks can be triggered.

What is the DAG Processor?
As of Airflow 2.3.0, the DAG Processor is separate from the Airflow Scheduler. For more information about this change, check out AIP-43 DAG Processor separation.

Monitoring and alerting

Monitoring DAG parse times
In the Google Cloud console, you can use the Monitoring page and the Logs tab to inspect DAG parse times.

On the Cloud Composer environment, run the following command to check DAG parse times:

```
gcloud composer environments run $ENVIRONMENT_NAME \
  --location $LOCATION \
  dags report
```

Locally, use the time command:

```
time python airflow/example_dags/example.py
```

Make sure to run it several times in succession to account for caching effects, and compare the results before and after an optimization (under the same conditions: same machine, same environment, etc.) in order to assess its impact. Sample output:

```
real    0m0.699s
user    0m0.590s
sys     0m0.108s
```

The important metric is the "real time", which tells you how long it took to process the DAG. Note that when loading the file this way, you are starting a new interpreter, so there is an initial loading time that is not present when Airflow parses the DAG. You can assess the initialization time by running:

```
time python -c ''
```

Result:

```
real    0m0.073s
user    0m0.037s
sys     0m0.039s
```

In this case the initial interpreter startup time is ~0.07 s, which is about 10% of the time needed to parse the example DAG above, so the actual parsing time is about ~0.62 s for the example DAG.
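If you prefer to inspect parse times programmatically rather than with the time command, a minimal local sketch along the following lines can print per-file parse durations. It assumes Airflow 2.x is installed locally and uses an illustrative dags/ folder path; DagBag's dagbag_stats holds per-file statistics similar to what dags report summarizes.

```
# Minimal sketch: load a local DAG folder and print per-file parse durations.
# Assumes Airflow 2.x; the "dags/" path is illustrative.
from airflow.models.dagbag import DagBag

dag_bag = DagBag(dag_folder="dags/", include_examples=False)

# dagbag_stats holds one entry per parsed file, including the time spent parsing it.
for stat in sorted(dag_bag.dagbag_stats, key=lambda s: s.duration, reverse=True):
    print(f"{stat.file}: {stat.duration.total_seconds():.2f}s "
          f"({stat.dag_num} DAGs, {stat.task_num} tasks)")
```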
What is an ideal parse time metric?
On the Monitoring dashboard, in the DAG Statistics section, observe the graphs for total DAG parse time. If the number exceeds about 10 seconds, your Schedulers might be overloaded with DAG parsing and unable to run DAGs effectively.

How can I receive alerts for long parse times?
You can create alerting policies to monitor the values of metrics and to notify you when those metrics violate a condition. This can also be done through the Composer Monitoring Dashboard.

DAG code optimization

Generalized DAG code improvements
Check out Optimize Cloud Composer via Better Airflow DAGs to view a generalized checklist of activities when authoring Apache Airflow DAGs. These items follow best practices determined by Google Cloud and the open source community. A collection of performant DAGs will enable Cloud Composer to work optimally, and standardized authoring will help developers manage hundreds or thousands of DAGs. Each item will benefit your Cloud Composer environment and your development process. The two highest priorities should be limiting top-level code and avoiding the use of Variables and XComs in top-level code.

Limit top-level code
Follow established best practices. Avoid writing top-level code that is not necessary to create Operators and build DAG relations between them. This follows from the design of the Airflow Scheduler: the speed at which top-level code is parsed affects both the performance and scalability of Airflow. One factor impacting DAG loading time that is often overlooked by Python developers is that top-level imports can take a surprisingly long time (on the order of seconds) and generate a lot of overhead. This can easily be avoided by converting them to local imports inside Python callables, for example.

Avoid the use of Variables and XComs in top-level code
If you use Variable.get() in top-level code, then every time the .py file is parsed, Airflow executes the Variable.get(), which opens a session to the database. This can dramatically slow down parse times. If Variables are absolutely necessary, use JSON dictionaries or Jinja templates as values (one connection retrieves many values inside a dict); a short sketch of the templated alternative appears just before the Airflow configurations section below.

DAG folder cleanup

Remove unused DAGs and unnecessary files from the DAGs folder
The Airflow Scheduler wastes time and resources parsing files in the DAGs folder that aren't used.

Use .airflowignore
An .airflowignore file specifies the directories or files in DAG_FOLDER or PLUGINS_FOLDER that Airflow should intentionally ignore. Airflow supports two syntax flavors for patterns in the file, as specified by the DAG_IGNORE_FILE_SYNTAX configuration parameter (added in Airflow 2.3): regexp and glob. The more files are ignored, the fewer files the Airflow Scheduler has to parse.

Review paused DAGs
Paused DAGs are still continuously parsed by the Airflow Scheduler. Determine why each DAG is paused and whether it should be removed, ignored, or unpaused.
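To illustrate the Variables guidance above, here is a minimal, hypothetical sketch contrasting a top-level Variable.get() call with the templated form, which is only rendered at task run time. The variable name config_bucket and the DAG ID are made up for illustration.

```
from datetime import datetime

from airflow import DAG
from airflow.models import Variable  # only needed for the discouraged pattern
from airflow.operators.bash import BashOperator

# Discouraged: runs on every parse of this file and opens a metadata DB session each time.
# config_bucket = Variable.get("config_bucket")

with DAG(
    "variable_template_example_v1_0_0",  # hypothetical DAG ID
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Preferred: the {{ var.value.config_bucket }} template is rendered only when the
    # task runs, so parsing this file does not touch the database.
    print_bucket = BashOperator(
        task_id="print_bucket",
        bash_command='echo "{{ var.value.config_bucket }}"',
    )
```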
Airflow configurations

min_file_process_interval
The Scheduler parses your DAG files every min_file_process_interval seconds. Airflow starts using your updated DAG code only after this interval ends. Consider increasing this interval when you have a high number of DAGs that do not change often, or when you observe a high Scheduler load in general; consider decreasing it to parse your DAGs faster. Updates to DAGs are reflected only after this interval, and keeping this number low increases CPU usage. For example, if you have more than 1,000 DAG files, raise min_file_process_interval to 600 (10 minutes), 6000 (100 minutes), or a higher value.

dag_dir_list_interval
dag_dir_list_interval determines how often (in seconds) Airflow should scan the DAGs directory. A lower value means new DAGs are processed faster, but at the cost of higher CPU usage. Increasing the DAG directory listing interval reduces the Scheduler load associated with discovering new DAGs in the environment's bucket. Consider increasing this interval if you deploy new DAGs infrequently, and decreasing it if you want Airflow to react faster to newly deployed DAG files.

parsing_processes
The DAG Processor can run multiple processes in parallel to parse DAGs, and parsing_processes (formerly max_threads) determines how many of those processes can run in parallel. Increasing this value can help parse and serialize DAGs faster if you have a large number of them. By default, this is set to 2.

```
[scheduler]
parsing_processes = <NUMBER_OF_CORES_IN_MACHINE - 1>
```

file_parsing_sort_mode
Evaluate the following file_parsing_sort_mode options if you are running more than one Airflow Scheduler. The Scheduler lists and sorts the DAG files to decide the parsing order.

- modified_time: Sort by the modified time of the files. This is useful at large scale to parse the recently modified DAGs first. (default)
- random_seeded_by_host: Sort randomly across multiple Schedulers but with the same order on the same host. This is useful when running the Scheduler in HA mode, where each Scheduler can parse different DAG files.
- alphabetical: Sort by filename.

When there are a lot (>1,000) of DAG files, you can prioritize parsing of new files by setting file_parsing_sort_mode to modified_time. A combined example of these Scheduler overrides is shown below.
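To make the settings above concrete, here is an illustrative airflow.cfg-style snippet combining them. The values are assumptions for an environment with many, infrequently changing DAGs, not recommendations; in Cloud Composer you would typically apply them as Airflow configuration overrides rather than by editing airflow.cfg directly.

```
[scheduler]
# Illustrative values only - tune for your own DAG count and change frequency
min_file_process_interval = 600
dag_dir_list_interval = 600
parsing_processes = 3
file_parsing_sort_mode = modified_time
```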
Cloud Composer upgrades
If you've gotten this far and still observe long DAG parse times, consider adding more resources to your Cloud Composer environment. Note: this will add to the overall cost of your Cloud Composer environment.

Change/Increase the number of Airflow Schedulers
Adjusting the number of Schedulers improves the Scheduler capacity and the resilience of Airflow scheduling. If you increase the number of Schedulers, this increases the traffic to and from the Airflow database. Caution: Don't configure more than three Airflow Schedulers in your Cloud Composer environment without special consideration; we recommend two Airflow Schedulers in most scenarios.

Increase CPU/Memory of Airflow Schedulers
You can specify the amount of CPU, memory, and disk space used by your environment. In this way, you can increase the performance of your environment, in addition to the horizontal scaling provided by using multiple workers and Schedulers.

Conclusion
By following these steps, you can maximize the benefits of Cloud Composer / Airflow, enhance the performance of your environment, and create a smoother development experience.
-
Hosting, orchestrating, and managing data pipelines is a complex process for any business. Google Cloud offers Cloud Composer, a fully managed workflow orchestration service, enabling businesses to create, schedule, monitor, and manage workflows that span clouds and on-premises data centers. Cloud Composer is built on the popular Apache Airflow open source project and operates using the Python programming language. Apache Airflow allows users to create directed acyclic graphs (DAGs) of tasks, which can be scheduled to run at specific intervals or triggered by external events.

This guide contains a generalized checklist of activities when authoring Apache Airflow DAGs. These items follow best practices determined by Google Cloud and the open source community. A collection of performant DAGs will enable Cloud Composer to work optimally, and standardized authoring will help developers manage hundreds or even thousands of DAGs. Each item will benefit your Cloud Composer environment and your development process.

Get Started

1. Standardize file names. Help other developers browse your collection of DAG files.
   a. e.g., team_project_workflow_version.py
2. DAGs should be deterministic.
   a. A given input will always produce the same output.
3. DAGs should be idempotent.
   a. Triggering the DAG multiple times has the same effect/outcome.
4. Tasks should be atomic and idempotent.
   a. Each task should be responsible for one operation that can be re-run independently of the others. In an atomized task, a success in part of the task means a success of the entire task.
5. Simplify DAGs as much as possible.
   a. Simpler DAGs with fewer dependencies between tasks tend to have better scheduling performance because they have less overhead. A linear structure (e.g. A -> B -> C) is generally more efficient than a deeply nested tree structure with many dependencies.

Standardize DAG Creation

6. Add an owner to your default_args.
   a. Determine whether you'd prefer the email address/ID of a developer, or a distribution list/team name.
7. Use with DAG() as dag: instead of dag = DAG().
   a. This prevents the need to pass the dag object to every operator or task group.
8. Set a version in the DAG ID.
   a. Update the version after any code change in the DAG.
   b. This prevents deleted task logs from vanishing from the UI, no-status tasks generated for old DAG runs, and general confusion about when DAGs have changed.
   c. Airflow open source has plans to implement versioning in the future.
9. Add tags to your DAGs.
   a. Help developers navigate the Airflow UI via tag filtering.
   b. Group DAGs by organization, team, project, application, etc.
10. Add a DAG description.
    a. Help other developers understand your DAG.
11. Pause your DAGs on creation.
    a. This will help avoid accidental DAG runs that add load to the Cloud Composer environment.
12. Set catchup=False to avoid automatic catch-ups overloading your Cloud Composer environment.
13. Set a dagrun_timeout to avoid DAGs not finishing and holding Cloud Composer environment resources, or introducing collisions on retries.
14. Set SLAs at the DAG level to receive alerts for long-running DAGs.
    a. Airflow SLAs are always defined relative to the start time of the DAG, not to individual tasks.
    b. Ensure that sla_miss_timeout is less than the dagrun_timeout.
    c. Example: If your DAG usually takes 5 minutes to successfully finish, set the sla_miss_timeout to 7 minutes and the dagrun_timeout to 10 minutes. Determine these thresholds based on the priority of your DAGs.
15. Ensure all tasks have the same start_date by default by passing the argument to the DAG during instantiation.
16. Use a static start_date with your DAGs.
    a. A dynamic start_date is misleading, and can cause failures when clearing out failed task instances and missing DAG runs.
17. Set retries as a default_arg applied at the DAG level, and get more granular for specific tasks only where necessary.
    a. A good range is 1-4 retries. Too many retries will add unnecessary load to the Cloud Composer environment.

Example putting all the above together:

```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Define default_args dictionary to specify default parameters of the DAG,
# such as the start date, frequency, and other settings
default_args = {
    'owner': 'me',
    'retries': 2,  # 2-4 retries max
    'retry_delay': timedelta(minutes=5),
    'is_paused_upon_creation': True,
    'catchup': False,
}

# Use the `with` statement to define the DAG object and specify the unique DAG ID
# and default_args dictionary
with DAG(
    'dag_id_v1_0_0',  # versioned ID
    default_args=default_args,
    description='This is a detailed description of the DAG',  # detailed description
    start_date=datetime(2022, 1, 1),  # static start date
    dagrun_timeout=timedelta(minutes=10),  # timeout specific to this DAG
    sla_miss_timeout=timedelta(minutes=7),  # SLA miss less than timeout
    tags=['example', 'versioned_dag_id'],  # tags specific to this DAG
    schedule_interval=None,
) as dag:
    # Define a task using the BashOperator
    task = BashOperator(
        task_id='bash_task',
        bash_command='echo "Hello World"'
    )
```

18. Define what should occur for each callback function (send an email, log the context, message a Slack channel, etc.). Depending on the DAG, you may be comfortable doing nothing.
    a. success
    b. failure
    c. sla_miss
    d. retry
Example:

```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'me',
    'retries': 2,  # 2-4 retries max
    'retry_delay': timedelta(minutes=5),
    'is_paused_upon_creation': True,
    'catchup': False,
}

def on_success_callback(context):
    # when a task in the DAG succeeds
    print(f"Task {context['task_instance_key_str']} succeeded!")

def on_sla_miss_callback(context):
    # when a task in the DAG misses its SLA
    print(f"Task {context['task_instance_key_str']} missed its SLA!")

def on_retry_callback(context):
    # when a task in the DAG retries
    print(f"Task {context['task_instance_key_str']} retrying...")

def on_failure_callback(context):
    # when a task in the DAG fails
    print(f"Task {context['task_instance_key_str']} failed!")

# Create a DAG and set the callbacks
with DAG(
    'dag_id_v1_0_0',
    default_args=default_args,
    description='This is a detailed description of the DAG',
    start_date=datetime(2022, 1, 1),
    dagrun_timeout=timedelta(minutes=10),
    sla_miss_timeout=timedelta(minutes=7),
    tags=['example', 'versioned_dag_id'],
    schedule_interval=None,
    on_success_callback=on_success_callback,  # what to do on success
    on_sla_miss_callback=on_sla_miss_callback,  # what to do on SLA miss
    on_retry_callback=on_retry_callback,  # what to do on retry
    on_failure_callback=on_failure_callback  # what to do on failure
) as dag:

    def example_task(**kwargs):
        # This is an example task that will be part of the DAG
        print(f"Running example task with context: {kwargs}")

    # Create a task and add it to the DAG
    task = PythonOperator(
        task_id="example_task",
        python_callable=example_task,
        provide_context=True,
    )
```

19. Use Task Groups to organize tasks.

Example:

```
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.task_group import TaskGroup

# Use the `with` statement to define the DAG object and specify the unique DAG ID
# and default_args dictionary
with DAG(
    'example_dag',
    default_args=default_args,
    schedule_interval=timedelta(hours=1),
) as dag:
    # Define the first task group
    with TaskGroup(group_id='task_group_1') as tg1:
        # Define the first task in the first task group
        task_1_1 = BashOperator(
            task_id='task_1_1',
            bash_command='echo "Task 1.1"',
        )
```

Reduce the Load on Your Composer Environment

20. Use Jinja templating / macros instead of Python functions.
    a. Airflow's template fields allow you to incorporate values from environment variables and Jinja templates into your DAGs. This helps make your DAGs idempotent (meaning multiple invocations do not change the result) and prevents unnecessary function execution during Scheduler heartbeats.
    b. The Airflow engine passes a few variables by default that are accessible in all templates.
Contrary to best practices, the following example defines variables based on datetime Python functions:

```
# Variables used by tasks
# Bad example - Define today's and yesterday's date using the datetime module
today = datetime.today()
yesterday = datetime.today() - timedelta(1)
```

If this code is in a DAG file, these functions execute on every Scheduler heartbeat, which may not be performant. Even more importantly, this doesn't produce an idempotent DAG. You can't rerun a previously failed DAG run for a past date because datetime.today() is relative to the current date, not the DAG execution date. A better way of implementing this is to use an Airflow template variable:

```
# Variables used by tasks
# Good example - Define yesterday's date with an Airflow template variable
yesterday = '{{ yesterday_ds_nodash }}'
```

21. Avoid creating your own additional Airflow Variables.
    a. The metadata database stores these variables and requires database connections to retrieve them. This can affect the performance of the Cloud Composer environment. Use environment variables or Google Cloud Secrets instead.
22. Avoid running all DAGs on the exact same schedules (disperse the workload as much as possible).
    a. Prefer cron expressions for schedule intervals over Airflow macros or timedeltas. This allows a more rigid schedule and makes it easier to spread workloads throughout the day, which is easier on your Cloud Composer environment.
    b. Crontab.guru can help with generating specific cron expression schedules. Check out the examples here.

Examples:

```
schedule_interval="*/5 * * * *",  # every 5 minutes

schedule_interval="0 */6 * * *",  # at minute 0 of every 6th hour
```

23. Avoid XComs except for small amounts of data.
    a. These add storage and introduce more connections to the database.
    b. Use JSON dicts as values if absolutely necessary (one connection for many values inside a dict).
24. Avoid adding unnecessary objects to the dags/ Google Cloud Storage path.
    a. If you must, add an .airflowignore file covering the paths that the Airflow Scheduler does not need to parse (SQL, plug-ins, etc.).
25. Set execution timeouts for tasks.

Example:

```
# Use the `PythonOperator` to define the task
task = PythonOperator(
    task_id='my_task',
    python_callable=my_task_function,
    execution_timeout=timedelta(minutes=30),  # Set the execution timeout to 30 minutes
    dag=dag,
)
```

26. Use Deferrable Operators over Sensors when possible.
    a. A deferrable operator can suspend itself and free up the worker when it knows it has to wait, handing off the job of resuming it to a Trigger. As a result, while it is suspended (deferred), it does not take up a worker slot, and your cluster wastes fewer resources on idle Operators or Sensors.
Example:

```
PYSPARK_JOB = {
    "reference": {"project_id": "PROJECT_ID"},
    "placement": {"cluster_name": "PYSPARK_CLUSTER_NAME"},
    "pyspark_job": {
        "main_python_file_uri": "gs://dataproc-examples/pyspark/hello-world/hello-world.py"
    },
}

DataprocSubmitJobOperator(
    task_id="dataproc-deferrable-example",
    job=PYSPARK_JOB,
    deferrable=True,
)
```

27. When using Sensors, always define mode, poke_interval, and timeout.
    a. Sensors require Airflow workers to run.
    b. Sensor checking every n seconds (i.e. poke_interval < 60)? Use mode=poke. A sensor in mode=poke will continuously poll every n seconds and hold Airflow worker resources.
    c. Sensor checking every n minutes (i.e. poke_interval >= 60)? Use mode=reschedule. A sensor in mode=reschedule will free up Airflow worker resources between poke intervals.

Example:

```
table_partition_sensor = BigQueryTablePartitionExistenceSensor(
    project_id="{{ project_id }}",
    task_id="bq_check_table_partition",
    dataset_id="{{ dataset }}",
    table_id="comments_partitioned",
    partition_id="{{ ds_nodash }}",
    mode="reschedule",
    poke_interval=60,
    timeout=60 * 5,
)
```

28. Offload processing to external services (BigQuery, Dataproc, Cloud Functions, etc.) to minimize load on the Cloud Composer environment.
    a. These services usually have their own Airflow Operators for you to utilize.
29. Do not use sub-DAGs.
    a. Sub-DAGs were a feature in older versions of Airflow that allowed users to create reusable groups of tasks within DAGs. However, Airflow 2.0 deprecated sub-DAGs because they caused performance and functional issues.
30. Use Pub/Sub for DAG-to-DAG dependencies.
    a. Here is an example for multi-cluster / DAG-to-DAG dependencies.
31. Make DAGs load faster.
    a. Avoid unnecessary "top-level" Python code. DAGs with many imports, variables, and functions outside of the DAG introduce greater parse times for the Airflow Scheduler and in turn reduce the performance and scalability of Cloud Composer / Airflow.
    b. Moving imports and functions within the DAG can reduce parse time (on the order of seconds).
    c. Ensure that developed DAGs do not increase DAG parse times too much.

Example:

```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Define default_args dictionary
default_args = {
    'owner': 'me',
    'start_date': datetime(2022, 11, 17),
}

# Use with statement and DAG context manager to instantiate the DAG
with DAG(
    'my_dag_id',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
) as dag:
    # Import module within DAG block
    import my_module  # DO THIS

    # Define function within DAG block
    def greet():  # DO THIS
        greeting = my_module.generate_greeting()
        print(greeting)

    # Use the PythonOperator to execute the function
    greet_task = PythonOperator(
        task_id='greet_task',
        python_callable=greet
    )
```

Improve Development and Testing

32. Implement "self-checks" (via Sensors or Deferrable Operators).
    a. To ensure that tasks are functioning as expected, you can add checks to your DAG. For example, if a task pushes data to a BigQuery partition, you can add a check in the next task to verify that the partition was created and that the data is correct.

Example:

```
# ------------------------------------------------------------
# Transform source data and transfer to partitioned table
# ------------------------------------------------------------

create_or_replace_partitioned_table_job = BigQueryInsertJobOperator(
    task_id="create_or_replace_comments_partitioned_query_job",
    configuration={
        "query": {
            "query": 'sql/create_or_replace_comments_partitioned.sql',
            "useLegacySql": False,
        }
    },
    location="US",
)

create_or_replace_partitioned_table_job_error = dummy_operator.DummyOperator(
    task_id="create_or_replace_partitioned_table_job_error",
    trigger_rule="one_failed",
)

create_or_replace_partitioned_table_job_ok = dummy_operator.DummyOperator(
    task_id="create_or_replace_partitioned_table_job_ok",
    trigger_rule="one_success",
)

# ------------------------------------------------------------
# Determine if today's partition exists in comments_partitioned
# ------------------------------------------------------------

table_partition_sensor = BigQueryTablePartitionExistenceSensor(
    project_id="{{ project_id }}",
    task_id="bq_check_table_partition",
    dataset_id="{{ dataset }}",
    table_id="comments_partitioned",
    partition_id="{{ ds_nodash }}",
    mode="reschedule",
    poke_interval=60,
    timeout=60 * 5,
)

create_or_replace_partitioned_table_job >> [
    create_or_replace_partitioned_table_job_error,
    create_or_replace_partitioned_table_job_ok,
]
create_or_replace_partitioned_table_job_ok >> table_partition_sensor
```

33. Look for opportunities to dynamically generate similar tasks/task groups/DAGs via Python code.
    a. This can simplify and standardize the development process for DAGs (a loop-based sketch follows the example below).

Example:

```
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

def create_dag(dag_id, default_args, task_1_func, task_2_func):
    with DAG(dag_id, default_args=default_args) as dag:
        task_1 = PythonOperator(
            task_id='task_1',
            python_callable=task_1_func,
        )
        task_2 = PythonOperator(
            task_id='task_2',
            python_callable=task_2_func,
        )
        task_1 >> task_2
    return dag

def task_1_func():
    print("Executing task 1")

def task_2_func():
    print("Executing task 2")

default_args = {
    'owner': 'me',
    'start_date': days_ago(2),
}

my_dag_id = create_dag(
    dag_id='my_dag_id',
    default_args=default_args,
    task_1_func=task_1_func,
    task_2_func=task_2_func,
)
```
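Building on item 33, the create_dag() factory above can also be called in a loop to register several similar DAGs. The sketch below is illustrative (the team names are made up) and relies on the common pattern of assigning each generated DAG to the module's global namespace so the Scheduler can discover it.

```
# Hypothetical follow-on to item 33: register one DAG per team using the
# create_dag() factory defined above. Team names are made up for illustration.
for team in ["ads", "billing", "growth"]:
    dag_id = f"{team}_daily_report_v1_0_0"
    # Assigning to globals() makes each generated DAG discoverable by the Scheduler.
    globals()[dag_id] = create_dag(
        dag_id=dag_id,
        default_args=default_args,
        task_1_func=task_1_func,
        task_2_func=task_2_func,
    )
```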
34. Implement unit testing for your DAGs (a usage sketch follows at the end of this post).

Example:

```
from airflow import models
from airflow.utils.dag_cycle_tester import test_cycle


def assert_has_valid_dag(module):
    """Assert that a module contains a valid DAG."""

    no_dag_found = True

    for dag in vars(module).values():
        if isinstance(dag, models.DAG):
            no_dag_found = False
            test_cycle(dag)  # Throws if a task cycle is found.

    if no_dag_found:
        raise AssertionError('module does not contain a valid DAG')
```

35. Perform local development via the Composer Local Development CLI tool.
    a. The Composer Local Development CLI tool streamlines Apache Airflow DAG development for Cloud Composer 2 by running an Airflow environment locally. This local Airflow environment uses an image of a specific Cloud Composer version.
36. If possible, keep a staging Cloud Composer environment to fully test the complete DAG run before deploying to production.
    a. Parameterize your DAG to change the variables, e.g., the output path of a Google Cloud Storage operation or the database used to read the configuration. Do not hard-code values inside the DAG and then change them manually according to the environment.
37. Use a Python linting tool such as Pylint or Flake8 for standardized code.
38. Use a Python formatting tool such as Black or YAPF for standardized code.

Next Steps

In summary, this blog provides a comprehensive checklist of best practices for developing Airflow DAGs for use in Google Cloud Composer. By following these best practices, developers can help ensure that Cloud Composer is working optimally and that their DAGs are well organized and easy to manage.

For more information about Cloud Composer, check out the following related blog posts and documentation pages:

- What is Cloud Composer?
- Deutsche Bank uses Cloud Composer workload automation
- Using Cloud Build to keep Airflow Operators up-to-date in your Composer environment
- Writing DAGs (workflows) | Cloud Composer
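As a usage illustration for the unit-test helper in item 34, a hypothetical pytest-style test might import a DAG module and assert that it loads cleanly. The module path my_dags.my_dag is made up, and the sketch assumes assert_has_valid_dag from item 34 is defined in, or imported into, the test module.

```
import importlib


def test_my_dag_module_contains_valid_dag():
    # "my_dags.my_dag" is a placeholder module path; point it at one of your DAG files.
    module = importlib.import_module("my_dags.my_dag")
    # assert_has_valid_dag is the helper from item 34 above.
    assert_has_valid_dag(module)
```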
-