from llama_index import SimpleDirectoryReader, ServiceContext, PromptHelper, StorageContext, load_index_from_storage, GPTVectorStoreIndex
from langchain.chat_models import ChatOpenAI
from CustomCallbackHandler import CustomAsyncCallBackHandler
from llama_index.llm_predictor.chatgpt import ChatGPTLLMPredictor
from llama_index.langchain_helpers.agents import create_llama_chat_agent
from llama_index.langchain_helpers.memory_wrapper import GPTIndexChatMemory
from llama_index.langchain_helpers.agents import IndexToolConfig, LlamaIndexTool, LlamaToolkit
from langchain.memory import ConversationBufferMemory
from llama_index import download_loader
from pathlib import Path
import os
import json

os.environ["OPENAI_API_KEY"] = 'sk-xxx'
import asyncio
import websockets
import logging
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))
agent_cache = {}
connected_session = {}
session_cache = {}


def build_llama_chat_agent(directory_path, project_id, session_id, prompt_name, advanced_description):
    if session_id not in agent_cache:
        storage_path = '/data/index_cache/' + project_id + '/storage'
        # set maximum input size
        max_input_size = 4096
        # set number of output tokens
        num_outputs = 2000
        # set maximum chunk overlap
        max_chunk_overlap = 20
        # set chunk size limit
        chunk_size_limit = 2000
        llm = ChatOpenAI(temperature=0.4, model_name="gpt-3.5-turbo", verbose=False, streaming=True)
        llm_predictor = ChatGPTLLMPredictor(llm=llm)
        prompt_helper = PromptHelper(max_input_size, num_outputs, max_chunk_overlap, chunk_size_limit=chunk_size_limit)
        service_context = ServiceContext.from_defaults(llm_predictor=llm_predictor, prompt_helper=prompt_helper)
        if os.path.exists(storage_path):
            # rebuild the storage context
            storage_context = StorageContext.from_defaults(persist_dir=storage_path)
            # load the index from disk
            index = load_index_from_storage(storage_context)
            agent_chain = get_chat_agent(index, service_context, llm, prompt_name, advanced_description)
            agent_cache[session_id] = agent_chain
            return agent_chain
        else:
            files = os.listdir(directory_path)
            if len(files) < 1:
                print('error! The folder ' + directory_path + ' is empty!')
                return
            file_name = files[0]
            logging.info("load data: " + directory_path + '/' + file_name)
            # Excel data format
            if file_name.endswith(".xls") or file_name.endswith(".xlsx"):
                PandasExcelReader = download_loader("PandasExcelReader")
                loader = PandasExcelReader()
                loader._pandas_config = {"header": 0}
                # loader._concat_rows = True
                loader._row_joiner = ' '
                documents = loader.load_data(file=Path(directory_path + '/' + file_name), sheet_name=None)
            else:
                documents = SimpleDirectoryReader(directory_path).load_data()
            index = GPTVectorStoreIndex.from_documents(documents, service_context=service_context)
            # persist the index to disk (default ./storage)
            index.storage_context.persist(storage_path)
            agent_cache[session_id] = get_chat_agent(index, service_context, llm, prompt_name, advanced_description)
            return agent_cache[session_id]
    else:
        return agent_cache[session_id]


def get_chat_agent(index, service_context, llm, prompt_name, advanced_description):
    # memory = GPTIndexChatMemory(
    #     index=index,
    #     memory_key="chat_history",
    #     query_kwargs={"response_mode": "compact", "streaming": True, "service_context": service_context, "similarity_top_k": 1},
    #     # return_source returns source nodes instead of querying the index
    #     return_source=True,
    #     # return_messages returns context in message format
    #     return_messages=True,
    #     # chat_memory=ConversationBufferWindowMemory(return_messages=True)
    # )
    # A separate memory should be created for each user
    memory = ConversationBufferMemory(memory_key="chat_history")
    prompt_name = prompt_name if prompt_name is not None else "AI virtual anchor"
    advanced_description = advanced_description if advanced_description is not None else (
        "You are an AI virtual anchor. If it is a game-related question, please use tools to obtain "
        "information before answering. If the question is about a game and you don't know the answer, "
        "just say 'sorry, I don't know'. Remember: you must respond in Chinese."
    )
    print('prompt name: ' + prompt_name + ", description: " + advanced_description)
    tool_config = IndexToolConfig(
        query_engine=index.as_query_engine(
            response_mode="compact",
            streaming=True,
            similarity_top_k=1,
            service_context=service_context,
        ),
        name=prompt_name,
        description=advanced_description,
        tool_kwargs={"return_direct": True, "return_sources": True},
        return_sources=True,
    )
    tool = LlamaIndexTool.from_tool_config(tool_config)
    toolkit = LlamaToolkit(
        index_configs=[tool],
    )
    agent_chain = create_llama_chat_agent(
        toolkit,
        llm,
        memory=memory,
        verbose=True
    )
    return agent_chain


async def send_message(websocket, message_queue):
    while True:
        message = await message_queue.get()
        await websocket.send(message)
        # print('Sent message: %s' % message)
        message_queue.task_done()


async def receive_messages(websocket, message_queue, params_json):
    # data path
    direct_path = params_json['input_dir']
    project_id = params_json["project_id"]
    session_id = params_json["session_id"]
    connected_session[hash(websocket)] = session_id
    chat_agent = build_llama_chat_agent(direct_path, project_id, session_id, None, None)
    print(chat_agent.memory.chat_memory.messages)
    query = params_json["data"]["question"]
    try:
        response_stream = await chat_agent.arun(
            query, callbacks=[CustomAsyncCallBackHandler(message_queue)]
        )
        logging.info('ai response: ' + response_stream)
    except ValueError as e:
        response_stream = str(e)
        if not response_stream.startswith("Could not parse LLM output: `"):
            raise e
        response_stream = response_stream.removeprefix("Could not parse LLM output: `").removesuffix("`")
        logging.error("error: " + response_stream)
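
# Note: the CustomCallbackHandler module imported at the top is not included in this
# listing. Below is a minimal sketch (an assumption, not the author's actual
# implementation) of what CustomAsyncCallBackHandler might look like: an async LangChain
# callback handler that pushes every streamed token onto the asyncio.Queue consumed by
# send_message(), so tokens reach the websocket client as they are generated.
#
# from langchain.callbacks.base import AsyncCallbackHandler
#
# class CustomAsyncCallBackHandler(AsyncCallbackHandler):
#     def __init__(self, message_queue):
#         self.message_queue = message_queue
#
#     async def on_llm_new_token(self, token: str, **kwargs) -> None:
#         # Forward each newly generated token to the outbound message queue.
#         await self.message_queue.put(token)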


# Define the websocket server callback function
async def websocket_server(websocket, path):
    print('Client connected.')
    try:
        # Create a message queue for outgoing messages
        message_queue = asyncio.Queue()
        # Start the coroutine that sends queued messages back to the client
        send_task = asyncio.create_task(send_message(websocket, message_queue))
        # Receive messages sent by the client
        async for message in websocket:
            print('Received message: %s' % message)
            data = json.loads(message)
            await receive_messages(websocket, message_queue, data)
    except websockets.exceptions.ConnectionClosed:
        # Look up the session_id for this websocket object
        session_id = connected_session[hash(websocket)]
        # Delete the cached agent object for this session_id
        del agent_cache[session_id]
        print('Client %s disconnected.' % session_id)
    finally:
        # Close the websocket connection
        await websocket.close()
        # connected.remove(websocket)
        # Cancel the sending coroutine
        send_task.cancel()
        # Wait for the send task to finish
        await asyncio.gather(send_task, return_exceptions=True)
if __name__ == "__main__" : # Start the websocket server start_server = websockets.serve(websocket_server, '0.0.0.0' , 9961 ) print( 'websocket_server now started listening on port: 9961' )
# run the event loop asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
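
# Example of the JSON payload the server expects from a client, based on the fields read
# in receive_messages() above. The sketch below is illustrative only; the directory,
# project_id and session_id values are placeholders, not values from the original code.
#
# import asyncio, json, websockets
#
# async def demo_client():
#     async with websockets.connect('ws://127.0.0.1:9961') as ws:
#         await ws.send(json.dumps({
#             "input_dir": "/data/docs/demo",      # folder holding the source documents
#             "project_id": "demo_project",
#             "session_id": "session-001",
#             "data": {"question": "hello"}
#         }))
#         # Streamed tokens arrive as separate websocket messages.
#         while True:
#             print(await ws.recv())
#
# asyncio.run(demo_client())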