This is a collection of Python jobs that transform data. The jobs use PySpark to process larger volumes of data and are meant to run on a Spark cluster (via spark-submit).
We use batect to dockerise the tasks in this exercise. batect is a lightweight wrapper around Docker that helps ensure tasks run consistently across Linux, macOS, and Windows. With batect, the only dependencies that need to be installed are Docker and Java >= 8; every other dependency is managed inside Docker containers. Please make sure you have the following installed and can run them:
- Docker
- Java (1.8)
You can use the following instructions as guidelines to install Docker and Java.
Install the pre-requisites needed by batect:

```bash
# For mac users:
scripts/install.sh

# For windows/linux users:
# Please ensure Docker and Java >= 8 are installed
scripts\install_choco.ps1
scripts\install.bat
```

Run the tests and checks via batect:

```bash
./batect unit-test
./batect integration-test
./batect style-checks
```

The style-checks task runs the linter and a type checker.
There are two applications in this repo: Word Count and Citibike.
Currently, these exist as skeletons, and have some initial test cases which are defined but ignored. For each application, please un-ignore the tests and implement the missing logic.
An NLP model depends on a specific input file. This job is supposed to preprocess a given text file to produce that input file for the NLP model (feature engineering). The job counts the occurrences of each word within the given text file (corpus).
There is a dump of the datalake for this job at resources/word_count/words.txt (a plain text file).
Input: a simple *.txt file containing text.

Output: a single *.csv file containing data similar to:
"word","count""a","3""an","5" ...JOB=jobs/word_count.py ./batect run-job For analytics purposes the BI department of a bike share company would like to present dashboards, displaying the distance each bike was driven. There is a *.csv file that contains historical data of previous bike rides. This input file needs to be processed in multiple steps. There is a pipeline running these jobs.
For analytics purposes, the BI department of a bike share company would like to present dashboards displaying the distance each bike was driven. There is a *.csv file that contains historical data of previous bike rides. This input file needs to be processed in multiple steps, and there is a pipeline running these jobs.

There is a dump of the datalake with this historical data at resources/citibike/citibike.csv.
This job reads the *.csv file and transforms it to parquet format. The column names are sanitized: whitespace is replaced with underscores (e.g. "start station id" becomes "start_station_id").
Input: historical bike ride *.csv file:
"tripduration","starttime","stoptime","start station id","start station name","start station latitude",...364,"2017-07-01 00:00:00","2017-07-01 00:06:05",539,"Metropolitan Ave & Bedford Ave",40.71534825,... ...*.parquet files containing the same content
"tripduration","starttime","stoptime","start_station_id","start_station_name","start_station_latitude",...364,"2017-07-01 00:00:00","2017-07-01 00:06:05",539,"Metropolitan Ave & Bedford Ave",40.71534825,... ...JOB=jobs/citibike_ingest.py ./batect run-jobThis job takes bike trip information and calculates the "as the crow flies" distance traveled for each trip. It reads the previously ingested data parquet files.
This job takes the bike trip information and calculates the "as the crow flies" distance traveled for each trip. It reads the parquet files produced by the ingest job.

Hint:
- For the distance calculation, consider using the Haversine formula as an option (see the sketch below).
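For illustration, here is a sketch of the Haversine calculation using Spark SQL functions. The end-station latitude/longitude column names, the input/output paths, and the choice of kilometres are assumptions; the real logic belongs in jobs/citibike_distance_calculation.py.

```python
# Haversine distance sketch (illustrative only). Assumes the ingested parquet
# has start_station_latitude/longitude and end_station_latitude/longitude columns.
from pyspark.sql import SparkSession, functions as F

EARTH_RADIUS_KM = 6371.0

spark = SparkSession.builder.appName("citibike_distance_calculation").getOrCreate()
df = spark.read.parquet("output/citibike_ingest")  # hypothetical input path


def as_rad(col_name):
    # Cast to double in case the ingested schema stored the column as string.
    return F.radians(F.col(col_name).cast("double"))


lat1 = as_rad("start_station_latitude")
lat2 = as_rad("end_station_latitude")
dlat = lat2 - lat1
dlon = as_rad("end_station_longitude") - as_rad("start_station_longitude")

# Haversine: a = sin^2(dlat/2) + cos(lat1) * cos(lat2) * sin^2(dlon/2)
a = F.sin(dlat / 2) ** 2 + F.cos(lat1) * F.cos(lat2) * F.sin(dlon / 2) ** 2
distance_km = 2 * EARTH_RADIUS_KM * F.asin(F.sqrt(a))

# Rounding to two decimals mirrors the sample output below.
df.withColumn("distance", F.round(distance_km, 2)) \
    .write.parquet("output/citibike_distance", mode="overwrite")
```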
Input: historical bike ride *.parquet files:
"tripduration",...364,... ...*.parquet files containing historical data with distance column containing the calculated distance.
"tripduration",...,"distance"364,...,1.34 ...JOB=jobs/citibike_distance_calculation.py ./batect run-jobIf you would like to run the code in your laptop locally without containers then please follow instructions here.
