dagster#
What it is#
Dagster is a Python framework for building data applications around software-defined assets: data artifacts (tables, models, files) declared in code, with their dependencies, materialization logic, and metadata defined alongside them. Created by Nick Schrock (a co-creator of GraphQL at Facebook), it competes directly with Airflow and Prefect.
Reach for Dagster when the unit you care about is the asset that gets produced (a Snowflake table, a dbt model, a Parquet file), not the task that runs. Reach for Prefect when you’d rather think in terms of flows and ad-hoc Python orchestration without the asset abstraction.
Install#
pip install dagster dagster-webserver
Output: (none — exits 0 on success). dagster is the engine; dagster-webserver is the UI/API server. Both are typically installed together.
uv add dagster dagster-webserver
Output: dependencies resolved + added to pyproject.toml
poetry add dagster dagster-webserver
Output: updated lockfile + virtualenv install
dagster dev
Output: launches a local instance with the daemon + webserver on http://127.0.0.1:3000.
Versioning & Python support#
- Current stable line is the 1.x series. Dagster moved from 0.x to 1.0 in August 2022. Minor releases within 1.x can still contain breaking changes despite the stable label, so read each release note carefully before upgrading.
- Supports Python 3.9+ on recent releases. The dagster-* plugin family matches the core’s Python floor.
- The 1.5 release (late 2023) introduced the Definitions module pattern, deprecating the older @repository decorator. Code on the old pattern still runs but emits warnings.
- Pin to a specific minor (dagster~=1.8.0, which allows 1.8.x only; dagster~=1.8 would allow any later 1.x) and upgrade in lockstep with every dagster-* plugin you use. Version drift between core and plugins produces obscure import errors.
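Drift between core and plugins can be caught before startup by comparing installed versions. The sketch below is illustrative only. It assumes that dagster, dagster-webserver, dagster-graphql and dagster-pipes share the core version number, and that other dagster-* libraries on the 1.x line are published as 0.(core minor + 16).patch. That mapping and all helper names are assumptions to verify against the release notes for your versions.

```python
from importlib import metadata

# Assumption: these distributions are versioned identically to core.
CORE_VERSIONED = {"dagster", "dagster-webserver", "dagster-graphql", "dagster-pipes"}


def expected_library_version(core_version: str) -> str:
    """Map a 1.x core version such as '1.8.3' to the assumed library version '0.24.3'."""
    _major, minor, patch = core_version.split(".")[:3]
    return f"0.{int(minor) + 16}.{patch}"


def find_drift(installed: dict) -> dict:
    """Return {package: installed_version} for packages that do not line up with core."""
    core = installed["dagster"]
    expected_lib = expected_library_version(core)
    drift = {}
    for name, version in installed.items():
        expected = core if name in CORE_VERSIONED else expected_lib
        if version != expected:
            drift[name] = version
    return drift


def installed_dagster_packages() -> dict:
    """Collect installed distributions named 'dagster' or 'dagster-*'."""
    found = {}
    for dist in metadata.distributions():
        name = (dist.metadata["Name"] or "").lower()
        if name == "dagster" or name.startswith("dagster-"):
            found[name] = dist.version
    return found
```

Running find_drift(installed_dagster_packages()) in CI and failing on a non-empty result is one way to surface a mismatch before any Dagster process starts. Packages that follow their own versioning scheme would need to be excluded by hand.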
Package metadata#
- Maintainer: Dagster Labs (formerly Elementl)
- Project home: github.com/dagster-io/dagster
- Docs: docs.dagster.io
- PyPI: pypi.org/project/dagster
- License: Apache-2.0 (open-source core); Dagster+ is a paid managed control plane
- Governance: company-led open source — Dagster Labs
- First released: 2019
- Downloads: millions per month — heavy use in modern data-platform teams.
Optional dependencies & extras#
Dagster has a large plugin family. Every integration is a separate dagster-* PyPI package, not an extra on the main dagster distribution.
Install plugins explicitly: pip install dagster dagster-aws dagster-dbt dagster-snowflake.
Core orchestration tier:
- dagster-webserver: the web UI + GraphQL API. Required for the local dev experience and any self-hosted server.
- dagster-graphql: standalone GraphQL client for the API; pulled in by the webserver.
- dagster-postgres / dagster-mysql: persistent storage for run history, schedules, sensors. SQLite is the default; switch to one of these before any team deployment.
Cloud / infra:
- dagster-aws: S3, Redshift, ECS run launchers, Secrets Manager.
- dagster-gcp: GCS, BigQuery, Dataproc, Cloud Run.
- dagster-azure: Blob storage, ADLS, Azure resources.
- dagster-k8s: Kubernetes run launchers + executors.
- dagster-docker: Docker run launcher.
Compute / data tools:
- dagster-dbt: first-class dbt Core integration; loads each dbt model as a Dagster asset.
- dagster-airbyte / dagster-fivetran: load ingest connectors as assets.
- dagster-snowflake / dagster-bigquery / dagster-duckdb: warehouse I/O managers.
- dagster-pandas / dagster-polars / dagster-pyspark: dataframe I/O managers and type checks.
- dagster-mlflow: log MLflow experiments from within Dagster assets.
Cloud-only:
- dagster-cloud: Dagster+ (the managed service) CLI and agent. Only needed for Dagster+ deployments.
Alternatives#
| Package | Trade-off |
|---|---|
| prefect | Flow/task model rather than asset-oriented. Lighter weight, less rigid; weaker lineage UI. |
| airflow (apache-airflow) | DAG/task model, configuration-heavy. Industry default for legacy data engineering teams. |
| kedro | Opinionated framework for data-science pipelines. Often paired with Dagster or Prefect for execution. |
| temporal (temporalio) | Durable workflows for arbitrary code, not data-focused. Use when you need workflow durability across services. |
| metaflow | Netflix’s data-science framework. Heavy AWS lean. |
| argo workflows | Kubernetes-native YAML DAGs. Operations-focused, not asset-centric. |
| mage-ai | Newer, notebook-style data engineering. Less mature ecosystem. |
Common gotchas#
- Definitions module pattern replaces @repository. Since 1.5, the canonical entry point is a single defs = Definitions(assets=[...], schedules=[...], resources={...}) object in a module (typically your_project/__init__.py). Old code using @repository still loads but is deprecated; new tooling assumes Definitions.
- Asset materialization vs op concept. “Ops” (the older abstraction) are units of computation; “assets” are the things produced by computation. Most modern Dagster code declares @asset directly and skips ops entirely. Mixing both styles in one project produces a confusing UI.
- dagster-dbt needs a built manifest for each dbt project. The integration loads a dbt manifest (manifest.json) and exposes each model as an asset. pip install dagster-dbt alone is not enough: your dbt project must build its manifest (dbt parse) before Dagster can load assets. Stale manifests produce ghost assets that no longer exist in dbt.
- dagster dev uses SQLite by default. Storage lives under $DAGSTER_HOME when that variable is set, and otherwise in a temporary directory. Fine for development; not suitable for production. Switch to Postgres via dagster.yaml storage config before deploying.
- Plugin version drift breaks imports. Every dagster-* plugin pins a compatible core version range. Running pip install -U dagster without upgrading plugins (or vice versa) can yield an ImportError on next start. Always upgrade dagster and every dagster-* package together.
- I/O managers are an extra step. Assets don’t write to a warehouse or object store on their own; they hand the returned value to an I/O manager (Snowflake table, S3 file, etc.). New users frequently expect @asset to just persist somewhere useful; without an I/O manager configured, the default pickles the value to the local filesystem under $DAGSTER_HOME/storage/.
- Sensors and schedules need the daemon. dagster-webserver alone runs the UI but does not tick schedules. The dagster-daemon process (started by dagster dev automatically) is what evaluates schedules and sensors. Production deployments need to run both.
- materialize API vs execute_in_process. The top-level materialize([...]) function is for asset-based code; JobDefinition.execute_in_process() is for op/job-based code. Calling the wrong one for your codebase yields confusing “no such asset/job” errors.
Real-world recipes#
The Dagster recipes that show up across most modern data platforms. Each one stays grounded in the Definitions module pattern (canonical since 1.5).
Asset DAG with typed dependencies#
The bread-and-butter Dagster idiom: @asset functions whose parameters reference upstream assets. Dagster derives the DAG automatically from parameter names matching asset keys.
from dagster import asset, Definitions
import pandas as pd

@asset
def raw_users() -> pd.DataFrame:
    """Pull users from the source system."""
    return pd.DataFrame([{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}])

@asset
def clean_users(raw_users: pd.DataFrame) -> pd.DataFrame:
    """Normalise names."""
    return raw_users.assign(name=raw_users["name"].str.title())

@asset
def user_count(clean_users: pd.DataFrame) -> int:
    """How many users do we have?"""
    return len(clean_users)

defs = Definitions(assets=[raw_users, clean_users, user_count])
Output: dagster dev shows a DAG with three nodes and two edges; clicking each shows the materialisation history and any logs.
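The naming convention behind that DAG can be illustrated without Dagster at all. The sketch below is not how Dagster is implemented; it is a plain-Python approximation, with a hypothetical derive_edges helper, of the rule that a parameter whose name matches another asset's name becomes an upstream edge.

```python
import inspect


def derive_edges(functions):
    """Return sorted (upstream, downstream) pairs inferred from parameter names."""
    names = {fn.__name__ for fn in functions}
    edges = []
    for fn in functions:
        for param in inspect.signature(fn).parameters:
            # Parameters that are not asset names (e.g. context) create no edge.
            if param in names:
                edges.append((param, fn.__name__))
    return sorted(edges)


def raw_users():
    return ["alice", "bob"]


def clean_users(raw_users):
    return [name.title() for name in raw_users]


def user_count(context, clean_users):
    return len(clean_users)


edges = derive_edges([raw_users, clean_users, user_count])
```

Renaming a function without renaming the matching parameter downstream removes the edge in this model, which mirrors why asset renames in Dagster need to be applied consistently.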
Daily-partitioned asset#
A partitioned asset is one logical asset that ships in slices keyed by partition (typically a date). Each partition materialises independently and can be backfilled out of order.
from datetime import datetime
from dagster import asset, DailyPartitionsDefinition, Definitions, AssetExecutionContext

daily = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily)
def daily_events(context: AssetExecutionContext) -> int:
    date = context.partition_key  # "2026-01-15"
    # ... query the warehouse for events where dt = date ...
    return 42

defs = Definitions(assets=[daily_events])
Output: the UI shows a calendar of partitions; missing partitions render grey. Backfill from the UI or via dagster asset materialize --select daily_events --partition 2026-01-15.
dbt integration#
dagster-dbt exposes every dbt model as a first-class asset. The translator reads dbt’s manifest.json and creates one Dagster asset per dbt model.
from pathlib import Path
from dagster import Definitions
from dagster_dbt import DbtCliResource, dbt_assets

DBT_PROJECT_DIR = Path(__file__).parent / "dbt_project"
DBT_MANIFEST = DBT_PROJECT_DIR / "target" / "manifest.json"

@dbt_assets(manifest=DBT_MANIFEST)
def dbt_models(context, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

defs = Definitions(
    assets=[dbt_models],
    resources={"dbt": DbtCliResource(project_dir=str(DBT_PROJECT_DIR))},
)
Output: every dbt model becomes a Dagster asset with lineage, schedules, and asset checks. dbt build runs under Dagster’s logging; the UI surfaces dbt test results inline.
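Because the assets are read from manifest.json, a manifest that is older than the dbt sources can describe models that no longer exist. A minimal guard, assuming the conventional dbt layout (models/ and target/manifest.json under the project directory) and a hypothetical helper name, is to compare modification times before loading:

```python
from pathlib import Path


def manifest_is_stale(project_dir: Path) -> bool:
    """True if manifest.json is missing or older than any file under models/."""
    manifest = project_dir / "target" / "manifest.json"
    if not manifest.exists():
        return True
    models_dir = project_dir / "models"
    if not models_dir.is_dir():
        return False
    manifest_mtime = manifest.stat().st_mtime
    return any(
        path.is_file() and path.stat().st_mtime > manifest_mtime
        for path in models_dir.rglob("*")
    )
```

This only looks at models/, so edits to macros or dbt_project.yml would go unnoticed. Regenerating the manifest with dbt parse on every deploy is the more robust option; the check is a cheap early warning.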
Sensor — react to upstream changes#
A sensor evaluates Python on a tick (default 30 s) and emits run requests. Use sensors for event-driven materialisation (file landed in S3, message in a queue, upstream warehouse table updated).
import os
from dagster import sensor, RunRequest, SensorEvaluationContext, Definitions

# daily_events is the partitioned asset from the previous recipe;
# my_pipelines.assets is an assumed module path.
from my_pipelines.assets import daily_events

@sensor(target=daily_events)
def s3_drop_sensor(context: SensorEvaluationContext):
    last_seen = context.cursor or ""
    # Files are named after the partition they carry, e.g. 2026-01-15.csv
    new = sorted(f for f in os.listdir("/mnt/inbox") if f > last_seen)
    if not new:
        return
    for f in new:
        yield RunRequest(run_key=f, partition_key=f.split(".")[0])
    context.update_cursor(new[-1])

defs = Definitions(assets=[daily_events], sensors=[s3_drop_sensor])
Output: the daemon ticks the sensor every 30 s; new files emit one run per partition. The cursor persists across daemon restarts.
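The cursor comparison in the sensor above is worth isolating, because a lexicographic cursor has a blind spot. The sketch below pulls that logic into a plain function (select_new_files is a hypothetical name, not a Dagster API) so it can be exercised without a daemon:

```python
def select_new_files(filenames, cursor):
    """Return (new_files, next_cursor) for one sensor tick.

    A file counts as new when its name sorts after the cursor, which
    is the same comparison the sensor performs on context.cursor.
    """
    last_seen = cursor or ""
    new = sorted(name for name in filenames if name > last_seen)
    next_cursor = new[-1] if new else cursor
    return new, next_cursor


first_batch, cursor = select_new_files(["2026-01-02.csv", "2026-01-01.csv"], None)
# A late arrival whose name sorts before the cursor is never picked up.
late_batch, cursor_after_late = select_new_files(["2026-01-01b.csv"], cursor)
```

If files can arrive out of order, a cursor based on names alone is not enough; tracking modification times or a set of processed names would be safer, at the cost of a larger cursor.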
Asset checks — runtime data quality#
Asset checks run after materialisation and produce pass/fail signals. Use them for runtime invariants — row counts, null-ratio limits, schema drift detection.
import pandas as pd
from dagster import asset, asset_check, AssetCheckResult, AssetCheckSeverity

@asset
def clean_users() -> pd.DataFrame:
    return pd.DataFrame([{"id": 1, "name": "alice"}])

@asset_check(asset=clean_users)
def no_null_names(clean_users: pd.DataFrame) -> AssetCheckResult:
    # Cast the NumPy scalar to a plain int before handing it to Dagster.
    nulls = int(clean_users["name"].isna().sum())
    return AssetCheckResult(
        passed=(nulls == 0),
        severity=AssetCheckSeverity.ERROR if nulls else AssetCheckSeverity.WARN,
        metadata={"null_count": nulls},
    )
Output: the UI shows the check next to the asset; failures alert downstream consumers and block dependent materialisations if configured.
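For a null-ratio limit rather than a strict zero-null rule, the decision logic can live in a small pure function that is easy to test on its own. This is a sketch under assumptions: evaluate_null_ratio and max_ratio are hypothetical names, and the returned dict is simply a convenient shape for feeding the passed and metadata arguments of AssetCheckResult.

```python
def evaluate_null_ratio(values, max_ratio):
    """Summarise nulls in a column and decide whether the ratio is acceptable."""
    total = len(values)
    nulls = sum(1 for value in values if value is None)
    # An empty column has no nulls, so treat its ratio as zero.
    ratio = nulls / total if total else 0.0
    return {"passed": ratio <= max_ratio, "null_count": nulls, "null_ratio": ratio}


within_limit = evaluate_null_ratio(["alice", None, "bob", "carol"], max_ratio=0.25)
over_limit = evaluate_null_ratio([None, None], max_ratio=0.5)
```

Keeping the values as plain Python bool, int and float avoids type surprises when they are later attached to a check result as metadata.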
Production deployment#
A Dagster deployment is three kinds of process plus storage: two long-running services (webserver and daemon) and short-lived run workers. All three are needed for a working orchestrator.
The three processes#
| Process | Role | Production setup |
|---|---|---|
| dagster-webserver | UI + GraphQL API for humans and tools | Behind a reverse proxy with TLS; horizontally scalable. |
| dagster-daemon | Ticks schedules, evaluates sensors, dequeues runs for the queued-run coordinator | Exactly one instance per deployment; running several daemon replicas side by side is not supported. |
| Run worker | Spawned by the daemon to execute a single run | Process / Docker / Kubernetes / ECS based on RunLauncher config. |
# Run all three locally (dev mode)
dagster dev
# Run individually in production
dagster-webserver -h 0.0.0.0 -p 3000 -w workspace.yaml
dagster-daemon run -w workspace.yaml
# Run workers spawn automatically based on dagster.yaml RunLauncher config
Output: the webserver and daemon both read workspace.yaml to discover code locations; runs are spawned by whichever RunLauncher is configured (defaults to DefaultRunLauncher = subprocess on the daemon host).
Code locations#
A code location is a Python module/package containing a Definitions object. The webserver and daemon load code locations into isolated subprocesses, so a syntax error in one location doesn’t crash the whole deployment.
# workspace.yaml
load_from:
  - python_file:
      relative_path: my_pipelines/__init__.py
      working_directory: .
  - python_module:
      module_name: my_other_pipelines
      working_directory: .
  - grpc_server:
      host: pipelines.internal
      port: 4000
Output: the UI lists each code location separately, with its own assets, jobs, and schedules.
Postgres storage#
dagster dev uses SQLite under $DAGSTER_HOME (or a temporary directory when DAGSTER_HOME is unset). For production, swap to Postgres via dagster.yaml:
# $DAGSTER_HOME/dagster.yaml
storage:
  postgres:
    postgres_db:
      username: dagster
      password:
        env: DAGSTER_PG_PASSWORD
      hostname: db.internal
      db_name: dagster
      port: 5432

run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher

run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 10
    tag_concurrency_limits:
      - key: "team"
        value: "alice-team"
        limit: 5
Output: run history, asset materialisations, schedules, and sensor cursors all persist in Postgres. The QueuedRunCoordinator caps concurrent runs.
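How max_concurrent_runs and tag_concurrency_limits interact is easier to see in a toy model. The sketch below is a simplified illustration and not Dagster's implementation: admit_runs and matching_limits are hypothetical helpers, runs are represented only by their tag dicts, and the queue is scanned once in order.

```python
def matching_limits(tags, tag_limits):
    """Indexes of the limits whose key/value pair appears in a run's tags."""
    return [i for i, lim in enumerate(tag_limits) if tags.get(lim["key"]) == lim["value"]]


def admit_runs(queued, in_progress, max_concurrent_runs, tag_limits):
    """Return positions in `queued` that may start now under both kinds of limit."""
    counts = [0] * len(tag_limits)
    for tags in in_progress:
        for i in matching_limits(tags, tag_limits):
            counts[i] += 1

    slots = max_concurrent_runs - len(in_progress)
    admitted = []
    for position, tags in enumerate(queued):
        if slots <= 0:
            break  # the global cap is reached
        hits = matching_limits(tags, tag_limits)
        if any(counts[i] >= tag_limits[i]["limit"] for i in hits):
            continue  # this run stays queued; later runs may still start
        for i in hits:
            counts[i] += 1
        admitted.append(position)
        slots -= 1
    return admitted


limits = [{"key": "team", "value": "alice-team", "limit": 2}]
running = [{"team": "alice-team"}]
queue = [{"team": "alice-team"}, {"team": "alice-team"}, {"team": "bob-team"}, {"team": "alice-team"}]
started = admit_runs(queue, running, max_concurrent_runs=3, tag_limits=limits)
```

In this model a run blocked by a tag limit does not block untagged runs behind it, while the global cap stops the scan entirely. That is the behaviour the two settings are meant to provide.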
Kubernetes deployment#
The official Helm chart (dagster/dagster) provisions webserver + daemon + Postgres + each user code location as separate deployments. Each user code location runs as a gRPC server pod.
helm repo add dagster https://dagster-io.github.io/helm
helm install dagster dagster/dagster \
  --set dagster-user-deployments.enabled=true \
  --set "dagster-user-deployments.deployments[0].name=alice-pipelines" \
  --set "dagster-user-deployments.deployments[0].image.repository=ghcr.io/alicedev/pipelines" \
  --set "dagster-user-deployments.deployments[0].image.tag=v1.0.0"
Output: a full Dagster deployment on K8s; user pipelines run via the K8sRunLauncher, which spawns each run as its own pod.
Database migration strategies#
Dagster doesn’t manage schema for your data — it manages its own metadata (run history, event log, asset materialisations). When upgrading Dagster, the metadata schema may change.
Dagster metadata migrations#
# After every dagster upgrade — run before starting the daemon/webserver
dagster instance migrate
Output: the migration runs DDL against $DAGSTER_HOME storage (SQLite or Postgres); subsequent processes start cleanly. Skipping this step is the #1 cause of “schema_version mismatch” errors after an upgrade.
Asset materialisation key versioning#
When an asset’s key changes (e.g. you rename it or move it under a key_prefix), Dagster loses the materialisation history for the old key. The pattern to preserve continuity:
from dagster import asset, AssetKey
@asset(key=AssetKey(["analytics", "users"])) # new structured key
def users(): ...
# Old materialisations live under AssetKey("users") and are no longer linked.
# To migrate, use the Dagster API to "alias" old materialisations under the new key.
For partitioned assets, never change the partition scheme (DailyPartitionsDefinition(start_date=...)) without backfilling — old partitions disappear from the UI.
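Before moving a start_date, it can help to list exactly which partition keys would fall out of the definition. The sketch below uses only the standard library and assumes daily keys formatted as YYYY-MM-DD with the end date excluded; daily_partition_keys and dropped_keys are hypothetical helper names.

```python
from datetime import date, timedelta


def daily_partition_keys(start: str, end: str) -> list:
    """Daily keys from start (inclusive) to end (exclusive), as YYYY-MM-DD strings."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    return [(first + timedelta(days=i)).isoformat() for i in range((last - first).days)]


def dropped_keys(old_start: str, new_start: str, today: str) -> list:
    """Keys present under the old start date but absent under the new one."""
    new_keys = set(daily_partition_keys(new_start, today))
    return [key for key in daily_partition_keys(old_start, today) if key not in new_keys]


lost = dropped_keys("2026-01-01", "2026-01-04", today="2026-01-10")
```

Any key in the result is a partition whose history would no longer appear, so it is a candidate for archiving or for keeping the original start date.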
Versioning IO managers#
I/O managers store materialised values somewhere. Changing the I/O manager on an existing asset (e.g. from FilesystemIOManager to S3PickleIOManager) breaks reads of old materialisations — schedule a backfill or accept the gap.
Version migration guide#
Dagster’s 0.x → 1.0 jump (August 2022) preceded the asset-centric model. Within 1.x, the most important transition is @op/@graph → @asset (formalised in 1.5).
@op / @graph → @asset (post-1.5)#
The pre-1.5 model declared computation units as @ops wired into @graphs. The modern model declares assets (the things produced); Dagster derives the graph automatically.
# Pre-1.5: ops + graphs
from dagster import op, graph, job

@op
def extract():
    return [1, 2, 3]

@op
def transform(xs):
    return [x * 2 for x in xs]

@graph
def etl():
    transform(extract())

etl_job = etl.to_job()

# 1.5+: assets
from dagster import asset, Definitions

@asset
def raw_xs() -> list[int]:
    return [1, 2, 3]

@asset
def doubled_xs(raw_xs: list[int]) -> list[int]:
    return [x * 2 for x in raw_xs]

defs = Definitions(assets=[raw_xs, doubled_xs])
Output: both styles still run; the asset style is the canonical 1.5+ pattern and gets all new UI features (asset lineage graph, materialisation history, asset checks).
@repository → Definitions#
Pre-1.5 code declared a @repository; modern code exports a Definitions object as the top-level symbol.
# Pre-1.5
from dagster import repository

@repository
def repo():
    return [my_job, my_schedule]

# 1.5+
from dagster import Definitions

defs = Definitions(jobs=[my_job], schedules=[my_schedule])
Output: the Definitions object replaces the decorator; the webserver discovers it via workspace.yaml.
Plugin compatibility ranges#
Each dagster-* plugin pins a tight compatible range on core. After every dagster upgrade:
pip install -U dagster dagster-webserver dagster-dbt dagster-aws dagster-snowflake
dagster instance migrate
Output: all plugins move together; the metadata schema migrates before any process starts. Drift between core and plugin produces import-time errors that look like API changes but are actually compatibility-pin mismatches.
Removed surfaces to audit#
- pipeline → job: the @pipeline decorator was removed in 1.0. Equivalent is @graph + .to_job().
- solid → op: the @solid decorator was renamed to @op in 0.13. Old tutorials still show @solid.
- DagsterInstance.local_temp: removed; use DagsterInstance.ephemeral() for tests.
- Output.metadata_entries: replaced by Output.metadata (a dict, not a list of typed objects).
Performance tuning#
Dagster’s overhead is per-asset-materialisation, not per-line-of-Python. Optimise by reducing materialisation boundaries — small frequent materialisations of cheap assets cost more than a single materialisation of a fat asset.
Multi-asset for shared computation#
When several assets share most of their computation, declare them as a @multi_asset so the body runs once and emits multiple outputs.
from dagster import multi_asset, AssetOut

@multi_asset(outs={
    "users": AssetOut(),
    "orders": AssetOut(),
})
def users_and_orders():
    df = expensive_query()  # placeholder for the shared computation
    # Outputs are returned in the same order as the keys in outs.
    return df[df["type"] == "user"], df[df["type"] == "order"]
Output: one body invocation, two materialisations recorded. The UI shows both assets as siblings with shared lineage.
Asset concurrency limits#
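Tag-based concurrency limits cap how many runs carrying a given tag execute at once across the deployment. This is useful when a downstream system (warehouse, API) has its own bottleneck. The limits are matched against run tags, so the tag must be set on the job or run; an op tag such as op_tags={"dagster/concurrency_key": "snowflake"} is not a run tag and is not matched by tag_concurrency_limits.
# dagster.yaml
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    tag_concurrency_limits:
      - key: "warehouse"
        value: "snowflake"
        limit: 4

# definitions.py
from dagster import asset, define_asset_job

@asset
def warehouse_export(): ...

# Runs of this job carry the tag, so the limit above applies to them.
export_job = define_asset_job(
    "export_job",
    selection=[warehouse_export],
    tags={"warehouse": "snowflake"},
)
Output: at most 4 runs tagged warehouse=snowflake execute concurrently; the rest queue.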
IO manager selection for large data#
Default I/O managers pickle the asset return value. For multi-GB pandas/polars frames, switch to a columnar I/O manager (dagster-snowflake-pandas, dagster-polars) or write to the warehouse directly and return a reference.
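import pandas as pd
from dagster import asset, AssetExecutionContext

@asset(io_manager_key="snowflake_io_manager")
def large_table(context: AssetExecutionContext) -> pd.DataFrame:
    huge_dataframe = build_frame()  # placeholder for the code that produces the frame
    return huge_dataframe  # written to Snowflake by the I/O manager, not pickled
Output: the I/O manager registered under snowflake_io_manager writes the frame to Snowflake instead of pickling it to disk; downstream assets load it back through the same I/O manager.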
Sensor cadence#
Sensors default to a 30 s tick. For event-driven cases that need faster reaction (file landing in S3, message in a queue), lower the interval:
from dagster import sensor
@sensor(target=daily_events, minimum_interval_seconds=10)
def fast_sensor(context): ...
Output: the daemon ticks the sensor every 10 s; below that the daemon’s own loop overhead dominates.
Testing strategies#
Dagster’s testing primitives are materialize([...]) for assets and JobDefinition.execute_in_process() for jobs. Both run in-process against an ephemeral DagsterInstance.
from unittest.mock import MagicMock

from dagster import materialize
from my_pipelines.assets import raw_users, clean_users, user_count, daily_events

def test_asset_graph():
    result = materialize([raw_users, clean_users, user_count])
    assert result.success
    df = result.output_for_node("clean_users")
    assert df["name"].iloc[0] == "Alice"

def test_partitioned():
    result = materialize(
        [daily_events],
        partition_key="2026-01-15",
    )
    assert result.success
    assert result.output_for_node("daily_events") == 42

def test_with_mock_resource():
    # Stand-in for a database resource registered under the key "db".
    mock_db = MagicMock()
    mock_db.query.return_value = [{"id": 1, "name": "test"}]
    result = materialize([raw_users], resources={"db": mock_db})
    assert result.success
Output: all three tests run in-process against an ephemeral instance; no daemon, no webserver, no Postgres.
Key patterns:
- materialize([...]): the primary asset-test primitive. Returns an ExecuteInProcessResult with output_for_node, success, all_events.
- partition_key="…": for partitioned assets, supply the partition under test.
- resources={...}: override resources with mocks for unit isolation.
- DagsterInstance.ephemeral(): explicit ephemeral instance when you need to assert on event log state across multiple materialisations.
See also#
- Python: dagster — assets, jobs, schedules, sensors, resources
- Packages: pip-prefect — the flow/task-oriented alternative