Custom services

Service using Iris KNN

This service creates an API that allows one to predict using the existing Iris KNN model on a fixed dataset, but using user-specified weights and number of neighbors.

# sphinx_gallery_thumbnail_path = '_static/iris.png'
import os
import pandas as pd
from flask import Flask, request, Blueprint, make_response
from apispec_webframeworks.flask import FlaskPlugin
from apispec import APISpec
from flask_swagger_ui import get_swaggerui_blueprint

import aleialib


# URL prefix under which the platform exposes this user service.
# USER_SERVICE_ID is injected into the environment by the hosting platform.
APP_PREFIX = f"/user-services/invoke/{os.environ.get('USER_SERVICE_ID')}"


def register_swagger(app_):
    """Attach a Swagger UI and an OpenAPI spec endpoint to *app_*.

    The spec is built from the docstrings of every view already registered
    on *app_*, so this must run after all blueprints are in place.
    """
    api_spec = APISpec(
        title=os.environ.get("USER_SERVICE_NAME"),
        version="1.0.0",
        openapi_version="3.0.2",
        plugins=[FlaskPlugin()],
    )
    docs_blueprint = get_swaggerui_blueprint(
        APP_PREFIX + "/docs", APP_PREFIX + "/docs/specs.json"
    )

    @docs_blueprint.route("/specs.json", methods=["GET"])
    def get_api_doc():
        # Serve the generated OpenAPI document that Swagger UI consumes.
        return api_spec.to_dict(), 200

    app_.register_blueprint(docs_blueprint)
    # FlaskPlugin needs an active request context to introspect the routes.
    with app_.test_request_context():
        for view in app_.view_functions.values():
            api_spec.path(view=view)


# All routes of this service live under the platform-assigned prefix.
user_services_routes = Blueprint(
    "user_services_routes", __name__, url_prefix=APP_PREFIX
)


@user_services_routes.route("/health", methods=["GET"])
def get_health():
    """Get Health

    ---
    get:
      description: Get Health
      responses:
        200:
          description: Returns Health boolean
          content:
            application/json:
              schema:
                type: object
                properties:
                  health:
                    type: boolean
    """
    # Liveness probe: the service reports healthy whenever it can answer.
    return make_response({"health": True})


# routes to be written by users
@user_services_routes.route("/predict", methods=["POST"])
def predict():
    """Predicts using trained Iris KNN.

    One can specify how many neighbors to use with which weights.

    ---
    post:
      tags:
        - Predictor
      description: Predicts using AleiaModel.
      requestBody:
        content:
          application/json:
            schema:
              type: object
              properties:
                weights:
                  type: string
                  required: false
                neighbors:
                  type: integer
                  required: false

      responses:
        200:
          description: Predicted values as a list.
          content:
            application/json:
              schema:
                type: object
                properties:
                  predictions:
                    type: array
                  weights:
                    type: string
                  neighbors:
                    type: integer
        400:
          description: Returns an error message
    """
    request_json = request.get_json()
    # Fetch the Iris KNN training history from the generic AleiaModel
    # service; after the column selection each row is
    # [revision, parameters, results].
    fits_summary = pd.DataFrame.from_dict(
        aleialib.user_services.invoke(
            "post",
            "/attribute",
            "52fbf15e-bdfa-477a-9ca6-7f39da042c03",
            {"model_name": "iris_knn", "attribute": "fits_summary"},
        )["fits_summary"]
    )[["revision", "parameters", "results"]].values.tolist()
    # One row per past fit: (n_neighbors, weights, accuracy, revision).
    # NOTE(review): assumes res[1] is the test-metrics dict with an
    # "accuracy" key -- confirm against the model's fit output format.
    params_rev_res = pd.DataFrame(
        [
            (
                params["model_kwargs"]["n_neighbors"],
                params["model_kwargs"]["weights"],
                res[1]["accuracy"],
                revision,
            )
            for revision, params, res in fits_summary
        ],
        columns=["n", "w", "a", "r"],
    )
    # For every (n_neighbors, weights) pair keep only the revision with the
    # best accuracy; the result is indexed by the (n, w) pairs.
    params_rev_res = (
        params_rev_res.groupby(["n", "w"])
        .apply(lambda x: x.sort_values("a", ascending=False).iloc[0])
        .drop(columns=["n", "w"])
    )

    weights = request_json.get("weights")
    neighbors = request_json.get("neighbors")

    # Reject hyper-parameter combinations that were never trained.
    if (neighbors, weights) not in params_rev_res.index:
        return make_response(
            {
                "error": f"Weights {weights} and neighbors {neighbors} not found in training history"
            },
            400,
        )
    # Predict on the fixed Iris dataset using the best revision found for
    # the requested hyper-parameters.
    output = aleialib.user_services.invoke(
        "post",
        "/predict",
        "52fbf15e-bdfa-477a-9ca6-7f39da042c03",
        {
            "kwargs": {"feature_engineering_kwargs": {"names": ""}},
            "model_name": "iris_knn",
            "observations_set": {
                "path": "notebooks/dataset/iris_no_target.csv"
            },
            "reload": False,
            "revision": int(params_rev_res.loc[(neighbors, weights), "r"]),
            "save": False,
        },
    )

    # If output is none of those types, make sure it is json-serialisable
    return make_response(
        {"predictions": output, "weights": weights, "neighbors": neighbors}
    )


# Assemble the Flask app: register the user routes, then build the Swagger
# docs from their docstrings.
app = Flask(__name__)
app.register_blueprint(user_services_routes)
register_swagger(app)


if __name__ == "__main__":
    # Development entry point; in production a WSGI server imports ``app``.
    app.run()

Service using AleiaModel

This service creates an API with the 'attribute', 'learn' and 'predict' routes, valid for any existing AleiaModel (it takes the model name as argument).

The first call to one model could take some time, as it will be loaded from S3, but the next calls to the same model will be quicker, as the model is kept in memory.

You need to put multiprocessing_on_dill in your service requirements.

You can test quickly the predict route on Iris classification with:

{
  "kwargs": {
    "feature_engineering_kwargs": {"names": ""}
  },
  "model_name": "iris_knn",
  "observations_set": {
    "path": "notebooks/dataset/iris_no_target.csv"
  },
  "reload": false,
  "revision": -1,
  "save": false
}

You will need to have the 'iris_no_target.csv' file, containing the Iris dataset without the 'target' column.

import os
import json

import numpy as np
import pandas as pd

from apispec import APISpec
from multiprocessing_on_dill import Lock
from apispec_webframeworks.flask import FlaskPlugin
from flask_swagger_ui import get_swaggerui_blueprint
from flask import Flask, request, Blueprint, make_response
from multiprocessing_on_dill.managers import (
    AcquirerProxy,
    BaseManager,
    DictProxy,
)

from aleiamodel import AleiaModel, URL

# Reusable OpenAPI schema describing how a dataset argument may be given to
# the AleiaModel routes: a single ``path``, several ``paths`` to concatenate,
# or an external ``url`` to download, plus optional loading options.
# NOTE(review): OpenAPI 3 formally expects ``required`` as a list of property
# names on the enclosing object; the per-property flags below mirror the
# original layout but are ignored by strictly-compliant tooling.
aleiamodel_path_schema = {
    "properties": {
        "path": {
            "type": "string",
            "example": "notebooks/dataset/raw.csv",
            "required": False,
        },
        "paths": {
            "type": "array",
            "items": {"type": "string"},
            "required": False,
        },
        "url": {
            "type": "object",
            "properties": {
                "url": {"type": "string", "required": True},
                "filename": {"type": "string", "required": False},
                "s3_folder": {"type": "string", "required": False},
            },
            "required": False,
        },
        "concatenate_kwargs": {"type": "object", "required": False},
        "load_kwargs": {"type": "object", "required": False},
        # Fix: defaults/examples were quoted strings ("true", "-1") although
        # the declared property types are boolean/integer.
        "handle_type": {
            "type": "boolean",
            "required": False,
            "default": True,
        },
        "revision": {
            "type": "integer",
            "required": False,
            "default": -1,
            "example": -1,
        },
    }
}


# URL prefix under which the platform exposes this user service.
APP_PREFIX = f"/user-services/invoke/{os.environ.get('USER_SERVICE_ID')}"
# Address and auth key of the BaseManager that shares loaded models between
# API workers (see get_shared_state).
HOST = "127.0.0.1"
PORT = 35791
# NOTE(review): hard-coded auth key; the manager only listens on localhost.
KEY = b"secret"


def get_shared_state(host, port, key):
    """This function allows to share large object among all the API workers.
    Here it is used to load AleiaModels only once and keep them in memory."""
    # These objects live in the process of the first worker to bind
    # (host, port); every other worker receives proxies to them.
    shared_dict_ = {}
    shared_lock_ = Lock()
    manager = BaseManager((host, port), key)
    manager.register("get_dict", lambda: shared_dict_, DictProxy)
    manager.register("get_lock", lambda: shared_lock_, AcquirerProxy)
    try:
        # get_server() raises OSError when the address is already bound,
        # i.e. when another worker has already started the manager.
        # NOTE(review): get_server() binds the port before start() spawns the
        # server process -- confirm the probe socket is released in time.
        manager.get_server()
        manager.start()
    except OSError:  # Address already in use
        manager.connect()
    return manager.get_dict(), manager.get_lock()


# Worker-shared model cache and its lock (one manager serves all workers).
shared_dict, shared_lock = get_shared_state(HOST, PORT, KEY)


def handle_path(model: "AleiaModel", path_name: str, request_json: dict):
    """Configure one of *model*'s dataset attributes from the request body.

    ``path_name`` is e.g. ``"train_set"``; the matching entry of
    ``request_json`` may specify a single ``path``, a list of ``paths`` or a
    ``url``, plus optional loading options.  Returns silently when the
    argument is absent, not a dict, or the model has no matching private
    attribute.
    """
    path_argument = request_json.get(path_name)
    if not path_argument or not isinstance(path_argument, dict):
        return
    private_name = f"_{path_name}"
    if not hasattr(model, private_name):
        return
    path = path_argument.get("path")
    paths = path_argument.get("paths")
    if path:
        setattr(model, path_name, path)
    elif paths:
        setattr(model, path_name, paths)
        if concatenate_kwargs := path_argument.get("concatenate_kwargs"):
            getattr(model, private_name).concatenate_kwargs = concatenate_kwargs
    elif url := path_argument.get("url"):
        setattr(model, path_name, URL(url))
        # Explicit None check: a walrus truthiness test would silently drop
        # an explicit ``false`` sent by the caller.
        if (force_download := path_argument.get("force_download")) is not None:
            getattr(model, private_name).force_download = force_download
    if path or paths:
        if load_kwargs := path_argument.get("load_kwargs"):
            getattr(model, private_name).load_kwargs = load_kwargs
        # BUG FIX: ``if handle_type := ...`` ignored an explicit False, so it
        # was impossible to disable type handling through the API.
        if (handle_type := path_argument.get("handle_type")) is not None:
            getattr(model, private_name).handle_type = handle_type
        if revision := path_argument.get("revision"):
            # 0 is documented as invalid, so a truthy test is safe here.
            getattr(model, private_name).use_revision(revision)


def get_model():
    """Return (model, request body, save flag) for the current request.

    Loaded models are cached in the worker-shared ``shared_dict`` so each
    model is pulled from storage only once; ``reload`` forces a fresh load.
    """
    body = request.get_json()
    name = body["model_name"]
    save = body.get("save", False)
    must_load = body.get("reload", False)
    with shared_lock:
        if must_load or name not in shared_dict:
            # read_only models skip write-back machinery when save is False.
            shared_dict[name] = AleiaModel(
                name,
                revision=body.get("revision", None),
                must_exist=True,
                ignore_requirements=True,
                read_only=not save,
            )
    return shared_dict[name], body, save


def register_swagger(app_):
    """Expose Swagger UI and a generated OpenAPI spec for *app_*.

    Must be called after every blueprint is registered, since the spec is
    generated from the docstrings of the existing views.
    """
    api_spec = APISpec(
        title=os.environ.get("USER_SERVICE_NAME"),
        version="1.0.0",
        openapi_version="3.0.2",
        plugins=[FlaskPlugin()],
    )
    docs_blueprint = get_swaggerui_blueprint(
        APP_PREFIX + "/docs", APP_PREFIX + "/docs/specs.json"
    )
    # Register the reusable dataset-path schema referenced by the routes.
    api_spec.components.schema(
        "AleiaModelPathDescription", aleiamodel_path_schema
    )

    @docs_blueprint.route("/specs.json", methods=["GET"])
    def get_api_doc():
        return api_spec.to_dict(), 200

    app_.register_blueprint(docs_blueprint)
    # FlaskPlugin needs a request context to introspect the registered views.
    with app_.test_request_context():
        for view in app_.view_functions.values():
            api_spec.path(view=view)


# All routes of this service live under the platform-assigned prefix.
user_services_routes = Blueprint(
    "user_services_routes", __name__, url_prefix=APP_PREFIX
)


@user_services_routes.route("/health", methods=["GET"])
def get_health():
    """Get Health

    ---
    get:
      description: Get Health
      responses:
        200:
          description: Returns Health boolean
          content:
            application/json:
              schema:
                type: object
                properties:
                  health:
                    type: boolean
    """
    # Liveness probe: the service reports healthy whenever it can answer.
    return make_response({"health": True})


# routes to be written by users
@user_services_routes.route("/attribute", methods=["POST"])
def attribute():
    """Get any attribute of a model.

    ---
    post:
      tags:
        - Attribute
      requestBody:
        content:
          application/json:
            schema:
              type: object
              properties:
                model_name:
                  type: string
                  required: true
                attribute:
                  type: string
                  required: true
                reload:
                  description: Reload the model even if this service already
                    loaded it
                  type: boolean
                  default: false
                  required: false

      responses:
        200:
          description: the attribute value

        400:
          description: Returns an error message

    """
    model, request_json, _ = get_model()
    attribute_ = request_json.get("attribute")
    # The spec declares a 400 response; return it explicitly instead of
    # letting a missing/None attribute bubble up as a 500.
    if attribute_ is None or not hasattr(model, attribute_):
        return make_response(
            {"error": f"Model has no attribute {attribute_}"}, 400
        )
    value = getattr(model, attribute_)
    # Convert common scientific containers into JSON-serialisable values.
    if isinstance(value, np.ndarray):
        value = value.tolist()
    elif isinstance(value, (pd.DataFrame, pd.Series)):
        value = json.loads(value.to_json())

    return make_response({attribute_: value})


# routes to be written by users
@user_services_routes.route("/fit", methods=["POST"])
def learn():
    """Learns using AleiaModel.

    One can specify which model to use, with which version, the data to learn on
    and which kwargs should be given to the prediction pipeline.

    Using this route instead of learning in a notebook is handy for heavy models
    whose learning takes hours. You can then use your notebook for something
    else while other pods are dedicated to your model's learning. Use a
    health check of your model in the notebook though, to make sure the pipeline
    is correct.

    ---
    post:
      tags:
        - Learner
      description: Learns using AleiaModel.
      requestBody:
        content:
          application/json:
            schema:
              type: object
              properties:
                model_name:
                  type: string
                  required: true
                raw_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                derived_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                train_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                validation_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                test_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                revision:
                  type: integer
                  required: false
                  example: -1
                  default: -1
                  description: 0 is invalid, None or -1 is the latest revision.
                kwargs:
                  type: object
                  required: false
                  properties:
                    reshape_x:
                      type: array
                      required: false
                      items:
                        type: integer
                      example: [-1, 1]
                    reshape_y:
                      type: array
                      required: false
                      items:
                        type: integer
                      example: [-1, 1]
                    model_kwargs:
                      type: object
                      required: false
                    feature_engineering_kwargs:
                      type: object
                      required: false
                    train_val_test_split_kwargs:
                      type: object
                      required: false
                    fit_kwargs:
                      type: object
                      required: false
                    predict_for_metrics_kwargs:
                      type: object
                      required: false
                    metrics_kwargs:
                      type: object
                      required: false
                save:
                  description: Should the model be saved after use ?
                  type: boolean
                  default: true
                  required: false
                reload:
                  description: Reload the model even if this service already
                    loaded it
                  type: boolean
                  default: false
                  required: false

      responses:
        200:
          description: Validation and Test metrics as dicts.
          content:
            application/json:
              schema:
                type: object
                properties:
                  validation:
                    type: object
                  test:
                    type: object

        400:
          description: Returns an error message
    """
    model, request_json, save = get_model()
    # Apply every *_set entry of the body (train_set, test_set, ...) to the
    # model before fitting.
    for path_name in request_json:
        if not path_name.endswith("_set"):
            continue
        handle_path(model, path_name, request_json)

    metrics = model.fit(**request_json.get("kwargs", {}))

    if save:
        model.save()
    model.close()

    # fit() returns (validation_metrics, test_metrics), mirrored in the spec.
    return make_response({"validation": metrics[0], "test": metrics[1]})


# routes to be written by users
@user_services_routes.route("/predict", methods=["POST"])
def predict():
    """Predicts using AleiaModel.

    One can specify which model to use, with which version, the data on which
    to predict, and which kwargs should be given to the prediction pipeline.

    ---
    post:
      tags:
        - Predictor
      description: Predicts using AleiaModel.
      requestBody:
        content:
          application/json:
            schema:
              type: object
              properties:
                model_name:
                  type: string
                  required: true
                observations_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                derived_observations_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                revision:
                  type: integer
                  required: false
                  example: -1
                  default: -1
                  description: 0 is invalid, None or -1 is the latest revision.
                kwargs:
                  type: object
                  required: false
                  properties:
                    reshape_x:
                      type: array
                      required: false
                      items:
                        type: integer
                      example: [-1, 1]
                    feature_engineering_kwargs:
                      type: object
                      required: false
                    predict_kwargs:
                      type: object
                      required: false
                    postprocess_kwargs:
                      type: object
                      required: false
                save:
                  description: Should the model be saved after use ?
                  type: boolean
                  default: true
                  required: false
                reload:
                  description: Reload the model even if this service already
                    loaded it
                  type: boolean
                  default: false
                  required: false

      responses:
        200:
          description: Predicted values as a list.
          content:
            application/json:
              schema:
                type: object
                properties:
                  predictions:
                    type: array
        400:
          description: Returns an error message
    """
    model, request_json, save = get_model()
    # Apply every *_set entry of the body (observations_set, ...) to the
    # model before predicting.
    for path_name in request_json:
        if not path_name.endswith("_set"):
            continue
        handle_path(model, path_name, request_json)

    output = model.predict(**request_json.get("kwargs", {}))

    if save:
        model.save()
    model.close()

    # Flatten pandas/numpy results into plain lists for the JSON response.
    if isinstance(output, (pd.DataFrame, pd.Series)):
        output = output.values
    if isinstance(output, np.ndarray):
        output = output.tolist()

    # If output is none of those types, make sure it is json-serialisable
    return make_response({"predictions": output})


# Assemble the Flask app: register the user routes, then build the Swagger
# docs from their docstrings.
app = Flask(__name__)
app.register_blueprint(user_services_routes)
register_swagger(app)


if __name__ == "__main__":
    # Development entry point; in production a WSGI server imports ``app``.
    app.run()