Airflow is a popular workflow framework used by many data engineering teams, and our company uses it to build our data pipelines. I've been using it for a couple of months now and love it: it's powerful, flexible, and easy to use. One pain point, however, is that it's difficult to set up Airflow locally with connections to other systems like S3, Hive, Spark, and Qubole. Our Airflow server is deployed on an EC2 instance, so it doesn't have to worry about authenticating with S3. Every time I modify my code, I have to deploy it to the Airflow cluster and test it on the server, which is very time-consuming. I've been trying to set up the Airflow environment locally, and the missing piece is authentication with AWS.
Our use case is even harder because we use MFA and SAML to authenticate with AWS. If you are in a similar situation, I hope this article can help you a little bit. Before we start, there are a couple of prerequisites:
- You are able to use the AWS CLI without any permission issues.
- You have a credentials file (~/.aws/credentials) successfully generated.
This file should contain the following info:
```
[default]
aws_access_key_id = XXXXXXXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
output = json
region = us-east-1
aws_session_token = FQoDYXdzEDAaDIxuNj5lt9gJ85gEfSKaAriAIFvknMOZ1dPj8gbT9iSrjO7ZVroSbMrvwgR6tQfddQ5vvfpEvn8R+61UKXQwt3BTjdy2qIp6DH8zj0ujqV+MBpeQNH0GcMiRgvo1xgc/tjHQgphXAgjKwSjXNbAcKoHaGvuyCrspQo304EbLytPS0iKvJpBG/QK/4j1TNgPteZSIIRmF6sUVxZ2XMSPZSobfTEgJv7fh6GAC78HiEC8tabvidhCkXVJCLVWxsJzqDpuh+woIXy+ixxf7Z9XIH2uJ0nD1iX2x9nqZJ+IuGV8jgxRlYuyGWUEjh0vSCIXmkDIw400hNsx2MqrL6TTWLxUuWaWplagEhDXRyGfETCR7i5TZ3/AngqaG7nllR7viYZjVoMW0x90aUyiAmfrYBQ==
aws_session_token_expiration = 2018-06-11T22:51:44Z
```
The key thing here is the aws_session_token, which serves as the temporary credential. Our company uses SAML and MFA to authenticate with AWS, so there is no way to get a permanent aws_secret_access_key. I'm going to write another article about how to do this programmatically and generate this file.
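Before touching Airflow at all, it's worth confirming that boto3 itself can pick up these temporary credentials. Here is a minimal sketch, assuming the profile is named default as in the file above:

```
import boto3

# Minimal check that the temporary credentials (including aws_session_token)
# in ~/.aws/credentials are still valid; assumes the profile is named "default".
session = boto3.session.Session(profile_name='default')
print(session.client('sts').get_caller_identity())
```

If the session token has expired, this call will fail and you need to regenerate the credentials file first.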
Our production server is running Airflow 1.8.2, which does not support temporary credentials (they are only available through boto3). The latest release (1.9.0) also does not support temporary credentials out of the box. So, unfortunately, I have to get the latest dev version from GitHub and customize it a little bit to make this work.
```
## get the latest master branch from GitHub
git clone https://github.com/apache/incubator-airflow.git
cd incubator-airflow

## install airflow
python setup.py install
```
If Airflow is installed successfully, you should be able to start the Airflow web server with the following command:

```
airflow webserver -p 8080
```
Next, I have to modify aws_hook.py (it lives under airflow/contrib/hooks/ in the source tree) to load the temporary credential from the credentials file. Just replace the _get_credentials method with the following (make sure os, ConfigParser, and boto3 are imported at the top of the module if they are not already):
```
def _get_credentials(self, region_name):
    # Load the temporary credentials generated by the SAML/MFA login from
    # ~/.aws/credentials. The profile name ('dev' here) must match the
    # section name in your credentials file.
    config = ConfigParser.RawConfigParser()
    config.read(os.path.expanduser('~/.aws/credentials'))
    aws_access_key_id = config.get('dev', 'aws_access_key_id')
    aws_secret_access_key = config.get('dev', 'aws_secret_access_key')
    aws_session_token = config.get('dev', 'aws_session_token')
    region_name = config.get('dev', 'region')
    endpoint_url = None
    return boto3.session.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        aws_session_token=aws_session_token,
        region_name=region_name), endpoint_url
```
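To sanity-check the patched hook outside of a DAG, something like the following quick sketch should work from a Python shell (the bucket name is just a placeholder):

```
# Quick sanity check for the patched AWS hook.
# 'my-test-bucket' is a placeholder; use a bucket your credentials can read.
from airflow.hooks.S3_hook import S3Hook

hook = S3Hook(aws_conn_id='aws_default')
print(hook.check_for_bucket('my-test-bucket'))  # True if the temporary credentials work
```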
Qubole connection setup
Go to the Airflow Admin menu, click Connections, and modify the Qubole connection as follows:
Please note that the password is the API token, which you can get from the Qubole account page.
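With the connection in place, a QuboleOperator task should be able to run against it. Here is a rough sketch (the query and cluster label are placeholders, and it assumes an existing dag object):

```
# Rough sketch of a task that uses the Qubole connection set up above.
# query and cluster_label are placeholders; `dag` is assumed to be defined elsewhere.
from airflow.contrib.operators.qubole_operator import QuboleOperator

hive_test = QuboleOperator(
    task_id='hive_test',
    command_type='hivecmd',
    query='SHOW TABLES',
    cluster_label='default',
    dag=dag)
```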
Here is a simple DAG I created to test the S3 connection:
```
# Assumes the usual imports and an existing DAG object, e.g.:
#   from airflow import DAG
#   from airflow.hooks.S3_hook import S3Hook
#   from airflow.operators.python_operator import PythonOperator
# Env is an internal helper of ours that resolves the S3 bucket name.

def test(ds, **kwargs):
    s3_hook = S3Hook('aws_default')
    yaml_str = s3_hook.read_key(
        key='platform/distribution_engine/config/wei-distributor-config.yml',
        bucket_name=Env.getEDLCodeBucket())
    print(yaml_str)

test = PythonOperator(
    task_id='test',
    provide_context=True,
    python_callable=test,
    dag=dag)
```
Then I can use the airflow CLI to test this:
```
airflow test npan_test test 2018-01-01
```