Peachjar Engineering

Kube-Native Workflows with Argo

December 20, 2018

About a month ago, I was looking for a tool to help automate some of the analytic jobs our data scientist built in Python. My goal was to automate the following process:

  1. Execute Python jobs that analyze the day’s activities, producing a CSV output.
  2. Create a new table in Postgres (that holds the CSV data).
  3. Insert the CSV data into the table.
  4. Recreate a view we use to point to the current version of that dataset.

Objective

The job needed to run daily, and its output represented the entire corpus of information (there was no appending or updating of indices). Furthermore, the end goal was to have the data visualized in Grafana. Our internal users rely on this dashboard, so it was important for us to minimize downtime. This is why we use a view: it provides a layer of indirection between clients like Grafana and the underlying table of data.
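To make the indirection concrete: PostgreSQL runs DDL inside transactions, so the view can be dropped and recreated in a single transaction, and other sessions should not see a window in which the view is missing. A minimal sketch, assuming a helper named view_swap_sql (hypothetical, not part of our actual job) that only prints the SQL; the table name in the example call is made up:

```shell
#!/bin/sh
# Hypothetical helper: print SQL that repoints the "clicks" view at a new
# table inside one transaction. It only prints; nothing is sent to a database.
view_swap_sql() {
  table="$1"
  cat <<EOSQL
BEGIN;
DROP VIEW IF EXISTS clicks;
CREATE VIEW clicks AS SELECT * FROM ${table};
COMMIT;
EOSQL
}

# Example call with a made-up table name.
view_swap_sql clicks_20181220t000000
```

The printed SQL could be piped into psql in the same way the script below feeds its heredocs.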

My initial inclination was to stitch this process together with various Open Source tools (psql, timescaledb-parallel-copy, and awscli), codified in a Bash script. The actual task looks something like this:

# Generate a unique timestamp for the run
TIMESTAMP=$(date -u +"%Y%m%dT%H%M%S")

CSV_FILE="/tmp/clicks_${TIMESTAMP}.csv"

python run_analytics.py $CSV_FILE

TABLE=clicks_${TIMESTAMP}

PGPASSWORD=$DB_PASSWORD psql -h $DB_HOST -U $DB_USER \
              -d campaign_metrics -p $DB_PORT <<EOSQL
                create table if not exists ${TABLE} (
                  campaign_id uuid,
                  num_clicks integer not null,
                  start_time timestamp with time zone,
                  end_time timestamp with time zone
                );
EOSQL

# Remove CSV header
tail -n +2 $CSV_FILE > /tmp/csv_no_head.csv

CONNECTION="host=$DB_HOST user=$DB_USER sslmode=require"
CONNECTION="${CONNECTION} password=$DB_PASSWORD port=$DB_PORT"
COLUMNS="campaign_id,num_clicks,start_time,end_time"

timescaledb-parallel-copy --db-name campaign_metrics --table ${TABLE} \
              --connection "$CONNECTION" --reporting-period 2s --columns $COLUMNS \
              --copy-options "CSV NULL ''" --workers 4 --file /tmp/csv_no_head.csv

PGPASSWORD=$DB_PASSWORD psql -h $DB_HOST -U $DB_USER \
              -d campaign_metrics -p $DB_PORT <<EOSQL
                create index if not exists ${TABLE}_campaign_id
                     on ${TABLE} (campaign_id);
                DROP VIEW IF EXISTS clicks;
                CREATE VIEW clicks AS SELECT * FROM ${TABLE};
EOSQL
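One shell detail in a script like this deserves care: the connection string contains spaces, so it only reaches the tool as a single --connection value when the variable is double-quoted. The sketch below uses a hypothetical count_args helper and placeholder values (no database is contacted) to show the difference:

```shell
#!/bin/sh
# Hypothetical helper: print how many arguments it received.
count_args() {
  echo "$#"
}

# Placeholder values, for illustration only.
CONNECTION="host=db.example.internal user=analytics sslmode=require"

# Unquoted: the shell splits the value on spaces into separate words.
unquoted=$(count_args $CONNECTION)

# Quoted: the value is passed through as one argument.
quoted=$(count_args "$CONNECTION")

echo "unquoted=${unquoted} quoted=${quoted}"
```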

I also needed the script to execute within our AWS environment (for security and performance reasons). However, I realized that this was the first effort (of many) where we would be performing simple batch workflows on a schedule. I could containerize all the tools I needed for this one job, but each time we had a new process, we would be faced with adding tools to an already monolithic Docker image or creating a multitude of bespoke images for each process.

What I really wanted was a way to use Docker like a Bash script, tethering multiple containers together into a coherent process — i.e. the output of one container becomes the input of the next.

After about a week of research, including various spikes with tools like Airflow, AWS Glue, etc., I came across Argo - The Workflow Engine for Kubernetes:

Argo

Argo is not only exactly what I was looking for, but better. My original goal was to find a framework that could launch a sequential set of containers in a remote Docker environment. What I found in Argo was a full-featured workflow engine (supporting multi-root DAG execution) where each task in a workflow is a Kubernetes pod!

Pods as workflow tasks is a game-changer. By reusing Kubernetes objects (specifically the Pod spec), we can essentially use any framework or technology to implement our workflow tasks. This is obviously useful if you want to use multiple languages or platforms, but for the specific task I was trying to automate, it meant that I didn’t have to write, compile, or deploy any code. Additionally, Argo allows developers to reuse other pieces of Kubernetes infrastructure, like ConfigMaps and Secrets.

Using Argo with our Example

Using our previous example, we might have an Argo workflow that looks like this (sorry, I know this is verbose):

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: click-csv-to-pg-
spec:
  entrypoint: main
  imagePullSecrets:
    - name: quay-pull-secret
  templates:
    - name: main
      steps:
        - - name: generate-tablename
            template: generate-tablename
        - - name: run-analysis
            template: run-analysis
        - - name: create-table
            template: create-table
            arguments:
              parameters:
              - name: clickstable
                value: "{{steps.generate-tablename.outputs.result}}"
        - - name: import-csv
            template: import-csv
            arguments:
              artifacts:
                - name: clicks-csv
                  from: "{{steps.run-analysis.outputs.artifacts.clicks-csv}}"
              parameters:
                - name: clickstable
                  value: "{{steps.generate-tablename.outputs.result}}"
        - - name: index-and-switch-view
            template: index-and-switch-view
            arguments:
              parameters:
                - name: clickstable
                  value: "{{steps.generate-tablename.outputs.result}}"

    - name: generate-tablename
      script:
        image: node:10
        command: [node]
        source: |
          console.log('clicks_' +
              new Date('{{workflow.creationTimestamp}}')
                  .toISOString().replace(/-|T|:|[.]/g, '')
                  .toLowerCase());

    - name: run-analysis
      container:
        image: peachjar/blinded-by-data-science
        args: ['/tmp/clicks.csv']
        envFrom:
          - secretRef:
              name: secrets-for-click-analysis
      outputs:
        artifacts:
          - name: clicks-csv
            path: /tmp/clicks.csv

    - name: create-table
      inputs:
        parameters:
        - name: clickstable
      container:
        image: postgres:10
        command: [sh, -c]
        args:
          - |
            PGPASSWORD=$DB_PASSWORD psql -h $DB_HOST -U $DB_USER \
              -d metrics -p $DB_PORT <<EOSQL
                create table if not exists {{inputs.parameters.clickstable}} (
                  campaign_id uuid,
                  num_clicks integer not null,
                  start_time timestamp with time zone,
                  end_time timestamp with time zone
                );
            EOSQL
        envFrom:
          - secretRef:
              name: secrets-for-click-analysis

    - name: import-csv
      inputs:
        artifacts:
          - name: clicks-csv
            path: /tmp/clicks.csv
        parameters:
        - name: clickstable
      container:
        image: quay.io/peachjar/timescaledb-parallel-copy:build-29
        command: [sh, -c]
        args:
          - |
            tail -n +2 /tmp/clicks.csv > /tmp/csv_no_head.csv
            CONNECTION="host=$DB_HOST user=$DB_USER sslmode=require"
            CONNECTION="${CONNECTION} password=$DB_PASSWORD port=$DB_PORT"
            COLUMNS="campaign_id,num_clicks,start_time,end_time"
            timescaledb-parallel-copy --db-name metrics \
                --table {{inputs.parameters.clickstable}} \
                --connection "$CONNECTION" --reporting-period 2s --columns $COLUMNS \
                --copy-options "CSV NULL ''" --workers 4 --file /tmp/csv_no_head.csv
        envFrom:
        - secretRef:
            name: secrets-for-click-analysis

    - name: index-and-switch-view
      inputs:
        parameters:
        - name: clickstable
      container:
        image: postgres:10
        command: [sh, -c]
        args:
          - |
            PGPASSWORD=$DB_PASSWORD psql -h $DB_HOST -U $DB_USER \
              -d metrics -p $DB_PORT <<EOSQL
                create index if not exists {{inputs.parameters.clickstable}}_campaign_id
                     on {{inputs.parameters.clickstable}} (campaign_id);
                DROP VIEW IF EXISTS clicks;
                CREATE VIEW clicks AS
                    SELECT * FROM {{inputs.parameters.clickstable}};
            EOSQL
        envFrom:
          - secretRef:
              name: secrets-for-click-analysis

Running the Workflow

An instance of the workflow can be created using the argo client:

argo submit my-workflow.yml

Although I don’t have any workflow-level input parameters, you can supply those when a job is launched:

argo submit my-workflow.yml -p key=value -p foo=bar

Dissecting the Workflow Example

This task differs slightly from the original Bash script, specifically in the use of Node.js to generate a table name, but the essence of the workflow is the same.

The workflow consists of a number of templates, which are really just Pod specifications with additional metadata like inputs and outputs (as needed) to describe how the Pod interacts within the general workflow environment.

This workflow has 6 templates:

generate-tablename

We use Node.js to parse the workflow’s creation timestamp, which is part of the metadata available about the workflow, to generate a unique table name for this version of the dataset. This is also a great example of Argo’s “inline scripts” feature within a workflow. The table name is the output of the script (literally stdout) and can be referenced by later steps as steps.generate-tablename.outputs.result.
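For comparison with the original Bash script, the same transformation can be sketched in plain shell. This is an illustration only, not what the workflow runs: table_name_from_timestamp is a hypothetical helper, the timestamp is a made-up example, and unlike JavaScript's toISOString() it does not add a milliseconds component.

```shell
#!/bin/sh
# Hypothetical helper: turn an ISO-8601-style timestamp into a table name by
# deleting "-", "T", ":" and "." and lowercasing whatever is left.
table_name_from_timestamp() {
  stamp=$(printf '%s' "$1" | tr -d 'T:.-' | tr '[:upper:]' '[:lower:]')
  printf 'clicks_%s\n' "$stamp"
}

# Made-up timestamp for illustration.
table_name_from_timestamp "2018-12-20T10:15:30Z"
```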

run-analysis

Run the analytics using a custom Docker image. One of the great things about Argo is that it generates Kubernetes objects, and thus we get cool stuff like imagePullSecrets, allowing us to use containers from our private Quay.io repository:

imagePullSecrets:
  - name: quay-pull-secret

Also, we save the output of the process as an artifact:

outputs:
  artifacts:
    - name: clicks-csv
      path: /tmp/clicks.csv

Where the artifact is saved depends on how you set up Argo. Argo supports a few storage mechanisms; since we are in AWS, we use S3.

create-table

Create the new table using the psql command from the official postgres image. Notice that the task uses an input called clickstable.

inputs:
  parameters:
    - name: clickstable

This is rendered in the task by Argo using Mustache/Handlebars syntax:

create table if not exists {{inputs.parameters.clickstable}} ...

import-csv

We import the CSV into the newly created table using timescaledb-parallel-copy. This is a really fast way to import datasets (we insert 4 million records in under 30 seconds). The cool thing to notice about this task is how we use both parameter and artifact inputs:

inputs:
  artifacts:
    - name: clicks-csv
      path: /tmp/clicks.csv
  parameters:
    - name: clickstable
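Before loading, the import task drops the CSV's header row so that it is not inserted as data. That step can be tried locally; the sample rows below are invented for illustration and the file lives in a throwaway directory:

```shell
#!/bin/sh
# Create a throwaway CSV with a header row and two invented data rows.
workdir=$(mktemp -d)
cat > "$workdir/clicks.csv" <<EOF
campaign_id,num_clicks,start_time,end_time
11111111-1111-1111-1111-111111111111,42,2018-12-19T00:00:00Z,2018-12-20T00:00:00Z
22222222-2222-2222-2222-222222222222,7,2018-12-19T00:00:00Z,2018-12-20T00:00:00Z
EOF

# "tail -n +2" prints from the second line onward, which drops the header.
tail -n +2 "$workdir/clicks.csv" > "$workdir/csv_no_head.csv"

# Inspect the result: number of remaining rows and the first campaign id.
rows=$(wc -l < "$workdir/csv_no_head.csv" | tr -d ' ')
first_field=$(head -n 1 "$workdir/csv_no_head.csv" | cut -d, -f1)
echo "rows=${rows} first_campaign=${first_field}"

rm -rf "$workdir"
```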

index-and-switch-view

Recreate the view and table indices using psql again.

main

This is probably the most interesting template. main defines the actual sequence of the workflow (which is why I called it main); it is an example of an Argo steps template. Argo also includes a dag template, which allows for much more complex workflows, including branching and parallel tasks. Argo knows that main is our entry point because we configure it that way:

spec:
  entrypoint: main

However, we also have the option to override the entry point when we submit a job:

argo submit my-workflow-definition.yml --entrypoint some-other-template

Another cool thing about Argo is that a template can be used multiple times. Templates can then be configured by their input to take on different behaviors:

- - name: import-csv
    template: import-csv
    arguments:
      artifacts:
        - name: clicks-csv
          from: "{{steps.run-analysis.outputs.artifacts.clicks-csv}}"
      parameters:
        - name: clickstable
          value: "{{steps.generate-tablename.outputs.result}}"

Hypothetically, I could have created one psql template and injected the SQL logic via parameter; I just preferred defining the SQL with the template.
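The same idea in shell terms is a single wrapper that receives the SQL as an argument. This is only a sketch: run_sql, print_args, and PSQL_BIN are hypothetical names, and the connection values are placeholders. Pointing PSQL_BIN at print_args gives a dry run that prints the arguments instead of contacting a database.

```shell
#!/bin/sh
# Hypothetical wrapper: run whatever SQL is passed as the first argument.
# PSQL_BIN defaults to psql; it can be overridden for a dry run.
run_sql() {
  PGPASSWORD="$DB_PASSWORD" "${PSQL_BIN:-psql}" -h "$DB_HOST" -U "$DB_USER" \
    -d metrics -p "$DB_PORT" -c "$1"
}

# Dry-run stand-in for psql: print the arguments it was given.
print_args() {
  printf '%s\n' "$*"
}

# Placeholder settings, for illustration only.
DB_HOST=db.example.internal
DB_USER=analytics
DB_PORT=5432
DB_PASSWORD=not-a-real-password
PSQL_BIN=print_args

run_sql "DROP VIEW IF EXISTS clicks;"
```

The trade-off is the one described above: a generic wrapper is reusable, while keeping the SQL next to the task that runs it makes each task easier to read on its own.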

Demo

This is an example of running the actual workflow we use (not the one above). It’s very similar — the real difference is that I’m starting with an existing CSV (the analysis task takes hours) and the example doesn’t show real configuration from our Kubernetes environment — otherwise, the flow is identical.

argo submit workflow.yml

This is what the console looks like when you watch the task progress:

argo watch <task-id>

Argo CLI in Progress

You can also follow the workflow’s progress in the Argo UI.

Start the Kubernetes proxy:

kubectl proxy

Then visit the UI through the proxy: http://127.0.0.1:8001/api/v1/namespaces/argo/services/argo-ui/proxy/

Here the workflow is still in progress:

Argo UI in progress

You can even drill in and get details about the task:

Argo task details

This also includes information about the artifacts generated during the process:

Artifacts

When the workflow finishes, you will see the following output in the console:

CLI Complete

The UI will also show the workflow is complete:

UI Complete

Conclusion

Argo is a fantastic framework. Our team’s initial experiences with Argo convinced us to convert more of our DevOps tasks to the framework. We now run database migrations as workflows and are looking to leverage Argo CD to provision test environments. If that goes well, I imagine Argo will be used to perform all of our deployment and analytic tasks. Needless to say, I’m really excited about this project and look forward to all the new features planned in the future.


Richard Clayton

Senior Back-end Engineering
