Re-upload to own selfhosted git

This commit is contained in:
Danilo Cesa 2025-08-09 17:27:25 +08:00
commit 617819eb61
56 changed files with 2168 additions and 0 deletions

20
.gcloudignore Normal file
View File

@@ -0,0 +1,20 @@
# This file specifies files that are *not* uploaded to Google Cloud
# using gcloud. It follows the same syntax as .gitignore, with the addition of
# "#!include" directives (which insert the entries of the given .gitignore-style
# file at that point).
#
# For more information, run:
# $ gcloud topic gcloudignore
#
.gcloudignore
# If you would like to upload your .git directory, .gitignore file or files
# from your .gitignore file, remove the corresponding line
# below:
.git
.gitignore
# Python pycache:
__pycache__/
# Ignored by the build system
/setup.cfg
venv/

9
.gitignore vendored Normal file
View File

@@ -0,0 +1,9 @@
venv
__pycache__/
.env
text/
.terraform.lock.hcl
.terraform/
terraform.tfstate
terraform.tfstate.backup
terraform.tfvars

7
Dockerfile Normal file
View File

@@ -0,0 +1,7 @@
# BUG FIX: python:latest is unpinned and irreproducible; pin to 3.11 to
# match the python311 runtime declared in app.yaml.
FROM python:3.11-slim

# All application code lives under /app.
ENV APP_HOME /app
WORKDIR $APP_HOME

# Copy the source tree and install dependencies at build time.
COPY . ./
RUN pip install -r requirements.txt

#CMD [ "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000" ]
# Shell form so $PORT (injected by Cloud Run) is expanded at container start.
CMD exec gunicorn --bind :$PORT --workers 1 --worker-class uvicorn.workers.UvicornWorker --threads 8 main:app

6
app.yaml Normal file
View File

@ -0,0 +1,6 @@
# App Engine configuration for the FastAPI backend (standard environment).
runtime: python311
# env: flex
# Serve the app via gunicorn with 4 uvicorn workers.
entrypoint: gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app
# Larger standard-environment instance class.
instance_class: F4
# network:
#   session_affinity: true

22
cloudbuild.yaml Normal file
View File

@ -0,0 +1,22 @@
# Cloud Build pipeline: build the backend image, push it, deploy to Cloud Run.
steps:
  # Build the container image
  - name: 'gcr.io/cloud-builders/docker'
    args: ['build', '-t', 'gcr.io/multichannel-chat-widget/backend:$COMMIT_SHA', '.']
  # Push the container image to Container Registry
  - name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'gcr.io/multichannel-chat-widget/backend:$COMMIT_SHA']
  # Deploy container image to Cloud Run
  # NOTE(review): deploys service "backend" to asia-east1 with container
  # port 8000 — the gunicorn CMD in the Dockerfile must bind this port.
  - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
    entrypoint: gcloud
    args:
      - 'run'
      - 'deploy'
      - 'backend'
      - '--image'
      - 'gcr.io/multichannel-chat-widget/backend:$COMMIT_SHA'
      - '--region'
      - 'asia-east1'
      - '--port'
      - '8000'
# Images recorded as build artifacts.
images:
  - 'gcr.io/multichannel-chat-widget/backend:$COMMIT_SHA'

67
compute_engine.tf Normal file
View File

@ -0,0 +1,67 @@
# Pin the Google provider version so plans are reproducible.
terraform {
  required_providers {
    google = {
      source = "hashicorp/google"
      version = "4.74.0"
    }
  }
}

# Default project/region applied to all google_* resources below.
provider "google" {
  project = "multichannel-chat-widget"
  region = "asia-east1"
}

# Custom-mode VPC (no auto-created subnets) for the chat backend.
resource "google_compute_network" "vpc_network" {
  name = "multichannel-chat-network"
  auto_create_subnetworks = false
  mtu = 1460
}

# Single regional subnet inside the VPC.
resource "google_compute_subnetwork" "default" {
  name = "multichannel-chat-subnet"
  ip_cidr_range = "10.0.1.0/24"
  region = "asia-east1"
  network = google_compute_network.vpc_network.id
}
# Create a single Compute Engine instance
resource "google_compute_instance" "default" {
name = "multichannel-chat-vm"
machine_type = "f1-micro"
zone = "asia-east1-a"
tags = ["ssh"]
boot_disk {
initialize_params {
image = "debian-cloud/debian-11"
}
}
# Install requirements
metadata_startup_script = "sudo apt-get update; sudo apt-get install -yq build-essential python3-pip rsync; pip install -r requirements.txt"
network_interface {
subnetwork = google_compute_subnetwork.default.id
access_config {
# Include this section to give the VM an external IP address
}
}
}
# Allow inbound SSH (tcp/22) to instances carrying the "ssh" tag.
# NOTE(review): source_ranges 0.0.0.0/0 exposes port 22 to the entire
# internet — consider restricting to known CIDRs or using IAP tunnelling.
resource "google_compute_firewall" "ssh" {
  name = "allow-ssh"
  allow {
    ports = ["22"]
    protocol = "tcp"
  }
  direction = "INGRESS"
  network = google_compute_network.vpc_network.id
  priority = 1000
  source_ranges = ["0.0.0.0/0"]
  target_tags = ["ssh"]
}

118
main.py Normal file
View File

@ -0,0 +1,118 @@
import os
import sys
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
from src.websocket.websocket_manager import ConnectionManager
from src.users.router import router as user_router
from src.files.router import router as file_router
from src.bot.router import router as bot_router
from src.etl.router import router as etl_router
from src.login.router import router as login_router
from src.conversation.router import router as conversation_router
from src.chat.index import chat_manager
from src.chat.callback import StreamingLLMCallbackHandler
from src.chat.schemas import ChatResponse
load_dotenv()
# OpenAPI tag descriptions used to group endpoints in the generated docs UI.
tags_metadata = [
    {
        "name": "users",
        "description": "Operations with users.",
    },
    {
        "name": "files",
        "description": "Operations with file handling.",
    },
    {
        "name": "bot",
        "description": "Operations with bot.",
    },
    {
        "name": "etl",
        "description": "Operations with ETL process.",
    },
    {
        "name": "conversations",
        "description": "Operations with conversations process.",
    },
]

app = FastAPI(openapi_tags=tags_metadata)

# The HTML templates directory doubles as the static-assets mount.
app.mount("/static", StaticFiles(directory="templates"), name="static")
templates = Jinja2Templates(directory="templates")

# Wide-open CORS: any origin/method/header; credentials are disabled,
# which is required when the allowed-origin list is a wildcard.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Mount the feature routers.
app.include_router(user_router)
app.include_router(file_router)
app.include_router(bot_router)
app.include_router(etl_router)
app.include_router(login_router)
app.include_router(conversation_router)
@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
    """Render the landing page."""
    return templates.TemplateResponse("index.html", {"request": request})

# BUG FIX: the original defined all four handlers under the same name
# (`read_index`); each definition shadowed the previous one at module level.
# The routes still worked (FastAPI binds at decoration time) but the module
# namespace was misleading — each route now has a unique function name.
@app.get("/knowledge-base", response_class=HTMLResponse, tags=["ui"])
async def read_knowledge_base(request: Request):
    """Render the knowledge-base management page."""
    return templates.TemplateResponse("knowledge-base.html", {"request": request})

@app.get("/websocket-tester", response_class=HTMLResponse, tags=["tester"], summary="UI for testing websocket")
async def read_websocket_tester(request: Request):
    """Render the websocket manual-testing page."""
    return templates.TemplateResponse("websocket-tester.html", {"request": request})

@app.get("/upload-tester", response_class=HTMLResponse, tags=["tester"], summary="UI for testing file upload")
async def read_upload_tester(request: Request):
    """Render the file-upload manual-testing page."""
    return templates.TemplateResponse("upload-tester.html", {"request": request})
# Tracks active websocket connections for the chat endpoint.
manager = ConnectionManager()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Stream bot answers for questions arriving over the websocket.

    Protocol: the client sends JSON {message, chat_session, bot_id}; each
    streamed answer is bracketed by the ".-botResponseStartStreaming" /
    ".-botResponseEndStreaming" sentinel frames, with the tokens themselves
    pushed by the StreamingLLMCallbackHandler.
    """
    await manager.connect(websocket)
    while True:
        try:
            data = await websocket.receive_json()
            q = data["message"]
            stream_handler = StreamingLLMCallbackHandler(websocket)
            await websocket.send_text(".-botResponseStartStreaming")
            # The return value is not used here; tokens are streamed through
            # the callback handler as they are produced.
            await chat_manager.chat_response({
                "question": q,
                "chat_session": data["chat_session"],
                "bot_id": data["bot_id"],
                "stream_handler": stream_handler,
            })
            await websocket.send_text(".-botResponseEndStreaming")
        except WebSocketDisconnect:
            manager.disconnect(websocket)
            # BUG FIX: exit the receive loop once the client disconnects;
            # the original kept looping over the closed socket forever.
            break
            # await websocket.send_text(f"Connection lost")

if __name__ == "__main__":
    # NOTE(review): reload=True is ignored when an app *object* (rather than
    # an import string) is passed to uvicorn.run, so it was removed.
    uvicorn.run(app, host='0.0.0.0', port=int(os.environ.get('PORT', 8000)))

67
requirements.txt Normal file
View File

@ -0,0 +1,67 @@
aiohttp==3.8.4
aiosignal==1.3.1
anyio==3.7.0
async-timeout==4.0.2
attrs==23.1.0
certifi==2023.5.7
charset-normalizer==3.1.0
click==8.1.3
dataclasses-json==0.5.8
dnspython==2.3.0
exceptiongroup==1.1.1
fastapi==0.98.0
frozenlist==1.3.3
greenlet==2.0.2
gunicorn==20.1.0
h11==0.14.0
httptools==0.5.0
idna==3.4
langchain==0.0.306
langchainplus-sdk==0.0.17
loguru==0.7.0
marshmallow==3.19.0
marshmallow-enum==1.5.1
multidict==6.0.4
mypy-extensions==1.0.0
numexpr==2.8.4
numpy==1.25.0
openapi-schema-pydantic==1.2.4
packaging==23.1
pinecone-client==2.2.2
pydantic==1.10.9
python-dateutil==2.8.2
python-dotenv==1.0.0
PyYAML==6.0
regex==2023.6.3
requests==2.31.0
six==1.16.0
sniffio==1.3.0
SQLAlchemy==2.0.16
starlette==0.27.0
tenacity==8.2.2
tiktoken==0.4.0
tqdm==4.65.0
typing-inspect==0.9.0
typing_extensions==4.6.3
urllib3==2.0.3
uvicorn==0.22.0
uvloop==0.17.0
watchfiles==0.19.0
websockets==11.0.3
yarl==1.9.2
google-cloud-bigquery==3.11.3
google-api-python-client==2.93.0
cohere==4.16.0
redis
openai
jinja2
google-cloud-bigquery[pandas]
python-multipart==0.0.6
google-cloud-storage
PyPDF2
fsspec
pandas
gcsfs
pyarrow
pydantic[email]
fastapi-login

22
src/bot/exceptions.py Normal file
View File

@ -0,0 +1,22 @@
from fastapi import HTTPException
from src.bot.repository import repository
def has_bot_parameter(bot_id):
    """Raise a 400 HTTPException when no bot id was supplied."""
    if bot_id:
        return
    print("Exception: Bot id is not provided")
    raise HTTPException(status_code=400, detail="Bot id is not provided")
def is_bot_id_valid(bot_id):
    """Fetch the settings rows for *bot_id*, raising 404 if the bot is unknown.

    Returns the settings rows from the repository on success.
    """
    bot_settings = repository.get_bot_settings(bot_id)
    if len(bot_settings) == 0:
        # No rows: the bot id does not exist.
        print("Exception: Bot not found")
        raise HTTPException(status_code=404, detail="Bot not found")
    # BUG FIX: the original followed this with `if not bot_settings:` and a
    # loop printing the rows as errors — that branch was unreachable because
    # an empty result already raised above; the dead code has been removed.
    return bot_settings

121
src/bot/repository.py Normal file
View File

@ -0,0 +1,121 @@
from uuid import uuid4
from abc import ABC, abstractmethod
from src.config import TBL_BOT_SET, TBL_USER_BOT, VW_USER_BOT_LATEST
from src.gcp.bigquery import executor, client
from src.bot.schemas import BotCreateModel, BotUpdateModel
# Define the interface (abstract base class) for the Bot_SettingsRepository
# Define the interface (abstract base class) for the bot-settings repository.
# Concrete backends (e.g. BigQuery) implement these operations.
class BotsRepositoryInterface(ABC):
    @abstractmethod
    def get_bot_by_user_id(self, user_id):
        """Return all bot rows owned by *user_id*."""

    # BUG FIX: the original declared `bot_settings = BotCreateModel`, making
    # the *class object* the parameter's default value; `:` (an annotation,
    # quoted to avoid import-time evaluation) was clearly intended.
    @abstractmethod
    def save_bot_settings(self, bot_settings: "BotCreateModel"):
        """Persist the settings of a newly created bot."""

    @abstractmethod
    def update_bot_settings(self, bot_settings: "BotUpdateModel"):
        """Persist a new settings revision for an existing bot."""

    # @abstractmethod
    # def delete_bot_settings(self, user_id):
    #     pass
# Concrete implementation of the bot_settingsRepository using BigQuery
# Concrete implementation of the bot-settings repository using BigQuery.
class BotRepositoryBigQuery(BotsRepositoryInterface):
    def __init__(self, executor):
        # Query-executor wrapper around the BigQuery client.
        self._executor = executor

    def get_bot_by_user_id(self, user_id):
        """Return the latest bot rows owned by *user_id*."""
        # NOTE(review): the value is interpolated straight into the SQL
        # string — safe only while user_id is an internally generated id,
        # never raw user input.
        query = f"SELECT * FROM `{VW_USER_BOT_LATEST}` WHERE user_id = '{user_id}'"
        return self._executor.execute_query(query)

    def save_user_bot(self, user_id):
        """Insert a user->bot link row; returns the new bot id on success."""
        id = str(uuid4())
        row_to_insert = {
            "id": id,
            "user_id": user_id
        }
        errors = client.insert_rows_json(TBL_USER_BOT, [row_to_insert])
        if not errors:
            return id
        else:
            msg = f"Errors occurred while saving bot for user {user_id}: {errors}"
            print(msg)
            return {"message": msg}

    def _build_settings_row(self, bot_settings, bot_id):
        """Build the bot_settings insert row shared by save and update."""
        return {
            "id": str(uuid4()),
            "bot_id": bot_id,
            "title": bot_settings['title'],
            "description": bot_settings["description"],
            "chat_styles": bot_settings['chat_styles'],
            "initial_message": bot_settings['initial_message'],
            "message_suggestion": bot_settings['message_suggestion'],
            "allow_feedback_response": bot_settings["allow_feedback_response"],
            "show_source_in_response": bot_settings["show_source_in_response"],
            "advance_settings": bot_settings["advance_settings"]
        }

    def _insert_settings_row(self, row_to_insert, bot_id):
        """Insert a settings revision row; returns a status-message dict."""
        errors = client.insert_rows_json(TBL_BOT_SET, [row_to_insert])
        if not errors:
            msg = f"Bot settings for bot {bot_id} saved successfully."
            print(msg)
            return {"message": msg}
        else:
            msg = f"Errors occurred while saving bot settings for bot_id: {bot_id}: {errors}"
            print(msg)
            return {"message": msg}

    # BUG FIX: the original signatures used `bot_settings = BotCreateModel`
    # (a *default value* of the class object) where an annotation was meant;
    # also the row-building/insert logic was duplicated between save and
    # update and is now shared via the two helpers above.
    def save_bot_settings(self, bot_settings: "BotCreateModel"):
        """Create a bot for bot_settings['user_id'] and store its settings."""
        bot_id = self.save_user_bot(bot_settings['user_id'])
        row = self._build_settings_row(bot_settings, bot_id)
        return self._insert_settings_row(row, bot_id)

    def get_bot_settings(self, bot_id):
        """Return the latest settings rows for *bot_id*."""
        query = f"SELECT * FROM `{VW_USER_BOT_LATEST}` WHERE bot_id = '{bot_id}'"
        return self._executor.execute_query(query)

    def update_bot_settings(self, bot_settings: "BotUpdateModel"):
        """Append a new settings revision for bot_settings['bot_id']."""
        bot_id = bot_settings['bot_id']
        row = self._build_settings_row(bot_settings, bot_id)
        return self._insert_settings_row(row, bot_id)

    # def delete_bot_settings(self, user_id):
    #     # Delete the row from the BigQuery table
    #     delete_condition = f"user_id = '{user_id}'"
    #     errors = client.delete_rows(f"{BIGQUERY_SCHEMA}.bot_settings", delete_condition)
    #     if not errors:
    #         msg = f"UI settings for user {user_id} deleted successfully."
    #         print(msg)
    #     else:
    #         msg = f"Errors occurred while deleting UI settings for user {user_id}: {errors}"
    #         print(msg)
    #     return msg
# return msg
repository = BotRepositoryBigQuery(executor)

36
src/bot/router.py Normal file
View File

@ -0,0 +1,36 @@
from fastapi import APIRouter, Body, Depends
from src.bot.service import bot_service
from src.bot.schemas import BotCreateModel, BotUpdateModel, BotResponseModel
from src.login.service import manager
router = APIRouter()

@router.get("/bot/{user_id}", tags=["bot"], summary="Get user bot")
async def get_user_bot(user_id: str,user=Depends(manager)):
    # Requires an authenticated user; returns every bot owned by user_id.
    return await bot_service.get_user_bot(user_id)

@router.post("/bot/", tags=["bot"], summary="Create user bot")
async def post_bot(bot_settings: BotCreateModel = Body(...),user=Depends(manager)):
    # print(bot_settings)
    return await bot_service.create_bot(bot_settings)

#Bot settings
@router.get("/bot/settings/{bot_id}", tags=["bot"], summary="Get bot settings")
async def get_bot_settings(bot_id: str):
    # No auth dependency on this route, unlike the other bot routes.
    # The service returns a list; only the first settings row is exposed.
    result = await bot_service.get_bot_settings(bot_id)
    return result[0]

@router.put("/bot/settings/", tags=["bot"], summary="Update bot settings")
async def put_bot(bot_settings: BotUpdateModel = Body(...),user=Depends(manager)):
    # Appends a new settings revision for the bot in the payload.
    return await bot_service.update_bot_settings(bot_settings)

# @router.delete("/ui_settings/{user_id}", tags=["ui-settings"], summary="Update user ui settings")
# async def delete_ui_settings(
#     user_id: str,
# ):
#     """
#     This is my description of the API endpoint
#     """
#     return await delete_ui_settings(user_id)

46
src/bot/schemas.py Normal file
View File

@ -0,0 +1,46 @@
from pydantic import BaseModel, Json
from typing import Dict, Union
from datetime import datetime
class BotCreateModel(BaseModel):
    """Request payload for creating a bot together with its initial settings."""
    user_id: str
    title: str
    description: Union[str, None]
    chat_styles: Dict
    initial_message: Union[str, None]
    message_suggestion: Union[Dict, None]
    allow_feedback_response: Union[bool, None]
    show_source_in_response: Union[bool, None]
    advance_settings: Dict

class BotResponseModel(BaseModel):
    """Full settings view of a bot as returned by the settings endpoints.

    The Json-typed fields are stored as JSON strings and parsed by pydantic.
    """
    bot_id: str
    title: str
    description: Union[str, None]
    chat_styles: Json
    initial_message: Union[str, None]
    message_suggestion: Json
    allow_feedback_response: Union[bool, None]
    show_source_in_response: Union[bool, None]
    advance_settings: Json
    updated_at: datetime
    created_at: datetime

class BotListResponseModel(BaseModel):
    """Compact per-bot summary used when listing a user's bots."""
    bot_id: str
    title: str
    data_source_count: Union[int, None]
    updated_at: datetime
    created_at: datetime

class BotUpdateModel(BaseModel):
    """Request payload for writing a new settings revision of an existing bot."""
    bot_id: str
    title: str
    description: Union[str, None]
    chat_styles: Dict
    initial_message: Union[str, None]
    message_suggestion: Dict
    allow_feedback_response: Union[bool, None]
    show_source_in_response: Union[bool, None]
    advance_settings: Dict

89
src/bot/service.py Normal file
View File

@ -0,0 +1,89 @@
import json
from src.bot.repository import repository
from src.bot.schemas import BotResponseModel, BotCreateModel, BotUpdateModel,BotListResponseModel
from src.users.exceptions import is_user_valid, has_user_bot
from src.bot.exceptions import has_bot_parameter, is_bot_id_valid
class BotService:
    """Application-level bot operations; persistence is delegated to a repository."""

    def __init__(self, repository):
        # BUG FIX: the original annotated this parameter with the repository
        # *instance* (`repository: repository`), which is not a type and was
        # evaluated at definition time; the bogus annotation is dropped.
        self.repository = repository

    async def get_user_bot(self, user_id: str) -> "list[BotListResponseModel]":
        """Return list models for every bot owned by *user_id*.

        Validation errors are raised by is_user_valid / has_user_bot.
        """
        is_user_valid(user_id)
        bot_settings = self.repository.get_bot_by_user_id(user_id)
        has_user_bot(bot_settings)
        if isinstance(bot_settings, dict):
            # A single row may come back as a bare dict; normalise to a list.
            bot_settings = [bot_settings]
        return [
            BotListResponseModel(**settings)
            for settings in bot_settings
        ]

    async def create_bot(self, bot_settings_dto: "BotCreateModel") -> "dict[str, str]":
        """Validate the DTO, serialise its JSON fields and persist a new bot."""
        is_user_valid(bot_settings_dto.user_id)
        self._validate_json_fields(bot_settings_dto)
        bot_settings_data = self._convert_json_fields(bot_settings_dto)
        bot_settings_data['user_id'] = bot_settings_dto.user_id
        response = self.repository.save_bot_settings(bot_settings_data)
        return {"message": response}

    async def get_bot_settings(self, bot_id: str) -> "list[BotResponseModel]":
        """Return response models for *bot_id* (404s from the validators).

        BUG FIX: the original return annotation claimed a single
        BotResponseModel, but a list is always returned.
        """
        has_bot_parameter(bot_id)
        bot_settings = is_bot_id_valid(bot_id)
        return self._create_response_dto_list(bot_settings)

    async def update_bot_settings(self, bot_settings_dto: "BotUpdateModel") -> "dict[str, str]":
        """Validate the DTO and append a new settings revision for its bot."""
        is_bot_id_valid(bot_settings_dto.bot_id)
        self._validate_json_fields(bot_settings_dto)
        bot_settings_data = self._convert_json_fields(bot_settings_dto)
        bot_settings_data['bot_id'] = bot_settings_dto.bot_id
        response = self.repository.update_bot_settings(bot_settings_data)
        return {"message": response}

    def _convert_json_fields(self, dto):
        """Serialise the DTO's dict-valued fields to JSON strings for storage."""
        return {
            "title": dto.title,
            "description": dto.description,
            "chat_styles": json.dumps(dto.chat_styles),
            "message_suggestion": json.dumps(dto.message_suggestion),
            "initial_message": dto.initial_message,
            "advance_settings": json.dumps(dto.advance_settings),
            "allow_feedback_response": dto.allow_feedback_response,
            "show_source_in_response": dto.show_source_in_response
        }

    def _validate_json_fields(self, dto):
        """Hook for validating JSON fields before persisting (currently a no-op)."""
        # Add any necessary validation for JSON fields here
        pass

    def _create_response_dto_list(self, settings_list):
        """Map raw settings rows (dict or list of dicts) to BotResponseModel objects."""
        if isinstance(settings_list, dict):
            settings_list = [settings_list]
        return [
            BotResponseModel(
                bot_id=settings['bot_id'],
                title=settings['title'],
                description=settings["description"],
                chat_styles=settings['chat_styles'],
                initial_message=settings['initial_message'],
                message_suggestion=settings['message_suggestion'],
                advance_settings=settings['advance_settings'],
                allow_feedback_response=settings['allow_feedback_response'],
                show_source_in_response=settings["show_source_in_response"],
                updated_at=settings['updated_at'],
                created_at=settings['created_at']
            )
            for settings in settings_list
        ]
bot_service = BotService(repository)

56
src/chat/callback.py Normal file
View File

@ -0,0 +1,56 @@
import json
import time
from datetime import datetime
from typing import Any, List, Tuple, Optional, Dict
from langchain.callbacks.base import AsyncCallbackHandler,BaseCallbackHandler
from langchain.callbacks.streaming_stdout_final_only import FinalStreamingStdOutCallbackHandler
from src.chat.schemas import ChatResponse
class StreamingLLMCallbackHandler(FinalStreamingStdOutCallbackHandler):
    """Callback handler that streams only the final LLM answer to a websocket.

    Tokens are buffered until the answer-prefix token sequence is observed;
    everything after the prefix is forwarded to the client as-is.
    """

    def __init__(
        self,
        websocket,
        answer_prefix_tokens: Optional[List[str]] = None
    ):
        # BUG FIX: a mutable list literal was used as the parameter default;
        # use the None sentinel and build the default inside the body.
        if answer_prefix_tokens is None:
            answer_prefix_tokens = ['Final', 'Answer', ':']
        self.websocket = websocket
        self.answer_prefix_tokens = [token.strip() for token in answer_prefix_tokens]
        self.last_tokens = [""] * len(answer_prefix_tokens)
        self.rolling_buffer = []

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running."""
        self.answer_reached = False

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Buffer tokens until the answer prefix is seen, then stream to the client."""
        # Append the current token to the rolling buffer
        self.rolling_buffer.append(token.strip())
        print(' self.rolling_buffer: ', self.rolling_buffer)
        # Keep the buffer size to match the length of answer_prefix_tokens
        if len(self.rolling_buffer) > len(self.answer_prefix_tokens):
            self.rolling_buffer.pop(0)  # Remove the oldest token
        # Check if the rolling buffer matches the answer_prefix_tokens
        if self.rolling_buffer == self.answer_prefix_tokens:
            self.answer_reached = True
        print("token:", token)
        # If the final answer has been reached, forward the raw token downstream
        if self.answer_reached:
            # resp = ChatResponse(sender="bot", message=token, type="stream")
            await self.websocket.send_text(token)

    async def on_llm_end(self, token: str, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        # NOTE(review): langchain normally passes an LLMResult here, not a
        # token string — the parameter is unused so this is harmless, but
        # confirm before relying on its value.
        print("LLM END")
        # if self.answer_reached:
        #     print('end')

View File

@ -0,0 +1,30 @@
from langchain.document_loaders import BigQueryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
#from langchain.document_loaders import DirectoryLoader
from src.config import VW_BOT_CON_LATEST
from src.helpers.token_helper import tiktoken_len
# Shared splitter: ~100-token chunks with 25-token overlap, sized via tiktoken.
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=100,
    chunk_overlap=25,
    length_function=tiktoken_len,
    separators=["\n\n", "\n", " ", ""]
)

def document_loader(bot_id):
    """Load the bot's knowledge-base rows from BigQuery and split them into chunks.

    Returns the list of split document chunks for *bot_id*.
    """
    # NOTE(review): bot_id is interpolated into the SQL string — safe only
    # while it is an internally generated id, never raw user input.
    query = f"SELECT data FROM `{VW_BOT_CON_LATEST}` where widget_id = '{bot_id}'"
    # loader = DirectoryLoader(f'text/', glob='**/*.txt')
    loader = BigQueryLoader(query,page_content_columns=["data"])
    documents = loader.load()
    # BUG FIX: the original computed an unused word count (num_words) over
    # every document on each call; that dead work has been removed.
    # Split your documents into texts
    texts = text_splitter.split_documents(documents)
    return texts

166
src/chat/index.py Normal file
View File

@ -0,0 +1,166 @@
import pinecone
from langchain.vectorstores import Pinecone as PineconeStore
from langchain.memory import ConversationBufferMemory
from langchain.chains import RetrievalQA
from langchain.memory.chat_message_histories import RedisChatMessageHistory
# from langchain.vectorstores.redis import Redis
from langchain.agents import initialize_agent, Tool
from langchain.agents import AgentType
from langchain.tools.render import render_text_description
from langchain.agents.output_parsers import ReActSingleInputOutputParser
from langchain.agents.format_scratchpad import format_log_to_str
from langchain import hub
from langchain.agents import AgentExecutor
from src.config import REDIS_HOST, PINECONE_API_ENV, PINECONE_API_KEY, PINECONE_INDEX_NAME
from src.chat.document_loader import document_loader
from src.chat.model_manager import ModelManager
from src.bot.service import bot_service
class BaseMessage:
    """Simple container pairing a question with its result."""

    def __init__(self, question, result):
        # Store the raw question/result pair without any transformation.
        self.question, self.result = question, result
class ChatManager:
    """Orchestrates a chat turn: resolves the bot's LLM, builds the retrieval
    pipeline (Redis history -> BigQuery docs -> Pinecone vectors -> RetrievalQA),
    and runs a ReAct agent over it to produce the answer."""

    def __init__(self):
        # Conversation memory for the current turn (set in nlp_process).
        self.chat_history = None
        self.model_manager = ModelManager()
        # Selected LLM instance (set in get_llm).
        self.llm = None
        # Redis-backed message history for the active chat session.
        self.redis_message_history = None

    async def initialize(self, data):
        """Entry point that delegates to chat_response, returning an error string on failure."""
        try:
            return await self.chat_response(data)
        except Exception as e:
            return f"--------------------There's an error occurred in initializing chat manager: {e}"

    async def get_llm(self, data):
        """Look up the bot's configured LLM ecosystem and instantiate its LLM.

        Falls back to "openai" when the bot settings define no llm_model.
        Returns the ecosystem name, or an error string on failure.
        """
        try:
            bot_details = await bot_service.get_bot_settings(data["bot_id"])
            bot_details = bot_details[0].advance_settings
            llm_ecosystem = bot_details.llm_model if hasattr(bot_details, 'llm_model') else "openai"
        except Exception as e:
            return f"--------------------There's an error occurred in get llm method: {e}"
        self.llm = self.model_manager.llm_selector(llm_ecosystem,data['stream_handler'])
        return llm_ecosystem

    def nlp_process(self, data):
        """Build the retrieval pipeline and return a RetrievalQA tool.

        Steps: Redis chat history -> conversation memory -> document load ->
        embeddings + Pinecone index -> retriever -> RetrievalQA chain.
        On failure of the early steps a {"message": ...} error dict is
        returned instead of a QA tool.
        """
        embeddings = self.model_manager.embedding_selector(data["embedding"])
        try:
            # TTL of 600s on the per-session Redis history.
            self.redis_message_history = RedisChatMessageHistory(url=REDIS_HOST, ttl=600, session_id=data["chat_session"])
            print('--------------------redis message_history: ', self.redis_message_history.messages)
        except Exception as e:
            print('error redischathistory: ', e)
            return {"message": f"An error occurred : {e}"}
        try:
            memory = ConversationBufferMemory(memory_key="memory",chat_memory=self.redis_message_history, return_messages=True)
            self.chat_history = memory
            print('--------------------memory: ', memory)
        except Exception as e:
            print('error conversation buffer memory: ', e)
            return {"message": f"An error occurred : {e}"}
        try:
            docs = document_loader(data["bot_id"])
            print('--------------------docs: ', docs)
        except Exception as e:
            print('error document loader: ', e)
            return {"message": f"An error occurred : {e}"}
        try:
            # NOTE(review): query_result/doc_result are computed but unused —
            # embedding happens again inside PineconeStore.from_texts below;
            # confirm whether these calls are intentional warm-ups.
            query = data['question']
            query_result = embeddings.embed_query(query)
            doc_result = embeddings.embed_documents([t.page_content for t in docs])
            print('--------------------Store embeddings to vector store: Pinecone')
            pinecone.init(
                api_key=PINECONE_API_KEY, # find at app.pinecone.io
                environment=PINECONE_API_ENV # next to api key in console
            )
            index = pinecone.Index(PINECONE_INDEX_NAME)
            # print(index.describe_index_stats())
            # index.delete(deleteAll='true', namespace='')
        except Exception as e:
            # Errors here are logged but not fatal; from_texts below retries.
            print(f"--------------------Error on Pinecone process: {e}")
        docsearch = PineconeStore.from_texts([t.page_content for t in docs], embeddings, index_name=PINECONE_INDEX_NAME)
        retriever = self.model_manager.retriever_selector(data["retriever"], docsearch)
        qa_tool = RetrievalQA.from_chain_type(
            llm=self.llm, retriever=retriever, memory=memory,
            verbose=True
        )
        return qa_tool

    def custom_agent(self, tools):
        """Build a REACT_DOCSTORE agent over *tools* (alternative agent path).

        NOTE(review): the scalar settings below are passed positionally to
        initialize_agent — verify they map to the intended keyword slots.
        """
        max_iterations = 3
        handle_parsing_errors = True
        early_stopping_method="generate"
        verbose=True
        agent = AgentType.REACT_DOCSTORE
        try:
            return initialize_agent(
                tools,
                self.llm,
                agent,
                max_iterations,
                handle_parsing_errors,
                early_stopping_method,
                verbose,
                memory=self.chat_history,
            )
        except Exception as e:
            return f"Error occured in custom agent function: {e}"

    async def chat_response(self,data):
        """Run one full chat turn and return the agent's answer string.

        Expects data keys: question, chat_session, bot_id, stream_handler.
        Persists the user question and the AI answer to the Redis history.
        """
        print(f"--------------------chat history: {self.chat_history}")
        llm_ecosystem = await self.get_llm(data)
        qa_tool = self.nlp_process({
            "question": data["question"],
            "chat_history": self.chat_history,
            "embedding":llm_ecosystem,
            "retriever":llm_ecosystem,
            "bot_id":data["bot_id"],
            "chat_session": data["chat_session"]
        })
        # Single tool wrapping the RetrievalQA chain for the ReAct agent.
        tools = [
            Tool(
                name='Document Store',
                func=qa_tool.run,
                description="Use it to lookup information from the document store. You must return a final answer, not an action. If you can't return a final answer, return you don't know the answer.",
            ),
        ]
        # Pull the shared ReAct chat prompt and fill in the tool metadata.
        prompt = hub.pull("hwchase17/react-chat")
        prompt = prompt.partial(
            tools=render_text_description(tools),
            tool_names=", ".join([t.name for t in tools]),
        )
        # Stop generation when the model starts emitting an Observation block.
        llm_with_stop = self.llm.bind(stop=["\nObservation"])
        try:
            agent = {
                "input": lambda x: x["input"],
                "agent_scratchpad": lambda x: format_log_to_str(x['intermediate_steps']),
                "chat_history": lambda x: x["memory"]
            } | prompt | llm_with_stop | ReActSingleInputOutputParser()
            agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, memory=self.chat_history, max_iterations=3, early_stopping_method= 'generate')
            result = agent_executor.invoke({"input": data["question"]})['output']
            self.redis_message_history.add_user_message(data["question"])
        except Exception as e:
            return f"There's an error occured in chat response function: {e}"
        print(f"Result: {result}")
        self.redis_message_history.add_ai_message(result)
        return result

# Module-level singleton used by the websocket endpoint.
chat_manager = ChatManager()

30
src/chat/model_manager.py Normal file
View File

@ -0,0 +1,30 @@
from src.chat.services.cohere import CohereService
from src.chat.services.openai import OpenAIService
from src.chat.services.model_service import ModelService
class ModelManager:
    """Factory/dispatcher mapping an ecosystem name to its ModelService."""

    # Mapping of supported ecosystem names to their service classes.
    SUPPORTED_MODELS = {"cohere": CohereService, "openai": OpenAIService}

    def model_checker(self, model):
        """Raise ValueError when *model* is not a supported ecosystem name."""
        if model not in self.SUPPORTED_MODELS:
            raise ValueError("Selected model not supported")

    def create_model_service(self, model) -> "ModelService":
        """Instantiate the service class registered for *model*."""
        self.model_checker(model)
        return self.SUPPORTED_MODELS[model]()

    # BUG FIX: the three selectors below were annotated `-> str` although
    # they return retriever/embedding/LLM objects; the wrong annotations
    # have been removed. The no-op __init__ was also dropped.
    def retriever_selector(self, model, docsearch):
        """Return the retriever built by the selected service over *docsearch*."""
        model_service = self.create_model_service(model)
        return model_service.retriever(docsearch)

    def embedding_selector(self, model):
        """Return the embeddings client for the selected service."""
        model_service = self.create_model_service(model)
        return model_service.embeddings()

    def llm_selector(self, model, stream_handler = None):
        """Return the LLM for the selected service, wired to *stream_handler*."""
        model_service = self.create_model_service(model)
        return model_service.llm(stream_handler)

21
src/chat/schemas.py Normal file
View File

@ -0,0 +1,21 @@
from pydantic import BaseModel,validator
class ChatResponse(BaseModel):
    """Chat response schema."""
    sender: str   # either "bot" or "human"
    message: str  # message payload
    type: str     # lifecycle marker: start | stream | end | error | info

    @validator("sender")
    def sender_must_be_bot_or_you(cls, v):
        """Reject any sender other than "bot" or "human"."""
        if v not in ["bot", "human"]:
            raise ValueError("sender must be bot or human")
        return v

    @validator("type")
    def validate_message_type(cls, v):
        """Reject unknown message types."""
        if v not in ["start", "stream", "end", "error", "info"]:
            # BUG FIX: the original message omitted the accepted
            # "error" and "info" values.
            raise ValueError("type must be start, stream, end, error or info")
        return v

View File

@ -0,0 +1,29 @@
import os
from langchain.llms import Cohere
from langchain.embeddings import CohereEmbeddings
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import CohereRerank
from src.chat.services.model_service import ModelService
from src.config import COHERE_API
# Module-level Cohere clients, created once at import time and shared.
os.environ["COHERE_API_KEY"] = COHERE_API
LLM_COHERE = Cohere(cohere_api_key=COHERE_API)
COMPRESSOR_COHERE = CohereRerank()
EMBEDDINGS_COHERE = CohereEmbeddings(cohere_api_key=COHERE_API)

class CohereService(ModelService):
    """ModelService backed by Cohere models."""

    def llm(self, stream_handler=None):
        """Return the shared Cohere LLM.

        BUG FIX: ModelManager.llm_selector always calls llm(stream_handler),
        but the original signature took no handler and would raise TypeError.
        The argument is now accepted (and currently unused) for interface
        parity with ModelService/OpenAIService.
        """
        return LLM_COHERE

    def embeddings(self):
        """Return the shared Cohere embeddings client."""
        return EMBEDDINGS_COHERE

    def retriever(self, docsearch):
        """Wrap *docsearch* with Cohere rerank-based contextual compression."""
        return ContextualCompressionRetriever(
            base_compressor=COMPRESSOR_COHERE, base_retriever=docsearch.as_retriever()
        )

View File

@ -0,0 +1,9 @@
class ModelService:
    """Base interface for model-ecosystem services (Cohere, OpenAI, ...).

    BUG FIX: the methods were annotated `-> str` although implementations
    return retriever/embedding/LLM objects; the wrong annotations are gone.
    """

    def retriever(self, docsearch):
        """Return a retriever over *docsearch*; subclasses must override."""
        pass

    def embeddings(self):
        """Return an embeddings client; subclasses must override."""
        pass

    def llm(self, stream_handler = None):
        """Return an LLM, optionally wired to *stream_handler*; subclasses must override."""
        pass

View File

@ -0,0 +1,17 @@
from langchain.llms import OpenAI
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.callbacks.manager import AsyncCallbackManager
from src.config import OPENAI_API_KEY
from src.chat.services.model_service import ModelService
class OpenAIService(ModelService):
    """ModelService backed by OpenAI models.

    BUG FIX: the methods were annotated `-> str` although they return
    LLM/embedding/retriever objects; the wrong annotations are removed, and
    stream_handler now defaults to None to match the ModelService interface.
    """

    def llm(self, stream_handler=None):
        """Return a streaming OpenAI LLM whose tokens flow to *stream_handler*."""
        stream_manager = AsyncCallbackManager([stream_handler])
        return OpenAI(temperature=0, callback_manager=stream_manager, streaming=True, openai_api_key=OPENAI_API_KEY)

    def embeddings(self):
        """Return an OpenAI embeddings client."""
        return OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY)

    def retriever(self, docsearch):
        """Return the default similarity retriever over *docsearch*."""
        return docsearch.as_retriever()

46
src/config.py Normal file
View File

@ -0,0 +1,46 @@
import os
from dotenv import load_dotenv
load_dotenv()
# Core project / model configuration, read from the environment (.env).
GCP_PROJECT_ID = os.getenv("GCP_PROJECT_ID")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL_NAME = os.getenv("OPENAI_MODEL_NAME")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_API_ENV = os.getenv("PINECONE_API_ENV")
# The Pinecone index is named after the GCP project.
PINECONE_INDEX_NAME = GCP_PROJECT_ID
ENCODING_FOR_MODEL = os.getenv("ENCODING_FOR_MODEL")
ENCODING = os.getenv("ENCODING")
REDIS_HOST = os.getenv("REDIS_HOST")
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD")
COHERE_API = os.getenv("COHERE_API")
SUPPORTED_MODEL = os.getenv("SUPPORTED_MODEL")
# All tables live in the project's "data_src" dataset.
BIGQUERY_SCHEMA = f"{GCP_PROJECT_ID}.data_src"
GCP_UPLOAD_BUCKET = os.getenv("GCP_UPLOAD_BUCKET")

# Fully-qualified BigQuery table names.
TBL_KWNBASE_CON = f"{BIGQUERY_SCHEMA}.knowledgebase_content"
TBL_KWNBASE_SRC = f"{BIGQUERY_SCHEMA}.knowledgebase_source"
TBL_USER_BOT = f"{BIGQUERY_SCHEMA}.user_bot"
TBL_BOT_SET = f"{BIGQUERY_SCHEMA}.bot_settings"
TBL_USERS = f"{BIGQUERY_SCHEMA}.users"
TBL_CUST_CHAT = f"{BIGQUERY_SCHEMA}.customer_chat"
TBL_CUST_CHAT_LOGS = f"{BIGQUERY_SCHEMA}.customer_chat_logs"

# Fully-qualified BigQuery view names ("latest revision" views).
VW_USER_LIST_LATEST = f"{BIGQUERY_SCHEMA}.vw_user_list_latest"
VW_USER_BOT_LATEST = f"{BIGQUERY_SCHEMA}.vw_user_bot_latest"
VW_USER_FILES_LATEST = f"{BIGQUERY_SCHEMA}.vw_user_files_latest"
VW_BOT_CON_LATEST = f"{BIGQUERY_SCHEMA}.vw_latest_bot_knowledgebase_content"

LOGIN_SECRET_KEY = os.getenv("LOGIN_SECRET_KEY")

View File

View File

@ -0,0 +1,20 @@
from src.gcp.bigquery import client, executor
from src.config import TBL_CUST_CHAT,TBL_CUST_CHAT_LOGS
class ConversationRepository:
    """BigQuery-backed persistence for customer chat sessions and their logs."""

    def __init__(self, executor):
        # Query-executor wrapper around the BigQuery client.
        self._executor = executor

    def save_temp_session(self, user_session, bot_id):
        """Insert a new online chat-session row for *user_session*/*bot_id*."""
        # NOTE(review): values are interpolated straight into the SQL string —
        # safe only while user_session is a server-generated uuid and bot_id
        # an internal id. Also note the missing space after INTO; confirm
        # BigQuery parses it before relying on this statement.
        query = f"INSERT INTO`{TBL_CUST_CHAT}` (user_session,bot_id, is_online) VALUES ('{user_session}', '{bot_id}',TRUE)"
        return self._executor.execute_query(query)

    def get_customer_chat_session(self,customer_session):
        """Return the id of the most recently updated online session for *customer_session*."""
        query = f"SELECT id FROM `{TBL_CUST_CHAT}` WHERE user_session = '{customer_session}' AND is_online = TRUE ORDER BY updated_at LIMIT 1"
        return self._executor.execute_query(query)

    def get_chat_logs(self,chat_session):
        """Return every log row belonging to *chat_session*."""
        query = f"SELECT * FROM `{TBL_CUST_CHAT_LOGS}` WHERE customer_chat_id = '{chat_session}'"
        return self._executor.execute_query(query)

# Module-level singleton wired to the shared BigQuery executor.
repository = ConversationRepository(executor)

View File

@ -0,0 +1,16 @@
from fastapi import APIRouter, Body, Depends
# from src.bot.service import bot_service
# from src.bot.schemas import BotCreateModel, BotUpdateModel, BotResponseModel
from src.login.service import manager
from src.conversation.service import conversation_service
router = APIRouter()
# NOTE(review): both endpoints are public (no Depends(manager)) — the chat
# widget presumably calls them before any login; confirm this is intended.
@router.get("/conversation/new/{bot_id}", tags=["conversations"], summary="Get user conversation session")
async def get_user_conversation_session(bot_id: str):
    """Create and return a fresh chat session for the given bot."""
    return conversation_service.save_customer_chat(bot_id)
@router.get("/conversation/logs/{chat_session}", tags=["conversations"], summary="Get conversation logs")
async def get_conversation_logs(chat_session: str):
    """Return all stored log rows for a chat session."""
    return conversation_service.get_chat_logs(chat_session)

View File

View File

@ -0,0 +1,25 @@
from uuid import uuid4
from src.conversation.repository import repository
class ConversationService:
    """Business logic for creating chat sessions and fetching their logs."""

    def __init__(self, repository: repository):
        self.repository = repository

    def save_customer_chat(self, bot_id: str):
        """Create a uuid4 session for `bot_id` and return its identifiers.

        Errors are swallowed into a {"message": ...} payload (best-effort API).
        """
        user_temp_session = str(uuid4())
        try:
            # BUG FIX: use the injected repository instead of the module-level
            # singleton, so the constructor's dependency injection actually works.
            self.repository.save_temp_session(user_temp_session, bot_id)
            chat_session = self.repository.get_customer_chat_session(user_temp_session)
            # chat_session is a single-row dict from the executor's unwrapping.
            return {"message": "Save customer chat successful", "user_session": user_temp_session, "chat_session": chat_session['id']}
        except Exception as e:
            return {"message": f"An error occurred while saving customer chat: {e}"}

    def get_chat_logs(self, chat_session: str):
        """Fetch every log row of one chat session."""
        try:
            chat_logs = self.repository.get_chat_logs(chat_session)
            return {"message": "Success getting chat logs", "chat_logs": chat_logs}
        except Exception as e:
            return {"message": f"An error occured while getting chat logs: {e}"}
conversation_service = ConversationService(repository)

View File

@ -0,0 +1,84 @@
import pandas as pd
from uuid import uuid4
from src.gcp.storage import StorageClientFactory
from src.etl.repository import repository
# import json
# GCS buckets: tmp holds staged flattened CSVs, src the raw uploads.
BUCKET_NAME_TMP = 'knowledge-base-tmp'
BUCKET_NAME_SRC = 'knowledge-base-src'
# Factory Method to create pd.read_csv() objects
class CsvReaderFactory:
    """Indirection over pd.read_csv so reading can be stubbed or swapped."""
    @staticmethod
    def read_csv(uri, **kwargs):
        # Thin pass-through; kwargs go straight to pandas.
        return pd.read_csv(uri, **kwargs)
def get_user_id(filename):
    """Extract the user UUID embedded in an uploaded file's name.

    The segment after the first '.' is '<tag>-<user-uuid>-<bot-uuid>...';
    the user id is dash-separated parts 1..5 of that segment.
    """
    print(f"DataTransformation->get_user_id func->Get user id from filename")
    # Join the five UUID parts instead of re-splitting the name five times.
    parts = filename.split('.')[1].split('-')
    uid = '-'.join(parts[1:6])
    print(f"DataTransformation->Get user id func->User id: {uid}")
    return uid
def get_bot_id(filename):
    """Extract the bot (widget) UUID embedded in an uploaded file's name.

    The bot id is dash-separated parts 6..10 of the segment after the
    first '.' (the five parts following the user UUID).
    """
    print(f"DataTransformation->get_bot_id func->get bot id from filename")
    # Join the five UUID parts instead of re-splitting the name five times.
    parts = filename.split('.')[1].split('-')
    wid = '-'.join(parts[6:11])
    print(f"DataTransformation->get_bot_id func->bot id: {wid}")
    return wid
def flatten_data(filename):
    """Read the staged tmp CSV named `filename` from GCS and collapse it into
    a single-record DataFrame with columns (id, filename, data), where `data`
    is the whole table serialized via to_string()."""
    print(f"DataTransformation->flatten_data func->Start flatten data")
    # FIX: the dump showed a corrupted placeholder here; the blob staged by
    # the loaders is named `filename`, so that is what we read back.
    uri = f"gs://{BUCKET_NAME_TMP}/{filename}"
    print(f"DataTransformation->flatten_data func->Read created tmp csv: {uri}")
    # Read CSV using pd.read_csv() with the factory method
    df = CsvReaderFactory.read_csv(uri, on_bad_lines='skip')
    # FIX: removed two dead statements whose results were discarded
    # (df.astype(str).agg(', '.join, axis=1) and df.dropna(how='all')) — they
    # had no effect on `df`. If row-joining/NaN-dropping was intended, assign
    # the results back instead.
    records = [
        {
            'id': str(uuid4()),
            'filename': filename,
            'data': df.to_string()
        }
    ]
    dframe = pd.DataFrame(
        records,
        columns=[
            "id",
            "filename",
            "data"
        ]
    )
    print(f"DataTransformation->flatten_data func->Dataframe from tmp csv: {dframe}")
    return dframe
def delete_uploaded_file(filename):
    """Delete the original uploaded blob `filename` from the source bucket.

    NOTE(review): this deletes from BUCKET_NAME_SRC while the upload service
    writes to GCP_UPLOAD_BUCKET — confirm the two refer to the same bucket.
    """
    # FIX: restored the corrupted print placeholder to show the actual name.
    print(f"DataTransformation->delete_uploaded_file func->Delete original file: {filename}")
    client = StorageClientFactory.create_client()
    bucket = client.get_bucket(BUCKET_NAME_SRC)
    blob = bucket.blob(filename)
    blob.delete()
    return {"message": "Deleted uploaded file"}
# Retired monolithic pipeline, kept for reference; superseded by the
# per-format loader strategies (CsvDataLoader / PdfDataLoader).
# def transform_data(data, new_filename, orig_filename):
#     print("DataTransformation->Start transform data")
#     client = StorageClientFactory.create_client()
#     bucket = client.get_bucket(BUCKET_NAME)
#     blob = bucket.blob(new_filename)
#     blob.upload_from_string(data)
#     print(f"DataTransformation->Created csv from upload file: {new_filename}")
#     dframe = flatten_data(new_filename)
#     blob.delete()
#     delete_uploaded_file(orig_filename)
#     repository.load_data(dframe, get_user_id(new_filename), new_filename)
# Shared storage client used by the loader strategies in this package.
storage_client = StorageClientFactory.create_client()

View File

@ -0,0 +1,23 @@
from src.etl.repository import repository
from src.etl.document_loaders.data_loader_strategy import DataLoaderStrategy
from src.etl.data_transformation_service import storage_client, BUCKET_NAME_TMP, flatten_data, delete_uploaded_file, get_user_id, get_bot_id
class CsvDataLoader(DataLoaderStrategy):
    """Strategy for CSV and JSON uploads: stage the raw bytes as a tmp CSV in
    GCS, flatten it, and load it through the ETL repository."""

    # FIX: declared as a staticmethod — it takes no self/cls and every caller
    # invokes it on the class (CsvDataLoader.load_data(...)).
    @staticmethod
    def load_data(data, new_filename, orig_filename, file_type):
        """Stage `data` as blob `new_filename` in the tmp bucket and run the
        flatten + load pipeline. `file_type` is the upload's MIME type,
        normalised to the 'json'/'csv' tag stored on the source row."""
        print("CSVJsonLoader->load data func->Read file")
        bucket = storage_client.get_bucket(BUCKET_NAME_TMP)
        blob = bucket.blob(new_filename)
        blob.upload_from_string(data)
        # The public URL is persisted on the knowledge-base source row.
        blob.make_public()
        file_url = blob.public_url
        print(f"CSVJsonLoader->load data func->Created tmp csv from upload file: {new_filename}")
        dframe = flatten_data(new_filename)
        # blob.delete()
        file_type = 'json' if file_type == 'application/json' else 'csv'
        repository.load_data(dframe, get_user_id(new_filename), new_filename, file_type, get_bot_id(new_filename), file_url)
        delete_uploaded_file(orig_filename)

View File

@ -0,0 +1,6 @@
from abc import ABC, abstractmethod
class DataLoaderStrategy(ABC):
    """Interface for per-format document loaders (CSV/JSON, PDF).

    NOTE(review): the concrete subclasses implement load_data without `self`
    and with differing parameter lists, and are only ever called on the
    class — confirm whether this ABC contract is still meaningful.
    """
    @abstractmethod
    def load_data(self, data, new_filename, orig_filename, type, bot_id):
        # Load one uploaded document into the knowledge base.
        pass

View File

@ -0,0 +1,50 @@
import pandas as pd
import gcsfs
from uuid import uuid4
from PyPDF2 import PdfReader
from src.config import GCP_PROJECT_ID,GCP_UPLOAD_BUCKET
from src.etl.repository import repository
from src.etl.document_loaders.data_loader_strategy import DataLoaderStrategy
from src.etl.data_transformation_service import delete_uploaded_file, get_user_id, get_bot_id
class PdfDataLoader(DataLoaderStrategy):
    """Strategy for PDF uploads: extract the text of every page and load it
    as one knowledge-base content row."""

    # FIX: declared as a staticmethod — it takes no self/cls and is called on
    # the class (PdfDataLoader.load_data(...)).
    @staticmethod
    def load_data(new_filename, orig_filename):
        """Read the uploaded PDF `orig_filename` from GCS, concatenate its
        page text, and hand the resulting one-row frame to the repository."""
        gcs_file_system = gcsfs.GCSFileSystem(project=GCP_PROJECT_ID)
        gcs_pdf_path = f"gs://{GCP_UPLOAD_BUCKET}/{orig_filename}"
        print(f"PdfDataLoader->load data func-> pdf path to read: {gcs_pdf_path}")
        url = f"https://storage.googleapis.com/{GCP_UPLOAD_BUCKET}/{new_filename}"
        # FIX: context manager guarantees the GCS file handle is closed even
        # if PDF parsing raises (the original leaked it on error).
        with gcs_file_system.open(gcs_pdf_path, "rb") as f_object:
            # creating a pdf reader object
            reader = PdfReader(f_object)
            number_of_pages = len(reader.pages)
            print(f"PdfDataLoader->load data func-> number of pdf page: {number_of_pages}")
            page_full_content = [page.extract_text() for page in reader.pages]
        records = [
            {
                'id': str(uuid4()),
                'filename': orig_filename,
                'data': ''.join(page_full_content)
            }
        ]
        dframe = pd.DataFrame(
            records,
            columns=[
                "id",
                "filename",
                "data"
            ]
        )
        repository.load_data(dframe, get_user_id(new_filename), new_filename, "pdf", get_bot_id(new_filename), url)
        # delete_uploaded_file(orig_filename)

8
src/etl/exceptions.py Normal file
View File

@ -0,0 +1,8 @@
from fastapi import HTTPException
def is_cloud_event_data_valid(cloud_event_data):
    """Placeholder validator for incoming cloud-event payloads.

    Currently a no-op; the commented-out code sketches the intended
    raise-HTTPException path. Callers must not rely on any validation yet.
    """
    # if not is_cloud_event_data_valid:
    # print("Exception: user not found")
    # raise HTTPException(status_code=404, detail="User not found")
    # else:
    pass

114
src/etl/repository.py Normal file
View File

@ -0,0 +1,114 @@
from uuid import uuid4
from abc import ABC, abstractmethod
from google.cloud import bigquery
from src.config import TBL_KWNBASE_CON, TBL_KWNBASE_SRC
from src.gcp.bigquery import executor, client
# Define the interface (abstract base class) for the UI_SettingsRepository
class ETLRepositoryInterface(ABC):
    """Contract for ETL repositories: register a knowledge-base source row,
    then load flattened content."""
    @abstractmethod
    def insert_user_knowledge_base(self, id, user_id, filename, bot_id, file_type):
        # Record a new source document; implementations return its id.
        pass
    @abstractmethod
    def load_data(self, dframe, user_id, new_filename, file_type, bot_id, url):
        # Load a flattened content DataFrame for the given source.
        pass
# Strategy interface for data loading
class DataLoaderStrategy:
    """Loading-strategy interface local to this module.

    NOTE(review): shadows the ABC of the same name in
    src.etl.document_loaders.data_loader_strategy — consider renaming one
    of the two to avoid confusion.
    """
    def load_data(self, dframe, filename, file_type):
        # Subclasses load `dframe` into storage for `filename`.
        raise NotImplementedError
# Concrete strategy for loading data from a DataFrame
class DataFrameLoader(DataLoaderStrategy):
    """Loads a pandas DataFrame into the knowledge-base content table."""
    def load_data(self, dframe, filename, file_type):
        """Append `dframe` (columns id, knowledgebase_source_id, data,
        filename) to TBL_KWNBASE_CON. Errors are printed, not raised."""
        print(f"ETL Repository->DataFrameLoader dataframe: {dframe}")
        table_id = TBL_KWNBASE_CON
        job_config = bigquery.LoadJobConfig(
            schema=[
                bigquery.SchemaField('id', "STRING"),
                bigquery.SchemaField('knowledgebase_source_id', "STRING"),
                bigquery.SchemaField('data', "STRING"),
                bigquery.SchemaField('filename', "STRING"),
            ],
            # Append-only: history is kept; "latest" views resolve current rows.
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        )
        try:
            # Earlier variant loaded csv/json straight from the tmp bucket URI;
            # now everything goes through the DataFrame path.
            # if file_type == "pdf":
            # print(f"ETL Repository->Load pdf data to content table")
            load_job = client.load_table_from_dataframe(dframe, table_id, job_config=job_config)
            # else:
            # print(f"ETL Repository->Load csv or json data to content table")
            # uri = f"gs://knowledge-base-tmp/{filename}"
            # load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
            load_job.result()
            destination_table = client.get_table(table_id)
            print("ETL Repository->Loaded {} rows.".format(destination_table.num_rows))
        except Exception as e:
            print(f"ETL Repository->An error occurred: {e}")
# Context class for DataLoaderStrategy
class DataLoaderContext:
    """Context object for the strategy pattern: delegates data loading to
    whichever DataLoaderStrategy is currently installed."""

    def __init__(self, strategy):
        # The strategy installed at construction time.
        self._active_strategy = strategy

    def set_strategy(self, strategy):
        """Swap in a different loading strategy at runtime."""
        self._active_strategy = strategy

    def load_data(self, dframe, filename, file_type):
        """Forward the load request to the active strategy."""
        self._active_strategy.load_data(dframe, filename, file_type)
# Concrete implementation of the UI_SettingsRepository using BigQuery
class ETLRepositoryBigQuery(ETLRepositoryInterface):
    """BigQuery-backed ETL repository: records knowledge-base source rows and
    loads flattened content via the DataFrame strategy."""

    def __init__(self, executor):
        self._executor = executor

    # Function to insert data into BigQuery
    def insert_user_knowledge_base(self, id, user_id, filename, bot_id, file_type, is_deleted, url):
        """Insert one row into the knowledge-base source table.

        Returns `id` on success, or a {"message": ...} dict on failure.
        """
        print("ETL Repository->insert_user_knowledge_base func->Insert knowledgde base source")
        try:
            # Reference to the dataset and table
            table_id = TBL_KWNBASE_SRC
            data_to_insert = [{
                "id": id,
                "user_id": user_id,
                "widget_id": bot_id,
                "filename": filename,
                "type": file_type,
                "is_deleted": is_deleted,
                "url": url
            }]
            print(f"ETL Repository->insert_user_knowledge_base func->Data to load: {data_to_insert}")
            # insert_rows_json returns a (possibly empty) list of row errors.
            errors = client.insert_rows_json(table_id, data_to_insert)
            if not errors:
                print("ETL Repository->insert_user_knowledge_base func->Data inserted successfully.")
                return id
            print("ETL Repository->insert_user_knowledge_base func->Errors occurred while inserting data:")
            for error in errors:
                print(error)
            return {"message": f"ETL Repository->insert_user_knowledge_base func->error: {errors}"}
        except Exception as e:
            print(f"ETL Repository->An error occurred: {e}")
            return {"message": f"ETL Repository->insert_user_knowledge_base func->An error occurred: {e}"}

    def load_data(self, dframe, user_id, new_filename, file_type, bot_id, url):
        """Register the source row, stamp the content frame with ids, and
        append it to the content table."""
        print(f"ETL Repository->BQ class dataframe:{dframe}")
        # Use the DataLoaderContext with DataFrameLoader strategy
        try:
            data_loader = DataLoaderContext(DataFrameLoader())
            print("ETL Repository->Load data to knowledgebase content table")
            id = str(uuid4())
            src_id = self.insert_user_knowledge_base(id, user_id, new_filename, bot_id, file_type, False, url)
            # NOTE(review): every row receives the same generated id; frames
            # are built single-row upstream, so this is effectively per-row.
            dframe["id"] = str(uuid4())
            dframe["knowledgebase_source_id"] = src_id
            data_loader.load_data(dframe, new_filename, file_type)
            print(f"ETL repository-> load data func-> Done loading data")
            return {"message": "ETL process done"}
        except Exception as e:
            print(f"DataToBQ->An error occurred: {e}")
            # BUG FIX: the original literal was missing the f prefix, so the
            # text "{e}" was returned instead of the exception message.
            return {"message": f"Error on loading data into db:{e}"}
repository = ETLRepositoryBigQuery(executor)

12
src/etl/router.py Normal file
View File

@ -0,0 +1,12 @@
from fastapi import APIRouter, Body, Depends
router = APIRouter()
from src.etl.service import ETLService
from src.login.service import manager
etl_service = ETLService()
# NOTE(review): this endpoint is unauthenticated (`manager` imported but
# unused) — confirm whether it should require login like the other routers.
@router.post("/etl/data-extraction/", tags=["etl"], summary="Process data extraction")
async def data_processing(cloud_event_data: dict = Body(...)):
    """Run the ETL pipeline on a storage cloud-event payload."""
    return await etl_service.process_data(cloud_event_data)

43
src/etl/service.py Normal file
View File

@ -0,0 +1,43 @@
from google.cloud import storage
from src.gcp.storage import StorageClientFactory
from src.etl.document_loaders.csv_json_loader import CsvDataLoader
from src.etl.document_loaders.pdf_loader import PdfDataLoader
from src.etl.repository import repository
class ETLService:
    """ETL entry point, triggered by a change in a storage bucket.

    Routes PDFs to PdfDataLoader and everything else (CSV/JSON) to
    CsvDataLoader, based on the event payload's contentType.
    """

    async def process_data(self, cloud_event):
        """Dispatch one storage cloud event (dict with bucket/name/
        timeCreated/contentType) to the matching loader."""
        print("ETLService-> Start data processing")
        data = cloud_event
        print(f"ETLService->Cloud Event Data: {data}")
        bucket = data["bucket"]
        orig_filename = data["name"]
        time_created = data["timeCreated"]
        # Temp artifact name: original name + creation timestamp, as .csv.
        new_filename = f"{orig_filename}-{time_created}.csv"
        contentType = data['contentType']
        if contentType == "application/pdf":
            print(f"ETLService->Start pdf loader")
            PdfDataLoader.load_data(new_filename, orig_filename)
        else:
            print(f"ETLService->Start csvjson loader")
            try:
                # Consistency: reuse the shared factory instead of a raw Client().
                client = StorageClientFactory.create_client()
                src_bucket = client.get_bucket(bucket)
                file_blob = storage.Blob(orig_filename, src_bucket)
            except Exception as error:
                # BUG FIX: the original only printed here and then crashed
                # with a NameError on file_blob; bail out with an error payload.
                print(f'ETLService->CSVJson loader storage error: {error}')
                return {"message": f"Storage error: {error}"}
            print(f"ETLService->File blob: {file_blob}")
            download_data = file_blob.download_as_string().decode()
            print("ETLService->download_data : ", download_data)
            try:
                CsvDataLoader.load_data(download_data, new_filename, orig_filename, contentType)
            except Exception as error:
                print('ETLService->An exception occurred: {}'.format(error))
        return {"message":"Done data process"}

9
src/files/exceptions.py Normal file
View File

@ -0,0 +1,9 @@
from fastapi import HTTPException
def is_file_upload_exists(file):
    """Reject uploads that arrived without a filename (empty form part)."""
    if not file.filename:
        print("Exception: No file uploaded")
        raise HTTPException(status_code=400, detail="No file uploaded")

22
src/files/repository.py Normal file
View File

@ -0,0 +1,22 @@
from src.config import VW_USER_LIST_LATEST,VW_USER_FILES_LATEST
from src.gcp.bigquery import executor
class FileRepository:
    """Read access to the latest-files view in BigQuery."""

    def __init__(self, executor):
        self._executor = executor

    def _select_by(self, column, value):
        # Shared query builder (mirrors UserRepository's helper style).
        # SECURITY NOTE(review): value is interpolated into SQL and ids come
        # from request paths — move to query parameters when the executor
        # supports a job_config.
        query = f"SELECT * FROM `{VW_USER_FILES_LATEST}` WHERE {column} = '{value}'"
        return self._executor.execute_query(query)

    def get_user_files(self, user_id):
        """All files uploaded by one user."""
        return self._select_by('user_id', user_id)

    def get_bot_files(self, bot_id):
        """All files attached to one bot (widget)."""
        return self._select_by('widget_id', bot_id)

    def get_specific_bot_file(self, id):
        """A single file row looked up by primary id."""
        return self._select_by('id', id)
repository = FileRepository(executor)

80
src/files/router.py Normal file
View File

@ -0,0 +1,80 @@
import os
from typing import Annotated
from fastapi import APIRouter,File, UploadFile, HTTPException, Form, Depends,Body
from fastapi.responses import StreamingResponse
from urllib.parse import unquote,urlparse
from src.gcp.storage import StorageClientFactory
from src.files.service import upload_file_to_gcs, delete_bot_file
from src.files.repository import repository
from src.files.exceptions import is_file_upload_exists
from src.bot.exceptions import has_bot_parameter
from src.users.exceptions import is_user_valid
from src.login.service import manager
router = APIRouter()
@router.get("/user-files/{user_id}", tags=["files"], summary="Get all files uploaded by user id")
async def user_files(user_id: str, user=Depends(manager)):
    """List every file the given user has uploaded (latest view)."""
    is_user_valid(user_id)
    return repository.get_user_files(user_id)
@router.get("/files/bot/{bot_id}", tags=["files"], summary="Get all files uploaded by bot id")
async def bot_files(bot_id: str, user=Depends(manager)):
    """List every file attached to the given bot."""
    has_bot_parameter(bot_id)
    return repository.get_bot_files(bot_id)
@router.post("/upload-file/", tags=["files"], summary="Upload file")
async def upload_file(user_id: Annotated[str, Form()], bot_id: Annotated[str,Form()],fileList: list[UploadFile] = File(...),user=Depends(manager)):
    """Upload one or more files to GCS for the (user, bot) pair.

    NOTE(review): only the last file's response is returned, and an empty
    fileList would leave `response` unbound — confirm intended behavior.
    """
    has_bot_parameter(bot_id)
    for file in fileList:
        is_file_upload_exists(file)
    try:
        for file in fileList:
            response = await upload_file_to_gcs(user_id,bot_id,file)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return response
@router.post("/files/bot/download/", tags=["files"], summary="Download bot file")
async def index(url: Annotated[str, Form()], user=Depends(manager)):
    """Stream a stored file back to the caller.

    `url` is a bucket-relative path whose first segment is the file type;
    csv/json files are served from the tmp bucket, everything else from the
    upload bucket.
    """
    file_type = url.split("/")[0]
    if file_type == "csv" or file_type== "json":
        bucket_src = os.environ.get("GCP_BUCKET_TMP")
    else:
        bucket_src = os.environ.get("GCP_UPLOAD_BUCKET")
    # Strip the trailing '-<timestamp>.<ext>' suffix to recover the original
    # blob name. NOTE(review): source indentation was lost in this dump —
    # confirm whether this strip was meant for all files or only the else
    # branch above.
    url = url.rsplit('.', 1)[0].rsplit('-', 3)[0]
    storage_client = StorageClientFactory.create_client()
    bucket = storage_client.bucket(bucket_src)
    # Get the blob
    blob = bucket.blob(unquote(url))
    content_type = "application/octet-stream" # Change to the appropriate content type
    # Create a StreamingResponse to send the file
    return StreamingResponse(iter([blob.download_as_bytes()]), media_type=content_type)
@router.get("/files/bot/delete/{id}", tags=["files"], summary="Delete bot file")
async def delete_files(id:str,user=Depends(manager)):
    """Soft-delete a bot file by id.

    NOTE(review): deletion via GET is unconventional — consider the DELETE verb.
    """
    try:
        response = await delete_bot_file(id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return response

42
src/files/service.py Normal file
View File

@ -0,0 +1,42 @@
import os
from fastapi import HTTPException
from src.config import GCP_UPLOAD_BUCKET
from src.gcp.storage import StorageClientFactory
from src.files.repository import repository
from src.etl.repository import repository as ETLRepository
async def upload_file_to_gcs(user_id, bot_id, file):
    """Store an uploaded file in the GCP upload bucket and make it public.

    Blob path: '<ext-without-dot>/<original-name>-<user_id>-<bot_id>'.
    Raises HTTPException(500) on any storage failure.
    """
    print("Upload file to GCS")
    storage_client = StorageClientFactory.create_client()
    try:
        split_tup = os.path.splitext(file.filename)
        # Generate a unique name for the file in the bucket
        blob_name = f"{file.filename}"
        # Upload the file to the bucket
        bucket = storage_client.bucket(GCP_UPLOAD_BUCKET)
        blob = bucket.blob(f"{split_tup[1].replace('.','')}/{blob_name}-{user_id}-{bot_id}")
        blob.upload_from_file(file.file, content_type=file.content_type)
        # Create a publicly accessible URL for the uploaded file
        blob.make_public()
        file_url = blob.public_url  # NOTE(review): computed but never returned — consider adding to the response
        return {"message": f"File upload successfully"}
    except Exception as e:
        print(f"Error uploading file to gcs:{e}")
        raise HTTPException(status_code=500, detail=str(e))
async def delete_bot_file(id):
    """Soft-delete a bot file: re-insert its knowledge-base source row with
    is_deleted=True so the latest-row views hide it. Returns the id on
    success, or a {"message": ...} dict on failure."""
    bot_files = repository.get_specific_bot_file(id)
    try:
        insert_data = ETLRepository.insert_user_knowledge_base(id, bot_files["user_id"], bot_files["filename"], bot_files["widget_id"], bot_files["type"], True, bot_files["url"])
        # BUG FIX: the original tested `if not insert_data`, which treated the
        # success return (the row id) as failure and fell through to None.
        if insert_data:
            print("Delete Bot file -> Data inserted successfully.")
            return id
    except Exception as e:
        print(f"Delete bot file service->An error occurred: {e}")
        return {"message": f"Delete bot file service->delete_bot_file func->An error occurred: {e}"}

30
src/gcp/bigquery.py Normal file
View File

@ -0,0 +1,30 @@
from google.cloud import bigquery
class BigQueryClientSingleton:
    """Process-wide, lazily created bigquery.Client (singleton)."""
    # Cached client instance; created on first get_instance() call.
    _instance = None
    @staticmethod
    def get_instance():
        # Not guarded by a lock — two threads racing here could each build a
        # client (wasteful but harmless for read use).
        if BigQueryClientSingleton._instance is None:
            BigQueryClientSingleton._instance = bigquery.Client()
        return BigQueryClientSingleton._instance
class BigQueryQueryExecutor:
    """Thin wrapper around a BigQuery client that runs a query and returns
    plain dicts. A result with exactly one row is unwrapped to the bare
    dict; anything else (zero or many rows) comes back as a list of dicts."""

    def __init__(self, client):
        self._client = client

    def execute_query(self, query):
        """Run `query` and materialise the rows as dicts (see class note on
        the single-row unwrapping convention)."""
        rows = [dict(row) for row in self._client.query(query)]
        # Unwrap singleton results — callers index straight into the dict.
        return rows[0] if len(rows) == 1 else rows
# Shared client/executor imported by every repository module in this app.
client = BigQueryClientSingleton.get_instance()
executor = BigQueryQueryExecutor(client)

6
src/gcp/storage.py Normal file
View File

@ -0,0 +1,6 @@
from google.cloud import storage
# Factory Method to create storage.Client() objects
class StorageClientFactory:
    """Factory for google.cloud storage clients."""
    # FIX: declared as a staticmethod — it takes no self/cls and is always
    # called on the class (StorageClientFactory.create_client()).
    @staticmethod
    def create_client():
        """Return a new storage.Client (credentials from the environment)."""
        return storage.Client()

View File

@ -0,0 +1,18 @@
import tiktoken
from src.config import ENCODING_FOR_MODEL, ENCODING
# NOTE(review): the return value of encoding_for_model() is discarded — only
# get_encoding(ENCODING) actually configures the module tokenizer. Either the
# first call was meant to be the tokenizer, or it can be removed.
tiktoken.encoding_for_model(ENCODING_FOR_MODEL)
tokenizer = tiktoken.get_encoding(ENCODING)
def tiktoken_len(text: str) -> int:
    """Token count of `text` under the module-level tokenizer (no special
    tokens allowed)."""
    return len(tokenizer.encode(text, disallowed_special=()))
def num_tokens_from_string(string: str, encoding_name: str) -> int:
    """Returns the number of tokens in a text string."""
    encoder = tiktoken.get_encoding(encoding_name)
    return len(encoder.encode(string))

0
src/login/repository.py Normal file
View File

12
src/login/router.py Normal file
View File

@ -0,0 +1,12 @@
from fastapi import APIRouter, Depends
from fastapi.security import OAuth2PasswordRequestForm
from src.login.service import LoginService
router = APIRouter()
login_service = LoginService()
@router.post("/login", tags=["login"], summary="Logged in user")
async def post_login(data: OAuth2PasswordRequestForm = Depends()):
    """Exchange username/password form credentials for an access token."""
    return await login_service.login(data)

38
src/login/service.py Normal file
View File

@ -0,0 +1,38 @@
import os
from datetime import timedelta
from dotenv import load_dotenv
from fastapi_login import LoginManager
from fastapi import Depends
from fastapi.security import OAuth2PasswordRequestForm
from fastapi_login.exceptions import InvalidCredentialsException
from src.users.service import user_service
load_dotenv()
# fastapi-login manager: signs tokens with LOGIN_SECRET_KEY, accepts
# credentials at /login, tokens expire after one hour.
manager = LoginManager(
    os.environ.get("LOGIN_SECRET_KEY"), '/login',
    default_expiry=timedelta(hours=1)
)
class LoginService:
    """Authenticates users against the user service and issues tokens."""
    async def login(self, data: OAuth2PasswordRequestForm = Depends()):
        """Validate the form credentials and return an access token.

        SECURITY NOTE(review): passwords are compared in plain text against
        the stored value — they appear to be stored unhashed. Strongly
        consider hashing (e.g. bcrypt) before production use.
        """
        email = data.username
        password = data.password
        user = await user_service.get_user_details(email)
        if not user:
            # you can return any response or error of your choice
            raise InvalidCredentialsException
        elif password != user['password']:
            raise InvalidCredentialsException
        # Token subject is the email; the user id rides along as a claim.
        access_token = manager.create_access_token(
            data={'sub': email, 'user': user['id']}
        )
        return {'status': 'Success','access_token': access_token}
@manager.user_loader()
async def query_user(email: str):
    # Resolver the LoginManager uses to turn a token subject into a user.
    return await user_service.get_user_details(email)

0
src/users/__init__.py Normal file
View File

51
src/users/exceptions.py Normal file
View File

@ -0,0 +1,51 @@
import re
from uuid import UUID
from fastapi import HTTPException
from src.users.repository import repository
def is_user_valid(user: str):
    """Raise HTTP 400 unless `user` resolves to an existing user.

    Accepts a v4 UUID (lookup by id), an email address, or a username.
    """
    try:
        UUID(user, version=4)
        users = repository.get_user_details(user)
    except ValueError:
        # Not a UUID — decide between email and username with a rough regex.
        if re.match(r"[^@]+@[^@]+\.[^@]+", user):
            users = repository.get_user_details_by_email(user)
        else:
            users = repository.get_user_details_by_name(user)
    if not users:
        print("Exception: user not found")
        raise HTTPException(status_code=400, detail="User not found")
    else:
        pass
def is_user_id_valid_uuid(user: str):
    """Raise HTTP 400 when `user` does not parse as a version-4 UUID."""
    try:
        UUID(user, version=4)
    except ValueError:
        raise HTTPException(status_code=400, detail="User not found")
def has_user_bot(bot_settings):
    """Raise HTTP 400 when the user has no bots configured."""
    if len(bot_settings) == 0:
        raise HTTPException(status_code=400, detail="No bot available")
def is_password_same(user_id: str, password: str):
    """Reject a password change that matches the user's current password."""
    current = repository.get_user_password(user_id)
    if password == current["password"]:
        raise HTTPException(status_code=400, detail="Password should not be the same")
def is_email_exist_to_other_user(user_id: str, email: str):
    """Raise HTTP 400 when `email` is already registered to a DIFFERENT user
    (keeping one's own email is fine)."""
    user_details = repository.get_user_details_by_email(email)
    if user_details and user_details["id"] != user_id:
        raise HTTPException(status_code=400, detail="Email is already exists")

96
src/users/repository.py Normal file
View File

@ -0,0 +1,96 @@
from fastapi import HTTPException
from typing import List
from src.config import VW_USER_LIST_LATEST,TBL_USERS
from src.gcp.bigquery import executor, client
from src.users.schemas import UserResponseModel, UserPasswordResponseModel
class UserRepository:
    """Read/write access to user records in BigQuery (latest-users view for
    reads, append-only users table for writes)."""

    def __init__(self, executor):
        self._executor = executor

    def _get_user_details_query(self, column, value):
        # NOTE(review): value is interpolated directly into SQL; move to
        # query parameters once the executor supports a job_config.
        return f"SELECT * FROM `{VW_USER_LIST_LATEST}` u WHERE {column} = '{value}'"

    def _fetch(self, query, error_context):
        # Shared query runner: every read used an identical try/except with a
        # per-method message; the message is now passed in as context.
        try:
            return self._executor.execute_query(query)
        except BaseException as e:
            msg = f"UserRepository->{error_context}:{e}"
            print(msg)
            raise HTTPException(status_code=400, detail=msg)

    def get_users(self) -> list[UserResponseModel]:
        """All users from the latest-users view."""
        return self._fetch(
            f"SELECT * FROM `{VW_USER_LIST_LATEST}`",
            "get_users func-> Error fetching users")

    def get_user_details(self, user_id) -> UserResponseModel:
        """One user looked up by id."""
        return self._fetch(
            self._get_user_details_query('id', user_id),
            "get_user_details func-> Error fetching user details")

    def get_user_details_by_name(self, username) -> UserResponseModel:
        """One user looked up by username."""
        return self._fetch(
            self._get_user_details_query('username', username),
            "get_user_details_by_name func-> Error fetching user details")

    def get_user_details_by_email(self, email) -> UserResponseModel:
        """One user looked up by email."""
        return self._fetch(
            self._get_user_details_query('email', email),
            "get_user_details_by_email func-> Error fetching user details")

    def get_user_password(self, user_id) -> UserPasswordResponseModel:
        """Password projection for one user (full row is fetched).

        BUG FIX: the original error message was copy-pasted from
        get_user_details; it now names this method.
        """
        return self._fetch(
            self._get_user_details_query('id', user_id),
            "get_user_password func-> Error fetching user details")

    def save_user_info(self, user_info):
        """Append the updated user row; returns a {"message": ...} payload
        either way (errors are reported, not raised)."""
        row_to_insert = {
            "id": user_info.id,
            # "username": user_info.username,
            "firstname": user_info.firstname,
            "lastname": user_info.lastname,
            "email": user_info.email,
            "password": user_info.password
        }
        errors = client.insert_rows_json(TBL_USERS, [row_to_insert])
        if not errors:
            msg = f"User info for {user_info.id} saved successfully."
            print(msg)
            return {"message":msg}
        else:
            msg = f"Errors occurred while saving user info for user id: {user_info.id}: {errors}"
            print(msg)
            return {"message":msg}
# Module-level singleton plus thin functional wrappers, kept for callers
# that import functions rather than the repository object.
repository = UserRepository(executor)
def get_users()-> List[UserResponseModel]:
    return repository.get_users()
def get_user_details(user_id: str)-> UserResponseModel:
    return repository.get_user_details(user_id)
def get_user_details_by_name(username: str)-> UserResponseModel:
    return repository.get_user_details_by_name(username)

30
src/users/router.py Normal file
View File

@ -0,0 +1,30 @@
from fastapi import APIRouter, Depends, Body
from src.users.service import user_service
from src.users.schemas import UserResponseModel, UserUpdateModel
from src.login.service import manager
router = APIRouter()
@router.get("/users/", tags=["users"], summary="Get all users")
async def all_users(user=Depends(manager)) -> list[UserResponseModel]:
    """List every registered user (latest view)."""
    return await user_service.get_users()
@router.get("/user-details/{user}", tags=["users"],summary="Get user details based on user id or username")
async def user_details(user: str, credentials=Depends(manager)) -> UserResponseModel:
    """Look a user up by id (UUID), email, or username."""
    return await user_service.get_user_details(user)
@router.put("/user-details/", tags=["users"],summary="Update user information")
async def user_update(user_info: UserUpdateModel = Body(...),credentials=Depends(manager)):
    """Partially update a user; omitted fields keep their current values."""
    return await user_service.update_user(user_info)

35
src/users/schemas.py Normal file
View File

@ -0,0 +1,35 @@
from pydantic import BaseModel, Json, EmailStr
from typing import Dict, Union
# from datetime import datetime
# class UserCreateDTO(BaseModel):
# user_id: str
# title: str
# chat_styles: Dict
# chat_properties: Dict
# header_chat_styles: Dict
# auth_form_properties: Dict
# initial_message: Union[str, None]
# message_suggestion: Union[Dict, None]
class UserResponseModel(BaseModel):
    # Public user record as returned by the user endpoints.
    id: str
    # username: str
    # NOTE(review): Union[..., None] without a default is still REQUIRED in
    # pydantic — callers must pass the field, possibly as None.
    firstname: Union[str, None]
    lastname: Union[str, None]
    email: Union[EmailStr, None]
    contact_no: Union[int, None]
    # updated_at: datetime
    # created_at: datetime
class UserPasswordResponseModel(BaseModel):
    # Minimal projection used for password checks.
    id: str
    password: str
class UserUpdateModel(BaseModel):
    # Partial-update payload; None means "keep the current value".
    id: str
    # username: str
    firstname: Union[str, None]
    lastname: Union[str, None]
    email: Union[EmailStr, None]
    password: Union[str, None]

50
src/users/service.py Normal file
View File

@ -0,0 +1,50 @@
import re
from uuid import UUID
from typing import List
from src.users.repository import repository
from src.users.exceptions import is_user_valid, is_password_same, is_email_exist_to_other_user
from src.users.schemas import UserUpdateModel, UserResponseModel
class UserService:
    """Application-level user operations on top of UserRepository."""

    def __init__(self, repository: repository):
        self.repository = repository

    async def get_users(self) -> List[UserResponseModel]:
        """All users from the latest-users view."""
        return self.repository.get_users()

    async def get_user_details(self, user: str) -> UserResponseModel:
        """Look a user up by id (v4 UUID), email, or username."""
        is_user_valid(user)
        try:
            UUID(user, version=4)
            user_details = self.repository.get_user_details(user)
        except ValueError:
            # Not a UUID — decide between email and username by regex.
            if re.match(r"[^@]+@[^@]+\.[^@]+", user):
                user_details = self.repository.get_user_details_by_email(user)
            else:
                user_details = self.repository.get_user_details_by_name(user)
        return user_details

    async def update_user(self, user_info: UserUpdateModel) -> dict[str, str]:
        """Merge the partial update onto the stored record and persist it."""
        is_user_valid(user_info.id)
        # FIX: only enforce email uniqueness when a new email was actually
        # supplied; the original ran the check with email=None, issuing a
        # pointless query for the literal string 'None'.
        if user_info.email is not None:
            is_email_exist_to_other_user(user_info.id, user_info.email)
        current_user_info = self.repository.get_user_details(user_info.id)
        # Fill omitted fields from the stored record (None means "keep").
        if user_info.firstname is None:
            user_info.firstname = current_user_info["firstname"]
        if user_info.lastname is None:
            user_info.lastname = current_user_info["lastname"]
        if user_info.email is None:
            user_info.email = current_user_info["email"]
        if user_info.password:
            # A new password must differ from the current one.
            is_password_same(user_info.id, user_info.password)
        else:
            current_password = self.repository.get_user_password(user_info.id)
            user_info.password = current_password["password"]
        response = self.repository.save_user_info(user_info)
        return {"message": response}
user_service = UserService(repository)

View File

@ -0,0 +1,19 @@
from fastapi import WebSocket
class ConnectionManager:
    """Tracks open WebSocket connections and fans messages out to them."""

    def __init__(self):
        # Every currently-accepted socket, in connection order.
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and start tracking the socket."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking a socket (ValueError if it was never tracked)."""
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send `message` to a single socket."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send `message` to every tracked socket, in connection order."""
        for ws in self.active_connections:
            await ws.send_text(message)

40
templates/index.html Normal file
View File

@ -0,0 +1,40 @@
<!doctype html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link href="https://cdnjs.cloudflare.com/ajax/libs/flowbite/1.7.0/flowbite.min.css" rel="stylesheet" />
</head>
<body>
<div class="container mx-auto">
<nav class="bg-white border-gray-200 dark:bg-gray-900">
<div class="max-w-screen-xl flex flex-wrap items-center justify-between mx-auto p-4">
<a href="https://flowbite.com/" class="flex items-center">
<!-- <img src="https://flowbite.com/docs/images/logo.svg" class="h-8 mr-3" alt="Flowbite Logo" />
<span class="self-center text-2xl font-semibold whitespace-nowrap dark:text-white">Flowbite</span> -->
</a>
<button data-collapse-toggle="navbar-default" type="button" class="inline-flex items-center p-2 w-10 h-10 justify-center text-sm text-gray-500 rounded-lg md:hidden hover:bg-gray-100 focus:outline-none focus:ring-2 focus:ring-gray-200 dark:text-gray-400 dark:hover:bg-gray-700 dark:focus:ring-gray-600" aria-controls="navbar-default" aria-expanded="false">
<span class="sr-only">Open main menu</span>
<svg class="w-5 h-5" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 17 14">
<path stroke="currentColor" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M1 1h15M1 7h15M1 13h15"/>
</svg>
</button>
<div class="hidden w-full md:block md:w-auto" id="navbar-default">
<ul class="font-medium flex flex-col p-4 md:p-0 mt-4 border border-gray-100 rounded-lg bg-gray-50 md:flex-row md:space-x-8 md:mt-0 md:border-0 md:bg-white dark:bg-gray-800 md:dark:bg-gray-900 dark:border-gray-700">
<li>
<a href="#" class="block py-2 pl-3 pr-4 text-white bg-blue-700 rounded md:bg-transparent md:text-blue-700 md:p-0 dark:text-white md:dark:text-blue-500" aria-current="page">Home</a>
</li>
<li>
<a href="#" class="block py-2 pl-3 pr-4 text-gray-900 rounded hover:bg-gray-100 md:hover:bg-transparent md:border-0 md:hover:text-blue-700 md:p-0 dark:text-white md:dark:hover:text-blue-500 dark:hover:bg-gray-700 dark:hover:text-white md:dark:hover:bg-transparent">Knowledge Base</a>
</li>
            </ul>
</div>
</div>
</nav>
</div>
</body>
<script src="https://cdn.tailwindcss.com"></script>
</html>

View File

@ -0,0 +1,92 @@
<!doctype html>
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link href="https://cdnjs.cloudflare.com/ajax/libs/flowbite/1.7.0/flowbite.min.css" rel="stylesheet" />
</head>
<body>
<div class="container mx-auto">
<nav class="bg-white border-gray-200 dark:bg-gray-900">
<div class="max-w-screen-xl flex flex-wrap items-center justify-between mx-auto p-4">
<a href="https://flowbite.com/" class="flex items-center">
<!-- <img src="https://flowbite.com/docs/images/logo.svg" class="h-8 mr-3" alt="Flowbite Logo" />
<span class="self-center text-2xl font-semibold whitespace-nowrap dark:text-white">Flowbite</span> -->
</a>
<button data-collapse-toggle="navbar-default" type="button" class="inline-flex items-center p-2 w-10 h-10 justify-center text-sm text-gray-500 rounded-lg md:hidden hover:bg-gray-100 focus:outline-none focus:ring-2 focus:ring-gray-200 dark:text-gray-400 dark:hover:bg-gray-700 dark:focus:ring-gray-600" aria-controls="navbar-default" aria-expanded="false">
<span class="sr-only">Open main menu</span>
<svg class="w-5 h-5" aria-hidden="true" xmlns="http://www.w3.org/2000/svg" fill="none" viewBox="0 0 17 14">
<path stroke="currentColor" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M1 1h15M1 7h15M1 13h15"/>
</svg>
</button>
<div class="hidden w-full md:block md:w-auto" id="navbar-default">
<ul class="font-medium flex flex-col p-4 md:p-0 mt-4 border border-gray-100 rounded-lg bg-gray-50 md:flex-row md:space-x-8 md:mt-0 md:border-0 md:bg-white dark:bg-gray-800 md:dark:bg-gray-900 dark:border-gray-700">
<li>
<a href="#" class="block py-2 pl-3 pr-4 text-gray-900 rounded hover:bg-gray-100 md:hover:bg-transparent md:border-0 md:hover:text-blue-700 md:p-0 dark:text-white md:dark:hover:text-blue-500 dark:hover:bg-gray-700 dark:hover:text-white md:dark:hover:bg-transparent" aria-current="page">Home</a>
</li>
<li>
<a href="#" class="block py-2 pl-3 pr-4 text-white bg-blue-700 rounded md:bg-transparent md:text-blue-700 md:p-0 dark:text-white md:dark:text-blue-500">Knowledge Base</a>
            </li>
</ul>
</div>
</div>
</nav>
<div class="relative overflow-x-auto">
<table class="w-full text-sm text-left text-gray-500 dark:text-gray-400">
<thead class="text-xs text-gray-700 uppercase bg-gray-50 dark:bg-gray-700 dark:text-gray-400">
<tr>
<th scope="col" class="px-6 py-3">
Filename
</th>
<th scope="col" class="px-6 py-3">
Users
</th>
<th scope="col" class="px-6 py-3">
Action
</th>
</tr>
</thead>
<tbody>
<tr class="bg-white border-b dark:bg-gray-800 dark:border-gray-700">
<th scope="row" class="px-6 py-4 font-medium text-gray-900 whitespace-nowrap dark:text-white">
Filename 1
</th>
<td class="px-6 py-4">
User 1
</td>
<td class="px-6 py-4">
View data
</td>
</tr>
<tr class="bg-white border-b dark:bg-gray-800 dark:border-gray-700">
<th scope="row" class="px-6 py-4 font-medium text-gray-900 whitespace-nowrap dark:text-white">
Filename 2
</th>
<td class="px-6 py-4">
User 2
</td>
<td class="px-6 py-4">
View data
</td>
</tr>
<tr class="bg-white dark:bg-gray-800">
<th scope="row" class="px-6 py-4 font-medium text-gray-900 whitespace-nowrap dark:text-white">
Filename 3
</th>
<td class="px-6 py-4">
User 3
</td>
<td class="px-6 py-4">
View data
</td>
</tr>
</tbody>
</table>
</div>
</div>
  <script src="https://cdn.tailwindcss.com"></script>
</body>
</html>

View File

@ -0,0 +1,13 @@
<!DOCTYPE html>
<html>
<head>
    <title>Chat</title>
</head>
<body>
    <!-- Manual test page: posts one or more files plus a user_id to the
         backend's /upload-file/ endpoint as multipart form data. -->
    <form action="/upload-file/" enctype="multipart/form-data" method="post">
    <input name="file" type="file" multiple>
    <!-- NOTE(review): user_id is hard-coded, presumably a test account UUID
         — confirm against the backend before reusing this page elsewhere. -->
    <input name="user_id" type="text" value="f3e3194e-9ab7-4c6c-8a9c-167cfe61c8fa" />
    <input type="submit">
    </form>
</body>
</html>

View File

@ -0,0 +1,50 @@
<!doctype html>
<html>
<head>
<title>Chat</title>
</head>
<body>
<h1>WebSocket Chat</h1>
<form action="" onsubmit="sendMessage(event)">
<label>Message: </label>
<input type="text" id="messageText" autocomplete="off" />
<label>Language Model</label>
<select id="model">
<option value="cohere">Cohere</option>
<option value="openai">OpenAI</option>
</select>
<button>Send</button>
</form>
<ul id="messages"></ul>
<script>
var scheme =
window.location.protocol == "https:" ? "wss://" : "ws://";
var webSocketUri =
scheme +
window.location.hostname +
(location.port ? ":" + location.port : "") +
"/ws";
var websocket = new WebSocket(webSocketUri);
websocket.onmessage = function (event) {
var messages = document.getElementById("messages");
var message = document.createElement("li");
var content = document.createTextNode(event.data);
message.appendChild(content);
messages.appendChild(message);
};
function sendMessage(event) {
var input = document.getElementById("messageText");
var modelDropDown = document.querySelector("#model");
data = {
// "user_id" : "1",
question: input.value,
model: modelDropDown.value,
};
console.log(data);
websocket.send(JSON.stringify(data));
input.value = "";
event.preventDefault();
}
</script>
</body>
</html>