Showing results for tags 'apache spark'.

Found 7 results

  1. until
    About

    Experience everything that Summit has to offer. Attend all the parties, build your session schedule, enjoy the keynotes and then watch it all again on demand.

      • Expo access to 150+ partners and 100s of Databricks experts
      • 500+ breakout sessions and keynotes
      • 20+ hands-on trainings
      • Four days of food and beverage
      • Networking events and parties
      • On-demand session streaming after the event

    Join leading experts, researchers and open source contributors, from Databricks and across the data and AI community, who will speak at Data + AI Summit. Over 500 sessions cover everything from data warehousing and governance to the latest in generative AI. Join thousands of data leaders, engineers, scientists and architects to explore the convergence of data and AI.

    Explore the latest advances in Apache Spark™, Delta Lake, MLflow, PyTorch, dbt, Presto/Trino and much more. You’ll also get a first look at new products and features in the Databricks Data Intelligence Platform.

    Connect with thousands of data and AI community peers and grow your professional network in social meetups, on the Expo floor or at our event party.

    Register: https://dataaisummit.databricks.com/flow/db/dais2024/landing/page/home
    Further details: https://www.databricks.com/dataaisummit/
  2. This article is part of a project that’s split into two main phases. The first phase focuses on building a data pipeline: getting data from an API and storing it in a PostgreSQL database. In the second phase, we’ll develop an application that uses a language model to interact with this database.

    Ideal for those new to data systems or language model applications, this project is structured into two segments:

      • This initial article guides you through constructing a data pipeline using Kafka for streaming, Airflow for orchestration, Spark for data transformation, and PostgreSQL for storage. To set up and run these tools we will use Docker.
      • The second article, which will come later, will delve into creating agents using tools like LangChain to communicate with external databases.

    This first part of the project is ideal for beginners in data engineering, as well as for data scientists and machine learning engineers looking to deepen their knowledge of the entire data handling process. Using these data engineering tools firsthand is beneficial: it helps in refining the creation and expansion of machine learning models, ensuring they perform effectively in practical settings.

    This article focuses on practical application rather than the theoretical aspects of the tools discussed. For a detailed understanding of how these tools work internally, there are many excellent resources available online.

    Overview

    Let’s break down the data pipeline process step by step:

      • Data Streaming: Initially, data is streamed from the API into a Kafka topic.
      • Data Processing: A Spark job then takes over, consuming the data from the Kafka topic and transferring it to a PostgreSQL database.
      • Scheduling with Airflow: Both the streaming task and the Spark job are orchestrated using Airflow. While in a real-world scenario the Kafka producer would constantly listen to the API, for demonstration purposes we’ll schedule the Kafka streaming task to run daily.
        Once the streaming is complete, the Spark job processes the data, making it ready for use by the LLM application.

    All of these tools will be built and run using Docker, and more specifically docker-compose.

    Overview of the data pipeline. Image by the author.

    Now that we have a blueprint of our pipeline, let’s dive into the technical details!

    Local setup

    First, clone the GitHub repo to your local machine using the following command:

        git clone https://github.com/HamzaG737/data-engineering-project.git

    Here is the overall structure of the project:

        ├── LICENSE
        ├── README.md
        ├── airflow
        │   ├── Dockerfile
        │   ├── __init__.py
        │   └── dags
        │       ├── __init__.py
        │       └── dag_kafka_spark.py
        ├── data
        │   └── last_processed.json
        ├── docker-compose-airflow.yaml
        ├── docker-compose.yml
        ├── kafka
        ├── requirements.txt
        ├── spark
        │   └── Dockerfile
        └── src
            ├── __init__.py
            ├── constants.py
            ├── kafka_client
            │   ├── __init__.py
            │   └── kafka_stream_data.py
            └── spark_pgsql
                └── spark_streaming.py

      • The airflow directory contains a custom Dockerfile for setting up Airflow and a dags directory to create and schedule the tasks.
      • The data directory contains the last_processed.json file, which is crucial for the Kafka streaming task. Further details on its role are provided in the Kafka section.
      • The docker-compose-airflow.yaml file defines all the services required to run Airflow.
      • The docker-compose.yml file specifies the Kafka services and includes a docker-proxy. This proxy is essential for executing Spark jobs through a DockerOperator in Airflow, a concept that will be elaborated on later.
      • The spark directory contains a custom Dockerfile for the Spark setup.
      • src contains the Python modules needed to run the application.

    To set up your local development environment, start by installing the required Python packages. The only essential package is psycopg2-binary. You have the option to install just this package or all the packages listed in the requirements.txt file.
    To install all packages, use the following command:

        pip install -r requirements.txt

    Next, let’s dive step by step into the project details.

    About the API

    The API is RappelConso from the French public services. It gives access to data relating to recalls of products declared by professionals in France. The data is in French and initially contains 31 columns (or fields). Some of the most important are:

      • reference_fiche (reference sheet): Unique identifier of the recalled product. It will act as the primary key of our Postgres database later.
      • categorie_de_produit (product category): For instance food, electrical appliance, tools, means of transport, etc.
      • sous_categorie_de_produit (product sub-category): For instance meat, dairy products and cereals are sub-categories of the food category.
      • motif_de_rappel (reason for recall): Self-explanatory and one of the most important fields.
      • date_de_publication: The publication date.
      • risques_encourus_par_le_consommateur: The risks that the consumer may encounter when using the product.
      • There are also several fields that correspond to different links, such as the link to the product image, the link to the distributors list, etc.

    You can see some examples and manually query the dataset records using this link.

    We refined the data columns in a few key ways:

      • Columns like ndeg_de_version and rappelguid, which were part of a versioning system, have been removed as they aren’t needed for our project.
      • We combined the columns that deal with consumer risks (risques_encourus_par_le_consommateur and description_complementaire_du_risque) for a clearer overview of product risks.
      • The date_debut_fin_de_commercialisation column, which indicates the marketing period, has been divided into two separate columns. This split allows for easier queries about the start or end of a product’s marketing.
      • We’ve removed accents from all columns except for links, reference numbers, and dates.
        This is important because some text processing tools struggle with accented characters.

    For a detailed look at these changes, check out our transformation script at src/kafka_client/transformations.py. The updated list of columns is available in src/constants.py under DB_FIELDS.

    Kafka streaming

    To avoid sending all the data from the API each time we run the streaming task, we define a local JSON file that contains the last publication date of the latest streaming run. We then use this date as the starting date for the next streaming task.

    To give an example, suppose that the latest recalled product has a publication date of 22 November 2023. If we assume that all recalled-product records published before this date are already persisted in our Postgres database, we can stream the data starting from 22 November. Note that there is an overlap, because we may not have handled all of the data from 22 November in the previous run.

    The file is saved in ./data/last_processed.json and has this format:

        {"last_processed": "2023-11-22"}

    By default the file is an empty JSON object, which means that our first streaming task will process all of the API records, approximately 10,000 of them.

    Note that in a production setting this approach of storing the last processed date in a local file is not viable, and other approaches involving an external database or an object storage service may be more suitable.

    The code for the Kafka streaming can be found in ./src/kafka_client/kafka_stream_data.py. It primarily involves querying the data from the API, applying the transformations, removing potential duplicates, updating the last publication date and serving the data using the Kafka producer.
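The incremental logic described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the code from kafka_stream_data.py: the helper names (load_last_processed, select_new_records, save_last_processed) and the sample records are hypothetical, dates are assumed to be ISO-formatted strings, and the Kafka producer call is left out so the snippet runs with the standard library alone.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


def load_last_processed(path: Path) -> Optional[str]:
    """Return the stored publication date, or None when the file holds an empty JSON object."""
    state = json.loads(path.read_text())
    return state.get("last_processed")


def select_new_records(records, last_processed):
    """Keep records published on or after the stored date, dropping duplicate references."""
    seen = set()
    selected = []
    for record in records:
        # ISO dates (YYYY-MM-DD) compare correctly as plain strings.
        if last_processed is not None and record["date_de_publication"] < last_processed:
            continue
        if record["reference_fiche"] in seen:
            continue
        seen.add(record["reference_fiche"])
        selected.append(record)
    return selected


def save_last_processed(path: Path, records) -> None:
    """Store the most recent publication date among the records that were sent."""
    if records:
        latest = max(r["date_de_publication"] for r in records)
        path.write_text(json.dumps({"last_processed": latest}))


# Hypothetical sample data standing in for the API response.
records = [
    {"reference_fiche": "A", "date_de_publication": "2023-11-20"},
    {"reference_fiche": "B", "date_de_publication": "2023-11-22"},
    {"reference_fiche": "B", "date_de_publication": "2023-11-22"},
    {"reference_fiche": "C", "date_de_publication": "2023-11-23"},
]
state_file = Path(tempfile.mkdtemp()) / "last_processed.json"
state_file.write_text(json.dumps({"last_processed": "2023-11-22"}))

new_records = select_new_records(records, load_last_processed(state_file))
save_last_processed(state_file, new_records)
```

Records dated exactly on the stored date are sent again on purpose, which mirrors the overlap mentioned above; the downstream job is then responsible for not inserting them twice.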
    The next step is to run the Kafka service defined in the docker-compose file below:

        version: '3'

        services:
          kafka:
            image: 'bitnami/kafka:latest'
            ports:
              - '9094:9094'
            networks:
              - airflow-kafka
            environment:
              - KAFKA_CFG_NODE_ID=0
              - KAFKA_CFG_PROCESS_ROLES=controller,broker
              - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
              - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
              - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
              - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
              - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
            volumes:
              - ./kafka:/bitnami/kafka

          kafka-ui:
            container_name: kafka-ui-1
            image: provectuslabs/kafka-ui:latest
            ports:
              - 8800:8080
            depends_on:
              - kafka
            environment:
              KAFKA_CLUSTERS_0_NAME: local
              KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: PLAINTEXT://kafka:9092
              DYNAMIC_CONFIG_ENABLED: 'true'
            networks:
              - airflow-kafka

        networks:
          airflow-kafka:
            external: true

    The key highlights from this file are:

      • The kafka service uses the base image bitnami/kafka.
      • We configure the service with only one broker, which is enough for our small project. A Kafka broker is responsible for receiving messages from producers (the sources of data), storing these messages, and delivering them to consumers (the sinks or end-users of the data). The broker listens on port 9092 for internal communication within the cluster and on port 9094 for external communication, allowing clients outside the Docker network to connect to the Kafka broker.
      • In the volumes part, we map the local directory kafka to the container directory /bitnami/kafka to ensure data persistence and to allow inspection of Kafka’s data from the host system.
      • We set up the kafka-ui service, which uses the Docker image provectuslabs/kafka-ui:latest. This provides a user interface to interact with the Kafka cluster.
        This is especially useful for monitoring and managing Kafka topics and messages.
      • To ensure communication between Kafka and Airflow, which will be run as an external service, we use an external network named airflow-kafka.

    Before running the Kafka service, let’s create the airflow-kafka network using the following command:

        docker network create airflow-kafka

    Now everything is set to finally start our Kafka service:

        docker-compose up

    After the services start, visit the kafka-ui at http://localhost:8800/. Normally you should get something like this:

    Overview of the Kafka UI. Image by the author.

    Next we will create the topic that will contain the API messages. Click on Topics on the left and then Add a topic at the top left. Our topic will be called rappel_conso, and since we have only one broker we set the replication factor to 1. We also set the number of partitions to 1: we will have only one consumer thread at a time, so we won’t need any parallelism. Finally, we can set the data retention time to a small value like one hour, since we will run the Spark job right after the Kafka streaming task and won’t need to retain the data in the Kafka topic for long.

    Postgres set-up

    Before setting up our Spark and Airflow configurations, let’s create the Postgres database that will persist our API data. I used the pgAdmin 4 tool for this task, but any other Postgres development platform can do the job.

    To install Postgres and pgAdmin, visit https://www.postgresql.org/download/ and get the packages for your operating system. When installing Postgres, you need to set a password, which we will need later to connect to the database from the Spark environment. You can leave the port at 5432.

    If your installation has succeeded, you can start pgAdmin and you should see something like this window:

    Overview of pgAdmin interface.
    Image by the author.

    Since we have a lot of columns in the table we want to create, we chose to create the table and add its columns with a script using psycopg2, a PostgreSQL database adapter for Python.

    You can run the script with the command:

        python scripts/create_table.py

    Note that in the script I read the Postgres password from an environment variable named POSTGRES_PASSWORD. If you use another method to access the password, you need to modify the script accordingly.

    Spark Set-up

    Having set up our Postgres database, let’s delve into the details of the Spark job. The goal is to stream the data from the Kafka topic rappel_conso to the Postgres table rappel_conso_table.

        from pyspark.sql import SparkSession
        from pyspark.sql.types import (
            StructType,
            StructField,
            StringType,
        )
        from pyspark.sql.functions import from_json, col
        from src.constants import POSTGRES_URL, POSTGRES_PROPERTIES, DB_FIELDS
        import logging

        logging.basicConfig(
            level=logging.INFO, format="%(asctime)s:%(funcName)s:%(levelname)s:%(message)s"
        )


        def create_spark_session() -> SparkSession:
            spark = (
                SparkSession.builder.appName("PostgreSQL Connection with PySpark")
                .config(
                    "spark.jars.packages",
                    "org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
                )
                .getOrCreate()
            )

            logging.info("Spark session created successfully")
            return spark


        def create_initial_dataframe(spark_session):
            """
            Reads the streaming data and creates the initial dataframe accordingly.
            """
            try:
                # Gets the streaming data from topic rappel_conso
                df = (
                    spark_session.readStream.format("kafka")
                    .option("kafka.bootstrap.servers", "kafka:9092")
                    .option("subscribe", "rappel_conso")
                    .option("startingOffsets", "earliest")
                    .load()
                )
                logging.info("Initial dataframe created successfully")
            except Exception as e:
                logging.warning(f"Initial dataframe couldn't be created due to exception: {e}")
                raise

            return df


        def create_final_dataframe(df):
            """
            Modifies the initial dataframe, and creates the final dataframe.
            """
            schema = StructType(
                [StructField(field_name, StringType(), True) for field_name in DB_FIELDS]
            )
            df_out = (
                df.selectExpr("CAST(value AS STRING)")
                .select(from_json(col("value"), schema).alias("data"))
                .select("data.*")
            )
            return df_out


        def start_streaming(df_parsed, spark):
            """
            Starts the streaming to table spark_streaming.rappel_conso in postgres
            """
            # Read existing data from PostgreSQL
            existing_data_df = spark.read.jdbc(
                POSTGRES_URL, "rappel_conso", properties=POSTGRES_PROPERTIES
            )

            unique_column = "reference_fiche"

            logging.info("Start streaming ...")
            query = (
                df_parsed.writeStream.foreachBatch(
                    # Keep only the rows whose key is not yet in the table, then append them
                    lambda batch_df, _: (
                        batch_df.join(
                            existing_data_df,
                            batch_df[unique_column] == existing_data_df[unique_column],
                            "leftanti",
                        ).write.jdbc(
                            POSTGRES_URL, "rappel_conso", "append", properties=POSTGRES_PROPERTIES
                        )
                    )
                )
                .trigger(once=True)
                .start()
            )

            return query.awaitTermination()


        def write_to_postgres():
            spark = create_spark_session()
            df = create_initial_dataframe(spark)
            df_final = create_final_dataframe(df)
            start_streaming(df_final, spark=spark)


        if __name__ == "__main__":
            write_to_postgres()

    Let’s break down the key highlights and functionalities of the Spark job:

      1. First, create_spark_session creates the Spark session and declares the PostgreSQL JDBC driver and the Spark Kafka connector through spark.jars.packages.
      2. The create_initial_dataframe function ingests streaming data from the Kafka topic using Spark’s Structured Streaming.
      3. Once the data is ingested, create_final_dataframe transforms it. It applies a schema (defined by the columns in DB_FIELDS) to the incoming JSON data, ensuring that the data is structured and ready for further processing.
      4. The start_streaming function reads existing data from the database, compares it with the incoming stream, and appends new records.

    The complete code for the Spark job is in the file src/spark_pgsql/spark_streaming.py. We will use the Airflow DockerOperator to run this job, as explained in the upcoming section.
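To make the deduplication in start_streaming more concrete, here is a minimal plain-Python sketch of what a left anti join does: it keeps only the incoming rows whose reference_fiche is absent from the rows already stored. It does not use Spark, and the function name left_anti_join and the sample rows are hypothetical.

```python
def left_anti_join(batch_rows, existing_rows, key):
    """Return the rows of batch_rows whose key value does not appear in existing_rows."""
    existing_keys = {row[key] for row in existing_rows}
    return [row for row in batch_rows if row[key] not in existing_keys]


# Hypothetical rows: one is already in the table, one is new.
existing = [
    {"reference_fiche": "2023-11-0001"},
    {"reference_fiche": "2023-11-0002"},
]
batch = [
    {"reference_fiche": "2023-11-0002", "motif_de_rappel": "listeria"},
    {"reference_fiche": "2023-11-0003", "motif_de_rappel": "salmonella"},
]

to_append = left_anti_join(batch, existing, "reference_fiche")
```

As with the Spark join, this only filters against rows that are already stored; it does not remove duplicates inside the batch itself, which is why the producer side removes potential duplicates before sending.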
    Let’s go through the process of creating the Docker image we need to run our Spark job. Here’s the Dockerfile for reference:

        FROM bitnami/spark:latest

        WORKDIR /opt/bitnami/spark

        RUN pip install py4j

        COPY ./src/spark_pgsql/spark_streaming.py ./spark_streaming.py
        COPY ./src/constants.py ./src/constants.py

        ENV POSTGRES_DOCKER_USER=host.docker.internal
        ARG POSTGRES_PASSWORD
        ENV POSTGRES_PASSWORD=$POSTGRES_PASSWORD

    In this Dockerfile, we start with the bitnami/spark image as our base. It’s a ready-to-use Spark image. We then install py4j, a tool needed for Spark to work with Python.

    The environment variables POSTGRES_DOCKER_USER and POSTGRES_PASSWORD are set up for connecting to a PostgreSQL database. Since our database is on the host machine, we set POSTGRES_DOCKER_USER to host.docker.internal. Despite the variable’s name, this value is a hostname: it allows our Docker container to access services on the host, in this case the PostgreSQL database. The password for PostgreSQL is passed as a build argument, so it’s not hard-coded into the Dockerfile.

    It’s important to note that this approach, especially passing the database password at build time, might not be secure for production environments: the value is still stored in the image’s environment and could expose sensitive information. In such cases, more secure methods such as Docker BuildKit secrets should be considered.

    Now, let’s build the Docker image for Spark:

        docker build -f spark/Dockerfile -t rappel-conso/spark:latest --build-arg POSTGRES_PASSWORD=$POSTGRES_PASSWORD .

    This command builds the image rappel-conso/spark:latest. This image includes everything needed to run our Spark job and will be used by Airflow’s DockerOperator to execute the job. Remember to replace $POSTGRES_PASSWORD with your actual PostgreSQL password when running this command.

    Airflow

    As said earlier, Apache Airflow serves as the orchestration tool in the data pipeline. It is responsible for scheduling and managing the workflow of the tasks, ensuring they are executed in a specified order and under defined conditions.
    In our system, Airflow is used to automate the data flow from streaming with Kafka to processing with Spark.

    Airflow DAG

    Let’s take a look at the Directed Acyclic Graph (DAG) that outlines the sequence and dependencies of tasks, enabling Airflow to manage their execution.

        from datetime import datetime, timedelta

        from airflow import DAG
        from airflow.operators.python import PythonOperator
        from airflow.providers.docker.operators.docker import DockerOperator

        # Assumed import path for the Kafka streaming function; adjust to the repo layout
        from src.kafka_client.kafka_stream_data import stream

        start_date = datetime.today() - timedelta(days=1)

        default_args = {
            "owner": "airflow",
            "start_date": start_date,
            "retries": 1,  # number of retries before failing the task
            "retry_delay": timedelta(seconds=5),
        }

        with DAG(
            dag_id="kafka_spark_dag",
            default_args=default_args,
            schedule_interval=timedelta(days=1),
            catchup=False,
        ) as dag:

            kafka_stream_task = PythonOperator(
                task_id="kafka_data_stream",
                python_callable=stream,
                dag=dag,
            )

            spark_stream_task = DockerOperator(
                task_id="pyspark_consumer",
                image="rappel-conso/spark:latest",
                api_version="auto",
                auto_remove=True,
                command="./bin/spark-submit --master local[*] --packages org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 ./spark_streaming.py",
                docker_url='tcp://docker-proxy:2375',
                environment={'SPARK_LOCAL_HOSTNAME': 'localhost'},
                network_mode="airflow-kafka",
                dag=dag,
            )

            kafka_stream_task >> spark_stream_task

    Here are the key elements of this configuration:

      • The tasks are set to execute daily.
      • The first task is the Kafka stream task. It is implemented using the PythonOperator to run the Kafka streaming function. This task streams data from the RappelConso API into a Kafka topic, initiating the data processing workflow.
      • The downstream task is the Spark stream task. It uses the DockerOperator for execution. It runs a Docker container with our custom Spark image, tasked with processing the data received from Kafka.
      • The tasks are arranged sequentially, where the Kafka streaming task precedes the Spark processing task.
        This order is crucial to ensure that data is first streamed and loaded into Kafka before being processed by Spark.

    About the DockerOperator

    Using the DockerOperator allows us to run Docker containers that correspond to our tasks. The main advantages of this approach are easier package management, better isolation and enhanced testability. We will demonstrate the use of this operator with the Spark streaming task.

    Here are some key details about the DockerOperator for the Spark streaming task:

      • We use the image rappel-conso/spark:latest built in the Spark Set-up section.
      • The command runs spark-submit inside the container, specifying the master as local, including the necessary packages for PostgreSQL and Kafka integration, and pointing to the spark_streaming.py script that contains the logic for the Spark job.
      • docker_url represents the URL of the host running the Docker daemon. The natural solution is to set it to unix://var/run/docker.sock and to mount /var/run/docker.sock in the Airflow Docker container. One problem we had with this approach was a permission error when using the socket file inside the Airflow container. A common workaround, changing permissions with chmod 777 /var/run/docker.sock, poses significant security risks. To circumvent this, we implemented a more secure solution using bobrik/socat as a docker-proxy. This proxy, defined in a Docker Compose service, listens on TCP port 2375 and forwards requests to the Docker socket:

          docker-proxy:
            image: bobrik/socat
            command: "TCP4-LISTEN:2375,fork,reuseaddr UNIX-CONNECT:/var/run/docker.sock"
            ports:
              - "2376:2375"
            volumes:
              - /var/run/docker.sock:/var/run/docker.sock
            networks:
              - airflow-kafka

        In the DockerOperator, we can access the host’s /var/run/docker.sock via the tcp://docker-proxy:2375 URL, as described here and here.
      • Finally, we set the network mode to airflow-kafka. This allows us to use the same network as the proxy and the container running Kafka.
        This is crucial since the Spark job consumes the data from the Kafka topic, so we must ensure that both containers are able to communicate.

    After defining the logic of our DAG, let’s now look at the Airflow services configuration in the docker-compose-airflow.yaml file.

    Airflow Configuration

    The compose file for Airflow was adapted from the official Apache Airflow docker-compose file. You can have a look at the original file by visiting this link.

    As pointed out by this article, the proposed version of Airflow is highly resource-intensive, mainly because the core executor is set to CeleryExecutor, which is better suited to distributed and large-scale data processing tasks. Since we have a small workload, a single-node LocalExecutor is enough.

    Here is an overview of the changes we made to the docker-compose configuration of Airflow:

      • We set the environment variable AIRFLOW__CORE__EXECUTOR to LocalExecutor.
      • We removed the services airflow-worker and flower because they only work with the Celery executor. We also removed the redis caching service since it serves as a backend for Celery. We won’t use the airflow-triggerer either, so we removed it too.
      • We replaced the base image ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.3} for the remaining services, mainly the scheduler and the webserver, with a custom image that is built when running docker-compose:

            version: '3.8'
            x-airflow-common:
              &airflow-common
              build:
                context: .
                dockerfile: ./airflow_resources/Dockerfile
              image: de-project/airflow:latest

      • We mounted the volumes that Airflow needs. AIRFLOW_PROJ_DIR designates the Airflow project directory that we will define later.
        We also set the network to airflow-kafka to be able to communicate with the Kafka bootstrap servers.

              volumes:
                - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
                - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
                - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
                - ./src:/opt/airflow/dags/src
                - ./data/last_processed.json:/opt/airflow/data/last_processed.json
              user: "${AIRFLOW_UID:-50000}:0"
              networks:
                - airflow-kafka

    Next, we need to create some environment variables that will be used by docker-compose:

        echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_PROJ_DIR=\"./airflow_resources\"" > .env

    Here AIRFLOW_UID represents the user ID in the Airflow containers and AIRFLOW_PROJ_DIR represents the Airflow project directory.

    Now everything is set up to run your Airflow service. You can start it with this command:

        docker compose -f docker-compose-airflow.yaml up

    Then, to access the Airflow user interface, visit http://localhost:8080.

    Sign-in window on Airflow. Image by the author.

    By default, the username and password are both airflow. After signing in, you will see a list of DAGs that come with Airflow. Look for the DAG of our project, kafka_spark_dag, and click on it.

    Overview of the task window in Airflow. Image by the author.

    You can start the task by clicking on the button next to DAG: kafka_spark_dag.

    Next, you can check the status of your tasks in the Graph tab. A task is done when it turns green. So, when everything is finished, it should look something like this:

    Image by the author.

    To verify that rappel_conso_table is filled with data, use the following SQL query in the pgAdmin Query Tool:

        SELECT count(*) FROM rappel_conso_table

    When I ran this in January 2024, the query returned a total of 10022 rows. Your results should be around this number as well.

    Conclusion

    This article has demonstrated the steps to build a basic yet functional data engineering pipeline using Kafka, Airflow, Spark, PostgreSQL, and Docker.
    Aimed primarily at beginners and those new to the field of data engineering, it provides a hands-on approach to understanding and implementing key concepts in data streaming, processing, and storage.

    Throughout this guide, we’ve covered each component of the pipeline in detail, from setting up Kafka for data streaming to using Airflow for task orchestration, and from processing data with Spark to storing it in PostgreSQL. The use of Docker throughout the project simplifies the setup and ensures consistency across different environments.

    It’s important to note that while this setup is ideal for learning and small-scale projects, scaling it for production use would require additional considerations, especially in terms of security and performance optimization. Future enhancements could include integrating more advanced data processing techniques, exploring real-time analytics, or even expanding the pipeline to incorporate more complex data sources.

    In essence, this project serves as a practical starting point for those looking to get their hands dirty with data engineering. It lays the groundwork for understanding the basics, providing a solid foundation for further exploration in the field.

    In the second part, we’ll explore how to effectively use the data stored in our PostgreSQL database. We’ll introduce agents powered by Large Language Models (LLMs) and a variety of tools that enable us to interact with the database using natural language queries.

    So, stay tuned!

    To reach out

      • LinkedIn: https://www.linkedin.com/in/hamza-gharbi-043045151/
      • Twitter: https://twitter.com/HamzaGh25079790

    End-to-End Data Engineering System on Real Data with Kafka, Spark, Airflow, Postgres, and Docker was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.

    View the full article
  3. Amazon Athena for Apache Spark now supports open-source data lake storage frameworks Apache Hudi 0.13, Apache Iceberg 1.2.1, and Linux Foundation Delta Lake 2.0.2. These frameworks simplify incremental data processing of large data sets using ACID (atomicity, consistency, isolation, durability) transactions and make it simpler to store and process large data sets in your data lakes. View the full article
  4. How Bad is Bad Code: The ROI of Fixing Broken Spark Code

    Once in a while I stumble upon Spark code that looks like it has been written by a Java developer and it never fails to make me wince because it is a missed opportunity to write elegant and efficient code: it is verbose, difficult to read, and full of distributed processing anti-patterns.

    One such occurrence happened a few weeks ago when one of my colleagues was trying to make some churn analysis code downloaded from GitHub work.

    I was looking for some broken code to add a workshop to our Spark Performance Tuning class and write a blog post about, and this fitted the bill perfectly.

    For convenience purposes I chose to limit the scope of this exercise to a specific function that prepares the data prior to the churn analysis.

    Here it is in all its glorious juiciness:

        from pyspark.sql.functions import udf,col
        from pyspark.sql.types import IntegerType

        def prepare_data_baseline(df):

            '''
            Function to prepare the given dataframe and divid into groups of churn and non churn
            users while returnng the original datafrme with a new label column into a spark dataframe.
            Args:
                df- the original dataframe
            Returns:
                df - dataframe of the dataset with new column of churn added
                stayed - dataframe of the non -churn user's activities only.
                all_cancelled - dataframe of the churn user's activities only.
            '''

            #Define a udf for cancelled
            canceled = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0)

            #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
            df = df.withColumn('Churn', canceled(df.page))

            #Dataframe of all that cancelled
            cancelled_df = df.select('page', 'userId','Churn').where(col('churn')==1)
            #List of cancelled
            list_cancelled = cancelled_df.select('userId').distinct().collect()#list of cancelled users

            #Put in a list format
            gb = []#temporary variable to store lists
            for row in list_cancelled:
                gb.append(row[0])
            canc_list = [x for x in gb if x != '']#remove the invalid users
            #Total number of users who canceled
            print(f"The number of churned users is: {len(canc_list)}")

            #List of staying users
            all_users = df.select('userId').distinct().collect()
            gh = []#a temporary variable to store all users
            for row in all_users:
                gh.append(row[0])
            stayed_list = set(gh)-set(gb)#list of users staying
            stayed_list = [x for x in stayed_list if x != '']#remove the invalid users

            #Total number of users who did not cancel
            print(f"The number of staying users is: {len(stayed_list)}")

            #Store both canceled and staying users in new dataframes containng all actions they undertook
            all_cancelled = df.select("*").where(col('userId').isin(canc_list))
            stayed = df.select('*').where(col('userId').isin(stayed_list))

            #Redefine a udf for churn
            churned = udf(lambda x: 0 if x in stayed_list else 1, IntegerType())
            #Creat new column which will be our label column to track all users that eventually cancelled their subscription
            df = df.withColumn('label', churned(col('userId')))

            return df, stayed, all_cancelled

    In this blog post, I will outline the steps I took to fix this code, and then measure the resulting difference in execution performance. In the process, I will explicitly state the best practices I will implement.

    Let’s jump in this rabbit hole!

    Define a non-regression test harness

    Stop! Resist the temptation to start tweaking the code right away!
You want to be able to:

  • Make sure that you do not introduce a regression by fixing the code
  • Measure the improvements in terms of performance

This is where limiting the scope of the analysis to a function came in handy: it allowed me to use ad hoc and simple tooling:

  • I isolated the original function in a prepare_data_baseline function in a separate prepareData_baseline.py file
  • I created a new file called prepareData.py with the new version of the prepare_data function
  • I measured the time to perform the processing using the time library
  • And I compared the resulting DataFrames with subtract

Because lazy evaluation defers the time when the code is actually executed, I added code that saves the DataFrames to files, thus forcing the materialization of the DataFrames via the execution of the code. I also added these lines in the scope of the time measurement.

And this is what it looks like:

    from pyspark.sql import SparkSession
    import time, datetime
    from prepareData import prepare_data
    from prepareData_baseline import prepare_data_baseline

    spark = SparkSession \
        .builder \
        .appName("Churn Analysis Data Preparation Test Harness") \
        .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    spark.conf.set('spark.sql.adaptive.enabled','false')
    print(f"AQE enabled: {spark.conf.get('spark.sql.adaptive.enabled')}")

    df = spark.read.json('data/mini_sparkify_event_data.json')

    #Baseline version
    process_time_start = time.perf_counter() # Start timer: begin processing
    df_baseline, stayed_baseline, all_cancelled_baseline = prepare_data_baseline(df)
    df_baseline.write.mode("overwrite").json('data/df_baseline')
    stayed_baseline.write.mode("overwrite").json('data/stayed_baseline')
    all_cancelled_baseline.write.mode("overwrite").json('data/all_cancelled_baseline')
    process_time_end = time.perf_counter() # Stop timer: end processing
    process_time = process_time_end - process_time_start # Elapsed time for processing
    totalTime = datetime.timedelta(seconds = process_time)
    print(f"Preparing data with the baseline version took {totalTime}")

    #New version
    process_time_start = time.perf_counter() # Start timer: begin processing
    df, stayed, all_cancelled = prepare_data(df)
    df.write.mode("overwrite").json('data/df')
    stayed.write.mode("overwrite").json('data/stayed')
    all_cancelled.write.mode("overwrite").json('data/all_cancelled')
    process_time_end = time.perf_counter() # Stop timer: end processing
    process_time = process_time_end - process_time_start # Elapsed time for processing
    totalTime = datetime.timedelta(seconds = process_time)
    print(f"Preparing data with the new version took {totalTime}")

    # Regression Testing
    def diffDataFrame(df1,df2):
        return df1.subtract(df2).count()

    print(f"New processing introduced {diffDataFrame(df,df_baseline)} differences in df.")
    print(f"New processing introduced {diffDataFrame(all_cancelled,all_cancelled_baseline)} differences in all_cancelled.")
    print(f"New processing introduced {diffDataFrame(stayed,stayed_baseline)} differences in stayed.")

    spark.stop()

Retro document the requirements

This step was quite easy because of the comments that were present in the initial code.

This function:

  • Takes a DataFrame containing activities from users,
  • splits it into two groups of activities: activities from users who eventually churned and activities from users who did not, and
  • adds a “label” column to the input DataFrame to tag activities that belong to users that eventually churned (1 if user churned 0 otherwise).

If that sounds suspiciously redundant to you I agree. But let’s table that issue for now; we will revisit it once we are satisfied with our new version of the code.

Refactor the code

The main problem of the code is the use of Python lists to achieve the required results. Those lists are created by collecting the DataFrames onto the Spark driver where the for loops will be processed, making this code not scalable: above a certain number of users the driver memory might become overwhelmed and the program will crash.
Also this choice prevents the code from leveraging all the optimizations that come with DataFrame operations.

Then the code uses plain PySpark UDFs, for which you incur a performance penalty because of the need to:

  • Deserialize the Spark DataFrame to its Java representation
  • Transfer the resulting Java object to the Python process where the UDF will be executed
  • Serialize back the output of the function to Spark format

Beware of the cost of PySpark UDFs

There are ways to mitigate those issues by using PyArrow and vectorized UDFs when you really need to use them, but this is not one of those times.

First, the function creates a “Churn” column, which I guess is for convenience purposes. A user is identified as “churned” if they have been to the “Cancellation Confirmation” page. This is achieved with a withColumn call and a UDF.

    #Define a udf for cancelled
    canceled = udf(lambda x: 1 if x == 'Cancellation Confirmation' else 0)

    #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
    df = df.withColumn('Churn', canceled(df.page))

There is no need for a UDF in that case: those lines of code can be replaced by a simple column expression like so:

    #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
    df = df.withColumn('Churn', (df.page == 'Cancellation Confirmation').cast('integer').cast('string'))

I believe the correct type for that new column would be boolean, but for non-regression purposes I had to cast it to a string of 0 or 1.

Then the author proceeds to create two lists: one for the users that churned and one for the users that stayed.
Since my goal is to avoid those lists, I am going to create the corresponding DataFrames instead:

    all_users = df.select(df.userId).distinct().where(df.userId != '')
    churned_users = df.where(df.Churn == '1').select(df.userId).distinct().where(df.userId != '')
    stayed_users = all_users.subtract(churned_users)

First I create a DataFrame of all the non-empty users, then the DataFrame of users that churned, and define the users that stayed as the difference between the two.

The author uses the awkwardly created lists together with UDFs to create the all_cancelled and stayed DataFrames. Here is the code for the first one:

    #List of cancelled
    list_cancelled = cancelled_df.select('userId').distinct().collect()#list of cancelled users

    #Put in a list format
    gb = []#temporary variable to store lists
    for row in list_cancelled:
        gb.append(row[0])
    canc_list = [x for x in gb if x != '']#remove the invalid users
    …
    all_cancelled = df.select("*").where(col('userId').isin(canc_list))

I realize now that the “Put in list format” loop is probably unnecessary.

To create the same DataFrame I just do the following:

    all_cancelled = df.join(churned_users,'userId')

The same technique is applied to create the stayed DataFrame:

    stayed = df.join(stayed_users,'userId')

Last, the author adds the “label” column to the main DataFrame by using a UDF:

    #Redefine a udf for churn
    churned = udf(lambda x: 0 if x in stayed_list else 1, IntegerType())
    #Creat new column which will be our label column to track all users that eventually cancelled their subscription
    df = df.withColumn('label', churned(col('userId')))

Instead I just use a union:

    df_label = all_cancelled.withColumn('label',lit(1)).union(stayed.withColumn('label',lit(0)))

That triggered a regression because I did not include the null users.
I wonder what use could be made of records with null users for training a model to predict churn from users’ behavior, but for non-regression purposes I added those too:

    empty_users = df.where(df.userId.isNull())
    …
    #Add empty users for non regression purposes
    df_label = df_label.union(empty_users.withColumn('label',lit(1)))

Last, I also had to reorder the columns of my DataFrames for my simple non-regression tests to be successful:

    # Sort the columns
    columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn','label']
    df_label_sorted = df_label.select(columns)
    columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn']
    all_cancelled_sorted = all_cancelled.select(columns)
    stayed_sorted = stayed.select(columns)

This is my full version of the function:

    from pyspark.sql.functions import lit

    def prepare_data(df):
        '''
        Function to prepare the given dataframe and divide into groups of churn and non churn
        users while returning the original DataFrame with a new label column into a spark dataframe.
        Args:
            df- the original dataframe
        Returns:
            df - dataframe of the dataset with new column of churn added
            stayed - dataframe of the non -churn user's activities only.
            all_cancelled - dataframe of the churn user's activities only.
        '''

        #define a new column 'churn' where 1 indicates cancellation of subscription, 0 otherwise
        df = df.withColumn('Churn', (df.page == 'Cancellation Confirmation').cast('integer').cast('string'))

        all_users = df.select(df.userId).distinct().where(df.userId != '')
        churned_users = df.where(df.Churn == '1').select(df.userId).distinct().where(df.userId != '')
        stayed_users = all_users.subtract(churned_users)
        empty_users = df.where(df.userId.isNull())

        #Store both canceled and staying users in new DataFrames containing all actions they undertook
        all_cancelled = df.join(churned_users,'userId')
        stayed = df.join(stayed_users,'userId')
        df_label = all_cancelled.withColumn('label',lit(1)).union(stayed.withColumn('label',lit(0)))

        #Add empty users for non regression purposes
        df_label = df_label.union(empty_users.withColumn('label',lit(1)))

        # Sort the columns
        columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn','label']
        df_label_sorted = df_label.select(columns)
        columns = ['artist','auth','firstName','gender','itemInSession','lastName','length','level','location','method','page','registration','sessionId','song','status','ts','userAgent','userId','Churn']
        all_cancelled_sorted = all_cancelled.select(columns)
        stayed_sorted = stayed.select(columns)

        #Total number of users who canceled
        print(f"The number of churned users is: {churned_users.count()}")
        #Total number of users who did not cancel
        print(f"The number of staying users is: {stayed_users.count()}")

        return df_label_sorted, stayed_sorted, all_cancelled_sorted

Non regression and performance

I was able to verify that I had not introduced any regression in my version of the function on my desktop with Spark 3.3.

In order to get meaningful performance measurements I needed to use the full 12G JSON dataset. Otherwise, with small data, most of the time is spent on overhead and results vary wildly.
So I switched to our CML data service using Spark 3.2 and adapted the code accordingly.

CML uses Spark on Kubernetes and the default is dynamic allocation of executors. I had to disable that to get a stable environment and thus, meaningful measures:

    from pyspark.sql import SparkSession
    import time, datetime
    from prepareData import prepare_data
    from prepareData_baseline import prepare_data_baseline
    from prepareData_improved import prepare_data_improved
    import cml.data_v1 as cmldata
    from env import S3_ROOT, S3_HOME, CONNECTION_NAME

    conn = cmldata.get_connection(CONNECTION_NAME)
    spark = (
        SparkSession.builder.appName(conn.app_name)
        .config("spark.sql.hive.hwc.execution.mode", "spark")
        .config("spark.dynamicAllocation.enabled","false")
        .config("spark.executor.instances", 3)
        .config("spark.executor.memory","32g")
        .config("spark.executor.cores",4)
        .config("spark.yarn.access.hadoopFileSystems", conn.hive_external_dir)
        .getOrCreate()
    )

    spark.sparkContext.setLogLevel("ERROR")
    spark.conf.set('spark.sql.adaptive.enabled','true')
    print(f"AQE enabled: {spark.conf.get('spark.sql.adaptive.enabled')}")

That got me the desired result.

I then found out that the full 12G data set contained a corrupt record that I had to deal with, and while I was at it I converted the file to Parquet format to save me some time:

Convert early to compressed columnar formats (Parquet, ORC)

To avoid repetitive code, I created a function that performs the tests, in which I also added calls to setJobGroup and setJobDescription to improve the readability of the Spark UI:

    def measureDataPreparation(df,f,versionName):
        spark.sparkContext.setJobGroup(versionName,"")
        # Start timer: begin processing
        process_time_start = time.perf_counter()
        df, stayed, all_cancelled = f(df)
        spark.sparkContext.setJobDescription("Write /data/df")
        df.write.mode("overwrite").json(S3_HOME + '/data/df')
        spark.sparkContext.setJobDescription("Write /data/stayed")
        stayed.write.mode("overwrite").json(S3_HOME + '/data/stayed')
        spark.sparkContext.setJobDescription("Write /data/all_cancelled")
        all_cancelled.write.mode("overwrite").json(S3_HOME + '/data/all_cancelled')
        # Stop timer: end processing
        process_time_end = time.perf_counter()
        # Elapsed time for processing
        process_time = process_time_end - process_time_start
        totalTime = datetime.timedelta(seconds = process_time)
        print(f"Preparing data with the {versionName} took {totalTime}")

Use setJobGroup and setJobDescription to improve readability of the Spark UI

And this is how the Spark UI looks as a result:

[Screenshot: Spark UI showing the job groups and job descriptions]

Since I had established that I had not introduced any regression, I also removed the regression tests.

Here is the relevant part of the session’s output:

    measureDataPreparation(df,prepare_data_baseline,"baseline version")
    The number of churned users is: 4982
    The number of staying users is: 17282
    Preparing data with the baseline version took 0:09:11.799036

    measureDataPreparation(df,prepare_data,"no regression version")
    The number of churned users is: 4982
    The number of staying users is: 17282
    Preparing data with the no regression version took 0:01:48.224514

Great success! The new version is about five times faster (1 minute 48 seconds instead of 9 minutes 12 seconds)!

Further improvements

Since I no longer need to test for non regression I can remove the sorting of the columns.

I can also remove the code that prints the counts of the churned and stayed users. This code does not belong in a function that very likely will run unattended in a data pipeline. It triggers distributed execution to compute results that nobody will see. It should be left to the code that calls the function to log that kind of information or not.

This is also an instance of breaking the following rule:

Remove code that helped debugging with count(), take() or show() in production

I checked the rest of the initial code, and after exhaustive data exploration and right before splitting the data set for training purposes, the author does remove the rows with null users. There is no point in carrying around this extra baggage all this time.
In fact this breaks another rule of big data processing:

Filter early

Finally, I removed the casting of the “Churn” column and left it as a boolean. I also checked that it was not used outside of this function and renamed it “churn” because I hated that uppercase “C” with all the passion of a thousand white hot blazing suns.

This is the final version of the code:

    from pyspark.sql.functions import lit

    def prepare_data_improved(df):
        '''
        Function to prepare the given DataFrame and divide into groups of churn and non churn
        users while returning the original DataFrame with a new label column into a Spark DataFrame.
        Args:
            df- the original DataFrame
        Returns:
            df - DataFrame of the dataset with new column of churn added
            stayed - DataFrame of the non -churn user's activities only.
            all_cancelled - DataFrame of the churn user's activities only.
        '''

        #define a new boolean column 'churn' where True indicates cancellation of subscription, False otherwise
        df = df.where(df.userId != '').withColumn('churn', (df.page == 'Cancellation Confirmation'))

        all_users = df.select(df.userId).distinct()
        churned_users = df.where(df.churn).select(df.userId).distinct()
        stayed_users = all_users.subtract(churned_users)

        #Store both canceled and staying users in new DataFrames containing all actions they undertook
        all_cancelled = df.join(churned_users,'userId')
        stayed = df.join(stayed_users,'userId')
        df_label = all_cancelled.withColumn('label',lit(1)).union(stayed.withColumn('label',lit(0)))

        return df_label, stayed, all_cancelled

Conclusion

Now that I have achieved non regression using DataFrames exclusively, and that I also have an improved version, I should be able to measure the benefits of using the Spark cache and of the Adaptive Query Execution engine.

Here are the full results:

[Figure: full results of the experiment]

In this limited experiment, the number one factor that influences the performance of the execution is the refactoring of the Spark code to remove the distributed processing anti-patterns.
Caching the data, improving the code further, or using AQE all bring marginal improvements compared to the elimination of the technical debt.

The return on investment of training is always a thorny issue because it is difficult to measure conveniently in a spreadsheet but, with this experiment, I hope I have shown that a lack of skills should be a major concern for any organization running Spark workloads.

If you’d like to get hands-on experience with Spark 3.2, as well as other tools and techniques for making your Spark jobs run at peak performance, sign up for Cloudera’s Apache Spark Performance Tuning course.

If you need an introduction to AQE kindly refer to my previous blog post.

The post Spark Technical Debt Deep Dive appeared first on Cloudera Blog. View the full article
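The test harness in this article only starts trusting its timings once the writes that force materialization are inside the timed region, because lazy evaluation would otherwise leave almost nothing to measure. A minimal, Spark-free sketch of that idea follows. The `timed` helper and the `lazy_pipeline` generator are hypothetical stand-ins introduced for illustration (a Python generator plays the role of a lazily evaluated DataFrame); they are not part of the original harness.

```python
import datetime
import time
from contextlib import contextmanager


@contextmanager
def timed(label, results):
    """Store the wall-clock time spent inside the with-block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        results[label] = datetime.timedelta(seconds=elapsed)


def lazy_pipeline(n):
    """Hypothetical stand-in for a lazy transformation: no work until consumed."""
    return (x * x for x in range(n))


timings = {}

# Timing only the definition measures almost nothing: no element is computed yet.
with timed("definition only", timings):
    pipeline = lazy_pipeline(200_000)

# Consuming the pipeline inside the timed region forces the actual work,
# in the same spirit as writing the DataFrames inside the timed section.
with timed("definition + materialization", timings):
    pipeline = lazy_pipeline(200_000)
    total = sum(pipeline)

for label, elapsed in timings.items():
    print(f"{label}: {elapsed}")
```

With a real DataFrame, the equivalent of `sum(pipeline)` is an action such as a write, which is why the harness keeps the three `write` calls between the start and stop of the timer.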
  5. AWS Glue for Apache Spark now supports three open source data lake storage frameworks: Apache Hudi, Apache Iceberg, and Linux Foundation Delta Lake. These frameworks allow you to read and write data in Amazon Simple Storage Service (Amazon S3) in a transactionally consistent manner. AWS Glue is a serverless, scalable data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources. This feature removes the need to install a separate connector and reduces the configuration steps required to use these frameworks in AWS Glue for Apache Spark jobs. View the full article
  6. What is Cloudera Data Engineering (CDE)?

Cloudera Data Engineering is a serverless service for Cloudera Data Platform (CDP) that allows you to submit jobs to auto-scaling virtual clusters. CDE enables you to spend more time on your applications, and less time on infrastructure. CDE allows you to create, manage, and schedule Apache Spark jobs without the overhead of creating and maintaining Spark clusters. In addition to this, you can define virtual clusters with a range of CPU and memory resources, and the cluster scales up and down as needed to execute your Spark workloads, helping control your cloud costs.

Managed, serverless Spark service helps our customers in a number of ways:

  • Auto scaling of compute to eliminate static infrastructure costs. This feature ensures that customers do not have to maintain a large infrastructure footprint and hence reduce total cost of ownership.
  • Ability for business users to easily control their own compute needs with a click of a button, without IT intervention.
  • Complete view of the job performance, logging and debugging through a single pane of glass to enable efficient development on Spark.

Refer to the following Cloudera blog to understand the full potential of Cloudera Data Engineering.

Why should technology partners care about CDE?

Unlike traditional data engineering workflows that have relied on a patchwork of tools for preparing, operationalizing, and debugging data pipelines, Cloudera Data Engineering is designed for efficiency and speed — seamlessly integrating and securing data pipelines to any CDP service including Machine Learning, Data Warehouse, Operational Database, or any other analytic tool in your business. Partner tools that leverage CDP as their backend store can leverage this new service to ensure their customers can take advantage of a serverless architecture for Spark.

ISV Partners, like Precisely, support Cloudera’s hybrid vision. Precisely Data Integration, Change Data Capture and Data Quality tools support CDP Public Cloud as well as CDP Private Cloud. Precisely end-customers can now design a pipeline once and deploy it anywhere. Data pipelines that are bursty in nature can leverage the public cloud CDE service while longer running persistent loads can run on-prem. This ensures that the right data pipelines are running on the most cost-effective engines available in the market today.

Using the CDE Integration API:

CDE provides a robust API for integration with your existing continuous integration/continuous delivery platforms.

The Cloudera Data Engineering service API is documented in Swagger. You can view the API documentation and try out individual API calls by accessing the API DOC link in any virtual cluster:

  1. In the CDE web console, select an environment.
  2. Click the Cluster Details icon in any of the listed virtual clusters.
  3. Click the link under API DOC.

For further details on the API, please refer to the following doc link here.

Custom base Image for Kubernetes:

Partners who need to run their own business logic and require custom binaries or packages available on the Spark engine platform, can now leverage this feature for Cloudera Data Engineering. We believe customized engine images would allow greater flexibility to our partners to build cloud-native integrations and could potentially be leveraged by our enterprise customers as well.

The following set of steps will describe the ability to run Spark jobs with dependencies on external libraries and packages. The libraries and packages will be installed on top of the base image to make them available to the Spark executors.
First, obtain the latest CDE CLI:

  a) Create a virtual cluster
  b) Go to virtual cluster details page
  c) Download the CLI

Learn more on how to use the CLI here

Run Spark jobs on customized container image – Overview

Custom images are based on the base dex-spark-runtime image, which is accessible from the Cloudera docker repository. Users can then layer their packages and custom libraries on top of the base image. The final image is uploaded to a docker repo, which is then registered with CDE as a job resource. New jobs are defined with references to the resource which automatically downloads the custom runtime image to run the Spark drivers and executors.

Run Spark jobs on customized container image: Steps

1. Pull “dex-spark-runtime” image from “docker.repository.cloudera.com”

    $ docker pull container.repository.cloudera.com/cloudera/dex/dex-spark-runtime:<version>

Note: “docker.repository.cloudera.com” is behind the paywall and will require credentials to access; please ask your account team to provide them.

2. Create your “custom-dex-spark-runtime” image, based on “dex-spark-runtime” image

    $ docker build --network=host -t <company-registry>/custom-dex-spark-runtime:<version> . -f Dockerfile

Dockerfile Example:

    FROM docker.repository.cloudera.com/<company-name>/dex-spark-runtime:<version>
    USER root
    RUN yum install ${YUM_OPTIONS} <package-to-install> && yum clean all && rm -rf /var/cache/yum
    RUN dnf install ${DNF_OPTIONS} <package-to-install> && dnf clean all && rm -rf /var/cache/dnf
    USER ${DEX_UID}

3. Push image to your company Docker registry

    $ docker push <company-registry>/custom-dex-spark-runtime:<version>

4. Create ImagePullSecret in DE cluster for the company’s Docker registry (Optional)

REST API:

    # POST /api/v1/credentials
    {
      "name": "<company-registry-basic-credentials>",
      "type": "docker",
      "uri": "<company-registry>",
      "secret": {
        "username": "foo",
        "password": "bar"
      }
    }

CDE CLI:

    === credential ===
    ./cde credential create --type=docker-basic --name=docker-sandbox-cred --docker-server=https://docker-sandbox.infra.cloudera.com --docker-username=foo --tls-insecure --user srv_dex_mc --vcluster-endpoint https://gbz7t69f.cde-vl4zqll4.dex-a58x.svbr-nqvp.int.cldr.work/dex/api/v1

Note: Credentials will be stored as a Kubernetes “Secret”. They are never stored by the DEX API.

5. Register “custom-dex-spark-runtime” in DE as a “Custom Spark Runtime Image” Resource.

REST API:

    # POST /api/v1/resources
    {
      "name": "",
      "type": "custom-spark-runtime-container-image",
      "engine": "spark2",
      "image": "<company-registry>/custom-dex-spark-runtime:<version>",
      "imagePullSecret": "<company-registry-basic-credentials>"
    }

CDE CLI:

    === runtime resources ===
    ./cde resource create --type="custom-runtime-image" --image-engine="spark2" --name="custom-dex-qe-1_1" --image-credential=docker-sandbox-cred --image="docker-sandbox.infra.cloudera.com/dex-qe/custom-dex-qe:1.1" --tls-insecure --user srv_dex_mc --vcluster-endpoint https://gbz7t69f.cde-vl4zqll4.dex-a58x.svbr-nqvp.int.cldr.work/dex/api/v1

6. You should now be able to define Spark jobs referencing the custom-dex-spark-runtime

REST API:

    # POST /api/v1/jobs
    {
      "name": "spark-custom-image-job",
      "spark": {
        "imageResource": "CustomSparkImage-1",
        ...
      }
      ...
    }

CDE CLI:

    === job create ===
    ./cde job create --type spark --name cde-job-docker --runtime-image-resource-name custom-dex-qe-1_1 --application-file /tmp/numpy_app.py --num-executors 1 --executor-memory 1G --driver-memory 1G --tls-insecure --user srv_dex_mc --vcluster-endpoint https://gbz7t69f.cde-vl4zqll4.dex-a58x.svbr-nqvp.int.cldr.work/dex/api/v1

7. Once the job is created, either trigger it to run through the Web UI or by running the following command in the CLI:

    $> cde job run --name cde-job-docker

In conclusion

We introduced the “Custom Base Image” feature as part of our Design Partner Program to elicit feedback from our ISV partners. The response has been overwhelmingly positive and building custom integrations with our cloud-native CDE offering has never been easier.

As a partner, you can leverage Spark running on Kubernetes Infrastructure for free. You can launch a trial of CDE on CDP in minutes here, giving you a hands-on introduction to data engineering innovations in the Public Cloud.

References:

https://www.cloudera.com/tutorials/cdp-getting-started-with-cloudera-data-engineering.html

The post Cloudera Data Engineering – Integration steps to leverage Spark on Kubernetes appeared first on Cloudera Blog. View the full article
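The three REST calls in steps 4 to 6 can be sketched as plain JSON payloads built in Python. This is only an illustration under assumptions: the helper names and every value passed to them (registry host, image tag, credential, resource and job names) are hypothetical placeholders, the endpoint paths and field names are copied from the steps above, and the fields that the original elides with "..." are left out rather than guessed. Nothing is sent over the network; the sketch only prints what would be posted.

```python
import json


def credential_request(name, registry_uri, username, password):
    """Step 4: payload for POST /api/v1/credentials (image pull secret)."""
    body = {
        "name": name,
        "type": "docker",
        "uri": registry_uri,
        "secret": {"username": username, "password": password},
    }
    return ("POST", "/api/v1/credentials", body)


def runtime_resource_request(name, image, pull_secret, engine="spark2"):
    """Step 5: payload for POST /api/v1/resources (custom runtime image)."""
    body = {
        "name": name,
        "type": "custom-spark-runtime-container-image",
        "engine": engine,
        "image": image,
        "imagePullSecret": pull_secret,
    }
    return ("POST", "/api/v1/resources", body)


def job_request(name, image_resource):
    """Step 6: payload for POST /api/v1/jobs; elided fields are omitted."""
    body = {"name": name, "spark": {"imageResource": image_resource}}
    return ("POST", "/api/v1/jobs", body)


# All values below are hypothetical placeholders, not real endpoints or secrets.
requests_to_send = [
    credential_request("example-registry-cred", "registry.example.com",
                       "example-user", "example-password"),
    runtime_resource_request("example-runtime",
                             "registry.example.com/custom-dex-spark-runtime:1.0",
                             "example-registry-cred"),
    job_request("spark-custom-image-job", "example-runtime"),
]

for method, path, body in requests_to_send:
    print(method, path)
    print(json.dumps(body, indent=2))
```

Building the bodies as dictionaries and serializing them with `json.dumps` avoids hand-written JSON mistakes such as trailing commas or unquoted values, and makes the link between the credential name, the resource name and the job explicit.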
  7. Data driven enterprises are transforming their businesses by migrating their on-prem data lakes and data warehouses to the cloud so they can enable new analytics at scale. As every enterprise looks to migrate, it’s important that IT leaders don’t forget about a key data stakeholder, the data scientist. Open source software (OSS) and libraries are a crucial piece of a data scientist’s toolkit and we’ve made significant progress to make OSS easier to manage for data scientists on Google Cloud’s data analytics platform.

Data scientists rely on a suite of powerful open source applications to work on solving the world’s biggest challenges. With the integration of leading open source tools like Dask and RAPIDS on Google Cloud, the Dataproc team is making NVIDIA GPU-accelerated data science at scale more accessible.
Scott McClellan, Sr Director, Data Science Product Group, NVIDIA

Dataproc Hub feature is now Generally Available: Secure and scale open source machine learning

Dataproc Hub, a feature now generally available for Dataproc users, provides an easier way to scale processing for common data science libraries and notebooks, govern custom open source clusters, and manage costs so that enterprises can maximize their existing skills and software investments. Dataproc Hub features include:

  • Ready to use big data frameworks including JupyterLab with BigQuery, Presto, PySpark, SparkR, Dask, and Tensorflow on Spark.
  • Access to custom Dataproc clusters within an isolated and controlled data science sandbox. Data scientists do not have to rely on IT to make changes to the programming environment.
  • Access to BigQuery, Cloud Storage and AI Platform using the notebook users’ credentials ensures that permissions are always in sync and the right data is available to the right users.
  • IT cost controls that include the ability to set auto scaling policies, CPU/RAM sizes and NVIDIA GPUs, auto-deletions and timeouts, and more.
  • Integrated security controls including custom image versions, locations, VPC-SC, AXT, CMEK, Sole tenancy, shielded VMs, Apache Ranger, and Personal Cluster Authentication, to name a few.
  • Easy to generate templated Dataproc configurations that can be reused for other clusters based on existing Dataproc clusters. A simple export is all that is needed.

The current state of open source machine learning on Google Cloud

Dataproc Hub was created by working in partnership with several companies that were facing rapid adoption of cloud sized datasets (big data), machine learning, and IoT. These new and large datasets were coupled with data analysis techniques and tools that simply do not fit into the traditional data warehousing model. Data science teams were combining methodologies across ETL (creating their own data structures), administration (using programming skills to configure resource sizing), and reporting (using Jupyter notebooks for exchanging data results). In addition, data scientists often work with unstructured data, which does not follow the same table/view permissions model as the data warehouse.

The IT leaders we worked with wanted an easy way to control and secure data science environments. They also wanted to maintain production stability, control costs, and ensure security and governance controls were being met. They asked us to simplify the process of creating a secured data science environment that could serve as an extension of their BigQuery data warehouse. At the same time, the data scientists who are setting up their own data science environments felt frustrated by having to do what they consider “IT work” such as figuring out various security connections and package installations. They wanted to focus on exploring data and building models with the tools they are familiar with.

Working with these organizations, we built Dataproc Hub to eliminate these primary concerns of both IT leaders and data science teams.

IT governed Dataproc clusters personalized to your data scientist's use case

With Dataproc Hub, you can extend existing data warehouse investments at a cost that grows in proportion to the value without having to compromise on security and compliance standards. Dataproc Hub allows IT leaders to specify templated Dataproc clusters that can leverage a variety of controls, ranging from custom images, which can be used to include standard IT software such as virus protection and asset management software, to autoscaling policies that let customers automatically scale their code within limits set in advance. Dataproc templates can easily be created from a running Dataproc cluster using the export command.

Customers of AI Platform Notebooks that want to use their BigQuery or Cloud Storage data for model training, feature engineering, and preprocessing will often exceed the limits of a single node machine. Data scientists also want to quickly iterate on ideas from inside the notebook environment without having to spend time packaging up their models to send off into a separate service just to try out an idea.

With Dataproc Hub, data scientists can quickly tap into APIs like PySpark and Dask that are configured to autoscale to meet the demands of the data without having to do a lot of setup and configuration. They can even accelerate their Spark XGBoost pipelines with NVIDIA GPUs to process their data 44x faster at a 14x reduction in cost vs CPUs. The data scientist is in full control of the software environment spawned by Dataproc Hub and can install their own packages, libraries and configurations, achieving freedom within the framework set by IT.

Using Dataproc Hub and Python-based libraries for genomic analysis

One example of this need to balance IT guardrails with data science flexibility is in the field of genomics, where data volumes continue to explode. By 2025, an estimated 40 exabytes of storage capacity will be required for human genomic data. Researchers need the freedom to try out a variety of techniques and run large scale jobs without IT intervention. However, IT organizations need to protect personal health data that comes with genomics datasets — something that Google Cloud, Dataproc, and the open source community are well suited to help with.

If you want to see the genomic analysis we talked about above in action, please register for our upcoming webinar where we will demo Dataproc Hub.

Next steps

The Dataproc Hub feature is now generally available and ready for use today. To get started, log into the Google Cloud Console and from the Dataproc page, choose Notebooks and then “New Instance”. Name the instance and populate the Dataproc Hub fields to configure the settings according to your standards. Alternatively, you can accept the default settings to be provided with a Dataproc Hub environment based on two example clusters. The IP address of the Dataproc Hub can then be provided to data scientists so teams can self-provision Jupyter environments based on Dataproc clusters. When the data task is completed, the user can go to File->Return to Control Panel and then “Stop Cluster”. Cluster templates can also be set with a TTL to ensure that resources are cleaned up.