One-time and complex queries are two common scenarios in enterprise data analytics. One-time queries are flexible and suitable for instant analysis and exploratory research. Complex queries, on the other hand, refer to large-scale data processing and in-depth analysis based on petabyte-level data warehouses in massive data scenarios. These complex queries typically involve data sources from multiple business systems, requiring multilevel nested SQL or joins across numerous tables for highly sophisticated analytical tasks. Combining the data lineage of these two query types, however, presents several challenges:

- Diversity of data sources
- Varying query complexity
- Inconsistent granularity in lineage tracking
- Different real-time requirements
- Difficulties in cross-system integration

Moreover, maintaining the accuracy and completeness of lineage information while preserving system performance and scalability are crucial considerations. Addressing these challenges requires a carefully designed architecture and advanced technical solutions.

Amazon Athena offers serverless, flexible SQL analytics for one-time queries, enabling direct querying of Amazon Simple Storage Service (Amazon S3) data for rapid, cost-effective instant analysis. Amazon Redshift, optimized for complex queries, provides high-performance columnar storage and a massively parallel processing (MPP) architecture, supporting large-scale data processing and advanced SQL capabilities. Amazon Neptune, as a graph database, is ideal for data lineage analysis, offering efficient relationship traversal and complex graph algorithms to handle large-scale, intricate data lineage relationships. The combination of these three services provides a powerful, comprehensive solution for end-to-end data lineage analysis.

In the context of comprehensive data governance, Amazon DataZone offers organization-wide data lineage visualization using Amazon Web Services (AWS) services, while dbt provides project-level lineage through model analysis and supports cross-project integration between data lakes and warehouses.

In this post, we use dbt for data modeling on both Amazon Athena and Amazon Redshift. dbt on Athena supports real-time queries, while dbt on Amazon Redshift handles complex queries, unifying the development language and significantly reducing the technical learning curve. Using a single dbt modeling language not only simplifies the development process but also automatically generates consistent data lineage information. This approach offers robust adaptability, easily accommodating changes in data structures.

By integrating the Amazon Neptune graph database to store and analyze complex lineage relationships, combined with AWS Step Functions and AWS Lambda functions, we achieve a fully automated data lineage generation process. This combination promotes consistency and completeness of lineage data while enhancing the efficiency and scalability of the entire process. The result is a powerful and flexible solution for end-to-end data lineage analysis.

Architecture overview

The experiment's context involves a customer already using Amazon Athena for one-time queries. To better accommodate massive data processing and complex query scenarios, they aim to adopt a unified data modeling language across different data platforms. This led to the implementation of both dbt on Athena and dbt on Amazon Redshift architectures.
An AWS Glue crawler crawls data lake information from Amazon S3, generating a Data Catalog to support dbt on Amazon Athena data modeling. For complex query scenarios, AWS Glue performs extract, transform, and load (ETL) processing, loading data into the petabyte-scale data warehouse, Amazon Redshift, where data modeling uses dbt on Amazon Redshift. The original lineage files from both parts are loaded into an S3 bucket, providing data support for end-to-end data lineage analysis (a minimal upload sketch appears at the end of this section). The following image is the architecture diagram for the solution.

Some important considerations:

- For implementing dbt modeling on Athena, refer to the dbt-on-aws / athena GitHub repository for experimentation.
- For implementing dbt modeling on Amazon Redshift, refer to the dbt-on-aws / redshift GitHub repository for experimentation.

This experiment uses the following data dictionary:

| Source table | Tool | Target table |
| --- | --- | --- |
| imdb.name_basics | DBT/Athena | stg_imdb__name_basics |
| imdb.title_akas | DBT/Athena | stg_imdb__title_akas |
| imdb.title_basics | DBT/Athena | stg_imdb__title_basics |
| imdb.title_crew | DBT/Athena | stg_imdb__title_crews |
| imdb.title_episode | DBT/Athena | stg_imdb__title_episodes |
| imdb.title_principals | DBT/Athena | stg_imdb__title_principals |
| imdb.title_ratings | DBT/Athena | stg_imdb__title_ratings |
| stg_imdb__name_basics | DBT/Redshift | new_stg_imdb__name_basics |
| stg_imdb__title_akas | DBT/Redshift | new_stg_imdb__title_akas |
| stg_imdb__title_basics | DBT/Redshift | new_stg_imdb__title_basics |
| stg_imdb__title_crews | DBT/Redshift | new_stg_imdb__title_crews |
| stg_imdb__title_episodes | DBT/Redshift | new_stg_imdb__title_episodes |
| stg_imdb__title_principals | DBT/Redshift | new_stg_imdb__title_principals |
| stg_imdb__title_ratings | DBT/Redshift | new_stg_imdb__title_ratings |
| new_stg_imdb__name_basics | DBT/Redshift | int_primary_profession_flattened_from_name_basics |
| new_stg_imdb__name_basics | DBT/Redshift | int_known_for_titles_flattened_from_name_basics |
| new_stg_imdb__name_basics | DBT/Redshift | names |
| new_stg_imdb__title_akas | DBT/Redshift | titles |
| new_stg_imdb__title_basics | DBT/Redshift | int_genres_flattened_from_title_basics |
| new_stg_imdb__title_basics | DBT/Redshift | titles |
| new_stg_imdb__title_crews | DBT/Redshift | int_directors_flattened_from_title_crews |
| new_stg_imdb__title_crews | DBT/Redshift | int_writers_flattened_from_title_crews |
| new_stg_imdb__title_episodes | DBT/Redshift | titles |
| new_stg_imdb__title_principals | DBT/Redshift | titles |
| new_stg_imdb__title_ratings | DBT/Redshift | titles |
| int_known_for_titles_flattened_from_name_basics | DBT/Redshift | titles |
| int_primary_profession_flattened_from_name_basics | DBT/Redshift | |
| int_directors_flattened_from_title_crews | DBT/Redshift | names |
| int_genres_flattened_from_title_basics | DBT/Redshift | genre_titles |
| int_writers_flattened_from_title_crews | DBT/Redshift | names |
| genre_titles | DBT/Redshift | |
| names | DBT/Redshift | |
| titles | DBT/Redshift | |

The lineage data generated by dbt on Athena includes partial lineage diagrams, as exemplified in the following images. The first image shows the lineage of name_basics in dbt on Athena. The second image shows the lineage of title_crew in dbt on Athena.

The lineage data generated by dbt on Amazon Redshift includes partial lineage diagrams, as illustrated in the following image.

Referring to the data dictionary and screenshots, it's evident that the complete data lineage information is highly dispersed, spread across 29 lineage diagrams. Understanding the end-to-end comprehensive view requires significant time. In real-world environments, the situation is often more complex, with complete data lineage potentially distributed across hundreds of files.
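The architecture assumes that each dbt project's lineage artifact (the manifest.json file that dbt writes to its target folder) is copied into the lineage S3 bucket after each run. How that copy happens is environment specific; the following is a minimal boto3 sketch under the assumption that the manifests sit in local target folders and use the bucket and key names from this post.

```python
import boto3

# Assumed names for illustration only -- replace with your own bucket, paths, and keys.
LINEAGE_BUCKET = "data-lineage-analysis-24-09-22"
ARTIFACTS = {
    "athena_dbt/target/manifest.json": "athena_manifest.json",
    "redshift_dbt/target/manifest.json": "redshift_manifest.json",
}

def upload_lineage_artifacts():
    """Copy each dbt manifest from its local target/ folder to the S3 lineage bucket."""
    s3 = boto3.client("s3")
    for local_path, s3_key in ARTIFACTS.items():
        s3.upload_file(local_path, LINEAGE_BUCKET, s3_key)
        print(f"Uploaded {local_path} to s3://{LINEAGE_BUCKET}/{s3_key}")

if __name__ == "__main__":
    upload_lineage_artifacts()
```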
Consequently, integrating a complete end-to-end data lineage diagram becomes crucial and challenging. This post provides a detailed introduction to processing and merging the data lineage files stored in Amazon S3, as illustrated in the following diagram.

Prerequisites

To perform the solution, you need to have the following prerequisites in place:

- The Lambda function for preprocessing lineage files must have permissions to access Amazon S3 and Amazon Redshift.
- The Lambda function for constructing the directed acyclic graph (DAG) must have permissions to access Amazon S3 and Amazon Neptune.

Solution walkthrough

To perform the solution, follow the steps in the next sections.

Preprocess raw lineage data for DAG generation using Lambda functions

Use Lambda to preprocess the raw lineage data generated by dbt, converting it into key-value pair JSON files that Neptune can easily consume: athena_dbt_lineage_map.json and redshift_dbt_lineage_map.json.

1. To create a new Lambda function on the Lambda console, enter a Function name, select the Runtime (Python in this example), configure the Architecture and Execution role, then choose Create function.
2. Open the created Lambda function, and on the Configuration tab, in the navigation pane, select Environment variables and choose your configurations.

Using dbt on Athena processing as an example, configure the environment variables as follows (the process for dbt on Amazon Redshift is similar):

- INPUT_BUCKET: data-lineage-analysis-24-09-22 (replace with the S3 bucket path storing the original dbt on Athena lineage files)
- INPUT_KEY: athena_manifest.json (the original dbt on Athena lineage file)
- OUTPUT_BUCKET: data-lineage-analysis-24-09-22 (replace with the S3 bucket path for storing the preprocessed output of the dbt on Athena lineage files)
- OUTPUT_KEY: athena_dbt_lineage_map.json (the output file after preprocessing the original dbt on Athena lineage file)

The athena_manifest.json, redshift_manifest.json, and other files used in this experiment can be obtained from the Data Lineage Graph Construction GitHub repository.
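If you prefer to script this configuration instead of using the console, the same environment variables can also be set with the AWS SDK. The following is a minimal, illustrative sketch; the function name athena-dbt-lineage-preprocess is an assumed placeholder, not a name defined by this post.

```python
import boto3

# Assumed function name for illustration -- use the name of the Lambda function you created.
FUNCTION_NAME = "athena-dbt-lineage-preprocess"

lambda_client = boto3.client("lambda")

# Set the same variables described above for the dbt on Athena preprocessing function.
lambda_client.update_function_configuration(
    FunctionName=FUNCTION_NAME,
    Environment={
        "Variables": {
            "INPUT_BUCKET": "data-lineage-analysis-24-09-22",   # bucket holding athena_manifest.json
            "INPUT_KEY": "athena_manifest.json",
            "OUTPUT_BUCKET": "data-lineage-analysis-24-09-22",  # bucket for the preprocessed output
            "OUTPUT_KEY": "athena_dbt_lineage_map.json",
        }
    },
)
```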
On the Code tab, in the lambda_function.py file, enter the preprocessing code for the raw lineage data. Here's a code reference using dbt on Athena processing as an example (the process for dbt on Amazon Redshift is similar). The preprocessing code for the original dbt on Athena lineage file is as follows:

```python
import json
import boto3
import os

def lambda_handler(event, context):
    # Set up S3 client
    s3 = boto3.client('s3')

    # Get input and output paths from environment variables
    input_bucket = os.environ['INPUT_BUCKET']
    input_key = os.environ['INPUT_KEY']
    output_bucket = os.environ['OUTPUT_BUCKET']
    output_key = os.environ['OUTPUT_KEY']

    # Define helper function to strip the dbt resource prefix and keep only the short node name
    def dbt_nodename_format(node_name):
        return node_name.split(".")[-1]

    # Read input JSON file from S3
    response = s3.get_object(Bucket=input_bucket, Key=input_key)
    file_content = response['Body'].read().decode('utf-8')
    data = json.loads(file_content)
    lineage_map = data["child_map"]

    node_dict = {}
    dbt_lineage_map = {}

    # Process data: shorten every child node name and remember the short name of each parent
    for item in lineage_map:
        lineage_map[item] = [dbt_nodename_format(child) for child in lineage_map[item]]
        node_dict[item] = dbt_nodename_format(item)

    # Update key names so parents use the shortened names as well
    lineage_map = {node_dict[old]: value for old, value in lineage_map.items()}
    dbt_lineage_map["lineage_map"] = lineage_map

    # Convert result to JSON string
    result_json = json.dumps(dbt_lineage_map)

    # Write JSON string to S3
    s3.put_object(Body=result_json, Bucket=output_bucket, Key=output_key)
    print(f"Data written to s3://{output_bucket}/{output_key}")

    return {
        'statusCode': 200,
        'body': json.dumps('Athena data lineage processing completed successfully')
    }
```
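To make the transformation concrete, the function reads the child_map section of the dbt manifest (a map from each node to its downstream children) and writes a flattened version keyed by short model names. The following sketch shows the assumed shape of the input and the corresponding output; the node names are illustrative, not taken from the actual manifest files.

```python
# Illustrative input: the "child_map" section of a dbt manifest.
# The fully qualified node names below are examples, not values from the real manifest.
manifest_fragment = {
    "child_map": {
        "source.imdb_project.imdb.title_crew": [
            "model.imdb_project.stg_imdb__title_crews"
        ],
        "model.imdb_project.stg_imdb__title_crews": []
    }
}

# Expected output written to athena_dbt_lineage_map.json after preprocessing:
expected_output = {
    "lineage_map": {
        "title_crew": ["stg_imdb__title_crews"],
        "stg_imdb__title_crews": []
    }
}
```

Because the later merge step combines the two flattened maps with dict unpacking, a key that appears in both files takes its children from the Redshift map.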
Merge preprocessed lineage data and write to Neptune using Lambda functions

Before processing data with the Lambda function, create a Lambda layer by uploading the required Gremlin plugin. For detailed steps on creating and configuring Lambda layers, see the AWS Lambda layers documentation. Because connecting Lambda to Neptune to construct a DAG requires the Gremlin plugin, it needs to be uploaded before using Lambda. The Gremlin package can be obtained from the Data Lineage Graph Construction GitHub repository.

1. Create a new Lambda function.
2. Choose the function to configure. To add the recently created layer, at the bottom of the page, choose Add a layer.
3. Create another Lambda layer for the requests library, similar to how you created the layer for the Gremlin plugin. This library provides the HTTP client functionality used in the Lambda function.
4. Choose the recently created Lambda function to configure. Connect to Neptune through Lambda to merge the two datasets and construct a DAG.

On the Code tab, the reference code to execute is as follows:

```python
import json
import boto3
import os
import requests
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import get_credentials
from botocore.session import Session
from concurrent.futures import ThreadPoolExecutor, as_completed

def read_s3_file(s3_client, bucket, key):
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = json.loads(response['Body'].read().decode('utf-8'))
        return data.get("lineage_map", {})
    except Exception as e:
        print(f"Error reading S3 file {bucket}/{key}: {str(e)}")
        raise

def merge_data(athena_data, redshift_data):
    # Combine the two flattened lineage maps; Redshift entries override duplicate keys
    return {**athena_data, **redshift_data}

def sign_request(request):
    # Sign the HTTP request with SigV4 so Neptune IAM authentication accepts it
    credentials = get_credentials(Session())
    auth = SigV4Auth(credentials, 'neptune-db', os.environ['AWS_REGION'])
    auth.add_auth(request)
    return dict(request.headers)

def send_request(url, headers, data):
    try:
        response = requests.post(url, headers=headers, data=data, timeout=30)
        response.raise_for_status()
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"Request Error: {str(e)}")
        if hasattr(e.response, 'text'):
            print(f"Response content: {e.response.text}")
        raise

def write_to_neptune(data):
    endpoint = 'https://your neptune endpoint name:8182/gremlin'  # replace with your Neptune endpoint name

    # Clear Neptune database
    clear_query = "g.V().drop()"
    request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': clear_query}))
    signed_headers = sign_request(request)
    response = send_request(endpoint, signed_headers, json.dumps({'gremlin': clear_query}))
    print(f"Clear database response: {response}")

    # Verify if the database is empty
    verify_query = "g.V().count()"
    request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': verify_query}))
    signed_headers = sign_request(request)
    response = send_request(endpoint, signed_headers, json.dumps({'gremlin': verify_query}))
    print(f"Vertex count after clearing: {response}")

    def process_node(node, children):
        # Add node
        query = f"g.V().has('lineage_node', 'node_name', '{node}').fold().coalesce(unfold(), addV('lineage_node').property('node_name', '{node}'))"
        request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
        signed_headers = sign_request(request)
        response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
        print(f"Add node response for {node}: {response}")

        for child_node in children:
            # Add child node
            query = f"g.V().has('lineage_node', 'node_name', '{child_node}').fold().coalesce(unfold(), addV('lineage_node').property('node_name', '{child_node}'))"
            request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
            signed_headers = sign_request(request)
            response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
            print(f"Add child node response for {child_node}: {response}")

            # Add edge
            query = f"g.V().has('lineage_node', 'node_name', '{node}').as('a').V().has('lineage_node', 'node_name', '{child_node}').coalesce(inE('lineage_edge').where(outV().as('a')), addE('lineage_edge').from('a').property('edge_name', ' '))"
            request = AWSRequest(method='POST', url=endpoint, data=json.dumps({'gremlin': query}))
            signed_headers = sign_request(request)
            response = send_request(endpoint, signed_headers, json.dumps({'gremlin': query}))
            print(f"Add edge response for {node} -> {child_node}: {response}")

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_node, node, children) for node, children in data.items()]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error in processing node: {str(e)}")

def lambda_handler(event, context):
    # Initialize S3 client
    s3_client = boto3.client('s3')

    # S3 bucket and file paths
    bucket_name = 'data-lineage-analysis'  # Replace with your S3 bucket name
    athena_key = 'athena_dbt_lineage_map.json'  # Replace with your Athena lineage key-value output JSON name
    redshift_key = 'redshift_dbt_lineage_map.json'  # Replace with your Redshift lineage key-value output JSON name

    try:
        # Read Athena lineage data
        athena_data = read_s3_file(s3_client, bucket_name, athena_key)
        print(f"Athena data size: {len(athena_data)}")

        # Read Redshift lineage data
        redshift_data = read_s3_file(s3_client, bucket_name, redshift_key)
        print(f"Redshift data size: {len(redshift_data)}")

        # Merge data
        combined_data = merge_data(athena_data, redshift_data)
        print(f"Combined data size: {len(combined_data)}")

        # Write to Neptune (including clearing the database)
        write_to_neptune(combined_data)

        return {
            'statusCode': 200,
            'body': json.dumps('Data successfully written to Neptune')
        }
    except Exception as e:
        print(f"Error in lambda_handler: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps(f'Error: {str(e)}')
        }
```

Create Step Functions workflow

1. On the Step Functions console, choose State machines, and then choose Create state machine.
2. On the Choose a template page, select Blank template.
3. In the Blank template, choose Code to define your state machine. Use the following example code:

```json
{
  "Comment": "Daily Data Lineage Processing Workflow",
  "StartAt": "Parallel Processing",
  "States": {
    "Parallel Processing": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Process Athena Data",
          "States": {
            "Process Athena Data": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "athena-data-lineage-process-Lambda",
                "Payload": {
                  "input.$": "$"
                }
              },
              "End": true
            }
          }
        },
        {
          "StartAt": "Process Redshift Data",
          "States": {
            "Process Redshift Data": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "Parameters": {
                "FunctionName": "redshift-data-lineage-process-Lambda",
                "Payload": {
                  "input.$": "$"
                }
              },
              "End": true
            }
          }
        }
      ],
      "Next": "Load Data to Neptune"
    },
    "Load Data to Neptune": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "data-lineage-analysis-lambda"
      },
      "End": true
    }
  }
}
```

Replace the FunctionName values (athena-data-lineage-process-Lambda, redshift-data-lineage-process-Lambda, and data-lineage-analysis-lambda) with the names of your own Lambda functions for Athena lineage preprocessing, Redshift lineage preprocessing, and Neptune loading.

4. After completing the configuration, choose the Design tab to view the workflow shown in the following diagram.

Create scheduling rules with Amazon EventBridge

Configure Amazon EventBridge to generate lineage data daily during off-peak business hours. To do this, follow these steps (a scripted alternative using the AWS SDK appears after the list):

1. Create a new rule in the EventBridge console with a descriptive name.
2. Set the rule type to Schedule and configure it to run once daily (using either a fixed rate or the cron expression 0 0 * * ? *).
3. Select the AWS Step Functions state machine as the target and specify the state machine you created earlier.
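If you prefer to create the schedule programmatically rather than in the console, the following is a minimal boto3 sketch. The rule name, state machine ARN, and IAM role ARN are assumed placeholders; EventBridge needs a role it can assume that is allowed to call states:StartExecution on the state machine.

```python
import boto3

events = boto3.client("events")

# Assumed placeholders -- replace with your own names and ARNs.
RULE_NAME = "daily-data-lineage-processing"
STATE_MACHINE_ARN = "arn:aws:states:us-east-1:111122223333:stateMachine:data-lineage-workflow"
EVENTBRIDGE_ROLE_ARN = "arn:aws:iam::111122223333:role/eventbridge-start-stepfunctions"

# Run once a day at midnight UTC; adjust the cron expression to your off-peak window.
events.put_rule(
    Name=RULE_NAME,
    ScheduleExpression="cron(0 0 * * ? *)",
    State="ENABLED",
)

# Point the rule at the Step Functions state machine created earlier.
events.put_targets(
    Rule=RULE_NAME,
    Targets=[
        {
            "Id": "data-lineage-state-machine",
            "Arn": STATE_MACHINE_ARN,
            "RoleArn": EVENTBRIDGE_ROLE_ARN,
        }
    ],
)
```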
Query results in Neptune

1. On the Neptune console, select Notebooks.
2. Open an existing notebook or create a new one.
3. In the notebook, create a new code cell to perform a query. The following code example shows the query statement and its results:

```
%%gremlin -d node_name -de edge_name
g.V().hasLabel('lineage_node').outE('lineage_edge').inV().hasLabel('lineage_node').path().by(elementMap())
```

You can now see the end-to-end data lineage graph information for both dbt on Athena and dbt on Amazon Redshift. The following image shows the merged DAG data lineage graph in Neptune.

You can also query the generated data lineage graph for data related to a specific table, such as title_crew. The sample query statement and its results are shown in the following code example:

```
%%gremlin -d node_name -de edge_name
g.V().has('lineage_node', 'node_name', 'title_crew')
  .repeat(
    union(
      __.inE('lineage_edge').outV(),
      __.outE('lineage_edge').inV()
    )
  )
  .until(
    __.has('node_name', within('names', 'genre_titles', 'titles'))
    .or()
    .loops().is(gt(10))
  )
  .path()
  .by(elementMap())
```

The following image shows the filtered results based on the title_crew table in Neptune.

Clean up

To clean up your resources, complete the following steps:

1. Delete the EventBridge rule:

```bash
# Stop new events from triggering while removing dependencies
aws events disable-rule --name <rule-name>

# Break connections between the rule and its targets (such as Lambda functions)
aws events remove-targets --rule <rule-name> --ids <target-id>

# Remove the rule completely from EventBridge
aws events delete-rule --name <rule-name>
```

2. Delete the Step Functions state machine:

```bash
# Stop all running executions
aws stepfunctions stop-execution --execution-arn <execution-arn>

# Delete the state machine
aws stepfunctions delete-state-machine --state-machine-arn <state-machine-arn>
```

3. Delete the Lambda functions:

```bash
# Delete a Lambda function
aws lambda delete-function --function-name <function-name>

# Delete Lambda layers (if used)
aws lambda delete-layer-version --layer-name <layer-name> --version-number <version>
```

4. Clean up the Neptune database:

```bash
# Delete all snapshots
aws neptune delete-db-cluster-snapshot --db-cluster-snapshot-identifier <snapshot-id>

# Delete the database instance
aws neptune delete-db-instance --db-instance-identifier <instance-id> --skip-final-snapshot

# Delete the database cluster
aws neptune delete-db-cluster --db-cluster-identifier <cluster-id> --skip-final-snapshot
```

5. Follow the instructions at Deleting a single object to clean up the S3 buckets.

Conclusion

In this post, we demonstrated how dbt enables unified data modeling across Amazon Athena and Amazon Redshift, integrating data lineage from both one-time and complex queries. By using Amazon Neptune, this solution provides comprehensive end-to-end lineage analysis. The architecture uses AWS serverless computing and managed services, including Step Functions, Lambda, and EventBridge, providing a highly flexible and scalable design.

This approach significantly lowers the learning curve through a unified data modeling method while enhancing development efficiency. The end-to-end data lineage graph visualization and analysis not only strengthen data governance capabilities but also offer deep insights for decision-making. The solution's flexible and scalable architecture effectively optimizes operational costs and improves business responsiveness. This comprehensive approach balances technical innovation, data governance, operational efficiency, and cost-effectiveness, supporting long-term business growth with the adaptability to meet evolving enterprise needs.
With OpenLineage-compatible data lineage now generally available in Amazon DataZone, we plan to explore integration possibilities to further enhance the system's capability to handle complex data lineage analysis scenarios. If you have any questions, please feel free to leave a comment in the comments section.

About the authors

Nancy Wu is a Solutions Architect at AWS, responsible for cloud computing architecture consulting and design for multinational enterprise customers. She has many years of experience in big data, enterprise digital transformation research and development, consulting, and project management across the telecommunications, entertainment, and financial industries.

Xu Feng is a Senior Industry Solution Architect at AWS, responsible for designing, building, and promoting industry solutions for the Media & Entertainment and Advertising sectors, such as intelligent customer service and business intelligence. With 20 years of software industry experience, he is currently focused on researching and implementing generative AI and AI-powered data solutions.

Xu Da is an Amazon Web Services (AWS) Partner Solutions Architect based out of Shanghai, China. He has more than 25 years of experience in the IT industry, software development, and solution architecture. He is passionate about collaborative learning, knowledge sharing, and guiding the community on their cloud technologies journey.