Dagster Pipes subprocess reference

This reference shows usage of Dagster Pipes with other entities in the Dagster system. For a step-by-step walkthrough, refer to the Dagster Pipes tutorial.


Specifying environment variables and extras

When launching the subprocess, you may want to make environment variables or additional parameters available to the external process. Extras are arbitrary, user-defined parameters made available on the context object in the external process.

In the external code, you can access extras via the PipesContext object:

import os

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    total_orders = len(orders_df)
    # get the Dagster Pipes context
    context = PipesContext.get()
    # get all extras provided by Dagster asset
    print(context.extras)
    # get the value of an extra
    print(context.get_extra("foo"))
    # get env var
    print(os.environ["MY_ENV_VAR_IN_SUBPROCESS"])


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
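
The environment-variable half of this can be illustrated without Dagster. The following is a minimal standard-library sketch, not the Pipes client API: it only shows how a parent process makes a variable visible to a child process. The value `my_value` and the inline child script are assumptions made for illustration; extras are delivered separately, through the Pipes context.

```python
import os
import subprocess
import sys

# Hypothetical value; the variable name matches the one read by the external script above.
child_env = {**os.environ, "MY_ENV_VAR_IN_SUBPROCESS": "my_value"}

# Stand-in for the external script: it prints the variable it inherited.
child_code = 'import os; print(os.environ["MY_ENV_VAR_IN_SUBPROCESS"])'

completed = subprocess.run(
    [sys.executable, "-c", child_code],
    env=child_env,  # environment handed to the child process
    capture_output=True,
    text=True,
    check=True,  # raise if the child exits with a non-zero status
)
child_output = completed.stdout.strip()
print(child_output)
```

Merging into a copy of `os.environ` rather than passing a bare dictionary keeps variables such as `PATH` available to the child, which most scripts need in order to start.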

Working with @asset_check

Sometimes, you may not want to materialize an asset, but instead want to report a data quality check result. This applies when your asset has data quality checks defined with @asset_check.

From the external code, you can report to Dagster that an asset check has been performed via PipesContext.report_asset_check. Note that asset_key in this case is required, and must match the asset key defined in @asset_check:

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame({"order_id": [1, 2], "item_id": [432, 878]})
    # get the Dagster Pipes context
    context = PipesContext.get()
    # report the result of the asset check back to Dagster
    context.report_asset_check(
        asset_key="my_asset",
        # bool() yields a plain Python bool; Series.bool() is deprecated in recent pandas
        passed=bool(orders_df["item_id"].notnull().all()),
        check_name="no_empty_order_check",
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()

Working with multi-assets

Sometimes, you may invoke a single call to an API that results in multiple tables being updated, or you may have a single script that computes multiple assets. In these cases, you can use Dagster Pipes to report back on multiple assets at once.

Note: when working with multi-assets, PipesContext.report_asset_materialization may only be called once per unique asset key. If it is called more than once for the same key, an error similar to the following will surface:

Calling {method} with asset key {asset_key} is undefined. Asset has already been materialized, so no additional data can be reported for it

Instead, you’ll need to set the asset_key parameter for each instance of PipesContext.report_asset_materialization:

import pandas as pd
from dagster_pipes import PipesContext, open_dagster_pipes


def main():
    orders_df = pd.DataFrame(
        {"order_id": [1, 2, 3], "item_id": [432, 878, 102], "user_id": ["a", "b", "a"]}
    )
    total_orders = len(orders_df)
    total_users = orders_df["user_id"].nunique()

    # get the Dagster Pipes context
    context = PipesContext.get()
    # send structured metadata back to Dagster. asset_key is required when there are multiple assets
    context.report_asset_materialization(
        asset_key="orders", metadata={"total_orders": total_orders}
    )
    context.report_asset_materialization(
        asset_key="users", metadata={"total_users": total_users}
    )


if __name__ == "__main__":
    # connect to Dagster Pipes
    with open_dagster_pipes():
        main()
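
If several steps of a script produce metadata for the same asset, one way to respect the once-per-key rule is to merge the metadata first and report each key a single time. The sketch below is only an illustration: `report_once_per_key` is a hypothetical helper, not part of dagster_pipes, and `fake_report` stands in for PipesContext.report_asset_materialization so the example runs without Dagster. The `source` metadata entry is likewise made up.

```python
from typing import Any, Callable, Dict, Iterable, Tuple


def report_once_per_key(
    report: Callable[..., Any],
    updates: Iterable[Tuple[str, Dict[str, Any]]],
) -> Dict[str, Dict[str, Any]]:
    """Merge metadata per asset key, then call `report` exactly once per key."""
    merged: Dict[str, Dict[str, Any]] = {}
    for asset_key, metadata in updates:
        # later entries for the same key are merged into the earlier ones
        merged.setdefault(asset_key, {}).update(metadata)
    for asset_key, metadata in merged.items():
        report(asset_key=asset_key, metadata=metadata)
    return merged


# Stand-in reporter that records calls instead of talking to Dagster.
calls = []


def fake_report(asset_key: str, metadata: Dict[str, Any]) -> None:
    calls.append((asset_key, metadata))


merged = report_once_per_key(
    fake_report,
    [
        ("orders", {"total_orders": 3}),
        ("users", {"total_users": 2}),
        ("orders", {"source": "api"}),  # second update for "orders"
    ],
)
print(calls)
```

Inside an external script, the same helper could presumably be called with `context.report_asset_materialization` as the `report` argument, since that method accepts `asset_key` and `metadata` as keyword arguments.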