import datetime
import functools
from abc import ABC, abstractmethod, abstractproperty
from collections import defaultdict
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
AbstractSet,
Callable,
Dict,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
)
import pytz
import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.auto_materialize_rule_evaluation import (
AutoMaterializeDecisionType,
AutoMaterializeRuleEvaluationData,
AutoMaterializeRuleSnapshot,
ParentUpdatedRuleEvaluationData,
RuleEvaluationResults,
WaitingOnAssetsRuleEvaluationData,
)
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.freshness_based_auto_materialize import (
freshness_evaluation_results_for_asset_key,
)
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
from dagster._core.definitions.partition_mapping import IdentityPartitionMapping
from dagster._core.definitions.time_window_partition_mapping import TimeWindowPartitionMapping
from dagster._core.definitions.time_window_partitions import get_time_partitions_def
from dagster._serdes.serdes import (
whitelist_for_serdes,
)
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
from dagster._utils.schedules import (
cron_string_iterator,
is_valid_cron_string,
reverse_cron_string_iterator,
)
from .asset_graph import AssetGraph, sort_key_for_asset_partition
if TYPE_CHECKING:
from dagster._core.definitions.asset_daemon_context import AssetDaemonContext
from dagster._core.definitions.asset_daemon_cursor import AssetDaemonCursor
from dagster._core.definitions.auto_materialize_rule_evaluation import (
AutoMaterializeAssetEvaluation,
)
@dataclass(frozen=True)
class RuleEvaluationContext:
asset_key: AssetKey
cursor: "AssetDaemonCursor"
instance_queryer: CachingInstanceQueryer
data_time_resolver: CachingDataTimeResolver
# Tracks which asset partitions are already slated for materialization in this tick. The asset
# keys in the values match the asset key in the corresponding key.
will_materialize_mapping: Mapping[AssetKey, AbstractSet[AssetKeyPartitionKey]]
expected_data_time_mapping: Mapping[AssetKey, Optional[datetime.datetime]]
candidates: AbstractSet[AssetKeyPartitionKey]
daemon_context: "AssetDaemonContext"
@property
def asset_graph(self) -> AssetGraph:
return self.instance_queryer.asset_graph
@property
def previous_tick_evaluation(self) -> Optional["AutoMaterializeAssetEvaluation"]:
"""Returns the evaluation of the asset on the previous tick."""
return self.cursor.latest_evaluation_by_asset_key.get(self.asset_key)
@property
def evaluation_time(self) -> datetime.datetime:
"""Returns the time at which this rule is being evaluated."""
return self.instance_queryer.evaluation_time
@functools.cached_property
def previous_tick_requested_or_discarded_asset_partitions(
self,
) -> AbstractSet[AssetKeyPartitionKey]:
"""Returns the set of asset partitions that were requested or discarded on the previous tick."""
if not self.previous_tick_evaluation:
return set()
return self.previous_tick_evaluation.get_requested_or_discarded_asset_partitions(
asset_graph=self.asset_graph
)
@functools.cached_property
def previous_tick_evaluated_asset_partitions(
self,
) -> AbstractSet[AssetKeyPartitionKey]:
"""Returns the set of asset partitions that were evaluated on the previous tick."""
if not self.previous_tick_evaluation:
return set()
return self.previous_tick_evaluation.get_evaluated_asset_partitions(
asset_graph=self.asset_graph
)
def get_previous_tick_results(self, rule: "AutoMaterializeRule") -> "RuleEvaluationResults":
"""Returns the results that were calculated for a given rule on the previous tick."""
if not self.previous_tick_evaluation:
return []
return self.previous_tick_evaluation.get_rule_evaluation_results(
rule_snapshot=rule.to_snapshot(), asset_graph=self.asset_graph
)
def get_candidates_not_evaluated_by_rule_on_previous_tick(
self,
) -> AbstractSet[AssetKeyPartitionKey]:
"""Returns the set of candidates that were not evaluated by the rule that is currently being
evaluated on the previous tick.
Any asset partition that was evaluated by any rule on the previous tick must have been
evaluated by *all* skip rules.
"""
return self.candidates - self.previous_tick_evaluated_asset_partitions
def get_candidates_with_updated_or_will_update_parents(
self,
) -> AbstractSet[AssetKeyPartitionKey]:
"""Returns the set of candidate asset partitions whose parents have been updated since the
last tick or will be requested on this tick.
Many rules depend on the state of the asset's parents, so this function is useful for
finding asset partitions that should be re-evaluated.
"""
updated_parents = self.get_asset_partitions_with_updated_parents_since_previous_tick()
will_update_parents = set(self.get_will_update_parent_mapping().keys())
return self.candidates & (updated_parents | will_update_parents)
def materialized_requested_or_discarded_since_previous_tick(
self, asset_partition: AssetKeyPartitionKey
) -> bool:
"""Returns whether an asset partition has been materialized, requested, or discarded since
the last tick.
"""
if asset_partition in self.previous_tick_requested_or_discarded_asset_partitions:
return True
return self.instance_queryer.asset_partition_has_materialization_or_observation(
asset_partition, after_cursor=self.cursor.latest_storage_id
)
def materializable_in_same_run(self, child_key: AssetKey, parent_key: AssetKey) -> bool:
"""Returns whether a child asset can be materialized in the same run as a parent asset."""
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
return (
# both assets must be materializable
child_key in self.asset_graph.materializable_asset_keys
and parent_key in self.asset_graph.materializable_asset_keys
# the parent must have the same partitioning
and self.asset_graph.have_same_partitioning(child_key, parent_key)
# the parent must have a simple partition mapping to the child
and (
not self.asset_graph.is_partitioned(parent_key)
or isinstance(
self.asset_graph.get_partition_mapping(child_key, parent_key),
(TimeWindowPartitionMapping, IdentityPartitionMapping),
)
)
# the parent must be in the same repository to be materialized alongside the candidate
and (
not isinstance(self.asset_graph, ExternalAssetGraph)
or self.asset_graph.get_repository_handle(child_key)
== self.asset_graph.get_repository_handle(parent_key)
)
)
def get_parents_that_will_not_be_materialized_on_current_tick(
self, *, asset_partition: AssetKeyPartitionKey
) -> AbstractSet[AssetKeyPartitionKey]:
"""Returns the set of parent asset partitions that will not be updated in the same run of
this asset partition if a run is launched for this asset partition on this tick.
"""
return {
parent
for parent in self.asset_graph.get_parents_partitions(
dynamic_partitions_store=self.instance_queryer,
current_time=self.instance_queryer.evaluation_time,
asset_key=asset_partition.asset_key,
partition_key=asset_partition.partition_key,
).parent_partitions
if parent not in self.will_materialize_mapping.get(parent.asset_key, set())
or not self.materializable_in_same_run(asset_partition.asset_key, parent.asset_key)
}
def get_asset_partitions_with_updated_parents_since_previous_tick(
self
) -> AbstractSet[AssetKeyPartitionKey]:
"""Returns the set of asset partitions for the current key which have parents that updated
since the last tick.
"""
return self.daemon_context.get_asset_partitions_with_newly_updated_parents_for_key(
self.asset_key
)
def get_will_update_parent_mapping(
self,
) -> Mapping[AssetKeyPartitionKey, AbstractSet[AssetKey]]:
"""Returns a mapping from asset partitions of the current asset to the set of parent keys
which will be requested this tick and can execute in the same run as the current asset.
"""
will_update_parents_by_asset_partition = defaultdict(set)
# these are the set of parents that will be requested this tick and can be materialized in
# the same run as this asset
for parent_key in self.asset_graph.get_parents(self.asset_key):
if not self.materializable_in_same_run(self.asset_key, parent_key):
continue
for parent_partition in self.will_materialize_mapping.get(parent_key, set()):
asset_partition = AssetKeyPartitionKey(
self.asset_key, parent_partition.partition_key
)
will_update_parents_by_asset_partition[asset_partition].add(parent_key)
return will_update_parents_by_asset_partition
def get_asset_partitions_by_asset_key(
self,
asset_partitions: AbstractSet[AssetKeyPartitionKey],
) -> Mapping[AssetKey, Set[AssetKeyPartitionKey]]:
asset_partitions_by_asset_key: Dict[AssetKey, Set[AssetKeyPartitionKey]] = defaultdict(set)
for parent in asset_partitions:
asset_partitions_by_asset_key[parent.asset_key].add(parent)
return asset_partitions_by_asset_key
[docs]class AutoMaterializeRule(ABC):
"""An AutoMaterializeRule defines a bit of logic which helps determine if a materialization
should be kicked off for a given asset partition.
Each rule can have one of two decision types, `MATERIALIZE` (indicating that an asset partition
should be materialized) or `SKIP` (indicating that the asset partition should not be
materialized).
Materialize rules are evaluated first, and skip rules operate over the set of candidates that
are produced by the materialize rules. Other than that, there is no ordering between rules.
"""
@abstractproperty
def decision_type(self) -> AutoMaterializeDecisionType:
"""The decision type of the rule (either `MATERIALIZE` or `SKIP`)."""
...
@abstractproperty
def description(self) -> str:
"""A human-readable description of this rule. As a basic guideline, this string should
complete the sentence: 'Indicates an asset should be (materialize/skipped) when ____'.
"""
...
def add_evaluation_data_from_previous_tick(
self,
context: RuleEvaluationContext,
asset_partitions_by_evaluation_data: Mapping[
Optional[AutoMaterializeRuleEvaluationData], Set[AssetKeyPartitionKey]
],
should_use_past_data_fn: Callable[[AssetKeyPartitionKey], bool],
) -> "RuleEvaluationResults":
"""Combines a given set of evaluation data with evaluation data from the previous tick. The
returned value will include the union of the evaluation data contained within
`asset_partitions_by_evaluation_data` and the evaluation data calculated for asset
partitions on the previous tick for which `should_use_past_data_fn` evaluates to `True`.
Args:
context: The current RuleEvaluationContext.
asset_partitions_by_evaluation_data: A mapping from evaluation data to the set of asset
partitions that the rule applies to.
should_use_past_data_fn: A function that returns whether a given asset partition from the
previous tick should be included in the results of this tick.
"""
asset_partitions_by_evaluation_data = defaultdict(set, asset_partitions_by_evaluation_data)
evaluated_asset_partitions = set().union(*asset_partitions_by_evaluation_data.values())
for evaluation_data, asset_partitions in context.get_previous_tick_results(self):
for ap in asset_partitions:
# evaluated data from this tick takes precedence over data from the previous tick
if ap in evaluated_asset_partitions:
continue
elif should_use_past_data_fn(ap):
asset_partitions_by_evaluation_data[evaluation_data].add(ap)
return list(asset_partitions_by_evaluation_data.items())
@abstractmethod
def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
"""The core evaluation function for the rule. This function takes in a context object and
returns a mapping from evaluated rules to the set of asset partitions that the rule applies
to.
"""
...
[docs] @public
@staticmethod
def materialize_on_required_for_freshness() -> "MaterializeOnRequiredForFreshnessRule":
"""Materialize an asset partition if it is required to satisfy a freshness policy of this
asset or one of its downstream assets.
Note: This rule has no effect on partitioned assets.
"""
return MaterializeOnRequiredForFreshnessRule()
[docs] @public
@staticmethod
def materialize_on_cron(
cron_schedule: str, timezone: str = "UTC", all_partitions: bool = False
) -> "MaterializeOnCronRule":
"""Materialize an asset partition if it has not been materialized since the latest cron
schedule tick. For assets with a time component to their partitions_def, this rule will
request all partitions that have been missed since the previous tick.
Args:
cron_schedule (str): A cron schedule string (e.g. "`0 * * * *`") indicating the ticks for
which this rule should fire.
timezone (str): The timezone in which this cron schedule should be evaluated. Defaults
to "UTC".
all_partitions (bool): If True, this rule fires for all partitions of this asset on each
cron tick. If False, this rule fires only for the last partition of this asset.
Defaults to False.
"""
check.param_invariant(
is_valid_cron_string(cron_schedule), "cron_schedule", "must be a valid cron string"
)
check.param_invariant(
timezone in pytz.all_timezones_set, "timezone", "must be a valid timezone"
)
return MaterializeOnCronRule(
cron_schedule=cron_schedule, timezone=timezone, all_partitions=all_partitions
)
[docs] @public
@staticmethod
def materialize_on_parent_updated() -> "MaterializeOnParentUpdatedRule":
"""Materialize an asset partition if one of its parents has been updated more recently
than it has.
Note: For time-partitioned or dynamic-partitioned assets downstream of an unpartitioned
asset, this rule will only fire for the most recent partition of the downstream.
"""
return MaterializeOnParentUpdatedRule()
[docs] @public
@staticmethod
def materialize_on_missing() -> "MaterializeOnMissingRule":
"""Materialize an asset partition if it has never been materialized before. This rule will
not fire for non-root assets unless that asset's parents have been updated.
"""
return MaterializeOnMissingRule()
[docs] @public
@staticmethod
def skip_on_parent_missing() -> "SkipOnParentMissingRule":
"""Skip materializing an asset partition if one of its parent asset partitions has never
been materialized (for regular assets) or observed (for observable source assets).
"""
return SkipOnParentMissingRule()
[docs] @public
@staticmethod
def skip_on_parent_outdated() -> "SkipOnParentOutdatedRule":
"""Skip materializing an asset partition if any of its parents has not incorporated the
latest data from its ancestors.
"""
return SkipOnParentOutdatedRule()
[docs] @public
@staticmethod
def skip_on_not_all_parents_updated(
require_update_for_all_parent_partitions: bool = False,
) -> "SkipOnNotAllParentsUpdatedRule":
"""Skip materializing an asset partition if any of its parents have not been updated since
the asset's last materialization.
Args:
require_update_for_all_parent_partitions (Optional[bool]): Applies only to an unpartitioned
asset or an asset partition that depends on more than one partition in any upstream asset.
If true, requires all upstream partitions in each upstream asset to be materialized since
the downstream asset's last materialization in order to update it. If false, requires at
least one upstream partition in each upstream asset to be materialized since the downstream
asset's last materialization in order to update it. Defaults to false.
"""
return SkipOnNotAllParentsUpdatedRule(require_update_for_all_parent_partitions)
[docs] @public
@staticmethod
def skip_on_required_but_nonexistent_parents() -> "SkipOnRequiredButNonexistentParentsRule":
"""Skip an asset partition if it depends on parent partitions that do not exist.
For example, imagine a downstream asset is time-partitioned, starting in 2022, but has a
time-partitioned parent which starts in 2023. This rule will skip attempting to materialize
downstream partitions from before 2023, since the parent partitions do not exist.
"""
return SkipOnRequiredButNonexistentParentsRule()
[docs] @public
@staticmethod
def skip_on_backfill_in_progress(
all_partitions: bool = False,
) -> "SkipOnBackfillInProgressRule":
"""Skip an asset's partitions if targeted by an in-progress backfill.
Args:
all_partitions (bool): If True, skips all partitions of the asset being backfilled,
regardless of whether the specific partition is targeted by a backfill.
If False, skips only partitions targeted by a backfill. Defaults to False.
"""
return SkipOnBackfillInProgressRule(all_partitions)
def to_snapshot(self) -> AutoMaterializeRuleSnapshot:
"""Returns a serializable snapshot of this rule for historical evaluations."""
return AutoMaterializeRuleSnapshot(
class_name=self.__class__.__name__,
description=self.description,
decision_type=self.decision_type,
)
def __eq__(self, other) -> bool:
# override the default NamedTuple __eq__ method to factor in types
return type(self) == type(other) and super().__eq__(other)
def __hash__(self) -> int:
# override the default NamedTuple __hash__ method to factor in types
return hash(hash(type(self)) + super().__hash__())
@whitelist_for_serdes
class MaterializeOnRequiredForFreshnessRule(
AutoMaterializeRule, NamedTuple("_MaterializeOnRequiredForFreshnessRule", [])
):
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.MATERIALIZE
@property
def description(self) -> str:
return "required to meet this or downstream asset's freshness policy"
def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
freshness_conditions = freshness_evaluation_results_for_asset_key(
asset_key=context.asset_key,
data_time_resolver=context.data_time_resolver,
asset_graph=context.asset_graph,
current_time=context.instance_queryer.evaluation_time,
will_materialize_mapping=context.will_materialize_mapping,
expected_data_time_mapping=context.expected_data_time_mapping,
)
return freshness_conditions
@whitelist_for_serdes
class MaterializeOnCronRule(
AutoMaterializeRule,
NamedTuple(
"_MaterializeOnCronRule",
[("cron_schedule", str), ("timezone", str), ("all_partitions", bool)],
),
):
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.MATERIALIZE
@property
def description(self) -> str:
return f"not materialized since last cron schedule tick of '{self.cron_schedule}' (timezone: {self.timezone})"
def missed_cron_ticks(self, context: RuleEvaluationContext) -> Sequence[datetime.datetime]:
"""Returns the cron ticks which have been missed since the previous cursor was generated."""
if not context.cursor.latest_evaluation_timestamp:
previous_dt = next(
reverse_cron_string_iterator(
end_timestamp=context.evaluation_time.timestamp(),
cron_string=self.cron_schedule,
execution_timezone=self.timezone,
)
)
return [previous_dt]
missed_ticks = []
for dt in cron_string_iterator(
start_timestamp=context.cursor.latest_evaluation_timestamp,
cron_string=self.cron_schedule,
execution_timezone=self.timezone,
):
if dt > context.evaluation_time:
break
missed_ticks.append(dt)
return missed_ticks
def get_asset_partitions_to_request(
self, context: RuleEvaluationContext
) -> AbstractSet[AssetKeyPartitionKey]:
missed_ticks = self.missed_cron_ticks(context)
if not missed_ticks:
return set()
partitions_def = context.asset_graph.get_partitions_def(context.asset_key)
if partitions_def is None:
return {AssetKeyPartitionKey(context.asset_key)}
# if all_partitions is set, then just return all partitions if any ticks have been missed
if self.all_partitions:
return {
AssetKeyPartitionKey(context.asset_key, partition_key)
for partition_key in partitions_def.get_partition_keys(
current_time=context.evaluation_time
)
}
# for partitions_defs without a time component, just return the last partition if any ticks
# have been missed
time_partitions_def = get_time_partitions_def(partitions_def)
if time_partitions_def is None:
return {
AssetKeyPartitionKey(context.asset_key, partitions_def.get_last_partition_key())
}
missed_time_partition_keys = filter(
None,
[
time_partitions_def.get_last_partition_key(current_time=missed_tick)
for missed_tick in missed_ticks
],
)
# for multi partitions definitions, request to materialize all partitions for each missed
# cron schedule tick
if isinstance(partitions_def, MultiPartitionsDefinition):
return {
AssetKeyPartitionKey(context.asset_key, partition_key)
for time_partition_key in missed_time_partition_keys
for partition_key in partitions_def.get_multipartition_keys_with_dimension_value(
partitions_def.time_window_dimension.name,
time_partition_key,
dynamic_partitions_store=context.instance_queryer,
)
}
else:
return {
AssetKeyPartitionKey(context.asset_key, time_partition_key)
for time_partition_key in missed_time_partition_keys
}
def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
asset_partitions_to_request = self.get_asset_partitions_to_request(context)
asset_partitions_by_evaluation_data = defaultdict(set)
if asset_partitions_to_request:
asset_partitions_by_evaluation_data[None].update(asset_partitions_to_request)
return self.add_evaluation_data_from_previous_tick(
context,
asset_partitions_by_evaluation_data,
should_use_past_data_fn=lambda ap: not context.materialized_requested_or_discarded_since_previous_tick(
ap
),
)
@whitelist_for_serdes
class MaterializeOnParentUpdatedRule(
AutoMaterializeRule, NamedTuple("_MaterializeOnParentUpdatedRule", [])
):
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.MATERIALIZE
@property
def description(self) -> str:
return "upstream data has changed since latest materialization"
def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
"""Evaluates the set of asset partitions of this asset whose parents have been updated,
or will update on this tick.
"""
asset_partitions_by_evaluation_data = defaultdict(set)
will_update_parents_by_asset_partition = context.get_will_update_parent_mapping()
# the set of asset partitions whose parents have been updated since last tick, or will be
# requested this tick.
has_or_will_update = (
context.get_asset_partitions_with_updated_parents_since_previous_tick()
| set(will_update_parents_by_asset_partition.keys())
)
for asset_partition in has_or_will_update:
parent_asset_partitions = context.asset_graph.get_parents_partitions(
dynamic_partitions_store=context.instance_queryer,
current_time=context.instance_queryer.evaluation_time,
asset_key=asset_partition.asset_key,
partition_key=asset_partition.partition_key,
).parent_partitions
updated_parent_asset_partitions = context.instance_queryer.get_parent_asset_partitions_updated_after_child(
asset_partition,
parent_asset_partitions,
# do a precise check for updated parents, factoring in data versions, as long as
# we're within reasonable limits on the number of partitions to check
respect_materialization_data_versions=context.daemon_context.respect_materialization_data_versions
and len(parent_asset_partitions | has_or_will_update) < 100,
# ignore self-dependencies when checking for updated parents, to avoid historical
# rematerializations from causing a chain of materializations to be kicked off
ignored_parent_keys={context.asset_key},
)
updated_parents = {parent.asset_key for parent in updated_parent_asset_partitions}
will_update_parents = will_update_parents_by_asset_partition[asset_partition]
if updated_parents or will_update_parents:
asset_partitions_by_evaluation_data[
ParentUpdatedRuleEvaluationData(
updated_asset_keys=frozenset(updated_parents),
will_update_asset_keys=frozenset(will_update_parents),
)
].add(asset_partition)
return self.add_evaluation_data_from_previous_tick(
context,
asset_partitions_by_evaluation_data,
should_use_past_data_fn=lambda ap: not context.materialized_requested_or_discarded_since_previous_tick(
ap
),
)
@whitelist_for_serdes
class MaterializeOnMissingRule(AutoMaterializeRule, NamedTuple("_MaterializeOnMissingRule", [])):
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.MATERIALIZE
@property
def description(self) -> str:
return "materialization is missing"
def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
"""Evaluates the set of asset partitions for this asset which are missing and were not
previously discarded. Currently only applies to root asset partitions and asset partitions
with updated parents.
"""
asset_partitions_by_evaluation_data = defaultdict(set)
missing_asset_partitions = set(
context.daemon_context.get_never_handled_root_asset_partitions_for_key(
context.asset_key
)
)
# in addition to missing root asset partitions, check any asset partitions with updated
# parents to see if they're missing
for (
candidate
) in context.daemon_context.get_asset_partitions_with_newly_updated_parents_for_key(
context.asset_key
):
if not context.instance_queryer.asset_partition_has_materialization_or_observation(
candidate
):
missing_asset_partitions |= {candidate}
if missing_asset_partitions:
asset_partitions_by_evaluation_data[None] = missing_asset_partitions
return self.add_evaluation_data_from_previous_tick(
context,
asset_partitions_by_evaluation_data,
should_use_past_data_fn=lambda ap: ap not in missing_asset_partitions
and not context.materialized_requested_or_discarded_since_previous_tick(ap),
)
@whitelist_for_serdes
class SkipOnParentOutdatedRule(AutoMaterializeRule, NamedTuple("_SkipOnParentOutdatedRule", [])):
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.SKIP
@property
def description(self) -> str:
return "waiting on upstream data to be up to date"
def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
asset_partitions_by_evaluation_data = defaultdict(set)
# only need to evaluate net-new candidates and candidates whose parents have changed
candidates_to_evaluate = (
context.get_candidates_not_evaluated_by_rule_on_previous_tick()
| context.get_candidates_with_updated_or_will_update_parents()
)
for candidate in candidates_to_evaluate:
outdated_ancestors = set()
# find the root cause of why this asset partition's parents are outdated (if any)
for parent in context.get_parents_that_will_not_be_materialized_on_current_tick(
asset_partition=candidate
):
if context.instance_queryer.have_ignorable_partition_mapping_for_outdated(
candidate.asset_key, parent.asset_key
):
continue
outdated_ancestors.update(
context.instance_queryer.get_outdated_ancestors(asset_partition=parent)
)
if outdated_ancestors:
asset_partitions_by_evaluation_data[
WaitingOnAssetsRuleEvaluationData(frozenset(outdated_ancestors))
].add(candidate)
return self.add_evaluation_data_from_previous_tick(
context,
asset_partitions_by_evaluation_data,
should_use_past_data_fn=lambda ap: ap not in candidates_to_evaluate,
)
@whitelist_for_serdes
class SkipOnParentMissingRule(AutoMaterializeRule, NamedTuple("_SkipOnParentMissingRule", [])):
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.SKIP
@property
def description(self) -> str:
return "waiting on upstream data to be present"
def evaluate_for_asset(
self,
context: RuleEvaluationContext,
) -> RuleEvaluationResults:
asset_partitions_by_evaluation_data = defaultdict(set)
# only need to evaluate net-new candidates and candidates whose parents have changed
candidates_to_evaluate = (
context.get_candidates_not_evaluated_by_rule_on_previous_tick()
| context.get_candidates_with_updated_or_will_update_parents()
)
for candidate in candidates_to_evaluate:
missing_parent_asset_keys = set()
for parent in context.get_parents_that_will_not_be_materialized_on_current_tick(
asset_partition=candidate
):
# ignore non-observable sources, which will never have a materialization or observation
if context.asset_graph.is_source(
parent.asset_key
) and not context.asset_graph.is_observable(parent.asset_key):
continue
if not context.instance_queryer.asset_partition_has_materialization_or_observation(
parent
):
missing_parent_asset_keys.add(parent.asset_key)
if missing_parent_asset_keys:
asset_partitions_by_evaluation_data[
WaitingOnAssetsRuleEvaluationData(frozenset(missing_parent_asset_keys))
].add(candidate)
return self.add_evaluation_data_from_previous_tick(
context,
asset_partitions_by_evaluation_data,
should_use_past_data_fn=lambda ap: ap not in candidates_to_evaluate,
)
@whitelist_for_serdes
class SkipOnNotAllParentsUpdatedRule(
AutoMaterializeRule,
NamedTuple(
"_SkipOnNotAllParentsUpdatedRule", [("require_update_for_all_parent_partitions", bool)]
),
):
"""An auto-materialize rule that enforces that an asset can only be materialized if all parents
have been materialized since the asset's last materialization.
Attributes:
require_update_for_all_parent_partitions (Optional[bool]): Applies only to an unpartitioned
asset or an asset partition that depends on more than one partition in any upstream asset.
If true, requires all upstream partitions in each upstream asset to be materialized since
the downstream asset's last materialization in order to update it. If false, requires at
least one upstream partition in each upstream asset to be materialized since the downstream
asset's last materialization in order to update it. Defaults to false.
"""
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.SKIP
@property
def description(self) -> str:
if self.require_update_for_all_parent_partitions is False:
return "waiting on upstream data to be updated"
else:
return "waiting until all upstream partitions are updated"
def evaluate_for_asset(
self,
context: RuleEvaluationContext,
) -> RuleEvaluationResults:
asset_partitions_by_evaluation_data = defaultdict(set)
# only need to evaluate net-new candidates and candidates whose parents have changed
candidates_to_evaluate = (
context.get_candidates_not_evaluated_by_rule_on_previous_tick()
| context.get_candidates_with_updated_or_will_update_parents()
)
for candidate in candidates_to_evaluate:
parent_partitions = context.asset_graph.get_parents_partitions(
context.instance_queryer,
context.instance_queryer.evaluation_time,
context.asset_key,
candidate.partition_key,
).parent_partitions
updated_parent_partitions = (
context.instance_queryer.get_parent_asset_partitions_updated_after_child(
candidate,
parent_partitions,
context.daemon_context.respect_materialization_data_versions,
ignored_parent_keys=set(),
)
| set().union(
*[
context.will_materialize_mapping.get(parent, set())
for parent in context.asset_graph.get_parents(context.asset_key)
]
)
)
if self.require_update_for_all_parent_partitions:
# All upstream partitions must be updated in order for the candidate to be updated
non_updated_parent_keys = {
parent.asset_key for parent in parent_partitions - updated_parent_partitions
}
else:
# At least one upstream partition in each upstream asset must be updated in order
# for the candidate to be updated
parent_asset_keys = context.asset_graph.get_parents(context.asset_key)
updated_parent_partitions_by_asset_key = context.get_asset_partitions_by_asset_key(
updated_parent_partitions
)
non_updated_parent_keys = {
parent
for parent in parent_asset_keys
if not updated_parent_partitions_by_asset_key.get(parent)
}
# do not require past partitions of this asset to be updated
non_updated_parent_keys -= {context.asset_key}
if non_updated_parent_keys:
asset_partitions_by_evaluation_data[
WaitingOnAssetsRuleEvaluationData(frozenset(non_updated_parent_keys))
].add(candidate)
return self.add_evaluation_data_from_previous_tick(
context,
asset_partitions_by_evaluation_data,
should_use_past_data_fn=lambda ap: ap not in candidates_to_evaluate,
)
@whitelist_for_serdes
class SkipOnRequiredButNonexistentParentsRule(
AutoMaterializeRule, NamedTuple("_SkipOnRequiredButNonexistentParentsRule", [])
):
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.SKIP
@property
def description(self) -> str:
return "required parent partitions do not exist"
def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
asset_partitions_by_evaluation_data = defaultdict(set)
candidates_to_evaluate = context.get_candidates_not_evaluated_by_rule_on_previous_tick()
for candidate in candidates_to_evaluate:
nonexistent_parent_partitions = context.asset_graph.get_parents_partitions(
context.instance_queryer,
context.instance_queryer.evaluation_time,
candidate.asset_key,
candidate.partition_key,
).required_but_nonexistent_parents_partitions
nonexistent_parent_keys = {parent.asset_key for parent in nonexistent_parent_partitions}
if nonexistent_parent_keys:
asset_partitions_by_evaluation_data[
WaitingOnAssetsRuleEvaluationData(frozenset(nonexistent_parent_keys))
].add(candidate)
return self.add_evaluation_data_from_previous_tick(
context,
asset_partitions_by_evaluation_data,
should_use_past_data_fn=lambda ap: ap not in candidates_to_evaluate,
)
@whitelist_for_serdes
class SkipOnBackfillInProgressRule(
AutoMaterializeRule,
NamedTuple("_SkipOnBackfillInProgressRule", [("all_partitions", bool)]),
):
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.SKIP
@property
def description(self) -> str:
if self.all_partitions:
return "part of an asset targeted by an in-progress backfill"
else:
return "targeted by an in-progress backfill"
def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
backfill_in_progress_candidates: AbstractSet[AssetKeyPartitionKey] = set()
backfilling_subset = (
context.instance_queryer.get_active_backfill_target_asset_graph_subset()
)
if self.all_partitions:
backfill_in_progress_candidates = {
candidate
for candidate in context.candidates
if candidate.asset_key in backfilling_subset.asset_keys
}
else:
backfill_in_progress_candidates = {
candidate for candidate in context.candidates if candidate in backfilling_subset
}
if backfill_in_progress_candidates:
return [(None, backfill_in_progress_candidates)]
return []
@whitelist_for_serdes
class DiscardOnMaxMaterializationsExceededRule(
AutoMaterializeRule, NamedTuple("_DiscardOnMaxMaterializationsExceededRule", [("limit", int)])
):
@property
def decision_type(self) -> AutoMaterializeDecisionType:
return AutoMaterializeDecisionType.DISCARD
@property
def description(self) -> str:
return f"exceeds {self.limit} materialization(s) per minute"
def evaluate_for_asset(self, context: RuleEvaluationContext) -> RuleEvaluationResults:
# the set of asset partitions which exceed the limit
rate_limited_asset_partitions = set(
sorted(
context.candidates,
key=lambda x: sort_key_for_asset_partition(context.asset_graph, x),
)[self.limit :]
)
if rate_limited_asset_partitions:
return [(None, rate_limited_asset_partitions)]
return []