2. RAG 프로젝트 구성
RAG 시스템을 구축하기 위해 프로젝트를 다음과 같이 구성한다. 시스템 구축을 위해 기본이 되는 항목들을 나타낸 것으로, 이후 구현의 정도에 따라
다른 파일 또는 모듈이 추가될 수 있다.
RAG 파이프라인 구축
RAG 시스템을 구현하기 전에 우선 전체 파이프라인(pipeline) 코드를 간단하게 작성하고, 실제 작동 여부를 파악하여 파이프라인 전반에 대해 이해하는
것이 바람직하다. 따라서 기본적인 파이프라인을 구성하고 Streamlit을 통해 실제 챗봇이 작동하는지를 확인한 이후에 세부 내용을 구현할 것이다.
- retriever: 사용자의 질의를 입력받아 관련되거나 유사한 문서를 출력하는 모듈이다.
- generator: 유사 문서와 사용자 질의를 결합하여, 사용자 질의에 적절한 답변을 생성하는 모듈이다.
- utils: 시스템 구현에 필요한 유틸리티를 포함하는 모듈이다.
Module
- app.py: RAG를 위한 API를 구현하는 스크립트로, FastAPI로 구현될 예정이다.
- chat.py: Streamlit을 기반으로 사용자와 직접 대화하는 인터페이스를 구현할 예정이다.
- docker-entrypoint.sh: 도커 실행(run) 시 대상이 되는 스크립트를 표시한다.
- Dockerfile: 실행 환경 구축을 위한 도커 빌드 파일이다.
Script
3. [Pipeline] API 구축
RAG 시스템은 일반적으로 ‘INDEXING’, ‘RETRIEVE’, ‘CHAT’ 세 가지의 기능적인 API를 가진다. 각 API의 목표와 역할은 다음과 같다.
*pipeline-mockup 브런치를 참고.
1) Request/ Response 형식 정의: FastAPI를 기반으로 API를 구현하기에 앞서 API별 입출력을 정의한다.
INDEXING
RETRIEVE
CHAT
사용자 질의를 답변하기 위한 근거가 되는 문서를 벡터로 변환하고, 저장하는 역할을 하는 API.
사용자 질의와 사전에 인덱싱(Indexing)한 문서 간의 유사도를 기반으로 질의와 관련된 문서를 반환하는 API.
RETRIEVE API를 통해 받은 관련 문서를 질의와 결합하여 그에 맞는 적절한 답변을 반환하는 API.
- 위 API 정의서에 따라 utils/api_io.py에 입출력을 정의. Pydantic의 BaseModel로 구현되었으며 기본적인 기능만 정의하였으므로, 이후에 사용자 기호에 따라 변경/확장 가능.
API INDEXING RETRIEVE CHAT
항목 변수 이름 변수 유형 설명 변수 이름 변수 유형 설명 변수 이름 변수 유형 설명
입력
id String API 입력 ID. id String API 입력 ID. id String API 입력 ID.
name String 사용자 이름. name String 사용자 이름. name String 사용자 이름.
group_name String 인덱스 그룹 이름. group_name String 인덱스 그룹 이름. group_name String 인덱스 그룹 이름.
documents Sequence[Dictionary] 인덱싱 문서 리스트. query String 검색(Retreival) 질의. messages Sequence[Dictionary] 사용자 메시지 리스트.
max_chunk_size Integer 분할 문서 최대 길이. max_query_size Integer 최대 질의 길이. max_query_size Integer 최대 질의 길이.
max_chunk_overlap Integer 분할 문서 중복 토큰 수. max_chunk_overlap Integer 분할 문서 중복 토큰 수. max_response_size Integer 최대 응답 길이.
- - - top_k Integer 반환 문서의 수 top_k Integer (검색 시) 반환 문서의 수
- - - - - - stream Boolean 스트림 응답 생성 여부.
출력
id String API 입력 ID. id String API 입력 ID. - String | Stream 응답.
name String 사용자 이름. name String 사용자 이름. - - -
group_name String 인덱스 그룹 이름. group_name String 인덱스 그룹 이름. - - -
is_success Boolean 인덱싱 성공 여부 related_documents Sequence[Dictionary] 관련 문서 리스트. - - -
4. 2) API 구현: 1에서 정의한 API 명세서에 따라 FastAPI를 기반으로 API 코드를 작성한다(자세한 코드는 app.py 를 참고).
@app.post("/indexing")
async def indexing(item: IndexingItem) -> IndexingOutput:
# TODO: 색인 작업을 수행하는 코드를 작성합니다.
# 현재는 성공으로 가정합니다.
return IndexingOutput(
id=item.id,
name=item.name,
group_id=item.group_id,
is_success=True
)
@app.post("/retrieve")
async def retrieval(item: RetrievalItem) -> RetrievalOutput:
# TODO: 검색 작업을 수행하는 코드를 작성합니다.
# 현재는 고정된 결과로 가정합니다.
related_documents = [
{
"id": "doc1",
"text": "This is the first document.",
"metadata": {},
"score": 0.9
},
{
"id": "doc2",
"text": "This is the second document.",
"metadata": {},
"score": 0.8
}
]
return RetrievalOutput(
id=item.id,
name=item.name,
group_id=item.group_id,
related_documents=related_documents
)
@app.post("/chat")
async def chat(item: ChatItem):
# 검색 작업을 수행합니다.
query = item.messages[-1].content
related_documents = await retrieval(
RetrievalItem(
id=item.id,
name=item.name,
group_id=item.group_id,
query=query,
max_query_size=item.max_query_size,
top_k=item.top_k
)
)
# 검색 결과와 대화 메시지를 결합합니다.
if related_documents.related_documents:
retrieval_str = "nn".join(doc.text for doc in related_documents.related_documents)
query = "{context}nn{query}".format(context=retrieval_str, query=query)
messages = item.messages[:-1] + [{"role": "user", "content": query}]
# 대화 작업을 수행합니다.
# TODO: 대화 작업을 수행하는 코드를 작성합니다. 현재는 고정된 리스트 값으로 가정합니다.
contents = "This is the first response." * 10
contents = re.split("( )", contents)
# item.stream 이 False 일 경우, 한 번에 반환.
if not item.stream:
return "".join(contents)
# item.stream 이 True 일 경우, StreamingResponse로 반환.
# 현재 코드에서는 genertor를 사용하여 bytes 형태로 반환(이후 생성 모델의 스트림 출력으로 대체).
async def generate_response() -> Generator:
for content in contents:
yield content
await asyncio.sleep(0.02)
return StreamingResponse(
generate_response(),
model_type="Others",
db_manager=None,
metadata=None
)
5. 3) API 실행 및 테스트: 작성된 파이프라인을 실행 후, 테스트 코드를 작성하여 평가한다.
4) Streamlit 기반 챗봇 구축
- API 실행: API를 실행하기 위해 다음과 같이 명령어를 입력한다. (app.py)
- Streamlit 공식 사이트에서는 Streamlit 기반으로 챗봇을 구현할 수 있도록 스크립트를 구현한다. 이를 실행중인 API와 연결하여 테스트한다(chat.py).
* Streamlit Chat Script - https://docs.streamlit.io/develop/tutorials/llms/build-conversational-apps
$python3 app.py --sever_address=0.0.0.0 --server_port=8000
- API 테스트: 작성한 각 API에 대한 간단한 테스트를 진행한다. (test/pipeline.py)
(마지막 Stream 테스트에 대하여 명령어로 실행할 경우, 한번에 출력될 수가 있음. IDE로 실행하거나 curl로 실행할 경우 확인 가능.)
$python3 test/pipline.py
def request_api(messages: List[Dict[str, Any]], url: str):
data = {
"id": "test",
"name": "test",
"group_id": "test",
"messages": messages,
"max_query_size": 1024,
"max_response_size": 4096,
"top_k": 3,
"stream": True
}
response = requests.post(url, json=data, stream=True)
return response
# Get chatbot response
response = request_api(st.session_state.messages, args.chat_api)
def _genertor():
for chunk in response:
yield chunk.decode("utf-8")
with st.chat_message("assistant"):
response = st.write_stream(_genertor())
# Add chatbot response to chat history
st.session_state.messages.append({"role": "assistant", "content": response})
...(중략)...
...(중략)...
6. 5) Streamlit 기반 챗봇 테스트
- 작성한 chat.py를 실행한다. (물론 3번에서 실행한 app.py가 작동 중이어야 한다.)
- ‘http://localhost:8501’로 접속하여 채팅 테스트를 한다. (테스트 용으로 구축한 것이라, 설정한 반복 메시지가 출력된다.)
이로써, 기본적인 챗봇 파이프라인이 완성되었다. 이후, Retriever, Generator 모델을 개발하여 RAG 챗봇을 구현할 것이다.
$streamlit run chat.py --server.address=localhost --server.port=8501 -- --chat_api=http://localhost:8000/chat
(좌측의 설정 탭은 추후 업데이트할 예정)