ResumeIQ / main_backup.py
Pragatik771's picture
Update main_backup.py
d8ffbea verified
raw
history blame
29 kB
# main.py
import streamlit as st
import pandas as pd
import PyPDF2
import json
import os
import re
import datetime
from dotenv import load_dotenv
from models.llm_client import LLMClient
from agents.resume_extractor import ResumeExtractor
from agents.jd_summarizer import JobDescriptionSummarizer
from agents.matcher import ResumeJDMatcher
from agents.shortlister import Shortlister
from agents.interview_scheduler import InterviewScheduler
from db.database import ResumeMatchDB
from utils.email_sender import EmailSender
from utils.file_uploader import BulkFileUploader
# Load environment variables
load_dotenv()

# === Streamlit UI ===
st.set_page_config(page_title="AI Resume Analyzer", page_icon="πŸ“‹", layout="wide")
st.title("πŸ“‹ AI Resume Analyzer")

# Initialize session state.
# Each entry maps a session-state key to a factory producing its initial
# value; a key is only seeded when it is not already present, so values
# set during earlier script runs are kept.
_SESSION_DEFAULTS = {
    'results': list,
    'scheduled_interviews': list,
    'interview_data': dict,
    'search_query': str,
    'bulk_uploaded_files': list,
    'bulk_file_data': list,
    'selected_folder_path': lambda: None,
}
for _state_key, _make_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()
# === LLM Configuration (DeepSeek) ===
# Connection settings come from the process environment; the model name
# and base URL fall back to the defaults given here.
DEEPSEEK_API_KEY = os.environ.get("DEEPSEEK_API_KEY")
DEEPSEEK_MODEL = os.environ.get("DEEPSEEK_MODEL", "deepseek-chat")
DEEPSEEK_BASE_URL = os.environ.get("DEEPSEEK_BASE_URL", "https://api.deepseek.com")

# The API key has no default: report the problem and halt this script run.
if not DEEPSEEK_API_KEY:
    st.error("❌ DEEPSEEK_API_KEY not found. Please set it in your .env file.")
    st.stop()
# === Upload Resume ===
st.subheader("πŸ“‚ Upload Resumes")

# Upload method selection
upload_method = st.radio(
    "Select upload method:",
    ["Upload from Device", "Bulk Upload from Folder", "Google Drive Folder Link"],
    horizontal=True,
)
uploaded_files = []

if upload_method == "Upload from Device":
    # Clear bulk uploaded files when switching to device upload
    if st.session_state.bulk_file_data:
        st.session_state.bulk_file_data = []
    uploaded_files = st.file_uploader(
        "Upload multiple resumes (PDF, DOCX, DOC)",
        type=["pdf", "docx", "doc"],
        accept_multiple_files=True,
    )

elif upload_method == "Bulk Upload from Folder":
    path_col, browse_col = st.columns([3, 1])
    folder_path = path_col.text_input(
        "Folder path containing resumes (PDF, DOCX, DOC):",
        key="folder_path_input",
    )
    # Two empty writes push the button down next to the text input.
    browse_col.write("")  # Spacing
    browse_col.write("")  # Spacing
    if browse_col.button("πŸ“ Browse", key="browse_folder_btn"):
        from utils.folder_browser import FolderBrowser
        selected_path = FolderBrowser.browse_folder()
        if selected_path:
            st.session_state.selected_folder_path = selected_path
            st.rerun()

    # A path picked through the browser takes precedence over the typed one.
    browsed_path = st.session_state.get('selected_folder_path')
    if browsed_path:
        folder_path = browsed_path
        st.info(f"πŸ“‚ Selected: {folder_path}")

    if folder_path and st.button("Load Files from Folder", key="load_folder_btn"):
        with st.spinner("Loading files from folder..."):
            file_data_list = BulkFileUploader.load_from_folder(folder_path)
            if file_data_list:
                # Store raw file data in session state
                st.session_state.bulk_file_data = file_data_list
                st.success(f"βœ… Loaded {len(file_data_list)} files from folder")
                # Clear the selected path after loading
                if 'selected_folder_path' in st.session_state:
                    del st.session_state.selected_folder_path
            else:
                st.error("❌ No supported document files found in the specified folder")

elif upload_method == "Google Drive Folder Link":
    drive_link = st.text_input("Enter Google Drive folder link:")
    st.info("πŸ“Œ Note: You need to provide GOOGLE_DRIVE_API_KEY in .env file.")
    if drive_link and st.button("Load Files from Google Drive"):
        with st.spinner("Loading files from Google Drive..."):
            file_data_list = BulkFileUploader.load_from_google_drive(drive_link)
            if file_data_list:
                # Store raw file data in session state
                st.session_state.bulk_file_data = file_data_list
                st.success(f"βœ… Loaded {len(file_data_list)} files from Google Drive")
            else:
                st.error("❌ Could not load files from Google Drive. Check the link and API key.")

# Both bulk sources keep raw file data in session state; rebuild the file
# objects from it on every run so they are available after a rerun.
if upload_method != "Upload from Device" and st.session_state.bulk_file_data:
    uploaded_files = [
        BulkFileUploader.create_file_object(file_data)
        for file_data in st.session_state.bulk_file_data
    ]
    st.info(f"πŸ“ {len(uploaded_files)} file(s) loaded and ready for analysis")
# === Job Description Input ===
# Builds `job_descriptions`, a list of {"title": ..., "content": ...} dicts,
# from either a text area or an uploaded PDF/DOCX/DOC/TXT/CSV file.
st.subheader("πŸ“ Job Description")
jd_input_type = st.radio("Select input method:", ["Text Input", "Upload File"], horizontal=True)
job_descriptions = []

if jd_input_type == "Text Input":
    job_description = st.text_area("Enter job description:", height=200, key="jd_text_input")
    if job_description and job_description.strip():
        job_descriptions.append({"title": "Job Description", "content": job_description.strip()})
        st.success(f"βœ… Job description added ({len(job_description.strip())} characters)")

elif jd_input_type == "Upload File":
    jd_file = st.file_uploader(
        "Upload job description file (PDF, DOCX, DOC, TXT, or CSV)",
        type=["pdf", "docx", "doc", "txt", "csv"],
    )
    if jd_file:
        # The reported MIME type is supplied by the client and is not always
        # the canonical one (CSV in particular may arrive under a spreadsheet
        # MIME type), so the file extension is accepted as a fallback.
        jd_ext = os.path.splitext(jd_file.name)[1].lower()

        if jd_file.type == "application/pdf" or jd_ext == ".pdf":
            reader = PyPDF2.PdfReader(jd_file)
            # Guard against pages for which no text is returned.
            content = " ".join((page.extract_text() or "") for page in reader.pages)
            job_descriptions.append({"title": jd_file.name, "content": content})

        elif jd_file.type in [
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "application/msword",
        ] or jd_ext in (".docx", ".doc"):
            from utils.document_processor import DocumentProcessor
            content = DocumentProcessor.extract_text(jd_file)
            job_descriptions.append({"title": jd_file.name, "content": content})

        elif jd_file.type == "text/plain" or jd_ext == ".txt":
            raw_bytes = jd_file.read()
            try:
                content = raw_bytes.decode("utf-8")
            except UnicodeDecodeError:
                # latin-1 maps every byte, so this fallback cannot fail.
                content = raw_bytes.decode("latin-1")
            job_descriptions.append({"title": jd_file.name, "content": content})

        elif jd_file.type == "text/csv" or jd_ext == ".csv":
            try:
                # 'iso-8859-1' is an alias of 'latin1', so it is not listed twice.
                encodings = ['utf-8', 'cp1252', 'latin1']
                df = None
                for encoding in encodings:
                    try:
                        jd_file.seek(0)
                        df = pd.read_csv(jd_file, encoding=encoding)
                        break
                    except UnicodeDecodeError:
                        continue

                if df is None:
                    st.error("❌ Could not read CSV file with any supported encoding")
                elif 'Job Title' in df.columns and 'Job Description' in df.columns:
                    # Rows with a missing title or description would yield NaN
                    # (a float) and break later len()/lower() calls; skip them
                    # and normalise the remaining cells to strings.
                    complete_rows = df.dropna(subset=['Job Title', 'Job Description'])
                    job_descriptions = [
                        {"title": str(row['Job Title']), "content": str(row['Job Description'])}
                        for _, row in complete_rows.iterrows()
                    ]
                else:
                    st.error("❌ CSV must contain 'Job Title' and 'Job Description' columns")
            except Exception as e:
                st.error(f"❌ Error reading CSV file: {str(e)}")
# Initialize database connection
try:
    db = ResumeMatchDB()
except Exception as e:
    # Keep `db` bound so later references see None instead of raising
    # NameError, and show the underlying error rather than discarding it.
    db = None
    st.error("❌ Database Connection Error")
    st.exception(e)
def format_date(date_str):
    """Format a ``YYYY-MM-DD HH:MM:SS`` string for better readability.

    Args:
        date_str: Timestamp text in the ``%Y-%m-%d %H:%M:%S`` layout.

    Returns:
        The timestamp rendered as ``"<Month> DD, YYYY at HH:MM AM/PM"``.
        If ``date_str`` is not a string or does not match the expected
        layout, it is returned unchanged.
    """
    try:
        parsed = datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        # TypeError: not a string; ValueError: string in a different layout.
        # Only these are swallowed so unrelated errors are not hidden.
        return date_str
    return parsed.strftime("%B %d, %Y at %I:%M %p")
def display_statistics():
    """Render three metrics summarising the results held in session state.

    Shows the total number of candidates, how many have a shortlisted best
    match, and how many do not. Renders nothing when there are no results.
    """
    stored = st.session_state.results
    if not stored:
        return

    candidate_total = len(stored)
    shortlisted_total = len([r for r in stored if r['best_match']['is_shortlisted']])
    figures = (
        ("Total Candidates", candidate_total),
        ("Shortlisted", shortlisted_total),
        ("Not Shortlisted", candidate_total - shortlisted_total),
    )
    # One column per figure, in the order listed above.
    for column, (label, value) in zip(st.columns(3), figures):
        with column:
            st.metric(label, value)
def extract_candidate_info(resume_text):
    """Pull a candidate name and email address out of raw resume text.

    The email is the first match of a simple address pattern anywhere in
    the text. The name is the first non-empty line that contains no ``@``
    and looks like "First Last", "First M. Last" or "First Middle Last"
    (capitalised words only).

    Returns:
        A ``(name, email)`` tuple; either element is ``"Not found"`` when
        nothing matched.
    """
    email_hit = re.search(r'[\w\.-]+@[\w\.-]+\.\w+', resume_text)
    email = "Not found" if email_hit is None else email_hit.group(0)

    name_shapes = (
        r'^[A-Z][a-z]+\s+[A-Z][a-z]+$',
        r'^[A-Z][a-z]+\s+[A-Z]\.\s+[A-Z][a-z]+$',
        r'^[A-Z][a-z]+\s+[A-Z][a-z]+\s+[A-Z][a-z]+$',
    )

    name = "Not found"
    for raw_line in resume_text.split('\n'):
        candidate_line = raw_line.strip()
        # Blank lines and lines carrying an email address are never names.
        if not candidate_line or '@' in candidate_line:
            continue
        if any(re.match(shape, candidate_line) for shape in name_shapes):
            name = candidate_line
            break

    return name, email
def analyze_resumes():
    """Analyze resumes and store results in session state.

    Reads the module-level ``uploaded_files`` and ``job_descriptions``.
    For every resume whose text can be extracted, a candidate row is
    inserted, the resume is matched against each job description, every
    match is persisted, and the highest-scoring match is kept as the
    candidate's ``best_match``. The collected list replaces
    ``st.session_state.results``.

    Resumes with unreadable text, job descriptions with an unusable
    summary, and resumes for which no job description produced a match
    are reported in the UI and skipped.
    """
    progress_bar = st.progress(0)
    status_text = st.empty()

    with st.spinner("Analyzing resumes..."):
        results = []
        total_steps = len(uploaded_files) * len(job_descriptions)
        current_step = 0
        db = ResumeMatchDB()

        for uploaded_file in uploaded_files:
            status_text.text(f"πŸ“„ Processing {uploaded_file.name}...")
            extractor = ResumeExtractor(uploaded_file)
            resume_text = extractor.get_resume_text()

            # Validate resume text extraction
            if not resume_text or len(resume_text.strip()) < 10:
                st.error(f"❌ Could not extract text from {uploaded_file.name}. File may be corrupted or empty.")
                continue

            candidate_name, candidate_email = extract_candidate_info(resume_text)
            candidate_id = db.insert_candidate(
                name=candidate_name,
                email=candidate_email,
                resume_path=uploaded_file.name
            )

            resume_results = []
            for jd in job_descriptions:
                current_step += 1
                progress = current_step / total_steps
                progress_bar.progress(progress)
                status_text.text(f"πŸ” Matching with {jd['title']}...")

                jd_agent = JobDescriptionSummarizer(jd['content'])
                jd_summary = jd_agent.get_summary()

                # Validate job description summary
                if not jd_summary or len(jd_summary.strip()) < 10:
                    st.error(f"❌ Could not process job description: {jd['title']}")
                    continue

                llm = LLMClient(api_key=DEEPSEEK_API_KEY, model_name=DEEPSEEK_MODEL, base_url=DEEPSEEK_BASE_URL)
                matcher = ResumeJDMatcher(llm)
                shortlister = Shortlister(threshold=70.0)

                match_result = matcher.match_resume_to_job(resume_text, jd_summary)
                match_percent = shortlister.compute_final_score(match_result)
                is_shortlisted = shortlister.is_shortlisted(match_percent)

                job_id = db.insert_job_description(
                    title=jd['title'],
                    description=jd['content']
                )

                match_data = {
                    'match_score': match_percent,
                    'skills_match': match_result['skills_match'],
                    'experience_match': match_result['experience_match'],
                    'education_match': match_result['education_match'],
                    'certifications_match': match_result['certifications_match'],
                    'summary': match_result['summary'],
                    'is_shortlisted': is_shortlisted
                }
                db.insert_match_result(candidate_id, job_id, match_data)

                resume_results.append({
                    "job_title": jd['title'],
                    "match_score": match_percent,
                    "is_shortlisted": is_shortlisted,
                    "details": match_result,
                    "job_id": job_id
                })

            # Every job description may have been skipped above; max() on an
            # empty list would raise ValueError and abort the whole analysis.
            if not resume_results:
                st.warning(f"⚠️ No job description could be matched for {uploaded_file.name}; skipping this resume.")
                continue

            best_match = max(resume_results, key=lambda x: x['match_score'])
            results.append({
                "candidate_name": candidate_name,
                "candidate_email": candidate_email,
                "resume_name": uploaded_file.name,
                "best_match": best_match,
                "candidate_id": candidate_id
            })

        progress_bar.empty()
        status_text.empty()
        st.session_state.results = results
def display_results():
    """Display analysis results and handle interview scheduling.

    Shows summary statistics, a search box filtering by candidate name or
    best-match job title, the email composer for shortlisted candidates in
    the filtered set, one expander per candidate, and a CSV download of
    the filtered candidate list. Does nothing when there are no results.
    """
    stored_results = st.session_state.results
    if not stored_results:
        return

    display_statistics()
    st.subheader("🎯 Analysis Results")

    # Case-insensitive substring search over name and matched role.
    search_query = st.text_input("πŸ” Search by name or role", key="search_input")
    if search_query:
        needle = search_query.lower()
        filtered_results = [
            entry for entry in stored_results
            if needle in entry['candidate_name'].lower()
            or needle in entry['best_match']['job_title'].lower()
        ]
    else:
        filtered_results = stored_results

    # Email composer covers only the shortlisted candidates currently shown.
    shortlisted_candidates = [
        entry for entry in filtered_results if entry['best_match']['is_shortlisted']
    ]
    if shortlisted_candidates:
        show_email_composer(shortlisted_candidates)
        st.divider()

    export_rows = []
    for entry in filtered_results:
        best = entry['best_match']
        with st.expander(f"πŸ“„ {entry['candidate_name']} ({entry['candidate_email']})"):
            st.write(f"**Resume:** {entry['resume_name']}")
            st.subheader(f"Best Match: {best['job_title']}")
            st.metric("Match Score", f"{best['match_score']:.1f}%")

            # Row for the downloadable candidate list.
            status_label = "Shortlisted" if best['is_shortlisted'] else "Not Shortlisted"
            export_rows.append({
                "Name": entry['candidate_name'],
                "Email": entry['candidate_email'],
                "Resume": entry['resume_name'],
                "Best Match Role": best['job_title'],
                "Match Score": f"{best['match_score']:.1f}%",
                "Status": status_label,
            })

            if best['is_shortlisted']:
                st.success("βœ… Shortlisted")
                handle_interview_scheduling(entry, best)
            else:
                st.warning("⚠️ Not Shortlisted")

            display_match_details(best['details'])

    # Offer the filtered list as a CSV download.
    if export_rows:
        csv_bytes = pd.DataFrame(export_rows).to_csv(index=False).encode('utf-8')
        st.download_button(
            label="πŸ“₯ Download Candidate List",
            data=csv_bytes,
            file_name="candidate_list.csv",
            mime="text/csv",
        )
def handle_interview_scheduling(result, match):
    """Render the interview-scheduling form for one candidate.

    Form values are kept in ``st.session_state.interview_data`` under a key
    derived from the candidate id. When the "Schedule Interview" button is
    pressed with an interviewer name and a slot, ``schedule_interview`` is
    called; otherwise an error is shown.

    Args:
        result: Candidate result dict; ``candidate_id`` and
            ``candidate_name`` are read here.
        match: The candidate's best-match dict, passed through to
            ``schedule_interview``.
    """
    st.subheader("πŸ“… Schedule Interview")

    candidate_id = result['candidate_id']
    data = st.session_state.interview_data.setdefault(
        f"interview_{candidate_id}",
        {"interviewer": "", "meeting_link": "", "notes": "", "selected_slot": None},
    )

    data["interviewer"] = st.text_input(
        "Interviewer Name",
        value=data["interviewer"],
        key=f"interviewer_{candidate_id}",
    )
    data["meeting_link"] = st.text_input(
        "Meeting Link (optional)",
        value=data["meeting_link"],
        key=f"meeting_{candidate_id}",
    )
    data["notes"] = st.text_area(
        "Additional Notes",
        value=data["notes"],
        key=f"notes_{candidate_id}",
    )

    # Candidate slots are generated starting one day from now.
    scheduler = InterviewScheduler(result['candidate_name'])
    first_day = datetime.datetime.now() + datetime.timedelta(days=1)
    slots = scheduler.generate_interview_slots(first_day)

    def _slot_label(slot):
        return slot.strftime("%A, %B %d, %Y at %I:%M %p")

    # Pre-select the previously chosen slot when it is still offered.
    previous_slot = data["selected_slot"]
    preselected = slots.index(previous_slot) if previous_slot in slots else 0
    data["selected_slot"] = st.selectbox(
        "Select Interview Slot",
        options=slots,
        format_func=_slot_label,
        key=f"slot_{candidate_id}",
        index=preselected,
    )

    if not st.button("Schedule Interview", key=f"schedule_{candidate_id}"):
        return
    if data["selected_slot"] and data["interviewer"]:
        schedule_interview(result, match, data)
    else:
        st.error("Please provide interviewer name and select a time slot")
def schedule_interview(result, match, data):
    """Schedule an interview and update session state.

    Generates an invite, stores the interview through ``ResumeMatchDB``,
    appends a summary entry to ``st.session_state.scheduled_interviews``
    and shows the invite message.

    Args:
        result: Candidate result dict (``candidate_id``, ``candidate_name``).
        match: Best-match dict (``job_title``, ``job_id``).
        data: Form values (``selected_slot``, ``interviewer``,
            ``meeting_link``, ``notes``).
    """
    db = ResumeMatchDB()
    scheduler = InterviewScheduler(result['candidate_name'])

    slot = data["selected_slot"]
    interviewer = data["interviewer"]
    meeting_link = data["meeting_link"]
    notes = data["notes"]
    job_title = match['job_title']
    candidate_id = result['candidate_id']

    invite = scheduler.generate_invite(
        job_title=job_title,
        interview_date=slot,
        interviewer=interviewer,
        meeting_link=meeting_link,
        additional_notes=notes,
    )

    db.schedule_interview(
        candidate_id=candidate_id,
        job_id=match['job_id'],
        scheduled_date=slot,
        interviewer=interviewer,
        meeting_link=meeting_link,
        notes=notes,
    )

    # Keep a lightweight record of the booking in the session.
    booking = {
        "candidate_id": candidate_id,
        "candidate_name": result['candidate_name'],
        "job_title": job_title,
        "interview_date": slot,
        "interviewer": interviewer,
    }
    st.session_state.scheduled_interviews.append(booking)

    st.success("βœ… Interview Scheduled!")
    st.write("**Interview Invitation:**")
    st.write(invite['message'])
def display_match_details(details):
    """Show the four per-category match percentages and the match summary.

    Args:
        details: Match result dict providing ``skills_match``,
            ``experience_match``, ``education_match``,
            ``certifications_match`` and ``summary``.
    """
    left, right = st.columns(2)

    # Left column: skills and experience; right column: education and certifications.
    left.metric("Skills Match", f"{details['skills_match']}%")
    left.metric("Experience Match", f"{details['experience_match']}%")
    right.metric("Education Match", f"{details['education_match']}%")
    right.metric("Certifications Match", f"{details['certifications_match']}%")

    st.write("**Summary:**")
    st.write(details['summary'])
def show_email_composer(shortlisted_candidates):
    """Show email composer with customization options.

    Renders candidate selection checkboxes, editable subject/body templates
    (placeholders ``{name}``, ``{job_title}``, ``{match_score}``), a preview
    using sample data, and a send button that calls
    ``send_customized_emails``. Returns early with an error when the
    ``EmailSender`` has no SMTP email or password.

    Selection state lives in ``st.session_state.selected_candidates`` keyed
    by position in ``shortlisted_candidates``. Only positions that exist in
    the current list are considered when computing the "Select All" state
    and the selected count, so entries left over from a longer, earlier
    list cannot skew them.

    Args:
        shortlisted_candidates: List of candidate result dicts to offer.
    """
    st.subheader("πŸ“§ Email Shortlisted Candidates")

    email_sender = EmailSender()

    # Check if SMTP is configured
    if not email_sender.smtp_email or not email_sender.smtp_password:
        st.error("❌ SMTP not configured. Please set SMTP_EMAIL and SMTP_PASSWORD in .env file")
        return

    # Initialize session state for email composer
    if 'email_subject' not in st.session_state:
        st.session_state.email_subject = "Congratulations! You've been shortlisted for {job_title}"
    if 'email_body' not in st.session_state:
        st.session_state.email_body = """<p>Dear {name},</p>
<p>We are excited to inform you that your application for the <strong>{job_title}</strong> position has progressed to the next stage!</p>
<div style="background-color: #f8f9fa; padding: 15px; border-radius: 5px; margin: 20px 0;">
<h3 style="color: #2c3e50; margin-top: 0;">Your Application Status</h3>
<p><strong>Position:</strong> {job_title}</p>
<p><strong>Status:</strong> Your application is under review</p>
<p>Our hiring team will be in touch shortly with the next steps in our selection process.</p>
</div>
<p>We were particularly impressed with your qualifications and experience. While we review all applications, we wanted to let you know that your profile has stood out to us.</p>
<p>If you have any questions in the meantime, please don't hesitate to reply to this email.</p>
<p style="margin-top: 30px;">
Best regards,<br>
<strong>ResumeIQ Hiring Team</strong>
</p>"""

    # Candidate selection
    st.write("**Select candidates to send emails:**")
    candidate_positions = range(len(shortlisted_candidates))
    if 'selected_candidates' not in st.session_state:
        st.session_state.selected_candidates = {i: True for i in candidate_positions}
    selection = st.session_state.selected_candidates

    # Select/Deselect all
    col1, col2 = st.columns([1, 5])
    with col1:
        # Look only at positions present in the current list; unseen
        # positions default to selected, matching the checkboxes below.
        all_selected = all(selection.get(i, True) for i in candidate_positions)
        select_all = st.checkbox("Select All", value=all_selected)
        if select_all != all_selected:
            for i in candidate_positions:
                selection[i] = select_all

    # Individual candidate selection
    for idx, result in enumerate(shortlisted_candidates):
        col1, col2, col3, col4 = st.columns([1, 3, 2, 2])
        with col1:
            selection[idx] = st.checkbox(
                "βœ“",
                value=selection.get(idx, True),
                key=f"select_candidate_{idx}",
                label_visibility="collapsed"
            )
        with col2:
            st.write(f"**{result['candidate_name']}**")
        with col3:
            st.write(result['candidate_email'])
        with col4:
            st.write(f"{result['best_match']['match_score']:.1f}% match")

    st.divider()

    # Email customization
    with st.expander("✏️ Customize Email", expanded=True):
        st.write("**Available placeholders:** `{name}`, `{job_title}`, `{match_score}`")

        st.session_state.email_subject = st.text_input(
            "Email Subject:",
            value=st.session_state.email_subject,
            key="email_subject_input"
        )
        st.session_state.email_body = st.text_area(
            "Email Body (HTML supported):",
            value=st.session_state.email_body,
            height=300,
            key="email_body_input"
        )

        # Preview
        if st.button("πŸ‘οΈ Preview Email"):
            st.write("**Preview (with sample data):**")
            sample_fields = {
                "name": "John Doe",
                "job_title": "Software Engineer",
                "match_score": "85.5",
            }
            try:
                sample_subject = st.session_state.email_subject.format(**sample_fields)
                sample_body = st.session_state.email_body.format(**sample_fields)
            except (AttributeError, IndexError, KeyError, TypeError, ValueError) as exc:
                # The templates are user-edited: an unknown placeholder or a
                # stray brace makes str.format raise. Report it instead of
                # crashing the page.
                st.error(
                    f"❌ Invalid email template: {exc!r}. Use only {{name}}, {{job_title}}, "
                    f"{{match_score}} and double any literal braces ({{{{ and }}}})."
                )
            else:
                st.write(f"**Subject:** {sample_subject}")
                st.markdown(sample_body, unsafe_allow_html=True)

    st.divider()

    # Send button
    selected_count = sum(1 for i in candidate_positions if selection.get(i, False))

    if selected_count == 0:
        st.warning("⚠️ Please select at least one candidate to send emails.")
    else:
        col1, col2, col3 = st.columns([2, 2, 3])
        with col1:
            st.metric("Selected Candidates", selected_count)
        with col2:
            if st.button(f"πŸ“¨ Send Emails to {selected_count} Candidate(s)", type="primary"):
                send_customized_emails(shortlisted_candidates, email_sender)
def send_customized_emails(shortlisted_candidates, email_sender):
    """Send customized emails to selected candidates.

    Candidates are taken from ``shortlisted_candidates`` at the positions
    marked true in ``st.session_state.selected_candidates``. The subject
    and body templates from session state are filled in for every selected
    candidate before anything is sent; if a template cannot be formatted,
    an error is shown and no email is sent.

    Args:
        shortlisted_candidates: Candidate result dicts, in the same order
            used for the selection checkboxes.
        email_sender: Object whose ``send_email(to, subject, body)`` result
            is treated as truthy on success.
    """
    # Prepare candidate data for selected candidates
    candidates_data = [
        {
            'name': result['candidate_name'],
            'email': result['candidate_email'],
            'job_title': result['best_match']['job_title'],
            'match_score': result['best_match']['match_score'],
        }
        for idx, result in enumerate(shortlisted_candidates)
        if st.session_state.selected_candidates.get(idx, False)
    ]

    if not candidates_data:
        st.warning("⚠️ No candidates selected.")
        return

    # Render every message up front so a broken user-edited template is
    # reported once, before any email has gone out.
    subject_template = st.session_state.email_subject
    body_template = st.session_state.email_body
    outbox = []
    try:
        for candidate in candidates_data:
            fields = {
                'name': candidate['name'],
                'job_title': candidate['job_title'],
                'match_score': f"{candidate['match_score']:.1f}",
            }
            outbox.append((
                candidate['email'],
                subject_template.format(**fields),
                body_template.format(**fields),
            ))
    except (AttributeError, IndexError, KeyError, TypeError, ValueError) as exc:
        st.error(
            f"❌ Invalid email template: {exc!r}. Use only {{name}}, {{job_title}}, "
            f"{{match_score}} and double any literal braces ({{{{ and }}}}). No emails were sent."
        )
        return

    # Send emails with custom subject and body
    success_count = 0
    failed_count = 0
    with st.spinner(f"Sending emails to {len(candidates_data)} candidates..."):
        for recipient, subject, body in outbox:
            if email_sender.send_email(recipient, subject, body):
                success_count += 1
            else:
                failed_count += 1

    # Display results
    if success_count > 0:
        st.success(f"βœ… Successfully sent {success_count} email(s)")
    if failed_count > 0:
        st.error(f"❌ Failed to send {failed_count} email(s)")

    # Show detailed email list
    with st.expander("πŸ“‹ Email Details"):
        for candidate in candidates_data:
            st.write(f"β€’ {candidate['name']} ({candidate['email']}) - {candidate['job_title']}")
def display_scheduled_interviews():
    """Display scheduled interviews and handle feedback.

    Lists the interviews returned by
    ``ResumeMatchDB.get_scheduled_interviews(status='pending')``, one
    expander each, with a feedback box. Submitting non-empty feedback
    marks the interview ``'completed'`` and stores the feedback as notes.
    """
    st.subheader("πŸ“… Upcoming Interviews")

    db = ResumeMatchDB()
    pending = db.get_scheduled_interviews(status='pending')
    if not pending:
        st.info("No upcoming interviews scheduled.")
        return

    for entry in pending:
        heading = f"Interview with {entry['candidate_name']} for {entry['job_title']}"
        with st.expander(heading):
            st.write(f"**Date:** {format_date(entry['scheduled_date'])}")
            st.write(f"**Interviewer:** {entry['interviewer']}")
            # Optional fields are shown only when present.
            if entry['meeting_link']:
                st.write(f"**Meeting Link:** {entry['meeting_link']}")
            if entry['notes']:
                st.write(f"**Notes:** {entry['notes']}")

            feedback = st.text_area("Interview Feedback", key=f"feedback_{entry['id']}")
            if not st.button("Submit Feedback", key=f"submit_{entry['id']}"):
                continue

            if feedback:
                db.update_interview_status(
                    interview_id=entry['id'],
                    status='completed',
                    notes=feedback,
                )
                st.success("βœ… Feedback submitted!")
            else:
                st.error("Please provide feedback")
# Main execution
have_resumes = bool(uploaded_files)
have_job_descriptions = bool(job_descriptions)

# Debug info: what is currently loaded and ready for analysis.
if have_resumes or have_job_descriptions:
    with st.expander("πŸ“Š Ready to Analyze", expanded=False):
        st.write(f"**Resumes loaded:** {len(uploaded_files or [])}")
        st.write(f"**Job descriptions:** {len(job_descriptions or [])}")
        for jd in job_descriptions:
            st.write(f" - {jd['title']} ({len(jd['content'])} characters)")

# Analysis needs both inputs; otherwise say which one is missing.
if have_resumes and have_job_descriptions:
    if st.button("πŸ” Analyze Resumes"):
        analyze_resumes()
elif have_resumes:
    st.warning("⚠️ Please enter or upload a job description before analyzing resumes.")
elif have_job_descriptions:
    st.warning("⚠️ Please upload resumes before analyzing.")

# Display results in tabs
if st.session_state.results:
    st.divider()
    results_tab, interviews_tab = st.tabs(["πŸ“Š Analysis Results", "πŸ“… Upcoming Interviews"])
    with results_tab:
        display_results()
    with interviews_tab:
        display_scheduled_interviews()