Source code for sai_airflow_plugins.sensors.fabric_sensor

from typing import Dict

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

from sai_airflow_plugins.operators.fabric_operator import FabricOperator


[docs]class FabricSensor(BaseSensorOperator, FabricOperator): """ Executes a command on a remote host using the [Fabric](https://www.fabfile.org) library and returns True if and only if the exit code is 0. Like `FabricOperator` it uses a standard `SSHHook` for the connection configuration. The parameters for this sensor are the combined parameters of `FabricOperator` and `BaseSensorOperator`. """ template_fields = FabricOperator.template_fields template_ext = FabricOperator.template_ext @apply_defaults def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] def poke(self, context: Dict) -> bool: """ Executes ``self.command`` over the configured SSH connection and checks its exit code. :param context: Context dict provided by airflow :return: True if the command's exit code was 0, else False. """ result = self.execute_fabric_command() self.log.info(f"Fabric command exited with {result.exited}") return not result.exited