feat: add session orchestration service

This commit is contained in:
Albert
2025-08-17 01:52:38 +00:00
parent 89273619c2
commit af7362df3d

View File

@@ -0,0 +1,246 @@
import logging
import uuid
from typing import Any, Dict, List, Optional

from app.data.database import get_session
from app.data.models import Link, Note, RawZettel, Session
from app.data.repositories import LinkRepository, NoteRepository, SessionRepository
from app.services.interviewer import InterviewerAgent
from app.services.synthesizer import SynthesizerAgent
from app.services.vector import VectorService
# Module-level logger, named after this module per the standard convention.
logger = logging.getLogger(__name__)
class SessionService:
    """Service responsible for orchestrating the complete interview-to-synthesis pipeline.

    Coordinates the interviewer agent (runs the conversation), the synthesizer
    agent (turns a finished transcript into Zettel notes and semantic links),
    and the shared vector store, persisting everything via the repository layer.
    """

    def __init__(self):
        # One vector service instance is shared by both agents so they read
        # and write the same semantic index.
        self.vector_service = VectorService()
        self.interviewer = InterviewerAgent(self.vector_service)
        self.synthesizer = SynthesizerAgent(self.vector_service)

    async def start_session(self, topic: str) -> Dict[str, Any]:
        """Start a new interview session.

        Args:
            topic: The initial topic for the conversation.

        Returns:
            Dictionary containing session_id, status, and initial message.
        """
        async with get_session() as db:
            session_repo = SessionRepository(db)
            session = await session_repo.create(status="active")

            # Seed the conversation with the user's topic and let the
            # interviewer produce its opening message.
            opening = {"role": "user", "content": f"I want to explore the topic: {topic}"}
            response, should_end = await self.interviewer.generate_response(
                transcript=[opening],
                context_query=topic,
            )

            # Persist the initial exchange.
            await session_repo.append_transcript(session.id, opening)
            await session_repo.append_transcript(session.id, {
                "role": "assistant",
                "content": response,
            })

            if should_end:
                await session_repo.update_status(session.id, "processing")

            return {
                "session_id": session.id,
                # Fix: report the effective status. The in-memory `session`
                # object is stale after update_status(), so reading
                # `session.status` claimed "active" even when the session had
                # just been moved to "processing" (handle_message already
                # computed this correctly).
                "status": "processing" if should_end else session.status,
                "message": response,
            }

    async def handle_message(self, session_id: uuid.UUID, message: str) -> Dict[str, Any]:
        """Handle a user message in an active session.

        Args:
            session_id: UUID of the session.
            message: User's message content.

        Returns:
            Dictionary containing response and session status.

        Raises:
            ValueError: If the session does not exist or is not active.
        """
        async with get_session() as db:
            session_repo = SessionRepository(db)

            session = await session_repo.get(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")
            if session.status != "active":
                raise ValueError(f"Session {session_id} is not active (status: {session.status})")

            # Record the user's message before generating a reply so the
            # interviewer sees the full conversation.
            await session_repo.append_transcript(session.id, {
                "role": "user",
                "content": message,
            })

            # Re-fetch to pick up the transcript including the new message.
            updated_session = await session_repo.get(session_id)
            response, should_end = await self.interviewer.generate_response(
                transcript=updated_session.transcript,
                context_query=message,
            )

            await session_repo.append_transcript(session.id, {
                "role": "assistant",
                "content": response,
            })

            # The interviewer decides when the conversation is complete;
            # flip the session into the synthesis pipeline.
            if should_end:
                await session_repo.update_status(session.id, "processing")

            return {
                "session_id": session.id,
                "status": "processing" if should_end else "active",
                "message": response,
                "session_ending": should_end,
            }

    async def process_session_background_task(self, session_id: uuid.UUID) -> None:
        """Background task to synthesize a completed session into Zettels and links.

        Segments the transcript, bulk-creates the resulting notes, indexes
        them in the vector store, generates semantic links, and marks the
        session "completed" (or "failed" on any error).

        Args:
            session_id: UUID of the session to process.
        """
        try:
            logger.info(f"Starting synthesis for session {session_id}")
            async with get_session() as db:
                session_repo = SessionRepository(db)
                note_repo = NoteRepository(db)
                link_repo = LinkRepository(db)

                session = await session_repo.get(session_id)
                if not session:
                    logger.error(f"Session {session_id} not found")
                    return

                await session_repo.update_status(session_id, "processing")

                logger.info(f"Segmenting transcript for session {session_id}")
                segmentation_result = await self.synthesizer.segment_transcript(session.transcript)

                # Materialize Note rows from the synthesizer's Zettels.
                notes_to_create = [
                    Note(
                        title=zettel.title,
                        content=zettel.content,
                        tags=zettel.tags,
                        session_id=session_id,
                    )
                    for zettel in segmentation_result.notes
                ]

                if notes_to_create:
                    logger.info(f"Creating {len(notes_to_create)} notes for session {session_id}")
                    created_notes = await note_repo.create_bulk(notes_to_create)

                    # Index the new notes so link generation can find them.
                    await self.vector_service.add_notes(created_notes)
                    logger.info(f"Added {len(created_notes)} notes to vector store")

                    await self._create_links_for_notes(created_notes, link_repo)

                await session_repo.update_status(session_id, "completed")
                logger.info(f"Synthesis completed for session {session_id}")
        except Exception as e:
            # logger.exception preserves the traceback that logger.error lost.
            logger.exception(f"Error processing session {session_id}: {e}")
            try:
                async with get_session() as db:
                    session_repo = SessionRepository(db)
                    await session_repo.update_status(session_id, "failed")
            except Exception:
                # Don't let the failure-marking step mask the original error.
                logger.exception(f"Could not mark session {session_id} as failed")

    async def get_session_status(self, session_id: uuid.UUID) -> Dict[str, Any]:
        """Get the current status of a session.

        Args:
            session_id: UUID of the session.

        Returns:
            Dictionary containing session status and metadata.

        Raises:
            ValueError: If the session does not exist.
        """
        async with get_session() as db:
            session_repo = SessionRepository(db)
            note_repo = NoteRepository(db)

            session = await session_repo.get(session_id)
            if not session:
                raise ValueError(f"Session {session_id} not found")

            notes = await note_repo.get_by_session(session_id)
            return {
                "session_id": session.id,
                "status": session.status,
                "notes_count": len(notes),
                "created_at": session.created_at,
            }

    async def _create_links_for_notes(self, notes: List[Note], link_repo: LinkRepository) -> None:
        """Create semantic links for a list of new notes.

        Best-effort per note: a failure linking one note is logged and does
        not abort linking for the rest.
        """
        for note in notes:
            try:
                # Find semantically similar existing notes.
                neighbors = await self.vector_service.semantic_search(
                    query=f"{note.title} {note.content}",
                    k=5,
                )

                # Drop the note itself (matched by title) and weak matches.
                filtered_neighbors = [
                    doc for doc in neighbors
                    if doc.metadata.get("similarity_score", 0) > 0.5
                    and doc.metadata.get("title") != note.title
                ]

                if filtered_neighbors:
                    linking_result = await self.synthesizer.generate_links(note, filtered_neighbors)

                    links_to_create = []
                    for raw_link in linking_result.links:
                        try:
                            # The LLM may emit a malformed target id; skip it
                            # rather than failing the whole note.
                            target_id = uuid.UUID(raw_link.target_note_id)
                            links_to_create.append(Link(
                                context=raw_link.relationship_context,
                                source_id=note.id,
                                target_id=target_id,
                            ))
                        except (ValueError, TypeError) as e:
                            logger.warning(f"Invalid link target ID {raw_link.target_note_id}: {e}")

                    if links_to_create:
                        await link_repo.create_bulk(links_to_create)
                        logger.info(f"Created {len(links_to_create)} links for note {note.id}")
            except Exception as e:
                logger.exception(f"Error creating links for note {note.id}: {e}")