From Airflow to API: Data Orchestration with Flask and PostgreSQL

1. Project Overview

Data is the lifeblood of modern decision-making. In my previous project, I built an ETL pipeline using Airflow, PostgreSQL, Redis, and Docker. Now, I’ve taken it one step further by adding a REST API built with Flask. This new layer not only provides on-demand data access but also enriches the existing pipeline by archiving hourly snapshots from the Path of Exile trading website. In addition, I’ve used Tableau to visualize the data, offering both a snapshot and a historical perspective of in-game trading prices.

Technologies used: Flask, Airflow, PostgreSQL, Docker, Tableau

2. Motivation and Context

Why this Project?

Although I frequently use REST APIs, I had never built one from scratch. The public Path of Exile trade API, which I use regularly, limits requests to one every 10 seconds, which proved too slow for my needs. With 21 different items to monitor for my trading strategy, the delay was frustrating when I just wanted a quick check. By leveraging my existing Airflow setup, I automated data collection on an hourly basis and stored it in PostgreSQL. This allowed me to bypass the rate limits and instantly query historical data via my own API.

Building on my Previous Work

My earlier project already had a robust, dockerized setup for Airflow and PostgreSQL. This project extends that foundation by:

  • Adding a new DAG to gather and process data from the Path of Exile trade API.
  • Introducing a dockerized REST API layer with Flask to serve the data.
  • Using Tableau to visualize trends over time.

3. Architecture and Workflow

Overview of the Architecture

  • The pipeline runs on a Raspberry Pi with a Docker Compose file that initializes containers for Airflow, Redis, PostgreSQL and Flask.
  • Airflow schedules tasks to fetch data regularly from the Path of Exile REST API for different items.
  • The collected data is stored in the PostgreSQL database.
  • Flask is used to build a REST API that makes the collected data available.

Technologies Used

  • Flask: Serves the collected data through a lightweight REST API.
  • Airflow: Orchestrates the ETL pipeline tasks.
  • PostgreSQL: Central data store for Flask and the Airflow tasks.
  • Docker: Manages containers for Airflow, PostgreSQL and Flask.
  • Tableau: Visualization of data obtained from the self-built REST API.

4. Implementation Details

Docker Setup

The Flask API is containerized with a simple Dockerfile based on python:3.8-slim-bullseye:

FROM python:3.8-slim-bullseye
WORKDIR /app

# Install build tools and the latest libpq-dev
RUN apt-get update && apt-get install -y gcc libpq-dev && rm -rf /var/lib/apt/lists/*

# Copy requirements and install them, forcing a source build for psycopg2
COPY requirements.txt requirements.txt
RUN pip install --no-cache-dir --no-binary :all: -r requirements.txt

# Copy the rest of the application code
COPY . .

# Expose port 5000 for the API
EXPOSE 5000
CMD ["python", "app.py"]

This containerization ensures a lightweight, reproducible environment for running the API.
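
The requirements.txt copied into the image only needs Flask and psycopg2; a minimal version might look like the following (the exact pins are illustrative, the point is that psycopg2 appears here so pip compiles it against the libpq installed above):

# requirements.txt (illustrative pins)
Flask==2.2.5
psycopg2==2.9.9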

Docker Compose service for the Flask API:

  flask-api:
    build: ./flask-api
    ports:
      - "5000:5000"
    environment:
      - POSTGRES_HOST=postgresql
      - POSTGRES_DB=bitnami_airflow
      - POSTGRES_USER=${POSTGRESQL_USERNAME}
      - POSTGRES_PASSWORD=${POSTGRESQL_PASSWORD}
    depends_on:
      - postgresql
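
The ${POSTGRESQL_USERNAME} and ${POSTGRESQL_PASSWORD} references are resolved by Docker Compose from the shell environment or from an .env file next to docker-compose.yml; the values below are placeholders, not the real credentials:

# .env (placeholder values)
POSTGRESQL_USERNAME=bn_airflow
POSTGRESQL_PASSWORD=change-me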

Airflow DAG

The DAG in Airflow performs the following:

Data Collection

A new DAG in Airflow runs every hour, sending a POST request to the endpoint: https://www.pathofexile.com/api/trade/exchange/Phrecia
The request includes my private session ID and clearance cookies.
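
Under the hood this step is a single requests call. A simplified sketch of it is shown below; the cookie names, header values, and the fetch_exchange_data helper are illustrative rather than the exact production code, and both cookie values are secrets loaded from configuration:

import requests

TRADE_URL = "https://www.pathofexile.com/api/trade/exchange/Phrecia"

def fetch_exchange_data(payload, session_id, cf_clearance):
    # Session and Cloudflare clearance cookies are required by the trade site.
    cookies = {"POESESSID": session_id, "cf_clearance": cf_clearance}
    headers = {"Content-Type": "application/json", "User-Agent": "poe-price-dag"}
    response = requests.post(TRADE_URL, json=payload, cookies=cookies,
                             headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()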

Payload Construction

I iterate over a list of 21 essences to dynamically build the request payload:

payload = {
    "query": {
        "status": {"option": "online"},
        "have": ["divine"],
        "want": [buyName],
        "stock": {"min": 10}
    },
    "sort": {"have": "asc"}
}
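
This payload is built once per essence. The surrounding loop looks roughly like the sketch below; the essence identifiers are abbreviated, build_payload and process_response are hypothetical helper names, and the credentials come from the DAG's configuration:

ESSENCES = ["greed", "contempt", "woe"]  # illustrative; the real list holds all 21 essences

results = []
for buyName in ESSENCES:
    payload = build_payload(buyName)           # the dict shown above, with "want": [buyName]
    data = fetch_exchange_data(payload, session_id, cf_clearance)
    results.append(process_response(buyName, data))  # cheapest price and cost of 500 items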

Data Transformation & Insertion

After processing the JSON response to extract the cheapest price and calculate the cost for 500 items, the data is inserted into a PostgreSQL table:

from datetime import datetime, timezone
from airflow.providers.postgres.hooks.postgres import PostgresHook

postgres_hook = PostgresHook(postgres_conn_id='postgres_default')
insert_sql = """
INSERT INTO api_data (name, bought, spent, timestamp, min_price)
VALUES (%s, %s, %s, %s, %s)
"""
for entry in results:
    ts = datetime.now(timezone.utc)
    postgres_hook.run(
        insert_sql,
        parameters=(
            entry["name"],
            entry["bought"],
            entry["spent"],
            ts,
            entry["min_price"]
        )
    )
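
One way to compute the cost of 500 items is to sort the offers for an essence by price and walk them until 500 items are covered. The sketch below shows the core of that calculation, assuming the (price per item, stock) pairs have already been pulled out of the JSON response; the field mapping itself is omitted because the response schema is fairly verbose:

def summarise_offers(name, offers):
    # offers: list of (price_per_item_in_divines, stock) tuples for one essence
    offers = sorted(offers)                    # cheapest listings first
    min_price = offers[0][0]
    target, bought, spent = 500, 0, 0.0
    for price, stock in offers:
        take = min(stock, target - bought)
        bought += take
        spent += take * price
        if bought >= target:
            break
    return {"name": name, "bought": bought, "spent": round(spent, 4), "min_price": min_price}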

PostgreSQL

Acts as the central data store. The new api_data table archives the fetched data (item name, quantities, spending, timestamp, and minimum price), facilitating historical analysis and quick retrieval via the REST API.
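
A schema along these lines is sufficient; the column types below are inferred from the insert statement and the float conversions in the API, so treat them as an approximation of the real table:

CREATE TABLE IF NOT EXISTS api_data (
    id         SERIAL PRIMARY KEY,
    name       TEXT NOT NULL,
    bought     INTEGER,
    spent      NUMERIC,
    timestamp  TIMESTAMPTZ NOT NULL,
    min_price  NUMERIC
);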

Flask API

Development & Containerization

The Flask-based API is containerized using Docker and runs independently. It accepts optional query parameters (start_time and end_time) and returns the data from PostgreSQL – defaulting to the past hour if no parameters are provided.

import datetime
import os

import psycopg2
from flask import Flask, jsonify, request

app = Flask(__name__)

def get_db_connection():
    try:
        connection = psycopg2.connect(
            host=os.environ.get('POSTGRES_HOST', 'postgresql'),
            database=os.environ.get('POSTGRES_DB', 'bitnami_airflow'),
            user=os.environ.get('POSTGRES_USER', 'postgres'),
            password=os.environ.get('POSTGRES_PASSWORD', 'password')
        )
        return connection
    except Exception as e:
        app.logger.error("Database connection error: %s", e)
        raise

@app.route('/data', methods=['GET'])
def get_data():
    start_time_str = request.args.get('start_time')
    end_time_str = request.args.get('end_time')
    
    if start_time_str:
        try:
            start_time = datetime.datetime.fromisoformat(start_time_str)
        except ValueError:
            return jsonify(error="Invalid start_time format. Use ISO 8601."), 400
    else:
        start_time = datetime.datetime.now() - datetime.timedelta(hours=1)
    
    if end_time_str:
        try:
            end_time = datetime.datetime.fromisoformat(end_time_str)
        except ValueError:
            return jsonify(error="Invalid end_time format. Use ISO 8601."), 400
    else:
        end_time = datetime.datetime.now()

    query = "SELECT name, bought, spent, min_price, timestamp FROM api_data WHERE timestamp BETWEEN %s AND %s"
    params = (start_time, end_time)

    conn = get_db_connection()
    cur = conn.cursor()
    cur.execute(query, params)
    rows = cur.fetchall()
    cur.close()
    conn.close()

    results = [
        {
            "name": row[0],
            "bought": row[1],
            "spent": float(row[2]),
            "min_price": float(row[3]) if row[3] is not None else None,
            "timestamp": row[4].isoformat() if row[4] else None
        }
        for row in rows
    ]
    
    return jsonify(results)

if __name__ == '__main__':
    # Listen on all interfaces so the container's published port 5000 is reachable
    app.run(host='0.0.0.0', port=5000)
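
Querying the API then comes down to an ordinary GET request, for example (host, port, and time range are placeholder values; the port matches the compose file above):

import requests

resp = requests.get(
    "http://localhost:5000/data",
    params={
        "start_time": "2025-03-20T00:00:00",
        "end_time": "2025-03-21T00:00:00",
    },
    timeout=10,
)
resp.raise_for_status()
print(resp.json())  # list of {name, bought, spent, min_price, timestamp} records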

Tableau Visualization

I used Tableau to create basic visualizations that provide an overview of the data. The graphs highlight changes in trading prices over time and help in identifying patterns, and they will become more insightful as additional data is collected. (Screenshots and graphs will be included in the results section of this post.)

5. Learning Outcomes

Skills gained

Flask: Learned the fundamentals of building a REST API with Flask.

Docker: Gained further experience in creating Dockerfiles and containerizing applications.

Airflow / PostgreSQL: Improved my capabilities in obtaining, managing and storing data.

Tableau: Gained first-hand experience in transforming raw data into visual insights using Tableau.

End-to-End Pipeline Integration: Successfully integrated multiple technologies, from data ingestion to API exposure and visualization, demonstrating a holistic approach to system design.

6. Results

REST API request:

Tableau Graphs:

In the game, there are 20 different essences that can be rerolled at a certain cost. In this case, we buy 2,000 of the cheapest essences and reroll them, four times on average, into one of the five most expensive ones. (This is just an example.)

