Pipelines & jobs

"Pipeline" is the object in the Allonia platform that lets you design and manage advanced pipelines for complex data, task, and model processing.

Pipelines are instantiated as "Jobs" that execute them, manually or on a schedule, depending on the pipeline configuration.

Pipelines

Pipeline creation

You can create a pipeline from the "Pipeline" menu and edit it.

You can also create a complete Pipeline with a single Module directly from a Notebook; see the related section for further information.

Pipeline description in Allonia language

Allonia language is a simple description format for data processing Pipelines, based on YAML syntax. A description contains two parts:

  • Data nodes: the list of data sets used in the pipeline, whether input, intermediate, or output.

  • Processing nodes: the list of Modules (processing steps or tasks) that make up the pipeline.

For further information about how to create and manage Modules, see the related section.

Processing nodes have input data nodes and output data nodes, which is how dependencies between them are declared. The nodes chained in this way form the pipeline’s graph.

Write a Pipeline

Basic pipeline description example:

dataNodes:
  data_node_1: # (1)
    source: my_input.csv # (3)
  data_node_2: # (2)
    source: my_output.csv # (3)

processNodes:
  process_node_1: # (4)
    input: # (5)
    - data_node_1
    output: # (6)
    - data_node_2
    processor: my_module # (7)

Description:

1 Declaration of the first data node
2 Declaration of the second data node
3 Name of the data set
4 Declaration of a single processing node
5 Input configuration of the processing node; here it references the first data node
6 Output configuration of the processing node; here it references the second data node
7 Name of the Module

So, the effective graph for this simple pipeline is: my_input.csv → my_module → my_output.csv

Example with multiple chained tasks:

dataNodes:
  data_node_1:
    source: my_input.csv
  data_node_2:
    source: intermediate.csv
  data_node_3:
    source: my_output.csv
processNodes:
  process_node_1:
    input:
    - data_node_1
    output:
    - data_node_2
    processor: my_first_task
  process_node_2:
    input:
    - data_node_2
    output:
    - data_node_3
    processor: my_second_task

So, the effective graph for this two-task pipeline is: my_input.csv → my_first_task → intermediate.csv → my_second_task → my_output.csv
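To make the notion of a graph concrete, the following sketch derives an execution order from the two-task description. It is an illustration only, not the platform's scheduler: the description is written as a plain Python dict (the structure a YAML parser would produce), and the helper name execution_order is hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# The two-task pipeline from the example, as a plain dict.
pipeline = {
    "dataNodes": {
        "data_node_1": {"source": "my_input.csv"},
        "data_node_2": {"source": "intermediate.csv"},
        "data_node_3": {"source": "my_output.csv"},
    },
    "processNodes": {
        "process_node_1": {
            "input": ["data_node_1"],
            "output": ["data_node_2"],
            "processor": "my_first_task",
        },
        "process_node_2": {
            "input": ["data_node_2"],
            "output": ["data_node_3"],
            "processor": "my_second_task",
        },
    },
}


def execution_order(description):
    """Return process node names in an order that respects data dependencies."""
    process_nodes = description["processNodes"]
    # Map each data node to the process node that writes it.
    producer = {
        data_node: name
        for name, node in process_nodes.items()
        for data_node in node.get("output", [])
    }
    # A process node depends on the producers of its inputs, if any.
    graph = {
        name: {producer[d] for d in node.get("input", []) if d in producer}
        for name, node in process_nodes.items()
    }
    return list(TopologicalSorter(graph).static_order())


order = execution_order(pipeline)
print(order)  # ['process_node_1', 'process_node_2']
```

Data nodes that no process node writes, such as data_node_1 here, are treated as external inputs and add no dependency.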

Example with a task having multiple inputs and outputs:

processNodes:
  task1:
    input:
    - data1
    - data2
    - data3
    output:
    - data4
    - data5
    processor: my_module

This task reads three input data sets and writes two output data sets.
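The fragment above shows only the processNodes part. Assuming every name listed under input or output must also be declared under dataNodes (as in the complete examples earlier), a small check such as the following can catch typos before a pipeline is saved. The helper name undeclared_data_nodes is hypothetical, and this is not the platform's own validation.

```python
def undeclared_data_nodes(description):
    """Return the data node names referenced by tasks but never declared."""
    declared = set(description.get("dataNodes", {}))
    missing = set()
    for node in description.get("processNodes", {}).values():
        for ref in node.get("input", []) + node.get("output", []):
            if ref not in declared:
                missing.add(ref)
    return sorted(missing)


# Same task as above, with data5 deliberately left undeclared.
description = {
    "dataNodes": {
        name: {"source": f"{name}.csv"}
        for name in ("data1", "data2", "data3", "data4")
    },
    "processNodes": {
        "task1": {
            "input": ["data1", "data2", "data3"],
            "output": ["data4", "data5"],
            "processor": "my_module",
        },
    },
}

missing = undeclared_data_nodes(description)
print(missing)  # ['data5']
```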

Parse Pipeline information and use it in Python code

You can parse the information passed through the data nodes and processing nodes and use it inside Modules with the dedicated aleialib.processor Python library. This lets you develop generic Modules that read their data references from the Pipeline description.

How to use assets described in Allonia language in Python code:

from aleialib import processor
from aleialib import s3

input_data, output_data = processor.fetch_datanodes_information()

# use name of the first input dataNode, read object from Allonia S3
data = s3.load_file(input_data[0])

Schedule a Pipeline

A pipeline can be scheduled directly in its configuration file with a cron schedule expression. Online cron expression editors such as https://crontab.guru/ make these expressions easier to write.

Example of a pipeline that is executed every day at 10:05 am in the “Europe/Paris” time zone:

dataNodes:
  data_node_1:
    source: my_input.csv
  data_node_2:
    source: my_output.csv

processNodes:
  process_node_1:
    input:
    - data_node_1
    output:
    - data_node_2
    processor: my_module

schedule:
  schedule_interval: "5 10 * * *"
  start_date: "2022-01-01 00:00"
  end_date: "2022-12-31 00:00"
  timezone: "Europe/Paris"
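To check what a daily expression such as "5 10 * * *" means in a given time zone, the next run time can be computed locally. The sketch below is a simplification that handles only expressions of the form "minute hour * * *" and ignores start_date and end_date; the function names are hypothetical and it does not reproduce the platform's scheduler.

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo  # standard library, Python 3.9+


def parse_daily_cron(expression):
    """Split a 'minute hour * * *' expression into (hour, minute)."""
    minute, hour, day, month, weekday = expression.split()
    if (day, month, weekday) != ("*", "*", "*"):
        raise ValueError("only daily expressions are handled in this sketch")
    return int(hour), int(minute)


def next_daily_run(now, hour, minute):
    """Return the next occurrence of hour:minute in the time zone of `now`."""
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:
        # Today's slot has already passed: move to tomorrow, same wall-clock time.
        candidate += timedelta(days=1)
    return candidate


if __name__ == "__main__":
    hour, minute = parse_daily_cron("5 10 * * *")
    now = datetime.now(ZoneInfo("Europe/Paris"))
    print(next_daily_run(now, hour, minute))
```

Because the computation is done on a datetime that carries the “Europe/Paris” time zone, the result stays at 10:05 local time across daylight saving changes.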

Manage specific requirements

Library setup is managed by default for a whole track (see the Python package management section). Sometimes, however, you will want to declare specific requirements for a Pipeline so that it initializes faster, without installing all libraries.

Example of a pipeline that installs only pandas 1.5.3, skipping all other libraries installed with Allonia Package Management:

dataNodes:
  data_node_1:
    source: my_input.csv
  data_node_2:
    source: my_output.csv

processNodes:
  process_node_1:
    input:
    - data_node_1
    output:
    - data_node_2
    processor: my_module

requirement:
    -
        name: "pandas"
        version: 1.5.3
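For readers used to pip, each entry of the requirement list can be read as a pinned package. The sketch below converts such a list into pip-style specifiers purely as an illustration; how the platform actually installs these packages is not described here, and the helper name to_pip_specifiers is hypothetical.

```python
def to_pip_specifiers(requirements):
    """Turn [{'name': ..., 'version': ...}] entries into 'name==version' strings."""
    specs = []
    for entry in requirements:
        name = entry["name"]
        version = entry.get("version")
        # An entry without a version is left unpinned (assumption).
        specs.append(f"{name}=={version}" if version else name)
    return specs


# The requirement section above, as a YAML parser would load it.
requirement = [{"name": "pandas", "version": "1.5.3"}]
specifiers = to_pip_specifiers(requirement)
print(specifiers)  # ['pandas==1.5.3']
```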

Jobs

Jobs view

The Jobs view displays all job executions, whether running or not. A job has one of the following statuses, depending on its type:

  • Manual (Pipeline with no scheduling parameters set, one-time execution)

    • Ready

    • Running

    • Success

    • Failed

  • Scheduled (Pipeline with scheduling parameters set)

    • Pending

    • Running

    • Success

    • Failed
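Client code that tracks jobs may find it convenient to model these statuses explicitly. The following sketch mirrors the list above; the comments on Ready and Pending, and the choice of Success and Failed as final statuses, are assumptions drawn from the status names rather than documented behavior.

```python
from enum import Enum


class JobStatus(Enum):
    READY = "Ready"      # assumed: manual job built and waiting to be run
    PENDING = "Pending"  # assumed: scheduled job waiting for its next iteration
    RUNNING = "Running"
    SUCCESS = "Success"
    FAILED = "Failed"


# Statuses listed above for each job type.
STATUSES_BY_TYPE = {
    "manual": (JobStatus.READY, JobStatus.RUNNING, JobStatus.SUCCESS, JobStatus.FAILED),
    "scheduled": (JobStatus.PENDING, JobStatus.RUNNING, JobStatus.SUCCESS, JobStatus.FAILED),
}


def is_finished(status):
    """Assumption: Success and Failed are the only final statuses."""
    return status in (JobStatus.SUCCESS, JobStatus.FAILED)


print(is_finished(JobStatus("Running")))  # False
```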

Job creation and run

To run a Pipeline as a Job:

  • Ensure that the pipeline’s input data sets and Modules exist.

  • Select a pipeline and use the "Build job" action. After a few seconds, the job is ready to run.

  • If it is a manual pipeline, you have to run it yourself. If it is scheduled, it waits for the next execution iteration.

As soon as a job is running, you can access its logs in real time.

Job execution from outside the platform

Jobs can also be executed from outside the platform, using the Allonia platform APIs and a token. Tokens are created and managed in the My Account/Access token space of your account.

For a job created on the platform, copy its UUID from the UI to execute it from code.

Code example to run outside the Allonia platform for an existing job:

# Requires: pip3 install requests requests-aws4auth
import requests
from requests_aws4auth import AWS4Auth

# Environment info
path = "https://api.k.prod.infra.aleia.com"

# Token information (replace with the values of your own access token)
tokenId = "tokenId"
secretKey = "secretKey"

# AWS Signature Version 4 authentication
service = "s3"
region = ""
awsauth = AWS4Auth(tokenId, secretKey, region, service)

# UUID of the job, copied from the UI
jobId = "jobId"

# Execute the job and print the JSON response
pathJobRun = f"{path}/jobs/{jobId}/run"
rJobRun = requests.post(pathJobRun, auth=awsauth)
print(rJobRun.json())