fix: main.py

- Create end-to-end tests
  - Add diagnose file to determine cause of failure for end-to-end tests
This commit is contained in:
Albert
2025-08-17 02:11:58 +00:00
parent e59d5eea73
commit b745151d2c
6 changed files with 678 additions and 6 deletions

155
DEVELOPMENT.md Normal file
View File

@@ -0,0 +1,155 @@
# SkyTalk API - Development Guide
Quick guide to get the SkyTalk API running and test it.
## 🚀 Quick Start
### 1. Setup Environment
```bash
# Create and activate virtual environment
uv venv
source .venv/bin/activate # Linux/Mac
# .venv\Scripts\activate # Windows
# Install dependencies
uv pip install -r requirements.txt
```
### 2. Configure Environment Variables
Edit `.env` file and add your Google API key:
```bash
GOOGLE_API_KEY=your_actual_google_api_key_here
```
### 3. Run Tests
```bash
# Quick validation tests
python run_tests.py
# Full test suite
pytest
# Test count: Should show 54 tests passing
```
### 4. Start the API Server
```bash
# Method 1: Direct Python
python -m uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# Method 2: Using the main.py script
python app/main.py
```
### 5. Test the API
#### Option A: Automated Testing (Recommended)
```bash
# Start server and run comprehensive tests
python test_api_manual.py --start-server
# Or test against already running server
python test_api_manual.py
```
#### Option B: Manual Testing with curl
```bash
# Make the script executable
chmod +x test_api_curl.sh
# Run the tests (requires jq for JSON formatting)
./test_api_curl.sh
```
#### Option C: Interactive Testing
1. Open browser to http://localhost:8000/docs
2. Use the interactive Swagger UI to test endpoints
3. Start with `/sessions/start` with topic like "AI ethics"
## 📁 Project Structure
```
app/
├── api/ # FastAPI endpoints (future: organized by domain)
├── services/ # Business logic and AI agents
│ ├── interviewer.py # RAG-powered Socratic interviewer
│ ├── synthesizer.py # Transcript → Zettels + Links
│ ├── vector.py # ChromaDB + embeddings
│ └── session_service.py # Orchestrates the full pipeline
├── data/ # Database layer
│ ├── models/ # SQLModel schemas + API models
│ ├── repositories/ # Async CRUD operations
│ └── database.py # Async SQLite setup
├── core/ # Configuration and utilities
│ ├── config.py # Pydantic settings
│ ├── prompts/ # External prompt files
│ └── prompt_loader.py # Prompt management
└── main.py # FastAPI app with lifespan management
```
## 🔄 API Workflow
1. **Start Session**: `POST /sessions/start` with a topic
2. **Conversation**: `POST /sessions/sendMessage` back and forth
3. **Auto-End**: AI detects natural conclusion with `[END_SESSION]`
4. **Background Synthesis**: Async processing begins automatically
5. **Check Status**: `GET /sessions/getStatus` to monitor progress
6. **Result**: Session status becomes "completed" with generated notes
## 🧪 Testing Strategy
- **Unit Tests**: 54 tests covering all core functionality
- **Integration Tests**: `test_api_manual.py` for end-to-end workflows
- **Manual Tests**: `test_api_curl.sh` for quick verification
- **Development Tests**: `run_tests.py` for rapid validation
## 🔧 Development Commands
```bash
# Code quality (run before committing)
black . # Format code
mypy . # Type checking
ruff check . # Linting
# Database operations
rm skytalk.db* # Reset database
rm -rf chroma_db/ # Reset vector store
# Run specific test categories
pytest tests/test_models.py -v
pytest tests/test_vector_service.py -v
pytest tests/test_interviewer_agent.py -v
```
## 🐛 Common Issues
### API Key Issues
- Make sure `GOOGLE_API_KEY` is set in `.env`
- Verify the key has access to Gemini API and embedding models
### Database Issues
- Delete `skytalk.db` to reset the database
- Check file permissions in the project directory
### ChromaDB Issues
- Delete `chroma_db/` directory to reset vector store
- Ensure sufficient disk space for embeddings
### Import Issues
- Ensure virtual environment is activated
- Run `uv pip install -r requirements.txt` again
## 📊 Monitoring
- **Logs**: Check console output for detailed logging
- **Database**: Use SQLite browser to inspect `skytalk.db`
- **Vector Store**: ChromaDB data in `chroma_db/` directory
- **Health**: `GET /health` endpoint for service status
## 🚀 Next Steps
- **Authentication**: Add user accounts and JWT tokens
- **Rate Limiting**: Implement API rate limiting
- **Monitoring**: Add structured logging and metrics
- **Deployment**: Containerize with Docker
- **Frontend**: Build web interface consuming this API

View File

@@ -8,8 +8,8 @@ class Settings(BaseSettings):
DATABASE_URL: str = "sqlite+aiosqlite:///./skytalk.db"
CHROMA_PERSIST_DIR: str = "./chroma_db"
LLM_FLASH_MODEL: str = "gemini-2.5-flash-latest"
LLM_PRO_MODEL: str = "gemini-2.5-pro-latest"
LLM_FLASH_MODEL: str = "gemini-2.5-flash"
LLM_PRO_MODEL: str = "gemini-2.5-pro"
EMBEDDING_MODEL: str = "models/text-embedding-004"
API_HOST: str = "0.0.0.0"

View File

@@ -3,7 +3,7 @@ import logging
from contextlib import asynccontextmanager
from typing import Annotated
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks
from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks, Request
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
@@ -23,10 +23,16 @@ logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize database on startup."""
"""Initialize database and services on startup."""
logger.info("Initializing database...")
await init_db()
logger.info("Database initialized successfully")
# Initialize singleton services at startup
logger.info("Initializing services...")
app.state.session_service = SessionService()
logger.info("Services initialized successfully")
yield
logger.info("Shutting down...")
@@ -48,9 +54,9 @@ app.add_middleware(
)
def get_session_service() -> SessionService:
def get_session_service(request: Request) -> SessionService:
"""Dependency to get SessionService instance."""
return SessionService()
return request.app.state.session_service
@app.post("/sessions/start", response_model=SessionResponse)

106
diagnose.py Normal file
View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python
"""Diagnostic script to test each component independently."""
import asyncio
import sys
from pathlib import Path
# Add the app directory to Python path
app_dir = Path(__file__).parent
sys.path.insert(0, str(app_dir))
async def diagnose():
print("🔍 SkyTalk API Diagnostics\n")
# 1. Test configuration
print("1. Testing configuration...")
try:
from app.core.config import settings
print(f" ✅ Config loaded")
print(f" - API Key set: {bool(settings.GOOGLE_API_KEY and len(settings.GOOGLE_API_KEY) > 10)}")
print(f" - Flash model: {settings.LLM_FLASH_MODEL}")
print(f" - Pro model: {settings.LLM_PRO_MODEL}")
print(f" - Embedding model: {settings.EMBEDDING_MODEL}")
except Exception as e:
print(f" ❌ Config error: {e}")
return
# 2. Test database
print("\n2. Testing database...")
try:
from app.data.database import init_db, get_session
await init_db()
print(f" ✅ Database initialized")
async with get_session() as db:
from app.data.repositories import SessionRepository
repo = SessionRepository(db)
session = await repo.create()
print(f" ✅ Can create sessions: {session.id}")
except Exception as e:
print(f" ❌ Database error: {e}")
return
# 3. Test vector service
print("\n3. Testing vector service...")
try:
from app.services.vector import VectorService
vector_service = VectorService()
print(f" ✅ Vector service initialized")
except Exception as e:
print(f" ❌ Vector service error: {e}")
return
# 4. Test interviewer agent
print("\n4. Testing interviewer agent...")
try:
from app.services.interviewer import InterviewerAgent
interviewer = InterviewerAgent(vector_service)
print(f" ✅ Interviewer agent initialized")
# Test a simple response
response, should_end = await interviewer.generate_response(
transcript=[{"role": "user", "content": "Hello"}],
context_query="greeting"
)
print(f" ✅ Can generate responses (length: {len(response)})")
except Exception as e:
print(f" ❌ Interviewer error: {e}")
import traceback
traceback.print_exc()
return
# 5. Test synthesizer agent
print("\n5. Testing synthesizer agent...")
try:
from app.services.synthesizer import SynthesizerAgent
synthesizer = SynthesizerAgent(vector_service)
print(f" ✅ Synthesizer agent initialized")
except Exception as e:
print(f" ❌ Synthesizer error: {e}")
return
# 6. Test session service
print("\n6. Testing session service...")
try:
from app.services.session_service import SessionService
service = SessionService()
print(f" ✅ Session service initialized")
# Test starting a session
result = await service.start_session("test diagnostics")
print(f" ✅ Can start sessions: {result['session_id']}")
print(f" - Initial message: {result['message'][:50]}...")
except Exception as e:
print(f" ❌ Session service error: {e}")
import traceback
traceback.print_exc()
return
print("\n✅ All components working!")
print("\nIf the API is still failing, check:")
print("1. The server logs for the actual error")
print("2. That the server was fully restarted after config changes")
print("3. That there are no port conflicts")
if __name__ == "__main__":
asyncio.run(diagnose())

98
test_api_curl.sh Executable file
View File

@@ -0,0 +1,98 @@
#!/bin/bash
# SkyTalk API - Manual Testing with curl
# Run this script to test the API endpoints manually
BASE_URL="http://localhost:8000"
echo "🌟 SkyTalk API Manual Testing with curl"
echo "========================================"
# Test 1: Health check
echo ""
echo "🏥 Testing health check..."
curl -s -X GET "$BASE_URL/health" | jq '.' || echo "Health check failed"
# Test 2: Root endpoint
echo ""
echo "🏠 Testing root endpoint..."
curl -s -X GET "$BASE_URL/" | jq '.' || echo "Root endpoint failed"
# Test 3: Start session
echo ""
echo "🚀 Starting a new session..."
SESSION_RESPONSE=$(curl -s -X POST "$BASE_URL/sessions/start" \
-H "Content-Type: application/json" \
-d '{"topic": "AI ethics in autonomous vehicles"}')
echo $SESSION_RESPONSE | jq '.'
# Extract session ID
SESSION_ID=$(echo $SESSION_RESPONSE | jq -r '.session_id')
echo "Session ID: $SESSION_ID"
if [ "$SESSION_ID" = "null" ]; then
echo "❌ Failed to start session"
exit 1
fi
# Test 4: Send messages
echo ""
echo "💬 Sending messages to the session..."
MESSAGES=(
"I'm concerned about how autonomous vehicles make ethical decisions"
"What happens when a car has to choose between hitting one person or five people?"
"I think we need clear guidelines and possibly government regulation"
"Yes, I think we've covered the main ethical dilemmas I was thinking about"
)
for message in "${MESSAGES[@]}"; do
echo ""
echo "Sending: $message"
RESPONSE=$(curl -s -X POST "$BASE_URL/sessions/sendMessage" \
-H "Content-Type: application/json" \
-d "{\"session_id\": \"$SESSION_ID\", \"message\": \"$message\"}")
echo $RESPONSE | jq '.'
# Check if session ended
STATUS=$(echo $RESPONSE | jq -r '.status')
if [ "$STATUS" = "processing" ]; then
echo "🔄 Session ended, synthesis should begin..."
break
fi
# Wait a bit between messages
sleep 2
done
# Test 5: Check session status
echo ""
echo "📊 Checking session status..."
for i in {1..5}; do
echo "Status check #$i:"
STATUS_RESPONSE=$(curl -s -X GET "$BASE_URL/sessions/getStatus?session_id=$SESSION_ID")
echo $STATUS_RESPONSE | jq '.'
STATUS=$(echo $STATUS_RESPONSE | jq -r '.status')
if [ "$STATUS" = "completed" ]; then
echo "✅ Synthesis completed!"
break
elif [ "$STATUS" = "failed" ]; then
echo "❌ Synthesis failed!"
break
fi
echo "Still processing... waiting 5 seconds"
sleep 5
done
echo ""
echo "🎉 Manual testing completed!"
echo ""
echo "📚 Next steps:"
echo " - Check the generated database: skytalk.db"
echo " - Explore the vector store: chroma_db/"
echo " - View API docs: $BASE_URL/docs"

307
test_api_manual.py Normal file
View File

@@ -0,0 +1,307 @@
#!/usr/bin/env python
"""
Manual test script for SkyTalk API.
This script provides a comprehensive way to test the API functionality
without relying on external tools like curl or Postman.
"""
import asyncio
import sys
import time
import uuid
from pathlib import Path
import httpx
import uvicorn
from multiprocessing import Process
# Add the app directory to Python path
app_dir = Path(__file__).parent
sys.path.insert(0, str(app_dir))
class SkyTalkAPITester:
"""Comprehensive tester for SkyTalk API."""
def __init__(self, base_url: str = "http://localhost:8000"):
self.base_url = base_url
self.client = httpx.AsyncClient(timeout=30.0)
async def test_health_check(self) -> bool:
"""Test the health check endpoint."""
print("🏥 Testing health check...")
try:
response = await self.client.get(f"{self.base_url}/health")
if response.status_code == 200:
data = response.json()
print(f"✅ Health check passed: {data['status']}")
return True
else:
print(f"❌ Health check failed: {response.status_code}")
return False
except Exception as e:
print(f"❌ Health check error: {e}")
return False
async def test_root_endpoint(self) -> bool:
"""Test the root endpoint."""
print("\n🏠 Testing root endpoint...")
try:
response = await self.client.get(f"{self.base_url}/")
if response.status_code == 200:
data = response.json()
print(f"✅ Root endpoint: {data['message']}")
print(f" Version: {data['version']}")
return True
else:
print(f"❌ Root endpoint failed: {response.status_code}")
return False
except Exception as e:
print(f"❌ Root endpoint error: {e}")
return False
async def test_start_session(self, topic: str) -> str:
"""Test starting a new session."""
print(f"\n🚀 Starting session with topic: '{topic}'...")
try:
response = await self.client.post(
f"{self.base_url}/sessions/start",
json={"topic": topic}
)
if response.status_code == 200:
data = response.json()
session_id = data["session_id"]
print(f"✅ Session started: {session_id}")
print(f" Status: {data['status']}")
print(f" Initial message: {data['message'][:100]}...")
return session_id
else:
print(f"❌ Start session failed: {response.status_code}")
print(f" Response: {response.text}")
return None
except Exception as e:
print(f"❌ Start session error: {e}")
return None
async def test_send_message(self, session_id: str, message: str) -> dict:
"""Test sending a message to a session."""
print(f"\n💬 Sending message: '{message}'...")
try:
response = await self.client.post(
f"{self.base_url}/sessions/sendMessage",
json={"session_id": session_id, "message": message}
)
if response.status_code == 200:
data = response.json()
print(f"✅ Message sent successfully")
print(f" Status: {data['status']}")
print(f" Response: {data['message'][:100]}...")
return data
else:
print(f"❌ Send message failed: {response.status_code}")
print(f" Response: {response.text}")
return None
except Exception as e:
print(f"❌ Send message error: {e}")
return None
async def test_get_status(self, session_id: str) -> dict:
"""Test getting session status."""
print(f"\n📊 Getting session status...")
try:
response = await self.client.get(
f"{self.base_url}/sessions/getStatus",
params={"session_id": session_id}
)
if response.status_code == 200:
data = response.json()
print(f"✅ Status retrieved: {data['status']}")
print(f" Notes count: {data['notes_count']}")
print(f" Created at: {data['created_at']}")
return data
else:
print(f"❌ Get status failed: {response.status_code}")
print(f" Response: {response.text}")
return None
except Exception as e:
print(f"❌ Get status error: {e}")
return None
async def test_complete_conversation_flow(self) -> bool:
"""Test a complete conversation flow from start to synthesis."""
print("\n🔄 Testing complete conversation flow...")
# Start session
session_id = await self.test_start_session("AI ethics in healthcare")
if not session_id:
return False
# Have a conversation
messages = [
"I'm particularly concerned about bias in medical AI systems",
"Yes, I think algorithms could discriminate against certain patient groups",
"Privacy is also a huge concern - medical data is so sensitive",
"I think we need strict regulations and auditing processes",
"Yes, I feel like we've covered the main points I wanted to explore"
]
session_ended = False
for message in messages:
result = await self.test_send_message(session_id, message)
if not result:
return False
# Check if session ended (processing status)
if result.get("status") == "processing":
print("🔄 Session ended, synthesis should begin...")
session_ended = True
break
# Small delay between messages
await asyncio.sleep(1)
# Wait a bit for background processing
if session_ended:
print("\n⏳ Waiting for synthesis to complete...")
for i in range(10): # Wait up to 30 seconds
await asyncio.sleep(3)
status = await self.test_get_status(session_id)
if status and status.get("status") == "completed":
print(f"✅ Synthesis completed! Generated {status.get('notes_count', 0)} notes")
return True
elif status and status.get("status") == "failed":
print("❌ Synthesis failed")
return False
print(f" Still processing... ({i+1}/10)")
print("⚠️ Synthesis still processing after 30 seconds")
return True # Not necessarily a failure
print("⚠️ Session didn't end naturally")
return True
async def test_error_conditions(self) -> bool:
"""Test error handling."""
print("\n🚨 Testing error conditions...")
# Test invalid session ID
print(" Testing invalid session ID...")
fake_session_id = str(uuid.uuid4())
result = await self.test_send_message(fake_session_id, "test message")
if result is None:
print(" ✅ Correctly handled invalid session ID")
else:
print(" ❌ Should have failed with invalid session ID")
return False
# Test empty topic
print(" Testing empty topic...")
try:
response = await self.client.post(
f"{self.base_url}/sessions/start",
json={"topic": ""}
)
if response.status_code != 200:
print(" ✅ Correctly rejected empty topic")
else:
print(" ❌ Should have rejected empty topic")
return False
except Exception as e:
print(f" ✅ Correctly handled empty topic error: {e}")
return True
async def run_all_tests(self) -> bool:
"""Run all tests in sequence."""
print("🧪 Starting SkyTalk API Test Suite\n")
tests = [
("Health Check", self.test_health_check),
("Root Endpoint", self.test_root_endpoint),
("Complete Conversation Flow", self.test_complete_conversation_flow),
("Error Conditions", self.test_error_conditions),
]
passed = 0
total = len(tests)
for test_name, test_func in tests:
print(f"\n{'='*50}")
print(f"Running: {test_name}")
print('='*50)
try:
if await test_func():
passed += 1
print(f"{test_name} PASSED")
else:
print(f"{test_name} FAILED")
except Exception as e:
print(f"{test_name} ERROR: {e}")
print(f"\n{'='*50}")
print(f"Test Results: {passed}/{total} tests passed")
print('='*50)
if passed == total:
print("🎉 All tests passed! SkyTalk API is working correctly.")
else:
print("⚠️ Some tests failed. Check the logs above.")
return passed == total
async def close(self):
"""Close the HTTP client."""
await self.client.aclose()
def start_api_server():
"""Start the API server in a separate process."""
import uvicorn
uvicorn.run(
"app.main:app",
host="127.0.0.1",
port=8000,
log_level="warning" # Reduce log noise during testing
)
async def main():
"""Main test function."""
print("🌟 SkyTalk API Manual Test Script")
print("=" * 50)
# Check if we should start the server
start_server = len(sys.argv) > 1 and sys.argv[1] == "--start-server"
if start_server:
print("🚀 Starting API server...")
server_process = Process(target=start_api_server)
server_process.start()
# Wait for server to start
print("⏳ Waiting for server to start...")
await asyncio.sleep(5)
try:
# Run tests
tester = SkyTalkAPITester()
success = await tester.run_all_tests()
await tester.close()
if success:
print("\n✅ Manual testing completed successfully!")
else:
print("\n❌ Some tests failed during manual testing.")
finally:
if start_server:
print("🛑 Stopping API server...")
server_process.terminate()
server_process.join()
return 0 if success else 1
if __name__ == "__main__":
sys.exit(asyncio.run(main()))