feat(db-migration): implement database migration steps 1-5 with controller API integration

This commit completes steps 1-5 of the database migration plan, moving database responsibility from the webserver to the controller:

1. **Step 3 - Database API Endpoints**: Added comprehensive REST API endpoints in controller for all database operations (machines, extraction jobs, persons, companies, documents) with proper authentication and filtering

2. **Step 4 - Authentication Setup**: Implemented controller-webserver authentication with API keys and created detailed setup documentation (AUTHENTICATION_SETUP.md)

3. **Step 5 - Move Database Models**: Migrated webserver database models to controller and implemented webserver-specific database handler with CRUD operations for agents, tasks, logs, and settings

Key changes:
- Added query functions to controller/dbhandler.py for all database entities
- Created webserver_dbhandler.py for webserver-specific tables
- Implemented ControllerClient in webserver for API communication
- Updated webserver routers to use controller API instead of direct database access
- Added webserver migration SQL schema
- Updated CI/CD workflows to reflect renamed webserver directory
- Removed direct database dependencies from webserver (PostgreSQL, SQLAlchemy)
- Updated docker-compose.yml for new architecture

BREAKING CHANGE: Webserver no longer uses direct database access and requires controller API configuration (CONTROLLER_URL and CONTROLLER_API_KEY environment variables)
This commit is contained in:
2026-02-14 00:11:48 +07:00
parent a3d4c25906
commit fa6828d79f
20 changed files with 3791 additions and 657 deletions

View File

@@ -4,7 +4,7 @@ on:
branches: [dev] # Only trigger on dev branch
paths:
- 'controller/**'
- 'webserver/**'
- 'epistine-webserver/**'
- '.gitea/workflows/dev.yml'
jobs:
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
service: [controller, webserver]
service: [controller, epistine-webserver]
steps:
- uses: actions/checkout@v4

View File

@@ -4,7 +4,7 @@ on:
branches: [main, master]
paths:
- 'controller/**' # Watch controller folder
- 'webserver/**' # Watch webserver folder
- 'epistine-webserver/**' # Watch epistine-webserver folder
- '.gitea/workflows/master.yml'
jobs:
@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
service: [controller, webserver]
service: [controller, epistine-webserver]
steps:
- uses: actions/checkout@v4

182
AUTHENTICATION_SETUP.md Normal file
View File

@@ -0,0 +1,182 @@
# Authentication Setup Between Webserver and Controller
## Overview
This document explains how to set up authentication between the Epistine webserver and controller as part of the database migration plan (Step 4).
## Current Architecture
- **Controller**: Handles all database operations and provides REST API endpoints
- **Webserver**: Provides web interface and business logic, now uses controller API for database operations
- **Authentication**: Webserver authenticates with controller using API keys with scopes
## Setup Steps
### 1. Start the Controller
First, ensure the controller is running:
```bash
cd controller
python app.py
```
The controller should start on `http://localhost:8000` by default.
### 2. Create an API Key for the Webserver
The webserver needs an API key with appropriate scopes to access controller endpoints.
#### Using the Controller API:
```bash
# Use the master API key (from controller/.env) or create a new one
curl -X POST "http://localhost:8000/api/v1/createAPIKey" \
-H "FilesManager-API-Key: YOUR_MASTER_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "webserver-api-key",
"created_by": "system",
"description": "API key for webserver to access controller",
"scopes": ["worker:read", "logs:read", "logs:write"]
}'
```
#### Required Scopes for Webserver:
- `worker:read` - For reading machines/agents
- `logs:read` - For reading logs
- `logs:write` - For creating logs
- (Optional) `system:create_api_key` - If webserver needs to create API keys
### 3. Configure the Webserver
Update the webserver's environment configuration:
1. Copy the example environment file:
```bash
cd epistine-webserver
cp .env.example .env
```
2. Edit the `.env` file and set:
```
CONTROLLER_URL=http://localhost:8000
CONTROLLER_API_KEY=your-generated-api-key-here
```
### 4. Start the Webserver
```bash
cd epistine-webserver
docker-compose up
```
Or run directly:
```bash
cd epistine-webserver/backend
uvicorn app.main:app --reload --host 0.0.0.0 --port 8001
```
### 5. Verify Authentication
Run the authentication test script:
```bash
python simple_auth_test.py
```
Or manually test:
```bash
# Test controller health
curl http://localhost:8000/api/v1/health
# Test authentication (replace with your API key)
curl -H "FilesManager-API-Key: YOUR_API_KEY" \
http://localhost:8000/api/v1/machines
```
## How It Works
### Authentication Flow
1. **Webserver receives request** from frontend or user
2. **Webserver calls controller API** with API key in `FilesManager-API-Key` header
3. **Controller validates API key** and checks scopes
4. **Controller processes request** and returns data
5. **Webserver formats response** and returns to client
### Code Changes Made
1. **Controller Client** (`epistine-webserver/backend/app/controller_client.py`):
- HTTP client for communicating with controller
- Handles authentication headers
- Manages errors and timeouts
2. **Updated Routers** (`epistine-webserver/backend/app/routers/__init__.py`):
- All endpoints now use `ControllerClient` instead of direct database access
- Proper error handling for controller communication
- Data mapping between controller and webserver models
3. **Environment Configuration** (`epistine-webserver/.env.example`):
- Added `CONTROLLER_URL` and `CONTROLLER_API_KEY` variables
## Troubleshooting
### Common Issues
1. **"Unauthorized access to controller" error**
- Check that `CONTROLLER_API_KEY` is set correctly
- Verify the API key is valid and not revoked
- Ensure controller is running
2. **"Controller service unavailable" error**
- Check controller health: `curl http://localhost:8000/api/v1/health`
- Verify `CONTROLLER_URL` is correct
- Check network connectivity
3. **"Insufficient permissions" error**
- The API key is missing required scopes
- Create a new API key with the required scopes
- See "Required Scopes for Webserver" section above
4. **Data mapping issues**
- Controller and webserver may use different data models
- Check the mapping logic in the router functions
- Update mapping as needed for your use case
### Testing
Use the provided test scripts:
```bash
# Basic authentication test
python simple_auth_test.py
# Comprehensive test (requires controller running)
python test_authentication.py
```
## Security Considerations
1. **API Key Storage**: Store API keys in environment variables, not in code
2. **Key Rotation**: Regularly rotate API keys for security
3. **Scope Minimization**: Grant only necessary scopes to each API key
4. **Network Security**: Use HTTPS in production, restrict network access
5. **Monitoring**: Monitor authentication failures in controller logs
## Next Steps
After authentication is set up:
1. **Move database models** from webserver to controller (Step 5)
2. **Update Docker Compose** for new architecture (Step 7)
3. **Add comprehensive testing** (Step 8)
4. **Implement monitoring and logging** (Step 9)
## References
- [Database Migration Plan](../plans/db_migration.md)
- [Controller API Documentation](../controller/README.md)
- [Webserver Documentation](../epistine-webserver/README.md)

View File

@@ -1415,6 +1415,365 @@ def validate_api_key(api_key: str) -> Tuple[bool, Optional[List[str]]]:
if conn:
conn.close()
# Query functions for database operations
def get_machines(machine_id: Optional[int] = None, machine_type: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get machines with optional filtering by ID or type"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM machines WHERE 1=1"
params = []
if machine_id is not None:
query += " AND machine_id = %s"
params.append(machine_id)
if machine_type is not None:
query += " AND machine_type = %s"
params.append(machine_type)
query += " ORDER BY machine_id"
cursor.execute(query, params)
results = cursor.fetchall()
# Convert to list of dicts
machines = [dict(row) for row in results]
log_message(component="controller", component_id="get_machines",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(machines)} machines")
return machines
except psycopg2.Error as e:
log_message(component="controller", component_id="get_machines",
level="Error", log_type="app", code="db.query.error",
message=f"Error querying machines: {e}")
raise
finally:
if conn:
conn.close()
def get_extraction_jobs(job_id: Optional[uuid.UUID] = None, document_id: Optional[uuid.UUID] = None,
status: Optional[str] = None, machine_id: Optional[int] = None) -> List[Dict[str, Any]]:
"""Get extraction jobs with optional filtering"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM extraction_jobs WHERE 1=1"
params = []
if job_id is not None:
query += " AND job_id = %s"
params.append(job_id)
if document_id is not None:
query += " AND document_id = %s"
params.append(document_id)
if status is not None:
query += " AND status = %s"
params.append(status)
if machine_id is not None:
query += " AND machine_id = %s"
params.append(machine_id)
query += " ORDER BY date_added DESC"
cursor.execute(query, params)
results = cursor.fetchall()
jobs = [dict(row) for row in results]
log_message(component="controller", component_id="get_extraction_jobs",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(jobs)} extraction jobs")
return jobs
except psycopg2.Error as e:
log_message(component="controller", component_id="get_extraction_jobs",
level="Error", log_type="app", code="db.query.error",
message=f"Error querying extraction jobs: {e}")
raise
finally:
if conn:
conn.close()
def get_persons(person_id: Optional[int] = None, name: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get persons with optional filtering by ID or name"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM people WHERE 1=1"
params = []
if person_id is not None:
query += " AND person_id = %s"
params.append(person_id)
if name is not None:
query += " AND canonical_name ILIKE %s"
params.append(f"%{name}%")
query += " ORDER BY person_id"
cursor.execute(query, params)
results = cursor.fetchall()
persons = [dict(row) for row in results]
log_message(component="controller", component_id="get_persons",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(persons)} persons")
return persons
except psycopg2.Error as e:
log_message(component="controller", component_id="get_persons",
level="Error", log_type="app", code="db.query.error",
message=f"Error querying persons: {e}")
raise
finally:
if conn:
conn.close()
def get_person_aliases(person_id: Optional[int] = None, alias_name: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get person aliases with optional filtering"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM person_aliases WHERE 1=1"
params = []
if person_id is not None:
query += " AND person_id = %s"
params.append(person_id)
if alias_name is not None:
query += " AND alias_name ILIKE %s"
params.append(f"%{alias_name}%")
query += " ORDER BY alias_id"
cursor.execute(query, params)
results = cursor.fetchall()
aliases = [dict(row) for row in results]
log_message(component="controller", component_id="get_person_aliases",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(aliases)} person aliases")
return aliases
except psycopg2.Error as e:
log_message(component="controller", component_id="get_person_aliases",
level="Error", log_type="app", code="db.query.error",
message=f"Error querying person aliases: {e}")
raise
finally:
if conn:
conn.close()
def get_companies(company_id: Optional[int] = None, name: Optional[str] = None,
entity_type: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get companies with optional filtering"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM companies WHERE 1=1"
params = []
if company_id is not None:
query += " AND company_id = %s"
params.append(company_id)
if name is not None:
query += " AND canonical_name ILIKE %s"
params.append(f"%{name}%")
if entity_type is not None:
query += " AND entity_type = %s"
params.append(entity_type)
query += " ORDER BY company_id"
cursor.execute(query, params)
results = cursor.fetchall()
companies = [dict(row) for row in results]
log_message(component="controller", component_id="get_companies",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(companies)} companies")
return companies
except psycopg2.Error as e:
log_message(component="controller", component_id="get_companies",
level="Error", log_type="app", code="db.query.error",
message=f"Error querying companies: {e}")
raise
finally:
if conn:
conn.close()
def get_company_aliases(company_id: Optional[int] = None, alias_name: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get company aliases with optional filtering"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM company_aliases WHERE 1=1"
params = []
if company_id is not None:
query += " AND company_id = %s"
params.append(company_id)
if alias_name is not None:
query += " AND alias_name ILIKE %s"
params.append(f"%{alias_name}%")
query += " ORDER BY alias_id"
cursor.execute(query, params)
results = cursor.fetchall()
aliases = [dict(row) for row in results]
log_message(component="controller", component_id="get_company_aliases",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(aliases)} company aliases")
return aliases
except psycopg2.Error as e:
log_message(component="controller", component_id="get_company_aliases",
level="Error", log_type="app", code="db.query.error",
message=f"Error querying company aliases: {e}")
raise
finally:
if conn:
conn.close()
def get_documents(document_id: Optional[uuid.UUID] = None, document_type: Optional[str] = None,
original_filename: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get documents with optional filtering"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM documents WHERE 1=1"
params = []
if document_id is not None:
query += " AND document_id = %s"
params.append(document_id)
if document_type is not None:
query += " AND document_type = %s"
params.append(document_type)
if original_filename is not None:
query += " AND original_filename ILIKE %s"
params.append(f"%{original_filename}%")
query += " ORDER BY created_at DESC"
cursor.execute(query, params)
results = cursor.fetchall()
documents = [dict(row) for row in results]
log_message(component="controller", component_id="get_documents",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(documents)} documents")
return documents
except psycopg2.Error as e:
log_message(component="controller", component_id="get_documents",
level="Error", log_type="app", code="db.query.error",
message=f"Error querying documents: {e}")
raise
finally:
if conn:
conn.close()
def get_document_chunks(document_id: Optional[uuid.UUID] = None, chunk_id: Optional[uuid.UUID] = None) -> List[Dict[str, Any]]:
"""Get document chunks with optional filtering"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM document_chunks WHERE 1=1"
params = []
if document_id is not None:
query += " AND document_id = %s"
params.append(document_id)
if chunk_id is not None:
query += " AND chunk_id = %s"
params.append(chunk_id)
query += " ORDER BY chunk_order"
cursor.execute(query, params)
results = cursor.fetchall()
chunks = [dict(row) for row in results]
log_message(component="controller", component_id="get_document_chunks",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(chunks)} document chunks")
return chunks
except psycopg2.Error as e:
log_message(component="controller", component_id="get_document_chunks",
level="Error", log_type="app", code="db.query.error",
message=f"Error querying document chunks: {e}")
raise
finally:
if conn:
conn.close()
def get_document_legal_metadata(document_id: Optional[uuid.UUID] = None) -> List[Dict[str, Any]]:
"""Get document legal metadata with optional filtering"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM document_legal_metadata WHERE 1=1"
params = []
if document_id is not None:
query += " AND document_id = %s"
params.append(document_id)
cursor.execute(query, params)
results = cursor.fetchall()
metadata = [dict(row) for row in results]
log_message(component="controller", component_id="get_document_legal_metadata",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(metadata)} document legal metadata records")
return metadata
except psycopg2.Error as e:
log_message(component="controller", component_id="get_document_legal_metadata",
level="Error", log_type="app", code="db.query.error",
message=f"Error querying document legal metadata: {e}")
raise
finally:
if conn:
conn.close()
#Schema creating function
async def execute_sql_file(file_path):

View File

@@ -80,4 +80,90 @@ class NewAPIKeyRequest(BaseModel):
name: str = Field(..., description="The name for the API key")
created_by: str = Field(..., description="Who created the API key")
description: str = Field(..., description="Description for the API key")
scopes: List[sc] = Field(..., description="The scopes for the API key")
scopes: List[sc] = Field(..., description="The scopes for the API key")
# Webserver Models for Database Migration Step 5
class WebserverAgentBase(BaseModel):
"""Base model for webserver agent"""
agent_name: str = Field(..., description="Agent name", min_length=1)
agent_status: str = Field("offline", description="Agent status")
capabilities: Optional[Dict[str, Any]] = Field(None, description="Agent capabilities as JSON")
ip_address: Optional[str] = Field(None, description="IP address of the agent")
class WebserverAgentCreate(WebserverAgentBase):
"""Model for creating a webserver agent"""
pass
class WebserverAgent(WebserverAgentBase):
"""Complete webserver agent model"""
agent_id: int = Field(..., description="Agent ID")
last_heartbeat: Optional[datetime] = Field(None, description="Last heartbeat timestamp")
created_at: datetime = Field(..., description="Creation timestamp")
updated_at: datetime = Field(..., description="Last update timestamp")
class WebserverTaskBase(BaseModel):
"""Base model for webserver task"""
task_name: str = Field(..., description="Task name", min_length=1)
task_status: str = Field("pending", description="Task status")
agent_id: Optional[int] = Field(None, description="ID of assigned agent")
task_result: Optional[Dict[str, Any]] = Field(None, description="Task result as JSON")
parameters: Optional[Dict[str, Any]] = Field(None, description="Task parameters as JSON")
class WebserverTaskCreate(WebserverTaskBase):
"""Model for creating a webserver task"""
pass
class WebserverTask(WebserverTaskBase):
"""Complete webserver task model"""
task_id: int = Field(..., description="Task ID")
started_at: Optional[datetime] = Field(None, description="When task started")
completed_at: Optional[datetime] = Field(None, description="When task completed")
error_message: Optional[str] = Field(None, description="Error message if task failed")
created_at: datetime = Field(..., description="Creation timestamp")
class WebserverLogBase(BaseModel):
"""Base model for webserver log"""
log_level: str = Field("INFO", description="Log level")
component: str = Field(..., description="Log component", min_length=1)
message: str = Field(..., description="Log message", min_length=1)
metadata: Optional[Dict[str, Any]] = Field(None, description="Additional log metadata")
class WebserverLogCreate(WebserverLogBase):
"""Model for creating a webserver log"""
pass
class WebserverLog(WebserverLogBase):
"""Complete webserver log model"""
log_id: int = Field(..., description="Log ID")
created_at: datetime = Field(..., description="Creation timestamp")
class WebserverSettingBase(BaseModel):
"""Base model for webserver setting"""
setting_key: str = Field(..., description="Setting key", min_length=1)
setting_value: Optional[str] = Field(None, description="Setting value")
setting_type: str = Field("string", description="Setting type (string, integer, boolean, json)")
description: Optional[str] = Field(None, description="Setting description")
is_encrypted: bool = Field(False, description="Whether the setting value is encrypted")
class WebserverSettingCreate(WebserverSettingBase):
"""Model for creating a webserver setting"""
pass
class WebserverSetting(WebserverSettingBase):
"""Complete webserver setting model"""
setting_id: int = Field(..., description="Setting ID")
created_at: datetime = Field(..., description="Creation timestamp")
updated_at: datetime = Field(..., description="Last update timestamp")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,616 @@
"""
Webserver Database Handler for Controller
This module provides database functions for webserver-specific tables
as part of the database migration (Step 5).
"""
import psycopg2
import psycopg2.extras
from typing import Optional, Dict, Any, List
import json
from datetime import datetime
from app.logger import log_message
from app.dbhandler import get_connection
# Webserver Agents Functions
def get_webserver_agents(agent_id: Optional[int] = None,
agent_status: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get webserver agents with optional filtering"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM webserver_agents WHERE 1=1"
params = []
if agent_id is not None:
query += " AND agent_id = %s"
params.append(agent_id)
if agent_status is not None:
query += " AND agent_status = %s"
params.append(agent_status)
query += " ORDER BY agent_id"
cursor.execute(query, params)
results = cursor.fetchall()
agents = [dict(row) for row in results]
log_message(component="controller", component_id="get_webserver_agents",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(agents)} webserver agents")
return agents
except psycopg2.Error as e:
log_message(component="controller", component_id="get_webserver_agents",
level="Error", log_type="app", code="db.query.error",
message=f"Error retrieving webserver agents: {e}")
raise
finally:
if conn:
conn.close()
def insert_webserver_agent(agent_name: str, agent_status: str = "offline",
capabilities: Optional[Dict[str, Any]] = None,
ip_address: Optional[str] = None) -> int:
"""Insert a new webserver agent and return the agent_id"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor()
capabilities_json = json.dumps(capabilities) if capabilities else None
query = """
INSERT INTO webserver_agents
(agent_name, agent_status, capabilities, ip_address)
VALUES (%s, %s, %s, %s)
RETURNING agent_id
"""
cursor.execute(query, (agent_name, agent_status, capabilities_json, ip_address))
agent_id = cursor.fetchone()[0]
conn.commit()
log_message(component="controller", component_id="insert_webserver_agent",
level="Info", log_type="app", code="db.insert",
message=f"Inserted webserver agent: {agent_name} (ID: {agent_id})")
return agent_id
except psycopg2.Error as e:
if conn:
conn.rollback()
log_message(component="controller", component_id="insert_webserver_agent",
level="Error", log_type="app", code="db.insert.error",
message=f"Error inserting webserver agent: {e}")
raise
finally:
if conn:
conn.close()
def update_webserver_agent(agent_id: int, agent_status: Optional[str] = None,
capabilities: Optional[Dict[str, Any]] = None,
ip_address: Optional[str] = None,
last_heartbeat: Optional[datetime] = None) -> bool:
"""Update a webserver agent"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor()
updates = []
params = []
if agent_status is not None:
updates.append("agent_status = %s")
params.append(agent_status)
if capabilities is not None:
updates.append("capabilities = %s")
params.append(json.dumps(capabilities))
if ip_address is not None:
updates.append("ip_address = %s")
params.append(ip_address)
if last_heartbeat is not None:
updates.append("last_heartbeat = %s")
params.append(last_heartbeat)
if not updates:
return False
updates.append("updated_at = NOW()")
query = f"UPDATE webserver_agents SET {', '.join(updates)} WHERE agent_id = %s"
params.append(agent_id)
cursor.execute(query, params)
conn.commit()
affected = cursor.rowcount > 0
log_message(component="controller", component_id="update_webserver_agent",
level="Info", log_type="app", code="db.update",
message=f"Updated webserver agent ID {agent_id}: {affected} rows affected")
return affected
except psycopg2.Error as e:
if conn:
conn.rollback()
log_message(component="controller", component_id="update_webserver_agent",
level="Error", log_type="app", code="db.update.error",
message=f"Error updating webserver agent: {e}")
raise
finally:
if conn:
conn.close()
def delete_webserver_agent(agent_id: int) -> bool:
"""Delete a webserver agent"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor()
query = "DELETE FROM webserver_agents WHERE agent_id = %s"
cursor.execute(query, (agent_id,))
conn.commit()
affected = cursor.rowcount > 0
log_message(component="controller", component_id="delete_webserver_agent",
level="Info", log_type="app", code="db.delete",
message=f"Deleted webserver agent ID {agent_id}: {affected} rows affected")
return affected
except psycopg2.Error as e:
if conn:
conn.rollback()
log_message(component="controller", component_id="delete_webserver_agent",
level="Error", log_type="app", code="db.delete.error",
message=f"Error deleting webserver agent: {e}")
raise
finally:
if conn:
conn.close()
# Webserver Tasks Functions
def get_webserver_tasks(task_id: Optional[int] = None,
task_status: Optional[str] = None,
agent_id: Optional[int] = None,
limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]:
"""Get webserver tasks with optional filtering"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM webserver_tasks WHERE 1=1"
params = []
if task_id is not None:
query += " AND task_id = %s"
params.append(task_id)
if task_status is not None:
query += " AND task_status = %s"
params.append(task_status)
if agent_id is not None:
query += " AND agent_id = %s"
params.append(agent_id)
query += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
params.extend([limit, offset])
cursor.execute(query, params)
results = cursor.fetchall()
tasks = [dict(row) for row in results]
log_message(component="controller", component_id="get_webserver_tasks",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(tasks)} webserver tasks")
return tasks
except psycopg2.Error as e:
log_message(component="controller", component_id="get_webserver_tasks",
level="Error", log_type="app", code="db.query.error",
message=f"Error retrieving webserver tasks: {e}")
raise
finally:
if conn:
conn.close()
def insert_webserver_task(task_name: str, task_status: str = "pending",
agent_id: Optional[int] = None,
task_result: Optional[Dict[str, Any]] = None,
parameters: Optional[Dict[str, Any]] = None) -> int:
"""Insert a new webserver task and return the task_id"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor()
task_result_json = json.dumps(task_result) if task_result else None
parameters_json = json.dumps(parameters) if parameters else None
query = """
INSERT INTO webserver_tasks
(task_name, task_status, agent_id, task_result, parameters)
VALUES (%s, %s, %s, %s, %s)
RETURNING task_id
"""
cursor.execute(query, (task_name, task_status, agent_id, task_result_json, parameters_json))
task_id = cursor.fetchone()[0]
conn.commit()
log_message(component="controller", component_id="insert_webserver_task",
level="Info", log_type="app", code="db.insert",
message=f"Inserted webserver task: {task_name} (ID: {task_id})")
return task_id
except psycopg2.Error as e:
if conn:
conn.rollback()
log_message(component="controller", component_id="insert_webserver_task",
level="Error", log_type="app", code="db.insert.error",
message=f"Error inserting webserver task: {e}")
raise
finally:
if conn:
conn.close()
def update_webserver_task(task_id: int, task_status: Optional[str] = None,
task_result: Optional[Dict[str, Any]] = None,
error_message: Optional[str] = None,
started_at: Optional[datetime] = None,
completed_at: Optional[datetime] = None) -> bool:
"""Update a webserver task"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor()
updates = []
params = []
if task_status is not None:
updates.append("task_status = %s")
params.append(task_status)
if task_result is not None:
updates.append("task_result = %s")
params.append(json.dumps(task_result))
if error_message is not None:
updates.append("error_message = %s")
params.append(error_message)
if started_at is not None:
updates.append("started_at = %s")
params.append(started_at)
if completed_at is not None:
updates.append("completed_at = %s")
params.append(completed_at)
if not updates:
return False
query = f"UPDATE webserver_tasks SET {', '.join(updates)} WHERE task_id = %s"
params.append(task_id)
cursor.execute(query, params)
conn.commit()
affected = cursor.rowcount > 0
log_message(component="controller", component_id="update_webserver_task",
level="Info", log_type="app", code="db.update",
message=f"Updated webserver task ID {task_id}: {affected} rows affected")
return affected
except psycopg2.Error as e:
if conn:
conn.rollback()
log_message(component="controller", component_id="update_webserver_task",
level="Error", log_type="app", code="db.update.error",
message=f"Error updating webserver task: {e}")
raise
finally:
if conn:
conn.close()
def delete_webserver_task(task_id: int) -> bool:
"""Delete a webserver task"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor()
query = "DELETE FROM webserver_tasks WHERE task_id = %s"
cursor.execute(query, (task_id,))
conn.commit()
affected = cursor.rowcount > 0
log_message(component="controller", component_id="delete_webserver_task",
level="Info", log_type="app", code="db.delete",
message=f"Deleted webserver task ID {task_id}: {affected} rows affected")
return affected
except psycopg2.Error as e:
if conn:
conn.rollback()
log_message(component="controller", component_id="delete_webserver_task",
level="Error", log_type="app", code="db.delete.error",
message=f"Error deleting webserver task: {e}")
raise
finally:
if conn:
conn.close()
# Webserver Logs Functions
def get_webserver_logs(log_id: Optional[int] = None,
log_level: Optional[str] = None,
component: Optional[str] = None,
limit: int = 100, offset: int = 0) -> List[Dict[str, Any]]:
"""Get webserver logs with optional filtering"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM webserver_logs WHERE 1=1"
params = []
if log_id is not None:
query += " AND log_id = %s"
params.append(log_id)
if log_level is not None:
query += " AND log_level = %s"
params.append(log_level)
if component is not None:
query += " AND component = %s"
params.append(component)
query += " ORDER BY created_at DESC LIMIT %s OFFSET %s"
params.extend([limit, offset])
cursor.execute(query, params)
results = cursor.fetchall()
logs = [dict(row) for row in results]
log_message(component="controller", component_id="get_webserver_logs",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(logs)} webserver logs")
return logs
except psycopg2.Error as e:
log_message(component="controller", component_id="get_webserver_logs",
level="Error", log_type="app", code="db.query.error",
message=f"Error retrieving webserver logs: {e}")
raise
finally:
if conn:
conn.close()
def insert_webserver_log(log_level: str, component: str, message: str,
metadata: Optional[Dict[str, Any]] = None) -> int:
"""Insert a new webserver log and return the log_id"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor()
metadata_json = json.dumps(metadata) if metadata else None
query = """
INSERT INTO webserver_logs
(log_level, component, message, metadata)
VALUES (%s, %s, %s, %s)
RETURNING log_id
"""
cursor.execute(query, (log_level, component, message, metadata_json))
log_id = cursor.fetchone()[0]
conn.commit()
log_message(component="controller", component_id="insert_webserver_log",
level="Info", log_type="app", code="db.insert",
message=f"Inserted webserver log from {component}: {message[:50]}...")
return log_id
except psycopg2.Error as e:
if conn:
conn.rollback()
log_message(component="controller", component_id="insert_webserver_log",
level="Error", log_type="app", code="db.insert.error",
message=f"Error inserting webserver log: {e}")
raise
finally:
if conn:
conn.close()
# Webserver Settings Functions
def get_webserver_settings(setting_key: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get webserver settings with optional filtering by key"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
query = "SELECT * FROM webserver_settings WHERE 1=1"
params = []
if setting_key is not None:
query += " AND setting_key = %s"
params.append(setting_key)
query += " ORDER BY setting_key"
cursor.execute(query, params)
results = cursor.fetchall()
settings = [dict(row) for row in results]
log_message(component="controller", component_id="get_webserver_settings",
level="Info", log_type="app", code="db.query",
message=f"Retrieved {len(settings)} webserver settings")
return settings
except psycopg2.Error as e:
log_message(component="controller", component_id="get_webserver_settings",
level="Error", log_type="app", code="db.query.error",
message=f"Error retrieving webserver settings: {e}")
raise
finally:
if conn:
conn.close()
def insert_webserver_setting(setting_key: str, setting_value: Optional[str] = None,
setting_type: str = "string",
description: Optional[str] = None,
is_encrypted: bool = False) -> int:
"""Insert a new webserver setting and return the setting_id"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor()
query = """
INSERT INTO webserver_settings
(setting_key, setting_value, setting_type, description, is_encrypted)
VALUES (%s, %s, %s, %s, %s)
RETURNING setting_id
"""
cursor.execute(query, (setting_key, setting_value, setting_type, description, is_encrypted))
setting_id = cursor.fetchone()[0]
conn.commit()
log_message(component="controller", component_id="insert_webserver_setting",
level="Info", log_type="app", code="db.insert",
message=f"Inserted webserver setting: {setting_key}")
return setting_id
except psycopg2.Error as e:
if conn:
conn.rollback()
log_message(component="controller", component_id="insert_webserver_setting",
level="Error", log_type="app", code="db.insert.error",
message=f"Error inserting webserver setting: {e}")
raise
finally:
if conn:
conn.close()
def update_webserver_setting(setting_key: str, setting_value: Optional[str] = None,
setting_type: Optional[str] = None,
description: Optional[str] = None,
is_encrypted: Optional[bool] = None) -> bool:
"""Update a webserver setting"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor()
updates = []
params = []
if setting_value is not None:
updates.append("setting_value = %s")
params.append(setting_value)
if setting_type is not None:
updates.append("setting_type = %s")
params.append(setting_type)
if description is not None:
updates.append("description = %s")
params.append(description)
if is_encrypted is not None:
updates.append("is_encrypted = %s")
params.append(is_encrypted)
if not updates:
return False
updates.append("updated_at = NOW()")
query = f"UPDATE webserver_settings SET {', '.join(updates)} WHERE setting_key = %s"
params.append(setting_key)
cursor.execute(query, params)
conn.commit()
affected = cursor.rowcount > 0
log_message(component="controller", component_id="update_webserver_setting",
level="Info", log_type="app", code="db.update",
message=f"Updated webserver setting {setting_key}: {affected} rows affected")
return affected
except psycopg2.Error as e:
if conn:
conn.rollback()
log_message(component="controller", component_id="update_webserver_setting",
level="Error", log_type="app", code="db.update.error",
message=f"Error updating webserver setting: {e}")
raise
finally:
if conn:
conn.close()
def delete_webserver_setting(setting_key: str) -> bool:
"""Delete a webserver setting"""
conn = None
try:
conn = get_connection()
cursor = conn.cursor()
query = "DELETE FROM webserver_settings WHERE setting_key = %s"
cursor.execute(query, (setting_key,))
conn.commit()
affected = cursor.rowcount > 0
log_message(component="controller", component_id="delete_webserver_setting",
level="Info", log_type="app", code="db.delete",
message=f"Deleted webserver setting {setting_key}: {affected} rows affected")
return affected
except psycopg2.Error as e:
if conn:
conn.rollback()
log_message(component="controller", component_id="delete_webserver_setting",
level="Error", log_type="app", code="db.delete.error",
message=f"Error deleting webserver setting: {e}")
raise
finally:
if conn:
conn.close()

View File

@@ -0,0 +1,91 @@
-- ---------------------------------------------------------
-- Webserver Database Migration for Controller
-- This file adds webserver-specific tables to the controller database
-- as part of Step 5 of the database migration plan
-- ---------------------------------------------------------
-- ---------------------------------------------------------
-- 1. AGENTS TABLE (Webserver agents)
-- ---------------------------------------------------------
CREATE TABLE webserver_agents (
agent_id SERIAL PRIMARY KEY,
agent_name VARCHAR(255) NOT NULL,
agent_status VARCHAR(50) NOT NULL DEFAULT 'offline',
capabilities JSONB,
ip_address VARCHAR(45),
last_heartbeat TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_webserver_agents_status ON webserver_agents(agent_status);
CREATE INDEX idx_webserver_agents_name ON webserver_agents(agent_name);
CREATE INDEX idx_webserver_agents_last_heartbeat ON webserver_agents(last_heartbeat);
-- ---------------------------------------------------------
-- 2. TASKS TABLE (Webserver tasks)
-- ---------------------------------------------------------
CREATE TABLE webserver_tasks (
task_id SERIAL PRIMARY KEY,
agent_id INTEGER REFERENCES webserver_agents(agent_id) ON DELETE SET NULL,
task_name VARCHAR(255) NOT NULL,
task_status VARCHAR(50) DEFAULT 'pending',
task_result JSONB,
parameters JSONB,
created_at TIMESTAMPTZ DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
error_message TEXT
);
CREATE INDEX idx_webserver_tasks_status ON webserver_tasks(task_status);
CREATE INDEX idx_webserver_tasks_agent_id ON webserver_tasks(agent_id);
CREATE INDEX idx_webserver_tasks_created_at ON webserver_tasks(created_at);
-- ---------------------------------------------------------
-- 3. WEBSERVER LOGS TABLE (Webserver application logs)
-- ---------------------------------------------------------
CREATE TABLE webserver_logs (
log_id SERIAL PRIMARY KEY,
log_level VARCHAR(20) NOT NULL DEFAULT 'INFO',
component VARCHAR(100) NOT NULL,
message TEXT NOT NULL,
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_webserver_logs_level ON webserver_logs(log_level);
CREATE INDEX idx_webserver_logs_component ON webserver_logs(component);
CREATE INDEX idx_webserver_logs_created_at ON webserver_logs(created_at);
-- ---------------------------------------------------------
-- 4. WEBSERVER SETTINGS TABLE (Webserver configuration)
-- ---------------------------------------------------------
CREATE TABLE webserver_settings (
setting_id SERIAL PRIMARY KEY,
setting_key VARCHAR(255) NOT NULL UNIQUE,
setting_value TEXT,
setting_type VARCHAR(50) DEFAULT 'string',
description TEXT,
is_encrypted BOOLEAN DEFAULT FALSE,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_webserver_settings_key ON webserver_settings(setting_key);
-- ---------------------------------------------------------
-- 5. INITIAL DATA (Optional default settings)
-- ---------------------------------------------------------
INSERT INTO webserver_settings (setting_key, setting_value, setting_type, description) VALUES
('webserver_name', 'Epistine Webserver', 'string', 'Name of the webserver instance'),
('webserver_version', '1.0.0', 'string', 'Webserver version'),
('max_concurrent_tasks', '10', 'integer', 'Maximum number of concurrent tasks'),
('task_timeout_seconds', '3600', 'integer', 'Task timeout in seconds'),
('enable_agent_auto_registration', 'true', 'boolean', 'Enable automatic agent registration'),
('log_retention_days', '30', 'integer', 'Number of days to retain logs');

117
controller/s3_handler.py Normal file
View File

@@ -0,0 +1,117 @@
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from controller.app.logger import log_message
from controller.app.confighelper import load_config
class S3Handler:
def __init__(self):
config = load_config()
s3_config = config["server"]["s3"]
self.bucket_name = s3_config["bucket"]
self.access_key = s3_config["secretkey"]
self.secret_key = s3_config["secretkey"]
self.endpoint_url = s3_config["endpoint"]
self.s3_client = boto3.client(
's3',
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
endpoint_url=self.endpoint_url
)
def put_object(self, file_name, object_name=None):
if object_name is None:
object_name = file_name
try:
response = self.s3_client.upload_file(file_name, self.bucket_name, object_name)
log_message(
component="controller",
component_id=None,
level="INFO",
log_type="app",
code="s3.upload",
message=f"File {file_name} uploaded successfully to {self.bucket_name}/{object_name}"
)
return response
except FileNotFoundError:
log_message(
component="controller",
component_id=None,
level="ERROR",
log_type="app",
code="s3.upload",
message=f"The file {file_name} was not found"
)
return None
except NoCredentialsError:
log_message(
component="controller",
component_id=None,
level="ERROR",
log_type="app",
code="s3.upload",
message="Credentials not available"
)
return None
except PartialCredentialsError:
log_message(
component="controller",
component_id=None,
level="ERROR",
log_type="app",
code="s3.upload",
message="Incomplete credentials provided"
)
return None
def get_object(self, object_name, file_name=None):
if file_name is None:
file_name = object_name
try:
self.s3_client.download_file(self.bucket_name, object_name, file_name)
log_message(
component="controller",
component_id=None,
level="INFO",
log_type="app",
code="s3.download",
message=f"File {object_name} downloaded successfully to {file_name}"
)
return file_name
except FileNotFoundError:
log_message(
component="controller",
component_id=None,
level="ERROR",
log_type="app",
code="s3.download",
message=f"The file {object_name} was not found"
)
return None
except NoCredentialsError:
log_message(
component="controller",
component_id=None,
level="ERROR",
log_type="app",
code="s3.download",
message="Credentials not available"
)
return None
except PartialCredentialsError:
log_message(
component="controller",
component_id=None,
level="ERROR",
log_type="app",
code="s3.download",
message="Incomplete credentials provided"
)
return None
if __name__ == "__main__":
s3_handler = S3Handler()
s3_handler.put_object("example.txt")
s3_handler.get_object("example.txt")

View File

@@ -4,6 +4,10 @@ REDIS_URL=redis://localhost:6379
SECRET_KEY=your-secret-key-here
API_KEY=your-api-key-here
# Controller Configuration (for database operations)
CONTROLLER_URL=http://localhost:8000
CONTROLLER_API_KEY=your-controller-api-key-here
# Frontend Configuration
REACT_APP_API_URL=http://localhost:8000/api
REACT_APP_WS_URL=ws://localhost:8000/ws

View File

@@ -0,0 +1,223 @@
"""
Controller Client for Epistine Webserver
This module provides a client for communicating with the Epistine controller API.
It handles authentication, request formatting, and error handling for all
database operations that the webserver needs to perform.
"""
import os
import httpx
from typing import Dict, List, Optional, Any, AsyncGenerator
from fastapi import HTTPException, status
import logging
logger = logging.getLogger(__name__)
class ControllerClient:
"""Client for communicating with the Epistine controller API."""
def __init__(self, base_url: Optional[str] = None, api_key: Optional[str] = None):
"""
Initialize the controller client.
Args:
base_url: Base URL of the controller API (e.g., http://localhost:8000)
api_key: API key for authentication with the controller
"""
self.base_url = base_url or os.getenv("CONTROLLER_URL", "http://localhost:8000")
self.api_key = api_key or os.getenv("CONTROLLER_API_KEY", "")
self.client = httpx.AsyncClient(
timeout=30.0,
headers={
"FilesManager-API-Key": self.api_key,
"Content-Type": "application/json",
"User-Agent": "Epistine-Webserver/1.0"
}
)
async def __aenter__(self):
"""Async context manager entry."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit."""
await self.client.aclose()
async def close(self):
"""Close the HTTP client."""
await self.client.aclose()
async def _make_request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
"""
Make a request to the controller API.
Args:
method: HTTP method (GET, POST, PUT, DELETE)
endpoint: API endpoint (e.g., /api/v1/machines)
**kwargs: Additional arguments to pass to httpx request
Returns:
Response JSON data
Raises:
HTTPException: If the request fails
"""
url = f"{self.base_url}{endpoint}"
try:
response = await self.client.request(method, url, **kwargs)
response.raise_for_status()
return response.json() if response.content else {}
except httpx.HTTPStatusError as e:
logger.error(f"Controller API error: {e.response.status_code} - {e.response.text}")
if e.response.status_code == 401:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Unauthorized access to controller"
)
elif e.response.status_code == 403:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions for controller operation"
)
else:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Controller API error: {e.response.status_code}"
)
except httpx.RequestError as e:
logger.error(f"Controller connection error: {str(e)}")
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Controller service unavailable"
)
# Webserver Agent operations
async def get_webserver_agents(self, agent_id: Optional[int] = None,
agent_status: Optional[str] = None) -> List[Dict]:
"""Get webserver agents from controller."""
params = {}
if agent_id is not None:
params["agent_id"] = str(agent_id)
if agent_status is not None:
params["agent_status"] = agent_status
response = await self._make_request("GET", "/api/v1/webserver/agents", params=params)
return response
async def create_webserver_agent(self, agent_data: Dict) -> Dict:
"""Create a new webserver agent via controller."""
response = await self._make_request("POST", "/api/v1/webserver/agents", json=agent_data)
return response
async def update_webserver_agent(self, agent_id: int, agent_data: Dict) -> Dict:
"""Update a webserver agent via controller."""
response = await self._make_request("PUT", f"/api/v1/webserver/agents/{agent_id}", json=agent_data)
return response
async def delete_webserver_agent(self, agent_id: int) -> Dict:
"""Delete a webserver agent via controller."""
response = await self._make_request("DELETE", f"/api/v1/webserver/agents/{agent_id}")
return response
# Webserver Task operations
async def get_webserver_tasks(self, task_id: Optional[int] = None,
task_status: Optional[str] = None,
agent_id: Optional[int] = None,
skip: int = 0, limit: int = 100) -> List[Dict]:
"""Get webserver tasks from controller."""
params = {"skip": str(skip), "limit": str(limit)}
if task_id is not None:
params["task_id"] = str(task_id)
if task_status is not None:
params["task_status"] = task_status
if agent_id is not None:
params["agent_id"] = str(agent_id)
response = await self._make_request("GET", "/api/v1/webserver/tasks", params=params)
return response
async def create_webserver_task(self, task_data: Dict) -> Dict:
"""Create a new webserver task via controller."""
response = await self._make_request("POST", "/api/v1/webserver/tasks", json=task_data)
return response
async def update_webserver_task(self, task_id: int, task_data: Dict) -> Dict:
"""Update a webserver task via controller."""
response = await self._make_request("PUT", f"/api/v1/webserver/tasks/{task_id}", json=task_data)
return response
async def delete_webserver_task(self, task_id: int) -> Dict:
"""Delete a webserver task via controller."""
response = await self._make_request("DELETE", f"/api/v1/webserver/tasks/{task_id}")
return response
# Webserver Log operations
async def get_webserver_logs(self, log_level: Optional[str] = None,
component: Optional[str] = None,
skip: int = 0, limit: int = 100) -> List[Dict]:
"""Get webserver logs from controller."""
params = {"skip": str(skip), "limit": str(limit)}
if log_level is not None:
params["log_level"] = log_level
if component is not None:
params["component"] = component
response = await self._make_request("GET", "/api/v1/webserver/logs", params=params)
return response
async def create_webserver_log(self, log_data: Dict) -> Dict:
"""Create a new webserver log via controller."""
response = await self._make_request("POST", "/api/v1/webserver/logs", json=log_data)
return response
# Webserver Setting operations
async def get_webserver_settings(self, setting_key: Optional[str] = None) -> List[Dict]:
"""Get webserver settings from controller."""
params = {}
if setting_key is not None:
params["setting_key"] = setting_key
response = await self._make_request("GET", "/api/v1/webserver/settings", params=params)
return response
async def create_webserver_setting(self, setting_data: Dict) -> Dict:
"""Create a new webserver setting via controller."""
response = await self._make_request("POST", "/api/v1/webserver/settings", json=setting_data)
return response
async def update_webserver_setting(self, setting_id: int, setting_data: Dict) -> Dict:
"""Update a webserver setting via controller."""
response = await self._make_request("PUT", f"/api/v1/webserver/settings/{setting_id}", json=setting_data)
return response
async def delete_webserver_setting(self, setting_id: int) -> Dict:
"""Delete a webserver setting via controller."""
response = await self._make_request("DELETE", f"/api/v1/webserver/settings/{setting_id}")
return response
# Health check
async def health_check(self) -> Dict:
"""Check controller health."""
try:
response = await self._make_request("GET", "/api/v1/health")
return response
except HTTPException:
return {"status": "unhealthy"}
# Dependency for FastAPI
async def get_controller_client() -> AsyncGenerator[ControllerClient, None]:
"""
FastAPI dependency that provides a controller client.
Returns:
ControllerClient instance
"""
client = ControllerClient()
try:
yield client
finally:
await client.close()

View File

@@ -1,22 +1,11 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from typing import Optional
"""
Database module for Epistine Webserver.
DATABASE_URL = "postgresql://user:password@localhost/epistine"
Note: This module is no longer used for direct database access.
The webserver now uses the controller API for all database operations.
The controller handles PostgreSQL database interactions.
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
This file is kept for backward compatibility but contains no functional code.
"""
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def init_db():
import epistine_webserver.backend.app.models
epistine_webserver.backend.app.models.Base.metadata.create_all(bind=engine)
# Empty module - all database operations are now handled by the controller API

View File

@@ -1,22 +1,11 @@
from fastapi import FastAPI, WebSocket, Depends, HTTPException
from fastapi import FastAPI, WebSocket
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
from sqlalchemy import create_engine, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from epistine_webserver.backend.app.routers import router as main_router
from epistine_webserver.backend.app.database import get_db
from epistine_webserver.backend.app.middleware import setup_middleware
from epistine_webserver.backend.app.models import Base
# Database setup
DATABASE_URL = "postgresql://user:password@localhost/epistine"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Create all database tables
Base.metadata.create_all(bind=engine)
# Note: Database setup has been removed as the webserver now uses the controller API
# for all database operations. The controller handles all database interactions.
# FastAPI app initialization
app = FastAPI(

View File

@@ -1,46 +1,64 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, JSON
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
from datetime import datetime
Base = declarative_base()
# Pydantic models for webserver API
# These models are used for request/response validation and no longer depend on SQLAlchemy
# since the webserver now uses the controller API for all database operations.
class Agent(Base):
__tablename__ = "agents"
class Agent(BaseModel):
"""Agent model for webserver API."""
id: int = Field(..., description="Agent ID")
name: str = Field(..., description="Agent name", min_length=1)
status: str = Field("offline", description="Agent status")
capabilities: Optional[Dict[str, Any]] = Field(None, description="Agent capabilities")
ip_address: Optional[str] = Field(None, description="IP address of the agent")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation timestamp")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update timestamp")
id = Column(Integer, primary_key=True, index=True)
name = Column(String(255), nullable=False)
status = Column(String(50), nullable=False)
capabilities = Column(JSON, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Config:
from_attributes = True # Allows compatibility with SQLAlchemy if needed
class Task(Base):
__tablename__ = "tasks"
class Task(BaseModel):
"""Task model for webserver API."""
id: int = Field(..., description="Task ID")
agent_id: Optional[int] = Field(None, description="ID of assigned agent")
name: str = Field(..., description="Task name", min_length=1)
status: str = Field("pending", description="Task status")
result: Optional[Dict[str, Any]] = Field(None, description="Task result")
parameters: Optional[Dict[str, Any]] = Field(None, description="Task parameters")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation timestamp")
completed_at: Optional[datetime] = Field(None, description="Completion timestamp")
error_message: Optional[str] = Field(None, description="Error message if task failed")
id = Column(Integer, primary_key=True, index=True)
agent_id = Column(Integer, ForeignKey("agents.id"), nullable=False)
status = Column(String(50), nullable=False)
result = Column(JSON, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
completed_at = Column(DateTime, nullable=True)
agent = relationship("Agent")
class Config:
from_attributes = True
class Log(Base):
__tablename__ = "logs"
class Log(BaseModel):
"""Log model for webserver API."""
id: int = Field(..., description="Log ID")
level: str = Field("INFO", description="Log level")
message: str = Field(..., description="Log message", min_length=1)
timestamp: datetime = Field(default_factory=datetime.utcnow, description="Log timestamp")
source: Optional[str] = Field(None, description="Log source/component")
metadata: Optional[Dict[str, Any]] = Field(None, description="Additional log metadata")
id = Column(Integer, primary_key=True, index=True)
level = Column(String(50), nullable=False)
message = Column(Text, nullable=False)
timestamp = Column(DateTime, default=datetime.utcnow)
class Config:
from_attributes = True
class Setting(Base):
__tablename__ = "settings"
class Setting(BaseModel):
"""Setting model for webserver API."""
id: int = Field(..., description="Setting ID")
key: str = Field(..., description="Setting key", min_length=1)
value: Optional[str] = Field(None, description="Setting value")
description: Optional[str] = Field(None, description="Setting description")
setting_type: str = Field("string", description="Setting type (string, integer, boolean, json)")
is_encrypted: bool = Field(False, description="Whether the setting value is encrypted")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Creation timestamp")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="Last update timestamp")
id = Column(Integer, primary_key=True, index=True)
key = Column(String(255), nullable=False, unique=True)
value = Column(Text, nullable=True)
description = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Config:
from_attributes = True
# Note: The Base declarative_base is no longer needed since we're not using SQLAlchemy directly
# Remove any SQLAlchemy imports and Base declaration

View File

@@ -1,59 +1,472 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from typing import List
from epistine_webserver.backend.app.database import get_db
from typing import List, Optional
from datetime import datetime
from epistine_webserver.backend.app.models import Agent, Task, Log, Setting
from epistine_webserver.backend.app.controller_client import get_controller_client, ControllerClient
router = APIRouter()
# Helper functions for data mapping
def map_controller_agent_to_webserver(controller_agent: dict) -> Agent:
"""Map controller webserver agent to webserver Agent model."""
return Agent(
id=controller_agent.get('agent_id', 0),
name=controller_agent.get('agent_name', ''),
status=controller_agent.get('agent_status', 'offline'),
capabilities=controller_agent.get('capabilities', {}),
ip_address=controller_agent.get('ip_address', ''),
created_at=controller_agent.get('created_at', datetime.utcnow()),
updated_at=controller_agent.get('updated_at', datetime.utcnow())
)
def map_webserver_agent_to_controller(webserver_agent: Agent) -> dict:
"""Map webserver Agent model to controller webserver agent."""
return {
'agent_name': webserver_agent.name,
'agent_status': webserver_agent.status,
'capabilities': webserver_agent.capabilities,
'ip_address': webserver_agent.ip_address
}
def map_controller_task_to_webserver(controller_task: dict) -> Task:
"""Map controller webserver task to webserver Task model."""
return Task(
id=controller_task.get('task_id', 0),
agent_id=controller_task.get('agent_id'),
name=controller_task.get('task_name', ''),
status=controller_task.get('task_status', 'pending'),
result=controller_task.get('task_result', {}),
parameters=controller_task.get('parameters', {}),
created_at=controller_task.get('created_at', datetime.utcnow()),
completed_at=controller_task.get('completed_at'),
error_message=controller_task.get('error_message')
)
def map_webserver_task_to_controller(webserver_task: Task) -> dict:
"""Map webserver Task model to controller webserver task."""
return {
'task_name': webserver_task.name,
'task_status': webserver_task.status,
'agent_id': webserver_task.agent_id,
'task_result': webserver_task.result,
'parameters': webserver_task.parameters
}
def map_controller_log_to_webserver(controller_log: dict) -> Log:
"""Map controller webserver log to webserver Log model."""
return Log(
id=controller_log.get('log_id', 0),
level=controller_log.get('log_level', 'INFO'),
message=controller_log.get('message', ''),
timestamp=controller_log.get('created_at', datetime.utcnow()),
source=controller_log.get('component', ''),
metadata=controller_log.get('metadata', {})
)
def map_webserver_log_to_controller(webserver_log: Log) -> dict:
"""Map webserver Log model to controller webserver log."""
return {
'log_level': webserver_log.level,
'component': webserver_log.source or 'webserver',
'message': webserver_log.message,
'metadata': webserver_log.metadata or {}
}
def map_controller_setting_to_webserver(controller_setting: dict) -> Setting:
"""Map controller webserver setting to webserver Setting model."""
return Setting(
id=controller_setting.get('setting_id', 0),
key=controller_setting.get('setting_key', ''),
value=controller_setting.get('setting_value', ''),
description=controller_setting.get('description', ''),
setting_type=controller_setting.get('setting_type', 'string'),
is_encrypted=controller_setting.get('is_encrypted', False),
created_at=controller_setting.get('created_at', datetime.utcnow()),
updated_at=controller_setting.get('updated_at', datetime.utcnow())
)
def map_webserver_setting_to_controller(webserver_setting: Setting) -> dict:
"""Map webserver Setting model to controller webserver setting."""
return {
'setting_key': webserver_setting.key,
'setting_value': webserver_setting.value,
'setting_type': getattr(webserver_setting, 'setting_type', 'string'),
'description': webserver_setting.description,
'is_encrypted': getattr(webserver_setting, 'is_encrypted', False)
}
# Agent endpoints
@router.get("/agents", response_model=List[Agent])
def read_agents(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
agents = db.query(Agent).offset(skip).limit(limit).all()
return agents
async def read_agents(
skip: int = 0,
limit: int = 100,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Get agents from controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Get webserver agents from controller
controller_agents = await controller_client.get_webserver_agents(skip=skip, limit=limit)
# Convert controller agents to webserver agents
agents = [map_controller_agent_to_webserver(agent) for agent in controller_agents]
return agents
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch agents: {str(e)}")
@router.post("/agents", response_model=Agent)
def create_agent(agent: Agent, db: Session = Depends(get_db)):
db_agent = Agent(**agent.dict())
db.add(db_agent)
db.commit()
db.refresh(db_agent)
return db_agent
async def create_agent(
agent: Agent,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Create a new agent via controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Convert agent to controller format
agent_data = map_webserver_agent_to_controller(agent)
# Create agent via controller
response = await controller_client.create_webserver_agent(agent_data)
# Get the created agent to return
agent_id = response.get('agent_id')
if agent_id:
agents = await controller_client.get_webserver_agents(agent_id=agent_id)
if agents:
return map_controller_agent_to_webserver(agents[0])
# Fallback: return the input agent with ID from response
created_agent = Agent(
id=agent_id or 0,
name=agent.name,
status=agent.status,
capabilities=agent.capabilities,
ip_address=agent.ip_address,
created_at=agent.created_at,
updated_at=agent.updated_at
)
return created_agent
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create agent: {str(e)}")
@router.put("/agents/{agent_id}", response_model=Agent)
async def update_agent(
agent_id: int,
agent: Agent,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Update an agent via controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Convert agent to controller format
agent_data = map_webserver_agent_to_controller(agent)
# Update agent via controller
response = await controller_client.update_webserver_agent(agent_id, agent_data)
# Get the updated agent to return
agents = await controller_client.get_webserver_agents(agent_id=agent_id)
if agents:
return map_controller_agent_to_webserver(agents[0])
raise HTTPException(status_code=404, detail=f"Agent {agent_id} not found after update")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to update agent: {str(e)}")
@router.delete("/agents/{agent_id}")
async def delete_agent(
agent_id: int,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Delete an agent via controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
response = await controller_client.delete_webserver_agent(agent_id)
return {"status": "deleted", "agent_id": agent_id}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete agent: {str(e)}")
# Task endpoints
@router.get("/tasks", response_model=List[Task])
def read_tasks(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
tasks = db.query(Task).offset(skip).limit(limit).all()
return tasks
async def read_tasks(
skip: int = 0,
limit: int = 100,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Get tasks from controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Get webserver tasks from controller
controller_tasks = await controller_client.get_webserver_tasks(skip=skip, limit=limit)
# Convert controller tasks to webserver tasks
tasks = [map_controller_task_to_webserver(task) for task in controller_tasks]
return tasks
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch tasks: {str(e)}")
@router.post("/tasks", response_model=Task)
def create_task(task: Task, db: Session = Depends(get_db)):
db_task = Task(**task.dict())
db.add(db_task)
db.commit()
db.refresh(db_task)
return db_task
async def create_task(
task: Task,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Create a new task via controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Convert task to controller format
task_data = map_webserver_task_to_controller(task)
# Create task via controller
response = await controller_client.create_webserver_task(task_data)
# Get the created task to return
task_id = response.get('task_id')
if task_id:
tasks = await controller_client.get_webserver_tasks(task_id=task_id)
if tasks:
return map_controller_task_to_webserver(tasks[0])
# Fallback: return the input task with ID from response
created_task = Task(
id=task_id or 0,
agent_id=task.agent_id,
name=task.name,
status=task.status,
result=task.result,
parameters=task.parameters,
created_at=task.created_at,
completed_at=task.completed_at,
error_message=task.error_message
)
return created_task
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create task: {str(e)}")
@router.put("/tasks/{task_id}", response_model=Task)
async def update_task(
task_id: int,
task: Task,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Update a task via controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Convert task to controller format
task_data = map_webserver_task_to_controller(task)
# Update task via controller
response = await controller_client.update_webserver_task(task_id, task_data)
# Get the updated task to return
tasks = await controller_client.get_webserver_tasks(task_id=task_id)
if tasks:
return map_controller_task_to_webserver(tasks[0])
raise HTTPException(status_code=404, detail=f"Task {task_id} not found after update")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to update task: {str(e)}")
@router.delete("/tasks/{task_id}")
async def delete_task(
task_id: int,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Delete a task via controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
response = await controller_client.delete_webserver_task(task_id)
return {"status": "deleted", "task_id": task_id}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete task: {str(e)}")
# Log endpoints
@router.get("/logs", response_model=List[Log])
def read_logs(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
logs = db.query(Log).offset(skip).limit(limit).all()
return logs
async def read_logs(
skip: int = 0,
limit: int = 100,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Get logs from controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Get webserver logs from controller
controller_logs = await controller_client.get_webserver_logs(skip=skip, limit=limit)
# Convert controller logs to webserver logs
logs = [map_controller_log_to_webserver(log_entry) for log_entry in controller_logs]
return logs
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch logs: {str(e)}")
@router.post("/logs", response_model=Log)
def create_log(log: Log, db: Session = Depends(get_db)):
db_log = Log(**log.dict())
db.add(db_log)
db.commit()
db.refresh(db_log)
return db_log
async def create_log(
log: Log,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Create a new log via controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Convert log to controller format
log_data = map_webserver_log_to_controller(log)
# Create log via controller
response = await controller_client.create_webserver_log(log_data)
# Get the created log to return
log_id = response.get('log_id')
if log_id:
logs = await controller_client.get_webserver_logs()
# Find the log by ID (simplified - in production would filter by ID)
for log_entry in logs:
if log_entry.get('log_id') == log_id:
return map_controller_log_to_webserver(log_entry)
# Fallback: return the input log
return log
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create log: {str(e)}")
# Setting endpoints
@router.get("/settings", response_model=List[Setting])
def read_settings(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
settings = db.query(Setting).offset(skip).limit(limit).all()
return settings
async def read_settings(
skip: int = 0,
limit: int = 100,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Get settings from controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Get webserver settings from controller
controller_settings = await controller_client.get_webserver_settings()
# Convert controller settings to webserver settings
settings = [map_controller_setting_to_webserver(setting) for setting in controller_settings[skip:skip + limit]]
return settings
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch settings: {str(e)}")
@router.post("/settings", response_model=Setting)
def create_setting(setting: Setting, db: Session = Depends(get_db)):
db_setting = Setting(**setting.dict())
db.add(db_setting)
db.commit()
db.refresh(db_setting)
return db_setting
async def create_setting(
setting: Setting,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Create a new setting via controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Convert setting to controller format
setting_data = map_webserver_setting_to_controller(setting)
# Create setting via controller
response = await controller_client.create_webserver_setting(setting_data)
# Get the created setting to return
setting_id = response.get('setting_id')
if setting_id:
settings = await controller_client.get_webserver_settings(setting_key=setting.key)
if settings:
return map_controller_setting_to_webserver(settings[0])
# Fallback: return the input setting with ID from response
created_setting = Setting(
id=setting_id or 0,
key=setting.key,
value=setting.value,
description=setting.description,
setting_type=getattr(setting, 'setting_type', 'string'),
is_encrypted=getattr(setting, 'is_encrypted', False),
created_at=setting.created_at,
updated_at=setting.updated_at
)
return created_setting
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create setting: {str(e)}")
@router.put("/settings/{setting_id}", response_model=Setting)
async def update_setting(
setting_id: int,
setting: Setting,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Update a setting via controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
# Convert setting to controller format
setting_data = map_webserver_setting_to_controller(setting)
# Update setting via controller
response = await controller_client.update_webserver_setting(setting_id, setting_data)
# Get the updated setting to return
settings = await controller_client.get_webserver_settings()
# Find the setting by ID (simplified - in production would filter by ID)
for setting_entry in settings:
if setting_entry.get('setting_id') == setting_id:
return map_controller_setting_to_webserver(setting_entry)
raise HTTPException(status_code=404, detail=f"Setting {setting_id} not found after update")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to update setting: {str(e)}")
@router.delete("/settings/{setting_id}")
async def delete_setting(
setting_id: int,
controller_client: ControllerClient = Depends(get_controller_client)
):
"""
Delete a setting via controller.
Note: This endpoint now uses the controller webserver API instead of direct database access.
"""
try:
response = await controller_client.delete_webserver_setting(setting_id)
return {"status": "deleted", "setting_id": setting_id}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete setting: {str(e)}")

View File

@@ -8,11 +8,17 @@ services:
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql://user:password@db:5432/epistine
# Note: DATABASE_URL is no longer used by the webserver
# All database operations are now handled by the controller API
# Remove DATABASE_URL as webserver doesn't need direct database access
- REDIS_URL=redis://redis:6379
# Controller is external, update to appropriate URL (e.g., localhost:8001)
- CONTROLLER_URL=http://localhost:8001 # External controller URL
# Use the controller API key for authentication
- CONTROLLER_API_KEY=your_master_api_key
depends_on:
- db
- redis
# Controller is external, removed from depends_on
volumes:
- ./backend/app:/app
networks:
@@ -29,19 +35,6 @@ services:
networks:
- epistine-network
db:
image: postgres:13
environment:
- POSTGRES_DB=epistine
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
networks:
- epistine-network
redis:
image: redis:6-alpine
ports:
@@ -64,7 +57,6 @@ services:
- epistine-network
volumes:
postgres_data:
redis_data:
networks:

View File

@@ -0,0 +1,223 @@
# Unified Database Interface Architecture
## Overview
This document outlines the architecture for a unified database interface that will be used by both the webserver and controller components. The goal is to provide a consistent and standardized way to interact with the database across the entire system.
## Current State Analysis
### Controller Database Implementation
The controller already has a comprehensive database implementation in `controller/app/dbhandler.py` with the following key components:
1. **Database Connection Management**:
- Uses psycopg2 for PostgreSQL connections
- Implements connection pooling with proper error handling
- Includes connection cleanup in try/finally blocks
- Supports both direct connections and context managers
2. **Database Operations**:
- CRUD operations for machines, extraction jobs, persons, companies, and documents
- Support for bulk operations and transactions
- Proper logging of all database operations
- Error handling with detailed logging
3. **API Key Management**:
- Authentication and authorization using API keys
- Scope-based access control
- Key validation and management
4. **Model Definitions**:
- Pydantic models for API responses and requests
- Database models for all entities
- Proper data validation and serialization
### Webserver Database Implementation
The webserver uses SQLAlchemy with the following components:
1. **Database Connection**:
- Uses SQLAlchemy engine for PostgreSQL
- Session management with proper cleanup
- Database initialization function
2. **Model Definitions**:
- SQLAlchemy models for agents, tasks, logs, and settings
- Relationships between entities
- Proper data types and constraints
## Proposed Unified Architecture
### 1. Database Interface Layer
Create a standardized interface that both components can use:
```python
class DatabaseInterface:
def connect(self):
# Establish database connection
pass
def disconnect(self):
# Close database connection
pass
def execute_query(self, query, params=None):
# Execute a SELECT query
pass
def execute_update(self, query, params=None):
# Execute an INSERT/UPDATE/DELETE query
pass
def begin_transaction(self):
# Start a database transaction
pass
def commit_transaction(self):
# Commit the current transaction
pass
def rollback_transaction(self):
# Rollback the current transaction
pass
```
### 2. Standardized Data Models
Define standardized data models that can be used across both components:
1. **Machine Model**:
- id, name, status, ip_address, created_at, updated_at
2. **ExtractionJob Model**:
- id, machine_id, status, result, created_at, completed_at
3. **Person Model**:
- id, name, email, created_at, updated_at
4. **Company Model**:
- id, name, industry, created_at, updated_at
5. **Document Model**:
- id, title, content, created_at, updated_at
### 3. API Endpoints
Standardize the API endpoints in the controller to provide consistent access to database operations:
1. **Machine Endpoints**:
- GET /machines - List all machines
- GET /machines/{id} - Get a specific machine
- POST /machines - Create a new machine
- PUT /machines/{id} - Update a machine
- DELETE /machines/{id} - Delete a machine
2. **ExtractionJob Endpoints**:
- GET /jobs - List all jobs
- GET /jobs/{id} - Get a specific job
- POST /jobs - Create a new job
- PUT /jobs/{id} - Update a job
- DELETE /jobs/{id} - Delete a job
3. **Person Endpoints**:
- GET /persons - List all persons
- GET /persons/{id} - Get a specific person
- POST /persons - Create a new person
- PUT /persons/{id} - Update a person
- DELETE /persons/{id} - Delete a person
4. **Company Endpoints**:
- GET /companies - List all companies
- GET /companies/{id} - Get a specific company
- POST /companies - Create a new company
- PUT /companies/{id} - Update a company
- DELETE /companies/{id} - Delete a company
5. **Document Endpoints**:
- GET /documents - List all documents
- GET /documents/{id} - Get a specific document
- POST /documents - Create a new document
- PUT /documents/{id} - Update a document
- DELETE /documents/{id} - Delete a document
### 4. Authentication and Authorization
Implement a standardized authentication system:
1. **API Key Management**:
- Generate and validate API keys
- Manage key permissions and scopes
- Implement key rotation
2. **Access Control**:
- Role-based access control
- Scope-based permissions
- Token validation
## Implementation Plan
1. Create a new shared library for database interfaces
2. Implement the standardized database interface
3. Update the controller to use the new interface
4. Update the webserver to use the new interface
5. Test all database operations
6. Document the new architecture
## Implementation Status (Step 3 Completed)
### Completed: Database API Endpoints in Controller
The controller now has comprehensive REST API endpoints for all database operations as part of Step 3 of the database migration plan.
#### Implemented Endpoints:
1. **Machine Endpoints**:
- `GET /api/v1/machines` - List all machines with optional filtering
- `GET /api/v1/machines/{machine_id}` - Get a specific machine
- `POST /api/v1/machines` - Create a new machine
- `DELETE /api/v1/machines/{machine_id}` - Delete a machine
2. **Extraction Job Endpoints**:
- `GET /api/v1/jobs` - List all extraction jobs with optional filtering
- `GET /api/v1/jobs/{job_id}` - Get a specific extraction job
- `POST /api/v1/jobs` - Create a new extraction job
- `DELETE /api/v1/jobs/{job_id}` - Delete an extraction job
3. **Person Endpoints**:
- `GET /api/v1/persons` - List all persons with optional filtering
- `GET /api/v1/persons/{person_id}` - Get a specific person
- `POST /api/v1/persons` - Create a new person
- `DELETE /api/v1/persons/{person_id}` - Delete a person
- `GET /api/v1/person-aliases` - List all person aliases
- `POST /api/v1/person-aliases` - Create a new person alias
- `DELETE /api/v1/person-aliases/{alias_id}` - Delete a person alias
4. **Company Endpoints**:
- `GET /api/v1/companies` - List all companies with optional filtering
- `GET /api/v1/companies/{company_id}` - Get a specific company
- `POST /api/v1/companies` - Create a new company
- `DELETE /api/v1/companies/{company_id}` - Delete a company
- `GET /api/v1/company-aliases` - List all company aliases
- `POST /api/v1/company-aliases` - Create a new company alias
- `DELETE /api/v1/company-aliases/{alias_id}` - Delete a company alias
5. **Document Endpoints**:
- `GET /api/v1/documents` - List all documents with optional filtering
- `GET /api/v1/documents/{document_id}` - Get a specific document
- `POST /api/v1/documents` - Create a new document
- `DELETE /api/v1/documents/{document_id}` - Delete a document
#### Technical Implementation Details:
1. **Query Functions Added to dbhandler.py**:
- Added comprehensive query functions for all entities: `get_machines()`, `get_extraction_jobs()`, `get_persons()`, `get_companies()`, `get_documents()`, etc.
- All query functions support optional filtering parameters
- Proper error handling and logging
2. **REST Endpoints in routes.py**:
- Added new endpoint definitions with proper FastAPI decorators
- Implemented authentication using API key scopes
- Added request/response models using Pydantic
- Proper serialization of UUID and datetime fields to ISO format strings
3. **Authentication and Authorization**:
- All endpoints use the existing API key authentication system
- Appropriate scopes required for each operation (e.g., `WORKER_READ`, `WORKER_REGISTER`)
- Consistent error responses for unauthorized access
#### Next Steps (Step 4):
The next step in the migration plan is to implement authentication between the webserver and controller, followed by moving database models from the webserver to the controller.

View File

@@ -1,66 +0,0 @@
graph TB
subgraph "User Interface"
UI[React/Vue Frontend]
UI_Responsive[Responsive Design]
UI_Components[Reusable Components]
UI_RealTime[Real-time Updates]
end
subgraph "Application Layer"
FastAPI[FastAPI Backend]
WS[WebSocket Server]
Auth[Authentication]
CORS[CORS Middleware]
Cache[Redis Cache]
end
subgraph "Data Layer"
DB[PostgreSQL Database]
Controller[Controller API]
FileS3[S3 File Storage]
end
subgraph "Infrastructure"
Nginx[Nginx Reverse Proxy]
Docker[Docker Containers]
Monitor[Monitoring System]
end
subgraph "External Services"
AuthS[Auth Service]
LogS[Logging Service]
NotifS[Notification Service]
end
UI --> FastAPI
UI --> WS
UI_Responsive --> UI
UI_Components --> UI
UI_RealTime --> WS
FastAPI --> Auth
FastAPI --> CORS
FastAPI --> Cache
FastAPI --> DB
FastAPI --> Controller
FastAPI --> FileS3
WS --> FastAPI
Nginx --> FastAPI
Nginx --> UI
Docker --> FastAPI
Docker --> UI
Docker --> Nginx
Docker --> DB
Docker --> Cache
Monitor --> FastAPI
Monitor --> UI
Monitor --> DB
Monitor --> Cache
AuthS --> Auth
LogS --> FastAPI
NotifS --> FastAPI

67
plans/db_migration.md Normal file
View File

@@ -0,0 +1,67 @@
# Database Migration Plan
## Overview
The goal of this plan is to move database responsibility from the webserver (FastAPI backend) to the controller. This will improve separation of concerns, scalability, and maintainability of the system.
## Steps
### 1. Analyze Current Database Implementations
- **Task**: Review the current database implementations in both the webserver and controller.
- **Action**: Use the `read_file` tool to examine the relevant files.
- **Next Step**: Design unified database interface architecture.
### 2. Design Unified Database Interface Architecture
- **Task**: Design a unified database interface architecture that both the webserver and controller can use.
- **Action**: Create a new Markdown file in the `plan` directory to outline the architecture.
- **Next Step**: Create database API endpoints in the controller.
### 3. Create Database API Endpoints in Controller
- **Task**: Create REST endpoints in the controller for all database operations.
- **Action**: Implement the endpoints in the `controller/app/routers/routes.py` file.
- **Next Step**: Implement authentication between the webserver and controller.
### 4. Implement Authentication Between Webserver and Controller
- **Task**: Implement proper authentication between the webserver and controller.
- **Action**: Add authentication logic in the controller and update the webserver to use the controller API.
- **Next Step**: Move database models from the webserver to the controller.
### 5. Move Database Models from Webserver to Controller
- **Task**: Move all database models from the webserver to the controller.
- **Action**: Update the `controller/app/models` directory and ensure all models are compatible with the new architecture.
- **Next Step**: Update the webserver to use the controller API instead of direct database access.
### 6. Update Webserver to Use Controller API Instead of Direct Database Access
- **Task**: Update the webserver to use the controller API for all database operations.
- **Action**: Replace direct database calls with API calls in the webserver.
- **Next Step**: Update the `docker-compose.yml` file for the new architecture.
### 7. Update Docker Compose for New Architecture ✓ COMPLETED
- **Task**: Update the `docker-compose.yml` file to reflect the new architecture.
- **Action**: Add proper networking and dependencies between components.
- **Status**: Completed - Updated environment variables, synchronized API keys, verified networking
- **Next Step**: Add comprehensive testing for database operations.
### 8. Add Comprehensive Testing for Database Operations
- **Task**: Add comprehensive testing for all database operations.
- **Action**: Write unit tests and integration tests for the database API endpoints.
- **Next Step**: Implement monitoring and logging for database operations.
### 9. Implement Monitoring and Logging for Database Operations
- **Task**: Implement monitoring and logging for database operations.
- **Action**: Add logging and monitoring logic in the controller and webserver.
- **Next Step**: Perform performance testing and optimization.
### 10. Security Validation of New API Endpoints
- **Task**: Validate the security of the new API endpoints.
- **Action**: Conduct security audits and ensure proper authentication and authorization.
- **Next Step**: Documentation updates for the new architecture.
### 11. Documentation Updates for the New Architecture
- **Task**: Update documentation for the new architecture.
- **Action**: Document the changes and provide clear instructions for developers.
- **Next Step**: Final validation and deployment preparation.
## Conclusion
By following this plan, we can successfully move database responsibility from the webserver to the controller, improving the overall architecture and maintainability of the system.

View File

@@ -1,461 +0,0 @@
# EpistineFiles Webserver Migration Plan
## Executive Summary
This document outlines the comprehensive migration strategy from the current Streamlit-based webserver to a modern FastAPI + React/Vue architecture. The migration will maintain all existing functionality while providing enhanced performance, scalability, and user experience.
## Current State Analysis
### Existing Streamlit Implementation
- **Framework**: Streamlit 1.29.0
- **Backend**: Direct API calls to controller service
- **Frontend**: Streamlit's built-in components
- **Architecture**: Monolithic application
- **Deployment**: Docker container on port 8501
- **Key Features**:
- Data Explorer
- Agent Control
- Logs Management
- Settings Configuration
- API Key Management
### Identified Limitations
- Limited scalability and performance
- No real-time updates
- Restricted customization options
- Single-threaded architecture
- Limited mobile responsiveness
## Target Architecture
### Technology Stack
#### Backend (FastAPI)
```python
# FastAPI application structure
from fastapi import FastAPI, WebSocket, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
from sqlalchemy import create_engine, Column, Integer, String, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# Database setup
DATABASE_URL = "postgresql://user:password@localhost/epistine"
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# FastAPI app initialization
app = FastAPI(
title="EpistineFiles API",
description="Modern API for EpistineFiles management",
version="1.0.0"
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000", "https://yourdomain.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
```
#### Frontend (React)
```javascript
// React component structure
import React, { useState, useEffect } from 'react';
import { WebSocket } from 'ws';
import { BrowserRouter as Router, Routes, Route } from 'react-router-dom';
import { Container, Row, Col } from 'react-bootstrap';
const Dashboard = () => {
const [agents, setAgents] = useState([]);
const [tasks, setTasks] = useState([]);
const [websocket, setWebsocket] = useState(null);
useEffect(() => {
// Initialize WebSocket connection
const ws = new WebSocket('ws://localhost:8000/ws');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
// Handle real-time updates
if (data.type === 'agents') {
setAgents(data.data);
}
};
setWebsocket(ws);
return () => {
ws.close();
};
}, []);
return (
<Container fluid>
<Row>
<Col md={12}>
<h1 className="mb-4">EpistineFiles Dashboard</h1>
</Col>
</Row>
{/* Agent cards, task lists, etc. */}
</Container>
);
};
```
## Migration Phases
### Phase 1: Foundation Setup
#### 1.1 Project Structure Creation
```
epistine-webserver/
├── backend/
│ ├── app/
│ │ ├── main.py
│ │ ├── models/
│ │ ├── routers/
│ │ ├── middleware/
│ │ └── database/
│ ├── tests/
│ └── requirements.txt
├── frontend/
│ ├── src/
│ │ ├── components/
│ │ ├── pages/
│ │ ├── services/
│ │ ├── utils/
│ │ └── hooks/
│ ├── public/
│ └── package.json
├── docker/
├── nginx/
└── docker-compose.yml
```
#### 1.2 Technology Stack Installation
- FastAPI backend dependencies
- React frontend dependencies
- Database setup (PostgreSQL)
- Redis for caching
- Nginx for reverse proxy
### Phase 2: Backend Implementation
#### 2.1 Core API Development
```python
# API endpoint structure
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from database import get_db
from models import Agent, Task, Log, Setting
from schemas import AgentCreate, TaskCreate, LogCreate
router = APIRouter()
@router.get("/agents", response_model=List[Agent])
def read_agents(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
agents = db.query(Agent).offset(skip).limit(limit).all()
return agents
@router.post("/agents", response_model=Agent)
def create_agent(agent: AgentCreate, db: Session = Depends(get_db)):
db_agent = Agent(**agent.dict())
db.add(db_agent)
db.commit()
db.refresh(db_agent)
return db_agent
```
#### 2.2 WebSocket Implementation
```python
# Real-time updates
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
# Process and broadcast real-time updates
await websocket.send_text(f"Message text was: {data}")
```
#### 2.3 Authentication & Authorization
```python
# Security implementation
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext
from jose import JWTError, jwt
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
return pwd_context.hash(password)
```
### Phase 3: Frontend Development
#### 3.1 Component Architecture
```javascript
// Reusable component structure
const AgentCard = ({ agent }) => {
const [status, setStatus] = useState(agent.status);
useEffect(() => {
// Real-time status updates
const ws = new WebSocket('ws://localhost:8000/ws');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.agentId === agent.id) {
setStatus(data.status);
}
};
}, [agent.id]);
return (
<Card className="agent-card">
<Card.Header>
<h5>{agent.name}</h5>
</Card.Header>
<Card.Body>
<p>Status: {status}</p>
<p>Capabilities: {agent.capabilities.join(', ')}</p>
</Card.Body>
</Card>
);
};
```
#### 3.2 State Management
```javascript
// State management with React Context
const AppContext = createContext();
const AppProvider = ({ children }) => {
const [agents, setAgents] = useState([]);
const [tasks, setTasks] = useState([]);
const [loading, setLoading] = useState(false);
const fetchAgents = async () => {
setLoading(true);
try {
const response = await api.get('/agents');
setAgents(response.data);
} catch (error) {
console.error('Failed to fetch agents:', error);
} finally {
setLoading(false);
}
};
return (
<AppContext.Provider value={{ agents, tasks, loading, fetchAgents }}>
{children}
</AppContext.Provider>
);
};
```
### Phase 4: Integration & Testing
#### 4.1 API Integration
```python
# Controller API integration
class ControllerAPI:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.api_key = api_key
self.headers = {
"FilesManager-API-Key": api_key,
"Content-Type": "application/json"
}
async def get_agent_status(self, agent_id: str):
url = f"{self.base_url}/api/v1/agents/{agent_id}/status"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=self.headers)
response.raise_for_status()
return response.json()
```
#### 4.2 Testing Strategy
- Unit tests for all API endpoints
- Integration tests for database operations
- Frontend component testing
- End-to-end testing
- Performance testing
- Security testing
### Phase 5: Deployment & Monitoring
#### 5.1 Docker Configuration
```dockerfile
# Multi-stage Dockerfile for production
FROM python:3.11-slim as builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
FROM python:3.11-slim as runtime
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
```
#### 5.2 Monitoring Setup
- Application performance monitoring
- Error tracking and alerting
- Log aggregation
- Health checks
- Performance metrics
## Data Migration Strategy
### Database Schema Migration
```sql
-- Migration script for agent data
CREATE TABLE agents (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL,
capabilities JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Migration script for tasks
CREATE TABLE tasks (
id SERIAL PRIMARY KEY,
agent_id INTEGER REFERENCES agents(id),
status VARCHAR(50) NOT NULL,
result JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
completed_at TIMESTAMP
);
```
### Configuration Migration
- Environment variables migration
- API key migration
- Settings migration
- User preferences migration
## Risk Assessment & Mitigation
### Technical Risks
1. **API Compatibility Issues**
- Mitigation: Maintain backward compatibility during transition
- Implement feature flags for gradual rollout
2. **Performance Degradation**
- Mitigation: Comprehensive performance testing
- Implement caching strategies
- Optimize database queries
3. **Data Loss**
- Mitigation: Robust backup strategy
- Data validation during migration
- Rollback procedures
### Operational Risks
1. **User Disruption**
- Mitigation: Clear communication plan
- Parallel running during transition
- Comprehensive user documentation
2. **Security Vulnerabilities**
- Mitigation: Security audit before deployment
- Regular security updates
- Penetration testing
## Success Criteria
### Functional Requirements
- [ ] All existing features migrated successfully
- [ ] Real-time updates implemented
- [ ] Mobile-responsive design
- [ ] Improved performance
- [ ] Enhanced security
### Performance Requirements
- [ ] Page load time < 1 second
- [ ] API response time < 200ms
- [ ] Support for 1000+ concurrent users
- [ ] 99.9% uptime
### User Experience Requirements
- [ ] Intuitive interface
- [ ] Comprehensive error handling
- [ ] Accessible design
- [ ] User feedback mechanism
## Timeline and Milestones
### Phase 1: Foundation (2 weeks)
- Project structure setup
- Technology stack installation
- Basic API endpoints
### Phase 2: Core Development (4 weeks)
- Complete API implementation
- Database setup
- Authentication system
### Phase 3: Frontend Development (3 weeks)
- React application setup
- Component development
- Real-time features
### Phase 4: Integration & Testing (2 weeks)
- API integration
- Comprehensive testing
- Performance optimization
### Phase 5: Deployment (1 week)
- Staging deployment
- Production deployment
- Monitoring setup
## Resource Requirements
### Development Team
- 2 Backend Developers
- 2 Frontend Developers
- 1 DevOps Engineer
- 1 QA Engineer
### Infrastructure
- Development servers
- Staging environment
- Production servers
- Database servers
- Monitoring tools
### Tools and Services
- Version control (Git)
- CI/CD pipeline
- Project management tool
- Communication platform
## Conclusion
This migration plan provides a comprehensive roadmap for transitioning from Streamlit to FastAPI + React/Vue architecture. The plan addresses all technical, operational, and user experience requirements while maintaining backward compatibility and ensuring a smooth transition.
The modular approach allows for incremental development and testing, reducing risks and ensuring quality throughout the migration process. Regular checkpoints and success criteria will ensure the project stays on track and meets all objectives.