DuckDB (dagster-duckdb)
This library provides an integration with the DuckDB database.
- dagster_duckdb.DuckDBIOManager IOManagerDefinition
- Base class for an IO manager definition that reads inputs from and writes outputs to DuckDB.

 Examples:

 from typing import Sequence

 import pandas as pd
 from dagster import Definitions, asset
 from dagster._core.storage.db_io_manager import DbTypeHandler  # internal import path; may change between releases
 from dagster_duckdb import DuckDBIOManager
 from dagster_duckdb_pandas import DuckDBPandasTypeHandler

 class MyDuckDBIOManager(DuckDBIOManager):
     @staticmethod
     def type_handlers() -> Sequence[DbTypeHandler]:
         return [DuckDBPandasTypeHandler()]

 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in duckdb
 )
 def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
     ...

 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb")},
 )

 You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb", schema="my_schema")},
 )

 On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in duckdb
 )
 def my_table() -> pd.DataFrame:
     ...

 @asset(
     metadata={"schema": "my_schema"}  # will be used as the schema in duckdb
 )
 def my_other_table() -> pd.DataFrame:
     ...

 For ops, the schema can be specified by including a “schema” entry in output metadata.

 from dagster import Out, op

 @op(
     out={"my_table": Out(metadata={"schema": "my_schema"})}
 )
 def make_my_table() -> pd.DataFrame:
     ...

 If none of these is provided, the schema will default to “public”.

 To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

 from dagster import AssetIn, asset

 @asset(
     ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
 )
 def my_table_a(my_table: pd.DataFrame):
     # my_table will just contain the data from column "a"
     ...

 Set DuckDB configuration options using the connection_config field. See https://duckdb.org/docs/sql/configuration.html for all available settings.

 defs = Definitions(
     assets=[my_table],
     resources={
         "io_manager": MyDuckDBIOManager(
             database="my_db.duckdb",
             connection_config={"arrow_large_buffer_size": True},
         )
     },
 )
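The schema rules described above (a “schema” metadata entry, a key_prefix, the schema configuration value, and the “public” fallback) can be summarized in a small standalone sketch. This is only an illustration of the wording in this section, not the library's implementation: `resolve_schema` and its parameters are hypothetical names, and the choice to let a key_prefix win over the configured default schema is an assumption based on the sentence "This schema will be used if no other schema is specified directly on an asset or op". The library's behavior when both are set may differ between releases.

```python
from typing import Mapping, Optional, Sequence


def resolve_schema(
    asset_key_path: Sequence[str],
    metadata: Optional[Mapping[str, str]] = None,
    default_schema: Optional[str] = None,
) -> str:
    """Hypothetical helper: choose a schema following the rules in the text."""
    # An explicit "schema" metadata entry takes precedence over key_prefix.
    if metadata and metadata.get("schema"):
        return metadata["schema"]
    # Otherwise use the key_prefix component directly before the asset name.
    if len(asset_key_path) > 1:
        return asset_key_path[-2]
    # Otherwise fall back to the configured default schema (assumed order).
    if default_schema:
        return default_schema
    # Nothing was provided anywhere.
    return "public"


# Asset key ["my_schema", "my_table"] corresponds to key_prefix=["my_schema"].
print(resolve_schema(["my_schema", "my_table"]))
print(resolve_schema(["my_schema", "my_table"], metadata={"schema": "other"}))
print(resolve_schema(["my_table"], default_schema="analytics"))
print(resolve_schema(["my_table"]))
```

The four calls cover, in order, the key_prefix case, metadata overriding key_prefix, the configured default, and the “public” fallback.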
- dagster_duckdb.DuckDBResource ResourceDefinition
- Resource for interacting with a DuckDB database.

 Examples:

 from dagster import Definitions, asset
 from dagster_duckdb import DuckDBResource

 @asset
 def my_table(duckdb: DuckDBResource):
     with duckdb.get_connection() as conn:
         conn.execute("SELECT * from MY_SCHEMA.MY_TABLE")

 defs = Definitions(
     assets=[my_table],
     resources={"duckdb": DuckDBResource(database="path/to/db.duckdb")},
 )
Legacy
- dagster_duckdb.build_duckdb_io_manager IOManagerDefinition
- Builds an IO manager definition that reads inputs from and writes outputs to DuckDB.

 Parameters:
 - type_handlers (Sequence[DbTypeHandler]) – Each handler defines how to translate between DuckDB tables and an in-memory type, e.g. a Pandas DataFrame. If only one DbTypeHandler is provided, it will be used as the default_load_type.
 - default_load_type (Type) – When an input has no type annotation, load it as this type.

 Returns: IOManagerDefinition

 Examples:

 import pandas as pd
 from dagster import Definitions, asset
 from dagster_duckdb import build_duckdb_io_manager
 from dagster_duckdb_pandas import DuckDBPandasTypeHandler

 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in duckdb
 )
 def my_table() -> pd.DataFrame:  # the name of the asset will be the table name
     ...

 duckdb_io_manager = build_duckdb_io_manager([DuckDBPandasTypeHandler()])

 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": duckdb_io_manager.configured({"database": "my_db.duckdb"})},
 )

 You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.

 defs = Definitions(
     assets=[my_table],
     resources={
         "io_manager": duckdb_io_manager.configured(
             {"database": "my_db.duckdb", "schema": "my_schema"}  # will be used as the schema
         )
     },
 )

 On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.

 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in duckdb
 )
 def my_table() -> pd.DataFrame:
     ...

 @asset(
     metadata={"schema": "my_schema"}  # will be used as the schema in duckdb
 )
 def my_other_table() -> pd.DataFrame:
     ...

 For ops, the schema can be specified by including a “schema” entry in output metadata.

 from dagster import Out, op

 @op(
     out={"my_table": Out(metadata={"schema": "my_schema"})}
 )
 def make_my_table() -> pd.DataFrame:
     ...

 If none of these is provided, the schema will default to “public”.

 To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.

 from dagster import AssetIn, asset

 @asset(
     ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
 )
 def my_table_a(my_table: pd.DataFrame):
     # my_table will just contain the data from column "a"
     ...
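To make the effect of the “columns” metadata more concrete, the sketch below shows one plausible way a column list could narrow the query that loads an input table. It is a standalone illustration under stated assumptions: `build_select` is a hypothetical helper, not part of dagster-duckdb, and the exact SQL the I/O manager issues is not specified in this section.

```python
from typing import Optional, Sequence


def build_select(schema: str, table: str, columns: Optional[Sequence[str]] = None) -> str:
    """Hypothetical helper: build a query that loads a table, or a subset of its columns."""
    # Without a "columns" entry, every column is selected.
    column_list = ", ".join(columns) if columns else "*"
    return f"SELECT {column_list} FROM {schema}.{table}"


# Metadata as it would be written on an In or AssetIn.
input_metadata = {"columns": ["a"]}

query = build_select("my_schema", "my_table", input_metadata.get("columns"))
print(query)
```

The sketch does no identifier quoting or validation, so it suits only trusted, simple column names.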