Files
ResumeCustomizer/input/Docker/watch_and_customize.py

456 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Monitor the customization inbox, normalize messy job descriptions, and run the Codex CLI
to produce tailored resumes.
The watcher expects exactly one base resume Markdown file and processes one job file at a
time. After Codex succeeds, the generated resume is written into a timestamped outbox
folder using the pattern <company>-<jobtitle>.md, while the original job file is archived
under processed/. Failures move the job description into failed/.
"""
from __future__ import annotations
import logging
import os
import shlex
import shutil
import subprocess
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Sequence
# --- Workspace layout ---------------------------------------------------------
# INBOX is polled by main() for job description files; generated resumes are
# written beneath OUTBOX, successfully handled job files are archived beneath
# PROCESSED, and job files whose processing failed are moved into FAILED.
# RESUME_DIR is globbed for "*.md" by ensure_single_resume().
INBOX = Path("/workspace/inbox")
OUTBOX = Path("/workspace/outbox")
PROCESSED = Path("/workspace/processed")
FAILED = Path("/workspace/failed")
RESUME_DIR = Path("/workspace/resume")
# --- Prompt templates ---------------------------------------------------------
# resolve_template() prefers PROMPT_TEMPLATE and, when it is absent, copies
# PROMPT_TEMPLATE_EXAMPLE into TEMPLATE_CACHE and uses that copy instead.
TEMPLATES_DIR = Path("/templates")
TEMPLATE_CACHE = Path("/tmp/templates")
PROMPT_TEMPLATE = TEMPLATES_DIR / "ResumeCustomizerPrompt.md"
PROMPT_TEMPLATE_EXAMPLE = TEMPLATES_DIR / "ResumeCustomizerPrompt.md.example"
# The normalizer prompt has no example fallback; ensure_environment() fails if it
# does not exist.
NORMALIZER_TEMPLATE = Path('/app/JobDescriptionNormalizerPrompt.md')
# --- Runtime tuning (overridable through environment variables) ---------------
# Seconds slept between inbox polls in main(); defaults to 5.
POLL_INTERVAL_SECONDS = int(os.environ.get("POLL_INTERVAL_SECONDS", "5"))
# Command line used to invoke the Codex CLI. run_codex() fills the {prompt} and
# {output} placeholders with the prompt and output file paths.
CODEX_COMMAND_TEMPLATE = os.environ.get(
"CODEX_COMMAND_TEMPLATE",
"codex prompt --input {prompt} --output {output} --format markdown",
)
# Command line for the normalization pass; falls back to CODEX_COMMAND_TEMPLATE
# when no dedicated override is set.
CODEX_NORMALIZER_COMMAND_TEMPLATE = os.environ.get(
"CODEX_NORMALIZER_COMMAND_TEMPLATE",
CODEX_COMMAND_TEMPLATE,
)
# Passed as the subprocess timeout for each Codex invocation; defaults to 600.
CODEX_TIMEOUT_SECONDS = int(os.environ.get("CODEX_TIMEOUT_SECONDS", "600"))
# Both are assigned by ensure_environment() and remain None until it has run.
RESOLVED_PROMPT_TEMPLATE: Path | None = None
RESOLVED_NORMALIZER_TEMPLATE: Path | None = None
class FatalConfigurationError(RuntimeError):
    """Signal a configuration problem the watcher cannot recover from.

    ``main`` reacts to this exception by logging the message and exiting with
    status 2 instead of continuing to poll.
    """
@dataclass(frozen=True)
class NormalizedJobDescription:
    """Immutable result of parsing the normalizer's output."""

    # Text that followed the "Company:" label.
    company: str
    # Text that followed the "Job Title:" label.
    job_title: str
    # Remaining body of the normalized output, used as the job description.
    description_markdown: str
def ensure_environment() -> None:
    """Check the workspace layout and resolve the prompt templates.

    On success the module globals ``RESOLVED_PROMPT_TEMPLATE`` and
    ``RESOLVED_NORMALIZER_TEMPLATE`` are populated.

    Raises:
        FatalConfigurationError: If a required path does not exist, if neither
            the prompt template nor its example is available, or if the
            normalizer prompt is missing.
    """
    global RESOLVED_PROMPT_TEMPLATE, RESOLVED_NORMALIZER_TEMPLATE

    required_paths = (INBOX, OUTBOX, PROCESSED, FAILED, RESUME_DIR, TEMPLATES_DIR)
    absent = []
    for required in required_paths:
        if not required.exists():
            absent.append(str(required))
    if absent:
        raise FatalConfigurationError(
            "Input pipeline is missing required paths: " + ", ".join(absent)
        )

    RESOLVED_PROMPT_TEMPLATE = resolve_template(
        PROMPT_TEMPLATE,
        PROMPT_TEMPLATE_EXAMPLE,
        TEMPLATE_CACHE,
        "Resume customization prompt",
    )

    # The normalizer prompt has no example fallback: it either exists or startup fails.
    RESOLVED_NORMALIZER_TEMPLATE = NORMALIZER_TEMPLATE
    if RESOLVED_NORMALIZER_TEMPLATE.exists():
        return
    raise FatalConfigurationError(
        f"Job description normalizer prompt missing at {RESOLVED_NORMALIZER_TEMPLATE}"
    )
def resolve_template(
    primary: Path,
    example: Path,
    cache_dir: Path,
    description: str,
) -> Path:
    """Pick the template file to use, falling back to a copy of the example.

    Args:
        primary: Preferred template location.
        example: Example template used when ``primary`` does not exist.
        cache_dir: Directory (created on demand) that receives the copied example.
        description: Human-readable template name used in the error message.

    Returns:
        ``primary`` when it exists, otherwise the path of the example copied
        into ``cache_dir`` under the primary's file name.

    Raises:
        FatalConfigurationError: If neither ``primary`` nor ``example`` exists.
    """
    if primary.exists():
        return primary
    if not example.exists():
        raise FatalConfigurationError(
            f"{description} missing: {primary} (no example found at {example})"
        )
    cache_dir.mkdir(parents=True, exist_ok=True)
    fallback_copy = cache_dir / primary.name
    shutil.copy(example, fallback_copy)
    return fallback_copy
def ensure_single_resume() -> Path:
    """Locate the one base resume inside ``RESUME_DIR``.

    Returns:
        The only ``*.md`` file found in ``RESUME_DIR``.

    Raises:
        FatalConfigurationError: If the directory holds no Markdown file or
            more than one.
    """
    candidates = sorted(RESUME_DIR.glob("*.md"))
    if len(candidates) == 1:
        return candidates[0]
    if not candidates:
        raise FatalConfigurationError(
            f"No resume Markdown file found in {RESUME_DIR}. Exactly one is required."
        )
    raise FatalConfigurationError(
        f"Multiple resume Markdown files found in {RESUME_DIR}: "
        + ", ".join(candidate.name for candidate in candidates)
    )
def ensure_single_job(paths: Sequence[Path]) -> Path | None:
    """Select the pending job description from a directory listing.

    Only regular files whose name does not start with a dot are considered;
    the file extension is irrelevant.

    Args:
        paths: Candidate paths, typically the contents of the inbox.

    Returns:
        The single eligible file, or ``None`` when there is none.

    Raises:
        FatalConfigurationError: If more than one eligible file is present.
    """
    candidates = []
    for entry in paths:
        if not entry.is_file():
            continue
        if entry.name.startswith("."):
            continue
        candidates.append(entry)

    if len(candidates) > 1:
        listing = ", ".join(candidate.name for candidate in candidates)
        raise FatalConfigurationError(
            f"Multiple job description files detected in inbox: {listing} "
            "— expected exactly one."
        )
    return candidates[0] if candidates else None
def build_prompt_text(resume: Path, job_markdown: str, prompt_template: Path) -> str:
    """Assemble the customization prompt handed to the Codex CLI.

    Args:
        resume: Path of the base resume; read as UTF-8 and stripped.
        job_markdown: Job description text; stripped before insertion.
        prompt_template: Path of the instructions file; read as UTF-8 and stripped.

    Returns:
        A Markdown document with "Instructions", "Job Description" and
        "Current Resume" sections separated by horizontal rules.
    """
    resume_body = resume.read_text(encoding="utf-8").strip()
    guidance = prompt_template.read_text(encoding="utf-8").strip()
    # Each section ends with a newline; joining on "\n" yields the blank line between them.
    sections = [
        "# Resume Customization Request\n",
        f"## Instructions\n{guidance}\n",
        "---\n",
        f"## Job Description\n{job_markdown.strip()}\n",
        "---\n",
        f"## Current Resume\n{resume_body}\n",
    ]
    return "\n".join(sections)
def build_timestamp_dir(base: Path, timestamp: datetime) -> Path:
    """Return ``base/YYYY/MM/DD/HHMM`` for ``timestamp``, creating it if needed.

    Args:
        base: Root directory under which the dated hierarchy lives.
        timestamp: Moment whose date and hour/minute name the directories.

    Returns:
        The directory path, which exists when this function returns.
    """
    year, month, day, clock = timestamp.strftime("%Y/%m/%d/%H%M").split("/")
    dated_dir = base.joinpath(year, month, day, clock)
    dated_dir.mkdir(parents=True, exist_ok=True)
    return dated_dir
def sanitize_stem(stem: str) -> str:
    """Map every non-alphanumeric character of ``stem`` to an underscore.

    Returns:
        The cleaned stem, or ``"resume"`` when ``stem`` is empty.
    """
    pieces = []
    for ch in stem:
        pieces.append(ch if ch.isalnum() else "_")
    cleaned = "".join(pieces)
    if cleaned:
        return cleaned
    return "resume"
def slugify(component: str) -> str:
    """Convert free-form text into a lowercase, hyphen-separated slug.

    Runs of alphanumeric characters become lowercase words; every other
    character acts as a separator. The result is empty when ``component``
    contains no alphanumeric character.
    """
    words = []
    current = []
    for ch in component.strip():
        if ch.isalnum():
            current.append(ch.lower())
        elif current:
            # A separator closes the word collected so far.
            words.append("".join(current))
            current = []
    if current:
        words.append("".join(current))
    return "-".join(words)
def run_codex(prompt_path: Path, output_path: Path, command_template: str) -> None:
    """Execute the Codex CLI using the provided command template.

    The template is tokenised first and the ``{prompt}`` / ``{output}``
    placeholders are substituted into each argument afterwards. Substituting
    before tokenising would split a path containing whitespace into several
    arguments and would mis-parse a path containing quote characters.

    Args:
        prompt_path: File holding the prompt; substituted for ``{prompt}``.
        output_path: File the CLI is expected to create; substituted for ``{output}``.
        command_template: Shell-style command line containing the placeholders.

    Raises:
        FatalConfigurationError: If the template cannot be tokenised, is empty,
            uses an unknown or malformed placeholder, or names an executable
            that cannot be found.
        subprocess.CalledProcessError: If the CLI exits with a non-zero status.
        RuntimeError: If the CLI times out or finishes without creating
            ``output_path``.
    """
    try:
        template_args = shlex.split(command_template)
    except ValueError as exc:
        raise FatalConfigurationError(
            f"Unable to parse Codex command template into arguments: {exc}"
        ) from exc
    if not template_args:
        raise FatalConfigurationError("Codex command template is empty.")

    substitutions = {"prompt": str(prompt_path), "output": str(output_path)}
    try:
        command = [arg.format(**substitutions) for arg in template_args]
    except (KeyError, IndexError, ValueError, AttributeError) as exc:
        # A bad placeholder is a configuration mistake that would fail every job.
        raise FatalConfigurationError(
            f"Codex command template contains an invalid placeholder: {exc!r}"
        ) from exc

    logging.info("Running Codex CLI command: %s", shlex.join(command))
    try:
        subprocess.run(
            command,
            check=True,
            timeout=CODEX_TIMEOUT_SECONDS,
            env=os.environ.copy(),
        )
    except FileNotFoundError as exc:
        raise FatalConfigurationError(
            f"Executable not found while running Codex CLI command: {command[0]}"
        ) from exc
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError("Codex CLI timed out") from exc

    # A zero exit status alone does not prove the CLI wrote the requested file.
    if not output_path.exists():
        raise RuntimeError(
            f"Codex CLI completed but expected output file {output_path} is missing."
        )
def build_normalizer_prompt(raw_text: str) -> str:
    """Build the prompt asking Codex to normalize a raw job description.

    Args:
        raw_text: Unprocessed job description; stripped and placed in a fenced block.

    Returns:
        The normalizer instructions followed by the fenced raw text.

    Raises:
        FatalConfigurationError: If the normalizer template has not been
            resolved (``RESOLVED_NORMALIZER_TEMPLATE`` is still ``None``).
    """
    template_path = RESOLVED_NORMALIZER_TEMPLATE
    if template_path is None:
        raise FatalConfigurationError("Normalizer template was not resolved during startup.")
    guidance = template_path.read_text(encoding="utf-8").strip()
    fenced_block = "\n".join(("```", raw_text.strip(), "```"))
    return f"{guidance}\n\n---\n\n## Raw Job Description\n{fenced_block}\n"
def parse_normalized_output(text: str) -> NormalizedJobDescription:
    """Split normalized Codex output into company, job title and description.

    The first non-blank line must start with ``Company:`` and the next
    non-blank line with ``Job Title:`` (both matched case-insensitively).
    Any non-blank lines directly after the title line are skipped, then the
    blank lines that follow; everything after that is the description.

    Returns:
        The parsed pieces; an empty company becomes ``"Company"`` and an empty
        job title becomes ``"Role"``.

    Raises:
        RuntimeError: If a header line is missing or malformed, or if no
            description text remains.
    """
    rows = text.splitlines()
    total = len(rows)

    def locate_content(begin: int) -> tuple[int, str]:
        # Find the first row at or after `begin` that is not blank.
        for offset in range(begin, total):
            stripped = rows[offset].strip()
            if stripped:
                return offset, stripped
        raise RuntimeError("Normalized output is missing expected lines.")

    company_at, company_row = locate_content(0)
    if not company_row.lower().startswith("company:"):
        raise RuntimeError(f"Expected 'Company:' line, found: {company_row!r}")
    company_name = company_row[len("company:"):].strip()

    title_at, title_row = locate_content(company_at + 1)
    if not title_row.lower().startswith("job title:"):
        raise RuntimeError(f"Expected 'Job Title:' line, found: {title_row!r}")
    title = title_row[len("job title:"):].strip()

    cursor = title_at + 1
    # Skip whatever non-blank lines directly follow the title line ...
    while cursor < total and rows[cursor].strip():
        cursor += 1
    # ... and then the blank separator lines in front of the description.
    while cursor < total and not rows[cursor].strip():
        cursor += 1

    body = "\n".join(rows[cursor:]).strip()
    if not body:
        raise RuntimeError("Normalized output did not include a job description section.")

    return NormalizedJobDescription(
        company=company_name or "Company",
        job_title=title or "Role",
        description_markdown=body,
    )
def normalize_job_description(job_file: Path) -> NormalizedJobDescription:
    """Run the normalizer Codex pass over ``job_file`` and parse its output.

    The file is decoded as UTF-8 with undecodable bytes dropped. The prompt
    and the CLI output live in a temporary directory that is removed afterwards.

    Raises:
        RuntimeError: If the file contains nothing but whitespace. Errors from
            the Codex run or from parsing its output propagate unchanged.
    """
    raw_description = job_file.read_text(encoding="utf-8", errors="ignore").strip()
    if raw_description == "":
        raise RuntimeError(f"Job description file {job_file.name} is empty after trimming.")

    normalizer_prompt = build_normalizer_prompt(raw_description)
    with TemporaryDirectory() as scratch:
        workspace = Path(scratch)
        prompt_file = workspace / "normalize_prompt.md"
        result_file = workspace / "normalize_output.md"
        prompt_file.write_text(normalizer_prompt, encoding="utf-8")
        run_codex(prompt_file, result_file, CODEX_NORMALIZER_COMMAND_TEMPLATE)
        # Read the result before the temporary directory is cleaned up.
        return parse_normalized_output(result_file.read_text(encoding="utf-8").strip())
def move_with_unique_target(source: Path, destination_dir: Path) -> Path:
    """Move ``source`` into ``destination_dir`` without overwriting anything.

    When the file name is already taken, ``_1``, ``_2``, ... is appended to
    the stem until an unused name is found.

    Args:
        source: File to move.
        destination_dir: Target directory; created if it does not exist.

    Returns:
        The path the file was moved to.
    """
    destination_dir.mkdir(parents=True, exist_ok=True)
    candidate = destination_dir / source.name
    attempt = 0
    while candidate.exists():
        attempt += 1
        candidate = destination_dir / f"{source.stem}_{attempt}{source.suffix}"
    shutil.move(str(source), candidate)
    return candidate
def process_job(job_file: Path) -> None:
    """Turn one job description into a tailored resume and archive the inputs.

    Steps, in order: create the timestamped outbox and processed directories,
    locate the base resume, normalize the job description, run the Codex
    customization pass, move the generated resume into the outbox, store the
    prompt and the normalized description next to it, and finally move the
    original job file into the processed directory.

    Raises:
        FatalConfigurationError: If the prompt template was never resolved.
            Errors raised by the helper functions propagate unchanged.
    """
    started_at = datetime.now().astimezone()
    outbox_dir = build_timestamp_dir(OUTBOX, started_at)
    archive_dir = build_timestamp_dir(PROCESSED, started_at)
    base_resume = ensure_single_resume()
    job = normalize_job_description(job_file)

    if RESOLVED_PROMPT_TEMPLATE is None:
        raise FatalConfigurationError("Prompt template was not resolved during startup.")
    customization_prompt = build_prompt_text(
        base_resume,
        job.description_markdown,
        RESOLVED_PROMPT_TEMPLATE,
    )

    # Prefer "<company>-<jobtitle>"; fall back to the job file's own name when
    # either slug comes out empty.
    company_slug = slugify(job.company)
    title_slug = slugify(job.job_title)
    if company_slug and title_slug:
        stem = f"{company_slug}-{title_slug}"
    else:
        stem = sanitize_stem(job_file.stem)

    with TemporaryDirectory() as scratch:
        workspace = Path(scratch)
        prompt_file = workspace / "prompt.md"
        codex_result = workspace / "codex_output.md"
        prompt_file.write_text(customization_prompt, encoding="utf-8")
        run_codex(prompt_file, codex_result, CODEX_COMMAND_TEMPLATE)

        # Never overwrite an earlier resume written into the same outbox folder.
        resume_target = outbox_dir / f"{stem}.md"
        collision = 0
        while resume_target.exists():
            collision += 1
            resume_target = outbox_dir / f"{stem}_{collision}.md"
        shutil.move(str(codex_result), resume_target)

    logging.info(
        "Generated customized resume for %s - %s at %s",
        job.company,
        job.job_title,
        resume_target,
    )

    # Keep the exact prompt and the normalized description beside the resume.
    final_stem = resume_target.stem
    (outbox_dir / f"prompt-{final_stem}.md").write_text(
        customization_prompt, encoding="utf-8"
    )
    description_record = (
        f"Company: {job.company}\n"
        f"Job Title: {job.job_title}\n\n"
        "# Job Description\n"
        f"{job.description_markdown}\n"
    )
    (outbox_dir / f"job-description-{final_stem}.md").write_text(
        description_record, encoding="utf-8"
    )

    archived_job = move_with_unique_target(job_file, archive_dir)
    logging.info(
        "Archived job description %s to %s",
        job_file.name,
        archived_job,
    )
def move_job_to_failed(job_file: Path) -> None:
    """Relocate ``job_file`` into ``FAILED``; do nothing if it no longer exists."""
    if job_file.exists():
        destination = move_with_unique_target(job_file, FAILED)
        logging.info(
            "Moved job description %s into failed directory at %s",
            job_file.name,
            destination,
        )
def main() -> None:
    """Poll the inbox forever, processing at most one job file per cycle.

    Exits with status 2 on any ``FatalConfigurationError``. Any other failure
    while processing a job is logged, the job file is moved into the failed
    directory, and polling continues.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
    )

    try:
        ensure_environment()
    except FatalConfigurationError as exc:
        logging.error("Fatal configuration error: %s", exc)
        raise SystemExit(2) from exc

    logging.info("Resume customizer watcher started")
    while True:
        inbox_entries = sorted(INBOX.iterdir())
        try:
            pending = ensure_single_job(inbox_entries)
        except FatalConfigurationError as exc:
            logging.error("Fatal configuration error: %s", exc)
            raise SystemExit(2) from exc

        if pending is not None:
            logging.info("Processing job description %s", pending.name)
            try:
                process_job(pending)
            except FatalConfigurationError as exc:
                logging.error("Fatal configuration error: %s", exc)
                move_job_to_failed(pending)
                raise SystemExit(2) from exc
            except subprocess.CalledProcessError as exc:
                logging.error("Codex CLI failed with non-zero exit status: %s", exc)
                move_job_to_failed(pending)
            except Exception as exc:  # noqa: BLE001
                logging.exception(
                    "Unexpected error while processing %s: %s", pending.name, exc
                )
                move_job_to_failed(pending)

        # Wait one poll interval whether or not a job was handled this cycle.
        time.sleep(POLL_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()