Custom services
Service using Iris KNN
This service creates an API that allows one to predict using the existing Iris KNN model on a fixed dataset, but using user-specified weights and number of neighbors.
# sphinx_gallery_thumbnail_path = '_static/iris.png'
import os
import pandas as pd
from flask import Flask, request, Blueprint, make_response
from apispec_webframeworks.flask import FlaskPlugin
from apispec import APISpec
from flask_swagger_ui import get_swaggerui_blueprint
import aleialib
# URL prefix under which the platform mounts this user service.
# USER_SERVICE_ID is injected into the environment by the host platform —
# assumed always set in deployment (TODO confirm; otherwise the prefix ends
# with the literal string "None").
APP_PREFIX = f"/user-services/invoke/{os.environ.get('USER_SERVICE_ID')}"
def register_swagger(app_):
    """Attach a Swagger UI and its OpenAPI JSON spec to *app_*.

    The spec is built by introspecting every view already registered on the
    app, so this must be called after all route blueprints are registered.
    """
    api_spec = APISpec(
        title=os.environ.get("USER_SERVICE_NAME"),
        version="1.0.0",
        openapi_version="3.0.2",
        plugins=[FlaskPlugin()],
    )
    docs_url = APP_PREFIX + "/docs"
    ui_blueprint = get_swaggerui_blueprint(docs_url, docs_url + "/specs.json")

    @ui_blueprint.route("/specs.json", methods=["GET"])
    def get_api_doc():
        # Serve the generated OpenAPI document as JSON.
        return api_spec.to_dict(), 200

    app_.register_blueprint(ui_blueprint)
    # A request context is required for FlaskPlugin to introspect the routes.
    with app_.test_request_context():
        for view in app_.view_functions.values():
            api_spec.path(view=view)
# Blueprint carrying every route of this service, all mounted under APP_PREFIX.
user_services_routes = Blueprint(
    "user_services_routes", __name__, url_prefix=APP_PREFIX
)
@user_services_routes.route("/health", methods=["GET"])
def get_health():
    """Get Health
    ---
    get:
      description: Get Health
      responses:
        200:
          description: Returns Health boolean
          content:
            application/json:
              schema:
                type: object
                properties:
                  health:
                    type: boolean
    """
    # Liveness probe: reports healthy whenever the process can serve requests.
    return make_response({"health": True})
# routes to be written by users
@user_services_routes.route("/predict", methods=["POST"])
def predict():
    """Predicts using trained Iris KNN.
    One can specify how many neighbors to use with which weights.
    ---
    post:
      tags:
        - Predictor
      description: Predicts using AleiaModel.
      requestBody:
        content:
          application/json:
            schema:
              type: object
              properties:
                weights:
                  type: string
                  required: false
                neighbors:
                  type: integer
                  required: false
      responses:
        200:
          description: Predicted values as a list.
          content:
            application/json:
              schema:
                type: object
                properties:
                  predictions:
                    type: array
                  weights:
                    type: string
                  neighbors:
                    type: integer
        400:
          description: Returns an error message
    """
    # get_json() returns None for a JSON "null" body; fall back to an empty
    # dict so the .get() lookups below cannot raise AttributeError.
    request_json = request.get_json(silent=True) or {}

    # Fetch the full training history of the remote iris_knn model.
    fits_summary = pd.DataFrame.from_dict(
        aleialib.user_services.invoke(
            "post",
            "/attribute",
            "52fbf15e-bdfa-477a-9ca6-7f39da042c03",
            {"model_name": "iris_knn", "attribute": "fits_summary"},
        )["fits_summary"]
    )[["revision", "parameters", "results"]].values.tolist()

    # One row per past fit: (n_neighbors, weights, accuracy, revision).
    params_rev_res = pd.DataFrame(
        [
            (
                params["model_kwargs"]["n_neighbors"],
                params["model_kwargs"]["weights"],
                res[1]["accuracy"],
                revision,
            )
            for revision, params, res in fits_summary
        ],
        columns=["n", "w", "a", "r"],
    )
    # Keep, for each (n_neighbors, weights) pair, the revision with the best
    # accuracy. A stable global sort followed by drop_duplicates picks the
    # same row as the previous per-group sort/iloc[0], without the deprecated
    # groupby-apply pattern and with a single O(n log n) sort.
    params_rev_res = (
        params_rev_res.sort_values("a", ascending=False)
        .drop_duplicates(["n", "w"])
        .set_index(["n", "w"])
    )

    weights = request_json.get("weights")
    neighbors = request_json.get("neighbors")
    if (neighbors, weights) not in params_rev_res.index:
        return make_response(
            {
                "error": f"Weights {weights} and neighbors {neighbors} not found in training history"
            },
            400,
        )

    # Delegate the actual prediction to the generic AleiaModel service,
    # pinned to the best revision found above.
    output = aleialib.user_services.invoke(
        "post",
        "/predict",
        "52fbf15e-bdfa-477a-9ca6-7f39da042c03",
        {
            "kwargs": {"feature_engineering_kwargs": {"names": ""}},
            "model_name": "iris_knn",
            "observations_set": {
                "path": "notebooks/dataset/iris_no_target.csv"
            },
            "reload": False,
            "revision": int(params_rev_res.loc[(neighbors, weights), "r"]),
            "save": False,
        },
    )
    # If output is none of those types, make sure it is json-serialisable
    return make_response(
        {"predictions": output, "weights": weights, "neighbors": neighbors}
    )
# Assemble the application: routes first, then swagger — register_swagger()
# builds the spec from the views already registered on the app.
app = Flask(__name__)
app.register_blueprint(user_services_routes)
register_swagger(app)

if __name__ == "__main__":
    # Development entry point; in production a WSGI server imports `app`.
    app.run()
Service using AleiaModel
This service creates an API with the 'attribute', 'learn' and 'predict' routes, valid for any existing AleiaModel (it takes the model name as argument).
The first call to one model could take some time, as it will be loaded from S3, but the next calls to the same model will be quicker, as the model is kept in memory.
Note: you need to put ``multiprocessing_on_dill`` in your service requirements.
You can test quickly the predict route on Iris classification with:
{
"kwargs": {
"feature_engineering_kwargs": {"names": ""}
},
"model_name": "iris_knn",
"observations_set": {
"path": "notebooks/dataset/iris_no_target.csv"
},
"reload": false,
"revision": -1,
"save": false
}
You will need to have the 'iris_no_target.csv' file, containing the Iris dataset without the 'target' column.
import os
import json
import numpy as np
import pandas as pd
from apispec import APISpec
from multiprocessing_on_dill import Lock
from apispec_webframeworks.flask import FlaskPlugin
from flask_swagger_ui import get_swaggerui_blueprint
from flask import Flask, request, Blueprint, make_response
from multiprocessing_on_dill.managers import (
AcquirerProxy,
BaseManager,
DictProxy,
)
from aleiamodel import AleiaModel, URL
# Reusable OpenAPI component describing every way a dataset can be handed to
# an AleiaModel route: a single path, a list of paths, or a remote URL, plus
# optional loading parameters. Registered as "AleiaModelPathDescription" by
# register_swagger() and $ref-erenced from the route docstrings.
aleiamodel_path_schema = {
    "properties": {
        # A single dataset file path.
        "path": {
            "type": "string",
            "example": "notebooks/dataset/raw.csv",
            "required": "false",
        },
        # Several paths whose contents are concatenated into one dataset.
        "paths": {
            "type": "array",
            "items": {"type": "string"},
            "required": "false",
        },
        # A remote file to download (optionally renamed / stored in S3).
        "url": {
            "type": "object",
            "properties": {
                "url": {"type": "string", "required": "true"},
                "filename": {"type": "string", "required": "false"},
                "s3_folder": {"type": "string", "required": "false"},
            },
            "required": "false",
        },
        "concatenate_kwargs": {"type": "object", "required": "false"},
        "load_kwargs": {"type": "object", "required": "false"},
        "handle_type": {
            "type": "boolean",
            "required": "false",
            "default": "true",
        },
        # 0 is invalid; None or -1 selects the latest revision.
        "revision": {
            "type": "integer",
            "required": "false",
            "default": "-1",
            "example": "-1",
        },
    }
}
# URL prefix under which the platform mounts this user service.
APP_PREFIX = f"/user-services/invoke/{os.environ.get('USER_SERVICE_ID')}"
# Address and auth key of the local shared-state manager used to share loaded
# models between all API worker processes on this host.
HOST = "127.0.0.1"
PORT = 35791
KEY = b"secret"
def get_shared_state(host, port, key):
    """This function allows to share large object among all the API workers.
    Here it is used to load AleiaModels only once and keep them in memory.

    Returns a (dict proxy, lock proxy) pair backed by a BaseManager server:
    the first worker to run this becomes the server, later workers connect
    to it as clients.
    """
    shared_dict_ = {}
    shared_lock_ = Lock()
    manager = BaseManager((host, port), key)
    manager.register("get_dict", lambda: shared_dict_, DictProxy)
    manager.register("get_lock", lambda: shared_lock_, AcquirerProxy)
    try:
        # get_server() binds the address and raises OSError when another
        # worker already owns the port; only then do we start the server
        # process. NOTE(review): relies on multiprocessing_on_dill tolerating
        # the probe bind before start() — confirm against that library.
        manager.get_server()
        manager.start()
    except OSError:  # Address already in use
        # Another worker is already serving the shared state: attach to it.
        manager.connect()
    return manager.get_dict(), manager.get_lock()
# Process-wide proxies to the shared model cache and its lock; created once
# at import time so every request handler in this worker reuses them.
shared_dict, shared_lock = get_shared_state(HOST, PORT, KEY)
def handle_path(model: AleiaModel, path_name: str, request_json: dict):
    """Configure one of *model*'s dataset attributes from the request payload.

    *path_name* is the request key (e.g. ``"train_set"``); its value must be
    a dict describing either a single ``path``, several ``paths`` or a remote
    ``url``, plus optional loading options. Missing, non-dict, or unknown
    entries are silently ignored so callers can pass arbitrary "_set" keys.
    """
    if not (path_argument := request_json.get(path_name)):
        return
    if not isinstance(path_argument, dict):
        return
    private_name = f"_{path_name}"
    if not hasattr(model, private_name):
        return
    path = path_argument.get("path")
    paths = path_argument.get("paths")
    if path:
        setattr(model, path_name, path)
    elif paths:
        setattr(model, path_name, paths)
        if concatenate_kwargs := path_argument.get("concatenate_kwargs"):
            getattr(model, private_name).concatenate_kwargs = concatenate_kwargs
    elif url := path_argument.get("url"):
        setattr(model, path_name, URL(url))
        # Compare against None so an explicit ``false`` is honoured instead
        # of being dropped by a truthiness test.
        if (force_download := path_argument.get("force_download")) is not None:
            getattr(model, private_name).force_download = force_download
    if path or paths:
        if load_kwargs := path_argument.get("load_kwargs"):
            getattr(model, private_name).load_kwargs = load_kwargs
        # ``handle_type`` is documented as a boolean defaulting to true; with
        # the previous truthiness check an explicit ``false`` was ignored.
        if (handle_type := path_argument.get("handle_type")) is not None:
            getattr(model, private_name).handle_type = handle_type
        # Revision 0 is documented as invalid, so a falsy test is fine here.
        if revision := path_argument.get("revision"):
            getattr(model, private_name).use_revision(revision)
def get_model():
    """Return ``(model, request_json, save)`` for the current request.

    Loads the requested AleiaModel into the cross-worker shared cache the
    first time it is asked for (or whenever ``reload`` is true) and hands
    back the cached instance afterwards, all under the shared lock.
    """
    payload = request.get_json()
    model_name = payload["model_name"]
    save = payload.get("save", False)
    must_reload = payload.get("reload", False)
    with shared_lock:
        if must_reload or model_name not in shared_dict:
            # read_only unless the caller intends to save, so concurrent
            # requests cannot clobber a model they only read.
            model = AleiaModel(
                model_name,
                revision=payload.get("revision", None),
                must_exist=True,
                ignore_requirements=True,
                read_only=not save,
            )
            shared_dict[model_name] = model
        return shared_dict[model_name], payload, save
def register_swagger(app_):
    """Expose a Swagger UI plus its OpenAPI JSON spec for this service.

    Must run after every route blueprint has been registered on *app_*,
    because the spec is generated from the app's view functions.
    """
    api_spec = APISpec(
        title=os.environ.get("USER_SERVICE_NAME"),
        version="1.0.0",
        openapi_version="3.0.2",
        plugins=[FlaskPlugin()],
    )
    docs_url = APP_PREFIX + "/docs"
    ui_blueprint = get_swaggerui_blueprint(docs_url, docs_url + "/specs.json")
    # Register the reusable path-description component $ref-ed by the routes.
    api_spec.components.schema(
        "AleiaModelPathDescription", aleiamodel_path_schema
    )

    @ui_blueprint.route("/specs.json", methods=["GET"])
    def get_api_doc():
        # Serve the generated OpenAPI document as JSON.
        return api_spec.to_dict(), 200

    app_.register_blueprint(ui_blueprint)
    # A request context is required for FlaskPlugin to introspect the routes.
    with app_.test_request_context():
        for view in app_.view_functions.values():
            api_spec.path(view=view)
# Blueprint carrying every route of this service, all mounted under APP_PREFIX.
user_services_routes = Blueprint(
    "user_services_routes", __name__, url_prefix=APP_PREFIX
)
@user_services_routes.route("/health", methods=["GET"])
def get_health():
    """Get Health
    ---
    get:
      description: Get Health
      responses:
        200:
          description: Returns Health boolean
          content:
            application/json:
              schema:
                type: object
                properties:
                  health:
                    type: boolean
    """
    # Liveness probe: reports healthy whenever the process can serve requests.
    return make_response({"health": True})
# routes to be written by users
@user_services_routes.route("/attribute", methods=["POST"])
def attribute():
    """Get any attribute of a model.
    ---
    post:
      tags:
        - Attribute
      requestBody:
        content:
          application/json:
            schema:
              type: object
              properties:
                model_name:
                  type: string
                  required: true
                attribute:
                  type: string
                  required: true
                reload:
                  description: Reload the model even if this service already
                    loaded it
                  type: boolean
                  default: false
                  required: false
      responses:
        200:
          description: the attribute value
        400:
          description: Returns an error message
    """
    model, request_json, _ = get_model()
    attribute_ = request_json.get("attribute")
    # Return the documented 400 instead of crashing with a TypeError
    # (attribute omitted → getattr(model, None)) or an AttributeError
    # (unknown attribute name).
    if not attribute_ or not hasattr(model, attribute_):
        return make_response(
            {"error": f"Model has no attribute {attribute_!r}"}, 400
        )
    value = getattr(model, attribute_)
    # Convert numpy / pandas values to JSON-serialisable builtins.
    if isinstance(value, np.ndarray):
        value = value.tolist()
    elif isinstance(value, (pd.DataFrame, pd.Series)):
        value = json.loads(value.to_json())
    return make_response({attribute_: value})
# routes to be written by users
@user_services_routes.route("/fit", methods=["POST"])
def learn():
    """Learns using AleiaModel.
    One can specify which model to use, with which version, the data to learn on
    and which kwargs should be given to the prediction pipeline.
    Using this route instead of learning in a notebook is handy for heavy models
    whose learning takes hours. You can then use your notebook for something
    else while other pods are dedicated to your model's learning. Use a
    health check of your model in the notebook though, to make sure the pipeline
    is correct.
    ---
    post:
      tags:
        - Learner
      description: Learns using AleiaModel.
      requestBody:
        content:
          application/json:
            schema:
              type: object
              properties:
                model_name:
                  type: string
                  required: true
                raw_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                derived_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                train_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                validation_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                test_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                revision:
                  type: integer
                  required: false
                  example: -1
                  default: -1
                  description: 0 is invalid, None or -1 is the latest revision.
                kwargs:
                  type: object
                  required: false
                  properties:
                    reshape_x:
                      type: array
                      required: false
                      items:
                        type: integer
                      example: [-1, 1]
                    reshape_y:
                      type: array
                      required: false
                      items:
                        type: integer
                      example: [-1, 1]
                    model_kwargs:
                      type: object
                      required: false
                    feature_engineering_kwargs:
                      type: object
                      required: false
                    train_val_test_split_kwargs:
                      type: object
                      required: false
                    fit_kwargs:
                      type: object
                      required: false
                    predict_for_metrics_kwargs:
                      type: object
                      required: false
                    metrics_kwargs:
                      type: object
                      required: false
                save:
                  description: Should the model be saved after use ?
                  type: boolean
                  default: true
                  required: false
                reload:
                  description: Reload the model even if this service already
                    loaded it
                  type: boolean
                  default: false
                  required: false
      responses:
        200:
          description: Validation and Test metrics as dicts.
          content:
            application/json:
              schema:
                type: object
                properties:
                  validation:
                    type: object
                  test:
                    type: object
        400:
          description: Returns an error message
    """
    model, request_json, save = get_model()
    # Apply every "*_set" entry of the payload to the model before fitting.
    for path_name in request_json:
        if not path_name.endswith("_set"):
            continue
        handle_path(model, path_name, request_json)
    metrics = model.fit(**request_json.get("kwargs", {}))
    # NOTE(review): save() and close() are grouped under `save` — confirm
    # close() should not run unconditionally after a fit.
    if save:
        model.save()
        model.close()
    # metrics is assumed to be (validation_metrics, test_metrics), matching
    # the documented response shape — TODO confirm against AleiaModel.fit.
    return make_response({"validation": metrics[0], "test": metrics[1]})
# routes to be written by users
@user_services_routes.route("/predict", methods=["POST"])
def predict():
    """Predicts using AleiaModel.
    One can specify which model to use, with which version, the data on which
    to predict, and which kwargs should be given to the prediction pipeline.
    ---
    post:
      tags:
        - Predictor
      description: Predicts using AleiaModel.
      requestBody:
        content:
          application/json:
            schema:
              type: object
              properties:
                model_name:
                  type: string
                  required: true
                observations_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                derived_observations_set:
                  $ref: '#/components/schemas/AleiaModelPathDescription'
                revision:
                  type: integer
                  required: false
                  example: -1
                  default: -1
                  description: 0 is invalid, None or -1 is the latest revision.
                kwargs:
                  type: object
                  required: false
                  properties:
                    reshape_x:
                      type: array
                      required: false
                      items:
                        type: integer
                      example: [-1, 1]
                    feature_engineering_kwargs:
                      type: object
                      required: false
                    predict_kwargs:
                      type: object
                      required: false
                    postprocess_kwargs:
                      type: object
                      required: false
                save:
                  description: Should the model be saved after use ?
                  type: boolean
                  default: true
                  required: false
                reload:
                  description: Reload the model even if this service already
                    loaded it
                  type: boolean
                  default: false
                  required: false
      responses:
        200:
          description: Predicted values as a list.
          content:
            application/json:
              schema:
                type: object
                properties:
                  predictions:
                    type: array
        400:
          description: Returns an error message
    """
    model, request_json, save = get_model()
    # Wire every "*_set" entry of the payload onto the model first.
    for key in request_json:
        if key.endswith("_set"):
            handle_path(model, key, request_json)
    predictions = model.predict(**request_json.get("kwargs", {}))
    if save:
        model.save()
        model.close()
    # Turn pandas containers into arrays, then arrays into plain lists.
    if isinstance(predictions, (pd.DataFrame, pd.Series)):
        predictions = predictions.values
    if isinstance(predictions, np.ndarray):
        predictions = predictions.tolist()
    # If output is none of those types, make sure it is json-serialisable
    return make_response({"predictions": predictions})
# Assemble the application: routes first, then swagger — register_swagger()
# builds the spec from the views already registered on the app.
app = Flask(__name__)
app.register_blueprint(user_services_routes)
register_swagger(app)

if __name__ == "__main__":
    # Development entry point; in production a WSGI server imports `app`.
    app.run()