Implementing AI customer service based on LangChain

Original link: https://reiner.host/posts/8f0289a1.html

Based on

  • LangChain

  • llama_index >=0.6.5

  • GPT-3.5

  • websockets

  • python >=3.10

Dependencies

pip install llama-index
pip install openai
pip install langchain
pip install websockets
pip install pandas
pip install llama-hub

What it does

A question-answering AI chatbot backed by a private knowledge base, capable of both knowledge-based Q&A and casual conversation.

AI Chatbot

from llama_index import SimpleDirectoryReader, ServiceContext, GPTVectorStoreIndex, PromptHelper, StorageContext, load_index_from_storage
from llama_index.llm_predictor.chatgpt import ChatGPTLLMPredictor
from langchain.chat_models import ChatOpenAI
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
import os
from llama_index.langchain_helpers.agents import IndexToolConfig, LlamaIndexTool, LlamaToolkit
from llama_index.langchain_helpers.agents import create_llama_chat_agent
from llama_index.langchain_helpers.memory_wrapper import GPTIndexChatMemory
from langchain.memory import ConversationBufferMemory
from llama_index import download_loader
from pathlib import Path

os.environ["OPENAI_API_KEY"] = 'sk-yourOpenAiKey'


def construct_index(directory_path):
    # set maximum input size
    max_input_size = 4096
    # set number of output tokens
    num_outputs = 2000
    # set maximum chunk overlap
    max_chunk_overlap = 20
    # set chunk size limit
    chunk_size_limit = 2000
    # If you don't want streaming output, set streaming=False
    llm = ChatOpenAI(temperature=0.4, model_name="gpt-3.5-turbo", verbose=True, streaming=True, callbacks=[StreamingStdOutCallbackHandler()])
    llm_predictor = ChatGPTLLMPredictor(llm=llm)
    prompt_helper = PromptHelper(max_input_size, num_outputs, max_chunk_overlap, chunk_size_limit=chunk_size_limit)

    files = os.listdir(directory_path)
    if len(files) < 1:
        print('error! The folder ' + directory_path + ' is empty!')
        return
    file_name = files[0]
    print("load index data: " + directory_path + '/' + file_name)
    # If the data is in Excel format
    if file_name.endswith(".xls") or file_name.endswith(".xlsx"):
        PandasExcelReader = download_loader("PandasExcelReader")
        loader = PandasExcelReader()
        loader._pandas_config = {"header": 0}
        # The loader source has a problem: if these defaults are not set, an error is raised
        # loader._concat_rows = True
        loader._row_joiner = ' '
        documents = loader.load_data(file=Path(directory_path + '/' + file_name), sheet_name=None)
    else:
        documents = SimpleDirectoryReader(directory_path).load_data()
    service_context = ServiceContext.from_defaults(llm_predictor=llm_predictor, prompt_helper=prompt_helper)
    index = GPTVectorStoreIndex.from_documents(documents, service_context=service_context)
    # save the index on disk
    index.storage_context.persist("D://storage")

    # rebuild the storage context
    storage_context = StorageContext.from_defaults(persist_dir='D://storage')
    # load the index
    index = load_index_from_storage(storage_context)
    # query_engine = index.as_query_engine(similarity_top_k=1, streaming=True, response_mode="compact", service_context=service_context)
    query_engine = index.as_query_engine(similarity_top_k=30, service_context=service_context)

    # If you want to save and reuse the conversation history in document form, use GPTIndexChatMemory instead:
    # memory = GPTIndexChatMemory(
    #     index=chat_history_index,
    #     memory_key="chat_history",
    #     query_kwargs={"response_mode": "compact", "streaming": True, "service_context": service_context, "similarity_top_k": 1},
    #     # return_source returns source nodes instead of querying the index
    #     return_source=True,
    #     # return_messages returns the context in message format
    #     return_messages=True,
    # )

    # use an in-memory buffer to save the chat history
    memory = ConversationBufferMemory(
        memory_key="chat_history"
    )

    tool_config = IndexToolConfig(
        query_engine=query_engine,
        name="AI Customer service",
        description="If it is a game-related question, please use tools to obtain information before answering. If this question is about a game and you don't know the answer, just say 'sorry, I don't know'",
        tool_kwargs={"return_direct": True}
    )

    tool = LlamaIndexTool.from_tool_config(tool_config)
    toolkit = LlamaToolkit(
        index_configs=[tool],
    )

    agent_chain = create_llama_chat_agent(
        toolkit,
        llm,
        memory=memory,
        verbose=True,
        agent_kwargs={"max_iterations": 3}
    )

    while True:
        query = input("What do you want to ask? ")
        print(agent_chain.memory.chat_memory.messages)
        response_stream = agent_chain.run(query)
        # if streaming is used, the response exposes a token generator
        if hasattr(response_stream, 'response_gen'):
            for text in response_stream.response_gen:
                print(text, end="")
            # TODO: send to client
        else:
            print(response_stream)


# This path is the folder that holds your knowledge-base file
construct_index('D://data')
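
construct_index simply indexes the first file it finds in the data folder: .xls/.xlsx files go through PandasExcelReader (with the first row treated as a header and each row's cells joined by spaces), everything else through SimpleDirectoryReader. The sketch below shows one way such a spreadsheet might be produced with pandas; the file path and the question/answer columns are illustrative assumptions, not a format the post prescribes (writing .xlsx also requires the openpyxl package).

import pandas as pd

# Hypothetical knowledge-base spreadsheet. The column names and rows are placeholders;
# PandasExcelReader just joins each row's cells into one text chunk for indexing.
rows = [
    {"question": "How do I reset my in-game password?",
     "answer": "Open Settings > Account > Reset password and follow the email link."},
    {"question": "When is server maintenance?",
     "answer": "Every Wednesday from 02:00 to 04:00 UTC."},
]
# construct_index('D://data') will pick up the first file in this folder.
pd.DataFrame(rows).to_excel("D://data/faq.xlsx", index=False)  # requires openpyxl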





Integrate WebSocket

Receives questions from the client and has the AI generate and stream back answers. A minimal client sketch follows the server code below.

websocket_server.py

from llama_index import SimpleDirectoryReader, ServiceContext, PromptHelper, StorageContext, load_index_from_storage, GPTVectorStoreIndex
from langchain.chat_models import ChatOpenAI
from CustomCallbackHandler import CustomAsyncCallBackHandler
from llama_index.llm_predictor.chatgpt import ChatGPTLLMPredictor
from llama_index.langchain_helpers.agents import create_llama_chat_agent
from llama_index.langchain_helpers.memory_wrapper import GPTIndexChatMemory
from llama_index.langchain_helpers.agents import IndexToolConfig, LlamaIndexTool, LlamaToolkit
from langchain.memory import ConversationBufferMemory
from llama_index import download_loader
from pathlib import Path
import os
import json
import asyncio
import websockets
import logging
import sys

os.environ["OPENAI_API_KEY"] = 'sk-xxx'

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))

# one cached agent per session, plus a websocket -> session_id map
agent_cache = {}
connect_session = {}
session_cache = {}


def build_llama_chat_agent(directory_path, project_id, session_id, prompt_name, advanced_description):
    if session_id not in agent_cache:
        storage_path = '/data/index_cache/' + project_id + '/storage'
        # set maximum input size
        max_input_size = 4096
        # set number of output tokens
        num_outputs = 2000
        # set maximum chunk overlap
        max_chunk_overlap = 20
        # set chunk size limit
        chunk_size_limit = 2000

        llm = ChatOpenAI(temperature=0.4, model_name="gpt-3.5-turbo", verbose=False, streaming=True)
        llm_predictor = ChatGPTLLMPredictor(llm=llm)
        prompt_helper = PromptHelper(max_input_size, num_outputs, max_chunk_overlap, chunk_size_limit=chunk_size_limit)

        service_context = ServiceContext.from_defaults(llm_predictor=llm_predictor, prompt_helper=prompt_helper)
        if os.path.exists(storage_path):
            # rebuild the storage context
            storage_context = StorageContext.from_defaults(persist_dir=storage_path)
            # load the index
            index = load_index_from_storage(storage_context)

            agent_chain = get_chat_agent(index, service_context, llm, prompt_name, advanced_description)
            agent_cache[session_id] = agent_chain
            return agent_chain
        else:
            files = os.listdir(directory_path)
            if len(files) < 1:
                print('error! The folder ' + directory_path + ' is empty!')
                return
            file_name = files[0]
            logging.info("load data: " + directory_path + '/' + file_name)
            # if the data is in Excel format
            if file_name.endswith(".xls") or file_name.endswith(".xlsx"):
                PandasExcelReader = download_loader("PandasExcelReader")
                loader = PandasExcelReader()
                loader._pandas_config = {"header": 0}
                # loader._concat_rows = True
                loader._row_joiner = ' '
                documents = loader.load_data(file=Path(directory_path + '/' + file_name), sheet_name=None)
            else:
                documents = SimpleDirectoryReader(directory_path).load_data()
            index = GPTVectorStoreIndex.from_documents(documents, service_context=service_context)
            # save the index on disk (default ./storage)
            index.storage_context.persist(storage_path)
            agent_cache[session_id] = get_chat_agent(index, service_context, llm, prompt_name, advanced_description)
            return agent_cache[session_id]
    else:
        return agent_cache[session_id]


def get_chat_agent(index, service_context, llm, prompt_name, advanced_description):
    # memory = GPTIndexChatMemory(
    #     index=index,
    #     memory_key="chat_history",
    #     query_kwargs={"response_mode": "compact", "streaming": True, "service_context": service_context, "similarity_top_k": 1},
    #     # return_source returns source nodes instead of querying the index
    #     return_source=True,
    #     # return_messages returns the context in message format
    #     return_messages=True,
    #     # chat_memory=ConversationBufferWindowMemory(return_messages=True)
    # )
    # a separate memory should be created for each user
    memory = ConversationBufferMemory(
        memory_key="chat_history"
    )

    prompt_name = prompt_name if prompt_name is not None else "AI virtual anchor"
    advanced_description = advanced_description if advanced_description is not None else "You are an AI virtual anchor. If it is a game-related question, please use tools to obtain information before answering. If this question is about a game and you don't know the answer, just say 'sorry, I don't know'. Remember: you must respond in Chinese."
    print('prompt name: ' + prompt_name + ", description: " + advanced_description)
    tool_config = IndexToolConfig(
        query_engine=index.as_query_engine(
            response_mode="compact",
            streaming=True,
            similarity_top_k=1,
            service_context=service_context),
        name=prompt_name,
        description=advanced_description,
        tool_kwargs={"return_direct": True, "return_sources": True},
        return_sources=True
    )

    tool = LlamaIndexTool.from_tool_config(tool_config)
    toolkit = LlamaToolkit(
        index_configs=[tool],
    )

    agent_chain = create_llama_chat_agent(
        toolkit,
        llm,
        memory=memory,
        verbose=True
    )

    return agent_chain


async def send_message(websocket, message_queue):
    while True:
        message = await message_queue.get()
        await websocket.send(message)
        # print('Sent message: %s' % message)
        message_queue.task_done()


async def receive_messages(websocket, message_queue, params_json):
    # path of the knowledge-base data
    direct_path = params_json['input_dir']

    project_id = params_json["project_id"]
    session_id = params_json["session_id"]

    connect_session[hash(websocket)] = session_id

    chat_agent = build_llama_chat_agent(direct_path, project_id, session_id, None, None)
    print(chat_agent.memory.chat_memory.messages)
    query = params_json["data"]["question"]

    try:
        response_stream = await chat_agent.arun(
            query,
            callbacks=[CustomAsyncCallBackHandler(message_queue)]
        )
        logging.info('ai response: ' + response_stream)
    except ValueError as e:
        response_stream = str(e)
        if not response_stream.startswith("Could not parse LLM output: `"):
            raise e
        response_stream = response_stream.removeprefix("Could not parse LLM output: `").removesuffix("`")
        logging.error("error: " + response_stream)


# Define the websocket server callback function
async def websocket_server(websocket, path):
    print('Client connected.')
    try:
        # Create a message queue
        message_queue = asyncio.Queue()
        # Start the coroutine that sends messages back to the client
        send_task = asyncio.create_task(send_message(websocket, message_queue))
        # Receive the messages sent by the client
        async for message in websocket:
            print('Received message: %s' % message)
            data = json.loads(message)
            await receive_messages(websocket, message_queue, data)
    except websockets.exceptions.ConnectionClosed:
        # Look up the session_id by the websocket object
        session_id = connect_session[hash(websocket)]
        # Delete the cached agent for this session
        del agent_cache[session_id]
        print('Client: %s disconnected.' % session_id)
    finally:
        # Close the websocket connection
        await websocket.close()
        # connected.remove(websocket)
        # Cancel the sending coroutine
        send_task.cancel()
        # Wait for all tasks and coroutines to complete
        await asyncio.gather(send_task, return_exceptions=True)


if __name__ == "__main__":
    # Start the websocket server
    start_server = websockets.serve(websocket_server, '0.0.0.0', 9961)
    print('websocket_server now started listening on port: 9961')

    # run the event loop
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
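
The server expects each WebSocket message to be a JSON object carrying input_dir (the knowledge-base folder), project_id (used for the index cache path), session_id (one cached agent per session) and data.question, and it streams the answer back token by token, ending with a "[DONE]" marker sent by the callback handler described in the next section. Below is a minimal test-client sketch, assuming the server runs locally on port 9961; all field values are placeholders.

import asyncio
import json
import websockets

async def ask(question: str):
    # Placeholder values: point input_dir at your own knowledge-base folder.
    payload = {
        "input_dir": "/data/knowledge/demo",
        "project_id": "demo",
        "session_id": "session-001",
        "data": {"question": question},
    }
    async with websockets.connect("ws://127.0.0.1:9961") as ws:
        await ws.send(json.dumps(payload))
        # Each incoming message is one JSON-encoded answer token;
        # the stream ends when the callback handler sends "[DONE]".
        while True:
            token = json.loads(await ws.recv())
            if token == "[DONE]":
                break
            print(token, end="", flush=True)

asyncio.run(ask("How do I reset my in-game password?"))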

CustomCallbackHandler.py

Streams the AI's response back to the WebSocket client.

from typing import Any, Dict, List, Optional, Union
from asyncio.queues import Queue
from langchain.callbacks.base import AsyncCallbackHandler
from langchain.schema import LLMResult
import json

# Tokens that mark the start of the final answer in the agent's streamed output
DEFAULT_ANSWER_PREFIX_TOKENS = ["\n", "AI", ":"]


class CustomAsyncCallBackHandler(AsyncCallbackHandler):
    queue: Queue

    def __init__(self, queue: Queue, answer_prefix_tokens: Optional[List[str]] = None) -> None:
        super().__init__()
        if answer_prefix_tokens is None:
            answer_prefix_tokens = DEFAULT_ANSWER_PREFIX_TOKENS
        self.answer_prefix_tokens = answer_prefix_tokens
        self.last_tokens = [""] * len(answer_prefix_tokens)
        self.answer_reached = False
        self.queue = queue

    async def put_message(self, json_str):
        await self.queue.put(json.dumps(json_str))
        await self.queue.join()

    async def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running."""
        self.answer_reached = False

    async def on_llm_new_token(self, token: str, **kwargs) -> None:
        # Remember the last n tokens, where n = len(answer_prefix_tokens)
        self.last_tokens.append(token)
        if len(self.last_tokens) > len(self.answer_prefix_tokens):
            self.last_tokens.pop(0)

        # Check whether the last n tokens match the answer_prefix_tokens list ...
        if self.last_tokens == self.answer_prefix_tokens:
            self.answer_reached = True
            # Do not forward the last token in answer_prefix_tokens,
            # as it is not part of the answer yet
            return

        # ... once the prefix has been seen, forward tokens from now on
        if self.answer_reached:
            response = str(token)
            await self.put_message(response)

    async def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        if self.answer_reached:
            response = "[DONE]"
            await self.put_message(response)

    async def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Run when LLM errors."""

