Azure (dagster-azure)
Utilities for using Azure Storage Accounts with Dagster. This is mostly aimed at Azure Data Lake Storage Gen 2 (ADLS2) but also contains some utilities for Azure Blob Storage.
Resources
- dagster_azure.adls2.ADLS2Resource ResourceDefinition
- Resource containing clients to access Azure Data Lake Storage Gen2.
- Contains a client for both the Data Lake and Blob APIs, to work around the limitations of each.
- Example usage:
- Attach this resource to your Definitions to be used by assets and jobs.

  from dagster import Definitions, asset, job, op
  from dagster_azure.adls2 import ADLS2Resource, ADLS2SASToken

  @asset
  def asset1(adls2: ADLS2Resource):
      adls2.adls2_client.list_file_systems()
      ...

  @op
  def my_op(adls2: ADLS2Resource):
      adls2.adls2_client.list_file_systems()
      ...

  @job
  def my_job():
      my_op()

  defs = Definitions(
      assets=[asset1],
      jobs=[my_job],
      resources={
          "adls2": ADLS2Resource(
              storage_account="my-storage-account",
              credential=ADLS2SASToken(token="my-sas-token"),
          )
      },
  )

- Attach this resource to your job to make it available to your ops.

  from dagster import job, op
  from dagster_azure.adls2 import ADLS2Resource, ADLS2SASToken

  @op
  def my_op(adls2: ADLS2Resource):
      adls2.adls2_client.list_file_systems()
      ...

  @job(
      resource_defs={
          "adls2": ADLS2Resource(
              storage_account="my-storage-account",
              credential=ADLS2SASToken(token="my-sas-token"),
          )
      },
  )
  def my_job():
      my_op()
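- The SAS token above is one of several supported credential types. As a minimal sketch (assuming your dagster-azure version exposes ADLS2Key; the environment variable names are illustrative), a shared key can be supplied instead, with secrets read via EnvVar:

  from dagster import Definitions, EnvVar
  from dagster_azure.adls2 import ADLS2Key, ADLS2Resource

  defs = Definitions(
      assets=[asset1],  # asset1 as defined in the example above
      resources={
          "adls2": ADLS2Resource(
              storage_account=EnvVar("AZURE_STORAGE_ACCOUNT"),
              # ADLS2Key is the shared-key counterpart to ADLS2SASToken.
              credential=ADLS2Key(key=EnvVar("AZURE_DATA_LAKE_STORAGE_KEY")),
          )
      },
  )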
- dagster_azure.fakes.FakeADLS2Resource ResourceDefinition
- Stateful mock of an ADLS2Resource for testing.
- Wraps a mock.MagicMock. Containers are implemented using an in-memory dict.
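- A minimal test sketch, assuming FakeADLS2Resource can be constructed with a storage_account name like the real resource (check the constructor for your dagster-azure version); asset1 is the asset from the ADLS2Resource example above:

  from dagster import materialize
  from dagster_azure.fakes import FakeADLS2Resource

  def test_asset1():
      # The fake keeps file systems in memory, so no Azure credentials or
      # network access are needed.
      result = materialize(
          [asset1],
          resources={"adls2": FakeADLS2Resource(storage_account="fake-account")},
      )
      assert result.success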
- dagster_azure.blob.AzureBlobStorageResource ResourceDefinition
- Resource for interacting with Azure Blob Storage.
- Examples:

  import os
  from dagster import Definitions, asset, EnvVar
  from dagster_azure.blob import (
      AzureBlobStorageResource,
      AzureBlobStorageKeyCredential,
      AzureBlobStorageDefaultCredential,
  )

  @asset
  def my_table(azure_blob_storage: AzureBlobStorageResource):
      with azure_blob_storage.get_client() as blob_storage_client:
          response = blob_storage_client.list_containers()

  defs = Definitions(
      assets=[my_table],
      resources={
          "azure_blob_storage": AzureBlobStorageResource(
              account_url=EnvVar("AZURE_BLOB_STORAGE_ACCOUNT_URL"),
              credential=AzureBlobStorageDefaultCredential()
              if os.getenv("DEV")
              else AzureBlobStorageKeyCredential(key=EnvVar("AZURE_BLOB_STORAGE_KEY")),
          ),
      },
  )
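- The client yielded by get_client() behaves like the Azure SDK's BlobServiceClient (as the list_containers() call above suggests), so the usual SDK operations apply. A sketch of an upload, with the container and blob names purely illustrative:

  from dagster import asset
  from dagster_azure.blob import AzureBlobStorageResource

  @asset
  def uploaded_report(azure_blob_storage: AzureBlobStorageResource):
      with azure_blob_storage.get_client() as blob_storage_client:
          # get_container_client and upload_blob are standard azure-storage-blob calls.
          container_client = blob_storage_client.get_container_client("my-container")
          container_client.upload_blob("reports/report.txt", b"hello", overwrite=True)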
- class dagster_azure.blob.AzureBlobComputeLogManager
- Logs op compute function stdout and stderr to Azure Blob Storage.
- This is also compatible with Azure Data Lake Storage.
- Users should not instantiate this class directly. Instead, use a YAML block in dagster.yaml. The examples below show how to configure it with various credentialing schemes.
- Parameters:
  - storage_account (str) – The storage account name to which to log.
  - container (str) – The container (or ADLS2 filesystem) to which to log.
  - secret_credential (Optional[dict]) – Secret credential for the storage account. This should be a dictionary with keys client_id, client_secret, and tenant_id.
  - access_key_or_sas_token (Optional[str]) – Access key or SAS token for the storage account.
  - default_azure_credential (Optional[dict]) – Use and configure DefaultAzureCredential. Cannot be used with SAS token or secret key config.
  - local_dir (Optional[str]) – Path to the local directory in which to stage logs. Default: dagster._seven.get_system_temp_directory().
  - prefix (Optional[str]) – Prefix for the log file keys.
  - upload_interval (Optional[int]) – Interval in seconds at which to upload partial log files to blob storage. By default, uploads only when the capture is complete.
  - show_url_only (bool) – Only show the URL of the log file in the UI, instead of fetching and displaying the full content. Default: False.
  - inst_data (Optional[ConfigurableClassData]) – Serializable representation of the compute log manager when instantiated from config.
- Examples:
- Using an Azure Blob Storage account with an AzureSecretCredential:

  compute_logs:
    module: dagster_azure.blob.compute_log_manager
    class: AzureBlobComputeLogManager
    config:
      storage_account: my-storage-account
      container: my-container
      secret_credential:
        client_id: my-client-id
        client_secret: my-client-secret
        tenant_id: my-tenant-id
      prefix: "dagster-test-"
      local_dir: "/tmp/cool"
      upload_interval: 30
      show_url_only: false

- Using an Azure Blob Storage account with a DefaultAzureCredential:

  compute_logs:
    module: dagster_azure.blob.compute_log_manager
    class: AzureBlobComputeLogManager
    config:
      storage_account: my-storage-account
      container: my-container
      default_azure_credential:
        exclude_environment_credential: false
      prefix: "dagster-test-"
      local_dir: "/tmp/cool"
      upload_interval: 30
      show_url_only: false

- Using an Azure Blob Storage account with an access key:

  compute_logs:
    module: dagster_azure.blob.compute_log_manager
    class: AzureBlobComputeLogManager
    config:
      storage_account: my-storage-account
      container: my-container
      access_key_or_sas_token: my-access-key
      prefix: "dagster-test-"
      local_dir: "/tmp/cool"
      upload_interval: 30
      show_url_only: false
I/O Manager
- dagster_azure.adls2.ADLS2PickleIOManager IOManagerDefinition
- Persistent IO manager using Azure Data Lake Storage Gen2 for storage.
- Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for ADLS and the backing container.
- Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at "<base_dir>/<asset_key>". If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
- Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of "/my/base/path", an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called "three" in a directory with path "/my/base/path/one/two/".
- Example usage:
- Attach this IO manager to a set of assets.

  from dagster import Definitions, asset
  from dagster_azure.adls2 import ADLS2PickleIOManager, ADLS2Resource, ADLS2SASToken

  @asset
  def asset1():
      # create df ...
      return df

  @asset
  def asset2(asset1):
      return df[:5]

  defs = Definitions(
      assets=[asset1, asset2],
      resources={
          "io_manager": ADLS2PickleIOManager(
              adls2_file_system="my-cool-fs",
              adls2_prefix="my-cool-prefix",
              adls2=ADLS2Resource(
                  storage_account="my-storage-account",
                  credential=ADLS2SASToken(token="my-sas-token"),
              ),
          ),
      },
  )

- Attach this IO manager to your job to make it available to your ops.

  from dagster import job
  from dagster_azure.adls2 import ADLS2PickleIOManager, ADLS2Resource, ADLS2SASToken

  @job(
      resource_defs={
          "io_manager": ADLS2PickleIOManager(
              adls2_file_system="my-cool-fs",
              adls2_prefix="my-cool-prefix",
              adls2=ADLS2Resource(
                  storage_account="my-storage-account",
                  credential=ADLS2SASToken(token="my-sas-token"),
              ),
          ),
      },
  )
  def my_job():
      ...
 
File Manager
- dagster_azure.adls2.adls2_file_manager ResourceDefinition
- FileManager that provides abstract access to ADLS2.
- Implements the FileManager API.
- class dagster_azure.adls2.ADLS2FileHandle
- A reference to a file on ADLS2. 
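- A usage sketch, assuming adls2_file_manager accepts the config keys shown (they mirror the adls2_resource schema under Legacy below, plus adls2_file_system and adls2_prefix; verify against your dagster-azure version). write_data returns an ADLS2FileHandle that can be passed back to read_data:

  from dagster import job, op
  from dagster_azure.adls2 import adls2_file_manager

  @op(required_resource_keys={"file_manager"})
  def write_then_read(context):
      # write_data and read_data are part of dagster's FileManager API.
      handle = context.resources.file_manager.write_data(b"hello adls2")
      context.log.info(f"wrote {handle.path_desc}")
      return context.resources.file_manager.read_data(handle)

  @job(
      resource_defs={
          "file_manager": adls2_file_manager.configured(
              {
                  "storage_account": "my-storage-account",
                  "credential": {"sas": "my-sas-token"},
                  "adls2_file_system": "my-cool-fs",
                  "adls2_prefix": "my-cool-prefix",
              }
          )
      },
  )
  def file_manager_job():
      write_then_read()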
Legacy
- dagster_azure.adls2.ConfigurablePickledObjectADLS2IOManager IOManagerDefinition
- deprecated: This API will be removed in version 2.0. Please use ADLS2PickleIOManager instead.
- Renamed to ADLS2PickleIOManager. See ADLS2PickleIOManager for documentation.
- dagster_azure.adls2.adls2_resource ResourceDefinition
- Resource that gives ops access to Azure Data Lake Storage Gen2.
- The underlying client is a DataLakeServiceClient.
- Attach this resource definition to a JobDefinition in order to make it available to your ops.
- Example:

  from dagster import job, op
  from dagster_azure.adls2 import adls2_resource

  @op(required_resource_keys={'adls2'})
  def example_adls2_op(context):
      return list(context.resources.adls2.adls2_client.list_file_systems())

  @job(resource_defs={"adls2": adls2_resource})
  def my_job():
      example_adls2_op()

- Note that your ops must also declare that they require this resource with required_resource_keys, or it will not be initialized for the execution of their compute functions.
- You may pass credentials to this resource using either a SAS token, a key, or by passing the DefaultAzureCredential object.

  resources:
    adls2:
      config:
        storage_account: my_storage_account
        # str: The storage account name.
        credential:
          sas: my_sas_token
          # str: The SAS token for the account.
          key:
            env: AZURE_DATA_LAKE_STORAGE_KEY
          # str: The shared access key for the account.
          DefaultAzureCredential: {}
          # dict: The keyword arguments used for DefaultAzureCredential,
          # or leave the object empty for no arguments.
          DefaultAzureCredential:
            exclude_environment_credential: true
- dagster_azure.adls2.adls2_pickle_io_manager IOManagerDefinition
- Persistent IO manager using Azure Data Lake Storage Gen2 for storage.
- Serializes objects via pickling. Suitable for object storage for distributed executors, so long as each execution node has network connectivity and credentials for ADLS and the backing container.
- Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at "<base_dir>/<asset_key>". If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir.
- Subsequent materializations of an asset will overwrite previous materializations of that asset. With a base directory of "/my/base/path", an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called "three" in a directory with path "/my/base/path/one/two/".
- Example usage:
- Attach this IO manager to a set of assets.

  from dagster import Definitions, asset
  from dagster_azure.adls2 import adls2_pickle_io_manager, adls2_resource

  @asset
  def asset1():
      # create df ...
      return df

  @asset
  def asset2(asset1):
      return df[:5]

  defs = Definitions(
      assets=[asset1, asset2],
      resources={
          "io_manager": adls2_pickle_io_manager.configured(
              {"adls2_file_system": "my-cool-fs", "adls2_prefix": "my-cool-prefix"}
          ),
          "adls2": adls2_resource,
      },
  )

- Attach this IO manager to your job to make it available to your ops.

  from dagster import job
  from dagster_azure.adls2 import adls2_pickle_io_manager, adls2_resource

  @job(
      resource_defs={
          "io_manager": adls2_pickle_io_manager.configured(
              {"adls2_file_system": "my-cool-fs", "adls2_prefix": "my-cool-prefix"}
          ),
          "adls2": adls2_resource,
      },
  )
  def my_job():
      ...