Modules
Module using AleiaModel
The pipeline YAML file is:
dataNodes:
  data_node_1:
    source: "notebooks/dataset/iris_no_target.csv"
  data_node_2:
    source: ""
processNodes:
  process_node_1:
    input:
      - data_node_1
    output:
      - data_node_2
    processor: iris_knn_module
# Cronjob information that you can change to fit your needs, or delete it for a
# manual trigger.
schedule:
  schedule_interval: "0 0 * * *"
  start_date: "2022-01-01 00:00"
  end_date: "2022-12-31 00:00"
  timezone: "Europe/Paris"
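The `schedule_interval` above uses the standard five-field cron syntax: `"0 0 * * *"` runs once a day at midnight in the configured timezone. As a minimal sketch of how the fields break down (standard cron, not AleiaModel-specific; the helper name is hypothetical):

```python
def split_cron(expr: str) -> dict:
    """Split a five-field cron expression into its named fields."""
    minute, hour, day_of_month, month, day_of_week = expr.split()
    return {
        "minute": minute,
        "hour": hour,
        "day_of_month": day_of_month,
        "month": month,
        "day_of_week": day_of_week,
    }

# "0 0 * * *": minute 0, hour 0, every day, every month, every weekday,
# i.e. daily at midnight.
fields = split_cron("0 0 * * *")
```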
The related module code is:
# sphinx_gallery_thumbnail_path = '_static/iris.png'
import pandas as pd
from datetime import datetime
from pathlib import Path
from aleiamodel import AleiaModel
from aleialib.s3 import save_file
from aleialib.processor import fetch_datanodes_information
# Both input_path and output_path are relative to the S3 root, so they must
# start with 'notebooks/'. Make sure this is the case in your pipeline YAML
# file.
input_path, output_path = fetch_datanodes_information()
# fetch_datanodes_information() always returns lists, so grab the first entry
input_path = input_path[0]
output_path = output_path[0]
model_name = "iris_knn"
if not output_path:
    output_path = f"notebooks/dataset/{model_name}/predicted.csv"
model = AleiaModel(model_name, must_exist=True, ignore_requirements=True)
print(
    f"Predicting with model {model_name} using revision {model.revision}.",
    flush=True,
)
model.observations_set = input_path
# Fill in the kwargs if needed, or modify your pipeline YAML file to pass
# them as arguments
predicted = model.predict(
    feature_engineering_kwargs={"names": None},
    predict_kwargs={},
    postprocess_kwargs={},
)
# This line might not be useful depending on your particular use case
predicted = pd.DataFrame(predicted)
print(predicted, flush=True)
# If 'predicted' is not a dataframe, use another extension
extension = ".csv"
# You can comment this out if you do not need the timestamp in your output and
# you are sure of your file's extension.
output_path = str(
    Path(f"{Path(output_path).with_suffix('')}_{datetime.now()}").with_suffix(
        extension
    )
)
save_file(
    output_path,
    predicted,
    object_type="dataset",
    handle_type=True,
    from_s3_root=True,
)
model.save()
model.close()
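The timestamped output path built just before `save_file` can be sketched in isolation. Note that `datetime.now()` stringifies with spaces and colons; the `strftime` variant below is an assumption on our part, shown because some storage backends reject those characters in object keys:

```python
from datetime import datetime
from pathlib import Path


def timestamped(path: str, extension: str = ".csv") -> str:
    """Mirror the module's pattern: strip the suffix, append a timestamp,
    then re-apply the extension."""
    # Filename-safe timestamp (assumption; the module uses datetime.now()
    # directly, which includes spaces and colons).
    stamp = datetime.now().strftime("%Y%m%d-%H%M%S")
    return str(
        Path(f"{Path(path).with_suffix('')}_{stamp}").with_suffix(extension)
    )


# e.g. "notebooks/dataset/iris_knn/predicted_20220101-000000.csv"
result = timestamped("notebooks/dataset/iris_knn/predicted.csv")
```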