Source code for sai_airflow_plugins.operators.conditional_skip_mixin

from typing import Callable, Optional, Iterable, Dict

from airflow.exceptions import AirflowSkipException
from airflow.utils.decorators import apply_defaults


[docs]class ConditionalSkipMixin(object): """ Mixin for making operators and sensors conditional. If the condition evaluates to True the operator or sensor executes normally, otherwise it skips the task. Note that you should correctly set the `template_field` in a derived class to include both the operator's and this mixin's templated fields. Example: class MyConditionalOperator(ConditionalSkipMixin, MyOperator): template_fields = MyOperator.template_fields + ConditionalSkipMixin.template_fields :param condition_callable: A callable that should evaluate to a truthy or falsy value to execute or skip the task respectively. Note that Airflow's context is also passed as keyword arguments so you need to define `**kwargs` in your function header. (templated) :param condition_kwargs: a dictionary of keyword arguments that will get unpacked in `condition_callable`. (templated) :param condition_args: a list of positional arguments that will get unpacked in `condition_callable`. (templated) :param condition_provide_context: if set to true, Airflow will pass a set of keyword arguments that can be used in your condition callable. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define `**kwargs` in your function header. """ template_fields = ("condition_callable", "condition_args", "condition_kwargs") @apply_defaults def __init__(self, condition_callable: Callable = False, condition_args: Optional[Iterable] = None, condition_kwargs: Optional[Dict] = None, condition_provide_context: Optional[bool] = False, *args, **kwargs): super().__init__(*args, **kwargs) self.condition_callable = condition_callable self.condition_args = condition_args or [] self.condition_kwargs = condition_kwargs or {} self.condition_provide_context = condition_provide_context self._condition_evaluated = False self._condition_value = None
[docs] def execute(self, context: Dict): """ If the condition evaluates to True execute the superclass `execute` method, otherwise skip the task. :param context: Context dict provided by airflow """ if self._get_evaluated_condition_or_skip(context): super().execute(context)
[docs] def poke(self, context: Dict) -> bool: """ If the condition evaluates to True execute the superclass `poke` method, otherwise skip the task. :param context: Context dict provided by airflow :return: The result of the superclass `poke` method """ if self._get_evaluated_condition_or_skip(context): return super().poke(context)
def _get_evaluated_condition_or_skip(self, context: Dict): """ Lazily evaluates `condition_callable` and raises `AirflowSkipException` if it's falsy. This is done because the `poke` method of a sensor may call the `execute` method as well. :param context: Context dict provided by airflow :return: The (cached) result of `condition_callable` if truthy, otherwise raises `AirflowSkipException` """ if not self._condition_evaluated: if self.condition_provide_context: # Merge airflow's context and the callable's kwargs context.update(self.condition_kwargs) self.condition_kwargs = context self._condition_value = self.condition_callable(*self.condition_args, **self.condition_kwargs) if self._condition_value: return self._condition_value else: raise AirflowSkipException( f"Condition callable {self.condition_callable.__name__} evaluated to False. Skipping this task." )