December 20, 2018
About a month ago, I was looking for a tool to help automate some of the analytic jobs our data scientist built in Python. My goal was to automate a process that runs a Python analysis to produce a CSV, loads that CSV into a new TimescaleDB table, and swaps a view to point at the freshly loaded data.
The job needed to run daily, and its output represented the entire corpus of information (there was no appending or updating of indices). Furthermore, the end goal was to have the data visualized in Grafana. Our internal users rely on this dashboard, so it was important for us to minimize downtime. This is why we use a view: it provides a layer of indirection between clients like Grafana and the underlying table of data.
My initial inclination was to stitch this process together with various Open Source tools (psql, timescaledb-parallel-copy, and awscli), codified in a Bash script. The actual task looks something like this:
# Generate a unique timestamp for the run
TIMESTAMP=$(date -u +"%Y%m%dT%H%M%S")
CSV_FILE="/tmp/clicks_${TIMESTAMP}.csv"
python run_analytics.py $CSV_FILE
TABLE=clicks_${TIMESTAMP}
PGPASSWORD=$DB_PASSWORD psql -h $DB_HOST -U $DB_USER \
-d campaign_metrics -p $DB_PORT <<EOSQL
create table if not exists ${TABLE} (
campaign_id uuid,
num_clicks integer not null,
start_time timestamp with time zone,
end_time timestamp with time zone
);
EOSQL
# Remove CSV header
tail -n +2 $CSV_FILE > /tmp/csv_no_head.csv
CONNECTION="host=$DB_HOST user=$DB_USER sslmode=require"
CONNECTION="${CONNECTION} password=$DB_PASSWORD port=$DB_PORT"
COLUMNS="campaign_id,num_clicks,start_time,end_time"
timescaledb-parallel-copy --db-name campaign_metrics --table ${TABLE} \
--connection "$CONNECTION" --reporting-period 2s --columns "$COLUMNS" \
--copy-options "CSV NULL ''" --workers 4 --file /tmp/csv_no_head.csv
PGPASSWORD=$DB_PASSWORD psql -h $DB_HOST -U $DB_USER \
-d campaign_metrics -p $DB_PORT <<EOSQL
create index if not exists ${TABLE}_campaign_id
on ${TABLE} (campaign_id);
DROP VIEW IF EXISTS clicks;
CREATE VIEW clicks AS SELECT * FROM ${TABLE};
EOSQL
I also needed the script to execute within our AWS environment (for security and performance reasons). However, I realized that this was the first effort (of many) where we would be performing simple batch workflows on a schedule. I could containerize all the tools I needed for this one job, but each time we had a new process, we would be faced with adding tools to an already monolithic Docker image or creating a multitude of bespoke images for each process.
What I really wanted was a way to use Docker like a Bash script, tethering multiple containers together into a coherent process — i.e. the output of one container becomes the input of the next.
After about a week of research, including various spikes with tools like Airflow, AWS Glue, etc., I came across Argo - The Workflow Engine for Kubernetes.
Argo is not only exactly what I was looking for, but better. My original goal was to find a framework that could launch a sequential set of containers in a remote Docker environment. What I found in Argo was a full-featured workflow engine (supporting multi-root DAG execution) where each task in a workflow is a Kubernetes pod!
Using pods as workflow tasks is a game-changer. By reusing Kubernetes objects (specifically the Pod spec), we can use essentially any framework or technology to implement our workflow tasks. This is obviously useful if you want to use multiple languages or platforms, but for the specific task I was trying to automate, it meant that I didn't have to write, compile, or deploy any code. Additionally, Argo allows developers to reuse other pieces of Kubernetes infrastructure, like ConfigMaps and Secrets.
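For example, a template can pull its environment straight from an existing Secret or ConfigMap via envFrom, just like any other pod. This is only an illustrative sketch (the ConfigMap name here is made up; the Secret name is the one used later in this post):
- name: some-task
  container:
    image: alpine:3.8
    command: [sh, -c, 'echo "connecting to $DB_HOST"']
    envFrom:
      - secretRef:
          name: secrets-for-click-analysis   # Secret referenced later in this post
      - configMapRef:
          name: click-analysis-config        # hypothetical ConfigMap
Because these are ordinary Kubernetes objects, the Secrets and ConfigMaps our services already use can be shared with workflow tasks.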
Using our previous example, we might have an Argo workflow that looks like this (sorry, I know this is verbose):
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: click-csv-to-pg-
spec:
entrypoint: main
imagePullSecrets:
- name: quay-pull-secret
templates:
- name: main
steps:
- - name: generate-tablename
template: generate-tablename
- - name: run-analysis
template: run-analysis
- - name: create-table
template: create-table
arguments:
parameters:
- name: clickstable
value: "{{steps.generate-tablename.outputs.result}}"
- - name: import-csv
template: import-csv
arguments:
artifacts:
- name: clicks-csv
from: "{{steps.run-analysis.outputs.artifacts.clicks-csv}}"
parameters:
- name: clickstable
value: "{{steps.generate-tablename.outputs.result}}"
- - name: index-and-switch-view
template: index-and-switch-view
arguments:
parameters:
- name: clickstable
value: "{{steps.generate-tablename.outputs.result}}"
- name: generate-tablename
script:
image: node:10
command: [node]
source: |
console.log('clicks_' +
new Date('{{workflow.creationTimestamp}}')
.toISOString().replace(/-|T|:|[.]/g, '')
.toLowerCase());
- name: run-analysis
container:
image: peachjar/blinded-by-data-science
args: ['/tmp/clicks.csv']
envFrom:
- secretRef:
name: secrets-for-click-analysis
outputs:
artifacts:
- name: clicks-csv
path: /tmp/clicks.csv
- name: create-table
inputs:
parameters:
- name: clickstable
container:
image: postgres:10
command: [sh, -c]
args:
- |
PGPASSWORD=$DB_PASSWORD psql -h $DB_HOST -U $DB_USER \
-d metrics -p $DB_PORT <<EOSQL
create table if not exists {{inputs.parameters.clickstable}} (
campaign_id uuid,
num_clicks integer not null,
start_time timestamp with time zone,
end_time timestamp with time zone
);
EOSQL
envFrom:
- secretRef:
name: secrets-for-click-analysis
- name: import-csv
inputs:
artifacts:
- name: clicks-csv
path: /tmp/clicks.csv
parameters:
- name: clickstable
container:
image: quay.io/peachjar/timescaledb-parallel-copy:build-29
command: [sh, -c]
args:
- |
tail -n +2 /tmp/clicks.csv > /tmp/csv_no_head.csv
CONNECTION="host=$DB_HOST user=$DB_USER sslmode=require"
CONNECTION="${CONNECTION} password=$DB_PASSWORD port=$DB_PORT"
COLUMNS="campaign_id,num_clicks,start_time,end_time"
timescaledb-parallel-copy --db-name metrics \
--table {{inputs.parameters.clickstable}} \
--connection "$CONNECTION" --reporting-period 2s --columns "$COLUMNS" \
--copy-options "CSV NULL ''" --workers 4 --file /tmp/csv_no_head.csv
envFrom:
- secretRef:
name: secrets-for-click-analysis
- name: index-and-switch-view
inputs:
parameters:
- name: clickstable
container:
image: postgres:10
command: [sh, -c]
args:
- |
PGPASSWORD=$DB_PASSWORD psql -h $DB_HOST -U $DB_USER \
-d metrics -p $DB_PORT <<EOSQL
create index if not exists {{inputs.parameters.clickstable}}_campaign_id
on {{inputs.parameters.clickstable}} (campaign_id);
DROP VIEW IF EXISTS clicks;
CREATE VIEW clicks AS
SELECT * FROM {{inputs.parameters.clickstable}};
EOSQL
envFrom:
- secretRef:
name: secrets-for-click-analysis
An instance of the workflow can be created using the argo client:
argo submit my-workflow.yml
Although I don’t have any workflow-level input parameters, you can supply those when a job is launched:
argo submit my-workflow.yml -p key=value -p foo=bar
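If you do want workflow-level parameters, they are declared under spec.arguments and referenced with {{workflow.parameters.*}}. A minimal, hypothetical sketch:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: param-demo-
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: key        # default value; override with: argo submit ... -p key=something-else
        value: value
  templates:
    - name: main
      container:
        image: alpine:3.8
        command: [echo]
        args: ["{{workflow.parameters.key}}"]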
This workflow differs slightly from the original Bash script, most notably in the use of Node.js to generate a table name, but the essence of the process is the same.
The workflow consists of a number of templates, which are really just Pod specifications with additional metadata like inputs and outputs (as needed) to describe how the Pod interacts within the general workflow environment.
This workflow has 6 templates:
We use Node.js to parse the workflow's creation timestamp, which is part of the metadata available about the workflow, to generate a unique table name for this version of the dataset. This is also a great example of Argo's support for inline scripts within a workflow. The table name is the output of the script (literally its stdout) and can be referenced by other steps.
Run the analytics using a custom Docker image. One of the great things about Argo is that it generates Kubernetes objects, and thus we get cool stuff like imagePullSecrets, allowing us to use containers from our private Quay.io repository:
imagePullSecrets:
- name: quay-pull-secret
Also, we save the output of the process as an artifact:
outputs:
artifacts:
- name: clicks-csv
path: /tmp/clicks.csv
Where the artifact is saved depends on how you set up Argo. Argo supports a few storage mechanisms; since we are in AWS, we use S3.
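The S3 setup lives in the workflow controller's configuration rather than in the workflow itself. Roughly, the controller's ConfigMap points at a bucket and a Secret holding the access keys; the exact shape depends on how Argo is installed, and the bucket and secret names below are placeholders:
apiVersion: v1
kind: ConfigMap
metadata:
  name: workflow-controller-configmap
  namespace: argo
data:
  config: |
    artifactRepository:
      s3:
        bucket: my-artifact-bucket        # placeholder bucket name
        endpoint: s3.amazonaws.com
        accessKeySecret:
          name: argo-s3-credentials       # placeholder Secret name
          key: accessKey
        secretKeySecret:
          name: argo-s3-credentials
          key: secretKey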
Create the new table using the psql command from the official postgres image. Notice that the task uses an input called clickstable.
inputs:
parameters:
- name: clickstable
This is rendered in the task by Argo using Mustache/Handlebars syntax:
create table if not exists {{inputs.parameters.clickstable}} ...
We import the CSV into the newly created table using timescaledb-parallel-copy. This is a really fast way to import datasets (we insert 4 million records in under 30 seconds). The cool thing to notice about this task is how we use both parameter and artifact inputs:
inputs:
artifacts:
- name: clicks-csv
path: /tmp/clicks.csv
parameters:
- name: clickstable
Recreate the view and table indices using psql again.
This is probably the most interesting template. main defines the actual sequence of the workflow (which is why I called it main); it is an example of an Argo steps template. Argo also includes a dag template, which allows for much more complex workflows, including branching and parallel tasks (a rough sketch of one follows below). Argo knows that main is our entry point because we configure it that way:
spec:
entrypoint: main
However, we also have the option to override the entry point when we submit a job:
argo submit my-workflow-definition.yml --entrypoint some-other-template
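As for the dag template mentioned above, here is a rough, hypothetical sketch (all of the template names are placeholders): two tasks fan out from a first task, and a final task waits on both.
- name: main-dag
  dag:
    tasks:
      - name: prepare
        template: step-prepare
      - name: analyze-clicks
        dependencies: [prepare]
        template: step-analyze-clicks
      - name: analyze-views
        dependencies: [prepare]
        template: step-analyze-views
      - name: publish
        dependencies: [analyze-clicks, analyze-views]
        template: step-publish
Each task references another template, just like a step does, but the dependencies field is what lets Argo run analyze-clicks and analyze-views in parallel.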
Another cool thing about Argo is that a template can be used multiple times. Templates can then be configured by their inputs to take on different behaviors:
- - name: import-csv
template: import-csv
arguments:
artifacts:
- name: clicks-csv
from: "{{steps.run-analysis.outputs.artifacts.clicks-csv}}"
parameters:
- name: clickstable
value: "{{steps.generate-tablename.outputs.result}}"
Hypothetically, I could have created one psql template and injected the SQL logic via a parameter; I just preferred defining the SQL with the template.
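For what it's worth, that generic version might have looked something like this (the template and parameter names here are just illustrative):
- name: run-sql
  inputs:
    parameters:
      - name: sql          # SQL statement passed in by each step
  container:
    image: postgres:10
    command: [sh, -c]
    args:
      - |
        PGPASSWORD=$DB_PASSWORD psql -h $DB_HOST -U $DB_USER \
          -d metrics -p $DB_PORT -c "{{inputs.parameters.sql}}"
    envFrom:
      - secretRef:
          name: secrets-for-click-analysis
Each step would then pass its SQL in as the sql parameter, at the cost of readability for longer statements.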
This is a run of the actual workflow we use (not the example above). It is very similar; the real differences are that it starts from an existing CSV (the analysis task takes hours) and that the example above omits some configuration specific to our Kubernetes environment. Otherwise, the flow is identical.
argo submit workflow.yml
You can watch the task's progress from the console:
argo watch <task-id>
You can also follow the workflow’s progress in the Argo UI.
Start the Kubernetes proxy:
kubectl proxy
Then visit the UI through the proxy: http://127.0.0.1:8001/api/v1/namespaces/argo/services/argo-ui/proxy/
While the workflow is still in progress, you can drill in and get details about each task, including information about the artifacts generated during the process. When the workflow finishes, the console reports the final status and the UI shows the workflow as complete.
Argo is a fantastic framework. Our team’s initial experiences with Argo convinced us to convert more of our DevOps tasks to the framework. We now run database migrations as workflows and are looking to leverage Argo CD to provision test environments. If that goes well, I imagine Argo will be used to perform all of our deployment and analytic tasks. Needless to say, I’m really excited about this project and look forward to all the new features planned in the future.