May 29, 2019

SOLID Principles and Apache Airflow

The powerful thing about Apache Airflow is its versatility. The core functionalities are abstracted away and, based on my understanding, loosely follow some of the principles of SOLID. Two examples of SOLID principles that are followed are the open-closed principle and the Liskov substitution principle. Airflow comes with several Operators out of the box; however, they are all open to extension and replacement. Many of the classic code examples you come across when reading about SOLID principles are about shapes or some other non-real-world subject, but since I am spending most of my time on Airflow lately, I thought I might as well use it as my main subject.


Cunningham’s Law: “the best way to get the right answer on the internet is not to ask a question; it’s to post the wrong answer”.

Disclaimer: Please post a comment if I am wrong on anything


SOLID Principles

The lines between some of the principles can get hazy, and it takes practice and attention to follow them closely. Now on to the open-closed principle, which states that:

software entities (classes, modules, functions, etc.) should be open for extension, but closed for modification

Now if you look at Airflow's DbApiHook source code, the whole class is built in a modular manner around small methods. If there is any method you would like to extend, you can create a new class that inherits from DbApiHook and override an existing method or add a new one. You can skip writing an __init__ method for your subclass if you are not making any fundamental behavioral change to the class.
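As a sketch of that extension pattern (using a minimal stand-in for DbApiHook rather than the real Airflow class, so the example stays self-contained; the subclass and method names are made up for illustration):

```python
# Minimal stand-in for DbApiHook -- the real class lives in Airflow,
# but the open-closed extension pattern is the same.
class DbApiHook:
    def get_records(self, sql):
        # Pretend this runs the query and returns rows.
        return [("row",)]


class AuditedDbApiHook(DbApiHook):
    """Extends the hook without modifying the original class."""

    def get_records(self, sql):
        # Override: add auditing, then defer to the inherited behavior.
        print(f"AUDIT: running {sql!r}")
        return super().get_records(sql)

    def get_first_column(self, sql):
        # A brand-new method layered on top of the inherited behavior.
        return [row[0] for row in self.get_records(sql)]
```

DbApiHook itself never changes (closed for modification), yet its behavior is extended through inheritance (open for extension).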

Moving on to the Liskov substitution principle, which pertains to behavioral subtyping of classes in object-oriented programming and states that:

if S is a subtype of T, then objects of type T may be replaced with objects of type S without altering any of the desirable properties of the program (correctness, task performed, etc.)

Let's say you went one level down in the Airflow hooks hierarchy and picked a descendant of DbApiHook, for instance MsSqlHook. The general behavioral characteristics of this class (e.g. connections, cursors) are the same as those of its ancestor DbApiHook. That means you can substitute MsSqlHook anywhere a DbApiHook is expected and not lose any of the desirable properties.
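A sketch of that substitution (again with stand-ins for the real Airflow classes, and a made-up helper function, so the example is self-contained):

```python
# Stand-ins for DbApiHook and MsSqlHook; the real classes come from Airflow.
class DbApiHook:
    def get_records(self, sql):
        return [("base",)]


class MsSqlHook(DbApiHook):
    # Honors the parent's contract: takes SQL, returns a list of row tuples.
    def get_records(self, sql):
        return [("mssql",)]


def count_rows(hook, sql):
    """Written against DbApiHook; any well-behaved subtype must also work."""
    return len(hook.get_records(sql))
```

Because MsSqlHook preserves the parent's contract, count_rows behaves correctly whether it is handed the ancestor or the descendant.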

Docker and Airflow Interaction

There is already an official Docker Operator in Airflow's repository; I've written this section to highlight a small part of Airflow's core design and show you how easy it is to extend. I encourage you to explore the currently available operators in Airflow's repository, and if there is a gap that Airflow is not filling for you, contribute and make other people's lives easier.

Thanks to an article by Tomasz Dudek that I read a while back and recently dug up, I got the idea of creating my own Docker operator. In the article, the author uses Airflow to run Papermill and Jupyter Notebooks in Docker containers. It is well known that Netflix runs notebooks at scale for all sorts of applications and has a whole infrastructure for it.

You might say a Docker Swarm Executor would be a better idea, but for my purposes this simple operator is sufficient at this point. The core enablers of all of this are the Docker daemon's Unix socket and the Docker SDK for Python. The Docker daemon listens on the Unix socket, which allows you to send commands and interact with it; the Docker SDK lets our operator send those commands to the daemon. Enough talk, show me the code:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import docker
import os


class DockerOperator(BaseOperator):
    @apply_defaults
    def __init__(self, image_name, *args, **kwargs):
        super(DockerOperator, self).__init__(*args, **kwargs)
        self.image_name = image_name

    def execute(self, context):
        # Talk to the Docker daemon over the socket given in DOCKER_HOST.
        client = docker.APIClient(base_url=os.environ["DOCKER_HOST"])

        self.log.info(f"Creating container from image {self.image_name}")
        container = client.create_container(image=self.image_name)
        container_id = container.get("Id")

        self.log.info(f"Running container with ID {container_id}")
        client.start(container=container_id)

        # Stream the container's output into the task log until it exits.
        logs = client.logs(
            container_id, follow=True, stderr=True, stdout=True, stream=True, tail="all"
        )
        for log_line in logs:
            self.log.info(f"Container log: {log_line}")

        inspect = client.inspect_container(container_id)
        self.log.info(inspect)

        if inspect["State"]["ExitCode"] != 0:
            raise Exception("Container exited with non-zero value")

        self.log.info("Docker container exited")
As you can see, with a few lines of code we have our own handy dandy operator. Now go on, subclass BaseOperator, and create your own operator.
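For completeness, here is a sketch of how the operator might be wired into a DAG file. The dag_id, module path, and image name are placeholders I made up for illustration, and this assumes DOCKER_HOST is set in the scheduler's environment (e.g. unix://var/run/docker.sock):

```python
# Hypothetical DAG file using the operator above.
from datetime import datetime

from airflow import DAG

from my_operators import DockerOperator  # wherever you saved the class

with DAG(
    dag_id="docker_example",
    start_date=datetime(2019, 5, 1),
    schedule_interval=None,
) as dag:
    run_notebook = DockerOperator(
        task_id="run_notebook",
        image_name="my-papermill-image:latest",
    )
```

If the container exits with a non-zero code, the operator raises and the task is marked failed, which is exactly the behavior you want from Airflow's retry machinery.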

Now, to reiterate the points made earlier about SOLID principles: first, read the principles and relate them to your own code. Also, as you go about your daily software development, before you write any code, think for a minute about how you could apply these principles to write more maintainable code.

Note to reader: thanks for taking the time to read this post, if you have any suggestions for improvement or would like to point out anything I have missed, please leave a comment.

© Mike Hosseini 2019