From 617819eb61c7337a0b14b6c7c782b21b9eaf21af Mon Sep 17 00:00:00 2001 From: Danilo Cesa Date: Sat, 9 Aug 2025 17:27:25 +0800 Subject: [PATCH] Re-upload to own selfhosted git --- .gcloudignore | 20 +++ .gitignore | 9 + Dockerfile | 7 + app.yaml | 6 + cloudbuild.yaml | 22 +++ compute_engine.tf | 67 +++++++ main.py | 118 +++++++++++++ requirements.txt | 67 +++++++ src/bot/exceptions.py | 22 +++ src/bot/repository.py | 121 +++++++++++++ src/bot/router.py | 36 ++++ src/bot/schemas.py | 46 +++++ src/bot/service.py | 89 ++++++++++ src/chat/callback.py | 56 ++++++ src/chat/document_loader.py | 30 ++++ src/chat/index.py | 166 ++++++++++++++++++ src/chat/model_manager.py | 30 ++++ src/chat/schemas.py | 21 +++ src/chat/services/cohere.py | 29 +++ src/chat/services/model_service.py | 9 + src/chat/services/openai.py | 17 ++ src/config.py | 46 +++++ src/conversation/exceptions.py | 0 src/conversation/repository.py | 20 +++ src/conversation/router.py | 16 ++ src/conversation/schema.py | 0 src/conversation/service.py | 25 +++ src/etl/data_transformation_service.py | 84 +++++++++ src/etl/document_loaders/csv_json_loader.py | 23 +++ .../document_loaders/data_loader_strategy.py | 6 + src/etl/document_loaders/pdf_loader.py | 50 ++++++ src/etl/exceptions.py | 8 + src/etl/repository.py | 114 ++++++++++++ src/etl/router.py | 12 ++ src/etl/service.py | 43 +++++ src/files/exceptions.py | 9 + src/files/repository.py | 22 +++ src/files/router.py | 80 +++++++++ src/files/service.py | 42 +++++ src/gcp/bigquery.py | 30 ++++ src/gcp/storage.py | 6 + src/helpers/token_helper.py | 18 ++ src/login/repository.py | 0 src/login/router.py | 12 ++ src/login/service.py | 38 ++++ src/users/__init__.py | 0 src/users/exceptions.py | 51 ++++++ src/users/repository.py | 96 ++++++++++ src/users/router.py | 30 ++++ src/users/schemas.py | 35 ++++ src/users/service.py | 50 ++++++ src/websocket/websocket_manager.py | 19 ++ templates/index.html | 40 +++++ templates/knowledge-base.html | 92 ++++++++++ 
templates/upload-tester.html | 13 ++ templates/websocket-tester.html | 50 ++++++ 56 files changed, 2168 insertions(+) create mode 100644 .gcloudignore create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 app.yaml create mode 100644 cloudbuild.yaml create mode 100644 compute_engine.tf create mode 100644 main.py create mode 100644 requirements.txt create mode 100644 src/bot/exceptions.py create mode 100644 src/bot/repository.py create mode 100644 src/bot/router.py create mode 100644 src/bot/schemas.py create mode 100644 src/bot/service.py create mode 100644 src/chat/callback.py create mode 100644 src/chat/document_loader.py create mode 100644 src/chat/index.py create mode 100644 src/chat/model_manager.py create mode 100644 src/chat/schemas.py create mode 100644 src/chat/services/cohere.py create mode 100644 src/chat/services/model_service.py create mode 100644 src/chat/services/openai.py create mode 100644 src/config.py create mode 100644 src/conversation/exceptions.py create mode 100644 src/conversation/repository.py create mode 100644 src/conversation/router.py create mode 100644 src/conversation/schema.py create mode 100644 src/conversation/service.py create mode 100644 src/etl/data_transformation_service.py create mode 100644 src/etl/document_loaders/csv_json_loader.py create mode 100644 src/etl/document_loaders/data_loader_strategy.py create mode 100644 src/etl/document_loaders/pdf_loader.py create mode 100644 src/etl/exceptions.py create mode 100644 src/etl/repository.py create mode 100644 src/etl/router.py create mode 100644 src/etl/service.py create mode 100644 src/files/exceptions.py create mode 100644 src/files/repository.py create mode 100644 src/files/router.py create mode 100644 src/files/service.py create mode 100644 src/gcp/bigquery.py create mode 100644 src/gcp/storage.py create mode 100644 src/helpers/token_helper.py create mode 100644 src/login/repository.py create mode 100644 src/login/router.py create mode 100644 
src/login/service.py create mode 100644 src/users/__init__.py create mode 100644 src/users/exceptions.py create mode 100644 src/users/repository.py create mode 100644 src/users/router.py create mode 100644 src/users/schemas.py create mode 100644 src/users/service.py create mode 100644 src/websocket/websocket_manager.py create mode 100644 templates/index.html create mode 100644 templates/knowledge-base.html create mode 100644 templates/upload-tester.html create mode 100644 templates/websocket-tester.html diff --git a/.gcloudignore b/.gcloudignore new file mode 100644 index 0000000..8bc1111 --- /dev/null +++ b/.gcloudignore @@ -0,0 +1,20 @@ +# This file specifies files that are *not* uploaded to Google Cloud +# using gcloud. It follows the same syntax as .gitignore, with the addition of +# "#!include" directives (which insert the entries of the given .gitignore-style +# file at that point). +# +# For more information, run: +# $ gcloud topic gcloudignore +# +.gcloudignore +# If you would like to upload your .git directory, .gitignore file or files +# from your .gitignore file, remove the corresponding line +# below: +.git +.gitignore + +# Python pycache: +__pycache__/ +# Ignored by the build system +/setup.cfg +venv/ \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ba6237f --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +venv +__pycache__/ +.env +text/ +.terraform.lock.hcl +.terraform/ +terraform.tfstate +terraform.tfstate.backup +terraform.tfvars \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..b8826e5 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,7 @@ +FROM python:latest +ENV APP_HOME /app +WORKDIR $APP_HOME +COPY . 
./ +RUN pip install -r requirements.txt +#CMD [ "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000" ] +CMD exec gunicorn --bind :$PORT --workers 1 --worker-class uvicorn.workers.UvicornWorker --threads 8 main:app \ No newline at end of file diff --git a/app.yaml b/app.yaml new file mode 100644 index 0000000..005e235 --- /dev/null +++ b/app.yaml @@ -0,0 +1,6 @@ +runtime: python311 +# env: flex +entrypoint: gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app +instance_class: F4 +# network: +# session_affinity: true \ No newline at end of file diff --git a/cloudbuild.yaml b/cloudbuild.yaml new file mode 100644 index 0000000..165896e --- /dev/null +++ b/cloudbuild.yaml @@ -0,0 +1,22 @@ +steps: + # Build the container image + - name: 'gcr.io/cloud-builders/docker' + args: ['build', '-t', 'gcr.io/multichannel-chat-widget/backend:$COMMIT_SHA', '.'] + # Push the container image to Container Registry + - name: 'gcr.io/cloud-builders/docker' + args: ['push', 'gcr.io/multichannel-chat-widget/backend:$COMMIT_SHA'] + # Deploy container image to Cloud Run + - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk' + entrypoint: gcloud + args: + - 'run' + - 'deploy' + - 'backend' + - '--image' + - 'gcr.io/multichannel-chat-widget/backend:$COMMIT_SHA' + - '--region' + - 'asia-east1' + - '--port' + - '8000' +images: + - 'gcr.io/multichannel-chat-widget/backend:$COMMIT_SHA' \ No newline at end of file diff --git a/compute_engine.tf b/compute_engine.tf new file mode 100644 index 0000000..467937f --- /dev/null +++ b/compute_engine.tf @@ -0,0 +1,67 @@ +terraform { + required_providers { + google = { + source = "hashicorp/google" + version = "4.74.0" + } + } +} + + +provider "google" { + project = "multichannel-chat-widget" + region = "asia-east1" +} + +resource "google_compute_network" "vpc_network" { + name = "multichannel-chat-network" + auto_create_subnetworks = false + mtu = 1460 +} + +resource "google_compute_subnetwork" "default" { + name = "multichannel-chat-subnet" + 
ip_cidr_range = "10.0.1.0/24" + region = "asia-east1" + network = google_compute_network.vpc_network.id + +} + + +# Create a single Compute Engine instance +resource "google_compute_instance" "default" { + name = "multichannel-chat-vm" + machine_type = "f1-micro" + zone = "asia-east1-a" + tags = ["ssh"] + + boot_disk { + initialize_params { + image = "debian-cloud/debian-11" + } + } + + # Install requirements + metadata_startup_script = "sudo apt-get update; sudo apt-get install -yq build-essential python3-pip rsync; pip install -r requirements.txt" + + network_interface { + subnetwork = google_compute_subnetwork.default.id + + access_config { + # Include this section to give the VM an external IP address + } + } +} + +resource "google_compute_firewall" "ssh" { + name = "allow-ssh" + allow { + ports = ["22"] + protocol = "tcp" + } + direction = "INGRESS" + network = google_compute_network.vpc_network.id + priority = 1000 + source_ranges = ["0.0.0.0/0"] + target_tags = ["ssh"] +} \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..f602b7e --- /dev/null +++ b/main.py @@ -0,0 +1,118 @@ +import os +import sys +import uvicorn +from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request +from fastapi.responses import HTMLResponse +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates +from fastapi.middleware.cors import CORSMiddleware +from dotenv import load_dotenv + +from src.websocket.websocket_manager import ConnectionManager +from src.users.router import router as user_router +from src.files.router import router as file_router +from src.bot.router import router as bot_router +from src.etl.router import router as etl_router +from src.login.router import router as login_router +from src.conversation.router import router as conversation_router + +from src.chat.index import chat_manager +from src.chat.callback import StreamingLLMCallbackHandler +from src.chat.schemas import 
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Bridge a chat websocket to the LLM chat manager.

    Expects JSON frames of the form
    ``{"message": ..., "chat_session": ..., "bot_id": ...}`` and streams the
    bot's answer back between the ``.-botResponseStartStreaming`` /
    ``.-botResponseEndStreaming`` sentinel frames the frontend listens for.
    """
    await manager.connect(websocket)

    while True:
        try:
            data = await websocket.receive_json()
            question = data["message"]

            # The handler forwards LLM tokens to this websocket as they arrive.
            stream_handler = StreamingLLMCallbackHandler(websocket)
            await websocket.send_text(".-botResponseStartStreaming")
            await chat_manager.chat_response({
                "question": question,
                "chat_session": data["chat_session"],
                "bot_id": data["bot_id"],
                "stream_handler": stream_handler,
            })
            await websocket.send_text(".-botResponseEndStreaming")
        except WebSocketDisconnect:
            # BUG FIX: the original kept looping after a disconnect, so the
            # next receive_json() raised RuntimeError forever. Leave the loop.
            manager.disconnect(websocket)
            break
def has_bot_parameter(bot_id):
    """Raise HTTP 400 when *bot_id* is missing or empty; otherwise do nothing."""
    if not bot_id:
        print("Exception: Bot id is not provided")
        raise HTTPException(status_code=400, detail="Bot id is not provided")


def is_bot_id_valid(bot_id):
    """Return the bot's settings rows, raising HTTP 404 when the bot is unknown.

    BUG FIX: the original had an unreachable ``if not bot_settings`` branch
    after the ``len(...) == 0`` check (a result that passed the length check
    is never falsy) and could fall through returning ``None``; the lookup
    result is now always returned on success.
    """
    bot_settings = repository.get_bot_settings(bot_id)
    if len(bot_settings) == 0:
        print("Exception: Bot not found")
        raise HTTPException(status_code=404, detail="Bot not found")
    return bot_settings
`{VW_USER_BOT_LATEST}` WHERE user_id = '{user_id}'" + return self._executor.execute_query(query) + + + def save_user_bot(self,user_id): + id = str(uuid4()) + row_to_insert = { + "id": id, + "user_id" : user_id + } + + errors = client.insert_rows_json(TBL_USER_BOT, [row_to_insert]) + + if not errors: + return id + else: + msg = f"Errors occurred while saving bot for user {user_id}: {errors}" + print(msg) + return {"message":msg} + + def save_bot_settings(self, bot_settings = BotCreateModel): + bot_id = self.save_user_bot(bot_settings['user_id']) + row_to_insert = { + "id": str(uuid4()), + "bot_id": bot_id, + "title": bot_settings['title'], + "description": bot_settings["description"], + "chat_styles": bot_settings['chat_styles'], + "initial_message": bot_settings['initial_message'], + "message_suggestion": bot_settings['message_suggestion'], + "allow_feedback_response": bot_settings["allow_feedback_response"], + "show_source_in_response": bot_settings["show_source_in_response"], + "advance_settings": bot_settings["advance_settings"] + } + + errors = client.insert_rows_json(TBL_BOT_SET, [row_to_insert]) + + if not errors: + msg = f"Bot settings for bot {bot_id} saved successfully." 
+ print(msg) + return {"message":msg} + else: + msg = f"Errors occurred while saving bot settings for bot_id: {bot_id}: {errors}" + print(msg) + return {"message":msg} + + def get_bot_settings(self,bot_id): + query = f"SELECT * FROM `{VW_USER_BOT_LATEST}` WHERE bot_id = '{bot_id}'" + return self._executor.execute_query(query) + + def update_bot_settings(self, bot_settings = BotUpdateModel): + row_to_insert = { + "id": str(uuid4()), + "bot_id": bot_settings['bot_id'], + "title": bot_settings['title'], + "description": bot_settings["description"], + "chat_styles": bot_settings['chat_styles'], + "initial_message": bot_settings['initial_message'], + "message_suggestion": bot_settings['message_suggestion'], + "allow_feedback_response": bot_settings["allow_feedback_response"], + "show_source_in_response": bot_settings["show_source_in_response"], + "advance_settings": bot_settings["advance_settings"] + } + + errors = client.insert_rows_json(TBL_BOT_SET, [row_to_insert]) + + if not errors: + msg = f"Bot settings for bot {bot_settings['bot_id']} saved successfully." + print(msg) + return {"message":msg} + else: + msg = f"Errors occurred while saving bot settings for bot_id: {bot_settings['bot_id']}: {errors}" + print(msg) + return {"message":msg} + + # def delete_bot_settings(self, user_id): + # # Delete the row from the BigQuery table + # delete_condition = f"user_id = '{user_id}'" + # errors = client.delete_rows(f"{BIGQUERY_SCHEMA}.bot_settings", delete_condition) + + # if not errors: + # msg = f"UI settings for user {user_id} deleted successfully." 
router = APIRouter()


@router.get("/bot/{user_id}", tags=["bot"], summary="Get user bot")
async def get_user_bot(user_id: str, user=Depends(manager)):
    """List the bots owned by *user_id* (authenticated)."""
    return await bot_service.get_user_bot(user_id)


@router.post("/bot/", tags=["bot"], summary="Create user bot")
async def post_bot(bot_settings: BotCreateModel = Body(...), user=Depends(manager)):
    """Create a bot for a user from a full settings payload (authenticated)."""
    return await bot_service.create_bot(bot_settings)


# Bot settings
@router.get("/bot/settings/{bot_id}", tags=["bot"], summary="Get bot settings")
async def get_bot_settings(bot_id: str):
    """Return the latest settings for a single bot (public)."""
    result = await bot_service.get_bot_settings(bot_id)
    return result[0]


@router.put("/bot/settings/", tags=["bot"], summary="Update bot settings")
async def put_bot(bot_settings: BotUpdateModel = Body(...), user=Depends(manager)):
    """Store a new settings revision for an existing bot (authenticated)."""
    return await bot_service.update_bot_settings(bot_settings)
class BotResponseModel(BaseModel):
    """Full bot-settings payload returned by GET /bot/settings/{bot_id}.

    The ``Json``-typed fields are stored as JSON strings (the service layer
    serialises them with ``json.dumps``) and parsed back by pydantic here.
    """
    bot_id: str
    title: str
    description: Union[str, None]
    chat_styles: Json
    initial_message: Union[str, None]
    message_suggestion: Json
    allow_feedback_response: Union[bool, None]
    show_source_in_response: Union[bool, None]
    advance_settings: Json
    updated_at: datetime
    created_at: datetime


class BotListResponseModel(BaseModel):
    """Summary row used when listing the bots that belong to a user."""
    bot_id: str
    title: str
    data_source_count: Union[int, None]
    updated_at: datetime
    created_at: datetime


class BotUpdateModel(BaseModel):
    """Request body for PUT /bot/settings/ — a complete settings revision."""
    bot_id: str
    title: str
    description: Union[str, None]
    chat_styles: Dict
    initial_message: Union[str, None]
    message_suggestion: Dict
    allow_feedback_response: Union[bool, None]
    show_source_in_response: Union[bool, None]
    advance_settings: Dict
BotCreateModel) -> dict[str, str]: + is_user_valid(bot_settings_dto.user_id) + + self._validate_json_fields(bot_settings_dto) + bot_settings_data = self._convert_json_fields(bot_settings_dto) + bot_settings_data['user_id'] = bot_settings_dto.user_id + response = self.repository.save_bot_settings(bot_settings_data) + + return {"message": response} + + async def get_bot_settings(self, bot_id: str) -> BotResponseModel: + has_bot_parameter(bot_id) + bot_settings = is_bot_id_valid(bot_id) + return self._create_response_dto_list(bot_settings) + + async def update_bot_settings(self, bot_settings_dto: BotUpdateModel) -> dict[str, str]: + is_bot_id_valid(bot_settings_dto.bot_id) + + self._validate_json_fields(bot_settings_dto) + bot_settings_data = self._convert_json_fields(bot_settings_dto) + bot_settings_data['bot_id'] = bot_settings_dto.bot_id + + response = self.repository.update_bot_settings(bot_settings_data) + + return {"message": response} + + def _convert_json_fields(self, dto): + return { + "title": dto.title, + "description": dto.description, + "chat_styles": json.dumps(dto.chat_styles), + "message_suggestion": json.dumps(dto.message_suggestion), + "initial_message": dto.initial_message, + "advance_settings": json.dumps(dto.advance_settings), + "allow_feedback_response": dto.allow_feedback_response, + "show_source_in_response": dto.show_source_in_response + } + + def _validate_json_fields(self, dto): + # Add any necessary validation for JSON fields here + pass + + def _create_response_dto_list(self, settings_list): + + if isinstance(settings_list, dict): + settings_list = [settings_list] + + return [ + BotResponseModel( + bot_id=settings['bot_id'], + title=settings['title'], + description=settings["description"], + chat_styles=settings['chat_styles'], + initial_message=settings['initial_message'], + message_suggestion=settings['message_suggestion'], + advance_settings=settings['advance_settings'], + allow_feedback_response=settings['allow_feedback_response'], + 
class StreamingLLMCallbackHandler(FinalStreamingStdOutCallbackHandler):
    """Callback handler that streams the LLM's final answer to a websocket.

    Tokens are collected in a sliding window; once the window matches the
    ``answer_prefix_tokens`` marker sequence (e.g. "Final Answer :"), every
    subsequent token is forwarded to the client websocket.
    """

    # Marker sequence that precedes the agent's final answer.
    DEFAULT_ANSWER_PREFIX: List[str] = ['Final', 'Answer', ':']

    def __init__(
        self,
        websocket,
        answer_prefix_tokens: Optional[List[str]] = None,
    ):
        # BUG FIX: the original used a mutable list literal as the default
        # argument; use the None sentinel and fall back to the class constant.
        if answer_prefix_tokens is None:
            answer_prefix_tokens = self.DEFAULT_ANSWER_PREFIX
        self.websocket = websocket
        self.answer_prefix_tokens = [token.strip() for token in answer_prefix_tokens]
        self.last_tokens = [""] * len(answer_prefix_tokens)
        # Sliding window of the most recent tokens, compared against the prefix.
        self.rolling_buffer: List[str] = []

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Reset answer detection at the start of each LLM call."""
        self.answer_reached = False

    async def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Track tokens; after the answer prefix is seen, forward them to the client."""
        self.rolling_buffer.append(token.strip())
        print(' self.rolling_buffer: ', self.rolling_buffer)

        # Keep the window exactly as long as the prefix sequence.
        if len(self.rolling_buffer) > len(self.answer_prefix_tokens):
            self.rolling_buffer.pop(0)  # Remove the oldest token

        # The marker itself flips the flag; everything after it is the answer.
        if self.rolling_buffer == self.answer_prefix_tokens:
            self.answer_reached = True
            print("token:", token)

        if self.answer_reached:
            await self.websocket.send_text(token)

    async def on_llm_end(self, token: str, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        print("LLM END")
    async def get_llm(self, data):
        """Pick the LLM backend for the bot in *data* and cache it on ``self.llm``.

        Looks up the bot's ``advance_settings`` through the bot service and
        falls back to "openai" when no ``llm_model`` attribute is configured.
        Returns the ecosystem name on success, or an error string on failure.
        NOTE(review): callers appear to feed the return value straight into the
        selector lookups, where an error string would raise — consider raising
        here instead of returning text; confirm against chat_response().
        """
        try:
            bot_details = await bot_service.get_bot_settings(data["bot_id"])
            # get_bot_settings returns a list of response models; use the first.
            bot_details = bot_details[0].advance_settings
            llm_ecosystem = bot_details.llm_model if hasattr(bot_details, 'llm_model') else "openai"
        except Exception as e:
            return f"--------------------There's an error occurred in get llm method: {e}"

        # Cache the concrete LLM so the agent pipeline can bind stop tokens to it.
        self.llm = self.model_manager.llm_selector(llm_ecosystem,data['stream_handler'])
        return llm_ecosystem
as e: + print('error redischathistory: ', e) + return {"message": f"An error occurred : {e}"} + try: + memory = ConversationBufferMemory(memory_key="memory",chat_memory=self.redis_message_history, return_messages=True) + self.chat_history = memory + print('--------------------memory: ', memory) + except Exception as e: + print('error conversation buffer memory: ', e) + return {"message": f"An error occurred : {e}"} + try: + docs = document_loader(data["bot_id"]) + print('--------------------docs: ', docs) + except Exception as e: + print('error document loader: ', e) + return {"message": f"An error occurred : {e}"} + + + + try: + query = data['question'] + query_result = embeddings.embed_query(query) + doc_result = embeddings.embed_documents([t.page_content for t in docs]) + print('--------------------Store embeddings to vector store: Pinecone') + pinecone.init( + api_key=PINECONE_API_KEY, # find at app.pinecone.io + environment=PINECONE_API_ENV # next to api key in console + ) + index = pinecone.Index(PINECONE_INDEX_NAME) + # print(index.describe_index_stats()) + # index.delete(deleteAll='true', namespace='') + except Exception as e: + print(f"--------------------Error on Pinecone process: {e}") + + docsearch = PineconeStore.from_texts([t.page_content for t in docs], embeddings, index_name=PINECONE_INDEX_NAME) + retriever = self.model_manager.retriever_selector(data["retriever"], docsearch) + qa_tool = RetrievalQA.from_chain_type( + llm=self.llm, retriever=retriever, memory=memory, + verbose=True + ) + return qa_tool + + def custom_agent(self, tools): + max_iterations = 3 + handle_parsing_errors = True + early_stopping_method="generate" + verbose=True + agent = AgentType.REACT_DOCSTORE + try: + return initialize_agent( + tools, + self.llm, + agent, + max_iterations, + handle_parsing_errors, + early_stopping_method, + verbose, + memory=self.chat_history, + ) + except Exception as e: + return f"Error occured in custom agent function: {e}" + + async def 
chat_response(self,data): + print(f"--------------------chat history: {self.chat_history}") + llm_ecosystem = await self.get_llm(data) + qa_tool = self.nlp_process({ + "question": data["question"], + "chat_history": self.chat_history, + "embedding":llm_ecosystem, + "retriever":llm_ecosystem, + "bot_id":data["bot_id"], + "chat_session": data["chat_session"] + }) + + tools = [ + Tool( + name='Document Store', + func=qa_tool.run, + description="Use it to lookup information from the document store. You must return a final answer, not an action. If you can't return a final answer, return you don't know the answer.", + ), + ] + + prompt = hub.pull("hwchase17/react-chat") + prompt = prompt.partial( + tools=render_text_description(tools), + tool_names=", ".join([t.name for t in tools]), + ) + + llm_with_stop = self.llm.bind(stop=["\nObservation"]) + try: + agent = { + "input": lambda x: x["input"], + "agent_scratchpad": lambda x: format_log_to_str(x['intermediate_steps']), + "chat_history": lambda x: x["memory"] + } | prompt | llm_with_stop | ReActSingleInputOutputParser() + + agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, memory=self.chat_history, max_iterations=3, early_stopping_method= 'generate') + result = agent_executor.invoke({"input": data["question"]})['output'] + + self.redis_message_history.add_user_message(data["question"]) + + except Exception as e: + return f"There's an error occured in chat response function: {e}" + + + print(f"Result: {result}") + self.redis_message_history.add_ai_message(result) + + return result + +chat_manager = ChatManager() \ No newline at end of file diff --git a/src/chat/model_manager.py b/src/chat/model_manager.py new file mode 100644 index 0000000..918a62c --- /dev/null +++ b/src/chat/model_manager.py @@ -0,0 +1,30 @@ +from src.chat.services.cohere import CohereService +from src.chat.services.openai import OpenAIService +from src.chat.services.model_service import ModelService + +class ModelManager: + 
SUPPORTED_MODELS = {"cohere": CohereService, "openai": OpenAIService} + def __init__(self) -> None: + pass + + def model_checker(self,model): + if model not in self.SUPPORTED_MODELS: + raise ValueError("Selected model not supported") + else: + return + + def create_model_service(self, model) -> ModelService: + self.model_checker(model) + return self.SUPPORTED_MODELS[model]() + + def retriever_selector(self, model, docsearch) -> str: + model_service = self.create_model_service(model) + return model_service.retriever(docsearch) + + def embedding_selector(self, model) -> str: + model_service = self.create_model_service(model) + return model_service.embeddings() + + def llm_selector(self, model, stream_handler = None) -> str: + model_service = self.create_model_service(model) + return model_service.llm(stream_handler) \ No newline at end of file diff --git a/src/chat/schemas.py b/src/chat/schemas.py new file mode 100644 index 0000000..7b21d94 --- /dev/null +++ b/src/chat/schemas.py @@ -0,0 +1,21 @@ +from pydantic import BaseModel,validator + + +class ChatResponse(BaseModel): + """Chat response schema.""" + + sender: str + message: str + type: str + + @validator("sender") + def sender_must_be_bot_or_you(cls, v): + if v not in ["bot", "human"]: + raise ValueError("sender must be bot or human") + return v + + @validator("type") + def validate_message_type(cls, v): + if v not in ["start", "stream", "end", "error", "info"]: + raise ValueError("type must be start, stream or end") + return v diff --git a/src/chat/services/cohere.py b/src/chat/services/cohere.py new file mode 100644 index 0000000..a4cc3cc --- /dev/null +++ b/src/chat/services/cohere.py @@ -0,0 +1,29 @@ +import os + +from langchain.llms import Cohere +from langchain.embeddings import CohereEmbeddings +from langchain.retrievers import ContextualCompressionRetriever +from langchain.retrievers.document_compressors import CohereRerank + +from src.chat.services.model_service import ModelService +from src.config 
import COHERE_API + +os.environ["COHERE_API_KEY"] = COHERE_API + +LLM_COHERE = Cohere(cohere_api_key=COHERE_API) + +COMPRESSOR_COHERE = CohereRerank() + +EMBEDDINGS_COHERE = CohereEmbeddings(cohere_api_key=COHERE_API) + +class CohereService(ModelService): + def llm(self) -> str: + return LLM_COHERE + + def embeddings(self) ->str: + return EMBEDDINGS_COHERE + + def retriever(self,docsearch) -> str: + return ContextualCompressionRetriever( + base_compressor=COMPRESSOR_COHERE, base_retriever=docsearch.as_retriever() + ) \ No newline at end of file diff --git a/src/chat/services/model_service.py b/src/chat/services/model_service.py new file mode 100644 index 0000000..e8a0d81 --- /dev/null +++ b/src/chat/services/model_service.py @@ -0,0 +1,9 @@ +class ModelService: + def retriever(self, docsearch) -> str: + pass + + def embeddings(self) -> str: + pass + + def llm(self, stream_handler = None) -> str: + pass \ No newline at end of file diff --git a/src/chat/services/openai.py b/src/chat/services/openai.py new file mode 100644 index 0000000..aa45d75 --- /dev/null +++ b/src/chat/services/openai.py @@ -0,0 +1,17 @@ +from langchain.llms import OpenAI +from langchain.embeddings.openai import OpenAIEmbeddings +from langchain.callbacks.manager import AsyncCallbackManager + +from src.config import OPENAI_API_KEY +from src.chat.services.model_service import ModelService + +class OpenAIService(ModelService): + def llm(self,stream_handler) -> str: + stream_manager = AsyncCallbackManager([stream_handler]) + return OpenAI(temperature=0, callback_manager=stream_manager, streaming=True, openai_api_key=OPENAI_API_KEY) + + def embeddings(self) -> str: + return OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY) + + def retriever(self,docsearch) -> str: + return docsearch.as_retriever() diff --git a/src/config.py b/src/config.py new file mode 100644 index 0000000..68569ac --- /dev/null +++ b/src/config.py @@ -0,0 +1,46 @@ +import os +from dotenv import load_dotenv + +load_dotenv() + 
# =========================================================================
# NOTE(review): this chunk of SOURCE is a line-mangled git patch; the regions
# below are the reconstructed contents of the patch files that occupied these
# physical lines.  Banner comments mark the original file boundaries.
# =========================================================================

# ==== src/config.py (constants; `import os` and load_dotenv() precede this chunk) ====

GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID")

OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
OPENAI_MODEL_NAME = os.environ.get("OPENAI_MODEL_NAME")

PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
PINECONE_API_ENV = os.environ.get("PINECONE_API_ENV")
# The Pinecone index name is keyed to the GCP project id.
PINECONE_INDEX_NAME = GCP_PROJECT_ID

ENCODING_FOR_MODEL = os.environ.get("ENCODING_FOR_MODEL")
ENCODING = os.environ.get("ENCODING")

REDIS_HOST = os.environ.get("REDIS_HOST")
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD")

COHERE_API = os.environ.get("COHERE_API")

SUPPORTED_MODEL = os.environ.get("SUPPORTED_MODEL")

# BigQuery dataset holding all application tables.
BIGQUERY_SCHEMA = f"{GCP_PROJECT_ID}.data_src"
GCP_UPLOAD_BUCKET = os.environ.get("GCP_UPLOAD_BUCKET")

# Table list
TBL_KWNBASE_CON = f"{BIGQUERY_SCHEMA}.knowledgebase_content"
TBL_KWNBASE_SRC = f"{BIGQUERY_SCHEMA}.knowledgebase_source"
TBL_USER_BOT = f"{BIGQUERY_SCHEMA}.user_bot"
TBL_BOT_SET = f"{BIGQUERY_SCHEMA}.bot_settings"
TBL_USERS = f"{BIGQUERY_SCHEMA}.users"
TBL_CUST_CHAT = f"{BIGQUERY_SCHEMA}.customer_chat"
TBL_CUST_CHAT_LOGS = f"{BIGQUERY_SCHEMA}.customer_chat_logs"

# View list
VW_USER_LIST_LATEST = f"{BIGQUERY_SCHEMA}.vw_user_list_latest"
VW_USER_BOT_LATEST = f"{BIGQUERY_SCHEMA}.vw_user_bot_latest"
VW_USER_FILES_LATEST = f"{BIGQUERY_SCHEMA}.vw_user_files_latest"
VW_BOT_CON_LATEST = f"{BIGQUERY_SCHEMA}.vw_latest_bot_knowledgebase_content"

LOGIN_SECRET_KEY = os.environ.get("LOGIN_SECRET_KEY")

# ==== src/conversation/exceptions.py ==== (empty file in the original patch)

# ==== src/conversation/repository.py ====
from src.gcp.bigquery import client, executor
from src.config import TBL_CUST_CHAT, TBL_CUST_CHAT_LOGS


class ConversationRepository:
    """BigQuery access layer for customer chat sessions and their logs."""

    def __init__(self, executor):
        # executor: BigQueryQueryExecutor (see src/gcp/bigquery.py).
        self._executor = executor

    def save_temp_session(self, user_session, bot_id):
        """Insert a new online chat-session row for a visitor."""
        # SECURITY(review): values are interpolated straight into SQL.
        # user_session is a server-generated uuid4, but bot_id arrives from
        # the URL — move to BigQuery query parameters to rule out injection.
        query = (
            f"INSERT INTO `{TBL_CUST_CHAT}` (user_session, bot_id, is_online) "
            f"VALUES ('{user_session}', '{bot_id}', TRUE)"
        )
        return self._executor.execute_query(query)

    def get_customer_chat_session(self, customer_session):
        """Return the id of the first online chat row for a visitor session."""
        query = (
            f"SELECT id FROM `{TBL_CUST_CHAT}` "
            f"WHERE user_session = '{customer_session}' AND is_online = TRUE "
            f"ORDER BY updated_at LIMIT 1"
        )
        return self._executor.execute_query(query)

    def get_chat_logs(self, chat_session):
        """Return every log row recorded under one chat session id."""
        query = (
            f"SELECT * FROM `{TBL_CUST_CHAT_LOGS}` "
            f"WHERE customer_chat_id = '{chat_session}'"
        )
        return self._executor.execute_query(query)


repository = ConversationRepository(executor)

# ==== src/conversation/router.py ====
from fastapi import APIRouter, Body, Depends

from src.login.service import manager
from src.conversation.service import conversation_service

router = APIRouter()


@router.get("/conversation/new/{bot_id}", tags=["conversations"],
            summary="Get user conversation session")
async def get_user_conversation_session(bot_id: str):
    """Open a fresh visitor session for the given bot and return its ids."""
    return conversation_service.save_customer_chat(bot_id)


@router.get("/conversation/logs/{chat_session}", tags=["conversations"],
            summary="Get conversation logs")
async def get_conversation_logs(chat_session: str):
    """Return all stored chat-log rows for one chat session."""
    return conversation_service.get_chat_logs(chat_session)

# ==== src/conversation/schema.py ==== (empty file in the original patch)

# ==== src/conversation/service.py ====
from uuid import uuid4
from src.conversation.repository import repository


class ConversationService:
    """Business logic for opening chat sessions and reading their logs."""

    def __init__(self, repository):
        # repository: ConversationRepository instance (dependency-injected).
        self.repository = repository

    def save_customer_chat(self, bot_id: str):
        """Create a visitor session uuid, persist it, and return both ids."""
        user_temp_session = str(uuid4())
        try:
            # BUG FIX: the original called the module-level `repository`
            # instead of the injected self.repository.
            self.repository.save_temp_session(user_temp_session, bot_id)
            chat_session = self.repository.get_customer_chat_session(user_temp_session)
            # executor returns a single dict when exactly one row matches,
            # so chat_session["id"] is valid here.
            return {
                "message": "Save customer chat successful",
                "user_session": user_temp_session,
                "chat_session": chat_session["id"],
            }
        except Exception as e:
            return {"message": f"An error occurred while saving customer chat: {e}"}

    def get_chat_logs(self, chat_session: str):
        """Fetch all stored log rows for one chat session."""
        try:
            chat_logs = self.repository.get_chat_logs(chat_session)
            return {"message": "Success getting chat logs", "chat_logs": chat_logs}
        except Exception as e:
            return {"message": f"An error occured while getting chat logs: {e}"}


conversation_service = ConversationService(repository)

# ==== src/etl/data_transformation_service.py ====
import pandas as pd
from uuid import uuid4

from src.gcp.storage import StorageClientFactory
from src.etl.repository import repository

BUCKET_NAME_TMP = 'knowledge-base-tmp'
BUCKET_NAME_SRC = 'knowledge-base-src'


class CsvReaderFactory:
    """Factory wrapper around pd.read_csv, kept as a test/DI seam."""

    @staticmethod
    def read_csv(uri, **kwargs):
        return pd.read_csv(uri, **kwargs)


def _uuid_slice(filename, start, stop):
    """Join dash-separated segments [start:stop) of the part after the first '.'.

    Uploaded blobs are named '<orig>.<ext>-<user_uuid>-<bot_uuid>[...]': the
    segment after the first dot is '<ext>' followed by two dash-joined uuid4s.
    """
    parts = filename.split('.')[1].split('-')
    return '-'.join(parts[start:stop])


def get_user_id(filename):
    """Extract the user uuid embedded in an uploaded blob name."""
    print("DataTransformation->get_user_id func->Get user id from filename")
    uid = _uuid_slice(filename, 1, 6)   # segments 1..5 = user uuid4
    print(f"DataTransformation->Get user id func->User id: {uid}")
    return uid


def get_bot_id(filename):
    """Extract the bot uuid embedded in an uploaded blob name."""
    print("DataTransformation->get_bot_id func->get bot id from filename")
    wid = _uuid_slice(filename, 6, 11)  # segments 6..10 = bot uuid4
    print(f"DataTransformation->get_bot_id func->bot id: {wid}")
    return wid


def flatten_data(filename):
    """Read the staged tmp CSV and collapse it into a one-row DataFrame."""
    print("DataTransformation->flatten_data func->Start flatten data")
    # NOTE(review): the extracted patch carried a placeholder here; the staged
    # blob is named `filename`, so the object path is reconstructed from it.
    uri = f"gs://{BUCKET_NAME_TMP}/{filename}"
    print(f"DataTransformation->flatten_data func->Read created tmp csv: {uri}")

    df = CsvReaderFactory.read_csv(uri, on_bad_lines='skip')

    # NOTE(review): both results below were discarded in the original as well;
    # they are kept as-is for behavioural parity but were probably meant to be
    # reassigned to df — confirm before changing.
    df.astype(str).agg(', '.join, axis=1)
    df.dropna(how='all')

    records = [{
        'id': str(uuid4()),
        'filename': filename,
        'data': df.to_string(),
    }]
    dframe = pd.DataFrame(records, columns=["id", "filename", "data"])
    print(f"DataTransformation->flatten_data func->Dataframe from tmp csv: {dframe}")
    return dframe


def delete_uploaded_file(filename):
    """Remove the original upload from the source bucket after processing."""
    print(f"DataTransformation->delete_uploaded_file func->Delete original file: {filename}")
    client = StorageClientFactory.create_client()
    bucket = client.get_bucket(BUCKET_NAME_SRC)
    bucket.blob(filename).delete()
    return {"message": "Deleted uploaded file"}


# Shared storage client used by the document loaders.
storage_client = StorageClientFactory.create_client()
# =========================================================================
# NOTE(review): reconstructed from a line-mangled git patch; banner comments
# mark the original file boundaries that occupied these physical lines.
# =========================================================================

# ==== src/etl/document_loaders/csv_json_loader.py ====
from src.etl.repository import repository
from src.etl.document_loaders.data_loader_strategy import DataLoaderStrategy
from src.etl.data_transformation_service import (
    storage_client, BUCKET_NAME_TMP, flatten_data, delete_uploaded_file,
    get_user_id, get_bot_id,
)


class CsvDataLoader(DataLoaderStrategy):
    """Stages a CSV/JSON upload into the tmp bucket, flattens it, persists it."""

    # Declared @staticmethod: the original omitted `self` and only ever worked
    # because callers invoke CsvDataLoader.load_data(...) on the class itself.
    @staticmethod
    def load_data(data, new_filename, orig_filename, file_type):
        """Write `data` to the tmp bucket, flatten it and load it to BigQuery."""
        print("CSVJsonLoader->load data func->Read file")

        bucket = storage_client.get_bucket(BUCKET_NAME_TMP)
        blob = bucket.blob(new_filename)
        blob.upload_from_string(data)
        blob.make_public()
        file_url = blob.public_url
        print(f"CSVJsonLoader->load data func->Created tmp csv from upload file: {new_filename}")

        dframe = flatten_data(new_filename)

        # TODO(review): the tmp blob is never deleted (blob.delete() was
        # commented out in the original) — confirm whether that is deliberate.
        file_type = 'json' if file_type == 'application/json' else 'csv'
        repository.load_data(dframe, get_user_id(new_filename), new_filename,
                             file_type, get_bot_id(new_filename), file_url)
        delete_uploaded_file(orig_filename)

# ==== src/etl/document_loaders/data_loader_strategy.py ====
from abc import ABC, abstractmethod


class DataLoaderStrategy(ABC):
    """Strategy interface implemented by the per-format document loaders."""

    @abstractmethod
    def load_data(self, data, new_filename, orig_filename, type, bot_id):
        """Extract the payload and persist it through the ETL repository."""

# ==== src/etl/document_loaders/pdf_loader.py ====
import pandas as pd
import gcsfs
from uuid import uuid4
from PyPDF2 import PdfReader

from src.config import GCP_PROJECT_ID, GCP_UPLOAD_BUCKET
from src.etl.repository import repository
from src.etl.document_loaders.data_loader_strategy import DataLoaderStrategy
from src.etl.data_transformation_service import delete_uploaded_file, get_user_id, get_bot_id


class PdfDataLoader(DataLoaderStrategy):
    """Extracts the full text of an uploaded PDF and persists it."""

    # Declared @staticmethod for the same implicit-self reason as CsvDataLoader.
    @staticmethod
    def load_data(new_filename, orig_filename):
        """Read the PDF from GCS page by page and load the text to BigQuery."""
        gcs_file_system = gcsfs.GCSFileSystem(project=GCP_PROJECT_ID)
        gcs_pdf_path = f"gs://{GCP_UPLOAD_BUCKET}/{orig_filename}"
        print(f"PdfDataLoader->load data func-> pdf path to read: {gcs_pdf_path}")
        url = f"https://storage.googleapis.com/{GCP_UPLOAD_BUCKET}/{new_filename}"

        # FIX: `with` guarantees the handle closes even if extraction raises
        # (the original only closed it on the success path).
        with gcs_file_system.open(gcs_pdf_path, "rb") as f_object:
            reader = PdfReader(f_object)
            number_of_pages = len(reader.pages)
            print(f"PdfDataLoader->load data func-> number of pdf page: {number_of_pages}")

            page_full_content = [page.extract_text() for page in reader.pages]

        records = [{
            'id': str(uuid4()),
            'filename': orig_filename,
            'data': ''.join(page_full_content),
        }]
        dframe = pd.DataFrame(records, columns=["id", "filename", "data"])

        repository.load_data(dframe, get_user_id(new_filename), new_filename,
                             "pdf", get_bot_id(new_filename), url)
        # NOTE(review): original upload is deliberately NOT deleted for PDFs
        # (delete_uploaded_file was commented out in the original) — confirm.

# ==== src/etl/exceptions.py ====
from fastapi import HTTPException


def is_cloud_event_data_valid(cloud_event_data):
    """Placeholder validation hook for incoming cloud-event payloads.

    Intentionally a no-op in the original; keep raising behaviour out until
    the validation rules are defined.
    """
    pass

# ==== src/etl/repository.py ====
from uuid import uuid4
from abc import ABC, abstractmethod
from google.cloud import bigquery

from src.config import TBL_KWNBASE_CON, TBL_KWNBASE_SRC
from src.gcp.bigquery import executor, client


class ETLRepositoryInterface(ABC):
    """Persistence contract for the ETL pipeline."""

    @abstractmethod
    def insert_user_knowledge_base(self, id, user_id, filename, bot_id, file_type):
        """Record a knowledge-base source row; return its id."""

    @abstractmethod
    def load_data(self, dframe, user_id, new_filename, file_type, bot_id, url):
        """Load extracted content plus its source metadata."""


class DataLoaderStrategy:
    """Strategy interface for pushing a DataFrame into storage."""

    def load_data(self, dframe, filename, file_type):
        raise NotImplementedError


class DataFrameLoader(DataLoaderStrategy):
    """Loads a content DataFrame into the knowledgebase_content table."""

    def load_data(self, dframe, filename, file_type):
        print(f"ETL Repository->DataFrameLoader dataframe: {dframe}")
        table_id = TBL_KWNBASE_CON
        job_config = bigquery.LoadJobConfig(
            schema=[
                bigquery.SchemaField('id', "STRING"),
                bigquery.SchemaField('knowledgebase_source_id', "STRING"),
                bigquery.SchemaField('data', "STRING"),
                bigquery.SchemaField('filename', "STRING"),
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        )
        try:
            load_job = client.load_table_from_dataframe(dframe, table_id, job_config=job_config)
            load_job.result()  # block until the load job finishes
            destination_table = client.get_table(table_id)
            print("ETL Repository->Loaded {} rows.".format(destination_table.num_rows))
        except Exception as e:
            print(f"ETL Repository->An error occurred: {e}")


class DataLoaderContext:
    """Holds a DataLoaderStrategy and delegates load_data to it."""

    def __init__(self, strategy):
        self._strategy = strategy

    def set_strategy(self, strategy):
        self._strategy = strategy

    def load_data(self, dframe, filename, file_type):
        self._strategy.load_data(dframe, filename, file_type)


class ETLRepositoryBigQuery(ETLRepositoryInterface):
    """BigQuery-backed implementation of the ETL repository."""

    def __init__(self, executor):
        self._executor = executor

    def insert_user_knowledge_base(self, id, user_id, filename, bot_id, file_type, is_deleted, url):
        """Insert one knowledgebase_source row; return `id` or an error dict."""
        print("ETL Repository->insert_user_knowledge_base func->Insert knowledgde base source")
        try:
            table_id = TBL_KWNBASE_SRC
            data_to_insert = [{
                "id": id,
                "user_id": user_id,
                "widget_id": bot_id,
                "filename": filename,
                "type": file_type,
                "is_deleted": is_deleted,
                "url": url,
            }]
            print(f"ETL Repository->insert_user_knowledge_base func->Data to load: {data_to_insert}")
            errors = client.insert_rows_json(table_id, data_to_insert)

            if not errors:
                print("ETL Repository->insert_user_knowledge_base func->Data inserted successfully.")
                return id
            print("ETL Repository->insert_user_knowledge_base func->Errors occurred while inserting data:")
            for error in errors:
                print(error)
            return {"message": f"ETL Repository->insert_user_knowledge_base func->error: {errors}"}
        except Exception as e:
            print(f"ETL Repository->An error occurred: {e}")
            return {"message": f"ETL Repository->insert_user_knowledge_base func->An error occurred: {e}"}

    def load_data(self, dframe, user_id, new_filename, file_type, bot_id, url):
        """Record the source row, then load the content DataFrame."""
        print(f"ETL Repository->BQ class dataframe:{dframe}")
        try:
            data_loader = DataLoaderContext(DataFrameLoader())
            print("ETL Repository->Load data to knowledgebase content table")
            id = str(uuid4())
            src_id = self.insert_user_knowledge_base(
                id, user_id, new_filename, bot_id, file_type, False, url)
            # NOTE(review): this assigns ONE uuid to the whole 'id' column;
            # harmless for the current one-row frames but wrong for multi-row
            # loads — confirm intent before relying on per-row uniqueness.
            dframe["id"] = str(uuid4())
            dframe["knowledgebase_source_id"] = src_id
            data_loader.load_data(dframe, new_filename, file_type)
            print("ETL repository-> load data func-> Done loading data")
            return {"message": "ETL process done"}
        except Exception as e:
            print(f"DataToBQ->An error occurred: {e}")
            # BUG FIX: the original message lacked the f-prefix, so "{e}" was
            # returned literally instead of the actual error.
            return {"message": f"Error on loading data into db:{e}"}


repository = ETLRepositoryBigQuery(executor)

# ==== src/etl/router.py ====
from fastapi import APIRouter, Body, Depends

router = APIRouter()

from src.etl.service import ETLService
from src.login.service import manager

etl_service = ETLService()


@router.post("/etl/data-extraction/", tags=["etl"], summary="Process data extraction")
async def data_processing(cloud_event_data: dict = Body(...)):
    """Entry point invoked with a GCS object-finalize cloud event payload."""
    return await etl_service.process_data(cloud_event_data)

# ==== src/etl/service.py ====
from google.cloud import storage
from src.gcp.storage import StorageClientFactory

from src.etl.document_loaders.csv_json_loader import CsvDataLoader
from src.etl.document_loaders.pdf_loader import PdfDataLoader
from src.etl.repository import repository


class ETLService:
    """Dispatches a storage-bucket change event to the matching loader."""

    async def process_data(self, cloud_event):
        """Route the uploaded object to the PDF or CSV/JSON pipeline."""
        print("ETLService-> Start data processing")
        data = cloud_event
        print(f"ETLService->Cloud Event Data: {data}")

        bucket = data["bucket"]
        orig_filename = data["name"]
        time_created = data["timeCreated"]
        new_filename = f"{orig_filename}-{time_created}.csv"
        contentType = data['contentType']

        if contentType == "application/pdf":
            print("ETLService->Start pdf loader")
            PdfDataLoader.load_data(new_filename, orig_filename)
        else:
            print("ETLService->Start csvjson loader")
            try:
                client = storage.Client()
                bucket = client.get_bucket(bucket)
                file_blob = storage.Blob(orig_filename, bucket)
            except BaseException as error:
                print(f'ETLService->CSVJson loader storage error: {error}')
                # BUG FIX: the original fell through here and then crashed
                # with a NameError on `file_blob`; surface the real error.
                raise

            print(f"ETLService->File blob: {file_blob}")
            download_data = file_blob.download_as_string().decode()
            print("ETLService->download_data : ", download_data)

            try:
                CsvDataLoader.load_data(download_data, new_filename, orig_filename, contentType)
            except BaseException as error:
                print('ETLService->An exception occurred: {}'.format(error))

        return {"message": "Done data process"}

# ==== src/files/exceptions.py ====
from fastapi import HTTPException


def is_file_upload_exists(file):
    """Reject an upload entry that carries no filename."""
    if not file.filename:
        print("Exception: No file uploaded")
        raise HTTPException(status_code=400, detail="No file uploaded")

# ==== src/files/repository.py ====
from src.config import VW_USER_LIST_LATEST, VW_USER_FILES_LATEST
from src.gcp.bigquery import executor


class FileRepository:
    """Read-only BigQuery queries over the latest-files view."""

    def __init__(self, executor):
        self._executor = executor

    # SECURITY(review): ids are interpolated into SQL; they originate from
    # URL path parameters — switch to query parameters.
    def get_user_files(self, user_id):
        query = f"SELECT * FROM `{VW_USER_FILES_LATEST}` WHERE user_id = '{user_id}'"
        return self._executor.execute_query(query)

    def get_bot_files(self, bot_id):
        query = f"SELECT * FROM `{VW_USER_FILES_LATEST}` WHERE widget_id = '{bot_id}'"
        return self._executor.execute_query(query)

    def get_specific_bot_file(self, id):
        query = f"SELECT * FROM `{VW_USER_FILES_LATEST}` WHERE id = '{id}'"
        return self._executor.execute_query(query)


repository = FileRepository(executor)
# =========================================================================
# NOTE(review): reconstructed from a line-mangled git patch; banner comments
# mark the original file boundaries that occupied these physical lines.
# =========================================================================

# ==== src/files/router.py ====
import os
from typing import Annotated
from fastapi import APIRouter, File, UploadFile, HTTPException, Form, Depends, Body
from fastapi.responses import StreamingResponse
from urllib.parse import unquote, urlparse

from src.gcp.storage import StorageClientFactory

from src.files.service import upload_file_to_gcs, delete_bot_file
from src.files.repository import repository
from src.files.exceptions import is_file_upload_exists
from src.bot.exceptions import has_bot_parameter
from src.users.exceptions import is_user_valid
from src.login.service import manager

router = APIRouter()


@router.get("/user-files/{user_id}", tags=["files"], summary="Get all files uploaded by user id")
async def user_files(user_id: str, user=Depends(manager)):
    """List all files uploaded by the given user."""
    is_user_valid(user_id)
    return repository.get_user_files(user_id)


@router.get("/files/bot/{bot_id}", tags=["files"], summary="Get all files uploaded by bot id")
async def bot_files(bot_id: str, user=Depends(manager)):
    """List all files attached to the given bot."""
    has_bot_parameter(bot_id)
    return repository.get_bot_files(bot_id)


@router.post("/upload-file/", tags=["files"], summary="Upload file")
async def upload_file(user_id: Annotated[str, Form()],
                      bot_id: Annotated[str, Form()],
                      fileList: list[UploadFile] = File(...),
                      user=Depends(manager)):
    """Upload one or more files to GCS on behalf of a user's bot."""
    has_bot_parameter(bot_id)
    # BUG FIX: an empty list previously slipped past validation and crashed
    # below with UnboundLocalError on `response`.
    if not fileList:
        raise HTTPException(status_code=400, detail="No file uploaded")
    for file in fileList:
        is_file_upload_exists(file)

    try:
        for file in fileList:
            # Like the original, only the last upload's response is returned.
            response = await upload_file_to_gcs(user_id, bot_id, file)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return response


@router.post("/files/bot/download/", tags=["files"], summary="Download bot file")
async def index(url: Annotated[str, Form()], user=Depends(manager)):
    """Stream a stored bot file back to the caller.

    `url` is the object path '<type>/<name>'; CSV/JSON live in the tmp bucket
    under a timestamped name, everything else in the upload bucket.
    """
    file_type = url.split("/")[0]
    if file_type == "csv" or file_type == "json":
        bucket_src = os.environ.get("GCP_BUCKET_TMP")
    else:
        # Strip the '-<timestamp>.csv' suffix added during staging.
        bucket_src = os.environ.get("GCP_UPLOAD_BUCKET")
        url = url.rsplit('.', 1)[0].rsplit('-', 3)[0]

    storage_client = StorageClientFactory.create_client()
    bucket = storage_client.bucket(bucket_src)
    blob = bucket.blob(unquote(url))
    # NOTE(review): always octet-stream; consider deriving from file_type.
    content_type = "application/octet-stream"

    return StreamingResponse(iter([blob.download_as_bytes()]), media_type=content_type)


@router.get("/files/bot/delete/{id}", tags=["files"], summary="Delete bot file")
async def delete_files(id: str, user=Depends(manager)):
    """Soft-delete a bot file by id."""
    try:
        response = await delete_bot_file(id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return response

# ==== src/files/service.py ====
import os

from fastapi import HTTPException

from src.config import GCP_UPLOAD_BUCKET
from src.gcp.storage import StorageClientFactory
from src.files.repository import repository
from src.etl.repository import repository as ETLRepository


async def upload_file_to_gcs(user_id, bot_id, file):
    """Upload one file to '<ext>/<name>-<user_id>-<bot_id>' and make it public."""
    print("Upload file to GCS")
    storage_client = StorageClientFactory.create_client()
    try:
        split_tup = os.path.splitext(file.filename)
        blob_name = f"{file.filename}"

        bucket = storage_client.bucket(GCP_UPLOAD_BUCKET)
        blob = bucket.blob(f"{split_tup[1].replace('.', '')}/{blob_name}-{user_id}-{bot_id}")
        blob.upload_from_file(file.file, content_type=file.content_type)

        # Publicly accessible URL for the uploaded object.
        blob.make_public()
        file_url = blob.public_url
        return {"message": f"File upload successfully"}
    except Exception as e:
        print(f"Error uploading file to gcs:{e}")
        raise HTTPException(status_code=500, detail=str(e))


async def delete_bot_file(id):
    """Soft-delete a bot file by appending an is_deleted=True source row."""
    bot_files = repository.get_specific_bot_file(id)
    try:
        insert_data = ETLRepository.insert_user_knowledge_base(
            id, bot_files["user_id"], bot_files["filename"], bot_files["widget_id"],
            bot_files["type"], True, bot_files["url"])
        # BUG FIX: insert_user_knowledge_base returns `id` on success, so the
        # original `if not insert_data` branch could never fire and the
        # function always returned None.
        if insert_data == id:
            print("Delete Bot file -> Data inserted successfully.")
            return id
        return insert_data  # propagate the repository's error dict
    except Exception as e:
        print(f"Delete bot file service->An error occurred: {e}")
        return {"message": f"Delete bot file service->delete_bot_file func->An error occurred: {e}"}

# ==== src/gcp/bigquery.py ====
from google.cloud import bigquery


class BigQueryClientSingleton:
    """Process-wide lazily created bigquery.Client."""

    _instance = None

    @staticmethod
    def get_instance():
        if BigQueryClientSingleton._instance is None:
            BigQueryClientSingleton._instance = bigquery.Client()
        return BigQueryClientSingleton._instance


class BigQueryQueryExecutor:
    """Thin query runner returning rows as plain dicts."""

    def __init__(self, client):
        self._client = client

    def execute_query(self, query):
        """Run `query`; return a dict for a single row, else a list of dicts.

        NOTE(review): callers depend on this single-row unwrapping quirk —
        do not normalise the return shape without auditing call sites.
        """
        result = self._client.query(query)

        result_list = [dict(row) for row in result]
        if len(result_list) == 1:
            return result_list[0]
        return result_list


client = BigQueryClientSingleton.get_instance()
executor = BigQueryQueryExecutor(client)

# ==== src/gcp/storage.py ====
from google.cloud import storage


class StorageClientFactory:
    """Factory for storage.Client objects (kept as a test/DI seam)."""

    # FIX: declared @staticmethod — the original omitted `self` and only
    # worked because it was always called on the class.
    @staticmethod
    def create_client():
        return storage.Client()
0000000..294a47f --- /dev/null +++ b/src/helpers/token_helper.py @@ -0,0 +1,18 @@ +import tiktoken +from src.config import ENCODING_FOR_MODEL, ENCODING + +tiktoken.encoding_for_model(ENCODING_FOR_MODEL) +tokenizer = tiktoken.get_encoding(ENCODING) + +def tiktoken_len(text: str) -> int: + tokens = tokenizer.encode( + text, + disallowed_special=() + ) + return len(tokens) + +def num_tokens_from_string(string: str, encoding_name: str) -> int: + """Returns the number of tokens in a text string.""" + encoding = tiktoken.get_encoding(encoding_name) + num_tokens = len(encoding.encode(string)) + return num_tokens \ No newline at end of file diff --git a/src/login/repository.py b/src/login/repository.py new file mode 100644 index 0000000..e69de29 diff --git a/src/login/router.py b/src/login/router.py new file mode 100644 index 0000000..3ea5923 --- /dev/null +++ b/src/login/router.py @@ -0,0 +1,12 @@ +from fastapi import APIRouter, Depends +from fastapi.security import OAuth2PasswordRequestForm + +from src.login.service import LoginService + +router = APIRouter() + +login_service = LoginService() + +@router.post("/login", tags=["login"], summary="Logged in user") +async def post_login(data: OAuth2PasswordRequestForm = Depends()): + return await login_service.login(data) diff --git a/src/login/service.py b/src/login/service.py new file mode 100644 index 0000000..43a6f2a --- /dev/null +++ b/src/login/service.py @@ -0,0 +1,38 @@ +import os +from datetime import timedelta +from dotenv import load_dotenv +from fastapi_login import LoginManager +from fastapi import Depends +from fastapi.security import OAuth2PasswordRequestForm +from fastapi_login.exceptions import InvalidCredentialsException + +from src.users.service import user_service + +load_dotenv() + +manager = LoginManager( + os.environ.get("LOGIN_SECRET_KEY"), '/login', + default_expiry=timedelta(hours=1) +) + +class LoginService: + async def login(self,data: OAuth2PasswordRequestForm = Depends()): + + email = 
def is_user_valid(user: str):
    """Resolve *user* (UUID, e-mail, or username) and raise 400 if no match.

    The identifier kind is inferred: a valid v4 UUID is looked up by id,
    otherwise an e-mail-shaped string by e-mail, otherwise by username.
    """
    try:
        UUID(user, version=4)
        users = repository.get_user_details(user)
    except ValueError:
        # Not a UUID — try e-mail first, then plain username.
        if re.match(r"[^@]+@[^@]+\.[^@]+", user):
            users = repository.get_user_details_by_email(user)
        else:
            users = repository.get_user_details_by_name(user)

    if not users:
        print("Exception: user not found")
        raise HTTPException(status_code=400, detail="User not found")


def is_user_id_valid_uuid(user: str):
    """Raise 400 unless *user* parses as a version-4 UUID string."""
    try:
        UUID(user, version=4)
    except ValueError:
        raise HTTPException(status_code=400, detail="User not found")


def has_user_bot(bot_settings):
    """Raise 400 when the user has no bot configured (empty settings list)."""
    if len(bot_settings) == 0:
        raise HTTPException(status_code=400, detail="No bot available")


def is_password_same(user_id: str, password: str):
    """Raise 400 when the proposed password equals the stored one."""
    current_password = repository.get_user_password(user_id)
    if password == current_password["password"]:
        raise HTTPException(status_code=400, detail="Password should not be the same")
class UserRepository:
    """BigQuery-backed access to user rows.

    Reads come from the ``VW_USER_LIST_LATEST`` view; writes go to the
    ``TBL_USERS`` table via streaming inserts.
    """

    def __init__(self, executor):
        self._executor = executor

    def _get_user_details_query(self, column, value):
        # SECURITY: the shared executor only accepts a bare SQL string, so we
        # cannot use real BigQuery query parameters (QueryJobConfig +
        # ScalarQueryParameter) here yet.  Escaping backslashes and single
        # quotes blocks quote-based SQL injection in the meantime; switch to
        # parameterized queries when the executor supports them.
        safe_value = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"SELECT * FROM `{VW_USER_LIST_LATEST}` u WHERE {column} = '{safe_value}'"

    def _fetch(self, query, msg_prefix):
        """Run *query*; on any failure log and raise HTTP 400.

        Catches ``Exception`` (not ``BaseException`` as before) so that
        KeyboardInterrupt / SystemExit still propagate.
        """
        try:
            return self._executor.execute_query(query)
        except Exception as e:
            msg = f"{msg_prefix}{e}"
            print(msg)
            raise HTTPException(status_code=400, detail=msg)

    def get_users(self) -> List[UserResponseModel]:
        """Return every row of the user view."""
        query = f"SELECT * FROM `{VW_USER_LIST_LATEST}`"
        return self._fetch(query, "UserRepository->get_users func-> Error fetching users:")

    def get_user_details(self, user_id) -> UserResponseModel:
        """Look a user up by id."""
        return self._fetch(
            self._get_user_details_query('id', user_id),
            "UserRepository->get_user_details func-> Error fetching user details:",
        )

    def get_user_details_by_name(self, username) -> UserResponseModel:
        """Look a user up by username."""
        return self._fetch(
            self._get_user_details_query('username', username),
            "UserRepository->get_user_details_by_name func-> Error fetching user details:",
        )

    def get_user_details_by_email(self, email) -> UserResponseModel:
        """Look a user up by e-mail address."""
        return self._fetch(
            self._get_user_details_query('email', email),
            "UserRepository->get_user_details_by_email func-> Error fetching user details:",
        )

    def get_user_password(self, user_id) -> UserPasswordResponseModel:
        """Fetch the stored credential row for *user_id*.

        Fix: the error message previously named get_user_details (copy-paste).
        """
        return self._fetch(
            self._get_user_details_query('id', user_id),
            "UserRepository->get_user_password func-> Error fetching user password:",
        )

    def save_user_info(self, user_info):
        """Stream one updated user row into TBL_USERS.

        Returns a {"message": ...} dict both on success and on insert errors;
        callers inspect the message text rather than catching exceptions.
        """
        row_to_insert = {
            "id": user_info.id,
            "firstname": user_info.firstname,
            "lastname": user_info.lastname,
            "email": user_info.email,
            "password": user_info.password,
        }

        # insert_rows_json returns an empty list on success, error dicts otherwise.
        errors = client.insert_rows_json(TBL_USERS, [row_to_insert])

        if not errors:
            msg = f"User info for {user_info.id} saved successfully."
            print(msg)
            return {"message": msg}
        else:
            msg = f"Errors occurred while saving user info for user id: {user_info.id}: {errors}"
            print(msg)
            return {"message": msg}


repository = UserRepository(executor)


def get_users() -> List[UserResponseModel]:
    return repository.get_users()


def get_user_details(user_id: str) -> UserResponseModel:
    return repository.get_user_details(user_id)


def get_user_details_by_name(username: str) -> UserResponseModel:
    return repository.get_user_details_by_name(username)
class UserResponseModel(BaseModel):
    """Public shape of a user row returned by the users endpoints."""

    id: str
    firstname: Union[str, None]
    lastname: Union[str, None]
    email: Union[EmailStr, None]
    contact_no: Union[int, None]


class UserPasswordResponseModel(BaseModel):
    """Credential lookup result: just the id and the stored password."""

    id: str
    password: str


class UserUpdateModel(BaseModel):
    """Payload for the user-update endpoint.

    Fields left as None are filled in from the stored record by the
    service layer before saving.
    """

    id: str
    firstname: Union[str, None]
    lastname: Union[str, None]
    email: Union[EmailStr, None]
    password: Union[str, None]
class UserService:
    """Application-level user operations on top of UserRepository."""

    def __init__(self, repository: repository):
        self.repository = repository

    async def get_users(self) -> List[UserResponseModel]:
        """Return every user row."""
        return self.repository.get_users()

    async def get_user_details(self, user: str) -> UserResponseModel:
        """Look up a user by UUID, e-mail, or username (400 when absent).

        NOTE(review): is_user_valid already performs this exact lookup; the
        query is repeated because the helper does not return the row.
        """
        is_user_valid(user)
        try:
            UUID(user, version=4)
            return self.repository.get_user_details(user)
        except ValueError:
            if re.match(r"[^@]+@[^@]+\.[^@]+", user):
                return self.repository.get_user_details_by_email(user)
            return self.repository.get_user_details_by_name(user)

    async def update_user(self, user_info: UserUpdateModel) -> dict[str, str]:
        """Update a user, filling omitted (None) fields from the stored row.

        Raises HTTP 400 (via the exception helpers) when the user does not
        exist, the e-mail belongs to someone else, or the password is reused.
        """
        is_user_valid(user_info.id)
        # Bug fix: only run the duplicate-e-mail check when the caller
        # actually supplied a new e-mail.  Previously a None e-mail was
        # passed to the check before defaults were filled in.
        if user_info.email is not None:
            is_email_exist_to_other_user(user_info.id, user_info.email)

        current_user_info = self.repository.get_user_details(user_info.id)
        if user_info.firstname is None:
            user_info.firstname = current_user_info["firstname"]
        if user_info.lastname is None:
            user_info.lastname = current_user_info["lastname"]
        if user_info.email is None:
            user_info.email = current_user_info["email"]

        if user_info.password:
            is_password_same(user_info.id, user_info.password)
        else:
            # No new password supplied — keep the stored one.
            current_password = self.repository.get_user_password(user_info.id)
            user_info.password = current_password["password"]

        response = self.repository.save_user_info(user_info)

        # NOTE(review): save_user_info already returns {"message": ...}, so
        # this yields {"message": {"message": ...}}.  Kept as-is for backward
        # compatibility with existing API consumers.
        return {"message": response}


user_service = UserService(repository)
+ + + +
+ + + \ No newline at end of file diff --git a/templates/knowledge-base.html b/templates/knowledge-base.html new file mode 100644 index 0000000..edc859a --- /dev/null +++ b/templates/knowledge-base.html @@ -0,0 +1,92 @@ + + + + + + + + +
+ + +
+ + + + + + + + + + + + + + + + + + + + + + + + + +
+ Filename + + Users + + Action +
+ Filename 1 + + User 1 + + View data +
+ Filename 2 + + User 2 + + View data +
+ Filename 3 + + User 3 + + View data +
+
+ +
+ + + \ No newline at end of file diff --git a/templates/upload-tester.html b/templates/upload-tester.html new file mode 100644 index 0000000..e631119 --- /dev/null +++ b/templates/upload-tester.html @@ -0,0 +1,13 @@ + + + + Chat + + +
+ + + +
+ + \ No newline at end of file diff --git a/templates/websocket-tester.html b/templates/websocket-tester.html new file mode 100644 index 0000000..c149705 --- /dev/null +++ b/templates/websocket-tester.html @@ -0,0 +1,50 @@ + + + + Chat + + +

WebSocket Chat

+
+ + + + + +
+ + + +