Product Requirements Document (PRD) & Implementation Plan

1. Overview

SkyTalk (api.skytalk.app) is an API service that functions as an AI interviewer to elicit user ideas, then processes the conversation to generate "Zettels" (atomic mini-blog posts). These Zettels are semantically linked using RAG and "Generative Linking."

2. Implementation Tasks (Linear Execution Plan)

Phase 1: Project Setup and Core Utilities

Task 1.1: Project Structure Initialization

Create the following directory and file structure:

Bash

skytalk-api/
├── .venv/
├── src/
│   ├── config.py
│   ├── db/
│   │   ├── database.py
│   │   └── repositories.py
│   ├── services/
│   │   ├── agents/
│   │   │   ├── interviewer.py
│   │   │   ├── synthesizer.py
│   │   │   └── prompts.py
│   │   ├── session_service.py
│   │   └── vector_service.py
│   ├── models.py
│   └── main.py
├── .env
└── requirements.txt

Task 1.2: Dependencies and Configuration

Create requirements.txt:

Plaintext

fastapi
uvicorn[standard]
pydantic-settings
sqlmodel
aiosqlite
langchain
langchain-google-genai
langchain-community
chromadb
tiktoken

Create src/config.py:

Python

from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    GOOGLE_API_KEY: str
    DATABASE_URL: str = "sqlite+aiosqlite:///./skytalk.db"
    CHROMA_PERSIST_DIR: str = "./chroma_db"

    LLM_FLASH_MODEL: str = "gemini-2.5-flash"
    LLM_PRO_MODEL: str = "gemini-2.5-pro"
    EMBEDDING_MODEL: str = "models/text-embedding-004"


settings = Settings()

Phase 2: Data Models and Persistence

Task 2.1: SQLModel Definitions (src/models.py)

Define all data structures, combining database models and API schemas.

Python

import uuid
from datetime import datetime
from typing import List, Optional

from sqlmodel import Field, SQLModel, Relationship, JSON, Column


# --- API Payloads (Not stored in DB) ---
class StartSessionRequest(SQLModel):
    topic: str


class SendMessageRequest(SQLModel):
    session_id: uuid.UUID
    message: str


# --- LLM Structured Output Models ---
class RawZettel(SQLModel):
    title: str = Field(description="Concise title of the atomic concept.")
    content: str = Field(description="The synthesized mini-blog post.")
    tags: List[str] = Field(description="List of relevant conceptual keywords.")


class SegmentationResult(SQLModel):
    notes: List[RawZettel]


class RawLink(SQLModel):
    target_note_id: str = Field(description="UUID of the related note.")
    relationship_context: str = Field(description="Explanation of how the new note relates to the target note.")


class LinkingResult(SQLModel):
    links: List[RawLink]


# --- Database Models ---
class Link(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    context: str

    source_id: uuid.UUID = Field(foreign_key="note.id")
    target_id: uuid.UUID = Field(foreign_key="note.id")


class Note(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    title: str
    content: str
    tags: List[str] = Field(sa_column=Column(JSON))
    created_at: datetime = Field(default_factory=datetime.utcnow)

    session_id: uuid.UUID = Field(foreign_key="session.id")
    session: "Session" = Relationship(back_populates="notes")


class Session(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    status: str = Field(default="active")  # active, processing, completed, failed
    transcript: List[dict] = Field(default_factory=list, sa_column=Column(JSON))
    created_at: datetime = Field(default_factory=datetime.utcnow)

    notes: List["Note"] = Relationship(back_populates="session")

Task 2.2: Database Initialization (src/db/database.py)

- Create the async engine with create_async_engine from sqlalchemy.ext.asyncio; use AsyncSession from sqlmodel.ext.asyncio.session as the session class.
- Define init_db() to create tables on startup.
- Implement a FastAPI dependency get_session() to inject the AsyncSession.

A minimal sketch is shown below.

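This is only a sketch of src/db/database.py under the plan above; the async_session_factory name and the expire_on_commit setting are assumptions, not requirements.

Python

from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlmodel import SQLModel
from sqlmodel.ext.asyncio.session import AsyncSession

from src import models  # noqa: F401  # ensure table models are registered on SQLModel.metadata
from src.config import settings

# Async engine backed by aiosqlite (see DATABASE_URL in config.py).
engine = create_async_engine(settings.DATABASE_URL, echo=False)

# Session factory; expire_on_commit=False keeps ORM objects usable after commit.
async_session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)


async def init_db():
    # Create all tables defined on SQLModel.metadata.
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)


async def get_session():
    # FastAPI dependency that yields one AsyncSession per request.
    async with async_session_factory() as session:
        yield session
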
Task 2.3: Repositories (src/db/repositories.py)

Implement async CRUD operations using SQLModel's syntax (a sketch follows the list).

- SessionRepository: create, get, update_status, append_transcript.
- NoteRepository: create_bulk (efficiently inserts multiple notes).
- LinkRepository: create_bulk.

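A possible shape for the repositories; method signatures beyond the names listed above are assumptions.

Python

import uuid
from typing import List, Optional

from sqlmodel.ext.asyncio.session import AsyncSession

from src.models import Link, Note, Session


class SessionRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create(self) -> Session:
        session = Session()
        self.db.add(session)
        await self.db.commit()
        await self.db.refresh(session)
        return session

    async def get(self, session_id: uuid.UUID) -> Optional[Session]:
        return await self.db.get(Session, session_id)

    async def update_status(self, session: Session, status: str) -> Session:
        session.status = status
        self.db.add(session)
        await self.db.commit()
        return session

    async def append_transcript(self, session: Session, role: str, content: str) -> Session:
        # JSON columns are not mutation-tracked, so reassign the whole list.
        session.transcript = session.transcript + [{"role": role, "content": content}]
        self.db.add(session)
        await self.db.commit()
        return session


class NoteRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_bulk(self, notes: List[Note]) -> List[Note]:
        # Note ids are generated client-side (uuid4), so no refresh is needed.
        self.db.add_all(notes)
        await self.db.commit()
        return notes


class LinkRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_bulk(self, links: List[Link]) -> List[Link]:
        self.db.add_all(links)
        await self.db.commit()
        return links
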
Phase 3: Core Services

Task 3.1: Vector Store Service (src/services/vector_service.py)

- Initialize GoogleGenerativeAIEmbeddings from langchain_google_genai.
- Initialize a persistent Chroma client.
- async add_notes(notes: List[Note]): Adds note content and metadata to Chroma, using note ids as document IDs.
- async semantic_search(query: str, k: int = 5): Performs similarity search and returns LangChain Document objects.

A sketch of the service is shown below.

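This sketch assumes the Chroma integration shipped with langchain-community; the collection name and metadata keys are illustrative.

Python

from typing import List

from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_google_genai import GoogleGenerativeAIEmbeddings

from src.config import settings
from src.models import Note


class VectorService:
    def __init__(self):
        self.embeddings = GoogleGenerativeAIEmbeddings(
            model=settings.EMBEDDING_MODEL, google_api_key=settings.GOOGLE_API_KEY
        )
        # Persistent collection stored under CHROMA_PERSIST_DIR.
        self.store = Chroma(
            collection_name="zettels",
            embedding_function=self.embeddings,
            persist_directory=settings.CHROMA_PERSIST_DIR,
        )

    async def add_notes(self, notes: List[Note]) -> None:
        # Note ids double as Chroma document IDs so generated links can reference them.
        await self.store.aadd_texts(
            texts=[note.content for note in notes],
            metadatas=[
                {"note_id": str(note.id), "title": note.title, "session_id": str(note.session_id)}
                for note in notes
            ],
            ids=[str(note.id) for note in notes],
        )

    async def semantic_search(self, query: str, k: int = 5) -> List[Document]:
        return await self.store.asimilarity_search(query, k=k)
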
Task 3.2: Prompts (src/services/agents/prompts.py)

- INTERVIEWER_SYSTEM_PROMPT: Socratic persona. Accepts {retrieved_context}. Instructs the AI to ask one question at a time and to output the exact token [END_SESSION] when the conversation naturally concludes.
- SEGMENTATION_PROMPT: Instructs the LLM to analyze {transcript} and segment it into atomic ideas, enforcing the SegmentationResult structure.
- GENERATIVE_LINKING_PROMPT: Instructs the LLM to compare {new_note} against {neighbors}, enforcing the LinkingResult structure.

A skeleton of these constants follows.

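The wording below is illustrative; only the placeholder names and the [END_SESSION] token are fixed by this plan.

Python

INTERVIEWER_SYSTEM_PROMPT = """You are a Socratic interviewer helping the user develop their ideas.
Ask exactly one probing question at a time.
Use these related notes from the user's existing Zettelkasten as background context:
{retrieved_context}
When the conversation has naturally concluded, output the exact token [END_SESSION]."""

SEGMENTATION_PROMPT = """Analyze the interview transcript below and segment it into atomic, self-contained ideas,
each written as a mini-blog post with a title and tags.

Transcript:
{transcript}"""

GENERATIVE_LINKING_PROMPT = """Compare the new note against its nearest semantic neighbors and describe every
meaningful relationship between them.

New note:
{new_note}

Neighbors:
{neighbors}"""
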
Phase 4: The Agent Layer

Task 4.1: Interviewer Agent (src/services/agents/interviewer.py)

- Initialize the Interview LLM (Gemini Flash) using ChatGoogleGenerativeAI.
- Dependencies: VectorService.
- Method async generate_response(transcript: List[dict], context_query: str) (sketched after this list):
  - RAG: Call VectorService.semantic_search(context_query).
  - LCEL Chain: Define the chain: ChatPromptTemplate | LLM | StrOutputParser().
  - Invocation: Use .ainvoke() with formatted context and chat history.
  - Termination Check: Check if the response contains [END_SESSION].
  - Return the response text and a boolean indicating if the session should end.

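A sketch of the agent, assuming the transcript is stored as {"role": ..., "content": ...} dicts; the END_TOKEN constant and the history handling are illustrative.

Python

from typing import List, Tuple

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_google_genai import ChatGoogleGenerativeAI

from src.config import settings
from src.services.agents.prompts import INTERVIEWER_SYSTEM_PROMPT
from src.services.vector_service import VectorService

END_TOKEN = "[END_SESSION]"


class InterviewerAgent:
    def __init__(self, vector_service: VectorService):
        self.vector_service = vector_service
        self.llm = ChatGoogleGenerativeAI(
            model=settings.LLM_FLASH_MODEL, google_api_key=settings.GOOGLE_API_KEY
        )
        prompt = ChatPromptTemplate.from_messages(
            [
                ("system", INTERVIEWER_SYSTEM_PROMPT),
                MessagesPlaceholder(variable_name="history"),
            ]
        )
        # LCEL chain: prompt -> LLM -> plain string.
        self.chain = prompt | self.llm | StrOutputParser()

    async def generate_response(self, transcript: List[dict], context_query: str) -> Tuple[str, bool]:
        # RAG: pull semantically related notes into the system prompt.
        neighbors = await self.vector_service.semantic_search(context_query)
        retrieved_context = "\n\n".join(doc.page_content for doc in neighbors)

        response = await self.chain.ainvoke(
            {"retrieved_context": retrieved_context, "history": transcript}
        )

        # Termination check: the system prompt asks the model to emit END_TOKEN when done.
        should_end = END_TOKEN in response
        return response.replace(END_TOKEN, "").strip(), should_end
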
Task 4.2: Synthesizer Agent (src/services/agents/synthesizer.py)

- Initialize the Synthesis LLM (Gemini Pro).
- Dependencies: VectorService.
- Method async segment_transcript(transcript: List[dict]) -> SegmentationResult:
  - Define the structured chain: ChatPromptTemplate | LLM.with_structured_output(schema=SegmentationResult).
  - Invoke asynchronously and return the result.
- Method async generate_links(new_note: Note, neighbors: List[Document]) -> LinkingResult:
  - Define the structured chain for linking using LinkingResult.
  - Format new_note and neighbors for the prompt.
  - Invoke asynchronously and return the result.

A sketch of both methods is shown below.

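A sketch of the synthesizer, assuming with_structured_output() is passed the SQLModel classes directly and that neighbor documents carry the note_id metadata written by VectorService; the formatting helpers are illustrative.

Python

from typing import List

from langchain_core.documents import Document
from langchain_core.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI

from src.config import settings
from src.models import LinkingResult, Note, SegmentationResult
from src.services.agents.prompts import GENERATIVE_LINKING_PROMPT, SEGMENTATION_PROMPT


class SynthesizerAgent:
    def __init__(self):
        self.llm = ChatGoogleGenerativeAI(
            model=settings.LLM_PRO_MODEL, google_api_key=settings.GOOGLE_API_KEY
        )

    async def segment_transcript(self, transcript: List[dict]) -> SegmentationResult:
        prompt = ChatPromptTemplate.from_template(SEGMENTATION_PROMPT)
        # Structured chain: the model output is constrained to the SegmentationResult schema.
        chain = prompt | self.llm.with_structured_output(SegmentationResult)
        formatted = "\n".join(f"{turn['role']}: {turn['content']}" for turn in transcript)
        return await chain.ainvoke({"transcript": formatted})

    async def generate_links(self, new_note: Note, neighbors: List[Document]) -> LinkingResult:
        prompt = ChatPromptTemplate.from_template(GENERATIVE_LINKING_PROMPT)
        chain = prompt | self.llm.with_structured_output(LinkingResult)
        neighbor_text = "\n\n".join(
            f"note_id={doc.metadata.get('note_id')}\n{doc.page_content}" for doc in neighbors
        )
        return await chain.ainvoke(
            {"new_note": f"{new_note.title}\n{new_note.content}", "neighbors": neighbor_text}
        )
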
Phase 5: Orchestration and API

Task 5.1: Session Orchestration Service (src/services/session_service.py)

This service manages the lifecycle and coordinates agents and repositories.

- Dependencies: Repositories, Agents, VectorService.
- async start_session(topic: str): Create session in DB. Call InterviewerAgent for the first message. Update transcript.
- async handle_message(session_id: uuid.UUID, message: str): Get session. Update transcript with user message. Call InterviewerAgent. Update transcript with AI response. If [END_SESSION] detected, update status to processing.
- async process_session_background_task(session_id: uuid.UUID): the main synthesis pipeline, sketched after this list.
  - Get session transcript.
  - Call SynthesizerAgent.segment_transcript.
  - Persistence (SQLite): Save new notes from SegmentationResult using NoteRepository.
  - Indexing (ChromaDB): Add the newly created notes using VectorService.add_notes.
  - Linking Loop:
    - For each new note, call VectorService.semantic_search to find neighbors.
    - Call SynthesizerAgent.generate_links.
    - Save links from LinkingResult using LinkRepository.
  - Finalize: Update session status to completed (or failed on error).

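A sketch of the synthesis pipeline only; the collaborator attribute names (self.sessions, self.notes, self.links, self.synthesizer, self.vector_service) are assumptions.

Python

import uuid

from src.models import Link, Note


class SessionService:
    # __init__ wires the repositories, agents, and VectorService (omitted here).

    async def process_session_background_task(self, session_id: uuid.UUID) -> None:
        session = await self.sessions.get(session_id)
        try:
            # 1. Segment the transcript into atomic Zettels.
            segmentation = await self.synthesizer.segment_transcript(session.transcript)

            # 2. Persist the new notes in SQLite.
            notes = [
                Note(title=z.title, content=z.content, tags=z.tags, session_id=session.id)
                for z in segmentation.notes
            ]
            await self.notes.create_bulk(notes)

            # 3. Index the new notes in ChromaDB.
            await self.vector_service.add_notes(notes)

            # 4. Generative Linking: relate each new note to its nearest neighbors.
            for note in notes:
                neighbors = await self.vector_service.semantic_search(note.content)
                linking = await self.synthesizer.generate_links(note, neighbors)
                links = [
                    Link(
                        source_id=note.id,
                        target_id=uuid.UUID(raw.target_note_id),
                        context=raw.relationship_context,
                    )
                    for raw in linking.links
                ]
                await self.links.create_bulk(links)

            # 5. Finalize.
            await self.sessions.update_status(session, "completed")
        except Exception:
            await self.sessions.update_status(session, "failed")
            raise
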
Task 5.2: API Endpoints (src/main.py)

- Initialize FastAPI app with a lifespan manager to call init_db().
- Implement dependency injection for the DB session and SessionService.
- POST /sessions.start: Takes StartSessionRequest, calls SessionService.start_session.
- POST /sessions.sendMessage: Takes SendMessageRequest. Calls SessionService.handle_message. If the service indicates the session is ending, trigger the background task.
- Background Task Trigger: Use FastAPI's BackgroundTasks to run SessionService.process_session_background_task(session_id). This is critical for returning an immediate API response.
- GET /sessions.getStatus: Takes a session_id query parameter and returns the session's current status from the DB.

A sketch of the endpoint wiring follows.
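This sketch assumes handle_message returns a (reply, is_ending) pair; the response shapes, the get_session_service dependency, and the get_status helper are assumptions.

Python

import uuid
from contextlib import asynccontextmanager

from fastapi import BackgroundTasks, Depends, FastAPI

from src.db.database import init_db
from src.models import SendMessageRequest, StartSessionRequest
from src.services.session_service import SessionService


@asynccontextmanager
async def lifespan(app: FastAPI):
    await init_db()  # create tables on startup
    yield


app = FastAPI(lifespan=lifespan)


def get_session_service() -> SessionService:
    # Assemble repositories, agents, and VectorService here (wiring omitted).
    ...


@app.post("/sessions.start")
async def start_session(payload: StartSessionRequest, service: SessionService = Depends(get_session_service)):
    return await service.start_session(payload.topic)


@app.post("/sessions.sendMessage")
async def send_message(
    payload: SendMessageRequest,
    background_tasks: BackgroundTasks,
    service: SessionService = Depends(get_session_service),
):
    reply, is_ending = await service.handle_message(payload.session_id, payload.message)
    if is_ending:
        # Synthesis runs after the HTTP response is returned, keeping the API responsive.
        background_tasks.add_task(service.process_session_background_task, payload.session_id)
    return {"reply": reply, "session_ending": is_ending}


@app.get("/sessions.getStatus")
async def get_status(session_id: uuid.UUID, service: SessionService = Depends(get_session_service)):
    # Assumed helper on the service that reads the status via SessionRepository.
    return {"status": await service.get_status(session_id)}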
|