Python 3.8+  ·  MIT License

Open Stage
ETL

Open-Stage is a framework for building data pipelines by connecting reusable components. Inspired by IBM DataStage.


Pipes & Filters Architecture

CSVOrigin (Origin) --pipe_1--> Filter (Node) --pipe_2--> Aggregator (Node) --pipe_3--> AnthropicPromptTransformer (AI Node) --pipe_4--> CSVDestination (Destination)

Simple. Declarative. Composable.

Build pipelines by connecting three types of components with pipes. Data flows left to right.

01 / ORIGIN

Produce data

Read a CSV, query a database, call a REST API. An Origin has no inputs and generates exactly one output.

02 / NODE

Transform data

Filter, aggregate, join, copy, route. Nodes receive data, process it, and forward it to the next component.

03 / DESTINATION

Write data

Save to CSV, load into PostgreSQL, upload to BigQuery. A Destination has no outputs — it's the end of the pipeline.

04 / PUMP

Run with one call

Once the components are connected, origin.pump() fires the entire pipeline. No orchestration wrappers or DAGs needed.

pipeline.py
from open_stage.core.base import Pipe
from open_stage.core.common import CSVOrigin, Filter, Aggregator, CSVDestination

# 1. Define components
origin      = CSVOrigin("sales", filepath_or_buffer="sales.csv")
filter_node = Filter("only_2024", field="year", condition="=", value_or_values=2024)
aggregator  = Aggregator("by_region", key="region", agg_field_name="total",
                          agg_type="sum", field_to_agg="amount")
dest        = CSVDestination("result", path_or_buf="sales_2024.csv", index=False)

# 2. Connect with pipes (method chaining)
origin.add_output_pipe(Pipe("p1")).set_destination(filter_node)
filter_node.add_output_pipe(Pipe("p2")).set_destination(aggregator)
aggregator.add_output_pipe(Pipe("p3")).set_destination(dest)

# 3. Run
origin.pump()
Pipeline: CSVOrigin --p1--> Filter --p2--> Aggregator --p3--> CSVDestination

Everything you need, ready to use

Component connectivity is validated at construction time. Wire them wrong and you get a clear error immediately, not at runtime.
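
A hypothetical illustration of that guarantee. The exact exception type and message below are assumptions; only the construction-time timing is what the library promises:

from open_stage.core.base import Pipe
from open_stage.core.common import CSVDestination

dest = CSVDestination("out", path_or_buf="out.csv")

# A Destination has no outputs, so wiring one should fail on the spot.
try:
    dest.add_output_pipe(Pipe("oops"))
except Exception as err:  # exact exception type is an assumption
    print(f"Rejected at wiring time, before any data moves: {err}")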

The catalog covers Origins, Destinations, Transformers, Routers, and AI components:
CSVOrigin Origin
Reads a CSV using pandas.read_csv. Passes **kwargs to pandas.
0 inputs → 1 output
OpenOrigin Origin
Wraps an existing DataFrame to inject it into the pipeline.
0 inputs → 1 output
APIRestOrigin Origin
Consumes a REST endpoint and normalizes the JSON response to a DataFrame.
0 inputs → 1 output
PostgresOrigin Origin
Executes a SQL query on PostgreSQL. Supports before_query, after_query, parameters, and max_results.
0 inputs → 1 output
MySQLOrigin Origin
Executes a SQL query on MySQL. Same advanced options as PostgresOrigin.
0 inputs → 1 output
BigQueryOrigin Origin
Queries Google BigQuery with service account authentication. Supports parameters and max_results.
0 inputs → 1 output
Printer Destination
Prints the DataFrame to the console. Ideal for pipeline debugging.
1 input → 0 outputs
CSVDestination Destination
Writes a CSV using pandas.to_csv. Passes **kwargs to pandas.
1 input → 0 outputs
PostgresDestination Destination
Loads a DataFrame into PostgreSQL. Supports if_exists, schema, before_query, and after_query.
1 input → 0 outputs
MySQLDestination Destination
Loads a DataFrame into MySQL. Same control options as PostgresDestination.
1 input → 0 outputs
BigQueryDestination Destination
Loads into BigQuery. Supports partitioning, clustering, write_disposition, and schema_update_options.
1 input → 0 outputs
Filter Node
Filters rows by condition. Operators: < > <= >= != = in not_in between.
1 input → 1 output
Aggregator Node
Groups by key and aggregates. Functions: sum count mean min max first last.
1 input → 1 output
Transformer Node
Applies any Python function that receives a DataFrame and returns a DataFrame.
1 input → 1 output
Joiner Node
Joins two DataFrames by key. Types: inner, left, right. Supports composite keys. A wiring sketch follows this catalog.
2 inputs → 1 output
Removes a list of columns from the DataFrame.
1 input → 1 output
Deduplicates rows by key, with sorting and a retention criterion (first / last).
1 input → 1 output
Copy Router
Duplicates the DataFrame to multiple outputs. Each output receives an independent copy (see the wiring sketch after this catalog).
1 input → N outputs
Switcher Router
Routes each row to a different pipe based on a field value. Mapping: value → pipe_name.
1 input → N outputs
Funnel Router
Concatenates multiple streams into one. Accepts any number of inputs and emits when all have arrived.
N inputs → 1 output
AnthropicPromptTransformer AI Node
Transforms a DataFrame using Anthropic's Claude. Serializes to CSV, calls the LLM with your prompt, and parses the response back to a DataFrame.
1 input → 1 output
OpenAIPromptTransformer AI Node
Transforms a DataFrame using OpenAI's GPT. Same CSV → LLM → CSV cycle as the other AI Transformers.
1 input → 1 output
GeminiPromptTransformer AI Node
Transforms a DataFrame using Google's Gemini. Authentication via API key or service account.
1 input → 1 output
DeepSeekPromptTransformer AI Node
Transforms a DataFrame using DeepSeek R1 / V3. Fixed output limit of 8192 tokens.
1 input → 1 output
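
A wiring sketch for the multi-input and multi-output components above. The Joiner constructor parameters (key, join_type), the import paths for Joiner and Copy, and the need to pump both origins are assumptions inferred from the catalog, not confirmed API:

from open_stage.core.base import Pipe
from open_stage.core.common import (
    CSVOrigin, Joiner, Copy, CSVDestination, Printer
)

orders    = CSVOrigin("orders", filepath_or_buffer="orders.csv")
customers = CSVOrigin("customers", filepath_or_buffer="customers.csv")
joiner    = Joiner("enrich", key="customer_id", join_type="left")  # params assumed
copier    = Copy("fan_out")

# Two inputs converge on the Joiner (2 inputs → 1 output).
orders.add_output_pipe(Pipe("left")).set_destination(joiner)
customers.add_output_pipe(Pipe("right")).set_destination(joiner)

# Copy fans the joined result out to two destinations (1 input → N outputs).
joiner.add_output_pipe(Pipe("joined")).set_destination(copier)
copier.add_output_pipe(Pipe("to_csv")).set_destination(
    CSVDestination("file", path_or_buf="enriched.csv", index=False))
copier.add_output_pipe(Pipe("to_console")).set_destination(Printer("debug"))

# Assumption: a multi-input pipeline needs every origin pumped.
orders.pump()
customers.pump()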

Real-world use cases

From a database migration to AI-powered transformation — all with the same pattern.

Filter and aggregate

CSV → Filter → Aggregator → CSV

from open_stage.core.base import Pipe
from open_stage.core.common import (
    CSVOrigin, Filter, Aggregator, CSVDestination
)

origin      = CSVOrigin("sales", filepath_or_buffer="sales.csv")
filter_node = Filter("high_value", field="amount",
                     condition=">", value_or_values=1000)
aggregator  = Aggregator("by_region", key="region",
                          agg_field_name="total", agg_type="sum",
                          field_to_agg="amount")
dest        = CSVDestination("summary",
                             path_or_buf="summary.csv", index=False)

origin.add_output_pipe(Pipe("p1")).set_destination(filter_node)
filter_node.add_output_pipe(Pipe("p2")).set_destination(aggregator)
aggregator.add_output_pipe(Pipe("p3")).set_destination(dest)
origin.pump()
Pipeline: CSVOrigin --p1--> Filter --p2--> Aggregator --p3--> CSVDestination

Database migration

MySQL → PostgreSQL

from open_stage.core.base import Pipe
from open_stage.mysql.common import MySQLOrigin
from open_stage.postgres.common import PostgresDestination

origin = MySQLOrigin(
    name="source",
    host="localhost", database="origin",
    user="root", password="...",
    query="SELECT * FROM customers WHERE active = 1",
)
dest = PostgresDestination(
    name="destination",
    host="localhost", database="target",
    user="postgres", password="...",
    table="customers", if_exists="append",
)

origin.add_output_pipe(Pipe("migrate")).set_destination(dest)
origin.pump()
Pipeline: MySQLOrigin --migrate--> PostgresDestination

Flow branching

Switcher routes, Funnel merges

from open_stage.core.base import Pipe
from open_stage.core.common import (
    OpenOrigin, Switcher, Funnel, Printer
)
import pandas as pd

df = pd.DataFrame({
    "product":  ["A", "B", "C", "D"],
    "category": ["elec", "clothing", "elec", "clothing"],
})

origin   = OpenOrigin("data", df)
switcher = Switcher("by_cat", field="category",
                   mapping={"elec": "p_elec", "clothing": "p_clothing"})
funnel   = Funnel("merge")
printer  = Printer("output")

origin.add_output_pipe(Pipe("input")).set_destination(switcher)
switcher.add_output_pipe(Pipe("p_elec")).set_destination(funnel)
switcher.add_output_pipe(Pipe("p_clothing")).set_destination(funnel)
funnel.add_output_pipe(Pipe("output")).set_destination(printer)
origin.pump()
Pipeline: OpenOrigin --input--> Switcher --p_elec / p_clothing--> Funnel --output--> Printer

Custom function transformation

Any Python logic as a Node

from open_stage.core.base import Pipe
from open_stage.core.common import (
    CSVOrigin, Transformer, CSVDestination
)

def apply_tax(df, rate, shipping):
    df = df.copy()
    df["final_price"] = df["price"] * (1 + rate) + shipping
    return df

origin = CSVOrigin("products", filepath_or_buffer="products.csv")
tx = Transformer(
    name="final_price",
    transformer_function=apply_tax,
    transformer_kwargs={"rate": 0.16, "shipping": 50},
)
dest = CSVDestination("result",
                       path_or_buf="with_price.csv", index=False)

origin.add_output_pipe(Pipe("p1")).set_destination(tx)
tx.add_output_pipe(Pipe("p2")).set_destination(dest)
origin.pump()
Pipeline: CSVOrigin --p1--> Transformer --p2--> CSVDestination

LLMs as Nodes in your pipeline

AI Transformers receive a DataFrame, serialize it to CSV, send it to the model with your prompt, and parse the response back to a DataFrame.

🤖 AnthropicPromptTransformer (Claude — Anthropic)
🧠 OpenAIPromptTransformer (GPT — OpenAI)
🌴 GeminiPromptTransformer (Gemini — Google)
🔬 DeepSeekPromptTransformer (DeepSeek R1 / V3)

ai_pipeline.py
from open_stage.core.base import Pipe
from open_stage.core.common import CSVOrigin, CSVDestination
from open_stage.anthropic.claude import AnthropicPromptTransformer

origin = CSVOrigin("reviews", filepath_or_buffer="reviews.csv")

ai = AnthropicPromptTransformer(
    name="sentiment",
    model="claude-sonnet-4-5-20250929",
    api_key="sk-ant-...",
    prompt="Add a 'sentiment' column with: positive, neutral, or negative, "
           "based on the 'review' column.",
)

dest = CSVDestination("result", path_or_buf="classified.csv", index=False)

origin.add_output_pipe(Pipe("p1")).set_destination(ai)
ai.add_output_pipe(Pipe("p2")).set_destination(dest)

origin.pump()
Pipeline: CSVOrigin --p1--> AnthropicPromptTransformer --p2--> CSVDestination

AI Transformer internal flow:
1. Receives the DataFrame
2. Serializes it to CSV
3. Sends it to the LLM along with the prompt
4. Parses the CSV response back to a DataFrame
5. Forwards it to the next component
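
In pandas terms, the cycle looks roughly like this minimal sketch. call_llm is a placeholder for the provider SDK call, not an Open-Stage function:

import io
import pandas as pd

def call_llm(message: str) -> str:
    """Placeholder for the provider SDK call (Anthropic, OpenAI, ...)."""
    raise NotImplementedError

def prompt_transform(df: pd.DataFrame, prompt: str) -> pd.DataFrame:
    csv_in = df.to_csv(index=False)            # steps 1-2: receive and serialize
    reply = call_llm(f"{prompt}\n\n{csv_in}")  # step 3: prompt + data to the LLM
    return pd.read_csv(io.StringIO(reply))     # steps 4-5: parse and forward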

Up and running in seconds

Install only the dependencies you need with optional extras.

# Install with all extras
pip install -e ".[all]"
# PostgreSQL only
pip install -e ".[postgres]"
# Anthropic + OpenAI
pip install -e ".[anthropic,openai]"
# Mix and match what you need
pip install -e ".[postgres,anthropic,bigquery]"
Extra        Dependencies
postgres     sqlalchemy · psycopg2-binary
mysql        sqlalchemy · pymysql
bigquery     google-cloud-bigquery · db-dtypes · google-auth
anthropic    anthropic
openai       openai
deepseek     openai
gemini       google-genai · google-generativeai
all          all of the above