Open-Stage is a framework for building data pipelines by connecting reusable components. Inspired by IBM DataStage.
Build pipelines by connecting three types of components with pipes. Data flows left to right.
- **Origin**: read a CSV, query a database, call a REST API. An Origin has no inputs and generates exactly one output.
- **Node**: filter, aggregate, join, copy, route. Nodes receive data, process it, and forward it to the next component.
- **Destination**: save to CSV, load into PostgreSQL, upload to BigQuery. A Destination has no outputs; it is the end of the pipeline.
Once the components are connected, `origin.pump()` fires the entire pipeline. No orchestration wrappers or DAGs needed.
```python
from open_stage.core.base import Pipe
from open_stage.core.common import CSVOrigin, Filter, Aggregator, CSVDestination

# 1. Define components
origin = CSVOrigin("sales", filepath_or_buffer="sales.csv")
filter_node = Filter("only_2024", field="year", condition="=", value_or_values=2024)
aggregator = Aggregator("by_region", key="region", agg_field_name="total",
                        agg_type="sum", field_to_agg="amount")
dest = CSVDestination("result", path_or_buf="sales_2024.csv", index=False)

# 2. Connect with pipes (method chaining)
origin.add_output_pipe(Pipe("p1")).set_destination(filter_node)
filter_node.add_output_pipe(Pipe("p2")).set_destination(aggregator)
aggregator.add_output_pipe(Pipe("p3")).set_destination(dest)

# 3. Run
origin.pump()
```
Connectivity is validated at construction time: wire components together incorrectly and you get a clear error immediately, not when the pipeline runs.
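For instance, a Destination is the end of the pipeline and accepts no output pipes, so a mis-wiring like the one below should fail as soon as it is attempted rather than during `pump()`. This is an illustrative sketch; the exact exception raised is an assumption, not documented behavior.

```python
from open_stage.core.base import Pipe
from open_stage.core.common import CSVDestination

dest = CSVDestination("result", path_or_buf="out.csv", index=False)

# A Destination has no outputs, so attaching an output pipe is a
# wiring error. Expected to raise here, when the graph is built,
# rather than when the pipeline is pumped (exact exception assumed).
dest.add_output_pipe(Pipe("oops"))
```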
Component reference:

- Database origins support `before_query`, `after_query`, query `parameters`, and `max_results`.
- `CSVDestination` passes `**kwargs` through to `pandas.to_csv`.
- Database destinations support `if_exists`, `schema`, `before_query`, and `after_query`; the BigQuery destination adds `write_disposition` and `schema_update_options`.
- `Filter` conditions: `<` `>` `<=` `>=` `!=` `=` `in` `not_in` `between`.
- `Aggregator` types: `sum` `count` `mean` `min` `max` `first` `last`.
- Joins: `inner`, `left`, `right`, with composite-key support.
- Deduplication keeps the `first` or `last` occurrence.
- `Switcher` routes on a `value → pipe_name` mapping.

From a database migration to AI-powered transformation: all with the same pattern.
**CSV → Filter → Aggregator → CSV**
```python
from open_stage.core.base import Pipe
from open_stage.core.common import (
    CSVOrigin, Filter, Aggregator, CSVDestination
)

origin = CSVOrigin("sales", filepath_or_buffer="sales.csv")
filter_node = Filter("high_value", field="amount", condition=">", value_or_values=1000)
aggregator = Aggregator("by_region", key="region", agg_field_name="total",
                        agg_type="sum", field_to_agg="amount")
dest = CSVDestination("summary", path_or_buf="summary.csv", index=False)

origin.add_output_pipe(Pipe("p1")).set_destination(filter_node)
filter_node.add_output_pipe(Pipe("p2")).set_destination(aggregator)
aggregator.add_output_pipe(Pipe("p3")).set_destination(dest)

origin.pump()
```
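Any option from the component reference can be swapped in the same way. For example, a `Filter` using the `in` condition and an `Aggregator` using `count` (the column names here are hypothetical):

```python
from open_stage.core.common import Filter, Aggregator

# Keep only rows whose region is in the given list ("in" condition).
west = Filter("west_only", field="region", condition="in",
              value_or_values=["west", "northwest"])

# Count rows per region ("count" aggregation).
counts = Aggregator("order_count", key="region", agg_field_name="orders",
                    agg_type="count", field_to_agg="order_id")
```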
**MySQL → PostgreSQL**
```python
from open_stage.core.base import Pipe
from open_stage.mysql.common import MySQLOrigin
from open_stage.postgres.common import PostgresDestination

origin = MySQLOrigin(
    name="source",
    host="localhost",
    database="origin",
    user="root",
    password="...",
    query="SELECT * FROM customers WHERE active = 1",
)
dest = PostgresDestination(
    name="destination",
    host="localhost",
    database="target",
    user="postgres",
    password="...",
    table="customers",
    if_exists="append",
)

origin.add_output_pipe(Pipe("migrate")).set_destination(dest)
origin.pump()
```
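The database components also expose the `before_query` / `after_query` hooks from the component reference. As a sketch, assuming the hook accepts a SQL statement to run before loading (the semantics here are an assumption, not shown above):

```python
from open_stage.postgres.common import PostgresDestination

# Assumption: before_query runs the given SQL on the target database
# before the load, so the destination table starts empty.
dest = PostgresDestination(
    name="destination",
    host="localhost",
    database="target",
    user="postgres",
    password="...",
    table="customers",
    if_exists="append",
    before_query="TRUNCATE TABLE customers",
)
```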
**Switcher routes, Funnel merges**
```python
from open_stage.core.base import Pipe
from open_stage.core.common import (
    OpenOrigin, Switcher, Funnel, Printer
)
import pandas as pd

df = pd.DataFrame({
    "product": ["A", "B", "C", "D"],
    "category": ["elec", "clothing", "elec", "clothing"],
})

origin = OpenOrigin("data", df)
switcher = Switcher("by_cat", field="category",
                    mapping={"elec": "p_elec", "clothing": "p_clothing"})
funnel = Funnel("merge")
printer = Printer("output")

origin.add_output_pipe(Pipe("input")).set_destination(switcher)
switcher.add_output_pipe(Pipe("p_elec")).set_destination(funnel)
switcher.add_output_pipe(Pipe("p_clothing")).set_destination(funnel)
funnel.add_output_pipe(Pipe("output")).set_destination(printer)

origin.pump()
```
**Any Python logic as a Node**
```python
from open_stage.core.base import Pipe
from open_stage.core.common import (
    CSVOrigin, Transformer, CSVDestination
)

def apply_tax(df, rate, shipping):
    df = df.copy()
    df["final_price"] = df["price"] * (1 + rate) + shipping
    return df

origin = CSVOrigin("products", filepath_or_buffer="products.csv")
tx = Transformer(
    name="final_price",
    transformer_function=apply_tax,
    transformer_kwargs={"rate": 0.16, "shipping": 50},
)
dest = CSVDestination("result", path_or_buf="with_price.csv", index=False)

origin.add_output_pipe(Pipe("p1")).set_destination(tx)
tx.add_output_pipe(Pipe("p2")).set_destination(dest)

origin.pump()
```
AI Transformers receive a DataFrame, serialize it to CSV, send it to the model with your prompt, and parse the response back to a DataFrame.
- Claude — Anthropic
- GPT — OpenAI
- Gemini — Google
- DeepSeek R1 / V3
```python
from open_stage.core.base import Pipe
from open_stage.core.common import CSVOrigin, CSVDestination
from open_stage.anthropic.claude import AnthropicPromptTransformer

origin = CSVOrigin("reviews", filepath_or_buffer="reviews.csv")
ai = AnthropicPromptTransformer(
    name="sentiment",
    model="claude-sonnet-4-5-20250929",
    api_key="sk-ant-...",
    prompt="Add a 'sentiment' column with: positive, neutral, or negative, "
           "based on the 'review' column.",
)
dest = CSVDestination("result", path_or_buf="classified.csv", index=False)

origin.add_output_pipe(Pipe("p1")).set_destination(ai)
ai.add_output_pipe(Pipe("p2")).set_destination(dest)

origin.pump()
```
AI Transformer internal flow:
1. Receives the DataFrame
2. Serializes it to CSV
3. Sends it to the LLM along with the prompt
4. Parses the CSV response back to a DataFrame
5. Forwards it to the next component
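Conceptually the round trip looks like the sketch below. This is illustrative, not the library's actual internals; `call_llm` stands in for whichever provider client is configured.

```python
import io

import pandas as pd

def ai_round_trip(df: pd.DataFrame, prompt: str, call_llm) -> pd.DataFrame:
    csv_in = df.to_csv(index=False)            # 2. serialize to CSV
    reply = call_llm(f"{prompt}\n\n{csv_in}")  # 3. send prompt + data to the LLM
    return pd.read_csv(io.StringIO(reply))     # 4. parse the CSV response back
```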
Install only the dependencies you need with optional extras.
```bash
# Install with all extras
pip install -e ".[all]"

# PostgreSQL only
pip install -e ".[postgres]"

# Anthropic + OpenAI
pip install -e ".[anthropic,openai]"

# Mix and match what you need
pip install -e ".[postgres,anthropic,bigquery]"
```