End to end LLM
This page walks through a full example of building and deploying an LLM inside the platform. In this use case, the LLM detects sentiment in tweet texts.
The example follows these steps:
- Build the LLM with a notebook
  - Create a random dataset to apply the LLM on
  - Fetch an open-source LLM (Mistral-7B-Instruct-v0.1) and quantize it to 4 bits (so it runs within 16 GB of GPU RAM)
  - Save the quantized LLM to the platform's S3 storage
  - Run the LLM predictions on the dataset and store them as JSON
  - Transform the JSON predictions into CSV data
- Deploy the LLM inside a pipeline/job
  - Convert the notebook to modules
  - Use the modules inside a pipeline
  - Execute the pipeline through a job
- Deploy the LLM inside a custom service
  - Create a custom service and configure it
  - Adapt the notebook code to the custom service
  - Call the custom service from code
    - Internal call
    - External call
Build the LLM with a notebook
You must run the notebook on a "GPU" resource.
Requirements
!pip install faker
!pip install torch==2.2.0
!pip install transformers==4.37.2
!pip install bitsandbytes==0.42.0
!pip install accelerate==0.26.1
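Optionally, a quick sanity check confirms that the notebook actually sees the GPU before going further:
import torch
# Confirm a CUDA device is visible and report its memory
print(torch.cuda.is_available())  # expect True on a "GPU" resource
print(torch.cuda.get_device_name(0))  # attached GPU model
print(round(torch.cuda.get_device_properties(0).total_memory / 1e9, 1), "GB")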
Data generation (random tweets): requires a folder "landing/" created in the dataset space.
import csv
from faker import Faker
from datetime import datetime
from io import StringIO
from aleialib import s3
fake = Faker()
# Number of rows to generate
num_rows = 1000
# Create a StringIO object
csv_buffer = StringIO()
# Create CSV writer
writer = csv.writer(csv_buffer, delimiter=',')
writer.writerow(['id', 'message', 'timestamp'])
# Generate and write rows
for i in range(num_rows):
    tweet_id = i + 1
    tweet_message = fake.text(140)
    tweet_timestamp = fake.date_time_this_decade()
    writer.writerow([tweet_id, tweet_message, tweet_timestamp])
# Get the generated CSV data as a string
csv_data = csv_buffer.getvalue()
s3.save_file("landing/tweets_1k.csv",csv_data, object_type="dataset")
Download and quantize the LLM to 4 bits
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from datetime import datetime
import sys
if not sys.warnoptions:
    import warnings
    warnings.simplefilter("ignore")
from tqdm import tqdm
from functools import partialmethod
tqdm.__init__ = partialmethod(tqdm.__init__, disable=True)
from huggingface_hub import logging
logging.set_verbosity_error()
# function to get duration
def get_duration_ms(time1, time2):
    diff = time2 - time1
    return diff.total_seconds() * 1000  # returns duration in milliseconds
model_name = 'mistralai/Mistral-7B-Instruct-v0.1'
access_token = "hf_yourhftoken"
print(model_name)
def load_quantized_model(model_name: str):
    """
    :param model_name: Name or path of the model to be loaded.
    :return: Loaded quantized model.
    """
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.bfloat16,
        quantization_config=bnb_config,
        token=access_token
    )
    return model

def initialize_tokenizer(model_name: str):
    """
    Initialize the tokenizer with the specified model_name.
    :param model_name: Name or path of the model for tokenizer initialization.
    :return: Initialized tokenizer.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name, token=access_token)
    tokenizer.bos_token_id = 1  # Set beginning of sentence token id
    return tokenizer
print('Model quantization starting...')
start_time = datetime.now()
model = load_quantized_model(model_name)
end_time = datetime.now()
duration = get_duration_ms(start_time, end_time)
print('Model quantized and loaded, elapsed time : '+str(duration)+' ms')
print('Tokenizer initialization starting...')
start_time = datetime.now()
tokenizer = initialize_tokenizer(model_name)
end_time = datetime.now()
duration = get_duration_ms(start_time, end_time)
print('Tokenizer initialized, elapsed time : '+str(duration)+' ms')
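Optionally, you can verify that the quantized model fits the 16 GB GPU budget:
# Report the quantized model's memory footprint in GB
print(f"Memory footprint: {model.get_memory_footprint() / 1e9:.2f} GB")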
Save the quantized LLM to S3: requires a folder "models/mistralai/Mistral-7B-Instruct-v0.1/" in the dataset space.
from aleialib import s3
s3.save_file('models/mistralai/Mistral-7B-Instruct-v0.1/model_quantized_4bits.content', model, object_type='dataset')
s3.save_file('models/mistralai/Mistral-7B-Instruct-v0.1/tokenizer.content', tokenizer, object_type='dataset')
Load the previously saved quantized LLM from S3
from aleialib import s3
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
# loading model and tokenizer
model = s3.load_file('models/mistralai/Mistral-7B-Instruct-v0.1/model_quantized_4bits.content', object_type='dataset')
tokenizer = s3.load_file('models/mistralai/Mistral-7B-Instruct-v0.1/tokenizer.content', object_type='dataset')
model_name = 'mistralai/Mistral-7B-Instruct-v0.1'
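A one-off smoke test (optional) confirms the reloaded model and tokenizer still generate correctly; it reuses only calls shown above:
# Short deterministic generation to verify the reloaded pair
prompt = "[INST]Say hello in one word.[/INST]"
encoded = tokenizer(prompt, return_tensors="pt", add_special_tokens=False)
out = model.generate(**encoded, max_new_tokens=10, do_sample=False, pad_token_id=tokenizer.eos_token_id)
print(tokenizer.batch_decode(out)[0])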
Apply the LLM to the dataset and store the results as JSON: requires a folder "predictions_annotation/" created in the dataset space.
from aleialib import s3
import re
import json
from datetime import datetime
# function to get duration
def get_duration_ms(time1, time2):
    diff = time2 - time1
    return diff.total_seconds() * 1000  # returns duration in milliseconds
# function to get LLM response
def llm_apply(tokenizer, model, ask_ai, message, text):
    # Define stop token ids
    stop_token_ids = [0]
    # Build the Mistral instruction prompt (note the closing [/INST] tag)
    text = "[INST]" + ask_ai + message + "[/INST]"
    # prediction
    encoded = tokenizer(text, return_tensors="pt", add_special_tokens=False)
    model_input = encoded
    generated_ids = model.generate(**model_input, max_new_tokens=1000, do_sample=True, pad_token_id=tokenizer.eos_token_id)
    decoded = tokenizer.batch_decode(generated_ids)
    # keep only the answer generated after the closing [/INST] tag
    result = re.search(r'\[/INST\](.+)</s>', decoded[0])
    if result:
        result_group = result.group(1)
    else:
        result_group = "None"
    return result_group
# loading data
content = s3.load_file("landing/tweets_1k.csv", object_type="dataset", handle_type=True)
# prompt
ask_ai = "Tell in 1 word AND in lower case AND without period the emotion from the text, by using only 1 of the following words : happiness, sadness, fear, disgust, anger, surprise : "
# list of models
list_model = [model_name+" version 1", model_name+" version 2", model_name+" version 3"]
# init variables
predictions = []
for message in content['message'][:10]:
    json_obj = {}
    # define stop token ids
    stop_token_ids = [0]
    # full prompt with content
    text = "[INST]" + ask_ai + message + "[/INST]"
    # JSON generation
    ## 1st layer
    json_obj['prompt'] = ask_ai
    json_obj['content'] = message
    ## 2nd layer with items
    items = []
    for model_id in list_model:
        # tracking start date
        start_time = datetime.now()
        # prediction
        result = llm_apply(tokenizer, model, ask_ai, message, text)
        # tracking end date
        end_time = datetime.now()
        # calculate duration
        duration = get_duration_ms(start_time, end_time)
        ## append JSON value to annotation format
        item = {}
        item['id'] = model_id
        item['title'] = model_name + " 4 bits quantization"
        item['body'] = result
        item['datetime'] = end_time.strftime("%Y-%m-%d %H:%M:%S")
        item['duration'] = duration
        items.append(item)
        # summary
        print("Message : " + message + " | Answer : " + result)
        print("[" + end_time.strftime("%Y-%m-%d %H:%M:%S") + "] New prediction - Model: " + model_id + " - Result: " + result + " - Duration: " + str(duration) + " ms")
    # Adding "items" key and items list as its value to the JSON object
    json_obj['items'] = items
    predictions.append(json_obj)
json_final = json.dumps(predictions, indent=4, default=str)
print(json_final)
file_time = datetime.now()
s3.save_file("predictions_annotation/llm_predictions_"+file_time.strftime("%Y-%m-%d %H:%M:%S")+".json", json_final, object_type="dataset")
Load the JSON results and transform them into CSV data: requires a folder "predictions/" created in the dataset space.
import json
import csv
from io import StringIO
from datetime import datetime
from aleialib import s3
list_files = s3.list_files("dataset/predictions_annotation/", return_result=True)
print(list_files)
for file in list_files:
    raw_data = s3.load_file(file, from_s3_root=True)
    # Load the json file
    data = json.loads(raw_data)
    # Create a StringIO object
    csv_buffer = StringIO()
    # Create CSV writer and write the header row
    writer = csv.writer(csv_buffer, delimiter=',')
    writer.writerow(['id', 'prompt', 'content', 'model_id', 'title', 'body', 'datetime', 'duration'])
    # Loop through each element in the json file
    i = 0
    for element in data:
        # Get the prompt and content
        prompt = element['prompt']
        content = element['content']
        # Loop through the items list
        for item in element['items']:
            i = i + 1
            id = i
            # Get the remaining data
            model_id = item['id']
            title = item['title']
            body = str(item['body'])
            time = str(item['datetime'])
            duration = str(item['duration'])
            # Write the row to the csv file
            writer.writerow([id, prompt, content, model_id, title, body, time, duration])
    # Get the generated CSV data as a string
    csv_data = csv_buffer.getvalue()
    print(csv_data)
    # Save one CSV per processed JSON file
    file_time = datetime.now()
    s3.save_file("predictions/llm_predictions_" + file_time.strftime("%Y-%m-%d %H:%M:%S") + ".csv", csv_data, object_type="dataset")
Deploy the LLM inside a pipeline/job
You can use the previously created LLM inside a pipeline, which lets you execute a job directly or on a schedule, as a batch execution type.
The pipeline structure follows this format:
- Job (execution, scheduled or not)
  - Pipeline (execution order and description)
    - Module 1 (inference task)
    - Module 2 (data parsing task)
Modules
From the previous notebook, you can create the related modules directly through the notebook action Build module, which lets you select the cells to build each module from.
You will need to create 2 modules corresponding to:
- The LLM loading and inference run: task-inference
- The data transformation from JSON predictions to CSV data: task-data-parser
After creating the modules, check and update their resource configuration. It should be set to "GPU".
Module task-inference
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from aleialib import s3
import re
import json
import time
from datetime import datetime
import sys
if not sys.warnoptions:
    import warnings
    warnings.simplefilter("ignore")
from tqdm import tqdm
from functools import partialmethod
tqdm.__init__ = partialmethod(tqdm.__init__, disable=True)
from huggingface_hub import logging
logging.set_verbosity_error()
model_name = 'mistralai/Mistral-7B-Instruct-v0.1'
# function to get duration
def get_duration_ms(time1, time2):
    diff = time2 - time1
    return diff.total_seconds() * 1000  # returns duration in milliseconds
# function to get LLM response
def llm_apply(tokenizer, model, ask_ai, message, text):
    # Define stop token ids
    stop_token_ids = [0]
    # Build the Mistral instruction prompt (note the closing [/INST] tag)
    text = "[INST]" + ask_ai + message + "[/INST]"
    # prediction
    encoded = tokenizer(text, return_tensors="pt", add_special_tokens=False)
    model_input = encoded
    generated_ids = model.generate(**model_input, max_new_tokens=1000, do_sample=True, pad_token_id=tokenizer.eos_token_id)
    decoded = tokenizer.batch_decode(generated_ids)
    # keep only the answer generated after the closing [/INST] tag
    result = re.search(r'\[/INST\](.+)</s>', decoded[0])
    if result:
        result_group = result.group(1)
    else:
        result_group = "None"
    return result_group
# loading data
content = s3.load_file("landing/tweets_1k.csv", object_type="dataset", handle_type=True)
# loading model
model = s3.load_file('models/mistralai/Mistral-7B-Instruct-v0.1/model_quantized_4bits.content', object_type='dataset')
tokenizer = s3.load_file('models/mistralai/Mistral-7B-Instruct-v0.1/tokenizer.content', object_type='dataset')
# prompt
ask_ai = "Tell in 1 word AND in lower case AND without period the emotion from the text, by using only 1 of the following words : happiness, sadness, fear, disgust, anger, surprise"
# list of models
list_model = [model_name+" version 1", model_name+" version 2", model_name+" version 3"]
# init variables
predictions = []
for message in content['message']:
    json_obj = {}
    # define stop token ids
    stop_token_ids = [0]
    # full prompt with content
    text = "[INST]" + ask_ai + message + "[/INST]"
    # JSON generation
    ## 1st layer
    json_obj['prompt'] = ask_ai
    json_obj['content'] = message
    ## 2nd layer with items
    items = []
    for model_id in list_model:
        # tracking start date
        start_time = datetime.now()
        # prediction
        result = llm_apply(tokenizer, model, ask_ai, message, text)
        # tracking end date
        end_time = datetime.now()
        # calculate duration
        duration = get_duration_ms(start_time, end_time)
        ## append JSON value to annotation format
        item = {}
        item['id'] = model_id
        item['title'] = model_name + " 4 bits quantization"
        item['body'] = result
        item['datetime'] = end_time.strftime("%Y-%m-%d %H:%M:%S")
        item['duration'] = duration
        items.append(item)
        # summary
        print("[" + end_time.strftime("%Y-%m-%d %H:%M:%S") + "] New prediction - Model: " + model_id + " - Result: " + result + " - Duration: " + str(duration) + " ms")
    # Adding "items" key and items list as its value to the JSON object
    json_obj['items'] = items
    predictions.append(json_obj)
json_final = json.dumps(predictions, indent=4, default=str)
#print(json_final)
file_time = datetime.now()
s3.save_file("predictions_annotation/llm_predictions_"+file_time.strftime("%Y-%m-%d %H:%M:%S")+".json", json_final, object_type="dataset")
Module task-data-parser
import json
import csv
from io import StringIO
from aleialib import s3
from datetime import datetime
list_files = s3.list_files("dataset/predictions_annotation/", return_result=True)
print(list_files)
for file in list_files:
    raw_data = s3.load_file(file, from_s3_root=True)
    # Load the json file
    data = json.loads(raw_data)
    # Create a StringIO object
    csv_buffer = StringIO()
    # Create CSV writer and write the header row
    writer = csv.writer(csv_buffer, delimiter=',')
    writer.writerow(['id', 'prompt', 'content', 'model_id', 'title', 'body', 'datetime', 'duration'])
    # Loop through each element in the json file
    i = 0
    for element in data:
        # Get the prompt and content
        prompt = element['prompt']
        content = element['content']
        # Loop through the items list
        for item in element['items']:
            i = i + 1
            id = i
            # Get the remaining data
            model_id = item['id']
            title = item['title']
            body = str(item['body'])
            time = str(item['datetime'])
            duration = str(item['duration'])
            # Write the row to the csv file
            writer.writerow([id, prompt, content, model_id, title, body, time, duration])
    # Get the generated CSV data as a string
    csv_data = csv_buffer.getvalue()
    # Save one CSV per processed JSON file
    file_time = datetime.now()
    s3.save_file("predictions/llm_predictions_" + file_time.strftime("%Y-%m-%d %H:%M:%S") + ".csv", csv_data, object_type="dataset")
Pipeline/Job
You will need to create the related pipeline: pipeline-annotation-data
Pipeline content
dataNodes:
  data_node_1:
    source: none.value
  data_node_2:
    source: none.value
  data_node_3:
    source: none.value
processNodes:
  process_node_1:
    input:
      - data_node_1
    output:
      - data_node_2
    processor: task-inference
  process_node_2:
    input:
      - data_node_2
    output:
      - data_node_3
    processor: task-data-parser
#schedule:
#  schedule_interval: "0 0 * * *"
#  start_date: "2022-01-01 00:00"
#  end_date: "2022-12-31 00:00"
#  timezone: "Europe/Paris"
requirement:
  - name: "torch"
    version: 2.2.0
  - name: "transformers"
    version: 4.37.2
  - name: "bitsandbytes"
    version: 0.42.0
  - name: "accelerate"
    version: 0.26.1
From the created pipeline, you can build the related job. If it is scheduled, it waits for the next execution date; otherwise it runs immediately.
Deploy the LLM inside a custom-service
Custom service creation
You can use the LLM inside a custom service for real-time serving.
Create a custom service, then configure it to cover these requirements:
- Environment type: GPU
- Python libraries:
  - transformers==4.37.2
  - torch==2.2.0
  - jinja2==3.0.3
  - bitsandbytes==0.42.0
  - accelerate==0.26.1
- Code (see below)
"""Initial version of user service template.
Usage:
* @user_services_routes.route("/my-route", methods=["GET"])
def my_route(): ...
* accepted_methods: ["GET", "POST", "PUT", "DELETE", "PATCH"]
* accepted_body: application/json
"""
import os
import numpy as np
import pandas as pd
from flask import Flask, request, Blueprint, make_response
from apispec_webframeworks.flask import FlaskPlugin
from apispec import APISpec
from flask_swagger_ui import get_swaggerui_blueprint
from aleiamodel import AleiaModel, URL
APP_PREFIX = f"/user-services/invoke/{os.environ.get('USER_SERVICE_ID')}"
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
model_name = 'mistralai/Mistral-7B-Instruct-v0.1'
access_token = "hf_yourhftoken"
print(model_name)
def load_quantized_model(model_name: str):
    """
    :param model_name: Name or path of the model to be loaded.
    :return: Loaded quantized model.
    """
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.bfloat16,
        quantization_config=bnb_config,
        token=access_token
    )
    return model

def initialize_tokenizer(model_name: str):
    """
    Initialize the tokenizer with the specified model_name.
    :param model_name: Name or path of the model for tokenizer initialization.
    :return: Initialized tokenizer.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name, token=access_token)
    tokenizer.bos_token_id = 1  # Set beginning of sentence token id
    return tokenizer
model = load_quantized_model(model_name)
tokenizer = initialize_tokenizer(model_name)
def register_swagger(app_):
    spec = APISpec(
        title=os.environ.get("USER_SERVICE_NAME"),
        version="1.0.0",
        openapi_version="3.0.2",
        plugins=[FlaskPlugin()],
    )
    swaggerui_blueprint = get_swaggerui_blueprint(
        APP_PREFIX + "/docs", APP_PREFIX + "/docs/specs.json"
    )

    @swaggerui_blueprint.route("/specs.json", methods=["GET"])
    def get_api_doc():
        return spec.to_dict(), 200

    app_.register_blueprint(swaggerui_blueprint)
    with app_.test_request_context():
        for _, view in app_.view_functions.items():
            spec.path(view=view)
user_services_routes = Blueprint(
    "user_services_routes", __name__, url_prefix=APP_PREFIX
)
@user_services_routes.route("/health", methods=["GET"])
def get_health():
"""Get Health
---
get:
description: Get Health
responses:
200:
description: Returns Health boolean
content:
application/json:
schema:
type: object
properties:
health:
type: boolean
"""
return make_response({"health": True})
# routes to be written by users
@user_services_routes.route("/chat", methods=["POST"])
def chat():
"""Chat using mistralai/Mistral-7B-Instruct-v0.1.
---
post:
tags:
- Chat
description: Chat using mistralai/Mistral-7B-Instruct-v0.1.
requestBody:
content:
application/json:
schema:
type: object
properties:
content:
type: string
required: true
example: "Do you have any recommandation to start working in datascience?"
responses:
200:
description: chat response from model.
content:
application/json:
schema:
type: object
properties:
predictions:
type: array
400:
description: Returns an error message
"""
messages = request.get_json("content",0)
content = messages['content']
print(content)
stop_token_ids = [0]
text = "[INST]"+content+"[\INST]"
encoded = tokenizer(text, return_tensors="pt", add_special_tokens=False)
model_input = encoded
generated_ids = model.generate(**model_input, max_new_tokens=1000, do_sample=True)
decoded = tokenizer.batch_decode(generated_ids)
print(decoded[0])
# If output is none of those types, make sure it is json-serialisable
return make_response({"response": decoded[0]})
app = Flask(__name__)
app.register_blueprint(user_services_routes)
register_swagger(app)
if __name__ == "__main__":
app.run()
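Before deploying, a minimal local smoke test with Flask's built-in test client can confirm the routes respond (this assumes the USER_SERVICE_ID environment variable is set and the model load above succeeded):
# Local smoke test: the health route should return {'health': True}
with app.test_client() as client:
    print(client.get(APP_PREFIX + "/health").get_json())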
Custom service call
After the custom service has been successfully deployed, you can call it from internal or external code.
Internal call (from a platform notebook, for example)
Requirements
!pip install nltk
import nltk
nltk.download('punkt')
Call
import aleialib
from datetime import datetime
from nltk.tokenize import word_tokenize
# function to get duration
def get_duration_ms(time1, time2):
    diff = time2 - time1
    return diff.total_seconds() * 1000  # returns duration in milliseconds
content_api = {"content": "Write me a bio about Madonna in 100 words, giving me details about the web sources used with URL"}
# tracking start date
start_time = datetime.now()
response_llm = aleialib.user_services.invoke("post", "/chat", "94a8bc6d-d307-43c9-9804-f1fe06b7c961", content_api)
# tracking end date
end_time = datetime.now()
# calculate duration
duration = get_duration_ms(start_time, end_time)
number_tokens = len(word_tokenize(response_llm['response']))
number_characters = len(response_llm['response'])
print("Call start : "+start_time.strftime("%Y-%m-%d %H:%M:%S"))
print("Call end : "+end_time.strftime("%Y-%m-%d %H:%M:%S"))
print("Time taken : "+str(duration)+" ms")
print("Number of tokens : "+str(number_tokens))
print("Speed : "+ str(number_tokens/duration*1000)+" tokens/second")
print("Number of characters : "+str(number_characters))
print("Speed : "+ str(number_characters/duration*1000)+" characters/second")
print("Response : ")
print(response_llm['response'])
External call (from an API or script)
Requirements
pip install requests
pip install requests_aws4auth
pip install nltk
Call
import requests
from requests_aws4auth import AWS4Auth
import nltk
nltk.download('punkt')
from datetime import datetime
from nltk.tokenize import word_tokenize
# function to get duration
def get_duration_ms(time1, time2):
    diff = time2 - time1
    return diff.total_seconds() * 1000  # returns duration in milliseconds
env = 'https://api.k.prod.infra.aleia.com'
payload = {
    "content": "Do you have any recommendation to start working in data science?"
}
path = '/user-services/invoke/94a8bc6d-d307-43c9-9804-f1fe06b7c961/chat'
url = env + path
service = 's3'
region = ''
awsauth = AWS4Auth('replace with your token id', 'replace with your secret key', region, service)
print('Payload:')
print(payload)
# tracking start date
start_time = datetime.now()
r = requests.post(url, auth=awsauth, json=payload)
# tracking end date
end_time = datetime.now()
# calculate duration
duration = get_duration_ms(start_time, end_time)
number_tokens = len(word_tokenize(r.json()['response']))
number_characters = len(r.json()['response'])
print("Call start : "+start_time.strftime("%Y-%m-%d %H:%M:%S"))
print("Call end : "+end_time.strftime("%Y-%m-%d %H:%M:%S"))
print("Time taken : "+str(duration)+" ms")
print("Number of tokens : "+str(number_tokens))
print("Speed : "+ str(number_tokens/duration*1000)+" tokens/second")
print("Number of characters : "+str(number_characters))
print("Speed : "+ str(number_characters/duration*1000)+" characters/second")
print('Response:')
print(r.json()['response'])
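A small defensive variant (optional): check the HTTP status before parsing the JSON body, so a failed call raises a clear error instead of a KeyError:
# Fail fast with a readable message when the call does not succeed
if r.status_code != 200:
    raise RuntimeError("Service call failed (" + str(r.status_code) + "): " + r.text)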