SK networks AI Camp에서 파이널 프로젝트를 2024.11.07 ~ 2025.01.02까지 진행합니다.
1차 에자일 발표를 마친 후 지금까지 진행했던 내용을 정리해보고자 합니다.
저희 팀에서 만들고자 하는 프로젝트는 "LLM을 활용한 대화형 문화 콘텐츠 추천 서비스"입니다.
주제는 전시회, 뮤지컬 투 트랙으로 진행했습니다.
1) OCR, LLM, RAG를 활용한 대화형 문화 콘텐츠(전시회) 추천
2) LLM, DeepFM 모델을 활용한 좋아하는 배우 기반 원하는 장르 뮤지컬 추천
저는 2번 주제인 뮤지컬 추천 팀에서 진행하게 되었습니다.
이전에 블로그 포스팅 했던 공부 내용을 기반으로 Modeling, 데이터 전처리, streamlit 구현, Model 평가 및 개선 역할을 담당했습니다.
팀원들이 정리해 준 PPT를 기반으로 프로젝트에 관한 설명을 한 후 코드 리뷰를 적도록 하겠습니다.
프로젝트 소개
○ 프로젝트 소개
● 배경
- 국내 여가 생활 만족도 증가(그 中 2030 세대의 큰 만족도)
- 전시회 및 뮤지컬 추천 서비스 부족
● 목적
- 문화 콘텐츠(뮤지컬, 전시회)에 대해 사용자의 관심사, 선호도를 반영한 맞춤형 추천 시스템
● 기존 서비스(왼)와 차별점(오)
- 정보의 단순성(단순 순위) VS AI 기반 추천 시스템
- 개인화된 추천 부족 VS 개인화된 추천 서비스
- 전시회 이미지 정보 시각적 제한 VS 전시회 정보의 다각적 전달
● ERD 설계 및 DB 선정
- NoSQL vs RDBMS
: 유연성 및 확장성을 고려
- MongoDB vs Other NoSQL DB
: 프로젝트 적합성(Vector Search 가능성)
- MongoDB Atlas vs MongoDB on AWS
: 비용 효율성(완전 관리형 서비스) & 멀티 클라우드 인프라 구축 용이(빠른 구축)
○ 타깃 그룹: 뮤지컬에 입문하는 2030 세대
● 한국엔터테이먼트산업협회 연구 결과에 따르면 연기자가 0.106(표준화 계수)만큼 관람 만족에 영향을 줌
● 예술성/대중성 선호도에 따른 회귀 분석 결과
- 연기자가 대중성 선호그룹에는 영향을 줌
- 연기자가 예술성 선호 그룹은 영향을 주지 않음
● 타깃을 고려하였을 때 대중성 선호그룹에 속할 것으로 판단
● So, 사용자가 좋아하는 배우와 유사한 배우가 출연한 뮤지컬을 추천
○ 수집한 데이터: 약 4,000명의 배우에 대한 정보가 포함된 뮤지컬 데이터
● 수집처: 공연예술통합전산망(KOPIS) API
● 데이터 특징: 배우의 특징과 관련한 줄글 데이터 X, 뮤지컬 id, 예매율 같은 구조화된 데이터
● So, LLM 사용보다는 많은 차원이 고려된 벡터를 만드는 신경망 모델 사용이 적합하다 판단
● + 추천 모델 → DeepFM 사용을 결정
○ 기획(전체 프로세스)
● 데이터 수집 및 가공: API
- 장르 구조화를 위해 OpenAi API 활용
● 사용자 인터페이스 설계
- 사용자 편의성을 고려한 서비스 구성 e.g. 배우 이름 자동 완성, 배우 이름 초성 검색, 장르 카테고리화
● 추천 시스템
- DeepFM 모델 + Filtering
● 평가 및 피드백 시스템
- 추천 모델 평가 지표: Precision@k, Recall@k
- reranker: 사용자 피드백을 반영
○ 기획(모델)
● Wide Component(FM)
- 뮤지컬, 장르, 배우 데이터
- 선형적인 관계, 2차 상호작용과 같은 단순한 관계 학습
● Deep Component
- 기존 피쳐 + 예매율 → 모델이 배우가 출연한 뮤지컬의 예매율에 관한 중요도를 반영하도록 함
- 고차원 벡터들의 비선형 관계를 학습하여 더 복잡한 패턴 모델링
● Prediction Rayer
- 두 Component 결과를 합쳐 각 배우에 대한 뮤지컬 추천 확률 계산
- 모델을 통해 학습된 배우 임베딩 벡터를 통해 유사한 배우를 탐색 → 뮤지컬 추천
○ 기능
● 유사 배우 기반 추천
- 선호하는 배우가 현재 출연하는 뮤지컬 X → 유사한 배우가 출연한 뮤지컬 추천
● 장르 필터링과 새로운 작품 추천
- e.g. 선호하는 배우가 주로 출연하는 장르가 '역사'
But '판타지'와 같은 다른 장르에서 그 배우가 출연할 가능성이 있는 작품을 예측 & 추천
○ Flow Chart
● DeepFM 모델: 구축한 모델을 통해 예측 수행
● Filtering: 장르와 배우 필터링을 거쳐 뮤지컬 추천
● If 사용자가 해당 추천 만족 X then Reranker를 통해 이를 반영 → 추천 개선
○ 향후 목표
● 서비스 확장: 다른 문화 공간 추천 에이전트 구현
● 멀티턴 챗봇 기능 추가
● 모니터링 Slack 알람(비용, 오류, 성능) 기능 추가
● 모델 개선 및 데이터 전처리 개선 모델 평가지표 성능 향상
● 사용자 피드백 및 순위 데이터 추가를 통해 NDCG@k 평가지표도 추가 활용
○ precision@k
- 상위 k개의 추천 항목 중 사용자가 실제로 선호한 항목의 비율
- 추천한 k개 아이템 중 얼마나 정확하게 관련 있는 아이템을 추천했는지 측정
○ Recall@k
- 실제로 선호한 항목 중 상위 k개의 추천 항목에 포함된 비율
- 사용자가 선호하는 모든 아이템 중에서 얼마나 많이 추천되었는지 측정
○ NDCG@k
- 상위 k개의 추천 결과에서 순위에 따라 중요도가 달라지는 정확도를 평가
- 관련성이 높은 항목이 상위에 올수록 더 높은 점수를 부여하여 순위의 중요성도 반영
코드 리뷰
○ 전체 프로세스 코드
● ./utils/All_Musical_Process.py
1) Musical_Process 클래스
[기능] 지정된 스크립트를 현재 환경의 Python 인터프리터로 실행
- subprocess.run() 메서드 사용 → 외부 Python 스크립트 실행 및 예외 처리
[구현 방식]
- script_path: 실행 스크립트 절대 경로 계산
- venv_python: 현재 Python 인터프리터 경로 불러오기(sys.executable)
[사용 이유]
- 재사용성: 여러 스크립트를 실행할 수 있도록 설계
- 환경 일관성: 가상환경의 Python 인터프리터 사용으로 의존성 충돌 방지
- 오류 관리: 실행 실패 시 로그 출력
[주요 처리]
- first_preprocessing.py 실행: 원시 데이터 첫 전처리
- prompt.py 실행: 장르 데이터를 Prompt를 통해 구조화
- preprocessing.py 실행: 추가 전처리
- DeepFM.py 실행: 추천 모델 생성 및 저장
[개선 가능성]
- 동적 워크플로우 지원: 사용자 입력 | 설정값에 따라 특정 단계만 실행하도록 개선
import subprocess
import sys
import os
# 현재 디렉토리 경로
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
sys.path.append(current_dir)
# main.py 경로
main_dir = os.path.abspath(os.path.join(current_dir, ".."))
if main_dir not in sys.path:
sys.path.append(main_dir)
import config
class Musical_Process:
def execute_script(self, script_name):
# 현재 파일 기준으로 스크립트 경로 설정
script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), script_name)
# 가상환경의 Python 실행 경로 가져오기
venv_python = sys.executable
try:
subprocess.run([venv_python, script_path], check=True)
except subprocess.CalledProcessError as e:
print(f"Error occurred while executing {script_name}: {e}")
raise
if __name__ == "__main__":
processed_data_file_path = f'{config.file_path}/{config.processed_data}'
add_genre_file_path = f'{config.file_path}/{config.add_genre_file_name}'
process = Musical_Process()
"""처음 전처리 코드 per+raw.json -> processed_data.json"""
if not os.path.exists(processed_data_file_path):
process.execute_script("first_preprocessing.py")
else:
print('pass first_preprocessing')
"""장르 추가 실행 조건 processed_data.json -> add_genre_story.json"""
if not os.path.exists(add_genre_file_path):
process.execute_script("prompt.py")
else:
print('pass prompt')
pass
"""전처리 코드 실행 조건 add_genre_story.json -> df_with_negatives.json"""
if not os.path.exists(config.df_with_negatives_path):
process.execute_script("preprocessing.py")
else:
print('pass Preprocessing')
"""모델 생성 실행 조건"""
if not os.path.exists(config.save_model_path):
# print("DeepFM 실행")
process.execute_script("DeepFM.py")
else:
print('pass model')
pass
● ./config.py
- 파일경로 및 추가 처리 자료들을 한 파일에서 관리하여 코드 간결성 확보 및 가독성 증가
"""파일 경로"""
file_path = "C:/SKN_3_MyProject/SKN_03_FINAL/Data/Final"
per_raw = "per+raw.json"
processed_data = "processed_data.json"
add_genre_file_name = "add_genre_story.json"
df_with_negatives_path = 'C:/SKN_3_MyProject/SKN_03_FINAL/Data/Final/df_with_negatives.json'
picture_file_path = 'C:/SKN_3_MyProject/SKN_03_FINAL/READMEImages/Performance.jpg'
Score_Distribution_path = 'C:/SKN_3_MyProject/SKN_03_FINAL/READMEImages/Score_Distribution.jpg'
save_model_path = "C:/SKN_3_MyProject/SKN_03_FINAL/Data/Model/Recommend.h5"
# genre
unique_genres = [
"대학로", "가족", "신화", "역사", "지역|창작"
]
# 삭제할 컬럼 목록
columns_to_drop = [
"performance_id", "facility_id", "producer", "planner",
"host", "sponsor", "synopsis", "genre", "open_run",
"visit", "daehakro", "festival", "musical_create"
]
● ./.env
- OpenAI API 키 관리 파일
OPENAI_API_KEY = 'YOUR_API_KEY'
○ 데이터 전처리
● ./utils/first_preprocessing.py
1) F_Preprocessing 클래스
[기능]
- load_data(self): JSON 파일 로드 → DataFrame
- preprocessing_data(self): 데이터 전처리 수행 및 처리된 데이터 → JSON 파일로 저장
- run(self): 데이터 로드 및 전처리 과정 순차적 실행
[구현 방식]
- 컬럼 삭제: config.columns_to_drop에 정의된 컬럼 제거(df.drop 메소드 사용)
- 조건 필터링: child 값이 'Y'가 아닌 데이터만 남김
- 결측값 제거: df.dropna()로 결측값 제거
- 타입 변환: percentage 컬럼 → 숫자로 변환 If 변환 실패 then NaN처리
[이유]
- 데이터 무결성 보장
- 유지보수 용이성: 주요 전처리 단계를 명확히 구분 → 수정 및 디버깅 고려
- run 함수 구현: 다른 코드에서 활용 가능, 실행 과정을 간단하게 호출할 수 있도록 설계
[장점]
- 구조화된 설계: 클래스와 메서드로 작업 분리
- 모듈화 된 전처리: 데이터 로드, 전처리, 실행 순서 명확한 구분
- 유연한 설정: config 파일로 설정을 관리해 변경 유연성 확보
[개선 가능성]
- 에러 처리 강화
import json
import pandas as pd
import numpy as np
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import config
class F_Preprocessing:
def __init__(self):
self.data_list = []
self.df = None
self.processed_data = f'{config.file_path}/{config.processed_data}'
self.columns_to_drop = config.columns_to_drop
def load_data(self):
data_list = []
# JSON 파일을 로드
with open(f'{config.file_path}/{config.per_raw}', 'r', encoding='utf-8-sig') as file:
data_list = json.load(file)
# 리스트를 DataFrame으로 변환
self.df = pd.DataFrame(data_list)
def preprocessing_data(self):
# 컬럼 삭제
self.df = self.df.drop(columns=self.columns_to_drop)
# child 값이 'Y'가 아닌 데이터만 필터링
self.df = self.df[self.df['child'] != 'Y']
self.df = self.df.dropna()
self.df['percentage'] = pd.to_numeric(self.df['percentage'], errors='coerce') # 값이 문자열일 경우 처리
self.df = self.df[self.df['percentage'] <= 100]
# 전처리된 데이터를 새로운 JSON 파일로 저장
self.df.to_json(self.processed_data, orient='records', force_ascii=False, lines=True)
print(f"전처리된 데이터가 {self.processed_data}에 저장되었습니다.")
def run(self):
self.load_data()
self.preprocessing_data()
if __name__ == "__main__":
preprocessing_instance = F_Preprocessing()
preprocessing_instance.run()
● ./utils/preprocessing.py
1) Preprocessing 클래스
[기능]
- extract_ticket_price(self, ticket_price): ticket_price 값을 다양한 패턴으로 처리 & 단인 정수 값으로 변환
- normalize_column(self, column, min_value=None, max_value=None)
: 주어진 컬럼 값을 0.001 ~ 1로 정규화 및 로그 변환
- preprocessing_data(self): 데이터 전처리 및 부정 샘플 생성
[구현 방식]
- ticket_price: "전석 무료" | 단일 가격 패턴 처리, 여러 좌석이 있을 경우 평균 계산, 처리되지 않으면 None 반환
- column: 최소/최댓값 계산, 값이 동일한 경우 처리, 정규화 후 소수점 4자리로 반올림
- 장르 처리, 배우 정보 확장(,로 분리해 행 단위 확장)
- 필요 컬럼 선택 및 필터링: 조건 = target이 1, percentage >= 20
- 부정 샘플 생성: 1대4 비율로 부정 샘플 비율 조정
[사용 이유]
- ticket_price: 데이터 패턴 고려 및 정규화
- column: 데이터 정규화를 통해 모델 학습에 적합하게 변환, 로그변환 및 0.001 상수 추가해 안정성 확보
- 부정샘플링: 균형 잡힌 학습 데이터 생성
[장점]
- ticket_price: 문자열 처리와 정규표현식을 활용해 유연하게 구현
- 모듈화: 데이터 로드, 전처리, 샘플 생성 로직 명확한 구분
- 정규화 및 패턴 처리와 유연성
[개선 가능성]
- 처리되지 않은 값 로깅 추가
- 데이터 추가 시 패턴, 변경 경우 대비: config에 관리 추가
- 정규화 방식과 최소/최댓값 설정을 유연하게 처리하도록 개선
- 샘플링 효율성 개선: 효율적 조건 평가 및 샘플링 로직 필요(로직이 길고 복잡) → pandas 고급 연산 활용
import json
import pandas as pd
import numpy as np
import re
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import config
class Preprocessing:
def __init__(self):
self.data_list = []
self.df = None
def load_data(self):
data_list = []
# JSON 파일 한 줄씩 읽어서 처리
with open(f'{config.file_path}/{config.add_genre_file_name}', 'r', encoding='utf-8-sig') as file:
for line in file:
# 한 줄의 JSON 문자열을 딕셔너리로 변환
data = json.loads(line.strip())
data_list.append(data) # 리스트에 추가
# 리스트를 DataFrame으로 변환
self.df = pd.DataFrame(data_list)
def extract_ticket_price(self, ticket_price):
"""ticket_price 컬럼 처리 로직"""
# 1. 전석 무료 처리
if "전석 무료" in ticket_price:
return 0
# 2. 전석 + 단일 가격 처리
elif "전석" in ticket_price:
match = re.search(r'(\d+),?\d*원', ticket_price)
if match:
return int(match.group(1).replace(",", ""))
# 3. 여러 좌석 가격 처리
else:
prices = re.findall(r'석\s*(\d+),?\d*원', ticket_price)
if prices:
prices = [int(price.replace(",", "")) for price in prices]
return sum(prices) / len(prices) # 평균 계산
return None # 기본값 (가격이 없는 경우)
def normalize_column(self, column, min_value=None, max_value=None):
"""로그 변환 및 0~1 정규화"""
# 최소/최대값 지정
if min_value is None:
min_value = column.min()
if max_value is None:
max_value = column.max()
# 값이 모두 동일한 경우 처리 (분모가 0이 되는 경우 방지)
if min_value == max_value:
return pd.Series(0.001, index=column.index) # 동일값이면 최소값 반환
# 정규화
normalized = 0.001 + (column - min_value) / (max_value - min_value) * (1 - 0.001)
# 소수점 4자리로 반올림
return normalized.round(4)
def preprocessing_data(self):
# 장르 처리: '중' 키워드 포함 시 마지막 장르 선택
def extract_final_genre(genre):
if genre == '연애' or genre == '미스터리' or genre == '가요뮤지컬' or genre == '창작':
return "대학로"
if genre == '부산북구':
return "지역|창작"
return genre
self.df['genre'] = self.df['genre'].apply(extract_final_genre)
# cast 열을 쉼표로 나누고 행으로 확장
self.df["cast_split"] = self.df["cast"].str.split(", ") # 쉼표로 분리
df_expanded = self.df.explode("cast_split").reset_index(drop=True) # 행으로 확장
# cast_split 열 이름을 cast로 덮어쓰기
df_expanded["cast"] = df_expanded["cast_split"]
df_expanded = df_expanded.drop(columns=["cast_split"])
# 필요한 열만 선택
df_selected = df_expanded[["cast",
"title",
"genre",
"percentage",
"ticket_price"
]]
# '등'을 제거하고 공백을 제거
df_selected['cast'] = df_selected['cast'].str.replace('등', '', regex=False).str.strip()
# 1. target=1인 데이터만 필터링
df_selected['target'] = 1
positive_df = df_selected[df_selected['target'] == 1]
# cast별 target=1 데이터 개수 계산
cast_counts = positive_df.groupby('cast')['target'].count()
# target=1 데이터가 5개 이상인 배우만 필터링
valid_casts = cast_counts[cast_counts >= 5].index
positive_df = positive_df[positive_df['cast'].isin(valid_casts)]
# 2. 전체 영화 목록 추출 (중복 제거)
all_movies = df_selected.groupby(['title', 'genre'], as_index=False).first()
# 티켓 price
# ticket_price 처리 및 컬럼 추가
df_selected['processed_ticket_price'] = df_selected['ticket_price'].apply(self.extract_ticket_price)
# ticket_price 정규화
df_selected['normalized_ticket_price'] = self.normalize_column(
df_selected['processed_ticket_price'].fillna(0), # 결측값 처리
min_value=df_selected['processed_ticket_price'].min(),
max_value=df_selected['processed_ticket_price'].max()
)
# 'ticket_price' 컬럼에 normalized 값을 덮어쓰기
positive_df['ticket_price'] = df_selected.loc[
df_selected['target'] == 1, 'normalized_ticket_price'
].round(4)
# 중간 컬럼 제거
df_selected = df_selected.drop(columns=['normalized_ticket_price', 'processed_ticket_price'])
# percentage 값이 20 미만인 데이터 제거
df_selected = df_selected[df_selected['percentage'] >= 20]
# percentage 정규화
df_selected['percentage'] = self.normalize_column(
df_selected['percentage'].fillna(0), # 결측값 처리
min_value=df_selected['percentage'].min(),
max_value=df_selected['percentage'].max()
)
positive_df['percentage'] = df_selected.loc[
df_selected['target'] == 1, 'percentage'
]
# 전체 영화 목록 추출 (중복 제거)
all_movies = df_selected.groupby(['title', 'genre'], as_index=False).first()
# 3. 부정 샘플을 위한 빈 리스트 생성
negative_samples = []
# 3. 부정 샘플을 위한 빈 리스트 생성
# 부정 샘플 생성
for _, row in positive_df.iterrows():
cast = row['cast']
movies_played_by_cast = positive_df[positive_df['cast'] == cast]['title'].unique()
genres_played_by_cast = positive_df[positive_df['cast'] == cast]['genre'].unique()
# 배우가 출연하지 않은 영화들 중 조건에 맞는 영화 선택
non_cast_movies = all_movies[
~all_movies['title'].isin(movies_played_by_cast) &
~all_movies['genre'].isin(genres_played_by_cast)
]
# 긍정 샘플 수에 따라 부정 샘플 수 결정
positive_count_for_cast = len(movies_played_by_cast)
max_negative_samples_for_cast = positive_count_for_cast * 4
# 부정 샘플링 (필요한 만큼만)
non_cast_movies_sampled = non_cast_movies.sample(
n=min(len(non_cast_movies), max_negative_samples_for_cast),
random_state=42,
replace=False
)
# 샘플링된 영화 데이터를 negative_samples에 추가
for _, movie_row in non_cast_movies_sampled.iterrows():
matching_movie = all_movies[
(all_movies['title'] == movie_row['title']) &
(all_movies['genre'] == movie_row['genre']) &
(all_movies['percentage'] == movie_row['percentage'])
]
normalized_ticket_price = matching_movie['ticket_price'].iloc[0] if not matching_movie.empty else 0.01
negative_samples.append({
'cast': cast,
'title': movie_row['title'],
'genre': movie_row['genre'],
'percentage': movie_row['percentage'] if not pd.isna(movie_row['percentage']) else 0,
'ticket_price': normalized_ticket_price,
'target': 0
})
# Negative samples DataFrame 생성
negative_df = pd.DataFrame(negative_samples)
# Positive와 Negative 데이터 결합
df_with_negatives = pd.concat([positive_df, negative_df], ignore_index=True)
# 최종 비율 확인
positive_count = len(df_with_negatives[df_with_negatives['target'] == 1])
negative_count = len(df_with_negatives[df_with_negatives['target'] == 0])
print(f"Positive count: {positive_count}, Negative count: {negative_count}")
if negative_count > positive_count * 4:
print("Too many negative samples. Adjusting to 1:4 ratio.")
df_with_negatives = pd.concat([
positive_df,
negative_df.sample(
n=positive_count * 4,
random_state=42
)
], ignore_index=True)
# 결과 저장
df_with_negatives.to_json(config.df_with_negatives_path, orient='records', lines=True, force_ascii=False)
def run(self):
self.load_data()
self.preprocessing_data()
if __name__ == "__main__":
preprocessing_instance = Preprocessing()
preprocessing_instance.run()
● ./utils/prompt.py
1) GenereStoryUpdater 클래스
[기능]
- update_genre_and_story(self, df): 데이터 프레임의 각 행에 대해 genre 값 업데이트
- get_genre_and_story(self, row): OpenAI 모델을 사용해 genre 생성
[구현 방식]
- self.total_charge, self_cnt: 호출 비용 및 호출 횟수 추적을 위한 변수
- ChatPromptTemplate.from_template: GPT와 대화를 위한 템플릿
- 반복문으로 데이터 프레임 각 행 순회
- genre 값이 존재하는 경우 데이터 유지
- ChatPromptTemplate.format_messages()로 GPT 프롬프트 생성
[사용 이유]
- 무결성 유지 및 비용 감소: 필요한 경우에만 호출
[장점]
- try-except 블록을 통해 오류 발생 시 안전 실행 보장
- ChatPromptTemplate과 LangChain 콜백을 사용해 GPT 호출을 간결하게 관리
- 모듈화 및 API 사용 효율성
[개선 가능성]
- 백터화 기법 적용 고려: 큰 데이터 프레임인 경우 효율성을 위한 고려
- API 호출 재시도 로직 추가
- 응답 형식이 예상과 다를 경우 검증 로직 추가
- 확장성: 추가 장르 분류 기준도 고려한 코드 변경(장르 분류 기준, 처리 규칙을 config로 외부화)
import pandas as pd
import os
import openai
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.callbacks.manager import get_openai_callback
from langchain_openai import OpenAI
from langchain.chat_models import ChatOpenAI
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import config
from dotenv import load_dotenv
class GenreStoryUpdater:
def __init__(self):
self.total_charge = 0
self.cnt = 0
self.prompt_template = ChatPromptTemplate.from_template("""
다음은 뮤지컬의 정보입니다:
제목: {title}
상영 위치: {place}
출연진: {cast}
포스터 URL: {poster}
1. 이 뮤지컬의 장르를 '역사', '가족', '신화', '지역|창작', '대학로' 이 5가지 카테고리 중에서만 적절한 것을 한 개만 골라서 적어주세요.
아래와 같은 형식으로 답변해주세요:
장르: <장르>
""")
self.chat_model = ChatOpenAI(
model="gpt-3.5-turbo",
temperature=0.7,
max_tokens=300,
openai_api_key=os.getenv('OPENAI_API_KEY')
)
def update_genre_and_story(self, df):
for idx, row in df.iterrows():
if pd.notnull(row['genre']):
genre = row['genre']
else:
genre = self.get_genre_and_story(row)
df.at[idx, "genre"] = genre
def get_genre_and_story(self, row):
if row['genre']:
return row['genre']
else:
try:
prompt = self.prompt_template.format_messages(
title=row['title'],
place=row['place'],
cast=row['cast'],
poster=row['poster']
)
with get_openai_callback() as cb:
response = self.chat_model(prompt)
content = response.content
self.total_charge += cb.total_cost
self.cnt += 1
print(f"{self.cnt}: 호출에 청구된 총 금액(USD): \t${self.total_charge}")
genre = content.split("장르: ")[1].split("\n")[0].strip()
return genre
except Exception as e:
print(f"오류 발생: {e}")
return ""
def main():
# 파일 경로 설정
add_genre_story_path = f'{config.file_path}/{config.add_genre_file_name}'
processed_performance_details_path = f'{config.file_path}/{config.processed_data}'
print(processed_performance_details_path)
# 파일 존재 여부 확인
if os.path.exists(add_genre_story_path):
df = pd.read_json(add_genre_story_path)
else:
# processed_perfomance_details.json -> 데이터프레임 생성
df = pd.read_json(processed_performance_details_path, lines=True)
df['genre'] = None
load_dotenv()
updater = GenreStoryUpdater()
updater.update_genre_and_story(df)
# 파일 저장
df.to_json(add_genre_story_path, orient='records', lines=True, force_ascii=False)
if __name__ == "__main__":
main()
○ 모델링
● ./utils/DeepFM.py
1) MusicalRecommender 클래스
[기능]
- load_and_preprocess_data(self): 데이터 로드 및 범주형 데이터 레이블 인코딩으로 변환
- prepare_training_data(self): 학습 및 테스트 세트로 데이터 분리
- create_deepfm_model(self): 모델 아키텍처 정의
- train_model(self): 모델 학습과 검증
- retrain(self): 전체 데이터를 사용해 모델 재학습
- save_model(self, path): 학습된 모델 저장
- FMInteraction: 특성 간 쌍 상호작용 계산
- weighted_loss: 긍정 샘플에 더 높은 가중치를 부여한 이진 교차 엔트로피 손실
[구현 방식]
- load_and_preprocess_data(self): title, cast, genre 레이블 인코딩 & 각 범주형 변수 고유값 저장(vocab_sizes)
- tran_test_split으로 데이터 분리
- 입력층(input): 범주형 변수별 입력층 정의
- 임베딩층: 각 범주형 변수에 대해 Embedding Layer 정의 + L2 정규화
- FMInteraction Layer: 특성 간 상호작용 계산(두 특성 벡터 간 내적 계산 후 결과 합산)
- Dense Layer: fully-connected-layer에 Dropout + L2 정규화
- EarlyStopping 콜백 설정
- TensorFlow의 save 메소드 사용
- weighted_loss: 긍정 샘플에 가중치를 곱한 후 평균
[사용 이유]
- 모델 입력 데이터 = 숫자형 So, 범주형 데이터 인코딩
- 어휘 크기를 계산해 Embedding Layer 크기 동적 설정
- Dropout과 L2 정규화로 과적합 방지
- EarlyStopping으로 불필요 학습 방지
- retrain: 최종 학습 데이터를 사용해 모델 성능 극대화
- FMInteraciton: FM의 핵심로직으로 특성 간 상호작용 모델링
- weighted_loss: 긍정/부정 샘플 불균형 문제 해결
[장점]
- 구조적 설계: 전처리, 모델 정의, 학습이 독립적 구성
- 범주형 특성 간 상호작용 모델링하는 효과적 설계
- 가중치 손실 함수 사용: 클래스 불균형 문제 해결
[개선 가능성]
- 과적합 방지 미흡: 학습 초기 가중치 초기화 전략 부족
# 필요한 라이브러리 임포트
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Embedding, Dense, Concatenate, Flatten, Add, Lambda, Dropout
import tensorflow as tf
from tensorflow.keras import backend as K
from keras.layers import Layer
import matplotlib.pyplot as plt
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.saving import register_keras_serializable
from tensorflow.keras.regularizers import l2
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import config
# MusicalRecommender 클래스 정의
class MusicalRecommender:
def __init__(self):
self.data = None
self.original_data = None
self.model = None
self.label_encoders = {}
self.vocab_sizes = {}
def load_and_preprocess_data(self):
# 데이터 로드 및 전처리
# Load data (Ensure the file is in the same directory or provide correct relative path)
self.data = pd.read_json(config.df_with_negatives_path, lines=True) # Update the path to a relative one if necessary
self.original_data = self.data.copy()
categorical_features = ['title',
'cast',
'genre'
]
# 범주형 변수 레이블 인코딩
for feature in categorical_features:
self.label_encoders[feature] = LabelEncoder()
self.data[feature] = self.label_encoders[feature].fit_transform(self.data[feature].astype(str))
self.vocab_sizes[feature] = len(self.label_encoders[feature].classes_)
def prepare_training_data(self):
# 범주형 데이터와 수치형 데이터를 처리
categorical_features = ['title',
'cast',
'genre'
]
categorical_data = {}
for feature in categorical_features:
categorical_data[feature] = self.data[feature].values # 각 범주형 데이터를 딕셔너리에 저장
# X 구성: 카테고리형 데이터와 수치형 데이터를 모두 합친 DataFrame 생성
X = pd.DataFrame({
'title': categorical_data['title'],
'cast': categorical_data['cast'],
'genre': categorical_data['genre'],
# 'percentage': numerical_data[:, 0],
# 'ticket_price': numerical_data[:, 1]
})
# 타겟 데이터
y = self.data['target']
# X와 y의 길이가 일치하는지 확인
print(f"Length of X: {len(X)}")
print(f"Length of y: {len(y)}")
# 길이가 일치하면 훈련/테스트 데이터로 분리
if len(X) == len(y):
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
else:
print("Error: Lengths of X and y do not match!")
return X_train, X_test, y_train, y_test
def create_deepfm_model(self):
# 모델 구조 정의
inputs = {
'title': Input(shape=(1,), dtype=tf.int32, name='title'),
'cast': Input(shape=(1,), dtype=tf.int32, name='cast'),
'genre': Input(shape=(1,), dtype=tf.int32, name='genre'),
# 'percentage': Input(shape=(1,), dtype=tf.float32, name='percentage'),
# 'ticket_price': Input(shape=(1,), dtype=tf.float32, name='ticket_price')
}
# 임베딩 정의 (L2 정규화 추가)
embeddings = {
'title': Embedding(self.vocab_sizes['title'], 16, embeddings_regularizer=l2(1e-4), name='embedding_title')(inputs['title']),
'cast': Embedding(self.vocab_sizes['cast'], 16, embeddings_regularizer=l2(1e-4), name='embedding_cast')(inputs['cast']),
'genre': Embedding(self.vocab_sizes['genre'], 16, embeddings_regularizer=l2(1e-4), name='embedding_genre')(inputs['genre']),
}
fm_output = FMInteraction()([embeddings['title'],
embeddings['cast'],
embeddings['genre']
])
# Flatten embeddings and concatenate with numerical data
concatenated = Concatenate()(
[Flatten()(embeddings['title']),
Flatten()(embeddings['cast']),
Flatten()(embeddings['genre']),
# inputs['percentage'],
# inputs['ticket_price'],
Flatten()(fm_output)
])
# 완전 연결 계층 (Dense 레이어 L2 정규화 추가)
x = Dense(128, activation='relu', kernel_regularizer=l2(1e-4))(concatenated)
x = Dropout(0.3)(x)
x = Dense(64, activation='relu', kernel_regularizer=l2(1e-4))(x)
x = Dropout(0.3)(x)
x = Dense(32, activation='relu', kernel_regularizer=l2(1e-4))(x)
output = Dense(1, activation='sigmoid', kernel_regularizer=l2(1e-4))(x)
self.model = Model(inputs=[inputs['title'],
inputs['cast'],
inputs['genre'],
# inputs['percentage'],
# inputs['ticket_price']
],
outputs=output)
self.model.compile(optimizer='adam', loss=weighted_loss, metrics=['accuracy', 'Precision', 'Recall'])
self.model.summary()
def train_model(self):
X_train, X_test, y_train, y_test = self.prepare_training_data()
self.create_deepfm_model()
# EarlyStopping 콜백 정의
early_stopping = EarlyStopping(
monitor='val_loss', # 검증 손실을 모니터링
patience=3, # 손실이 개선되지 않으면 3 에포크 후 중지
restore_best_weights=True # 가장 좋은 모델 가중치를 복원
)
# Train the model and display progress
history = self.model.fit(
[X_train['title'],
X_train['cast'],
X_train['genre'],
# X_train['percentage'],
# X_train['ticket_price']
],
y_train,
batch_size=64,
epochs=20,
verbose=1,
validation_data=([X_test['title'],
X_test['cast'],
X_test['genre'],
# X_test['percentage'],
# X_test['ticket_price']
],
y_test),
callbacks=[early_stopping]
)
# Plot training history
plt.plot(history.history['accuracy'], label='accuracy')
plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.ylim([0, 1])
plt.legend(loc='lower right')
plt.savefig(config.picture_file_path)
self.retrain()
def retrain(self):
X_full = pd.DataFrame({
'title': self.data['title'],
'cast': self.data['cast'],
'genre': self.data['genre'],
# 'percentage': self.data['percentage'],
# 'ticket_price': self.data['ticket_price'],
})
y_full = self.data['target']
self.model.fit(
[X_full['title'],
X_full['cast'],
X_full['genre'],
# X_full['percentage'],
# X_full['ticket_price'],
],
y_full,
batch_size=64,
epochs=5, # 전체 데이터로 재학습할 에포크 수
verbose=1
)
print("Retraining completed.")
evaluation_results = self.model.evaluate(
[X_full['title'],
X_full['cast'],
X_full['genre'],
# X_full['percentage'],
# X_full['ticket_price'],
],
y_full, verbose=2
)
# 레이블 인코더 저장
self.save_model(config.save_model_path)
test_loss = evaluation_results[0] # 손실 (loss)
test_accuracy = evaluation_results[1] # 정확도 (accuracy)
test_precision = evaluation_results[2] # 정밀도 (Precision)
test_recall = evaluation_results[3] # 재현율 (Recall)
print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")
print(f"Test Precision: {test_precision}")
print(f"Test Recall: {test_recall}")
def save_model(self, path):
self.model.save(path)
def run(self):
self.load_and_preprocess_data()
self.train_model()
# Keras 직렬화 시스템에 FMInteraction 클래스를 등록
# package 파라미터는 사용자 지정 이름(Custom)을 설정하여 직렬화 중 충돌을 방지
# 저장된 모델을 로드할 때 FMInteraction 클래스를 올바르게 인식
@register_keras_serializable(package="Custom")
class FMInteraction(Layer):
def __init__(self, **kwargs):
super(FMInteraction, self).__init__(**kwargs)
def call(self, inputs):
pairwise_interactions = []
for i in range(len(inputs)):
for j in range(i + 1, len(inputs)):
interaction = K.sum(inputs[i] * inputs[j], axis=-1, keepdims=True)
pairwise_interactions.append(interaction)
# 긍정 샘플 가중치 추가
return K.sum(pairwise_interactions, axis=0) * 1.2
@register_keras_serializable(package="Custom")
def weighted_loss(y_true, y_pred):
weight = K.cast(y_true == 1, 'float32') * 1.5 + 0.5 # 긍정 샘플에 더 높은 가중치
return K.mean(weight * K.binary_crossentropy(y_true, y_pred))
if __name__ == "__main__":
recommender = MusicalRecommender()
recommender.run()
○ 추천
● ./utils/recommend.py
1) Recommender 클래스
[기능]
- load_model(self): 저장된 모델 파일, 커스텀 손실함수, 커스텀 레이어 로드
- load_data(self): 예측에 사용할 데이터와 범주형 데이터를 위한 레이블 인코더 로드
- load_reference_data(self): 추천 결과에 들어갈 추가 정보가 들어있는 기존 데이터 로드
- recommend(self, cast, genre): 배우, 장르 기반 추천 결과 생성
[구현 방식]
- 모델 로드: load_model 메서드 사용
- 예외 처리: 모델 파일 없을 경우 FileNotFountError
- 추천 결과를 기존 데이터와 매칭하여 최종 결과 생성
- 입력 처리: 배우, 장르를 레이블 인코딩 → 숫자로 변환
- 모델 예측: 데이터를 입력해 각 데이터 포인트의 예측 점수 계산
- 필터링: 예측 점수 기준으로 장르 및 배우 기반 상위 결과 필터링
- 중복 제거 및 변환
[사용 이유]
- 배우, 장르 기준으로 데이터 필터링 → 사용자가 원하는 결과 제공
- 모델 예측 점수를 활용해 추천 신뢰성 높
[장점]
- 모듈화
- 범주형 데이터 처리: 레이블 인코더 사용으로 범주형 데이터 효과적 처리
- 추천 로직: 필터링 및 정렬로 사용자 맞춤형 결과 제공
[개선 가능성]
- 효율성: 데이터 병합 및 중복 제거 과정에서 코드 단순화 가능 → Pandas 병합 및 필터링 과정 최적화
- 에러 처리 부족
- 추천 다양성 제한: 예측 점수 기준으로만 정렬됨 → 협업 필터링, 콘텐츠 기반 추천 알고리즘 추가
import pandas as pd
import numpy as np
import pickle
from DeepFM import weighted_loss, FMInteraction
from tensorflow.keras.models import load_model
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import config
class Recommender:
def __init__(self):
self.model = None
self.data = None
self.reference_data = None
self.label_encoders = {}
def load_model(self):
"""모델 로드"""
try:
self.model = load_model(config.save_model_path, custom_objects={
"weighted_loss": weighted_loss,
"FMInteraction": FMInteraction
})
except FileNotFoundError:
raise FileNotFoundError("저장된 모델을 찾을 수 없음")
def load_data(self):
"""데이터 로드"""
try:
self.data = pd.read_json(config.df_with_negatives_path, lines=True)
# 레이블 인코더 로드
for column in ['title', 'cast', 'genre']:
self.label_encoders[column] = {val: idx for idx, val in enumerate(self.data[column].unique())}
except FileNotFoundError:
raise FileNotFoundError("데이터 파일을 찾을 수 없음")
def load_reference_data(self):
"""기준 데이터 로드"""
try:
self.reference_data = pd.read_json(f"{config.file_path}/{config.add_genre_file_name}", lines=True)
except FileNotFoundError:
raise FileNotFoundError("기준 파일을 찾을 수 없습니다.")
def recommend(self, cast, genre):
print(f"Debug: 입력된 cast - {cast}")
print(f"Debug: 입력된 genre - {genre}")
cast_encoded = self.label_encoders['cast'][cast]
genre_encoded = self.label_encoders['genre'][genre]
# 데이터셋 전체를 사용하여 예측
X = self.data[['title',
'cast',
'genre',
# 'percentage',
# 'ticket_price'
]].copy()
title_encoder = self.label_encoders['title']
cast_encoder = {v: k for k, v in self.label_encoders['cast'].items()}
genre_encoder = {v: k for k, v in self.label_encoders['genre'].items()}
# 데이터 인코딩
X['title'] = X['title'].map(title_encoder)
X['cast'] = X['cast'].map(self.label_encoders['cast'])
X['genre'] = X['genre'].map(self.label_encoders['genre'])
# 중복 제거
X = X.drop_duplicates(subset=['title', 'cast', 'genre'])
predictions = self.model.predict([X['title'].values,
X['cast'].values,
X['genre'].values,
# X['percentage'].values,
# X['ticket_price'].values
])
X['predicted_score'] = predictions
# 입력된 장르로 필터링
genre_filtered_data = X[X['genre'] == genre_encoded]
cast_filtered_data = X[X['cast'] == cast_encoded]
# 1. 장르 기반 추천 10개
genre_top_titles = genre_filtered_data.sort_values(by='predicted_score', ascending=False).head(15)
# 2. 배우 기반 추천 10개
cast_top_titles = cast_filtered_data.sort_values(by='predicted_score', ascending=False).head(15)
# 3. 병합 후 중복 제거
combined_titles = pd.concat([genre_top_titles, cast_top_titles])
# 4. 예측 점수 기준으로 정렬
top_titles = combined_titles.sort_values(by='predicted_score', ascending=False)
# 디코딩
top_titles['decoded_title'] = top_titles['title'].map({v: k for k, v in title_encoder.items()})
top_titles['decoded_genre'] = top_titles['genre'].map({v: k for k, v in genre_encoder.items()})
top_titles['decoded_cast'] = top_titles['cast'].map(cast_encoder)
# [ ] 제거 및 중복 처리
top_titles['clean_title'] = top_titles['decoded_title'].str.replace(r'\[.*?\]', '', regex=True).str.strip()
top_titles = top_titles.drop_duplicates(subset=['clean_title'])
matched_recommendations = self.reference_data[self.reference_data['title'].isin(top_titles['clean_title'])]
matched_recommendations['clean_title'] = matched_recommendations['title'].str.replace(r'\[.*?\]', '', regex=True).str.strip()
matched_recommendations = matched_recommendations.drop_duplicates(subset=['clean_title'])
# 최종 데이터 결합
final_recommendations = matched_recommendations.merge(
top_titles[['clean_title', 'predicted_score']],
on='clean_title'
).sort_values(by='predicted_score', ascending=False)
# 10개 미만이면 필터링되지 않은 데이터 중 추가
if len(final_recommendations) < 10:
missing_count = 10 - len(final_recommendations)
# 이미 포함된 데이터 제외
excluded_titles = final_recommendations['title'].tolist() + genre_top_titles['title'].tolist() + cast_top_titles['title'].tolist()
unfiltered_data = self.data[~self.data['title'].isin(excluded_titles)]
# top_titles와 unfiltered_data 매칭
matched_data = unfiltered_data.merge(
top_titles[['decoded_title', 'predicted_score']],
left_on='title',
right_on='decoded_title'
).drop_duplicates(subset=['title'])
# 추가할 데이터
additional_recommendations = matched_data.sort_values(by='predicted_score', ascending=False).head(missing_count)
matched_reference_data = self.reference_data[
self.reference_data['title'].str.contains('|'.join(additional_recommendations['title']), na=False)
]
# 중복 제거를 위한 임시 컬럼
matched_reference_data['clean_title'] = matched_reference_data['title'].str.replace(r'\[.*?\]', '', regex=True).str.strip()
# 'clean_title' 기준으로 중복 제거 (첫 번째로 나온 데이터만 유지)
matched_reference_data = matched_reference_data.drop_duplicates(subset=['clean_title'])
score_mapping = dict(zip(additional_recommendations['title'], additional_recommendations['predicted_score']))
matched_reference_data['predicted_score'] = matched_reference_data['clean_title'].map(score_mapping)
# 'clean_title' 컬럼 삭제
matched_reference_data = matched_reference_data.drop(columns=['clean_title'])
# 최종 결합
final_recommendations = pd.concat([final_recommendations, matched_reference_data]).sort_values(by='predicted_score', ascending=False).head(10)
# 10개만 반환하도록 처리
final_recommendations = final_recommendations.head(10)
# 결과 출력
return final_recommendations[['poster', 'title', 'place', 'cast', 'genre', 'ticket_price']]
# return final_recommendations[['poster', 'title', 'place', 'start_date', 'end_date', 'cast', 'genre', 'ticket_price']]
"""콘솔 테스트용 출력 코드"""
# return final_recommendations[['title', 'genre', 'cast', 'predicted_score']]
"""테스트 입력"""
if __name__ == "__main__":
recommender = Recommender()
recommender.load_model()
recommender.load_data()
recommender.load_reference_data()
# cast = "옥주현"
# genre = "역사"
# recommendations = recommender.recommend(cast, genre)
# print(recommendations)
○ Streamlit 화면
● ./pages/musical.py
import streamlit as st
from components.sidebar import add_custom_sidebar, button_style, render_button
import pandas as pd
import itertools
import importlib.util
import time
from hgtk.text import decompose, compose
import sys
import os
# main.py 경로 추가
current_dir = os.path.dirname(os.path.abspath(__file__))
main_dir = os.path.abspath(os.path.join(current_dir, ".."))
sys.path.append(main_dir)
# utils 디렉토리 경로 추가
utils_dir = os.path.abspath(os.path.join(current_dir, "../utils"))
sys.path.append(utils_dir)
from utils.All_Musical_Process import Musical_Process
from utils.recommend import Recommender
import config
"""기본 틀"""
# 사이드바 추가
add_custom_sidebar()
button_style()
# CSS 스타일
st.markdown("""
<style>
.sidebar {
background-color: #f0f0f0;
padding: 20px;
}
.search-container {
background-color: #f5f5f5;
padding: 20px;
border-radius: 10px;
margin: 20px 0;
}
.actor-list-item {
cursor: pointer;
margin-bottom: 5px;
padding: 5px;
border: 1px solid #ddd;
border-radius: 4px;
background-color: #f9f9f9;
}
.actor-list-item:hover {
background-color: #e6e6e6;
}
</style>
""", unsafe_allow_html=True)
# Musical_Process 실행 함수
def run_musical_process():
try:
# All_Musical_Process.py 경로 설정
script_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../utils/All_Musical_Process.py"))
# 모듈 로드 및 실행
spec = importlib.util.spec_from_file_location("All_Musical_Process", script_path)
all_musical_module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(all_musical_module)
except Exception as e:
st.error(f"실행 중 오류 발생: {e}")
# 로딩 화면 표시 함수
def show_loading_screen():
# 빈 컨테이너 생성
placeholder = st.empty()
# 스피너 표시
with placeholder.container():
with st.spinner("로딩 중... 잠시만 기다려주세요(처음에는 로딩이 길어질 수 있습니다.)"):
run_musical_process()
placeholder.empty()
show_loading_screen()
# 메인 페이지 제목
st.markdown("# 뮤지컬 chat")
# 상단 설명 텍스트
st.markdown("""
<div class="header-text">
좋아하시는 배우와 장르를 선택하시면<br>
맞춤형 뮤지컬을 추천해드립니다
</div>
""", unsafe_allow_html=True)
# 배우 데이터 로드
@st.cache_data
def load_actor_list():
file = config.df_with_negatives_path
if not os.path.exists(file):
raise FileNotFoundError(f"파일을 찾을 수 없습니다: {file}")
add_genre_file = pd.read_json(file, lines=True)
actor_list = add_genre_file["cast"].tolist()
return sorted(actor_list)
actor_list = load_actor_list()
st.markdown("## 배우와 장르 선택")
def get_chosung(text):
"""한글 문자열에서 초성만 추출"""
result = ""
for char in text:
if '가' <= char <= '힣': # 한글 범위 내에서만 분리
decomp = decompose(char)
if decomp[0] != '': # 초성 존재
result += decomp[0]
else:
result += char # 한글 외에는 그대로 추가
return result
# 세션 상태 초기화
if "selected_actor" not in st.session_state:
st.session_state["selected_actor"] = None
if "favorite_actor" not in st.session_state:
st.session_state["favorite_actor"] = ""
if "filtered_actors" not in st.session_state:
st.session_state["filtered_actors"] = []
# 배우 입력창
favorite_actor = st.text_input(
"좋아하는 배우를 입력하세요",
placeholder="배우 이름 또는 초성을 입력하세요",
value=st.session_state.get("favorite_actor", ""),
key="favorite_actor_input"
)
# 검색 처리
if favorite_actor != st.session_state.get("favorite_actor", ""):
st.session_state["favorite_actor"] = favorite_actor
search_query = st.session_state["favorite_actor"]
if search_query:
# 단어와 초성 구분
if all('가' <= char <= '힣' for char in search_query): # 완성된 단어 입력 시
st.session_state["filtered_actors"] = sorted({
actor for actor in actor_list if actor[:len(search_query)] == search_query
})
else: # 초성 입력 시
user_chosung = get_chosung(search_query)
st.session_state["filtered_actors"] = sorted({
actor for actor in actor_list
if user_chosung == get_chosung(actor[:len(search_query)]) # 초성 매칭
})
# 배우 목록 드롭다운으로 표시
if st.session_state.get("filtered_actors", []):
unique_filtered_actors = sorted(set(st.session_state["filtered_actors"]))
selected_actor = st.selectbox(
"검색된 배우 목록:",
options=unique_filtered_actors,
key="actor_dropdown"
)
st.session_state["selected_actor"] = selected_actor
# 선택된 배우 출력
if st.session_state["selected_actor"]:
st.markdown(f"**선택된 배우:** {st.session_state['selected_actor']}")
# 초기화 버튼
if st.button("초기화", key="reset_button"):
st.session_state["selected_actor"] = None
st.session_state["favorite_actor"] = ""
st.session_state["filtered_actors"] = []
# 장르처리
# 유니크한 장르 값 정의
unique_genres = config.unique_genres
# 장르 선택
genre_choice = st.selectbox(
"좋아하는 장르를 선택하세요",
options=unique_genres,
format_func=lambda x: x
)
# 선택된 장르 출력
st.markdown(f"**선택된 장르:** {genre_choice}")
# 추천 버튼
if st.button("추천받기", key="run_button"):
if not st.session_state["selected_actor"]:
st.error("배우를 선택해주세요.")
else:
with st.spinner("추천 결과를 생성하는 중입니다... 잠시만 기다려주세요."):
try:
recommender = Recommender()
recommender.load_data()
recommender.load_model()
recommender.load_reference_data()
genre_id = genre_choice
recommendations = recommender.recommend(st.session_state["selected_actor"], genre_id)
if not recommendations.empty:
st.markdown("### 추천된 뮤지컬 목록")
for _, row in recommendations.iterrows():
# 컬럼 레이아웃 생성
col1, col2 = st.columns([1, 3])
# 왼쪽에 포스터 이미지 출력
with col1:
st.image(row['poster'], width=150)
# 오른쪽에 뮤지컬 정보 출력
with col2:
st.markdown(f"""
- **제목**: {row['title']}
- **장소**: {row['place']}
- **출연진**: {row['cast']}
- **장르**: {row['genre']}
- **티켓 가격**: {row['ticket_price']}
""")
else:
st.warning("추천 결과가 없습니다.")
except Exception as e:
st.error(f"추천 과정에서 오류가 발생했습니다: {str(e)}")
이렇게 코드 구현을 완료하였고 시연 및 발표까지 완료했습니다.
남은 한 달 동안은 2차 목표로 삼은 기능 추가 및 개선을 할 예정입니다.
아래는 간단한 시연 영상입니다.
'Networks > Project' 카테고리의 다른 글
SK networks AI Camp - Slack ChatBot (1) | 2024.12.10 |
---|---|
SK networks AI Camp - mini Project4(Chatbot) (6) | 2024.11.05 |
SK networks AI Camp - ToyProject(AWS에 이미지 업로드 및 받아오기) (1) | 2024.10.17 |
SK networks AI Camp - toyproject AWS 및 Github 트러블 이슈 (5) | 2024.10.17 |
SK networks AI Camp - mini Project2(Active Senior) (0) | 2024.08.19 |