Op events and exceptions
If you are just getting started with Dagster, we strongly recommend you use assets rather than ops to build your data pipelines. The ops documentation is for Dagster users who need to manage existing ops, or who have complex use cases.
Within the body of an op, it is possible to communicate with the Dagster framework either by yielding an event, logging an event, or raising an exception. This page describes these different possibilities and the scenarios in which you might use them.
Relevant APIs
| Name | Description |
|---|---|
| `Output` | Dagster event used to yield an output from an op |
| `AssetMaterialization` | Dagster event indicating that an op has materialized an asset |
| `AssetObservation` | Dagster event indicating that an op has observed an asset |
| `ExpectationResult` | Dagster event representing the result of a data quality check |
| `Failure` | Dagster exception indicating that a failure has occurred |
| `RetryRequested` | Dagster exception requesting that the step be retried |
Overview
Within the body of an op, a stream of structured events can be yielded or logged. These events are processed by Dagster and recorded in the event log, along with additional context about the op that emitted them.
It is also possible to raise Dagster-specific exceptions, which signal to the framework that op execution should halt and that some action should be taken.
Event metadata
Often, it may be useful to attach some arbitrary information to an event or exception that is not captured by its basic parameters. Through the `MetadataValue` class, we provide a consistent interface for specifying this metadata. The available value types are accessible through a static API defined on `MetadataValue`. These include simple datatypes (`MetadataValue.float`, `MetadataValue.int`, `MetadataValue.text`), as well as more complex types such as Markdown and JSON (`MetadataValue.md`, `MetadataValue.json`). Depending on the type of its value, metadata will be rendered in the UI in a more useful format than a simple unstructured string.
Metadata is attached to events at construction time, using the `metadata` argument, which takes a dictionary mapping string labels to primitive types or `MetadataValue` objects.
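For illustration, here is a minimal sketch of such a dictionary, mixing raw Python values with explicit `MetadataValue` wrappers (the labels and values are hypothetical):

```python
import dagster as dg

# Hypothetical labels and values. Raw strings and ints are accepted directly,
# while MetadataValue wrappers opt into richer rendering in the UI.
example_metadata = {
    "row_count": 500,
    "source": "warehouse",
    "notes": dg.MetadataValue.md("**Nightly load** completed without errors."),
    "schema": dg.MetadataValue.json({"id": "int", "name": "str"}),
}
```

A dictionary like this can be passed as the `metadata` argument of any of the events described below.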
Events
Yielding events from within the body of an op is a useful way of communicating with the Dagster framework. The most critical event to the functionality of Dagster is the `Output` event, which allows output data to be passed on from one op to the next. However, we also provide interfaces to inform Dagster about external assets and data quality checks during the run of an op.
Output objects
Because returning a value from an op is such a fundamental part of creating a data pipeline, we have a few different interfaces for this functionality.
For many use cases, Dagster ops can be used directly with Python's native type annotations without additional modification:
```python
from dagster import op


@op
def my_output_op():
    return 5
```
Check out the docs on Op Outputs to learn more about this functionality.
Dagster also provides the `Output` object, which opens up additional functionality for outputs, such as specifying output metadata and conditional branching (sketched after the yield example below), all while maintaining coherent type annotations.
`Output` objects can be either returned or yielded. The `Output` type is also generic, for use with return annotations:
```python
import dagster as dg


# Using dg.Output as type annotation without inner type
@dg.op
def my_output_op() -> dg.Output:
    return dg.Output("some_value", metadata={"some_metadata": "a_value"})


# A single output with a parameterized type annotation
@dg.op
def my_output_generic_op() -> dg.Output[int]:
    return dg.Output(5, metadata={"some_metadata": "a_value"})


# Multiple outputs using parameterized type annotation
@dg.op(out={"int_out": dg.Out(), "str_out": dg.Out()})
def my_multiple_generic_output_op() -> tuple[dg.Output[int], dg.Output[str]]:
    return (
        dg.Output(5, metadata={"some_metadata": "a_value"}),
        dg.Output("foo", metadata={"some_metadata": "another_value"}),
    )
```
When `Output` objects are yielded, type annotations cannot be used. Instead, type information can be specified using the `out` argument of the op decorator:
```python
import dagster as dg


@dg.op(out={"out1": dg.Out(str), "out2": dg.Out(int)})
def my_op_yields():
    yield dg.Output(5, output_name="out2")
    yield dg.Output("foo", output_name="out1")
```
Attaching metadata to outputs (Experimental)
If there is information specific to an op output that you would like to log, you can use an `Output` object to attach metadata to the op's output. To do this, use the `metadata` parameter on the object, which expects a mapping of string labels to metadata values.
The `MetadataValue` class contains a set of static wrappers to customize the display of certain types of structured metadata.
The following example demonstrates how you might use this functionality:
```python
import dagster as dg


@dg.op
def my_metadata_output(context: dg.OpExecutionContext) -> dg.Output:
    df = get_some_data()
    return dg.Output(
        df,
        metadata={
            "text_metadata": "Text-based metadata for this event",
            "dashboard_url": dg.MetadataValue.url(
                "http://mycoolsite.com/url_for_my_data"
            ),
            "raw_count": len(df),
            "size (bytes)": calculate_bytes(df),
        },
    )
```
Asset materializations
`AssetMaterialization` events tell Dagster that you have written some data asset to an external system. The classic example would be writing to a table in a database, but really any sort of persisted object that you would want to keep track of can be considered an asset.
Generally, you'd want to send this event directly after you persist the asset to your external system. All `AssetMaterialization` events must define an `asset_key`, which is a unique identifier to describe the asset you are persisting. They can optionally include a `partition` if they're persisting a particular partition of an asset.
If you're using asset definitions, you don't need to record these events explicitly; the framework handles it for you.
```python
import dagster as dg


@dg.op
def my_asset_op(context: dg.OpExecutionContext):
    df = get_some_data()
    store_to_s3(df)
    context.log_event(
        dg.AssetMaterialization(
            asset_key="s3.my_asset",
            description="A df I stored in s3",
        )
    )
    result = do_some_transform(df)
    return result
```
Asset materializations can also be yielded:
```python
import dagster as dg


@dg.op
def my_asset_op_yields():
    df = get_some_data()
    store_to_s3(df)
    yield dg.AssetMaterialization(
        asset_key="s3.my_asset",
        description="A df I stored in s3",
    )
    result = do_some_transform(df)
    yield dg.Output(result)
```
When yielding asset materializations, outputs must also be yielded via an `Output`.
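If an op persists a single partition of a larger asset, the `partition` argument mentioned above records which one. A minimal sketch, assuming a date-partitioned asset and reusing the hypothetical helpers from the examples above:

```python
import dagster as dg


@dg.op
def my_partitioned_asset_op():
    df = get_some_data()  # hypothetical helper, as above
    store_to_s3(df)
    yield dg.AssetMaterialization(
        asset_key="s3.my_asset",
        partition="2024-01-01",  # hypothetical partition key
    )
    yield dg.Output(len(df))
```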
To learn more about assets and how they are surfaced once you send this event, check out the Asset Catalog documentation.
Attaching metadata to asset materializations
Attaching metadata to asset materializations is an important way of tracking aspects of a given asset over time. This works essentially identically to other events that accept a `metadata` parameter, allowing you to attach a set of structured labels and values to display.
```python
from dagster import AssetMaterialization, MetadataValue, op, OpExecutionContext


@op
def my_metadata_materialization_op(context: OpExecutionContext):
    df = read_df()
    remote_storage_path = persist_to_storage(df)
    context.log_event(
        AssetMaterialization(
            asset_key="my_dataset",
            description="Persisted result to storage",
            metadata={
                "text_metadata": "Text-based metadata for this event",
                "path": MetadataValue.path(remote_storage_path),
                "dashboard_url": MetadataValue.url(
                    "http://mycoolsite.com/url_for_my_data"
                ),
                "size (bytes)": calculate_bytes(df),
            },
        )
    )
    return remote_storage_path
```
Asset observations
`AssetObservation` events record metadata about assets. Unlike asset materializations, asset observations do not signify that an asset has been mutated.
Within ops and assets, you can log or yield `AssetObservation` events at runtime. Similar to attaching metadata to asset materializations, asset observations accept a `metadata` parameter, allowing you to track specific properties of an asset over time.
```python
from dagster import AssetObservation, op, OpExecutionContext


@op
def observation_op(context: OpExecutionContext):
    df = read_df()
    context.log_event(
        AssetObservation(asset_key="observation_asset", metadata={"num_rows": len(df)})
    )
    return 5
```
In the example above, an observation tracks the number of rows in an asset persisted to storage. This information can then be viewed on the Asset Details page.
To learn more about asset observations, check out the Asset Observation documentation.
Expectation results
Ops can emit structured events to represent the results of a data quality test. The data quality event class is `ExpectationResult`. To generate an expectation result, we can log or yield an `ExpectationResult` event in our op.
```python
import dagster as dg


@dg.op
def my_expectation_op(context: dg.OpExecutionContext, df):
    do_some_transform(df)
    context.log_event(
        dg.ExpectationResult(
            success=len(df) > 0, description="ensure dataframe has rows"
        )
    )
    return df
```
Attaching metadata to expectation results
Like many other event types in Dagster, expectation result events can carry a variety of metadata types, all through the `MetadataValue` class. Each expectation event optionally takes a dictionary of metadata that is then displayed in the event log.
This example shows metadata entries of different types attached to the same expectation result:
```python
import dagster as dg


@dg.op
def my_metadata_expectation_op(context: dg.OpExecutionContext, df):
    df = do_some_transform(df)
    context.log_event(
        dg.ExpectationResult(
            success=len(df) > 0,
            description="ensure dataframe has rows",
            metadata={
                "text_metadata": "Text-based metadata for this event",
                "dashboard_url": dg.MetadataValue.url(
                    "http://mycoolsite.com/url_for_my_data"
                ),
                "raw_count": len(df),
                "size (bytes)": calculate_bytes(df),
            },
        )
    )
    return df
```
Exceptions
Dagster also provides some op-specific exception classes, which can be raised to halt the execution of an op. The behavior after an exception is raised depends on the exception that you use. The exceptions are documented below.
Failures
A `Failure` is a kind of Exception that contains metadata that can be interpreted by the Dagster framework. Like any Exception raised inside an op, it indicates that the op has failed in an unrecoverable way, and that execution should stop.
A `Failure` can include a dictionary with structured `MetadataValue` values, which will be rendered in the UI according to their type. It also has an `allow_retries` argument that can be used to bypass the op's retry policy.
```python
import dagster as dg


@dg.op
def my_failure_op():
    path = "/path/to/files"
    my_files = get_files(path)
    if len(my_files) == 0:
        raise dg.Failure(
            description="No files to process",
            metadata={
                "filepath": dg.MetadataValue.path(path),
                "dashboard_url": dg.MetadataValue.url("http://mycoolsite.com/failures"),
            },
        )
    return some_calculation(my_files)
```
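If the op has a retry policy but the error is one that retrying cannot fix, `allow_retries=False` makes the `Failure` final. A minimal sketch, where `config_is_valid` and `run_with_config` are hypothetical helpers:

```python
import dagster as dg


@dg.op(retry_policy=dg.RetryPolicy(max_retries=3))
def my_unrecoverable_op():
    config_path = "/path/to/config"  # hypothetical path
    if not config_is_valid(config_path):  # hypothetical validation helper
        # Bad configuration won't fix itself, so skip the op's retry policy.
        raise dg.Failure(
            description="Invalid configuration; retrying will not help",
            allow_retries=False,
        )
    return run_with_config(config_path)  # hypothetical helper
```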
Retry requests
`RetryRequested` exceptions are useful when you experience failures that are possible to recover from. For example, if you have a flaky operation that you expect to throw an exception once in a while, you can catch the exception and throw a `RetryRequested` to make Dagster halt and restart op execution.
You can configure the number of retries to be attempted with the `max_retries` parameter.
```python
import dagster as dg


@dg.op
def my_retry_op():
    try:
        result = flaky_operation()
    except Exception as e:
        raise dg.RetryRequested(max_retries=3) from e
    return result
```
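`RetryRequested` also accepts a `seconds_to_wait` argument, which delays the retried attempt. For a rate-limited or flaky external service, a fixed backoff might look like this sketch (again using the hypothetical `flaky_operation`):

```python
import dagster as dg


@dg.op
def my_backoff_retry_op():
    try:
        result = flaky_operation()  # hypothetical helper, as above
    except Exception as e:
        # Wait 30 seconds before each of up to 3 retried attempts.
        raise dg.RetryRequested(max_retries=3, seconds_to_wait=30) from e
    return result
```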