DuckDB + PySpark (dagster-duckdb-pyspark)
This library provides an integration with the DuckDB database and the PySpark data processing library.
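- To run the examples below, the dagster-duckdb-pyspark package must be installed; it builds on dagster-duckdb and assumes pyspark is available in the environment. A typical pip-based install:
 pip install dagster-duckdb-pyspark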
- dagster_duckdb_pyspark.DuckDBPySparkIOManager IOManagerDefinition
- An I/O manager definition that reads inputs from and writes PySpark DataFrames to DuckDB. When using the DuckDBPySparkIOManager, any inputs and outputs without type annotations will be loaded as PySpark DataFrames.
- Returns: IOManagerDefinition
- Examples:
 import pyspark.sql
 from dagster import Definitions, asset
 from dagster_duckdb_pyspark import DuckDBPySparkIOManager

 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in DuckDB
 )
 def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
     ...

 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": DuckDBPySparkIOManager(database="my_db.duckdb")}
 )
- You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.
 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": DuckDBPySparkIOManager(database="my_db.duckdb", schema="my_schema")}
 )
- On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in duckdb
 )
 def my_table() -> pyspark.sql.DataFrame:
     ...

 @asset(
     metadata={"schema": "my_schema"}  # will be used as the schema in duckdb
 )
 def my_other_table() -> pyspark.sql.DataFrame:
     ...
- For ops, the schema can be specified by including a “schema” entry in output metadata; a sketch wiring such an op into a job appears at the end of this example section.
 @op(
     out={"my_table": Out(metadata={"schema": "my_schema"})}
 )
 def make_my_table() -> pyspark.sql.DataFrame:
     ...
- If none of these is provided, the schema will default to “public”.
- To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn.
 @asset(
     ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
 )
 def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
     # my_table will just contain the data from column "a"
     ...
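- For reference, here is a minimal sketch of how an op with “schema” output metadata might be wired into a job backed by this I/O manager. It is not part of the library's reference examples: the job name my_table_job is illustrative, and it relies on resources passed to Definitions being bound to jobs as well as assets.
 import pyspark.sql
 from dagster import Definitions, Out, job, op
 from dagster_duckdb_pyspark import DuckDBPySparkIOManager

 @op(out={"my_table": Out(metadata={"schema": "my_schema"})})
 def make_my_table() -> pyspark.sql.DataFrame:
     # build and return a PySpark DataFrame; the I/O manager stores it as my_schema.my_table
     ...

 @job
 def my_table_job():
     make_my_table()

 defs = Definitions(
     jobs=[my_table_job],
     resources={"io_manager": DuckDBPySparkIOManager(database="my_db.duckdb")},
 )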
- class dagster_duckdb_pyspark.DuckDBPySparkTypeHandler
- Stores PySpark DataFrames in DuckDB.
- To use this type handler, return it from the type_handlers method of an I/O manager that inherits from DuckDBIOManager.
- Example:
 from typing import Sequence
 import pyspark.sql
 from dagster import Definitions, asset
 from dagster_duckdb import DuckDBIOManager
 from dagster_duckdb_pyspark import DuckDBPySparkTypeHandler

 class MyDuckDBIOManager(DuckDBIOManager):
     @staticmethod
     def type_handlers() -> Sequence[DbTypeHandler]:
         return [DuckDBPySparkTypeHandler()]

 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in duckdb
 )
 def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
     ...

 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": MyDuckDBIOManager(database="my_db.duckdb")}
 )
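- The reference examples above leave the asset bodies as ... placeholders. As a rough sketch (not from the library docs), an asset that actually materializes a PySpark DataFrame might look like the following, assuming a local SparkSession is acceptable:
 import pyspark.sql
 from dagster import asset
 from pyspark.sql import SparkSession

 @asset(key_prefix=["my_schema"])  # stored as my_schema.my_table in DuckDB
 def my_table() -> pyspark.sql.DataFrame:
     spark = SparkSession.builder.getOrCreate()
     # whatever DataFrame is returned here is persisted by the configured DuckDB I/O manager
     return spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])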
Legacy
- dagster_duckdb_pyspark.duckdb_pyspark_io_manager IOManagerDefinition
- An I/O manager definition that reads inputs from and writes PySpark DataFrames to DuckDB. When using the duckdb_pyspark_io_manager, any inputs and outputs without type annotations will be loaded as PySpark DataFrames.
- Returns: IOManagerDefinition
- Examples:
 import pyspark.sql
 from dagster import Definitions, asset
 from dagster_duckdb_pyspark import duckdb_pyspark_io_manager

 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in DuckDB
 )
 def my_table() -> pyspark.sql.DataFrame:  # the name of the asset will be the table name
     ...

 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": duckdb_pyspark_io_manager.configured({"database": "my_db.duckdb"})}
 )
- You can set a default schema to store the assets using the schema configuration value of the DuckDB I/O Manager. This schema will be used if no other schema is specified directly on an asset or op.
 defs = Definitions(
     assets=[my_table],
     resources={"io_manager": duckdb_pyspark_io_manager.configured({"database": "my_db.duckdb", "schema": "my_schema"})}
 )
- On individual assets, you can also specify the schema where they should be stored using metadata or by adding a key_prefix to the asset key. If both key_prefix and metadata are defined, the metadata will take precedence.
 @asset(
     key_prefix=["my_schema"]  # will be used as the schema in duckdb
 )
 def my_table() -> pyspark.sql.DataFrame:
     ...

 @asset(
     metadata={"schema": "my_schema"}  # will be used as the schema in duckdb
 )
 def my_other_table() -> pyspark.sql.DataFrame:
     ...
- For ops, the schema can be specified by including a “schema” entry in output metadata.
 @op(
     out={"my_table": Out(metadata={"schema": "my_schema"})}
 )
 def make_my_table() -> pyspark.sql.DataFrame:
     ...
- If none of these is provided, the schema will default to “public”.
- To only use specific columns of a table as input to a downstream op or asset, add the metadata “columns” to the In or AssetIn; an op-based sketch using In follows the example below.
 @asset(
     ins={"my_table": AssetIn("my_table", metadata={"columns": ["a"]})}
 )
 def my_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
     # my_table will just contain the data from column "a"
     ...
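- As noted above, the “columns” metadata can also be set on an op In. A minimal sketch (the op name process_table_a is illustrative, not from the library docs):
 import pyspark.sql
 from dagster import In, op

 @op(ins={"my_table": In(metadata={"columns": ["a"]})})
 def process_table_a(my_table: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
     # my_table is loaded from DuckDB with only column "a"
     ...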