app/implementation-plan.txt
Albert f60d61a78f feat: implement data layer with comprehensive test infrastructure
- Define SQLModel schemas for Session, Note, and Link entities
  - Add API request/response models for RPC endpoints
  - Create LLM structured output models for Zettel extraction
  - Set up async database initialization with SQLModel and aiosqlite
  - Implement repository pattern for CRUD operations
  - Add complete test suite with pytest configuration
  - Create validation test runner for development workflow
  - Add .gitignore for Python/FastAPI project security
2025-08-17 01:25:16 +00:00


Product Requirements Document (PRD) & Implementation Plan
1. Overview
SkyTalk (api.skytalk.app) is an API service that functions as an AI interviewer to elicit user ideas, then processes the conversation to generate "Zettels" (atomic mini-blog posts). These Zettels are semantically linked using RAG and "Generative Linking."
2. Implementation Tasks (Linear Execution Plan)
Phase 1: Project Setup and Core Utilities
Task 1.1: Project Structure Initialization
Create the following directory and file structure:
Plaintext
skytalk-api/
├── .venv/
├── src/
│   ├── config.py
│   ├── db/
│   │   ├── database.py
│   │   └── repositories.py
│   ├── services/
│   │   ├── agents/
│   │   │   ├── interviewer.py
│   │   │   ├── synthesizer.py
│   │   │   └── prompts.py
│   │   ├── session_service.py
│   │   └── vector_service.py
│   ├── models.py
│   └── main.py
├── .env
└── requirements.txt
Task 1.2: Dependencies and Configuration
Create requirements.txt:
Plaintext
fastapi
uvicorn[standard]
pydantic-settings
sqlmodel
aiosqlite
langchain
langchain-google-genai
langchain-community
chromadb
tiktoken
Create src/config.py:
Python
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")

    GOOGLE_API_KEY: str
    DATABASE_URL: str = "sqlite+aiosqlite:///./skytalk.db"
    CHROMA_PERSIST_DIR: str = "./chroma_db"
    LLM_FLASH_MODEL: str = "gemini-2.5-flash"
    LLM_PRO_MODEL: str = "gemini-2.5-pro"
    EMBEDDING_MODEL: str = "models/text-embedding-004"

settings = Settings()
Phase 2: Data Models and Persistence
Task 2.1: SQLModel Definitions (src/models.py)
Define all data structures, combining database models and API schemas.
Python
import uuid
from datetime import datetime
from typing import List, Optional

from sqlmodel import Field, SQLModel, Relationship, JSON, Column

# --- API Payloads (Not stored in DB) ---
class StartSessionRequest(SQLModel):
    topic: str

class SendMessageRequest(SQLModel):
    session_id: uuid.UUID
    message: str

# --- LLM Structured Output Models ---
class RawZettel(SQLModel):
    title: str = Field(description="Concise title of the atomic concept.")
    content: str = Field(description="The synthesized mini-blog post.")
    tags: List[str] = Field(description="List of relevant conceptual keywords.")

class SegmentationResult(SQLModel):
    notes: List[RawZettel]

class RawLink(SQLModel):
    target_note_id: str = Field(description="UUID of the related note.")
    relationship_context: str = Field(description="Explanation of how the new note relates to the target note.")

class LinkingResult(SQLModel):
    links: List[RawLink]

# --- Database Models ---
class Link(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    context: str
    source_id: uuid.UUID = Field(foreign_key="note.id")
    target_id: uuid.UUID = Field(foreign_key="note.id")

class Note(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    title: str
    content: str
    tags: List[str] = Field(sa_column=Column(JSON))
    created_at: datetime = Field(default_factory=datetime.utcnow)
    session_id: uuid.UUID = Field(foreign_key="session.id")
    session: "Session" = Relationship(back_populates="notes")

class Session(SQLModel, table=True):
    id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True)
    status: str = Field(default="active")  # active, processing, completed, failed
    transcript: List[dict] = Field(default_factory=list, sa_column=Column(JSON))
    created_at: datetime = Field(default_factory=datetime.utcnow)
    notes: List["Note"] = Relationship(back_populates="session")
Task 2.2: Database Initialization (src/db/database.py)
Create the async engine with create_async_engine (imported from sqlalchemy.ext.asyncio) and use AsyncSession from sqlmodel.ext.asyncio.session.
Define init_db() to create the tables on startup.
Implement a FastAPI dependency get_session() that yields an AsyncSession.
Task 2.3: Repositories (src/db/repositories.py)
Implement async CRUD operations using SQLModel's syntax.
SessionRepository: create, get, update_status, append_transcript.
NoteRepository: create_bulk (efficiently inserts multiple notes).
LinkRepository: create_bulk.
Phase 3: Core Services
Task 3.1: Vector Store Service (src/services/vector_service.py)
Initialize GoogleGenerativeAIEmbeddings from langchain_google_genai.
Initialize a persistent Chroma client.
async add_notes(notes: List[Note]): Adds note content and metadata to Chroma, using note ids as document IDs.
async semantic_search(query: str, k: int = 5): Performs similarity search and returns LangChain Document objects.
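One way to shape this service is to inject the Chroma collection so the embedding wiring (GoogleGenerativeAIEmbeddings, the persistent client) lives in one constructor call site. The sketch below is an assumption on that design and talks to the raw chromadb collection interface rather than the LangChain wrapper named above.

```python
# Sketch of src/services/vector_service.py with the collection injected.
# `collection` is expected to expose Chroma's add()/query() interface.
from typing import Any, List

class VectorService:
    def __init__(self, collection: Any) -> None:
        self.collection = collection

    async def add_notes(self, notes: List[Any]) -> None:
        # Index note content with the note's UUID as the document ID, so
        # links found later can refer back to rows in SQLite.
        self.collection.add(
            ids=[str(n.id) for n in notes],
            documents=[n.content for n in notes],
            metadatas=[{"title": n.title, "session_id": str(n.session_id)} for n in notes],
        )

    async def semantic_search(self, query: str, k: int = 5):
        return self.collection.query(query_texts=[query], n_results=k)
```

If the LangChain Chroma wrapper is used instead, semantic_search would call its similarity_search and return Document objects as the plan states.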
Task 3.2: Prompts (src/services/agents/prompts.py)
INTERVIEWER_SYSTEM_PROMPT: Socratic persona. Accepts {retrieved_context}. Instructs AI to ask one question at a time and to output the exact token [END_SESSION] when the conversation naturally concludes.
SEGMENTATION_PROMPT: Instructs LLM to analyze {transcript} and segment it into atomic ideas, enforcing the SegmentationResult structure.
GENERATIVE_LINKING_PROMPT: Instructs LLM to compare {new_note} against {neighbors}, enforcing the LinkingResult structure.
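Possible starting points for these constants are sketched below; the wording is illustrative, but the placeholders ({retrieved_context}, {transcript}, {new_note}, {neighbors}) and the [END_SESSION] token match the plan.

```python
# Draft contents for src/services/agents/prompts.py (wording is a sketch).
INTERVIEWER_SYSTEM_PROMPT = (
    "You are a Socratic interviewer drawing out the user's ideas.\n"
    "Use these retrieved notes for context:\n"
    "{retrieved_context}\n"
    "Ask exactly one question at a time. When the conversation has naturally "
    "concluded, output the exact token [END_SESSION]."
)

SEGMENTATION_PROMPT = (
    "Analyze the interview transcript below and segment it into atomic, "
    "self-contained ideas, each with a title, content, and tags.\n"
    "Transcript:\n{transcript}"
)

GENERATIVE_LINKING_PROMPT = (
    "Compare the new note against each candidate neighbor and describe any "
    "genuine conceptual relationship you find.\n"
    "New note:\n{new_note}\n"
    "Neighbors:\n{neighbors}"
)
```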
Phase 4: The Agent Layer
Task 4.1: Interviewer Agent (src/services/agents/interviewer.py)
Initialize the Interview LLM (Gemini Flash) using ChatGoogleGenerativeAI.
Dependencies: VectorService.
Method async generate_response(transcript: List[dict], context_query: str):
RAG: Call VectorService.semantic_search(context_query).
LCEL Chain: Define the chain: ChatPromptTemplate | LLM | StrOutputParser().
Invocation: Use .ainvoke() with formatted context and chat history.
Termination Check: Check if the response contains [END_SESSION].
Return the response text and a boolean indicating if the session should end.
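The termination check at the end of generate_response can be isolated as a small pure helper; the function below is hypothetical (not part of LangChain) and only shows the intended token handling.

```python
# Hypothetical helper for the Task 4.1 termination check.
END_TOKEN = "[END_SESSION]"

def split_termination(response: str) -> tuple[str, bool]:
    """Return the user-visible text and whether the session should end."""
    should_end = END_TOKEN in response
    # Strip the control token so it never reaches the user.
    text = response.replace(END_TOKEN, "").strip()
    return text, should_end
```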
Task 4.2: Synthesizer Agent (src/services/agents/synthesizer.py)
Initialize the Synthesis LLM (Gemini Pro).
Dependencies: VectorService.
Method async segment_transcript(transcript: List[dict]) -> SegmentationResult:
Define the structured chain: ChatPromptTemplate | LLM.with_structured_output(schema=SegmentationResult).
Invoke asynchronously and return the result.
Method async generate_links(new_note: Note, neighbors: List[Document]) -> LinkingResult:
Define the structured chain for linking using LinkingResult.
Format new_note and neighbors for the prompt.
Invoke asynchronously and return the result.
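The formatting step in generate_links might look like the hypothetical helpers below; the real chain stays ChatPromptTemplate | LLM.with_structured_output(LinkingResult), and these functions only build the {new_note} and {neighbors} prompt slots. The note_id metadata key is an assumption.

```python
# Hypothetical prompt-formatting helpers for Task 4.2.
from typing import Any, List

def format_note(note: Any) -> str:
    # Render a Note row for the {new_note} slot.
    return f"id={note.id}\ntitle={note.title}\n{note.content}"

def format_neighbors(neighbors: List[Any]) -> str:
    # LangChain Documents carry text in .page_content; we assume the note's
    # UUID was stored under a "note_id" metadata key at indexing time.
    return "\n---\n".join(
        f"id={d.metadata.get('note_id', '?')}\n{d.page_content}" for d in neighbors
    )
```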
Phase 5: Orchestration and API
Task 5.1: Session Orchestration Service (src/services/session_service.py)
This service manages the lifecycle and coordinates agents and repositories.
Dependencies: Repositories, Agents, VectorService.
async start_session(topic: str): Create session in DB. Call InterviewerAgent for the first message. Update transcript.
async handle_message(session_id: uuid.UUID, message: str): Get session. Update transcript with user message. Call InterviewerAgent. Update transcript with AI response. If [END_SESSION] detected, update status to processing.
async process_session_background_task(session_id: uuid.UUID): (The main synthesis pipeline)
Get session transcript.
Call SynthesizerAgent.segment_transcript.
Persistence (SQLite): Save new notes from SegmentationResult using NoteRepository.
Indexing (ChromaDB): Add the newly created notes using VectorService.add_notes.
Linking Loop:
For each new note, call VectorService.semantic_search to find neighbors.
Call SynthesizerAgent.generate_links.
Save links from LinkingResult using LinkRepository.
Finalize: Update session status to completed (or failed on error).
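The pipeline above can be sketched with its collaborators injected, making the ordering (segment, persist, index, link, finalize) explicit. Every attribute and return shape on the injected objects is an assumption taken from this plan, including create_bulk returning the persisted Note rows.

```python
# Sketch of SessionService.process_session_background_task (Task 5.1).
import uuid
from typing import Any

async def process_session(
    session_id: uuid.UUID,
    session_repo: Any,
    note_repo: Any,
    link_repo: Any,
    synthesizer: Any,
    vectors: Any,
) -> None:
    session = await session_repo.get(session_id)
    try:
        # 1. Segment the transcript into raw Zettels.
        result = await synthesizer.segment_transcript(session.transcript)
        # 2. Persist to SQLite (assumed to return the created Note rows),
        # 3. then index the same notes in Chroma.
        notes = await note_repo.create_bulk(result.notes, session_id=session_id)
        await vectors.add_notes(notes)
        # 4. Linking loop: neighbors via semantic search, links via the LLM.
        for note in notes:
            neighbors = await vectors.semantic_search(note.content)
            linking = await synthesizer.generate_links(note, neighbors)
            await link_repo.create_bulk(note, linking.links)
        await session_repo.update_status(session_id, "completed")
    except Exception:
        await session_repo.update_status(session_id, "failed")
        raise
```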
Task 5.2: API Endpoints (src/main.py)
Initialize FastAPI app with a lifespan manager to call init_db().
Implement dependency injection for the DB session and SessionService.
POST /sessions.start: Takes StartSessionRequest, calls SessionService.start_session.
POST /sessions.sendMessage: Takes SendMessageRequest. Calls SessionService.handle_message. If the service indicates the session is ending, trigger the background task.
Background Task Trigger: Use FastAPI's BackgroundTasks to run SessionService.process_session_background_task(session_id). This is critical for returning an immediate API response.
GET /sessions.getStatus: Takes a session_id query parameter and returns the session's current status from the DB.