Intro to d6tflow
Recently I was introduced to the concept of data science pipelines. In data science workflows (especially for machine learning), there are many steps where data is transformed, cleaned, and prepared for training. Stringing together a long sequence of functions can lead to very messy code. The d6tflow package takes an object-oriented approach to this problem that intelligently handles dependent tasks.
If you are working with large datasets, it may also take some time to run all of the code that cleans your data from scratch, so it would be nice to have a way to save and reload intermediate data. The d6tflow package takes care of this for you. If you are familiar with the luigi project by Spotify and are wondering how you might apply a similar workflow to data science, give d6tflow a try. Check out the d6tflow getting started guide or take a look at the documentation to get familiar with the package. In this article, I will assume some basic knowledge of d6tflow. If you want to learn more about d6tflow before learning how to make mock objects for unit testing, check out my article introducing d6tflow.
Predicting Apple Stock Price Movement
To show a very simple application of d6tflow, I will look at whether we can predict Apple's (NASDAQ: AAPL) stock return tomorrow based on its return today. We are not interested in the actual price of the stock so much as the percentage return. We will try to predict tomorrow's percentage return from today's percentage return and see if there is any pattern.
To keep things simple, I will use a linear regression model from the sklearn package. I have sourced price data for Apple from Yahoo Finance; the data used in this article can be found at this GitHub repository.
The first task of any data science pipeline is obtaining and cleaning data. To simplify things, I already downloaded the Apple pricing data from Yahoo Finance as a CSV file.
Now, let's get to our first task and load the data.
import d6tflow
import luigi
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split


class LoadData(d6tflow.tasks.TaskPqPandas):
    filename = luigi.Parameter(default='../data/AAPL.csv')

    def run(self):
        aapl_df = pd.read_csv(self.filename, parse_dates=['Date'])
        aapl_df.columns = aapl_df.columns.str.replace(' ', '_').str.lower()
        self.save(aapl_df)
Each task needs to have a run method. This method tells d6tflow what to do during the task. Here, we read the CSV file with the pricing data and clean up the column names by replacing spaces with underscores and lowercasing everything. Finally, we save the transformed data frame; because the task subclasses TaskPqPandas, the output is persisted as a Parquet file.
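At this point we could already run this one task on its own and reload its cached output. A minimal sketch, assuming the AAPL.csv path above exists:

d6tflow.run(LoadData())            # executes run() and writes the Parquet output
aapl_df = LoadData().outputLoad()  # reloads the saved data frame without re-running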
Now let's preprocess the data to get it ready for training.
@d6tflow.requires(LoadData)
class PreprocessData(d6tflow.tasks.TaskPickle):
    persist = ['X', 'y', 'aapl_df']

    def run(self):
        aapl_df = self.inputLoad()
        # Calculate daily adj close pct change
        aapl_df['daily_adj_close_pct_change'] = aapl_df['adj_close'].pct_change()

        # Want to predict tomorrow's return % based on today's return %
        X = aapl_df[['volume', 'daily_adj_close_pct_change']].iloc[1:-1]
        y = aapl_df['daily_adj_close_pct_change'].iloc[2:].rename(
            'daily_adj_close_pct_change_next_day')
        self.save({'X': X, 'y': y, 'aapl_df': aapl_df})
A few things are going on here from d6tflow. First, the @d6tflow.requires decorator tells d6tflow that the LoadData task must run before this one. Once d6tflow knows that LoadData has run, inputLoad loads that task's output, and we can pick up where we left off. Because we want to predict tomorrow's return, we calculate the percentage change between adjusted closing prices using pandas' pct_change. Out of curiosity, we also include today's volume to see whether it has any effect. Finally, we line up today's return and volume in X with tomorrow's return in y.
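The slicing is the subtle part: iloc[1:-1] drops the leading NaN from pct_change and the last day (which has no "tomorrow"), while iloc[2:] shifts the target one day forward. A toy illustration with made-up prices:

import pandas as pd

prices = pd.Series([100.0, 102.0, 101.0, 103.0], name='adj_close')
returns = prices.pct_change()  # NaN, 0.0200, -0.0098, 0.0198

X = returns.iloc[1:-1]  # today's returns: rows 1 and 2
y = returns.iloc[2:]    # tomorrow's returns: rows 2 and 3

# sklearn pairs rows positionally, so today's 2.00% return (X row 0)
# is matched with tomorrow's -0.98% return (y row 0), and so on.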
Now comes the fun part! We fit a basic linear regression model using today's return and volume to predict tomorrow's return.
@d6tflow.requires(PreprocessData)
class FitData(d6tflow.tasks.TaskPickle):
    persist = ['X_test', 'y_test', 'reg']

    def run(self):
        X = self.inputLoad('X')
        y = self.inputLoad('y')

        reg = LinearRegression()
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=0)
        reg.fit(X_train, y_train)
        self.save({
            'X_test': X_test,
            'y_test': y_test,
            'reg': reg
        })
Again, we use @d6tflow.requires to require that the data has been preprocessed. We load the features (X) and the response (y) and fit the regression on part of the data, keeping some aside for out-of-sample testing.
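Once FitData has run, we could peek at what the model learned. A quick sketch, assuming outputLoad returns the persisted objects in the order listed in persist:

d6tflow.run(FitData())
X_test, y_test, reg = FitData().outputLoad()

print(reg.coef_)       # one weight each for volume and today's return
print(reg.intercept_)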
Now that our model is trained, let's see how we did. How much information does today's return give us about tomorrow's expected return? The score method of a fitted sklearn regressor returns the coefficient of determination (R²) on the test data.
@d6tflow.requires(FitData)
class ScoreFit(d6tflow.tasks.TaskPickle):
    def run(self):
        # See how well the regression fits the test data
        reg = self.inputLoad('reg')
        X_test = self.inputLoad('X_test')
        y_test = self.inputLoad('y_test')

        score = reg.score(X_test, y_test)
        self.save(score)
Up to this point, we haven't actually run any of our pipeline. That is very easy to do with d6tflow: all we need to do is tell it to run the ScoreFit task, and it will find and run all of the dependencies for us. Let's keep the script that runs the pipeline in the same file, and only run the pipeline when we explicitly tell Python to run the file.
if __name__ == '__main__':
    d6tflow.run(ScoreFit())
    score = ScoreFit().outputLoad()
    print(f'Score of regression was {score:.2f}')
After running the pipeline, we get an r-squared of approximately -0.11. That is pretty close to zero.
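A nice side effect of the caching is that if we run the script again, tasks whose outputs are already saved get skipped. We can check this with d6tflow.preview, which prints the dependency tree and each task's status (a sketch; the exact output format may vary by version):

d6tflow.preview(ScoreFit())
# Prints the task tree (ScoreFit -> FitData -> PreprocessData -> LoadData)
# and marks each task PENDING or COMPLETE based on its cached output.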
Conclusion
It is perhaps trivial that a company's stock price movements would not be predictable from the previous day's return and volume alone, and our near-zero r-squared bears that out.
What are Mocks?
When you are working with data science pipelines, you often have a lot of data. However, you don't need all of it to test the functionality of the tasks in your d6tflow pipeline.
To see how mocks work in action, let's consider a concrete example. Say we want to