How to Set Up Local Airflow Development Environment with AWS S3 and Qubole Access

Background

Airflow is a very popular workflow framework used by many data engineering teams, and our company uses it to build our data pipelines. I've been using it for a couple of months now and absolutely love it: it's powerful, flexible, and easy to use. One pain point, however, is that it's difficult to set up Airflow locally with connections to other systems such as S3, Hive, Spark, and Qubole. Our Airflow server is deployed on an EC2 instance, so it doesn't have to worry about authenticating with S3. Every time I modify my code, I have to deploy it to the Airflow cluster and test it on the server, which is a very time-consuming process. I've been trying to set up an Airflow environment locally, but the missing piece is authentication with AWS.

Our use case is even harder since we use MFA and SAML to authenticate with AWS. If you are in a similar situation, I hope this article can help you a little bit.

Prerequisites

  1. You are able to use the AWS CLI without any permission issues.
  2. You have a credentials file successfully generated at:
~/.aws/credentials

This file should contain the following info:

[default]
aws_access_key_id = XXXXXXXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
output = json
region = us-east-1
aws_session_token = FQoDYXdzEDAaDIxuNj5lt9gJ85gEfSKaAriAIFvknMOZ1dPj8gbT9iSrjO7ZVroSbMrvwgR6tQfddQ5vvfpEvn8R+61UKXQwt3BTjdy2qIp6DH8zj0ujqV+MBpeQNH0GcMiRgvo1xgc/tjHQgphXAgjKwSjXNbAcKoHaGvuyCrspQo304EbLytPS0iKvJpBG/QK/4j1TNgPteZSIIRmF6sUVxZ2XMSPZSobfTEgJv7fh6GAC78HiEC8tabvidhCkXVJCLVWxsJzqDpuh+woIXy+ixxf7Z9XIH2uJ0nD1iX2x9nqZJ+IuGV8jgxRlYuyGWUEjh0vSCIXmkDIw400hNsx2MqrL6TTWLxUuWaWplagEhDXRyGfETCR7i5TZ3/AngqaG7nllR7viYZjVoMW0x90aUyiAmfrYBQ==
aws_session_token_expiration = 2018-06-11T22:51:44Z

The key thing here is the aws_session_token, which serves as the temporary credential. Our company uses SAML and MFA to authenticate with AWS, so there is no way to get a permanent aws_access_key_id and aws_secret_access_key. I'm going to write another article about how to do this programmatically and generate this file.
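
Before touching Airflow at all, it is worth confirming that boto3 can actually pick up these temporary credentials. Here is a minimal sanity check, assuming boto3 is installed locally and the [default] profile shown above:

# check_credentials.py: verify the temporary credentials in ~/.aws/credentials
import boto3

session = boto3.session.Session(profile_name='default')
# Who am I? This fails right away if the session token is missing or expired.
print(session.client('sts').get_caller_identity())
# List the buckets the temporary credentials can see
# (this call may be denied if your role cannot list buckets).
print([b['Name'] for b in session.client('s3').list_buckets()['Buckets']])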

Airflow installation

Our production server is running Airflow 1.8.2, which does not support temporary credentials (they are only available through boto3). The latest release (1.9.0) also does not support temporary credentials out of the box. So, unfortunately, I have to get the latest dev version from GitHub and customize it a little bit to make this work.

## get the latest master branch from GitHub
git clone https://github.com/apache/incubator-airflow.git
cd incubator-airflow

## install airflow
python setup.py install
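
Depending on your environment, you may also need to point AIRFLOW_HOME somewhere and initialize the metadata database before the web server will start. A minimal sketch, assuming the default SQLite backend:

## optional: set up a local Airflow home and initialize the metadata database
export AIRFLOW_HOME=~/airflow
airflow initdb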

If airflow is installed successfully, you should be able to start the airflow web server using the following command:

airflow webserver -p 8080

Airflow customization

I have to modify aws_hook.py (airflow/contrib/hooks/aws_hook.py in the source tree) to load the temporary credentials from that file. Just replace the _get_credentials method with the following, making sure os, boto3, and ConfigParser (configparser on Python 3) are imported at the top of the file:

def _get_credentials(self, region_name):
    # Read the temporary credentials from the [default] profile shown above
    # (change the section name if you keep them under a different profile).
    config = ConfigParser.RawConfigParser()
    config.read(os.path.expanduser('~/.aws/credentials'))
    aws_access_key_id = config.get('default', 'aws_access_key_id')
    aws_secret_access_key = config.get('default', 'aws_secret_access_key')
    aws_session_token = config.get('default', 'aws_session_token')
    region_name = config.get('default', 'region')
    endpoint_url = None

    # Build the boto3 session directly from the temporary credentials
    # instead of going through Airflow's normal connection lookup.
    return boto3.session.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
        region_name=region_name), endpoint_url
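
Note that this patched method simply reads whatever is in ~/.aws/credentials and ignores the aws_default connection stored in Airflow. That is fine for local development, but it should not be deployed to the shared Airflow cluster.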

Qubole connection setup

Go to the Airflow Admin menu, click Connections, and modify the Qubole connection as follows:
[Screenshot: the Qubole connection settings in the Airflow Admin UI]
Please note that the password is the API token, which you can get from the Qubole account page.
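
With the connection configured, you can smoke-test it with a small Qubole task. Below is a minimal sketch, assuming the contrib QuboleOperator that ships with Airflow, a Hive cluster labeled default in your Qubole account, and a dag object like the one in the example in the next section; adjust the query, cluster label, and connection id to match your setup:

from airflow.contrib.operators.qubole_operator import QuboleOperator

# Minimal Qubole smoke test using the connection configured above.
qubole_test = QuboleOperator(
    task_id='qubole_test',
    command_type='hivecmd',
    query='show databases',
    cluster_label='default',
    qubole_conn_id='qubole_default',
    dag=dag)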

Example

Here is a simple DAG I created to test the S3 connection:

from datetime import datetime
from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

dag = DAG('npan_test', start_date=datetime(2018, 1, 1), schedule_interval=None)

def test(ds, **kwargs):
    # Env is an internal helper in our codebase that resolves the bucket name.
    s3_hook = S3Hook('aws_default')
    yaml_str = s3_hook.read_key(
        key='platform/distribution_engine/config/wei-distributor-config.yml',
        bucket_name=Env.getEDLCodeBucket())
    print(yaml_str)

test = PythonOperator(
    task_id='test',
    provide_context=True,
    python_callable=test,
    dag=dag)

Then I can use the airflow CLI to test this:

airflow test npan_test test 2018-01-01
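
Note that airflow test runs a single task instance locally without checking dependencies or recording state in the metadata database, so it is a quick way to iterate on the code without deploying anything to the cluster.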
