Polish Car Market Analysis Data Scraper [Python + Airflow]

This project delivers a complete data engineering pipeline for CEPIK (Centralna Ewidencja Pojazdów i Kierowców), Poland's central vehicle and driver registry. It automates scraping raw data directly from the CEPIK API, covering both dictionaries (such as vehicle brands, fuel types, and voivodeships) and vehicle registration records (historical and current). The system is built on Apache Airflow for orchestration and scheduling, Apache Spark with Delta Lake for scalable data transformations, and HDFS for distributed raw storage. Transformed, curated data lands in PostgreSQL, ready for consumption by analytics and BI tools.



Installation & Usage

CEPIK Scraping Project

Airflow · Apache Spark · Delta Lake · PostgreSQL · Python · HDFS

Overview

This project provides DAGs and ETL pipelines for scraping, transforming, and loading data from the CEPIK (Centralna Ewidencja Pojazdów i Kierowców) vehicle registration database in Poland.

The pipelines automate:

  • Downloading dictionaries (e.g., vehicle brands, fuel types, voivodeships).
  • Scraping historical and current vehicle registrations.
  • Persisting raw JSON into HDFS (bronze layer).
  • Transforming JSON into Delta Lake (silver layer) with a clean schema.
  • Loading data into PostgreSQL (gold layer) for analytics and reporting.
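A minimal sketch of how one of these DAGs could be wired, assuming Airflow 2.4+; the DAG id, task names, and callables below are illustrative placeholders, not the project's actual modules:

```python
# A minimal bronze -> silver -> gold DAG sketch, assuming Airflow 2.4+.
# The DAG id, task names, and callables are illustrative, not the project's code.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def scrape_to_bronze(**context):
    """Fetch JSON from the CEPIK API and persist it to HDFS (bronze layer)."""
    ...


def transform_to_silver(**context):
    """Flatten raw JSON into Delta Lake tables (silver layer) via Spark."""
    ...


def load_to_gold(**context):
    """Load curated data from Delta into PostgreSQL (gold layer)."""
    ...


with DAG(
    dag_id="cepik_vehicles_example",
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    bronze = PythonOperator(task_id="scrape_to_bronze", python_callable=scrape_to_bronze)
    silver = PythonOperator(task_id="transform_to_silver", python_callable=transform_to_silver)
    gold = PythonOperator(task_id="load_to_gold", python_callable=load_to_gold)

    bronze >> silver >> gold
```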

Features

Technology Stack

  • Apache Airflow – workflow orchestration.
  • Apache Spark + Delta Lake – distributed data processing and storage.
  • PostgreSQL – relational database for curated data.
  • HDFS (WebHDFS) – raw data storage.
  • Python – scraping and ETL logic.
  • Requests + SSL adapter – robust API integration with CEPIK (see the sketch after this list).
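The adapter itself is not shown in this README; a common pattern for endpoints with legacy TLS configurations looks like the following (the lowered OpenSSL security level is an assumption about what the CEPIK endpoint requires):

```python
# Sketch of a requests Session with a custom SSL adapter for the CEPIK API.
# The lowered OpenSSL security level is an assumption; adjust to what the
# endpoint actually requires.
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context


class LegacySSLAdapter(HTTPAdapter):
    """Mounts an SSL context that tolerates older server-side TLS setups."""

    def init_poolmanager(self, *args, **kwargs):
        ctx = create_urllib3_context()
        # Accept legacy ciphers (assumption about the CEPIK TLS configuration).
        ctx.set_ciphers("DEFAULT:@SECLEVEL=1")
        kwargs["ssl_context"] = ctx
        return super().init_poolmanager(*args, **kwargs)


session = requests.Session()
session.mount("https://", LegacySSLAdapter())

# Fetch the dictionary index from the public CEPIK API.
resp = session.get("https://api.cepik.gov.pl/slowniki", timeout=30)
resp.raise_for_status()
print(resp.json())
```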

Database Schema

  • dict_voivodeships – dictionary of voivodeships (from CEPIK API).
  • dict_voivodeships_coords – voivodeship names, capitals, and geocoordinates.
  • vehicles – main upserted table of vehicle registrations.
  • staging_vehicles – temporary table for incoming daily data.

Supporting stored functions and procedures:

  • upsert_vehicles_from_stage() – merges staging data into the main vehicles table.
  • updateVoivodeshioCoordsTableKeys() – enriches dict_voivodeships_coords with CEPIK codes.
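As one hedged sketch, the upsert can be invoked from Python once staging_vehicles has been loaded; connection details are placeholders, and only the function and table names come from this README:

```python
# Sketch: invoke the upsert after staging_vehicles has been loaded.
# Connection details are placeholders; function/table names are from this README.
import psycopg2

conn = psycopg2.connect(host="localhost", dbname="cepik", user="airflow", password="***")
try:
    with conn, conn.cursor() as cur:
        # Merge freshly loaded staging rows into the main vehicles table.
        # If this is a stored procedure rather than a function, use CALL instead.
        cur.execute("SELECT upsert_vehicles_from_stage();")
        # Assumption: staging is cleared between daily batches.
        cur.execute("TRUNCATE staging_vehicles;")
finally:
    conn.close()
```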

Data Flow

  1. Scraping: Airflow DAG fetches JSON from CEPIK API.
  2. Bronze: Raw JSON stored in HDFS.
  3. Silver: Spark jobs flatten JSON into Delta Lake tables (sketched after this list).
  4. Gold: Spark jobs load curated data into PostgreSQL.
  5. Analytics: PostgreSQL data can be used in BI tools like Power BI.
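A hedged sketch of the silver step, assuming the Delta Lake jars are available on the Spark classpath; the HDFS paths, payload shape, and column names are placeholders rather than the project's actual schema:

```python
# Sketch of the silver step: flatten bronze JSON into a Delta table.
# Assumes Delta Lake jars are on the Spark classpath; paths and column
# names are placeholders (the real payload may nest differently).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder.appName("cepik-silver")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Read raw JSON from the bronze layer in HDFS.
raw = spark.read.json("hdfs:///cepik/bronze/vehicles/*.json")

# Flatten the nested API payload into a tabular schema (illustrative columns).
vehicles = raw.select(
    F.col("data.id").alias("vehicle_id"),
    F.col("data.attributes.brand").alias("brand"),
    F.col("data.attributes.fuel_type").alias("fuel_type"),
)

# Append the cleaned rows to the silver Delta table.
vehicles.write.format("delta").mode("append").save("hdfs:///cepik/silver/vehicles")
```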

Setup

Requirements

  • Docker / Kubernetes for running Airflow & Spark.
  • PostgreSQL 14+.
  • HDFS cluster with WebHDFS enabled.
  • Python ≥ 3.9 with:
      • requests
      • beautifulsoup4
      • lxml
      • pyspark
      • delta-spark

Environment Variables / Airflow Variables

  • hdfs-data-path – base path in HDFS.
  • local-data-path – local temp path for scraped files.
  • task_pool – Airflow pool for API requests.
  • Connections:
      • conn-webhdfs – WebHDFS connection.
      • conn-pg-cepik-db – PostgreSQL connection for the CEPIK DB.
      • spark-conn – Spark connection.
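Inside DAG code, these names would typically be resolved through Airflow's Variable and connection APIs; a minimal sketch using the names listed above:

```python
# Sketch: resolving the Airflow Variables and Connections listed above in DAG code.
from airflow.hooks.base import BaseHook
from airflow.models import Variable

# Variables (names from this README).
hdfs_data_path = Variable.get("hdfs-data-path")
local_data_path = Variable.get("local-data-path")

# Connections (names from this README); BaseHook returns the stored metadata.
pg_conn = BaseHook.get_connection("conn-pg-cepik-db")
print(pg_conn.host, pg_conn.port, pg_conn.schema)
```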

Running

  1. Deploy Airflow DAGs in your Airflow environment.
  2. Ensure Spark jobs are mounted under /opt/airflow/libs/cepik/transformations/.
  3. Start the DAGs from the Airflow UI:
      • cepik_dictionary_voivodeships_*
      • cepik_vehicles_current_v*
      • cepik_vehicles_history_*
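Runs can also be triggered programmatically through Airflow's stable REST API; a hedged sketch assuming basic-auth is enabled on a local webserver (the URL, credentials, and concrete DAG id are placeholders):

```python
# Sketch: trigger a DAG run via Airflow's stable REST API (Airflow 2.x).
# The webserver URL, credentials, and concrete DAG id are placeholders.
import requests

AIRFLOW_URL = "http://localhost:8080"
DAG_ID = "cepik_vehicles_current_v1"  # placeholder; use the deployed DAG id

resp = requests.post(
    f"{AIRFLOW_URL}/api/v1/dags/{DAG_ID}/dagRuns",
    auth=("airflow", "airflow"),  # assumes the basic-auth API backend is enabled
    json={"conf": {}},
    timeout=30,
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])
```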

Example Usage
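One hedged example: once a run completes, the curated vehicles table in the gold layer can be queried directly from Python. The table name comes from the schema section above; the brand column and connection details are assumptions:

```python
# Sketch: query the curated gold layer from Python.
# The vehicles table name comes from the schema section; the brand column
# and connection details are assumptions.
import psycopg2

conn = psycopg2.connect(host="localhost", dbname="cepik", user="analyst", password="***")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT brand, COUNT(*) AS registrations
        FROM vehicles
        GROUP BY brand
        ORDER BY registrations DESC
        LIMIT 10;
        """
    )
    for brand, registrations in cur.fetchall():
        print(f"{brand}: {registrations}")
conn.close()
```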

License

MIT License – free to use and adapt.