gpt4all-ui/endpoints/chat_bar.py
2024-08-15 17:28:33 +02:00

159 lines
7.0 KiB
Python

"""
project: lollms_webui
file: chat_bar.py
author: ParisNeo
description:
This module contains the FastAPI routes that back the chat bar of the Lord of Large Language and Multimodal Systems (LoLLMs) Web UI
application, such as adding a scraped web page to the current discussion.
"""
from fastapi import APIRouter, Request
from fastapi import HTTPException
from pydantic import BaseModel, Field
from lollms_webui import LOLLMSWebUI
from pydantic import BaseModel
from starlette.responses import StreamingResponse
from lollms.types import MSG_OPERATION_TYPE
from lollms.main_config import BaseConfig
from lollms.utilities import detect_antiprompt, remove_text_from_string, trace_exception, find_first_available_file_index
from ascii_colors import ASCIIColors
from lollms.databases.discussions_database import DiscussionsDB
from lollms.types import SENDER_TYPES
from typing import List
from pathlib import Path
import tqdm
from fastapi import FastAPI, UploadFile, File
import shutil
import os
import platform
from urllib.parse import urlparse
from functools import partial
from datetime import datetime
from utilities.execution_engines.python_execution_engine import execute_python
from utilities.execution_engines.latex_execution_engine import execute_latex
from utilities.execution_engines.shell_execution_engine import execute_bash
from lollms.security import sanitize_path, forbid_remote_access
from lollms.internet import scrape_and_save
from urllib.parse import urlparse
import threading
# ----------------------- Defining router and main class ------------------------------
# Router on which this module's endpoints are registered through the @router.post decorators.
router = APIRouter()
# Application object obtained through LOLLMSWebUI.get_instance(); the endpoints in this
# module use it for session lookup, UI notifications and file refresh.
# NOTE(review): presumably a process-wide singleton -- confirm in lollms_webui.
lollmsElfServer:LOLLMSWebUI = LOLLMSWebUI.get_instance()
class AddWebPageRequest(BaseModel):
    # Request body accepted by the /add_webpage endpoint.
    # Documented with comments instead of a docstring so the schema generated
    # from this model is left untouched.

    # Required. Used to look up the calling client in the server session.
    client_id: str = Field(default=...)
    # Required. Address of the web page to scrape.
    url: str = Field(
        default=...,
        description="Url to be used",
    )
class CmdExecutionRequest(BaseModel):
    # Request body of the /execute_personality_command endpoint (currently disabled,
    # see the string-wrapped code right after this class).
    # Documented with comments instead of a docstring so the schema generated
    # from this model only changes where the fix below intends it to.

    # Required. Used to look up the calling client in the server session.
    client_id: str = Field(...)
    # Required. Name of the personality command to run.
    # The description used to read "Url to be used", a copy-paste leftover from
    # AddWebPageRequest that mislabelled this field in the generated API schema.
    command: str = Field(..., description="Command to be executed")
    # Required. Arguments forwarded together with the command.
    parameters: List[str] = Field(..., description="Command parameters")


# NOTE: the /execute_personality_command endpoint below is disabled. It is wrapped in a
# bare triple-quoted string, so the decorator never runs and no route is registered.
# The text is kept verbatim for reference.
"""
@router.post("/execute_personality_command")
async def execute_personality_command(request: CmdExecutionRequest):
client_id = request.client_id
client = lollmsElfServer.session.get_client(client_id)
lollmsElfServer.cancel_gen = False
client.generated_text=""
client.cancel_generation=False
client.continuing=False
client.first_chunk=True
if not lollmsElfServer.model:
ASCIIColors.error("Model not selected. Please select a model")
lollmsElfServer.error("Model not selected. Please select a model", client_id=client_id)
return {'status':False,"error":"Model not selected. Please select a model"}
if not lollmsElfServer.busy:
if lollmsElfServer.session.get_client(client_id).discussion is None:
if lollmsElfServer.db.does_last_discussion_have_messages():
lollmsElfServer.session.get_client(client_id).discussion = lollmsElfServer.db.create_discussion()
else:
lollmsElfServer.session.get_client(client_id).discussion = lollmsElfServer.db.load_last_discussion()
ump = lollmsElfServer.config.discussion_prompt_separator +lollmsElfServer.config.user_name.strip() if lollmsElfServer.config.use_user_name_in_discussions else lollmsElfServer.personality.user_message_prefix
message = lollmsElfServer.session.get_client(client_id).discussion.add_message(
message_type = MSG_OPERATION_TYPE.MSG_OPERATION_TYPE_SET_CONTENT.value,
sender_type = SENDER_TYPES.SENDER_TYPES_USER.value,
sender = ump.replace(lollmsElfServer.config.discussion_prompt_separator,"").replace(":",""),
content="",
metadata=None,
parent_message_id=lollmsElfServer.message_id
)
lollmsElfServer.busy=True
command = request.command
parameters = request.parameters
lollmsElfServer.prepare_reception(client_id)
if lollmsElfServer.personality.processor is not None:
lollmsElfServer.start_time = datetime.now()
lollmsElfServer.personality.processor.callback = partial(lollmsElfServer.process_data, client_id=client_id)
lollmsElfServer.personality.processor.execute_command(command, parameters)
else:
lollmsElfServer.warning("Non scripted personalities do not support commands",client_id=client_id)
lollmsElfServer.close_message(client_id)
lollmsElfServer.busy=False
#tpe = threading.Thread(target=lollmsElfServer.start_message_generation, args=(message, message_id, client_id))
#tpe.start()
else:
lollmsElfServer.error("I am busy. Come back later.", client_id=client_id)
return {'status':False,"error":"I am busy. Come back later."}
lollmsElfServer.busy=False
return {'status':True,}
"""
# Upper bound handed to scrape_and_save() as max_size when a web page is fetched.
# NOTE(review): presumably a size in bytes (about 10 MB) -- confirm against lollms.internet.
MAX_PAGE_SIZE = 10_000_000
@router.post("/add_webpage")
async def add_webpage(request: AddWebPageRequest):
    """
    Scrape a web page and attach the result to the client's current discussion.

    The URL is validated before any work starts, so a malformed URL is reported
    to the caller as an HTTP 400. The scraping itself runs in a background
    thread stored on ``client.generation_thread``; the endpoint returns as soon
    as that thread has been started. Failures that happen inside the thread are
    reported through ``lollmsElfServer.error`` because the HTTP response has
    already been sent by then.

    Args:
        request: Body carrying the ``client_id`` of the caller and the ``url`` to scrape.

    Returns:
        ``{'status': True}`` once the background scraping has been started.

    Raises:
        HTTPException: 400 when the client id is unknown to the session or when
            the URL has no scheme or no network location.
    """
    forbid_remote_access(lollmsElfServer)
    client = lollmsElfServer.session.get_client(request.client_id)
    if client is None:
        raise HTTPException(status_code=400, detail="Unknown client. This service only accepts lollms webui requests")

    # Validate here, in the request handler: an HTTPException raised inside the
    # worker thread can never reach the HTTP client.
    url = request.url
    try:
        parsed_url = urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unbalanced IPv6 bracket).
        parsed_url = None
    if parsed_url is None or not (parsed_url.scheme and parsed_url.netloc):
        raise HTTPException(status_code=400, detail="Invalid URL")

    def do_scraping():
        """Download the page into the discussion's text_data folder and register it."""
        # refresh_files is only useful once add_file has been attempted.
        add_file_attempted = False
        try:
            lollmsElfServer.ShowBlockingMessage("Scraping web page\nPlease wait...")
            ASCIIColors.yellow("Scaping web page")
            text_data_folder = client.discussion.discussion_folder/"text_data"
            index = find_first_available_file_index(text_data_folder, "web_", ".txt")
            file_path = sanitize_path(str(text_data_folder/f"web_{index}.txt"), True)
            if not scrape_and_save(url=url, file_path=file_path, max_size=MAX_PAGE_SIZE):
                lollmsElfServer.error("Web page too large", client_id=request.client_id)
                return
            add_file_attempted = True
            client.discussion.add_file(file_path, client, partial(lollmsElfServer.process_data, client_id=request.client_id))
        except Exception as e:
            # Top-level boundary of the worker thread: log and notify the UI
            # instead of letting the thread die silently.
            trace_exception(e)
            lollmsElfServer.error(f"Exception : {e}", client_id=request.client_id)
        finally:
            # Single exit point: the blocking message is always removed, exactly once.
            lollmsElfServer.HideBlockingMessage()
            if add_file_attempted:
                lollmsElfServer.refresh_files()

    client.generation_thread = threading.Thread(target=do_scraping)
    client.generation_thread.start()
    return {'status':True}