End to end LLM

This page provides a full example of how to build and deploy an LLM inside the platform. For this use case, the LLM will be used to detect sentiment from tweet texts.

The example will follow these steps:

  1. Build the LLM with a notebook

    1. Create some random dataset to apply the LLM on

    2. Gather an open-source LLM (Mistral-7B-Instruct-v0.1) and quantize it to 4 bits (so that it can run on 16 GB of GPU RAM)

    3. Save the quantized LLM on platform s3 storage

    4. Run prediction of the LLM on the dataset and store it in JSON

    5. Transform the JSON prediction to CSV data

  2. Deploy the LLM inside a pipeline/job

    1. Convert notebook to module

    2. Use the module inside pipeline

    3. Execute the pipeline through a job

  3. Deploy the LLM inside a custom-service

    1. Create a custom service and configure it

    2. Adapt notebook code to custom service

    3. Call the custom service from code

      1. Internal call

      2. External call

Build the LLM with Notebook

The notebook must be run on a "GPU" resource.

Requirements

!pip install faker
!pip install torch==2.2.0
!pip install transformers==4.37.2
!pip install bitsandbytes==0.42.0
!pip install accelerate==0.26.1

Data generation (random tweets): requires a folder "landing/" created in the dataset space.

import csv
from faker import Faker
from datetime import datetime
from io import StringIO

from aleialib import s3

fake = Faker()

# How many fake tweets the CSV will contain
num_rows = 1000

# The CSV is assembled in memory and uploaded in a single call
csv_buffer = StringIO()
writer = csv.writer(csv_buffer, delimiter=',')

# Header row
writer.writerow(['id', 'message', 'timestamp'])

# One row per tweet: 1-based id, random text (length bound 140), random timestamp
writer.writerows(
    [tweet_id, fake.text(140), fake.date_time_this_decade()]
    for tweet_id in range(1, num_rows + 1)
)

# Upload the whole CSV content as one string
csv_data = csv_buffer.getvalue()

s3.save_file("landing/tweets_1k.csv",csv_data, object_type="dataset")

Download and quantize the LLM on 4bits

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from datetime import datetime

import sys

# Silence Python warnings unless the user explicitly configured them (-W / PYTHONWARNINGS).
if not sys.warnoptions:
    import warnings
    warnings.simplefilter("ignore")

from tqdm import tqdm
from functools import partialmethod

# Patch tqdm so every progress bar created afterwards is built with disable=True.
tqdm.__init__ = partialmethod(tqdm.__init__, disable=True)

from huggingface_hub import logging

# Only report errors from huggingface_hub.
logging.set_verbosity_error()

# function to get duration
def get_duration_ms(time1, time2):
    """Return the time elapsed from ``time1`` to ``time2`` in milliseconds.

    Both arguments are ``datetime`` values. The result is a float and is
    negative when ``time2`` precedes ``time1``.
    """
    elapsed_seconds = (time2 - time1).total_seconds()
    return elapsed_seconds * 1000


# Hugging Face model id used for both the model and its tokenizer.
model_name = 'mistralai/Mistral-7B-Instruct-v0.1'
# Placeholder Hugging Face token: replace it with your own and never commit a real one.
access_token = "hf_yourhftoken"

print(model_name)

def load_quantized_model(model_name: str):
    """
    Load a causal language model with 4-bit (NF4, double) quantization.

    The Hugging Face token is read from the module-level ``access_token``.

    :param model_name: Name or path of the model to be loaded.
    :return: Loaded quantized model.
    """
    quantization_settings = BitsAndBytesConfig(
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        load_in_4bit=True,
    )

    return AutoModelForCausalLM.from_pretrained(
        model_name,
        token=access_token,
        quantization_config=quantization_settings,
        torch_dtype=torch.bfloat16,
    )

def initialize_tokenizer(model_name: str):
    """
    Build the tokenizer matching ``model_name``.

    The Hugging Face token is read from the module-level ``access_token``.

    :param model_name: Name or path of the model for tokenizer initialization.
    :return: Initialized tokenizer, with its beginning-of-sentence id forced to 1.
    """
    loaded_tokenizer = AutoTokenizer.from_pretrained(model_name, token=access_token)
    # Beginning-of-sentence token id is pinned explicitly
    loaded_tokenizer.bos_token_id = 1
    return loaded_tokenizer

# --- model: download, quantize and load, with timing ---
print('Model quantization starting...')
start_time = datetime.now()
model = load_quantized_model(model_name)
end_time = datetime.now()
duration = get_duration_ms(start_time, end_time)
print(f'Model quantized and loaded, elapsed time : {duration} ms')

# --- tokenizer: load, with timing ---
print('Tokenizer initialization starting...')
start_time = datetime.now()
tokenizer = initialize_tokenizer(model_name)
end_time = datetime.now()
duration = get_duration_ms(start_time, end_time)
print(f'Tokenizer initialized, elapsed time : {duration} ms')

Save the quantized LLM on S3: requires a folder "models/mistralai/Mistral-7B-Instruct-v0.1/" in the dataset space

from aleialib import s3

# Store the in-memory quantized model and tokenizer objects in the dataset space.
# NOTE(review): presumably s3.save_file serializes arbitrary Python objects - confirm against the aleialib documentation.
s3.save_file('models/mistralai/Mistral-7B-Instruct-v0.1/model_quantized_4bits.content', model, object_type='dataset')
s3.save_file('models/mistralai/Mistral-7B-Instruct-v0.1/tokenizer.content', tokenizer, object_type='dataset')

Load from S3 previously saved quantized LLM

from aleialib import s3
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

#loading model
# Read back the objects stored under the same keys by the save step.
# NOTE(review): presumably this returns ready-to-use model/tokenizer objects - confirm against the aleialib documentation.
model = s3.load_file('models/mistralai/Mistral-7B-Instruct-v0.1/model_quantized_4bits.content', object_type='dataset')
tokenizer = s3.load_file('models/mistralai/Mistral-7B-Instruct-v0.1/tokenizer.content', object_type='dataset')
# Model id, reused later to label the predictions.
model_name = 'mistralai/Mistral-7B-Instruct-v0.1'

Apply the LLM on the dataset and store the results in JSON: requires a folder "predictions_annotation/" created in the dataset space

from aleialib import s3
import re
import json
from datetime import datetime

# function to get duration
def get_duration_ms(time1, time2):
    """Return the time elapsed from ``time1`` to ``time2`` in milliseconds.

    Both arguments are ``datetime`` values. The result is a float and is
    negative when ``time2`` precedes ``time1``.
    """
    elapsed_seconds = (time2 - time1).total_seconds()
    return elapsed_seconds * 1000

# function to get LLM response
def llm_apply(tokenizer, model, ask_ai, message, text):
    """Ask the LLM to apply the instruction ``ask_ai`` to ``message``.

    :param tokenizer: Tokenizer used to encode the prompt and decode the output.
    :param model: Model exposing ``generate``.
    :param ask_ai: Instruction part of the prompt.
    :param message: Content the instruction is applied to.
    :param text: Unused. Kept so existing callers keep working; the prompt is
        always rebuilt from ``ask_ai`` and ``message``.
    :return: The text generated after the instruction block (up to the
        end-of-sequence marker), or the string "None" when no answer could be
        extracted from the decoded output.
    """
    # Mistral instruct prompts close the instruction block with "[/INST]".
    # The former backslash variant was both an invalid string escape and not
    # the tag the model was trained on.
    prompt = "[INST]" + ask_ai + message + "[/INST]"

    # prediction
    encoded = tokenizer(prompt, return_tensors="pt", add_special_tokens=False)
    generated_ids = model.generate(**encoded, max_new_tokens=1000, do_sample=True, pad_token_id=tokenizer.eos_token_id)
    decoded = tokenizer.batch_decode(generated_ids)

    # DOTALL lets the captured answer span several lines
    answer = re.search(r'\[/INST\](.+)</s>', decoded[0], re.DOTALL)
    return answer.group(1) if answer else "None"


# loading data
content = s3.load_file("landing/tweets_1k.csv", object_type="dataset", handle_type=True)

# prompt
ask_ai = "Tell in 1 word AND in lower case AND without period the emotion from the text, by using only 1 of the following words : happiness, sadness, fear, disgust, anger, surprise : "

# list of models
# NOTE: these ids are labels only; every entry is answered by the same loaded
# `model`, so the three "versions" are three sampled runs of one model.
list_model = [model_name+" version 1", model_name+" version 2", model_name+" version 3"]

# one JSON object per message, in the annotation format
predictions = []

# only the first 10 messages are processed in the notebook
for message in content['message'][:10]:

    # full prompt with content; the closing tag is "[/INST]" (a backslash here
    # is an invalid string escape)
    text = "[INST]"+ask_ai+ message+"[/INST]"

    # 2nd layer: one item per model id
    items = []

    for model_id in list_model:
        # tracking start date
        start_time = datetime.now()

        # prediction
        result = llm_apply(tokenizer, model, ask_ai, message, text)

        # tracking end date
        end_time = datetime.now()

        # calculate duration
        duration = get_duration_ms(start_time, end_time)
        timestamp = end_time.strftime("%Y-%m-%d %H:%M:%S")

        # append JSON value to annotation format
        items.append({
            'id': model_id,
            'title': model_name + " 4 bits quantization",
            'body': result,
            'datetime': timestamp,
            'duration': duration,
        })

        # summary
        print("Message : "+message+" | Answer : "+result)
        print("["+timestamp+"] New prediction - Model: "+model_id+" - Result: "+result+" - Duration: "+str(duration)+" ms")

    # 1st layer (prompt and content) plus the items collected above
    predictions.append({'prompt': ask_ai, 'content': message, 'items': items})

json_final = json.dumps(predictions, indent=4, default=str)
print(json_final)

# the file name carries the time at which the run finished
file_time = datetime.now()
s3.save_file("predictions_annotation/llm_predictions_"+file_time.strftime("%Y-%m-%d %H:%M:%S")+".json", json_final, object_type="dataset")

Load the JSON results and transform them to CSV data: requires a folder "predictions/" created in the dataset space

import json
import csv
from io import StringIO
from aleialib import s3

import os

list_files = s3.list_files("dataset/predictions_annotation/", return_result=True)
print(list_files)

for file in list_files:
    raw_data = s3.load_file(file, from_s3_root=True)

    # Load the json file
    data = json.loads(raw_data)

    # The CSV is assembled in memory, one buffer per JSON file
    csv_buffer = StringIO()
    writer = csv.writer(csv_buffer, delimiter=',')
    writer.writerow(['id','prompt', 'content', 'model_id', 'title', 'body', 'datetime', 'duration'])

    # Flatten the JSON: one CSV row per (message, model) prediction,
    # numbered from 1 within each file
    row_id = 0
    for element in data:
        for item in element['items']:
            row_id += 1
            writer.writerow([
                row_id,
                element['prompt'],
                element['content'],
                item['id'],
                item['title'],
                str(item['body']),
                str(item['datetime']),
                str(item['duration']),
            ])

    # Get the generated CSV data as a string
    csv_data = csv_buffer.getvalue()
    print(csv_data)

    # Name each CSV after its source JSON file. A shared timestamp made every
    # file of the loop overwrite the same output, and relied on a variable
    # defined in another notebook cell.
    # NOTE(review): assumes list_files returns string keys - confirm against the aleialib documentation.
    output_name = os.path.splitext(os.path.basename(str(file)))[0]
    s3.save_file("predictions/"+output_name+".csv", csv_data, object_type="dataset")

Deploy the LLM inside a pipeline/job

You will be able to use the previously created LLM inside a pipeline, which lets you run a job as a batch execution, either immediately or on a schedule.

The pipeline structure will follow this format:

  1. Job (execution, scheduled or not)

    1. Pipeline (execution order and description)

      1. Module 1 (inference task)

      2. Module 2 (data parsing task)

Modules

From the previous notebook, you will be able to create related modules directly through the notebook action Build module. It will let you select the wanted cells to create a module from.

You will need to create 2 modules corresponding to:

  1. The LLM loading and inference run : task-inference

  2. The data transformation from JSON prediction to CSV data : task-data-parser

After creating the modules, you will need to check and update their resource configuration. It should be set to "GPU".

Module task-inference

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from aleialib import s3
import re
import json
import time
from datetime import datetime

import sys

# Silence Python warnings unless the user explicitly configured them (-W / PYTHONWARNINGS).
if not sys.warnoptions:
    import warnings
    warnings.simplefilter("ignore")

from tqdm import tqdm
from functools import partialmethod

# Patch tqdm so every progress bar created afterwards is built with disable=True.
tqdm.__init__ = partialmethod(tqdm.__init__, disable=True)

from huggingface_hub import logging

# Only report errors from huggingface_hub.
logging.set_verbosity_error()

# Model id, used below to label the predictions.
model_name = 'mistralai/Mistral-7B-Instruct-v0.1'

# function to get duration
def get_duration_ms(time1, time2):
    """Return the time elapsed from ``time1`` to ``time2`` in milliseconds.

    Both arguments are ``datetime`` values. The result is a float and is
    negative when ``time2`` precedes ``time1``.
    """
    elapsed_seconds = (time2 - time1).total_seconds()
    return elapsed_seconds * 1000

# function to get LLM response
def llm_apply(tokenizer, model, ask_ai, message, text):
    """Ask the LLM to apply the instruction ``ask_ai`` to ``message``.

    :param tokenizer: Tokenizer used to encode the prompt and decode the output.
    :param model: Model exposing ``generate``.
    :param ask_ai: Instruction part of the prompt.
    :param message: Content the instruction is applied to.
    :param text: Unused. Kept so existing callers keep working; the prompt is
        always rebuilt from ``ask_ai`` and ``message``.
    :return: The text generated after the instruction block (up to the
        end-of-sequence marker), or the string "None" when no answer could be
        extracted from the decoded output.
    """
    # Mistral instruct prompts close the instruction block with "[/INST]".
    # The former backslash variant was both an invalid string escape and not
    # the tag the model was trained on.
    prompt = "[INST]" + ask_ai + message + "[/INST]"

    # prediction
    encoded = tokenizer(prompt, return_tensors="pt", add_special_tokens=False)
    generated_ids = model.generate(**encoded, max_new_tokens=1000, do_sample=True, pad_token_id=tokenizer.eos_token_id)
    decoded = tokenizer.batch_decode(generated_ids)

    # DOTALL lets the captured answer span several lines
    answer = re.search(r'\[/INST\](.+)</s>', decoded[0], re.DOTALL)
    return answer.group(1) if answer else "None"


# loading data
content = s3.load_file("landing/tweets_1k.csv", object_type="dataset", handle_type=True)

# loading model
model = s3.load_file('models/mistralai/Mistral-7B-Instruct-v0.1/model_quantized_4bits.content', object_type='dataset')
tokenizer = s3.load_file('models/mistralai/Mistral-7B-Instruct-v0.1/tokenizer.content', object_type='dataset')

# prompt; the trailing " : " separates the instruction from the tweet text
# that is appended right after it
ask_ai = "Tell in 1 word AND in lower case AND without period the emotion from the text, by using only 1 of the following words : happiness, sadness, fear, disgust, anger, surprise : "

# list of models
# NOTE: these ids are labels only; every entry is answered by the same loaded
# `model`, so the three "versions" are three sampled runs of one model.
list_model = [model_name+" version 1", model_name+" version 2", model_name+" version 3"]

# one JSON object per message, in the annotation format
predictions = []

for message in content['message']:

    # full prompt with content; the closing tag is "[/INST]" (a backslash here
    # is an invalid string escape)
    text = "[INST]"+ask_ai+ message+"[/INST]"

    # 2nd layer: one item per model id
    items = []

    for model_id in list_model:
        # tracking start date
        start_time = datetime.now()

        # prediction
        result = llm_apply(tokenizer, model, ask_ai, message, text)

        # tracking end date
        end_time = datetime.now()

        # calculate duration
        duration = get_duration_ms(start_time, end_time)
        timestamp = end_time.strftime("%Y-%m-%d %H:%M:%S")

        # append JSON value to annotation format
        items.append({
            'id': model_id,
            'title': model_name + " 4 bits quantization",
            'body': result,
            'datetime': timestamp,
            'duration': duration,
        })

        # summary
        print("["+timestamp+"] New prediction - Model: "+model_id+" - Result: "+result+" - Duration: "+str(duration)+" ms")

    # 1st layer (prompt and content) plus the items collected above
    predictions.append({'prompt': ask_ai, 'content': message, 'items': items})

json_final = json.dumps(predictions, indent=4, default=str)

# the file name carries the time at which the run finished
file_time = datetime.now()
s3.save_file("predictions_annotation/llm_predictions_"+file_time.strftime("%Y-%m-%d %H:%M:%S")+".json", json_final, object_type="dataset")

Module task-data-parser

import json
import csv
from io import StringIO
from aleialib import s3
from datetime import datetime

import os

list_files = s3.list_files("dataset/predictions_annotation/", return_result=True)
print(list_files)

for file in list_files:
    raw_data = s3.load_file(file, from_s3_root=True)

    # Load the json file
    data = json.loads(raw_data)

    # The CSV is assembled in memory, one buffer per JSON file
    csv_buffer = StringIO()
    writer = csv.writer(csv_buffer, delimiter=',')
    writer.writerow(['id','prompt', 'content', 'model_id', 'title', 'body', 'datetime', 'duration'])

    # Flatten the JSON: one CSV row per (message, model) prediction,
    # numbered from 1 within each file
    row_id = 0
    for element in data:
        for item in element['items']:
            row_id += 1
            writer.writerow([
                row_id,
                element['prompt'],
                element['content'],
                item['id'],
                item['title'],
                str(item['body']),
                str(item['datetime']),
                str(item['duration']),
            ])

    # Get the generated CSV data as a string
    csv_data = csv_buffer.getvalue()

    # Name each CSV after its source JSON file. A "now" timestamp with
    # one-second resolution made files converted within the same second
    # overwrite each other.
    # NOTE(review): assumes list_files returns string keys - confirm against the aleialib documentation.
    output_name = os.path.splitext(os.path.basename(str(file)))[0]
    s3.save_file("predictions/"+output_name+".csv", csv_data, object_type="dataset")

Pipeline/Job

You will need to create the related pipeline : pipeline-annotation-data

Pipeline content

# Data nodes only link the two process nodes together.
# NOTE(review): "none.value" presumably means no real dataset is attached - confirm against the platform documentation.
dataNodes:
  data_node_1:
    source: none.value
  data_node_2:
    source: none.value
  data_node_3:
    source: none.value
# process_node_2 takes process_node_1's output (data_node_2) as its input.
processNodes:
  process_node_1:
    input:
    - data_node_1
    output:
    - data_node_2
    processor: task-inference
  process_node_2:
    input:
    - data_node_2
    output:
    - data_node_3
    processor: task-data-parser

# Optional schedule: uncomment and adapt to run the job on a schedule.
#schedule:
#  schedule_interval: "0 0 * * *"
#  start_date: "2022-01-01 00:00"
#  end_date: "2022-12-31 00:00"
#  timezone: "Europe/Paris"

# Python packages, pinned to the same versions as in the notebook.
requirement:
    -
        name: "torch"
        version: 2.2.0
    -
        name: "transformers"
        version: 4.37.2
    -
        name: "bitsandbytes"
        version: 0.42.0
    -
        name: "accelerate"
        version: 0.26.1

From the pipeline created, you will be able to build the related job. If it’s scheduled it will wait for next execution date, otherwise it will run instantly.

Deploy the LLM inside a custom-service

Custom service creation

You are able to use the LLM inside a custom service to be able to do real-time serving.

Create a custom-service, and then configure it to cover these requirements:

  1. Environment type : GPU

  2. Python libraries

    1. transformers==4.37.2

    2. torch==2.2.0

    3. jinja2==3.0.3

    4. bitsandbytes==0.42.0

    5. accelerate==0.26.1

Code

"""Initial version of user service template.

Usage:
    * @user_services_routes.route("/my-route", methods=["GET"])
      def my_route(): ...
    * accepted_methods: ["GET", "POST", "PUT", "DELETE", "PATCH"]
    * accepted_body: application/json
"""
import os

import numpy as np
import pandas as pd
from flask import Flask, request, Blueprint, make_response
from apispec_webframeworks.flask import FlaskPlugin
from apispec import APISpec
from flask_swagger_ui import get_swaggerui_blueprint

from aleiamodel import AleiaModel, URL

# URL prefix shared by every route and by the Swagger docs of this service.
# When USER_SERVICE_ID is not set, the prefix ends with the literal "None".
APP_PREFIX = f"/user-services/invoke/{os.environ.get('USER_SERVICE_ID')}"

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# Hugging Face model id used for both the model and its tokenizer.
model_name = 'mistralai/Mistral-7B-Instruct-v0.1'
# Placeholder Hugging Face token: replace it with your own and never commit a real one.
access_token = "hf_yourhftoken"

print(model_name)

def load_quantized_model(model_name: str):
    """
    Load a causal language model with 4-bit (NF4, double) quantization.

    The Hugging Face token is read from the module-level ``access_token``.

    :param model_name: Name or path of the model to be loaded.
    :return: Loaded quantized model.
    """
    quantization_settings = BitsAndBytesConfig(
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_use_double_quant=True,
        load_in_4bit=True,
    )

    return AutoModelForCausalLM.from_pretrained(
        model_name,
        token=access_token,
        quantization_config=quantization_settings,
        torch_dtype=torch.bfloat16,
    )

def initialize_tokenizer(model_name: str):
    """
    Build the tokenizer matching ``model_name``.

    The Hugging Face token is read from the module-level ``access_token``.

    :param model_name: Name or path of the model for tokenizer initialization.
    :return: Initialized tokenizer, with its beginning-of-sentence id forced to 1.
    """
    loaded_tokenizer = AutoTokenizer.from_pretrained(model_name, token=access_token)
    # Beginning-of-sentence token id is pinned explicitly
    loaded_tokenizer.bos_token_id = 1
    return loaded_tokenizer

# Loaded once at import time, so every request reuses the same model and tokenizer.
model = load_quantized_model(model_name)

tokenizer = initialize_tokenizer(model_name)

def register_swagger(app_):
    """Expose Swagger UI and the OpenAPI spec of ``app_`` under ``APP_PREFIX + "/docs"``.

    Every view function registered on ``app_`` when this is called is added
    to the spec, so routes must be registered before calling it.
    """
    docs_url = APP_PREFIX + "/docs"
    swaggerui_blueprint = get_swaggerui_blueprint(docs_url, docs_url + "/specs.json")

    spec = APISpec(
        title=os.environ.get("USER_SERVICE_NAME"),
        version="1.0.0",
        openapi_version="3.0.2",
        plugins=[FlaskPlugin()],
    )

    # Serves the spec as JSON; it is read when the request is handled, so it
    # includes the paths added further down.
    @swaggerui_blueprint.route("/specs.json", methods=["GET"])
    def get_api_doc():
        return spec.to_dict(), 200

    app_.register_blueprint(swaggerui_blueprint)

    # Views are added to the spec inside a request context
    with app_.test_request_context():
        for view in app_.view_functions.values():
            spec.path(view=view)


# Blueprint holding every route of this service, mounted under APP_PREFIX.
user_services_routes = Blueprint(
    "user_services_routes", __name__, url_prefix=APP_PREFIX
)


# Health-check route: always answers {"health": true}.
# NOTE(review): the docstring part after "---" is presumably parsed by apispec
# to build the OpenAPI description of this route - keep it valid YAML.
@user_services_routes.route("/health", methods=["GET"])
def get_health():
    """Get Health

    ---
    get:
      description: Get Health
      responses:
        200:
          description: Returns Health boolean
          content:
            application/json:
              schema:
                type: object
                properties:
                  health:
                    type: boolean
    """
    return make_response({"health": True})


# routes to be written by users
@user_services_routes.route("/chat", methods=["POST"])
def chat():
    """Chat using mistralai/Mistral-7B-Instruct-v0.1.

    ---
    post:
      tags:
        - Chat
      description: Chat using mistralai/Mistral-7B-Instruct-v0.1.
      requestBody:
        content:
          application/json:
            schema:
              type: object
              required:
                - content
              properties:
                content:
                  type: string
                  example: "Do you have any recommandation to start working in datascience?"

      responses:
        200:
          description: chat response from model.
          content:
            application/json:
              schema:
                type: object
                properties:
                  response:
                    type: string
        400:
          description: Returns an error message
    """
    # force=True keeps accepting bodies sent without a JSON content type (as
    # before); silent=True returns None on malformed JSON instead of raising.
    payload = request.get_json(force=True, silent=True)
    if not isinstance(payload, dict) or not isinstance(payload.get("content"), str):
        # Documented 400 answer instead of an unhandled KeyError/TypeError (HTTP 500)
        return make_response(
            {"error": "Request body must be a JSON object with a string 'content' field."},
            400,
        )
    content = payload["content"]

    print(content)

    # Mistral instruct prompts close the instruction block with "[/INST]"
    # (a backslash here is an invalid string escape).
    text = "[INST]" + content + "[/INST]"

    encoded = tokenizer(text, return_tensors="pt", add_special_tokens=False)
    # pad_token_id is set explicitly, as in the batch inference code
    generated_ids = model.generate(**encoded, max_new_tokens=1000, do_sample=True, pad_token_id=tokenizer.eos_token_id)
    decoded = tokenizer.batch_decode(generated_ids)
    print(decoded[0])

    # The full decoded sequence (prompt included) is returned as a JSON string
    return make_response({"response": decoded[0]})


# Routes are registered before register_swagger, which adds every view
# function known to the app at that point to the OpenAPI spec.
app = Flask(__name__)
app.register_blueprint(user_services_routes)
register_swagger(app)


# Run Flask's built-in server when the file is executed directly.
if __name__ == "__main__":
    app.run()

Custom service call

After the custom service has been successfully deployed, you can use it by calling it from internal or external code.

Internal code (platform notebook for example)

Requirements

!pip install nltk
import nltk
nltk.download('punkt')

Call

import aleialib
from datetime import datetime
from nltk.tokenize import word_tokenize

# function to get duration
def get_duration_ms(time1, time2):
    """Return the time elapsed from ``time1`` to ``time2`` in milliseconds.

    Both arguments are ``datetime`` values. The result is a float and is
    negative when ``time2`` precedes ``time1``.
    """
    elapsed_seconds = (time2 - time1).total_seconds()
    return elapsed_seconds * 1000

content_api = {"content": "Write me a bio about Madonna in 100 words, giving me details about the web sources used with URL"}

# Time the call to the custom service
start_time = datetime.now()
response_llm = aleialib.user_services.invoke("post", "/chat","94a8bc6d-d307-43c9-9804-f1fe06b7c961",content_api)
end_time = datetime.now()

duration = get_duration_ms(start_time, end_time)

# Size of the answer: "tokens" are NLTK word tokens, not model tokens
response_text = response_llm['response']
number_tokens = len(word_tokenize(response_text))
number_characters = len(response_text)

# Report: duration is in ms, hence the *1000 to get per-second speeds
print(f"Call start : {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Call end : {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
print(f"Time taken : {duration} ms")
print(f"Number of tokens : {number_tokens}")
print(f"Speed : {number_tokens/duration*1000} tokens/second")
print(f"Number of characters : {number_characters}")
print(f"Speed : {number_characters/duration*1000} characters/second")
print("Response : ")
print(response_text)

External call (from api or script)

Requirements

pip install requests
pip install requests_aws4auth
pip install nltk

Call

import requests
from requests_aws4auth import AWS4Auth

import nltk
nltk.download('punkt')

from datetime import datetime
from nltk.tokenize import word_tokenize

# function to get duration
def get_duration_ms(time1, time2):
    """Return the time elapsed from ``time1`` to ``time2`` in milliseconds.

    Both arguments are ``datetime`` values. The result is a float and is
    negative when ``time2`` precedes ``time1``.
    """
    elapsed_seconds = (time2 - time1).total_seconds()
    return elapsed_seconds * 1000


# Base URL of the platform API
env = 'https://api.k.prod.infra.aleia.com'

payload = {
  "content": "Do you have any recommandation to start working in datascience?"
}

# Route of the deployed custom service: /user-services/invoke/<service id>/chat
path = '/user-services/invoke/94a8bc6d-d307-43c9-9804-f1fe06b7c961/chat'
url = env + path
service = 's3'
region = ''

# Requests are signed with the platform credentials (AWS Signature Version 4)
awsauth = AWS4Auth('replace with your token id', 'replace with your secret key', region, service)
print('Payload:')
print(payload)

# tracking start date
start_time = datetime.now()

r = requests.post(url, auth=awsauth, json=payload)

# tracking end date
end_time = datetime.now()

# Stop with an explicit HTTP error (bad credentials, service not running, ...)
# instead of failing later on a missing 'response' key
r.raise_for_status()

# calculate duration
duration = get_duration_ms(start_time, end_time)

# Parse the body once; "tokens" are NLTK word tokens, not model tokens
response_text = r.json()['response']
number_tokens = len(word_tokenize(response_text))
number_characters = len(response_text)

print("Call start : "+start_time.strftime("%Y-%m-%d %H:%M:%S"))
print("Call end : "+end_time.strftime("%Y-%m-%d %H:%M:%S"))
print("Time taken : "+str(duration)+" ms")
print("Number of tokens : "+str(number_tokens))
print("Speed : "+ str(number_tokens/duration*1000)+" tokens/second")
print("Number of characters : "+str(number_characters))
print("Speed : "+ str(number_characters/duration*1000)+" characters/second")

print('Response:')
print(response_text)