Networks/Project

SK networks AI Camp - mini Project4(Chatbot)

코딩하는 Español되기 2024. 11. 5. 22:00

github 링크

4번째 미니 프로젝트 주제는 "LLM을 연동한 내외부 문서 기반 질의응답 시스템"였습니다.
대학원에 가야겠다고 결심하고 난 뒤로 제대로 듣지 않았던 LLM 부분에 대해서 하려니 힘들더라고요...

진짜 들으라고 하지 않았으면 하나도 모르고 eng? 아 몰랑~ 해죠! 하고 못했을 것 같습니다.

[사실 gpt한테 해줘를 하루정도 시현했습니다]

 

저희 조는 "운전면허 필기시험 준비생들을 위한 LAG 기술을 활용한 챗봇"을 진행하고자 했고

처음에 생각한 주요 기능은 아래와 같습니다.

    ● LAG 연동: 운전면허 필기 교재 및 이론 문서 분석
    ● 실시간 답변: 사용자 질문에 정확하고 신속한 응답 제공 
    ● 모의시험 기능: 실제 시험과 유사한 문제 제공


저는 아래를 해보았습니다. gpt에 도움을 많이 받은 관계로 하나하나 코드를 뜯어보며 구현을 이해하고

이후 더 고도화를 하고자 합니다

1. 운전면허필기 기출문제(1, 2종). PDF를 전처리하여 CSV로 저장

2. 유사도를 측정하여 임베딩을 진행

3. Langchain과 Langraph를 활용해 보기

4. streamlit에서 구현해 보기

5. 구현 영상


1. 운전면허필기 기출문제(1, 2종). PDF를 전처리하여 CSV로 저장

○ pdf 자료

운전면허필기 기출문제(1,2종).pdf
7.55MB

 

○ 진행 : colab에서 pymupdf를 사용하여 진행

    ● pymupdf를 install 및 기타 import 모듈

!pip install pymupdf
from google.colab import drive
import fitz  # PyMuPDF
import pandas as pd
import re
import os
import hashlib

 

    ● 드라이브 연결 및 pdf 파일 열기

drive.mount('/content/data')
# PDF 파일 경로
DATA_PATH = "PDF 파일 저장 경로"

# 저장할 이미지 디렉토리
IMAGE_DIR = "IMAGE를 저장할 경로"
os.makedirs(IMAGE_DIR, exist_ok=True)
# 문제를 저장할 리스트 초기화
data = []

# PDF 파일 열기"
pdf_document = fitz.open(DATA_PATH)

 

한 번에 통합해서 전처리 진행(실패)

        - 총 1000개의 문제지만 1136개의 문제를 발견

    □ 문제 이유:

        1) 문제 형식("숫자.")이지만 해설에도 ("숫자.")으로 된 형태의 글이 존재

        2) 문제 형식이 ("숫자.")이 아닌 "숫자(띄어쓰기)." , "숫자 ,(콤마)", "숫자 번호 순서 꼬임" 불순한 데이터가 있었음

# 모든 페이지의 텍스트를 한 번에 통합
full_text = ""
for page_num in range(pdf_document.page_count):
    page = pdf_document[page_num]
    full_text += page.get_text() + "\n"

# 문제 패턴을 정의한 정규 표현식
question_pattern = r"(?m)^(?<!\S)(\d{1,3})\.\s"

question_matches = list(re.finditer(question_pattern, full_text))
print(f"총 {len(question_matches)}개의 문제 번호를 발견했습니다.")

# 누락된 문제 번호와 비정상적인 문제 번호를 저장할 리스트
missing_questions = []
abnormal_questions = []
previous_question_number = 0
data = []

# 총 1136개의 문제 번호를 발견했습니다.

     ● 코드

       1. 반복문 실행 및 문제 번호 추출

           - enumerate를 사용해 question_matches 리스트의 각 항목을 반복

           - idx = 현재 인덱스, match는 현재 일치 항목
           - question_number 변수에 현재 문제 번호를 추출하고 정수로 변환

 

       2. 비정상적인 문제 번호 확인

           - IF 현재 문제 번호 < 이전 문제 번호 then abnormal_questions 리스트에 추가(비정상 목록)
           - full_text에서 현재 문제부터 다음 문제 사이의 텍스트를 추출해 해설에 추가

 

       3. 누락된 문제 번호 확인

           - 현재 문제 번호 - 이전 문제 번호 ≥ 1 then 차이에 해당하는 문제 번호들을 missing_questions 리스트에 추가

 

       4. 문제/정답/해설 추출

           - question_text 변수: start_idx와 end_idx를 통해 현재 문제의 텍스트 범위 저장
           - 문제 추출:  ~ ■ 정답 앞까지 추출
           - 정답 추출: ■ 정답 ~  ■ 해설 앞까지 추출
           - 해설 추출: ■ 해설 ~ 다음 문제 번호 | 텍스트 끝까지 추출해 해설로 저장

 

       5. 사진 유무 확인 및 데이터 저장

          - 일단 모두 'yes'로 사진이 있다고 생각
          - 추출된 문제, 정답, 해설, 사진 유무 정보를 딕셔너리로 만들어 data 리스트에 추가

        6. 결과

          - 979개로 누락된 데이터 존재

(11, 19, 97, 124, 148, 218, 376, 416, 518, 547, 689, 721, 722, 723, 724, 725, 726, 745, 766, 836, 956)
 

[전체 코드]

더보기

-  한 번에 통합해서 돌린 코드(실패 코드)

for idx, match in enumerate(question_matches):
    question_number = int(match.group(1))

    if question_number < previous_question_number:
        abnormal_questions.append(question_number)
        start_idx = match.end()
        end_idx = question_matches[idx + 1].start() if idx + 1 < len(question_matches) else len(full_text)
        explanation_text = full_text[start_idx:end_idx].strip()
        data[-1]["해설"] += f"\n\n[비정상적인 문제 번호 {question_number}에 대한 설명 추가]\n{explanation_text}"
        continue

    if question_number > previous_question_number + 1:
        for missing_num in range(previous_question_number + 1, question_number):
            missing_questions.append(missing_num)

    previous_question_number = question_number
    start_idx = match.end()
    end_idx = question_matches[idx + 1].start() if idx + 1 < len(question_matches) else len(full_text)
    question_text = full_text[start_idx:end_idx].strip()

    # 문제 내용 추출
    question_match = re.search(r"(.*?)(?=■\s*정답)", question_text, re.DOTALL)
    question = question_match.group(1).strip() if question_match else ""

    # 정답 추출
    answer_match = re.search(r"■\s*정답\s*:\s*(.*?)(?=■\s*해설)", question_text, re.DOTALL)
    answer = answer_match.group(1).strip() if answer_match else ""

    # 해설 추출
    next_question_pattern = rf"(?m)^(?<!\S){question_number + 1}\.\s"
    explanation_match = re.search(r"■\s*해설\s*:\s*(.*?)(?=" + next_question_pattern + r"|$)", question_text, re.DOTALL)
    explanation = explanation_match.group(1).strip() if explanation_match else ""

    # 사진 유무 확인 (이미지 저장은 나중에 수행)
    photo = "yes" if question_number in range(1, 1001) else "no"

    data.append({
        "문제": f"{question_number}. {question}",
        "정답": answer,
        "해설": explanation,
        "사진": photo
    })
if missing_questions or abnormal_questions:
    print(f"누락된 문제 번호: {missing_questions}")
    print(f"비정상적인 문제 번호: {abnormal_questions}")
else:
    print("모든 문제 번호가 정상적으로 추출되었습니다.")

df = pd.DataFrame(data, columns=["문제", "정답", "해설", "사진"])

# 누락된 문제 번호: [11, 19, 97, 124, 148, 218, 376, 416, 518, 547, 689, 721, 722, 723, 724, 725, 726, 745, 766, 836, 956]
# 비정상적인 문제 번호: [1, 2, 3, 4, 1, 1, 2, 3, 4, 2, 1, 2, 3, 1, 1, 2, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 6, 7, 8, 9, 10, 6, 7, 8, 9, 10, 11, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 1, 2, 3, 5, 4, 1, 2, 3, 1, 2, 3, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 1, 2, 1, 2, 1, 1, 2, 3, 4, 5, 6, 7, 1, 1, 2, 3, 1, 3, 1, 1, 1, 2, 3, 1, 2, 3, 1, 1, 2, 3, 3, 4, 1, 2, 3, 3, 1, 2, 1, 2, 1, 2, 3, 722, 723, 724, 725, 726, 1, 2, 3, 1, 2, 1, 2]
num_problems = len(df["문제"])
num_answers = len(df["정답"])
num_explanations = len(df["해설"])

print(f"문제 수: {num_problems}, 정답 수: {num_answers}, 해설 수: {num_explanations}")

if num_problems != 1000 or num_answers != 1000 or num_explanations != 1000:
    print("경고: CSV 저장을 중단")
else:
    csv_path = "저장할 csv 경로/csv이름.csv"
    df.to_csv(csv_path, index=False, encoding="utf-8-sig")
    print(f"CSV 파일 {csv_path}에 저장되었습니다.")
    
# 문제 수: 979, 정답 수: 979, 해설 수: 979
# 경고: CSV 저장을 중단

○ 1개씩 누락되면 다시 찾기를 추가한 코드(4개 정도는 수작업으로 고침)

    ● 데이터 저장용 변수 초기화

        - data: 추출한 문제 데이터를 저장
        - abnormal_questions: 비정상적인 문제 번호를 저장

    ● 문제 번호 패턴 및 수정 사전 정의
        - correction_dict: 특정 문제 번호(721번)를 727번으로 수정(721번의 경우 727로 되어있음)
        - previous_question_number: 이전 문제 번호를 저장

    ● 데이터 추가 함수 정의
       - add_problem_data: 문제가 data에 존재하지 않을 경우, 문제, 정답, 해설을 추출해 데이터를 추가하는 함수
       - find_and_add_problem: 비정상적인 문제 번호에 대한 문제, 정답, 해설을 추출하여 data에 추가하는 함수

    ● 페이지별 텍스트에서 문제 추출
       - PDF의 각 페이지를 순회하며 페이지 텍스트(page_text)에서 문제 번호 찾기
       - question_matches: 해당 페이지의 모든 문제 번호와 위치 찾기
    ● 페이지 내 문제 처리
       - 특정 문제 번호 수정: correction_dict를 사용 → 특정 문제 번호 수정(727 → 721)
       - 비정상 문제 번호 처리: 위의 코드와 동일
       - 누락된 문제 번호 처리: 위의 코드와 동일

    ● 문제, 정답, 해설 추출: 위 코드와 동일
    ● DataFrame 변환
       - data → DataFrame으로 변환 → 681 ~ 965번 문제는 사진 컬럼 "yes"(이미지 있는 문제) 

※ 이후 문제가 있던 721 ~ 726번까지는 수작업으로 진행하였습니다

※ 이미지는 문제 번호와 매핑하려고 했으나 중간중간 이상한 사진을 매핑하여 차후 업데이트 예정입니다.

문제 정리.csv
0.74MB

 

[전체 코드]

더보기
# 데이터 저장용 리스트
data = []
abnormal_questions = []

# 문제 번호 형식을 정의
question_pattern = r"(?m)^\s*(\d{1,3})(?:[.,\s])\s*"
correction_dict = {721: 727}  # 특정 문제 번호 수정
previous_question_number = 0
# 문제 번호에 대한 데이터 추가 함수
def add_problem_data(problem_num, page_text):
    if any(entry["문제"].startswith(f"{problem_num}.") for entry in data):
        return  # 이미 데이터가 있으면 추가하지 않음

    # 문제 내용 추출
    question_pattern = rf"{problem_num}[.,\s]\s*(.*?)(?=\s*정답)"
    question_match = re.search(question_pattern, page_text, re.DOTALL)
    question = question_match.group(1).replace('■', '').strip() if question_match else f"{problem_num}번 문제를 찾을 수 없음"

    # 정답 추출
    answer_pattern = r"정답\s*[::]?\s*(.*?)(?=\s*해설|$)"
    answer_match = re.search(answer_pattern, page_text, re.DOTALL)
    answer = answer_match.group(1).replace('■', '').replace('\n', ' ').strip() if answer_match else "정답을 찾을 수 없음"

    # 해설 추출 - 다음 문제 번호가 나오기 전까지의 모든 텍스트를 포함
    next_problem_num = problem_num + 1
    explanation_pattern = rf"해설\s*[::]?\s*(.*?)(?=(?m)^{next_problem_num}[.,\s]|$)"
    explanation_match = re.search(explanation_pattern, page_text, re.DOTALL | re.MULTILINE)
    explanation = explanation_match.group(1).replace('■', '').strip() if explanation_match else "해설을 찾을 수 없음"

    # 데이터 추가
    data.append({
        "문제": f"{problem_num}. {question}",
        "정답": answer,
        "해설": explanation,
        "사진": "yes"
    })
def find_and_add_problem(problem_num, page_text):
    # 문제 내용 추출
    question_pattern = rf"{problem_num}[.,\s]\s*(.*?)(?=정답)"
    question_match = re.search(question_pattern, page_text, re.DOTALL)
    question = question_match.group(1).replace('■', '').strip() if question_match else f"{problem_num}번 문제를 찾을 수 없음"

    # 정답 내용 추출
    answer_pattern = r"정답\s*[::]?\s*(.*?)(?=해설|$)"
    answer_match = re.search(answer_pattern, page_text, re.DOTALL)
    answer = answer_match.group(1).replace('■', '').replace('\n', ' ').strip() if answer_match else "정답을 찾을 수 없음"

    # 해설 내용 추출
    next_problem_num = problem_num + 1
    explanation_pattern = rf"해설\s*[::]?\s*(.*?)(?=(?m)^{next_problem_num}[.,\s]|$)"
    explanation_match = re.search(explanation_pattern, page_text, re.DOTALL | re.MULTILINE)
    explanation = explanation_match.group(1).replace('■', '').strip() if explanation_match else "해설을 찾을 수 없음"

    # 데이터에 추가
    data.append({
        "문제": f"{problem_num}. {question}",
        "정답": answer,
        "해설": explanation,
        "사진": "yes"
    })
# 문제 번호별 텍스트를 페이지 단위로 추출
for page_num in range(pdf_document.page_count):
    page = pdf_document[page_num]
    page_text = page.get_text()

    question_matches = list(re.finditer(question_pattern, page_text))
    for idx, match in enumerate(question_matches):
        question_number = int(match.group(1))
        question_number = correction_dict.get(question_number, question_number)

        # 문제 번호가 723 ~ 726인 경우 별도 처리
        if 721 <= question_number <= 726:
            find_and_add_problem(question_number, page_text)
            previous_question_number = question_number
            continue

        # 문제 번호가 이전 번호보다 작으면 해설 안에 있는 부분
        if question_number < previous_question_number:
            abnormal_questions.append(question_number)
            start_idx = match.end()
            end_idx = question_matches[idx + 1].start() if idx + 1 < len(question_matches) else len(page_text)
            explanation_text = page_text[start_idx:end_idx].strip()
            data[-1]["해설"] += f"\n\n{question_number}. {explanation_text}"
            continue

        # 누락된 문제 번호 확인
        if question_number > previous_question_number + 1:
            for missing_num in range(previous_question_number + 1, question_number):
                print(f"누락된 문제 번호 {missing_num} 발견")
                add_problem_data(missing_num, page_text)

        # 문제 번호 업데이트
        previous_question_number = question_number

        # 문제 텍스트 시작과 끝 인덱스
        start_idx = match.end()
        end_idx = question_matches[idx + 1].start() if idx + 1 < len(question_matches) else len(page_text)
        question_text = page_text[start_idx:end_idx].strip()

        # 문제 내용 추출
        question_match = re.search(r"(.*?)(?=\s*정답|$)", question_text, re.DOTALL)
        question = question_match.group(1).replace('■', '').strip() if question_match else ""

        # 정답 추출
        answer_match = re.search(r"정답\s*[::]?\s*(.*?)(?=\s*해설|$)", question_text, re.DOTALL)
        answer = answer_match.group(1).replace('■', '').replace('\n', ' ').strip() if answer_match else ""

        # 해설 추출 - 다음 문제 번호가 나타나기 전까지 모든 텍스트 포함
        next_problem_num = question_number + 1
        explanation_pattern = rf"해설\s*[::]?\s*(.*?)(?=\n{next_problem_num}[.,\s]|$)"
        explanation_match = re.search(explanation_pattern, question_text, re.DOTALL)
        explanation = explanation_match.group(1).replace('■', '').strip() if explanation_match else ""

        # 사진 유무 확인
        photo = "yes" if question_number in range(1, 1001) else "no"

        # 데이터 리스트에 추가
        data.append({
            "문제": f"{question_number}. {question}",
            "정답": answer,
            "해설": explanation,
            "사진": photo
        })
# DataFrame으로 변환
df = pd.DataFrame(data, columns=["문제", "정답", "해설", "사진"])
df['사진'] = df['문제'].apply(lambda x: 'yes' if 681 <= int(x.split('.')[0]) <= 965 else 'no')
csv_path = "저장경로/파일명.csv"
df.to_csv(csv_path, index=False, encoding="utf-8-sig")

2. 유사도를 측정하여 임베딩을 진행

○ 저장한 CSV 파일에서 질문 텍스트를 임베딩(벡터화) → CSV파일로 저장

    ● Hugging Face의 AutoTokenizer와 AutoModel을 사용

        - intfloat/multilingual-e5-large 모델과 토크나이저를 로드
    ※ 다국어 텍스트 임베딩에 적합한 사전 훈련 모델이기에 사용

    ● embed_text 함수

        - 텍스트를 입력으로 받아 모델을 통해 임베딩 벡터 생성
        - 입력 텍스트가 문자열이 아닌 경우, 문자열로 변환
        - 토크나이저를 통해 텍스트 → 텐서로 변환

        - padding과 truncation을 적용해 길이 조정
        - 모델의 출력을 last_hidden_state에서 평균을 내어 임베딩 벡터로 만듦
        - 임베딩을 numpy 배열로 반환

    ● embed_dataframe 함수

        - 특정 열에 포함된 모든 텍스트에 대해 임베딩 생성

        - tqdm: 진행률을 표시(처리 속도를 시각적으로 확인)
        - 각 임베딩 벡터 → 리스트 형태로 변환 → embeddings 리스트에 추가 → 최종적으로 이를 반환

임베딩된_문제.csv
19.93MB

 

[전체 코드]

더보기

- 모듈 임포트 및 설치

!pip install tqdm transformers torch
import pandas as pd
import os
from google.colab import drive
from transformers import AutoTokenizer, AutoModel
import torch
from tqdm import tqdm
import numpy as np

 

- 드라이브 마운트 및 csv 파일 로드

drive.mount('/content/data')
df = pd.read_csv("저장경로/읽어올 파일명.csv")

 

- huggingface 모델 로드 및 토크나이저 로드

model_name = "intfloat/multilingual-e5-large"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

 

- 남은 시간 계산 함수

def embed_dataframe(df, column_name):
    embeddings = []
    for text in tqdm(df[column_name], desc="임베딩 진행 중", unit="문제"):
        embedding = embed_text(text)
        embeddings.append(embedding.tolist())  # 각 임베딩 벡터를 리스트로 변환하여 저장
    return embeddings

 

- 텍스트 임베딩 함수

def embed_text(text):
    if not isinstance(text, str):
        text = str(text)

    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        embeddings = model(**inputs).last_hidden_state.mean(dim=1)
    return embeddings.squeeze().numpy()

 

- 임베딩 및 저장

df['임베딩'] = embed_dataframe(df, '문제')

# 임베딩 벡터를 문자열로 변환해 저장
df['임베딩'] = df['임베딩'].apply(lambda x: ','.join(map(str, x)))
# CSV 파일로 저장
df.to_csv("저장경로/파일명.csv", index=False)

3. Langchain과 Langraph를 활용(In Colab)

모델 설정 및 임베딩 함수 정의

    ● intfloat/multilingual-e5-large:텍스트를 벡터 형태의 임베딩으로 변환하는 데 사용

    ● embed_text 함수: 텍스트를 벡터로 변환

            - 사용자의 질문과 데이터베이스에 있는 질문을 비교하기 위함

            - 텍스트를 토큰화 → 모델을 통해 임베딩 벡터를 생성 → 평균을 내어 문장 전체의 벡터 표현 생성

임베딩 벡터 변환

    ● 각 임베딩 벡터는 쉼표로 구분된 문자열로 저장 So, 리스트 형태의 숫자 배열로 변환해 df['임베딩']에 저장

    ● 각 질문에 대해 벡터 정보를 사용 가능

RetrievalQAChain 클래스:

    ● 입력된 질문과 유사한 질문을 검색 → 가장 유사한 질문을 반환

○ DrivingTestChatbot 클래스

    ● 추천 문제를 제공 & 답변을 확인하는 기능

    ● 사용자가 틀린 답변을 입력했을 때는 해설을 제공(prompt를 통해 LLM에게 물어본 자세한 내용을 알려줌)

○ find_similar_question 함수

    ● 사용자의 입력을 임베딩 → 데이터프레임의 질문들과 코사인 유사도를 계산 → 가장 유사한 질문을 반환

실행 예시

 

랭그래프 구현(랭체인 코드 내용은 거의 동일하며 체인 구성에 관해서만 설명; 코드는 아래에 있음)

    ● LangChain의 상태 그래프

        - 각 단계를 그래프 형태로 정의 → 챗봇의 각 동작을 체계적으로 연결

        - 각 상태가 가지는 입력, 출력, 종료 여부를 정의

        - 각 상태의 결과를 다음 상태에 전달 가능

    ● embed_text_state

        - 텍스트를 임베딩하는 노드, 입력을 받아 벡터 생성 & 다음 노드로 전달

    ● find_similar_question_state

       - 임베딩 벡터를 입력받아 유사 질문을 찾고, 이 결과를 다음 노드로 전달

    ● recommend_question_state

       - 추천된 질문을 출력

    ● check_answer_state

       - 사용자가 입력한 답변을 검증 IF 정답 then 종료, ELSE 해설 생성을 위한 다음 노드로 연결

    ● generate_explanation_state(답변이 틀린 경우)

       - 상세 해설을 생성해 사용자에게 제공

    ● check_answer

       - 사용자의 답이 맞으면 종료, 틀리면 generate_explanation으로 연결

    ● 최종 실행 및 그래프 시각화

[전체 코드]

더보기

- 모듈 임포트 및 모델 설정

!pip install openai==0.27.0
!pip install langchain-community langchain-core langgraph
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.chains import RetrievalQA
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from langchain.llms import OpenAI
import openai
import os
import pandas as pd
import numpy as np
from transformers import AutoTokenizer, AutoModel
import torch
from google.colab import drive
from typing_extensions import TypedDict, Optional
# 랭그래프
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display
from sklearn.metrics.pairwise import cosine_similarity

drive.mount('/content/drive')
model_name = "intfloat/multilingual-e5-large"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)

 

- 데이터 로드 및 임베딩 벡터 변환

file_path = "경로/파일명.csv"
df = pd.read_csv(file_path)
df['임베딩'] = df['임베딩'].apply(lambda x: np.fromstring(x.strip('[]'), sep=','))

 

- 임베딩 함수

def embed_text(text):
    if not isinstance(text, str):
        text = str(text)
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        embeddings = model(**inputs).last_hidden_state.mean(dim=1)
    return embeddings.squeeze().numpy()

 

- OpenAI 및 LangChain 환경 변수 설정

os.environ["OPENAI_API_KEY"] = ""
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_PROJECT"] = "GRAPH TUTORIAL"
os.environ["LANGCHAIN_API_KEY"] = ""

 

- 유사 질문 검색 및 챗봇 클래스

class RetrievalQAChain:
    def __init__(self, find_similar_fn):
        self.find_similar_fn = find_similar_fn

    def __call__(self, input_question):
        similar_question = self.find_similar_fn(input_question).iloc[0]
        return similar_question

 

- 추천 문제 제공, 답변 확인 기능

class DrivingTestChatbot:
    def __init__(self, qa_chain):
        self.qa_chain = qa_chain
        self.current_question = None
        self.current_answer = None
        self.current_explanation = None

    def recommend_question(self, input_text):
        if not isinstance(input_text, str):
            input_text = str(input_text)

        # 사용자의 질문 → 문제 검색해 반환
        response = self.qa_chain(input_text)

        # 유사한 질문, 정답, 해설 저장
        self.current_question = response['문제']
        self.current_answer = response['정답']
        self.current_explanation = response['해설']

        return f"추천 문제: {self.current_question}"

    def check_answer(self, user_answer):
        if user_answer.strip() == self.current_answer.strip():
            return "정답입니다!"
        else:
            # 오답일 경우 LLM에게 해설 요청
            prompt = f"사용자가 입력한 답이 틀렸습니다. 틀린 답: {user_answer}\n" \
                     f"문제: {self.current_question}\n" \
                     f"정답: {self.current_answer}\n" \
                     f"해설: {self.current_explanation}\n\n" \
                     f"사용자의 틀린 답을 바탕으로 해설을 자세하게 설명해주세요."

            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant specialized in driving test questions."},
                    {"role": "user", "content": prompt}
                ]
            )
            return response['choices'][0]['message']['content']

 

- 사용자 입력과 질문의 코사인 유사도를 검사해 가장 유사한 질문 반환 

def find_similar_question(input_question, top_n=1):
    input_embedding = embed_text(input_question)
    similarities = cosine_similarity([input_embedding], np.stack(df['임베딩'].values))[0]
    df['유사도'] = similarities
    similar_questions = df.sort_values(by='유사도', ascending=False).head(top_n)
    return similar_questions[['문제', '정답', '해설', '유사도']]

 

- 실행

qa_chain = RetrievalQAChain(find_similar_fn=find_similar_question)
chatbot = DrivingTestChatbot(qa_chain=qa_chain)

while True:
    input_text = input("운전면허와 관련된 질문을 입력하세요 (종료하려면 '종료' 입력): ")
    if input_text.lower() == "종료":
        print("챗봇을 종료합니다.")
        break

    print(chatbot.recommend_question(input_text))
    user_answer = input("답을 입력하세요: ")
    print(chatbot.check_answer(user_answer))

 

======================= 랭체인, 랭그래프 =================================

 

- 임베딩 함수

def embed_text(text):
    if not isinstance(text, str):
        text = str(text)
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        embeddings = model(**inputs).last_hidden_state.mean(dim=1)
    return embeddings.squeeze().numpy()

 - 유사도 찾기 체인

def find_similar_question(input_question, top_n=1):
    input_embedding = embed_text(input_question)
    similarities = cosine_similarity([input_embedding], np.stack(df['임베딩'].values))[0]
    df['유사도'] = similarities
    similar_questions = df.sort_values(by='유사도', ascending=False).head(top_n)
    return similar_questions[['문제', '정답', '해설', '유사도']].iloc[0]

 

- llm 모델 로드

llm = OpenAI(model_name="gpt-3.5-turbo")

 

- 질문 추천 체인

class RecommendQuestionChain:
    def __init__(self, find_similar_fn):
        self.find_similar_fn = find_similar_fn
        self.current_question = None
        self.current_answer = None
        self.current_explanation = None

    def __call__(self, input_question):
        response = self.find_similar_fn(input_question)
        self.current_question = response['문제']
        self.current_answer = response['정답']
        self.current_explanation = response['해설']
        return f"추천 문제: {self.current_question}"

 

- 답변 검증 체인

class AnswerCheckChain:
    def __init__(self, llm, recommend_chain):
        self.llm = llm
        self.recommend_chain = recommend_chain

    def __call__(self, user_answer):
        if user_answer.strip() == self.recommend_chain.current_answer.strip():
            return "정답입니다!"
        else:
            prompt = (
                f"사용자가 입력한 답이 틀렸습니다. 틀린 답: {user_answer}\n"
                f"문제: {self.recommend_chain.current_question}\n"
                f"정답: {self.recommend_chain.current_answer}\n"
                f"해설: {self.recommend_chain.current_explanation}\n\n"
                f"사용자의 틀린 답을 바탕으로 해설을 자세하게 설명해주세요."
            )

            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant specialized in driving test questions."},
                    {"role": "user", "content": prompt}
                ]
            )
            return response['choices'][0]['message']['content']

 

- 챗봇 실행 클래스

class DrivingTestChatbot:
    def __init__(self, recommend_chain, answer_check_chain):
        self.recommend_chain = recommend_chain
        self.answer_check_chain = answer_check_chain

    def run(self):
        while True:
            input_text = input("운전면허 필기와 관련된 질문을 입력하세요 (종료하려면 '종료' 입력): ")
            if input_text.lower() == "종료":
                print("종료")
                break

            # 질문 추천 체인 실행
            print(self.recommend_chain(input_text))
            user_answer = input("답을 입력하세요: ")

            # 답변 검증 및 해설 생성 체인 실행
            print(self.answer_check_chain(user_answer))

 

- 챗봇 체인 구성 및 실행

recommend_chain = RecommendQuestionChain(find_similar_fn=find_similar_question)
answer_check_chain = AnswerCheckChain(llm=OpenAI(model="gpt-3.5-turbo"), recommend_chain=recommend_chain)

chatbot = DrivingTestChatbot(recommend_chain=recommend_chain, answer_check_chain=answer_check_chain)
chatbot.run()

 

- 랭체인 상태 그래프 정의

class State(TypedDict):
    input: Optional[str]
    node_output: Optional[str]
    is_stop: Optional[bool]

 

- 노드 함수 정의

    - 텍스트 임베딩 함수

def embed_text_state(state: State) -> State:
    input_text = state["input"]
    embedding = embed_text(input_text)
    return {"node_output": embedding, "is_stop": False}

    - 유사 질문 찾기 함수

def find_similar_question_state(state: State) -> State:
    input_embedding = state["node_output"]
    similar_question = find_similar_question(input_embedding)
    return {"node_output": similar_question['문제'], "is_stop": False}

     - 추천 질문 출력 함수

def recommend_question_state(state: State) -> State:
    question_text = state["node_output"]
    response = recommend_chain.__call__(question_text)
    return {**state, "node_output": response}

    - 답변 검증 함수

def check_answer_state(state: State) -> State:
    user_answer = state["node_output"]
    is_correct = answer_check_chain.__call__(user_answer) == "정답!"
    return {**state, "is_stop": is_correct, "node_output": "정답!" if is_correct else "오답!"}

    - 상세 해설 설명 함수

def generate_explanation_state(state: State) -> State:
    explanation = answer_check_chain.llm(state["node_output"])
    return {**state, "node_output": explanation}

 

- 노드 추가 

simple_graph.add_node("embed_text", embed_text_state)
simple_graph.add_node("find_similar_question", find_similar_question_state)
simple_graph.add_node("recommend_question", recommend_question_state)
simple_graph.add_node("check_answer", check_answer_state)
simple_graph.add_node("generate_explanation", generate_explanation_state)

 

- 그래프 간선 추가 및 조건 설정(엣지)

     - 각 노드를 연결해 사용자 입력 처리 순서 정의

simple_graph.add_edge(START, "embed_text")
simple_graph.add_edge("embed_text", "find_similar_question")
simple_graph.add_edge("find_similar_question", "recommend_question")
simple_graph.add_edge("recommend_question", "check_answer")
simple_graph.add_conditional_edges(
    "check_answer",
    is_stop_fnc,
    {
        "go_stop": END,
        "go_explanation": "generate_explanation"
    }
)
simple_graph.add_edge("generate_explanation", END)

 

- 컴파일 및 시각화

compiled_graph = simple_graph.compile()

try:
    display(
        Image(
            compiled_graph.get_graph().draw_mermaid_png()
        )
    )
except Exception as e:
    print("그래프 시각화 중 오류 발생:", e)

 

4. streamlit에서 구현해 보기

○ 위에서 정의했던 내용들을 Streamlit으로 웹에서 구현하고자 했습니다.

    ● 기존 함수들은 거의 동일 But 몇몇 변경 사항이 존재

        - find_similar_question의 ton_n을 5로 변경하여 유사도가 높은 5개 중 랜덤으로 한 문제 출력하도록 변경

202411.05 기준으로 project Tree

    ● ctrl + shift + p  → project tree 실행 → README.md에 저장

○ app.py(초기 시작 파일)

    ● utils: 사용자 지정 함수 모듈을 모아 놓은 폴더
         - find_similar_question: 사용자 질문과 유사한 문제를 찾는 함수
         - typing_effect: 메시지 출력에 타이핑 효과를 주는 함수
         - reset_state: 상태 초기화 함수(대화 상태 초기화)
         - constant, prompt: 챗봇 역할, 메시지 생성 관련 상수와 함수를 정의한 모듈

1) 체인 초기화

    ● RecommendQuestionChain과 AnswerCheckChain을 각각 초기화(추천 문제 생성과 답변 검토 기능 설정)
    ● RecommendQuestionChain:  유사한 질문을 찾고 추천 문제 생성(find_similar_question 함수)
    ● AnswerCheckChain: OpenAI의 ChatCompletion 모델을 사용 → 답변 평가 및 해설 제공   

    

2) 대화 내용 초기화(reset_state 함수)

3) 대화 상태 저장

    ● 대화 기록 관리(st.session_state.messages):를 통해 대화 메시지들 저장(초기화 시 비어 있는 리스트로 설정)
    ● 답변 대기 상태(st.session_state.awaiting_answer): 챗봇이 사용자의 답변 입력을 기다리는지 여부(초기값: False)

 

4) 이전 대화 기록 출력

    ● 대화 기록 출력: st.session_state.messages에 저장된 모든 메시지를 반복하며 화면에 출력
    ● 각 메시지는 CHATBOT_MESSAGE.role.name에 따라 사용자 또는 챗봇의 역할을 구분해 출력

        ※ utils/constant.py에 구현

 

5) 사용자 입력창

    ● st.chat_input: 사용자 입력을 받을 수 있는 단일 입력 창 생성

 

6) 사용자 입력처리

 ① 질문 입력 처리(awaiting_answer가 False일 때 수행)

    ● create_message 함수: 사용자 입력을 메시지 객체로 만들어 st.session_state.messages에 저장
    ● 화면에 출력: 사용자 질문을 화면에 표시(st.write) 

 ② 추천 문제 생성

    ● 추천 문제 생성(recommend_chain): 유사한 질문을 찾고 추천 문제 생성
    ● 문자열 처리: 추천 문제의 줄 바꿈을 두 줄로 변경
    ● 추천 문제 상태 저장

      : current_question, current_answer, current_explanation로 추천된 질문, 정답, 해설을 각각 st.session_state에 저장

※ 상태를 저장하지 않아 자꾸 None 값이 넘어가서 이상한 답변을 많이 받음 → Print 하여 상태 확인한 결과 고칠 수 있었음

  ③ 추천 메시지 생성 및 출력

    ● 메시지 생성 및 추가(assistant_message_content): 추천 문제 메시지를 messages 리스트에 추가
    ● 화면에 출력: 추천 문제를 챗봇 메시지 형태로 화면에 출력

  ④ 답변 대기 상태를 True로 변환

 

7) 답변 입력처리(awaiting_answer가 True일 때 수행)

    ● 사용자가 답변을 입력 → 메시지로 생성 → messages 리스트에 저장
    ● 화면에 출력: 사용자가 입력한 답변을 화면에 표시

 ① 답변 검토 및 결과 생성

    ● 답변 검토(answer_check_chain): 답변이 정답인지 확인 → 결과(정답 또는 오답 해설)를 반환

    ● 결과 포맷팅: 줄 바꿈을 추가
    ● 결과 메시지 생성: formatted_answer_result로 메시지를 생성 + messages 리스트에 저장

 

 ② 결과 메시지 출력 및 상태 초기화

    ● 결과 출력(display_typing_effect 함수): 타이핑 효과와 함께 결과 메시지를 화면에 표시
    ● 상태 초기화: awaiting_answer를 False로 설정 → 다음 질문을 받을 준비

  

[app.py, utils/constant.py, utils/prompt.py]

더보기

○ utils/prompt.py

from .constant import CHATBOT_MESSAGE, CHATBOT_ROLE

def create_message(role:CHATBOT_ROLE, prompt:str):

    return {
        CHATBOT_MESSAGE.role.name: role.name,
        CHATBOT_MESSAGE.content.name: prompt
    }

○ utils/constant.py

import enum 

class CHATBOT_ROLE(enum.Enum):
    user = (enum.auto, "사용자")
    assistant = (enum.auto, "LLM 모델")

# message
class CHATBOT_MESSAGE(enum.Enum):
    role = (enum.auto, "작성자")
    content = (enum.auto, "메세지")

 

○ app.py

import openai
import streamlit as st
from chains.recommend_chain import RecommendQuestionChain
from chains.answer_check_chain import AnswerCheckChain
from utils.find_similar_question import find_similar_question
from utils.typing_effect import display_typing_effect
from utils.reset_state import reset_state
from utils.constant import CHATBOT_ROLE, CHATBOT_MESSAGE
from utils.prompt import create_message
import re

# 체인 초기화   
recommend_chain = RecommendQuestionChain(find_similar_fn=find_similar_question)
answer_check_chain = AnswerCheckChain(llm=openai.ChatCompletion, recommend_chain=recommend_chain)

# UI 설정
st.set_page_config(page_title="운전면허 필기 시험 챗봇", layout="centered")
st.title("🚗 운전면허 필기 시험 챗봇")

# 상태 초기화
reset_state()

# 대화 상태 저장
if "messages" not in st.session_state:
    st.session_state.messages = []
if "awaiting_answer" not in st.session_state:
    st.session_state.awaiting_answer = False

# 이전 대화 기록 출력
for message in st.session_state.messages:
    with st.chat_message(message[CHATBOT_MESSAGE.role.name]):
        st.markdown(message[CHATBOT_MESSAGE.content.name])

# 입력창 (단일 입력창)
user_input = st.chat_input("질문을 입력하거나 답을 입력하세요")

# 사용자 입력 처리
if user_input:
    if not st.session_state.awaiting_answer:
        # 질문 입력 처리
        user_message = create_message(role=CHATBOT_ROLE.user, prompt=user_input)
        st.session_state.messages.append(user_message)

        # 화면에 사용자 질문 출력
        with st.chat_message(CHATBOT_ROLE.user.name):
            st.write(user_input)

        # 추천 문제 생성
        st.session_state["recommended_question"] = recommend_chain(user_input)
        assistant_message_content = st.session_state["recommended_question"]
        
        # 줄바꿈 처리된 추천 문제 생성
        assistant_message_content = re.sub(r'\n', '\n\n', st.session_state["recommended_question"])
        
        # 추천된 문제 정보를 상태에 저장
        st.session_state["current_question"] = recommend_chain.current_question
        st.session_state["current_answer"] = recommend_chain.current_answer
        st.session_state["current_explanation"] = recommend_chain.current_explanation

        # 추천 문제 메시지 추가 및 출력
        assistant_message = create_message(role=CHATBOT_ROLE.assistant, prompt=assistant_message_content)
        st.session_state.messages.append(assistant_message)

        with st.chat_message(CHATBOT_ROLE.assistant.name):
            st.markdown(assistant_message_content)

        # 답변 대기 상태로 전환
        st.session_state.awaiting_answer = True

    else:
        # 답변 입력 처리
        user_answer_message = create_message(role=CHATBOT_ROLE.user, prompt=user_input)
        st.session_state.messages.append(user_answer_message)

        # 화면에 사용자 답변 출력
        with st.chat_message(CHATBOT_ROLE.user.name):
            st.write(user_input)

        # 답변 검토 및 결과 생성
        answer_result = answer_check_chain(user_input)
        
        # 줄바꿈 처리된 결과 메시지 생성
        formatted_answer_result = "\n".join([sentence.strip() for sentence in answer_result.split(". ") if sentence])

        assistant_result_message = create_message(role=CHATBOT_ROLE.assistant, prompt=formatted_answer_result)
        st.session_state.messages.append(assistant_result_message)

        # 결과 메시지 화면 출력
        with st.chat_message(CHATBOT_ROLE.assistant.name):
            display_typing_effect(formatted_answer_result)

        # 상태 초기화하여 새로운 질문을 받을 수 있도록 함
        st.session_state.awaiting_answer = False

○ config.py | .env 파일(보안 파일 형식만)

# config.py

import openai
import os

# 환경 변수 설정
openai.api_key = "OPEN_API_KEY"

# 모델 경로
MODEL_NAME = "intfloat/multilingual-e5-large"
DATA_PATH = "data/임베딩된_문제.csv"
# .env

# openapi
OPENAI_API_KEY = 'OPEN_API_KEY'
TEST_ENV = "나의 이름은 홍길동입니다."


# Langsmith KEY
LANGCHAIN_TRACING_V2 = "true"
LANGCHAIN_ENDPOINT = "https://api.smith.langchain.com/"
LANGCHAIN_PROJECT = "LANGCHAIN_PROJECT_NAME"
LANGCHAIN_API_KEY = "LANGCHAIN_API_KEY"

 

[utils 폴더 안 파일]

※ constant.py와 prompt.py는 app.py 설명과 함께 적혀있음

○ find_similar_question.py

    ● 질문과 유사한 문제를 찾는 기능

    ● 데이터 로드 및 임베딩

        - pd.read_csv(DATA_PATH): 질문 데이터가 포함된 CSV 파일 로드(data/임베딩된_문제.csv)
        - 임베딩 컬럼: 벡터 형태의 임베딩 데이터가 저장

                               np.fromstring을 사용해 실제 벡터로 변환(문자열 형식이기 때문)
    ● find_similar_question 함수

        - 입력된 질문(input_question)과 유사한 질문을 찾는 역할

        - embed_text(input_question): 입력된 질문을 임베딩 벡터로 변환
        - cosine_similarity를 사용해 입력 벡터와 데이터프레임에 저장된 모든 임베딩 벡터 간 유사도를 계산

          ※ 결과 =  similarities 배열에 저장

    ● 유사 질문 선택

       - similarities를 유사도 열로 추가 → 데이터프레임에 각 질문의 유사도를 할당
       - nlargest(top_n, '유사도'):가장 유사한 top_n개의 질문 5개를 찾고, 그중에서 하나를 무작위로 선택(sample(n=1))
       - 선택된 질문은 문제, 정답, 해설, 유사도 컬럼을 포함

[find_similar_question.py, embedding.py 코드]

더보기

○ find_similar_question.py

import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from embedding import embed_text
from config import DATA_PATH

# 데이터 로드 및 유사도 기반 문제 추천 함수
df = pd.read_csv(DATA_PATH)
df['임베딩'] = df['임베딩'].apply(lambda x: np.fromstring(x.strip('[]'), sep=','))

def find_similar_question(input_question, top_n=5):
    input_embedding = embed_text(input_question)
    similarities = cosine_similarity([input_embedding], np.stack(df['임베딩'].values))[0]
    
    similar_questions = df.assign(유사도=similarities).nlargest(top_n, '유사도')
    selected_question = similar_questions.sample(n=1).iloc[0]
    return selected_question

 

embedding.py 

from transformers import AutoTokenizer, AutoModel
import torch
import numpy as np
from config import MODEL_NAME

# 텍스트 임베딩

# 모델 및 토크나이저 로드
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME)

def embed_text(text):
    if not isinstance(text, str):
        text = str(text)
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        embeddings = model(**inputs).last_hidden_state.mean(dim=1)
    return embeddings.squeeze().numpy()

○ reset_state.py

   ● 세션 상태 초기화(기본값 설정)

   ● 상태 초기화 함수(reset_state 함수)

       - 대화 상태 초기화(각 키의 기본값 설정)

         → 사용자가 처음 대화를 시작할 때 이전 값들이 초기화된 상태로 유지되도록 함
       - question, recommended_question, answer_result: 빈 문자열로 초기화

       - show_answer: False로 초기화(초기값: 답변이 표시되지 않은 상태)

import streamlit as st

def reset_state():
    for key in ["question", "recommended_question", "answer_result", "show_answer"]:
        if key not in st.session_state:
            st.session_state[key] = "" if key != "show_answer" else False

 

○ typing_effect.py

    ● 텍스트를 한 글자씩 출력해 타이핑 효과를 주는 함수

    ● 타이핑 효과 함수(display_typing_effect 함수)

        - 문자열(text)을 한 글자씩 화면에 출력하여 타이핑 효과를 구현
        - output = st.empty(): Streamlit에서 출력할 영역을 빈 상태로 생성

        - typed_text 변수: 문자를 하나씩 추가하여 문자열을 완성해 나가기 위함
        - output.markdown(f"{typed_text}"): 현재까지 생성된 텍스트를 화면에 표시
        - time.sleep(delay): 각 문자 출력 간 딜레이 적용(타이핑 효과)

import streamlit as st
import time

def display_typing_effect(text, delay=0.03):
    output = st.empty()
    typed_text = ""
    for char in text:
        typed_text += char
        output.markdown(f"{typed_text}")
        time.sleep(delay)

 

[chains 폴더 안에 파일]

 

○ answer_check_chain.py

    ● 사용자의 답변이 맞는지 확인하고, 틀렸을 경우 LLM(gpt)을 통해 해설을 생성하는 역할

    ● Streamlit 라이브러리: Streamlit의 session_state를 사용하여 상태 관리

    ※ 세션 상태는 대화형 애플리케이션에서 사용자의 현재 질문, 답변, 해설 등을 지속적으로 저장하고 불러오는 데 유용

    ● AnswerCheckChain 클래스
        - llm: gpt-3.5-turbo를 사용해 답변 검토 및 해설 생성을 위해 필요한 파라미터
        - recommend_chain: RecommendQuestionChain 인스턴스, 추천된 질문 & 답변 정보를 저장하고 불러오는 데 사용

           → 이를 통해 AnswerCheckChain이 추천된 문제와 답변을 가져올 수 있음

    ● 상태에서 추천된 값 가져오기
        - st.session_state.get: current_question, current_answer, current_explanation(질문/정답/해설) 불러오기

        - RecommendQuestionChain에서 생성된 추천 문제와 답변 정보가 사용자의 답변과 비교하기 위해 유지

    ● 정답 확인
        - current_answer가 존재 & 사용자가 입력한 답변(user_answer) = current_answer →  "정답입니다!"라는 메시지 반환
        - 사용자가 입력한 답변에서 공백을 제거(strip())하여 오타나 불필요한 공백으로 인한 오류 줄이기

    ● 정답이 없을 경우 기본 메시지 반환: current_answer가 None인 경우 → 추천된 정답이 없다는 메시지 반환

    ● 해설 요청을 위한 프롬프트 생성(사용자의 답변이 틀린 경우)

      → 상세한 해설 생성을 요청하는 prompt(틀린 답, 문제, 정답, 해설 등의 정보를 포함해 사용자 맞춤형 해설을 제공)

    ● LLM 호출(GPT-3.5 모델) prompt를 전달하여 해설을 요청
        - system 메시지를 통해 LLM에게 "운전면허 문제 해설 전문가" 역할을 지정

더보기
import streamlit as st

class AnswerCheckChain:
    def __init__(self, llm, recommend_chain):
        self.llm = llm
        self.recommend_chain = recommend_chain

    def __call__(self, user_answer):
        # st.session_state에서 이전 추천된 값 불러오기
        current_question = st.session_state.get("current_question")
        current_answer = st.session_state.get("current_answer")
        current_explanation = st.session_state.get("current_explanation")

        # 값 확인
        # print("DEBUG: current_question =", current_question)
        # print("DEBUG: current_answer =", current_answer)
        # print("DEBUG: current_explanation =", current_explanation)
        # print("DEBUG: user_answer =", user_answer)

        # 정답 비교
        if current_answer and user_answer.strip() == current_answer.strip():
            return "정답입니다!"
        else:
            # current_answer가 None인 경우 기본 메시지 출력
            if not current_answer:
                return "추천된 정답이 없습니다. 다시 시도해 주세요."

            # prompt 내용 생성 및 출력
            prompt = (
                f"사용자가 입력한 답이 틀렸습니다. 틀린 답: {user_answer}\n"
                f"문제: {current_question}\n"
                f"정답: {current_answer}\n"
                f"해설: {current_explanation}\n\n"
                f"사용자의 틀린 답을 바탕으로 해설을 자세하게 설명해주세요."
            )

            # LLM API 호출
            response = self.llm.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant specialized in driving test questions."},
                    {"role": "user", "content": prompt}
                ]
            )
            return response['choices'][0]['message']['content']

○ recommend_chain.py

    ● 사용자가 입력한 질문과 유사한 질문을 추천하는 역할

    ● 유사한 질문 찾기 함수 임포트
        - find_similar_question: 사용자 입력 질문과 데이터베이스의 질문들 간 유사도 계산 → 가장 유사한 질문 찾는 함수

    ● RecommendQuestionChain 클래스
        - find_similar_fn: 유사한 질문을 찾는 함수(find_similar_question 사용)
        - current_question, current_answer, current_explanation: 추천된 질문, 정답, 해설을 저장할 변수

    ● 입력된 질문에 대한 추천 생성
        - __call__ 메서드:  input_question을 받기 →  유사 질문 찾기

                                → 추천된 질문, 정답, 해설을 각각 current_question, current_answer, current_explanation에 저장
        - find_similar_fn 호출: input_question과 가장 유사한 질문을 찾아서 반환

    ● 추천 문제 반환
       - IF current_question이 존재 then 해당 질문을 반환 → 사용자에게 추천 문제로 표시
       - Elif current_question이 존재 X then "추천된 문제가 없습니다."라는 메시지 반환

더보기
from utils.find_similar_question import find_similar_question

class RecommendQuestionChain:
    def __init__(self, find_similar_fn):
        self.find_similar_fn = find_similar_fn
        self.current_question = None
        self.current_answer = None
        self.current_explanation = None

    def __call__(self, input_question):
        # find_similar_question 함수의 반환 값 확인
        response = self.find_similar_fn(input_question)
        self.current_question = response['문제']
        self.current_answer = response['정답']
        self.current_explanation = response['해설']
        # 추천 문제 반환
        return f"{self.current_question}" if self.current_question else "추천된 문제가 없습니다."

 

구현 영상

 

[깨알 자동차 이동 구현]