import uuid
from typing import Any, Dict, List, Optional
import logging

from app.data.database import get_session
from app.data.repositories import SessionRepository, NoteRepository, LinkRepository
from app.data.models import Session, Note, Link, RawZettel
from app.services.vector import VectorService
from app.services.interviewer import InterviewerAgent
from app.services.synthesizer import SynthesizerAgent

logger = logging.getLogger(__name__)

# Number of nearest neighbors fetched from the vector store per new note.
_NEIGHBOR_K = 5
# Minimum similarity score a neighbor must reach to be considered for linking.
_MIN_SIMILARITY = 0.5


class SessionService:
    """Service responsible for orchestrating the complete interview-to-synthesis pipeline.

    Lifecycle: ``start_session`` opens an interview, ``handle_message`` drives
    the conversation, and once the interviewer signals the end,
    ``process_session_background_task`` segments the transcript into Zettel
    notes, stores them, and links them semantically.
    """

    def __init__(self) -> None:
        # One shared VectorService so both agents query the same store.
        self.vector_service = VectorService()
        self.interviewer = InterviewerAgent(self.vector_service)
        self.synthesizer = SynthesizerAgent(self.vector_service)

    async def start_session(self, topic: str) -> Dict[str, Any]:
        """Start a new interview session.

        Args:
            topic: The initial topic for the conversation.

        Returns:
            Dictionary containing session_id, status, and initial message.
        """
        async with get_session() as db:
            session_repo = SessionRepository(db)

            # Create new session record in the "active" state.
            session = await session_repo.create(status="active")

            # Seed the conversation with a synthetic user turn; build the
            # string once so the in-memory transcript and the persisted one
            # cannot drift apart.
            opening_message = f"I want to explore the topic: {topic}"
            transcript = [{"role": "user", "content": opening_message}]

            response, should_end = await self.interviewer.generate_response(
                transcript=transcript,
                context_query=topic,
            )

            # Persist the initial exchange.
            await session_repo.append_transcript(session.id, {
                "role": "user",
                "content": opening_message,
            })
            await session_repo.append_transcript(session.id, {
                "role": "assistant",
                "content": response,
            })

            if should_end:
                await session_repo.update_status(session.id, "processing")

            # Report the status we just decided on rather than the
            # possibly-stale in-memory ``session.status`` (which may still
            # read "active" after update_status wrote "processing" to the
            # database). Mirrors handle_message's behavior.
            return {
                "session_id": session.id,
                "status": "processing" if should_end else "active",
                "message": response,
            }

    async def handle_message(self, session_id: uuid.UUID, message: str) -> Dict[str, Any]:
        """Handle a user message in an active session.

        Args:
            session_id: UUID of the session.
            message: User's message content.

        Returns:
            Dictionary containing response and session status.

        Raises:
            ValueError: If the session does not exist or is not active.
        """
        async with get_session() as db:
            session_repo = SessionRepository(db)

            session = await session_repo.get(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")

            if session.status != "active":
                raise ValueError(f"Session {session_id} is not active (status: {session.status})")

            # Persist the user turn first so the re-fetched transcript below
            # already includes it.
            await session_repo.append_transcript(session.id, {
                "role": "user",
                "content": message,
            })

            # Re-fetch to get the transcript including the message just added.
            updated_session = await session_repo.get(session_id)
            transcript = updated_session.transcript

            # Generate response using the full conversation context.
            response, should_end = await self.interviewer.generate_response(
                transcript=transcript,
                context_query=message,
            )

            await session_repo.append_transcript(session.id, {
                "role": "assistant",
                "content": response,
            })

            # The interviewer decides when the conversation is complete.
            if should_end:
                await session_repo.update_status(session.id, "processing")

            return {
                "session_id": session.id,
                "status": "processing" if should_end else "active",
                "message": response,
                "session_ending": should_end,
            }

    async def process_session_background_task(self, session_id: uuid.UUID) -> None:
        """Background task to synthesize a completed session into Zettels and links.

        Segments the transcript, persists the resulting notes, indexes them in
        the vector store, creates semantic links, and marks the session
        "completed" — or "failed" if any step raises.

        Args:
            session_id: UUID of the session to process.
        """
        try:
            logger.info(f"Starting synthesis for session {session_id}")

            async with get_session() as db:
                session_repo = SessionRepository(db)
                note_repo = NoteRepository(db)
                link_repo = LinkRepository(db)

                session = await session_repo.get(session_id)
                if not session:
                    logger.error(f"Session {session_id} not found")
                    return

                await session_repo.update_status(session_id, "processing")

                # Segment transcript into Zettels.
                logger.info(f"Segmenting transcript for session {session_id}")
                segmentation_result = await self.synthesizer.segment_transcript(session.transcript)

                # Convert raw Zettels into persistable Note rows.
                notes_to_create = [
                    Note(
                        title=zettel.title,
                        content=zettel.content,
                        tags=zettel.tags,
                        session_id=session_id,
                    )
                    for zettel in segmentation_result.notes
                ]

                if notes_to_create:
                    logger.info(f"Creating {len(notes_to_create)} notes for session {session_id}")
                    created_notes = await note_repo.create_bulk(notes_to_create)

                    # Index the new notes so linking (and future sessions)
                    # can find them.
                    await self.vector_service.add_notes(created_notes)
                    logger.info(f"Added {len(created_notes)} notes to vector store")

                    await self._create_links_for_notes(created_notes, link_repo)

                await session_repo.update_status(session_id, "completed")
                logger.info(f"Synthesis completed for session {session_id}")

        except Exception:
            # logger.exception records the traceback, not just the message.
            logger.exception(f"Error processing session {session_id}")
            # Guard the failure-status write so a second failure here cannot
            # mask the original error (it is logged above either way).
            try:
                async with get_session() as db:
                    session_repo = SessionRepository(db)
                    await session_repo.update_status(session_id, "failed")
            except Exception:
                logger.exception(
                    f"Could not mark session {session_id} as failed"
                )

    async def get_session_status(self, session_id: uuid.UUID) -> Dict[str, Any]:
        """Get the current status of a session.

        Args:
            session_id: UUID of the session.

        Returns:
            Dictionary containing session status and metadata.

        Raises:
            ValueError: If the session does not exist.
        """
        async with get_session() as db:
            session_repo = SessionRepository(db)
            note_repo = NoteRepository(db)

            session = await session_repo.get(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")

            notes = await note_repo.get_by_session(session_id)

            return {
                "session_id": session.id,
                "status": session.status,
                "notes_count": len(notes),
                "created_at": session.created_at,
            }

    async def _create_links_for_notes(self, notes: List[Note], link_repo: LinkRepository) -> None:
        """Create semantic links for a list of new notes.

        Best-effort: a failure while linking one note is logged and does not
        abort linking of the remaining notes.
        """
        for note in notes:
            try:
                # Find semantically similar existing notes.
                neighbors = await self.vector_service.semantic_search(
                    query=f"{note.title} {note.content}",
                    k=_NEIGHBOR_K,
                )

                # Drop the note itself (matched by title — assumes titles are
                # unique enough; TODO confirm) and weak matches.
                filtered_neighbors = [
                    doc for doc in neighbors
                    if doc.metadata.get("similarity_score", 0) > _MIN_SIMILARITY
                    and doc.metadata.get("title") != note.title
                ]

                if not filtered_neighbors:
                    continue

                # Let the synthesizer decide which neighbors warrant a link
                # and describe the relationship.
                linking_result = await self.synthesizer.generate_links(note, filtered_neighbors)

                links_to_create = []
                for raw_link in linking_result.links:
                    try:
                        # Target IDs come back as strings from the LLM;
                        # discard anything that is not a valid UUID.
                        target_id = uuid.UUID(raw_link.target_note_id)
                    except (ValueError, TypeError) as e:
                        logger.warning(f"Invalid link target ID {raw_link.target_note_id}: {e}")
                        continue
                    links_to_create.append(Link(
                        context=raw_link.relationship_context,
                        source_id=note.id,
                        target_id=target_id,
                    ))

                if links_to_create:
                    await link_repo.create_bulk(links_to_create)
                    logger.info(f"Created {len(links_to_create)} links for note {note.id}")

            except Exception:
                # Keep going: one bad note must not abort the whole batch.
                logger.exception(f"Error creating links for note {note.id}")