So recently I was working on a small project to automatically place Forex trades according to the predictions of a model. I had all the data in Google Cloud Storage; everything was ready to go. My idea was to build a model that, given previous forex open, high, low and close values, suggests whether to buy, hold or sell. The simplest version of the model that I wanted to build, let’s call this the alpha model, would make use of 5-minute aggregated forex data.
One problem though: the data I had was tick data, partitioned into files that weren’t divided by financial instrument. Therefore, some rows for an instrument might be in one file, while others could be in another. If the data were relatively small, we could load all these chunks using Pandas, resample, and we’d be good. However, these files added up to gigabytes, which we all know would make life in Pandas difficult without resorting to Dask. Instead of going down that route, I took this as an opportunity to build a pipeline in Apache Beam, just because I love the framework.
Apache Beam makes it simple to build both batch and streaming pipelines on top of different runners such as Spark or Flink. That said, since we’re using the Python version, it is challenging to set it up on your own cluster unless you use a managed service such as Dataflow on GCP. Furthermore, it is (probably) impossible to get it running against a local Spark cluster on a Windows machine. I tried, I failed, I cried. I had to boot up my Ubuntu partition. Sigh.
Before I dive into my approach here, why the Python version? I have used the Java version in the past, why would I do that to myself again?
As a hybrid data scientist, data engineer and developer, I am all for writing the cleanest code possible. The quote below, whose author I cannot for the life of me remember, is something I live by for every line of code I write.
Good code is the modern definition of art
Therefore, with this line imprinted in my mind, I wanted to design a modular pipeline whose parts could be added and removed very easily.
The diagram below is what I came up with. It might be beautiful, it might be horrible, it might be Van Gogh level; either way, I am very open to opinions, because I might be doing things horribly wrong. So comment away!
In the diagram, the FxBeam class is what manages the main data flow. This class reads the data, calls the resampler’s process method, extracts the output and writes it back to file. The classes linked with dotted lines are the classes or functions used by that segment of the pipeline.
In Apache Beam we make heavy use of ParDo functions. If you haven’t used ParDo before, these are the basis of everything you do in Apache Beam. Essentially you’re creating a class that is inserted as a segment in the pipeline, very much like building a flowchart where each DoFn has a particular task.
The Input function
When reading the data we use the ReadFromText function from apache_beam.io. This function can take file names or file patterns, which is convenient, as well as compression types if the file is compressed, gzip for example.
If the pipe symbol looks weird, you’re not the only one! When I started using Apache Beam it was a bit of a shock for me too. To save you the search: it is a synonym for the .apply() method, implemented by overloading Python’s | operator. Link for more info here .
In this function we can take either a JSON input or a CSV input, chosen via a parameter on the class. If we’re reading a CSV, we make use of a custom parse-rows function which takes in the CSV rows and transforms them into dictionaries that can easily flow through and be edited by the pipeline.
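A minimal sketch of such a parse function; the column names here are my own assumption, not necessarily the project’s actual schema:

```python
# Hypothetical column layout for the tick CSV files.
TICK_DATA_COLUMNS = ['instrument', 'datetime', 'ask', 'bid']

def parse_csv_row(line):
    """Split one CSV line and zip it with the column names into a dict."""
    return dict(zip(TICK_DATA_COLUMNS, line.strip().split(',')))
```

Inside the pipeline this would be applied with something like `| beam.Map(parse_csv_row)`.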
Finally, before returning the data to be processed, we need to do some preprocessing: convert the datetime given in the CSV / JSON to a timestamp, emit the rows as timestamped values, and convert all the numeric fields to float.
The ToTimestamp() DoFn takes in TICK_DATA_TIMESTAMP_FORMAT and parses the datetime into a timestamp. The AddTimestamp function then creates the timestamped rows.
If you have a look at the Github code , you’ll notice that I have two resampler classes. The first time I tried this out, I went for the built-in window function, which creates fixed windows. However, I couldn’t find a way to make them start at certain times. For example, if we’re resampling OHLCV data from 1-minute to 5-minute windows, I’d want the window to start at 12:00 and last until 12:05, not randomly start and finish at other times such as 12:01 to 12:06. Now one might argue that such shifts technically shouldn’t make a difference when building a model based on candlesticks, since it is all based on tick data behind the scenes anyway. However, for ease of use I wanted these to start on round figures.
In order to do this, I decided to create time groups myself based on the timestamp and window size, and then use a simple CombinePerKey function to create the OHLCV values. To the CombinePerKey function we pass in the combiner, which does the actual lifting; I’ll go into that in the next section. Here is a diagram of the resampler I created:
Ok, let’s start dissecting this. The TimestampToTimeGroup is a ParDo function that essentially floor-divides each row’s timestamp by the window size to produce a time-group key.
The division and floor operation is enough to create the time group. This is because the output value will remain constant while in a certain multiple of the window size.
```
0 // 3 = 0
1 // 3 = 0
2 // 3 = 0
3 // 3 = 1
```
The resample function then calls a map-elements function, the CombinePerKey function, and then a de-map function. The map and de-map functions are needed to restructure the data into tuples in the form of key:value pairs. The map function also has another purpose: if the data contains multiple instruments, we can provide an instrument_column to the resampler to make sure it resamples per instrument. Otherwise it would mash everything together into one big unusable clump of data.
This function is given below:
The de-map function follows the same lines, doing the same in reverse. I won’t really go into that now, but if you’re interested have a look at the code. It’s all neat and tidy, I promise!
The CombinePerKey function is what does the actual aggregation. It requires a combiner to handle what happens as rows come in, and then how accumulators running on different nodes are merged. Out-of-the-box combiners include sum and mean combiners. We created a custom one to do the resampling, which is explained next.
The combiner inherits from apache_beam.CombineFn. When using this class, we need to make sure certain methods are implemented, as shown below:
In our case the combiner has the following functionality:
When receiving a new input, we update the open, high, low and close values using the appropriate operation. The volume is a simple increment. When merging accumulators we do the same checks and operations, but across pre-accumulated results. The functions are very simple, but the class does get a bit lengthy due to all the docstrings. Hence, I won’t overload this article with the code, but you can easily have a look at it here .
The get_value function is a simple function that defines which value is used to calculate the OHLCV aggregation. The current implementation uses only the ask value from the tick data. However, this can easily be changed to use the bid or a combination of both.
One thing I seriously love about Apache Beam is how it forces you to use DoFns. This makes it easy to:
- Create well-documented code
- Stick to the one function, one task rule
- Use test driven development
Now, in the codebase for the FxBeam project I didn’t use a test-driven development approach. However, tests could easily be defined as follows:
Using this structure we can test anything in the pipeline: specific DoFns, segments, or even the whole flow. Therefore, we can pinpoint exactly where an issue lies if something comes up.
Running the Pipeline
The final quick note is related to running the pipeline.
I have created a simple script that accepts arguments and runs the pipeline. By default it uses the direct runner, which runs the pipeline on the local machine. This is a less optimized way of running Apache Beam and is mostly used for testing. Running it on a Spark cluster, however, is not as simple as providing the endpoint of the Spark master node.
Since we’re using the Python version of Apache Beam, we need the Beam Spark job server. This job service creates the Spark job before submitting it to the cluster. More information on this is given here .
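For reference, the Beam docs describe starting the job server from its Docker image and pointing the pipeline at it via the portable runner; roughly along these lines (addresses are placeholders, and the script name is hypothetical):

```shell
# Start the Beam Spark job server, pointing it at your Spark master.
docker run --net=host apache/beam_spark_job_server:latest \
    --spark-master-url=spark://localhost:7077

# Then submit the pipeline to the job server's endpoint.
python run_fxbeam.py \
    --runner=PortableRunner \
    --job_endpoint=localhost:8099 \
    --environment_type=LOOPBACK
```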
And that should be it! Once completed, you should have a fully running pipeline that ingests tick data and produces OHLCV values. Unfortunately, I haven’t set this up in streaming mode yet; I need to look into what changes are needed. I might go back to using the built-in window functions after all! Once that is done, I’ll make sure to write another article highlighting the changes.
Originally published at https://markcutajar.com.