Modules
Module using AleiaModel
The pipeline YAML file is:
dataNodes:
  data_node_1:
    source: "notebooks/dataset/iris_no_target.csv"
  data_node_2:
    source: ""
processNodes:
  process_node_1:
    input:
      - data_node_1
    output:
      - data_node_2
    processor: iris_knn_module
# Cronjob information that you can change to fit your needs, or delete it for a
# manual trigger.
schedule:
  schedule_interval: "0 0 * * *"
  start_date: "2022-01-01 00:00"
  end_date: "2022-12-31 00:00"
  timezone: "Europe/Paris"
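The `schedule_interval` above uses the standard five-field cron syntax: `"0 0 * * *"` runs once a day at midnight in the configured timezone. As a minimal sketch of how the fields break down (standard cron, not AleiaModel-specific; the helper name is hypothetical):

```python
def split_cron(expr: str) -> dict:
    """Split a five-field cron expression into its named fields."""
    minute, hour, day_of_month, month, day_of_week = expr.split()
    return {
        "minute": minute,
        "hour": hour,
        "day_of_month": day_of_month,
        "month": month,
        "day_of_week": day_of_week,
    }

# "0 0 * * *": minute 0, hour 0, every day, every month, every weekday,
# i.e. daily at midnight.
fields = split_cron("0 0 * * *")
```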
The related module code is:
# sphinx_gallery_thumbnail_path = '_static/iris.png'
import pandas as pd
from datetime import datetime
from pathlib import Path
from aleiamodel import AleiaModel
from aleialib.s3 import save_file
from aleialib.processor import fetch_datanodes_information
# Both input_path and output_path are relative to the S3 root, so they must
# start with 'notebooks/'. Make sure this is the case in your pipeline YAML
# file.
input_path, output_path = fetch_datanodes_information()
# fetch_datanodes_information() always returns lists, so grab the first entry
input_path = input_path[0]
output_path = output_path[0]
model_name = "iris_knn"
if not output_path:
    output_path = f"notebooks/dataset/{model_name}/predicted.csv"
model = AleiaModel(model_name, must_exist=True, ignore_requirements=True)
print(
    f"Predicting with model {model_name} using revision {model.revision}.",
    flush=True,
)
model.observations_set = input_path
# Fill in the kwargs if needed, or modify your pipeline YAML file to pass
# them as arguments
predicted = model.predict(
    feature_engineering_kwargs={"names": None},
    predict_kwargs={},
    postprocess_kwargs={},
)
# This line might not be useful depending on your particular use case
predicted = pd.DataFrame(predicted)
print(predicted, flush=True)
# If 'predicted' is not a dataframe, use another extension
extension = ".csv"
# You can comment this out if you do not need the timestamp in your output and
# you are sure of your file's extension.
output_path = str(
    Path(f"{Path(output_path).with_suffix('')}_{datetime.now()}").with_suffix(
        extension
    )
)
save_file(
    output_path,
    predicted,
    object_type="dataset",
    handle_type=True,
    from_s3_root=True,
)
model.save()
model.close()
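The timestamped output path built just before `save_file` can be sketched in isolation. Note that `datetime.now()` stringifies with spaces and colons; the `strftime` variant below is an assumption on our part, shown because some storage backends reject those characters in object keys:

```python
from datetime import datetime
from pathlib import Path


def timestamped(path: str, extension: str = ".csv") -> str:
    """Mirror the module's pattern: strip the suffix, append a timestamp,
    then re-apply the extension."""
    # Filename-safe timestamp (assumption; the module uses datetime.now()
    # directly, which includes spaces and colons).
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return str(
        Path(f"{Path(path).with_suffix('')}_{stamp}").with_suffix(extension)
    )


# e.g. "notebooks/dataset/iris_knn/predicted_20220101-000000.csv"
result = timestamped("notebooks/dataset/iris_knn/predicted.csv")
```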