Using Apache Airflow to Extract CoT Data

In today’s post we are going to be extracting CoT (Commitment of Traders) reports from the CFTC website using a pipeline built on Apache Airflow.

What is CoT data?

The CoT report is a weekly publication which reports the open positions of market participants in the U.S futures market. It’s published every Friday at 3:30 E.T but the actual report from the participants is compiled on the same week Tuesday. See here for more information on it.

Getting Data From “Everywhere” to “here”

I’m sure you heard of the term data is everywhere. It definitely is. However, you often want it “here” where you can analyze it, not out there.

There are many ways of getting data from everywhere to here. The path which time- and resource-constrained traders should choose is the one that offers the most return on their effort. For example, chances are you are not going to get any improvement in productivity if you decide to code a bespoke system that fits your “needs”. Simply using the ones that are already there will be more than enough.

A python library that fits the bill for what we intend to do is Apache Airflow.

What is Apache Airflow?

Apache Airflow is an Open Source python library that is used to build pipelines.

What is a pipeline? 

It’s a series of tasks that need to be executed in their respective order. One of the most compelling reasons to go with Airflow is it’s out of the box scalability, modular approach, neat UI and logging features. You can get up to speed with Airflow concepts here.

Airflow is meant to be run on distributed cloud systems like Kubernetes. Google offers a hosted Airflow service called Composer. I recommend you either run Airflow there or for simpler tasks like the one we will be doing here you could run it on a Virtual Machine. For this post, we are simply going to run it on a Virtual Machine.

Setting up Apache Airflow running as Cloud Composer on Google Cloud Platform

Firstly we need to set up a Virtual Machine, you can do this on any cloud provider but, the one we are going to be using is Google Cloud.

Click here to set up a new GCP project.

create-gcp-project

After you click Create you should have your new GCP project ready to go.

Next, create a VM Instance by clicking on the VM Instances tab to your left and then clicking on Create.

create-gcp-vm

For this project, I’m choosing Ubutnu 18.04 LTS and I have allowed HTTP and HTTPS traffic to the VM:

 

One of Airflow’s coolest features is its webserver. Using it allows us to connect to the Airflow instance and actually see our data pipeline being executed. To use it, we need to allow firewall access to port 8080.

Click on View network details.

Here you add TCP port 8080 under Specified protocols and ports and click Save.

Now we are going to connect to our machine by clicking on the SSH button.

One of the first commands you should run whenever you log on to a fresh Linux machine is:

sudo apt update

After successfully running that we need to install additional python3 packages:

sudo apt install python3 python3-dev python3-venv
sudo apt-get install python3-pip

Now we are ready to install Airflow on our machine, to do that simply run:

sudo pip3 install apache-airflow

After successfully installing Apache airflow you need to initialize the airflow database and export the path variable for airflow:

airflow initdb
export PATH=$PATH:~/.local/bin

Now that was a slightly tedious setup process but, if you have followed everything it should work as intended. If not, the great thing about airflow is its big community of contributors. Usually, if you have a problem someone else has an answer. This is another thing to consider when choosing to go with a system.

Next up we are going to run airflow for the first time. There are two stages to this. First, we need to start up the Webserver which gives us a nice UI to interact with. Then we need to start up the Airflow Scheduler which as the name suggests schedules tasks. Learn more about the airflow scheduler here

airflow webserver -p 8080 -D
airflow scheduler -D

Now that we have the system up and running we can visit it by going to the external IP of our VM and adding port 8080 like this: external vm IP:8080

If you have issues with getting the webserver running try changing the max_threads parameter to 1 in the airflow.cfg file in your home directory

The image above is the Airflow UI – it’s a neat way of viewing and managing your workflows.

Imagine you are pulling tick data from your broker along with sentiment data from another website. It gets out of hand pretty quickly if you use bash scripts and cron jobs. The Airflow UI offers a lot of creature comforts that you don’t always find in data engineering.

Python Code to Pull Data From the CFTC Website

Okay, we’ve set up Airflow. Let’s leave it aside for a while and figure out the actual code that will pull the data from the CFTC website.

If you can write your task in Python, you can write it in Apache Airflow.

Let’s check out what data we want to download from CFTC. If you visit their page you’ll see a wide range of data options. Here, we’ll focus on downloading Traders in Financial Futures (Futures Only Reports). The process is easily repeatable for other data sets.

The data is available in zipped format.

If we right-click on one of the Text fields we can copy the link to the file. We’ll then use that in our python code to programmatically download the file.

Here is the code for downloading all the files from 2010-2020

import pandas as pd 
import requests
from zipfile import ZipFile
from io import BytesIO

#high bound exclusive in python
years = [i for i in range(2010,2021)]


def download_extract_zip(link): 
    r = requests.get(link,stream=True) 
    zip_file = ZipFile(BytesIO(r.content)) 
    df = pd.read_csv(BytesIO(zip_file.read(zip_file.namelist()[0]))) 
    return df  

for year in years:
    df = download_extract_zip(f'https://www.cftc.gov/files/dea/history/fut_fin_txt_{year}.zip')
    df.to_csv(f'finfut_{year}.csv',index=False)

Now we have a script that downloads all the data from 2010 to 2020 and saves it in our local directory as a csv file.

Creating an Apache Airflow DAG

We don’t need to always download everything. Going forward, we only need to download new data every Friday as it gets published.  This is where Airflow can help us.

We’re going to use Airflow to schedule download_extract_zip  to run every Friday and download the latest 2020 data.

With that in mind, it’s now time to create our first DAG.

Wait, what’s a DAG?

A dag is a lock of wool matted with dung hanging from the hindquarters of a sheep.

It’s also an acronym for Directed Acyclic Graph, which is basically a collection of all the tasks that you want to run organized in order.

You can structure DAGs so that the tasks inside them have specific dependencies between one another. You could also run a task conditional on the success/failure of the previous one. Really the sky is the limit. All we need for now is a simple task that downloads the CoT data every Friday:

import pandas as pd
import requests
import datetime
from airflow import DAG 
from airflow.operators.python_operator import PythonOperator
from io import BytesIO
from zipfile import ZipFile




def download_extract_zip(): 
    link = 'https://www.cftc.gov/files/dea/history/fut_fin_txt_2020.zip'
    r = requests.get(link,stream=True) 
    zip_file = ZipFile(BytesIO(r.content)) 
    df = pd.read_csv(BytesIO(zip_file.read(zip_file.namelist()[0])))
    df.to_csv('~/finfut.csv',index=False) 

default_args = {
    'owner':'me',
    'start_date':datetime.datetime(2020,1,1),
    'retries':1,
    'retry_delay':datetime.timedelta(minutes=5),
    'is_paused_upon_creation':False,}



with DAG('cot-dag',default_args = default_args,schedule_interval='0 21 * * 5',catchup=False) as dag:
    cot_download = PythonOperator(task_id=f'cot-download',python_callable=download_extract_zip)
    cot_download

As you can see there aren’t many differences from the airflow DAG and the original batch downloader. We can see the Airflow DAG object has a task called cot-download  which calls the download_extract_zip function each Friday at 21:00 UTC (Airflow works in UTC).

Now we need to get this code inside the Airflow dags directory. To do that you need to SSH to your machine and type in cd airflow  to change your working directory to the Airflow one

Then you create a directory called days, which is where all your dag files will live.

Finally, we need to upload the python file from our local machine to the Google VM. The way you do that is by clicking the settings icon on the top right and uploading a file. The file should be in your home directory.

Now you can move it from the home directory to the dags folder we just created.

If everything went well we should have the DAG pop up on our Airflow UI

You can either let the dag trigger by itself in due time, or you can trigger it by pressing the play button under the Links tab.

Airflow will handle all the logging and error exceptions. In case something goes wrong the logs will be easily available through the UI for you to investigate.

The final output of this DAG should be a file in your home directory called finfut.csv. This file will be updated each Friday 21:00 UTC

Congratulations – you created and ran your first DAG!

What’s Next?!

Hopefully, you found this tutorial useful.

How might you extend this work?

  • Maybe you could download the other datasets on the website?
  • How would you handle federal holidays?
  • Do you think we should have a notification service if the task fails?
  • Having data on a drive on a VM isn’t ideal. Could you push it to a storage bucket, a cloud database?

All this is possible with Apache Airflow. It’s a tool that really starts shining when your pipeline gets bigger.

Happy Data Scraping! #stayhome

9 thoughts on “Using Apache Airflow to Extract CoT Data”

    • The first thing you want to do with any new data set is to plot it. Have a good look at it and sanity check that it makes sense.

      Next, you want to think about how it might be related to future asset prices – you want to form some sort of hypothesis that you can test. An example related to CoT data might be: the CoT report quantifies the net positioning of various participants in the futures market, including hedgers, speculators and managed money. I hypothesise that the positioning of managed money might be (noisily) correlated with futures returns.

      Now, that hypothesis might be right or it might be wrong (it’s almost certainly wrong). But the point is, it’s something you can test.

      How would you test it?

      You could create a factor out of the net positioning of managed money – simply divide the data into say 5 equal quantiles and plot the mean forward returns associated with each quantile.
      That will provide some initial feedback – either it will invalidate your hypothesis or give you something to be excited about and provide justification for digging deeper.

      One thing to be aware of with the CoT report is that it isn’t released in real time – there’s a lag of several days between the time the data relates to, and the time it’s released. So be sure to consider that in any analysis.

      Reply
  1. Thanks for the great article. Given that you are using Linux, can you please clarify what the advantages are over scheduling a cronjob to run the python?

    Reply
    • Hey Jon, I’m glad you liked the blog post. The main advantages of Airflow appear when your workflow gets bigger and more complex.

      Cron jobs are time-based job schedulers. Managing hundreds of tasks (some of which will almost surely fail) can get quite cumbersome.

      Airflow is a workflow management platform that offers a UI with built in logging, all sorts of integrations imaginable and out of the box scalability, all of these features come native with Airflow.

      Airflow has a steeper learning curve than a cron job, but if you are serious about building your own data extraction and transformation pipeline, then using Airflow becomes the natural choice for the reasons mentioned above.

      Reply
      • Thanks for the answer Ajet. Really appreciate it!

        I used Autosys on a couple of jobs a while back. Is Airflow similar in some way, eg jobs with triggers and conditions and alerting?

        Reply
        • My Pleasure Jon.

          I’ve never used Autosys, looking at it briefly it seems to have a different architecture, but there is some overlap in functionality.

          Reply
          • Thanks Ajet for having a look and getting back. Would definitely be really interested to read more like this on the whole pipeline of Google data engineering possibilities, especially for data being updated in real time like market data. Comparisons of functionality and the costs of running such bespoke infrastructure in the cloud and on demand, with alternatives from the likes of Amazon and Microsoft, would also be great to hear from Robot Wealth’s perspective / use case.

Leave a Comment