
Comparing results between Airflow runs

Dani Hodovic Jan. 10, 2022 3 min read

The Airbnb engineering team built Airflow to automate workflows related to data engineering problems.

Given how generic the tool is, it can be used for just about anything that needs to run on a regular schedule. Think of it as cron on steroids, but with better interoperability with the outside world (integrations with third-party providers) and a better debugging experience (built-in UI, metadata and logs).

I use it to automate various tasks, for example:

  • I track stock and BTC prices and notify myself when a price moves by more than a certain percentage
  • I send myself a notification when one of my GitHub repositories gains a star
  • I monitor the air pollution in the city during winter

Airflow doesn't automatically store results between DAG runs (a DAG, or directed acyclic graph, is Airflow's word for a workflow, which in practice is a Python script). It stores logs, success status and other metadata, but doesn't provide a trivial way to store custom data between script executions.

Airflow does store metadata for every DAG run, ordered by execution date, and that metadata includes a run ID that embeds the date. We can use it to construct a key ($DAG_ID__$DATE) which points to custom data. I use Redis in the examples below, but any key-value store will do.

DAG table

To get historical DAG runs we use the Airflow internals:

def get_previous_dagrun():
    from airflow.models.dagrun import DagRun, DagRunState
    from airflow.operators.python import get_current_context
    # Use the context to dynamically find the dag name. Otherwise you can just
    # use the name of the dag as it's displayed in the Airflow UI.
    ctx = get_current_context()
    dag_id = ctx["dag"].dag_id
    # DagRun.find returns dag runs ordered by execution date, so the last
    # entry is the most recent successful run.
    dag_runs = DagRun.find(dag_id=dag_id, state=DagRunState.SUCCESS)
    if len(dag_runs) >= 1:
        return dag_runs[-1]
    # No successful run yet, for example on the very first execution.
    return None

To construct a key based on the dag name and the run ID of the previous run:

def create_key(run_id):
    return f"airflow:github_stars:{run_id}"

last_run = get_previous_dagrun()
last_run_id = last_run.run_id if last_run else None
old_key = create_key(last_run_id)
# For example: "airflow:github_stars:scheduled__2022-01-01T08:00:00+00:00"
# On the very first run there is no previous run, so the key ends in "None"
# and the lookup simply finds nothing.
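To make the idea concrete, here is a minimal sketch of the read, compare and write cycle between two runs. It swaps Redis for a small in-memory class so it runs without a server. InMemoryStore and compare_and_store are hypothetical names used only for illustration, and the run IDs are made-up examples. With a real Redis client the get and set calls would look the same.

```python
class InMemoryStore:
    """Hypothetical stand-in for a Redis client; keeps values as bytes."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        # Like redis-py, return None for a missing key and bytes otherwise.
        return self._data.get(key)

    def set(self, key, value, ex=None):
        # `ex` mirrors redis-py's expiry-in-seconds argument; it is ignored here.
        self._data[key] = str(value).encode()


def create_key(run_id):
    return f"airflow:github_stars:{run_id}"


def compare_and_store(store, previous_run_id, current_run_id, current_value):
    """Return the change since the previous run, or None if there is no history."""
    delta = None
    if previous_run_id is not None:
        cached = store.get(create_key(previous_run_id))
        if cached is not None:
            delta = current_value - int(cached.decode())
    # Save the current value under this run's key so the next run can read it.
    store.set(create_key(current_run_id), current_value, ex=7 * 24 * 60 * 60)
    return delta


store = InMemoryStore()
first = compare_and_store(store, None, "scheduled__2022-01-01T08:00:00+00:00", 40)
second = compare_and_store(
    store,
    "scheduled__2022-01-01T08:00:00+00:00",
    "scheduled__2022-01-02T08:00:00+00:00",
    42,
)
print(first, second)
```

The first call has nothing to compare against, so it only stores the value. The second call reads the value stored under the previous run's key and reports the difference.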

Below is a full example of a script I use to check new GitHub stars for my repositories. It uses the GitHub API and Redis to store the number of stars for each day. If a repository gains or loses a star, a notification is sent to my Pushbullet (mobile) device.

import logging
from datetime import datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.providers.redis.hooks.redis import RedisHook


@dag(
    schedule_interval="0 8 * * *",
    start_date=datetime(2021, 1, 1),
    catchup=False,
)
def github_stars_notifier():
    import requests
    from pushbullet import Pushbullet

    from airflow.models import Variable

    http = requests.Session()

    def read_most_popular_repos():
        res = http.get(
            "https://api.github.com/users/danihodovic/repos?per_page=100&sort=pushed"
        )
        res.raise_for_status()
        by_stars = sorted(res.json(), key=lambda d: d["stargazers_count"], reverse=True)
        return by_stars

    def notify_pushbullet(repo_name, delta, stargazers_count):
        pb_token = Variable.get("pushbullet_token")
        verb = "gained" if delta > 0 else "lost"
        # Use the absolute value so a loss reads "lost 2 stars", not "lost -2 stars"
        message = (
            f"{repo_name} {verb} {abs(delta)} stars ★. "
            f"It now has {stargazers_count} stars"
        )
        pb = Pushbullet(pb_token)
        pb.push_note("Airflow notification ★", message)

    def create_key(repo_name, run_time):
        return f"airflow:github_stars:{repo_name}:{run_time}"

    def get_previous_dagrun():
        from airflow.models.dagrun import DagRun, DagRunState
        ctx = get_current_context()
        dag_runs = DagRun.find(dag_id=ctx["dag"].dag_id, state=DagRunState.SUCCESS)
        if len(dag_runs) >= 1:
            return dag_runs[-1]

    @task()
    def check_for_new_stars():
        ctx = get_current_context()

        redis = RedisHook("redis_github_stars_dag").get_conn()
        by_stars = read_most_popular_repos()[0:10]
        # Look up the previous successful run once; it is the same for every repo
        last_run = get_previous_dagrun()
        last_run_id = last_run.run_id if last_run else None
        for repo in by_stars:
            repo_name = repo["full_name"]
            old_key = create_key(repo_name, last_run_id)
            stargazers_count = repo["stargazers_count"]
            cached_value_str = redis.get(old_key)

            if cached_value_str:
                cached_value = int(cached_value_str.decode())
                delta = stargazers_count - cached_value
                logging.info(
                    f"{repo_name=} {cached_value=} {stargazers_count=} {delta=}"
                )
                if delta != 0:
                    notify_pushbullet(repo_name, delta, stargazers_count)

            # Update the new value
            new_key = create_key(repo_name, ctx["run_id"])
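            # Expire the key after 7 days to avoid flooding the Redis DB.
            # `ex` sets the TTL in seconds; `keepttl` is only a boolean flag
            # that retains an existing TTL and would not expire the key.
            redis.set(new_key, stargazers_count, ex=604800)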

    check_for_new_stars()


instance = github_stars_notifier()
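
The notification text is the part of the script that is easiest to get subtly wrong, for instance when a repository loses stars. One way to check it without Airflow or Pushbullet is to pull the wording into a pure function. The sketch below does that. format_star_message is a hypothetical helper, not part of the script above, and the singular/plural handling is an extra I added for illustration.

```python
def format_star_message(repo_name, delta, stargazers_count):
    """Build the notification text for a change in star count."""
    verb = "gained" if delta > 0 else "lost"
    # Report the size of the change; the verb already carries the direction.
    count = abs(delta)
    noun = "star" if count == 1 else "stars"
    return f"{repo_name} {verb} {count} {noun}. It now has {stargazers_count} stars"


print(format_star_message("danihodovic/example", 1, 9))
print(format_star_message("danihodovic/example", -2, 7))
```

The repository name in the calls is a placeholder. Inside the DAG, the returned string could be passed straight to pb.push_note.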