Showing results for tags 'apache kafka'.

Found 8 results

  1. RudderStack explains why they did not choose Apache Kafka over PostgreSQL for building RudderStack, focusing on the challenges of running Apache Kafka. View the full article
  2. Real-time data streaming and event processing present scalability and management challenges. AWS offers a broad selection of managed real-time data streaming services to effortlessly run these workloads at any scale. In this post, Nexthink shares how Amazon Managed Streaming for Apache Kafka (Amazon MSK) empowered them to achieve massive scale in event processing. Experiencing business hyper-growth, Nexthink migrated to AWS to overcome the scaling limitations of on-premises solutions. With Amazon MSK, Nexthink now seamlessly processes trillions of events per day, reaching over 5 GB per second of aggregated throughput. In the following sections, Nexthink introduces their product and the need for scalability. They then highlight the challenges of their legacy on-premises application and present their transition to a cloud-centered software as a service (SaaS) architecture powered by Amazon MSK. Finally, Nexthink details the benefits achieved by adopting Amazon MSK.

Nexthink's need to scale

Nexthink is the leader in digital employee experience (DeX). The company is shaping the future of work by providing IT leaders and C-levels with insights into employees' daily technology experiences at the device and application level. This allows IT to evolve from reactive problem-solving to proactive optimization. The Nexthink Infinity platform combines analytics, monitoring, automation, and more to manage the employee digital experience. By collecting device and application events, processing them in real time, and storing them, our platform analyzes data to solve problems and boost experiences for over 15 million employees across five continents.

In just 3 years, Nexthink's business grew tenfold, and with the introduction of more real-time data our application had to scale from processing 200 MB per second to 5 GB per second and trillions of events daily. To enable this growth, we modernized our application from an on-premises single-tenant monolith to a cloud-based scalable SaaS solution powered by Amazon MSK. The next sections detail our modernization journey, including the challenges we faced and the benefits we realized with our new cloud-centered, AWS-based architecture.

The on-premises solution and its challenges

Let's first explore our previous on-premises solution, Nexthink V6, before examining how Amazon MSK addressed its challenges. The following diagram illustrates its architecture. V6 was made up of two monolithic, single-tenant Java and C++ applications that were tightly coupled. The portal was a backend-for-frontend Java application, and the core engine was an in-house C++ in-memory database application that also handled device connections, data ingestion, aggregation, and querying. By bundling all these functions together, the engine became difficult to manage and improve.

V6 also lacked scalability. The engine was initially designed to support 10,000 devices, but some new tenants had over 300,000. We reacted by deploying multiple V6 engines per tenant, increasing complexity and cost, hampering user experience, and delaying time to market. This also led to longer proof of concept and onboarding cycles, which hurt the business. Furthermore, the absence of a streaming platform like Kafka created dependencies between teams through tight HTTP/gRPC coupling. Additionally, teams couldn't access real-time events before ingestion into the database, limiting feature development. We also lacked a data buffer, risking potential data loss during outages. Such constraints impeded innovation and increased risks. In summary, although the V6 system served its initial purpose, reinventing it with cloud-centered technologies became imperative to enhance scalability and reliability and to foster innovation by our engineering and product teams.

Transitioning to a cloud-centered architecture with Amazon MSK

To achieve our modernization goals, after thorough research and iterations, we implemented an event-driven microservices design on Amazon Elastic Kubernetes Service (Amazon EKS), using Kafka on Amazon MSK for distributed event storage and streaming. Our transition from the V6 on-premises solution to the cloud-centered platform was phased over four iterations:

Phase 1 – We lifted and shifted from on premises to virtual machines in the cloud, reducing operational complexities and accelerating proof of concept cycles while transparently migrating customers.

Phase 2 – We extended the cloud architecture by implementing new product features with microservices and self-managed Kafka on Kubernetes. However, operating Kafka clusters ourselves proved overly difficult, leading us to Phase 3.

Phase 3 – We switched from self-managed Kafka to Amazon MSK, improving stability and reducing operational costs. We realized that managing Kafka wasn't our core competency or differentiator, and the overhead was high. Amazon MSK enabled us to focus on our core application, freeing us from the burden of undifferentiated Kafka management.

Phase 4 – Finally, we eliminated all legacy components, completing the transition to a fully cloud-centered SaaS platform.

This journey of learning and transformation took 3 years. Today, after our successful transition, we use Amazon MSK for two key functions:

Real-time data ingestion and processing of trillions of daily events from over 15 million devices worldwide, as illustrated in the following figure.

Enabling an event-driven system that decouples data producers and consumers, as depicted in the following figure.

To further enhance our scalability and resilience, we adopted a cell-based architecture using the wide availability of Amazon MSK across AWS Regions. We currently operate over 10 cells, each representing an independent regional deployment of our SaaS solution. This cell-based approach minimizes the area of impact in case of issues, addresses data residency requirements, and enables horizontal scaling across AWS Regions, as illustrated in the following figure.

Benefits of Amazon MSK

Amazon MSK has been critical in enabling our event-driven design. In this section, we outline the main benefits we gained from its adoption.

Improved data resilience

In our new architecture, data from devices is pushed directly to Kafka topics in Amazon MSK, which provides high availability and resilience. This makes sure that events can be safely received and stored at any time. Our services consuming this data inherit the same resilience from Amazon MSK. If our backend ingestion services face disruptions, no event is lost, because Kafka retains all published messages. When our services resume, they seamlessly continue processing from where they left off, thanks to Kafka's delivery semantics, which allow messages to be processed exactly once, at least once, or at most once based on application needs (the at-least-once pattern is sketched at the end of this item). Amazon MSK enables us to tailor the data retention duration to our specific requirements, ranging from seconds to unlimited duration. This flexibility grants uninterrupted data availability to our application, which wasn't possible with our previous architecture. Furthermore, to safeguard data integrity in the event of processing errors or corruption, Kafka enabled us to implement a data replay mechanism, ensuring data consistency and reliability.

Organizational scaling

By adopting an event-driven architecture with Amazon MSK, we decomposed our monolithic application into loosely coupled, stateless microservices communicating asynchronously via Kafka topics. This approach enabled our engineering organization to scale rapidly from just 4–5 teams in 2019 to over 40 teams and approximately 350 engineers today. The loose coupling between event publishers and subscribers empowered teams to focus on distinct domains, such as data ingestion, identification services, and data lakes. Teams could develop solutions independently within their domains, communicating through Kafka topics without tight coupling. This architecture accelerated feature development by minimizing the risk of new features impacting existing ones. Teams could efficiently consume events published by others, offering new capabilities more rapidly while reducing cross-team dependencies. The following figure illustrates the seamless workflow of adding new domains to our system.

Furthermore, the event-driven design allowed teams to build stateless services that could seamlessly autoscale based on MSK metrics like messages per second. This event-driven scalability eliminated the need for extensive capacity planning and manual scaling efforts, freeing up development time. By using an event-driven microservices architecture on Amazon MSK, we achieved organizational agility, enhanced scalability, and accelerated innovation while minimizing operational overhead.

Seamless infrastructure scaling

Nexthink's business grew tenfold in 3 years, and many new capabilities were added to the product, leading to a substantial increase in traffic from 200 MB per second to 5 GB per second. This exponential data growth was enabled by the robust scalability of Amazon MSK. Achieving such scale with an on-premises solution would have been challenging and expensive, if not infeasible. Attempting to self-manage Kafka imposed unnecessary operational overhead without providing business value. Running it with just 5% of today's traffic was already complex and required two engineers. At today's volumes, we estimated needing 6–10 dedicated staff, increasing costs and diverting resources away from core priorities.

Real-time capabilities

By channeling all our data through Amazon MSK, we enabled real-time processing of events. This unlocked capabilities like real-time alerts, event-driven triggers, and webhooks that were previously unattainable. As such, Amazon MSK was instrumental in facilitating our event-driven architecture and powering impactful innovations.

Secure data access

Transitioning to our new architecture, we met our security and data integrity goals. With Kafka ACLs, we enforced strict access controls, allowing consumers and producers to interact only with authorized topics. We based these granular data access controls on criteria like data type, domain, and team. To securely scale decentralized management of topics, we introduced proprietary Kubernetes Custom Resource Definitions (CRDs). These CRDs enabled teams to independently manage their own topics, settings, and ACLs without compromising security. Amazon MSK encryption made sure that the data remained encrypted at rest and in transit. We also introduced a Bring Your Own Key (BYOK) option, allowing application-level encryption with customer keys for all single-tenant and multi-tenant topics.

Enhanced observability

Amazon MSK gave us great visibility into our data flows. The out-of-the-box Amazon CloudWatch metrics let us see the amount and types of data flowing through each topic and cluster. This helped us quantify the usage of our product features by tracking data volumes at the topic level. The Amazon MSK operational metrics enabled effortless monitoring and right-sizing of clusters and brokers. Overall, the rich observability of Amazon MSK facilitated data-driven decisions about architecture and product features.

Conclusion

Nexthink's journey from an on-premises monolith to a cloud SaaS was streamlined by using Amazon MSK, a fully managed Kafka service. Amazon MSK allowed us to scale seamlessly while benefiting from enterprise-grade reliability and security. By offloading Kafka management to AWS, we could stay focused on our core business and innovate faster. Going forward, we plan to further improve performance, costs, and scalability by adopting Amazon MSK capabilities such as tiered storage and AWS Graviton-based EC2 instance types. We are also working closely with the Amazon MSK team to prepare for upcoming service features. Rapidly adopting new capabilities will help us remain at the forefront of innovation while continuing to grow our business. To learn more about how Nexthink uses AWS to serve its global customer base, explore the Nexthink on AWS case study. Additionally, discover other customer success stories with Amazon MSK by visiting the Amazon MSK blog category.

About the Authors

Moe Haidar is a principal engineer and special projects lead at the CTO office of Nexthink. He has been involved with AWS since 2018 and is a key contributor to the cloud transformation of the Nexthink platform to AWS. His focus is on product and technology incubation and architecture, but he also loves doing hands-on activities to keep his knowledge of technologies sharp and up to date. He still contributes heavily to the code base and loves to tackle complex problems.

Simone Pomata is a Senior Solutions Architect at AWS. He has worked enthusiastically in the tech industry for more than 10 years. At AWS, he helps customers succeed in building new technologies every day.

Magdalena Gargas is a Solutions Architect passionate about technology and solving customer challenges. At AWS, she works mostly with software companies, helping them innovate in the cloud. She participates in industry events, sharing insights and contributing to the advancement of the containerization field. View the full article
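A minimal sketch of the at-least-once consumption pattern mentioned above, using the kafka-python client. This is illustrative only and not taken from the Nexthink post; the topic name, broker address, and consumer group are placeholders.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "device-events",                     # hypothetical topic name
    bootstrap_servers="localhost:9092",  # placeholder broker address
    group_id="ingestion-service",        # placeholder consumer group
    enable_auto_commit=False,            # commit offsets manually, after processing
    auto_offset_reset="earliest",
)

def process(event: bytes) -> None:
    # Placeholder for the real ingestion logic (parse, enrich, persist).
    print(event[:80])

for message in consumer:
    process(message.value)
    consumer.commit()  # committing only after processing gives at-least-once delivery

Because the offset is committed only after the event has been handled, a crash leads to reprocessing rather than data loss, which is the trade-off the post relies on for resilience.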
  3. Over the past few years, Apache Kafka has emerged as the leading standard for streaming data. Fast-forward to the present day: Kafka has achieved ubiquity, being adopted by at least 80% of the Fortune 100. This widespread adoption is attributed to Kafka's architecture, which goes far beyond basic messaging. Kafka's architectural versatility makes it exceptionally suitable for streaming data at vast "internet" scale, with the fault tolerance and data consistency crucial for supporting mission-critical applications. Flink is a high-throughput, unified batch and stream processing engine, renowned for its ability to handle continuous data streams at scale. It integrates seamlessly with Kafka and offers robust support for exactly-once semantics, ensuring each event is processed precisely once, even amid system failures. This makes Flink a natural choice of stream processor for Kafka. While Apache Flink enjoys significant success and popularity as a tool for real-time data processing, finding sufficient resources and current examples for learning Flink can be challenging; a minimal PyFlink sketch of reading a Kafka topic follows below. View the full article
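As a starting point for the kind of Kafka-to-Flink integration described above, here is a minimal PyFlink Table API sketch. It assumes a locally reachable broker, a JSON-encoded topic named clicks, and a Flink installation with the Kafka SQL connector jar available; the jar path and all names are placeholders, and the configuration API differs slightly between Flink versions.

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# The Kafka SQL connector must be on the classpath; the jar path below is a placeholder.
t_env.get_config().set("pipeline.jars", "file:///path/to/flink-sql-connector-kafka.jar")

t_env.execute_sql("""
    CREATE TABLE clicks (
        user_id STRING,
        url STRING,
        ts TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'clicks',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'flink-demo',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")

# A simple continuous aggregation over the stream; print() is only for local experimentation.
t_env.sql_query(
    "SELECT user_id, COUNT(*) AS clicks FROM clicks GROUP BY user_id"
).execute().print()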
  4. This article is part of a project that's split into two main phases. The first phase focuses on building a data pipeline. This involves getting data from an API and storing it in a PostgreSQL database. In the second phase, we'll develop an application that uses a language model to interact with this database. Ideal for those new to data systems or language model applications, this project is structured into two segments:

This initial article guides you through constructing a data pipeline using Kafka for streaming, Airflow for orchestration, Spark for data transformation, and PostgreSQL for storage. To set up and run these tools we will use Docker.

The second article, which will come later, will delve into creating agents using tools like LangChain to communicate with external databases.

This first part of the project is ideal for beginners in data engineering, as well as for data scientists and machine learning engineers looking to deepen their knowledge of the entire data handling process. Using these data engineering tools firsthand is beneficial. It helps in refining the creation and expansion of machine learning models, ensuring they perform effectively in practical settings. This article focuses more on practical application than on the theoretical aspects of the tools discussed. For a detailed understanding of how these tools work internally, there are many excellent resources available online.

Overview

Let's break down the data pipeline process step by step:

Data streaming: Initially, data is streamed from the API into a Kafka topic.

Data processing: A Spark job then takes over, consuming the data from the Kafka topic and transferring it to a PostgreSQL database.

Scheduling with Airflow: Both the streaming task and the Spark job are orchestrated using Airflow. While in a real-world scenario the Kafka producer would constantly listen to the API, for demonstration purposes we'll schedule the Kafka streaming task to run daily. Once the streaming is complete, the Spark job processes the data, making it ready for use by the LLM application.

All of these tools will be built and run using Docker, and more specifically docker-compose.

Overview of the data pipeline. Image by the author.

Now that we have a blueprint of our pipeline, let's dive into the technical details!

Local setup

First you can clone the Github repo on your local machine using the following command:

git clone https://github.com/HamzaG737/data-engineering-project.git

Here is the overall structure of the project:

├── LICENSE
├── README.md
├── airflow
│   ├── Dockerfile
│   ├── __init__.py
│   └── dags
│       ├── __init__.py
│       └── dag_kafka_spark.py
├── data
│   └── last_processed.json
├── docker-compose-airflow.yaml
├── docker-compose.yml
├── kafka
├── requirements.txt
├── spark
│   └── Dockerfile
└── src
    ├── __init__.py
    ├── constants.py
    ├── kafka_client
    │   ├── __init__.py
    │   └── kafka_stream_data.py
    └── spark_pgsql
        └── spark_streaming.py

The airflow directory contains a custom Dockerfile for setting up Airflow and a dags directory to create and schedule the tasks.

The data directory contains the last_processed.json file, which is crucial for the Kafka streaming task. Further details on its role will be provided in the Kafka section.

The docker-compose-airflow.yaml file defines all the services required to run Airflow.

The docker-compose.yml file specifies the Kafka services and includes a docker-proxy. This proxy is essential for executing Spark jobs through a DockerOperator in Airflow, a concept that will be elaborated on later.

The spark directory contains a custom Dockerfile for the Spark setup.

src contains the Python modules needed to run the application.

To set up your local development environment, start by installing the required Python packages. The only essential package is psycopg2-binary. You have the option to install just this package or all the packages listed in the requirements.txt file. To install all packages, use the following command:

pip install -r requirements.txt

Next let's dive step by step into the project details.

About the API

The API is RappelConso from the French public services. It gives access to data relating to recalls of products declared by professionals in France. The data is in French and initially contains 31 columns (or fields). Some of the most important are:

reference_fiche (reference sheet): unique identifier of the recalled product. It will act as the primary key of our Postgres database later.

categorie_de_produit (product category): for instance food, electrical appliance, tools, transport means, etc.

sous_categorie_de_produit (product sub-category): for instance we can have meat, dairy products, or cereals as sub-categories of the food category.

motif_de_rappel (reason for recall): self-explanatory and one of the most important fields.

date_de_publication, which translates to the publication date.

risques_encourus_par_le_consommateur, which contains the risks that the consumer may encounter when using the product.

There are also several fields that correspond to different links, such as the link to the product image, the link to the distributors list, and so on. You can see some examples and query the dataset records manually using this link.

We refined the data columns in a few key ways:

Columns like ndeg_de_version and rappelguid, which were part of a versioning system, have been removed as they aren't needed for our project.

We combined the columns that deal with consumer risks — risques_encourus_par_le_consommateur and description_complementaire_du_risque — for a clearer overview of product risks.

The date_debut_fin_de_commercialisation column, which indicates the marketing period, has been divided into two separate columns. This split allows for easier queries about the start or end of a product's marketing.

We've removed accents from all columns except for links, reference numbers, and dates. This is important because some text processing tools struggle with accented characters.

For a detailed look at these changes, check out our transformation script at src/kafka_client/transformations.py. The updated list of columns is available in src/constants.py under DB_FIELDS.

Kafka streaming

To avoid sending all the data from the API each time we run the streaming task, we define a local JSON file that contains the publication date of the latest record from the previous run. We then use this date as the starting date for the new streaming task.

To give an example, suppose that the latest recalled product has a publication date of 22 November 2023. If we make the hypothesis that all of the recalled products' information published before this date is already persisted in our Postgres database, we can now stream the data starting from 22 November. Note that there is an overlap, because we may have a scenario where we didn't handle all of the data of the 22nd of November.

The file is saved in ./data/last_processed.json and has this format:

{"last_processed": "2023-11-22"}

By default the file is an empty JSON, which means that our first streaming task will process all of the API records, which number approximately 10,000. Note that in a production setting this approach of storing the last processed date in a local file is not viable, and other approaches involving an external database or an object storage service may be more suitable.

The code for the Kafka streaming can be found in ./src/kafka_client/kafka_stream_data.py, and it primarily involves querying the data from the API, making the transformations, removing potential duplicates, updating the last publication date, and serving the data using the Kafka producer. A hedged sketch of this producer logic is included at the end of this item.

The next step is to run the Kafka service defined in the docker-compose file below:

version: '3'
services:
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9094:9094'
    networks:
      - airflow-kafka
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    volumes:
      - ./kafka:/bitnami/kafka

  kafka-ui:
    container_name: kafka-ui-1
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8800:8080
    depends_on:
      - kafka
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: PLAINTEXT://kafka:9092
      DYNAMIC_CONFIG_ENABLED: 'true'
    networks:
      - airflow-kafka

networks:
  airflow-kafka:
    external: true

The key highlights from this file are:

The kafka service uses the base image bitnami/kafka.

We configure the service with only one broker, which is enough for our small project. A Kafka broker is responsible for receiving messages from producers (the sources of data), storing these messages, and delivering them to consumers (the sinks or end users of the data). The broker listens on port 9092 for internal communication within the cluster and on port 9094 for external communication, allowing clients outside the Docker network to connect to the Kafka broker.

In the volumes part, we map the local directory kafka to the Docker container directory /bitnami/kafka to ensure data persistence and allow possible inspection of Kafka's data from the host system.

We set up the kafka-ui service that uses the Docker image provectuslabs/kafka-ui:latest. This provides a user interface to interact with the Kafka cluster, which is especially useful for monitoring and managing Kafka topics and messages.

To ensure communication between Kafka and Airflow, which will be run as an external service, we will use an external network called airflow-kafka.

Before running the Kafka service, let's create the airflow-kafka network using the following command:

docker network create airflow-kafka

Now everything is set to finally start our Kafka service:

docker-compose up

After the services start, visit the kafka-ui at http://localhost:8800/. Normally you should get something like this:

Overview of the Kafka UI. Image by the author.

Next we will create the topic that will contain the API messages. Click on Topics on the left and then Add a topic at the top left. Our topic will be called rappel_conso, and since we have only one broker we set the replication factor to 1. We will also set the number of partitions to 1, since we will have only one consumer thread at a time, so we won't need any parallelism. Finally, we can set the data retention time to a small value like one hour, because we will run the Spark job right after the Kafka streaming task, so we won't need to retain the data for a long time in the Kafka topic. A programmatic alternative to creating the topic through the UI is sketched at the end of this item.

Postgres set-up

Before setting up our Spark and Airflow configurations, let's create the Postgres database that will persist our API data. I used the pgAdmin 4 tool for this task, however any other Postgres development platform can do the job.

To install Postgres and pgAdmin, visit this link https://www.postgresql.org/download/ and get the packages for your operating system. Then, when installing Postgres, you need to set up a password that we will need later to connect to the database from the Spark environment. You can also leave the port at 5432. If your installation has succeeded, you can start pgAdmin and you should observe something like this window:

Overview of pgAdmin interface. Image by the author.

Since we have a lot of columns for the table we want to create, we chose to create the table and add its columns with a script using psycopg2, a PostgreSQL database adapter for Python. You can run the script with the command:

python scripts/create_table.py

A hedged sketch of what such a script can look like is included at the end of this item. Note that in the script I saved the Postgres password as an environment variable named POSTGRES_PASSWORD. So if you use another method to access the password, you need to modify the script accordingly.

Spark set-up

Having set up our Postgres database, let's delve into the details of the Spark job. The goal is to stream the data from the Kafka topic rappel_conso to the Postgres table rappel_conso_table.

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
)
from pyspark.sql.functions import from_json, col
from src.constants import POSTGRES_URL, POSTGRES_PROPERTIES, DB_FIELDS
import logging

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s:%(funcName)s:%(levelname)s:%(message)s"
)


def create_spark_session() -> SparkSession:
    spark = (
        SparkSession.builder.appName("PostgreSQL Connection with PySpark")
        .config(
            "spark.jars.packages",
            "org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0",
        )
        .getOrCreate()
    )
    logging.info("Spark session created successfully")
    return spark


def create_initial_dataframe(spark_session):
    """
    Reads the streaming data and creates the initial dataframe accordingly.
    """
    try:
        # Gets the streaming data from topic random_names
        df = (
            spark_session.readStream.format("kafka")
            .option("kafka.bootstrap.servers", "kafka:9092")
            .option("subscribe", "rappel_conso")
            .option("startingOffsets", "earliest")
            .load()
        )
        logging.info("Initial dataframe created successfully")
    except Exception as e:
        logging.warning(f"Initial dataframe couldn't be created due to exception: {e}")
        raise
    return df


def create_final_dataframe(df):
    """
    Modifies the initial dataframe, and creates the final dataframe.
    """
    schema = StructType(
        [StructField(field_name, StringType(), True) for field_name in DB_FIELDS]
    )
    df_out = (
        df.selectExpr("CAST(value AS STRING)")
        .select(from_json(col("value"), schema).alias("data"))
        .select("data.*")
    )
    return df_out


def start_streaming(df_parsed, spark):
    """
    Starts the streaming to table spark_streaming.rappel_conso in postgres
    """
    # Read existing data from PostgreSQL
    existing_data_df = spark.read.jdbc(
        POSTGRES_URL, "rappel_conso", properties=POSTGRES_PROPERTIES
    )

    unique_column = "reference_fiche"

    logging.info("Start streaming ...")
    query = df_parsed.writeStream.foreachBatch(
        lambda batch_df, _: (
            batch_df.join(
                existing_data_df,
                batch_df[unique_column] == existing_data_df[unique_column],
                "leftanti"
            )
            .write.jdbc(
                POSTGRES_URL, "rappel_conso", "append", properties=POSTGRES_PROPERTIES
            )
        )
    ).trigger(once=True) \
        .start()
    return query.awaitTermination()


def write_to_postgres():
    spark = create_spark_session()
    df = create_initial_dataframe(spark)
    df_final = create_final_dataframe(df)
    start_streaming(df_final, spark=spark)


if __name__ == "__main__":
    write_to_postgres()

Let's break down the key highlights and functionalities of the Spark job:

1. First we create the Spark session with create_spark_session (see the listing above), configuring the PostgreSQL JDBC driver and the Spark-Kafka connector packages.

2. The create_initial_dataframe function ingests streaming data from the Kafka topic using Spark's Structured Streaming.

3. Once the data is ingested, create_final_dataframe transforms it. It applies a schema (defined by the columns DB_FIELDS) to the incoming JSON data, ensuring that the data is structured and ready for further processing.

4. The start_streaming function reads existing data from the database, compares it with the incoming stream, and appends new records.

The complete code for the Spark job is in the file src/spark_pgsql/spark_streaming.py. We will use the Airflow DockerOperator to run this job, as explained in the upcoming section.

Let's go through the process of creating the Docker image we need to run our Spark job. Here's the Dockerfile for reference:

FROM bitnami/spark:latest
WORKDIR /opt/bitnami/spark
RUN pip install py4j
COPY ./src/spark_pgsql/spark_streaming.py ./spark_streaming.py
COPY ./src/constants.py ./src/constants.py
ENV POSTGRES_DOCKER_USER=host.docker.internal
ARG POSTGRES_PASSWORD
ENV POSTGRES_PASSWORD=$POSTGRES_PASSWORD

In this Dockerfile, we start with the bitnami/spark image as our base. It's a ready-to-use Spark image. We then install py4j, a tool needed for Spark to work with Python. The environment variables POSTGRES_DOCKER_USER and POSTGRES_PASSWORD are set up for connecting to a PostgreSQL database. Since our database is on the host machine, we use host.docker.internal as its value. This allows our Docker container to access services on the host, in this case the PostgreSQL database. The password for PostgreSQL is passed as a build argument, so it's not hard-coded into the image.

It's important to note that this approach, especially passing the database password at build time, might not be secure for production environments. It could potentially expose sensitive information. In such cases, more secure methods like Docker BuildKit should be considered.

Now, let's build the Docker image for Spark:

docker build -f spark/Dockerfile -t rappel-conso/spark:latest --build-arg POSTGRES_PASSWORD=$POSTGRES_PASSWORD .

This command will build the image rappel-conso/spark:latest. This image includes everything needed to run our Spark job and will be used by Airflow's DockerOperator to execute the job. Remember to replace $POSTGRES_PASSWORD with your actual PostgreSQL password when running this command.

Airflow

As said earlier, Apache Airflow serves as the orchestration tool in the data pipeline. It is responsible for scheduling and managing the workflow of the tasks, ensuring they are executed in a specified order and under defined conditions. In our system, Airflow is used to automate the data flow from streaming with Kafka to processing with Spark.

Airflow DAG

Let's take a look at the Directed Acyclic Graph (DAG) that will outline the sequence and dependencies of tasks, enabling Airflow to manage their execution.

start_date = datetime.today() - timedelta(days=1)

default_args = {
    "owner": "airflow",
    "start_date": start_date,
    "retries": 1,  # number of retries before failing the task
    "retry_delay": timedelta(seconds=5),
}

with DAG(
    dag_id="kafka_spark_dag",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:

    kafka_stream_task = PythonOperator(
        task_id="kafka_data_stream",
        python_callable=stream,
        dag=dag,
    )

    spark_stream_task = DockerOperator(
        task_id="pyspark_consumer",
        image="rappel-conso/spark:latest",
        api_version="auto",
        auto_remove=True,
        command="./bin/spark-submit --master local[*] --packages org.postgresql:postgresql:42.5.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 ./spark_streaming.py",
        docker_url='tcp://docker-proxy:2375',
        environment={'SPARK_LOCAL_HOSTNAME': 'localhost'},
        network_mode="airflow-kafka",
        dag=dag,
    )

    kafka_stream_task >> spark_stream_task

Here are the key elements of this configuration:

The tasks are set to execute daily.

The first task is the Kafka stream task. It is implemented using the PythonOperator to run the Kafka streaming function. This task streams data from the RappelConso API into a Kafka topic, initiating the data processing workflow.

The downstream task is the Spark stream task. It uses the DockerOperator for execution. It runs a Docker container with our custom Spark image, tasked with processing the data received from Kafka.

The tasks are arranged sequentially, where the Kafka streaming task precedes the Spark processing task. This order is crucial to ensure that data is first streamed and loaded into Kafka before being processed by Spark.

About the DockerOperator

Using the DockerOperator allows us to run Docker containers that correspond to our tasks. The main advantages of this approach are easier package management, better isolation, and enhanced testability. We will demonstrate the use of this operator with the Spark streaming task.

Here are some key details about the DockerOperator for the Spark streaming task:

We use the image rappel-conso/spark:latest specified in the Spark set-up section.

The command runs the Spark submit command inside the container, specifying the master as local, including the necessary packages for PostgreSQL and Kafka integration, and pointing to the spark_streaming.py script that contains the logic for the Spark job.

docker_url represents the URL of the host running the Docker daemon. The natural solution is to set it to unix://var/run/docker.sock and to mount /var/run/docker.sock in the Airflow Docker container. One problem we had with this approach is a permission error when using the socket file inside the Airflow container. A common workaround, changing permissions with chmod 777 /var/run/docker.sock, poses significant security risks. To circumvent this, we implemented a more secure solution using bobrik/socat as a docker-proxy. This proxy, defined in a Docker Compose service, listens on TCP port 2375 and forwards requests to the Docker socket:

docker-proxy:
  image: bobrik/socat
  command: "TCP4-LISTEN:2375,fork,reuseaddr UNIX-CONNECT:/var/run/docker.sock"
  ports:
    - "2376:2375"
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
  networks:
    - airflow-kafka

In the DockerOperator, we can access the host Docker socket /var/run/docker.sock via the tcp://docker-proxy:2375 URL, as described here and here.

Finally, we set the network mode to airflow-kafka. This allows us to use the same network as the proxy and the Docker container running Kafka. This is crucial since the Spark job will consume the data from the Kafka topic, so we must ensure that both containers are able to communicate.

After defining the logic of our DAG, let's now understand the Airflow services configuration in the docker-compose-airflow.yaml file.

Airflow configuration

The compose file for Airflow was adapted from the official Apache Airflow docker-compose file. You can have a look at the original file by visiting this link.

As pointed out by this article, this proposed version of Airflow is highly resource-intensive, mainly because the core executor is set to CeleryExecutor, which is more adapted for distributed and large-scale data processing tasks. Since we have a small workload, using a single-node LocalExecutor is enough.

Here is an overview of the changes we made to the docker-compose configuration of Airflow:

We set the environment variable AIRFLOW__CORE__EXECUTOR to LocalExecutor.

We removed the services airflow-worker and flower because they only work for the Celery executor. We also removed the Redis caching service since it works as a backend for Celery. We also won't use the airflow-triggerer, so we remove it too.

We replaced the base image ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.7.3} for the remaining services, mainly the scheduler and the webserver, by a custom image that we will build when running the docker-compose:

version: '3.8'
x-airflow-common:
  &airflow-common
  build:
    context: .
    dockerfile: ./airflow_resources/Dockerfile
  image: de-project/airflow:latest

We mounted the necessary volumes that are needed by Airflow. AIRFLOW_PROJ_DIR designates the Airflow project directory that we will define later. We also set the network to airflow-kafka to be able to communicate with the Kafka bootstrap servers:

volumes:
  - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
  - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
  - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
  - ./src:/opt/airflow/dags/src
  - ./data/last_processed.json:/opt/airflow/data/last_processed.json
user: "${AIRFLOW_UID:-50000}:0"
networks:
  - airflow-kafka

Next, we need to create some environment variables that will be used by docker-compose:

echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_PROJ_DIR=\"./airflow_resources\"" > .env

where AIRFLOW_UID represents the user ID in the Airflow containers and AIRFLOW_PROJ_DIR represents the Airflow project directory.

Now everything is set up to run your Airflow service. You can start it with this command:

docker compose -f docker-compose-airflow.yaml up

Then, to access the Airflow user interface, you can visit this URL: http://localhost:8080.

Sign-in window on Airflow. Image by the author.

By default, the username and password are airflow for both. After signing in, you will see a list of DAGs that come with Airflow. Look for the DAG of our project, kafka_spark_dag, and click on it.

Overview of the task window in Airflow. Image by the author.

You can start the task by clicking on the button next to DAG: kafka_spark_dag. Next, you can check the status of your tasks in the Graph tab. A task is done when it turns green. So, when everything is finished, it should look something like this:

Image by the author.

To verify that the rappel_conso_table is filled with data, use the following SQL query in the pgAdmin Query Tool:

SELECT count(*) FROM rappel_conso_table

When I ran this in January 2024, the query returned a total of 10,022 rows. Your results should be around this number as well.

Conclusion

This article has demonstrated the steps to build a basic yet functional data engineering pipeline using Kafka, Airflow, Spark, PostgreSQL, and Docker. Aimed primarily at beginners and those new to the field of data engineering, it provides a hands-on approach to understanding and implementing key concepts in data streaming, processing, and storage. Throughout this guide, we've covered each component of the pipeline in detail, from setting up Kafka for data streaming to using Airflow for task orchestration, and from processing data with Spark to storing it in PostgreSQL. The use of Docker throughout the project simplifies the setup and ensures consistency across different environments.

It's important to note that while this setup is ideal for learning and small-scale projects, scaling it for production use would require additional considerations, especially in terms of security and performance optimization. Future enhancements could include integrating more advanced data processing techniques, exploring real-time analytics, or even expanding the pipeline to incorporate more complex data sources.

In essence, this project serves as a practical starting point for those looking to get their hands dirty with data engineering. It lays the groundwork for understanding the basics, providing a solid foundation for further exploration in the field. In the second part, we'll explore how to effectively use the data stored in our PostgreSQL database. We'll introduce agents powered by Large Language Models (LLMs) and a variety of tools that enable us to interact with the database using natural language queries. So, stay tuned!

To reach out

LinkedIn: https://www.linkedin.com/in/hamza-gharbi-043045151/
Twitter: https://twitter.com/HamzaGh25079790

End-to-End Data Engineering System on Real Data with Kafka, Spark, Airflow, Postgres, and Docker was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story. View the full article
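The article references src/kafka_client/kafka_stream_data.py without listing it. The following is a hedged sketch of the core loop it describes (query the API from the last processed date, transform, deduplicate, publish, then update the date); the API URL, request parameters, and defaults below are placeholders rather than the project's actual code.

import json
from pathlib import Path

import requests
from kafka import KafkaProducer

LAST_PROCESSED_FILE = Path("data/last_processed.json")
API_URL = "https://example.com/rappelconso"  # placeholder for the real RappelConso endpoint

def read_last_processed_date():
    content = LAST_PROCESSED_FILE.read_text().strip()
    if content in ("", "{}"):
        return None  # first run: process every record
    return json.loads(content)["last_processed"]

def stream():
    producer = KafkaProducer(
        bootstrap_servers="localhost:9094",  # the EXTERNAL listener from the compose file
        value_serializer=lambda record: json.dumps(record).encode("utf-8"),
    )
    last_date = read_last_processed_date()
    # Fetch records published on or after the last processed date (parameter name is illustrative).
    records = requests.get(API_URL, params={"since": last_date}, timeout=30).json()
    latest = last_date or ""
    for record in records:
        # The transformations and deduplication described in the article would happen here.
        producer.send("rappel_conso", value=record)
        latest = max(latest, record["date_de_publication"])
    producer.flush()
    if latest:
        LAST_PROCESSED_FILE.write_text(json.dumps({"last_processed": latest}))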
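The topic is created through the Kafka UI above, but the same thing can be done programmatically. A small sketch with kafka-python's admin client, assuming the external listener on localhost:9094 and the settings chosen in the article (1 partition, replication factor 1, roughly one hour of retention):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9094")
admin.create_topics([
    NewTopic(
        name="rappel_conso",
        num_partitions=1,       # a single consumer thread, so no parallelism needed
        replication_factor=1,   # only one broker in this setup
        topic_configs={"retention.ms": str(60 * 60 * 1000)},  # about one hour
    )
])
admin.close()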
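scripts/create_table.py is run but not shown in the article. Here is a hedged sketch of what such a script can look like with psycopg2; the real script builds the column list from DB_FIELDS in src/constants.py, so the abbreviated field list and connection settings below are placeholders.

import os

import psycopg2

# Abbreviated placeholder list; the project derives this from src/constants.py (DB_FIELDS).
DB_FIELDS = [
    "reference_fiche",
    "categorie_de_produit",
    "sous_categorie_de_produit",
    "motif_de_rappel",
    "date_de_publication",
]

columns = ",\n    ".join(
    f"{field} TEXT PRIMARY KEY" if field == "reference_fiche" else f"{field} TEXT"
    for field in DB_FIELDS
)

conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="postgres",                         # placeholder database name
    user="postgres",
    password=os.environ["POSTGRES_PASSWORD"],  # read from the environment, as in the article
)
with conn, conn.cursor() as cur:
    cur.execute(f"CREATE TABLE IF NOT EXISTS rappel_conso_table (\n    {columns}\n)")
conn.close()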
  5. In this post, we demonstrate how to build and deploy a Kafka cluster with Amazon Elastic Kubernetes Service (Amazon EKS) using Data on EKS (DoEKS). DoEKS is an open-source project that helps customers expedite their data platform development on Kubernetes. DoEKS provides templates that incorporate best practices around security, performance, and cost optimization, among others, that can be used as is or fine-tuned to meet more unique requirements. View the full article
  6. Today, Amazon Managed Streaming for Apache Kafka (Amazon MSK) announces an integration with Amazon EventBridge Pipes in the MSK service console, making it easier to send events from your Apache Kafka cluster to one of over 14 AWS service targets, including Amazon SQS, Amazon Kinesis Data Streams and Firehose, AWS Step Functions, Amazon SNS, or Amazon EventBridge event buses. The EventBridge Pipes integration also supports the EventBridge API Destinations target, which uses API calls to send your events to software as a service (SaaS) applications or your own applications within or outside AWS. A hedged sketch of creating such a pipe with the AWS SDK follows below. View the full article
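For readers who prefer the SDK over the console integration referenced above, the following is a minimal boto3 sketch of creating a pipe from an MSK topic to an SQS queue. All ARNs, the topic name, and the parameter shapes are placeholders and should be checked against the current EventBridge Pipes API; an IAM role with permissions for both the MSK source and the SQS target is assumed to exist.

import boto3

pipes = boto3.client("pipes")

pipes.create_pipe(
    Name="msk-to-sqs-demo",
    RoleArn="arn:aws:iam::123456789012:role/demo-pipes-role",        # placeholder role
    Source="arn:aws:kafka:eu-west-1:123456789012:cluster/demo/abcd",  # placeholder MSK cluster ARN
    SourceParameters={
        "ManagedStreamingKafkaParameters": {
            "TopicName": "orders",        # placeholder topic
            "StartingPosition": "LATEST",
        }
    },
    Target="arn:aws:sqs:eu-west-1:123456789012:demo-queue",           # placeholder SQS queue ARN
)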
  7. Last week I attended the AWS Summit Johannesburg. This was the first summit to be hosted in my own country and my own city since 2019 so it was very special to have the opportunity to attend. It was great to get to meet with so many of our customers and hear how they are building on AWS. Now on to the AWS updates. I've compiled a few announcements and upcoming events you need to know about. Let's get started!

Last Week's Launches

Amazon Bedrock Is Now Generally Available – Amazon Bedrock was announced in preview in April of this year as part of a set of new tools for building with generative AI on AWS. Last week's announcement of this service being generally available was received with a lot of excitement and customers have already been sharing what they are building with Amazon Bedrock. I quite enjoyed this lighthearted post from AWS Serverless Hero Jones Zachariah Noel about the "Bengaluru with traffic-filled roads" image he produced using Stability AI's Stable Diffusion XL image generation model on Amazon Bedrock.

Amazon MSK Introduces Managed Data Delivery from Apache Kafka to Your Data Lake – Amazon MSK was released in 2019 to help our customers reduce the work needed to set up, scale, and manage Apache Kafka in production. Now you can continuously load data from an Apache Kafka cluster to Amazon Simple Storage Service (Amazon S3).

Other AWS News

A few more news items and blog posts you might have missed:

The Community.AWS Blog is where builders share and learn with the community of cloud enthusiasts. Contributors to this blog include AWS employees, AWS Heroes, AWS Community Builders, and other members of the AWS Community. Last week, AWS Hero Johannes Koch published this awesome post on how to build a simple website using Flutter that interacts with a serverless backend powered by AppSync-merged APIs.

For a full list of AWS announcements, be sure to keep an eye on the What's New at AWS page.

Upcoming AWS Events

We have the following upcoming events:

AWS Cloud Days (October 10, 24) – Connect and collaborate with other like-minded folks while learning about AWS at the AWS Cloud Day in Athens and Prague.

AWS Innovate Online (October 19) – Register for AWS Innovate Online to learn how you can build, run, and scale next-generation applications on the most extensive cloud platform. There will be 80+ sessions delivered in five languages and you'll receive a certificate of attendance to showcase all you've learned.

We're focused on improving our content to provide a better customer experience, and we need your feedback to do so. Take this quick survey to share insights on your experience with the AWS Blog. Note that this survey is hosted by an external company, so the link doesn't lead to our website. AWS handles your information as described in the AWS Privacy Notice.

– Veliswa

View the full article
  8. Amazon Personalize has launched a new open source Amazon Personalize Kafka Sink connector that makes it easy to stream data in real time for use in Amazon Personalize. Amazon Personalize enables developers to improve customer engagement through personalized product and content recommendations – no ML expertise required. Amazon Personalize uses data provided by customers to train custom models on their behalf. With this launch, customers can readily ingest their data from Apache Kafka clusters and call the Amazon Personalize-specific APIs to enable real-time data streaming, without the need for custom code. This makes it faster and easier to bring real-time data to Amazon Personalize for those using Apache Kafka. A sketch of the kind of Amazon Personalize ingestion call the connector automates follows below. View the full article
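The connector removes the need to write this kind of code yourself, but as a point of reference, this is roughly the Amazon Personalize ingestion call (PutEvents) that such a pipeline ends up making for each Kafka record. The tracking ID, user, session, and event fields below are placeholders.

import json
from datetime import datetime, timezone

import boto3

personalize_events = boto3.client("personalize-events")

personalize_events.put_events(
    trackingId="11111111-2222-3333-4444-555555555555",  # placeholder event tracker ID
    userId="user-42",                                    # placeholder user
    sessionId="session-1",                               # placeholder session
    eventList=[
        {
            "eventType": "watch",
            "sentAt": datetime.now(timezone.utc),
            "properties": json.dumps({"itemId": "movie-123"}),  # placeholder item
        }
    ],
)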