HiveMatrix Architecture - Service-Specific Implementation¶
Version 4.2 Part 3 of 3: Service-Specific Deep Dives
This document covers service-specific implementations, production features, and advanced patterns for Brainhair, Ledger, KnowledgeTree, Helm, and other specialized services. Read Architecture - Core and Architecture - Development first.
Other Parts:
- Architecture - Core - Core concepts, authentication, service communication
- Architecture - Development - Development practices, security, tools, and patterns
16. Brainhair AI Assistant & Approval Flow¶
Brainhair is HiveMatrix's intelligent AI assistant service (port 5050) that enables natural language interaction with the entire platform through the official Claude Code CLI from Anthropic. It provides seamless integration with Claude AI for performing administrative tasks, answering questions, and managing system operations across all HiveMatrix services.
Design Goal: Brainhair can manage everything in the HiveMatrix platform. Any administrative task that can be performed through the web interface is also accessible via natural language commands through Brainhair. This includes creating, reading, updating, and deleting data across all services (Codex, Ledger, KnowledgeTree, etc.).
For comprehensive Brainhair documentation, see Brainhair Service Documentation.
Architecture Overview¶
Brainhair Architecture

User (Browser) ←──→ Flask Routes (port 5050)
    ├─ POST /api/chat
    │     └─ Create response buffer
    │     └─ Spawn Claude Code subprocess
    │     └─ Return response_id
    │
    ├─ GET /api/chat/poll/<id>
    │     └─ Check for approval requests
    │     └─ Return new chunks + done status
    │
    └─ POST /api/chat/approval-response
          └─ Write response file

ClaudeSessionManager
    └─ Background cleanup (30-min idle timeout)
    └─ PostgreSQL persistence (sessions + messages)
    └─ Spawns: npx @anthropic-ai/claude-code

Claude Code (Subprocess)
    └─ Streams JSON output
    └─ Calls tools from claude_tools/
    └─ Receives approval responses via file-based IPC

Tools System (claude_tools/*.py)
    └─ codex_tools.py, ledger_tools.py, etc.
    └─ Uses service-to-service auth
    └─ Requests approval for write operations
    └─ Applies PHI/CJIS filtering via Presidio

Approval Workflow (File-based IPC)
    1. Tool writes /tmp/brainhair_approval_request_*.json
    2. Poll endpoint detects file, injects into stream
    3. Browser displays approval modal
    4. User approves/denies
    5. Browser writes /tmp/brainhair_approval_response_*.json
    6. Tool reads response, continues or exits
Claude Code CLI Integration¶
Brainhair uses the official Claude Code CLI from Anthropic (not custom ai_tools):
Installation:
# NPX (automatic)
npx -y @anthropic-ai/claude-code --version
# Or global install
npm install -g @anthropic-ai/claude-code
Subprocess Invocation:
# In claude_session_manager.py
command = [
    'npx', '-y', '@anthropic-ai/claude-code',
    '--model', 'claude-sonnet-4-5',
    '--allowed-tools', 'Bash,Read,Grep,Glob',
    '--permission-mode', 'dontAsk',
    '--output-format', 'stream-json',
    '--print',
    user_message
]

process = subprocess.Popen(
    command,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    cwd=self.working_directory,
    env=env
)
Why Claude Code CLI:

- ✅ Official Anthropic tool (maintained and updated)
- ✅ Streaming JSON output for real-time UI updates
- ✅ Built-in tool system (Bash, Read, Grep, Glob)
- ✅ Session management and conversation history
- ✅ No need to maintain custom AI integration code
Polling Architecture¶
Brainhair uses server-side response buffering with client polling instead of Server-Sent Events (SSE) for better proxy compatibility:
Why Not SSE:

- ❌ Proxies (Nexus, nginx, corporate) often buffer SSE responses
- ❌ Timeout issues with long-running AI responses
- ❌ Connection keep-alive challenges
Polling Flow:
1. User sends message → POST /api/chat
   → Creates response_id
   → Spawns Claude Code subprocess
   → Returns {"response_id": "abc123", "session_id": "session-uuid"}

2. Browser starts polling → GET /api/chat/poll/abc123?offset=0
   → Returns {"chunks": [...], "offset": 10, "done": false}

3. Poll loop continues every 500ms
   → GET /api/chat/poll/abc123?offset=10
   → Returns new chunks since last offset

4. Approval request detected (file-based IPC)
   → Poll returns {"chunks": [{"type": "approval_request", ...}]}
   → Browser shows approval modal
   → User approves/denies
   → POST /api/chat/approval-response

5. Claude Code finishes
   → Poll returns {"chunks": [...], "done": true, "session_id": "..."}
   → Browser stops polling
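The client-side loop above can be sketched in a few lines. The real client is browser JavaScript; this Python sketch stubs the HTTP call as a `poll_fn` parameter to show just the offset/done bookkeeping:

```python
import time

def poll_response(poll_fn, response_id, interval=0.5):
    """Drain a response buffer by polling until done is true.

    poll_fn(response_id, offset) stands in for
    GET /api/chat/poll/<id>?offset=N and must return a dict
    shaped like {"chunks": [...], "offset": int, "done": bool}.
    """
    chunks, offset = [], 0
    while True:
        result = poll_fn(response_id, offset)
        chunks.extend(result['chunks'])
        offset = result['offset']
        if result.get('done'):
            return chunks
        time.sleep(interval)

# Fake server: two pages of chunks, then done
pages = [
    {'chunks': ['Hello'], 'offset': 1, 'done': False},
    {'chunks': [' world'], 'offset': 2, 'done': True},
]
fake_poll = lambda rid, off: pages[off]

print(''.join(poll_response(fake_poll, 'abc123', interval=0)))  # Hello world
```

Passing the offset back on each poll is what lets the server return only new chunks instead of the whole transcript.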
Response Buffer Structure:
response_buffers[response_id] = {
    'chunks': [],              # All output chunks
    'done': False,             # Completion flag
    'error': None,             # Error message if failed
    'created_at': time.time(), # For cleanup
    'session_id': None,        # In-memory session ID
    'db_session_id': None,     # Database session ID
    'process': process         # Claude Code Popen handle
}
Buffer Cleanup:

- Buffers removed after 5 minutes of inactivity
- Prevents memory leaks from abandoned polls
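A minimal sketch of that cleanup, using the field names from the buffer structure above (the function name is illustrative):

```python
import time

def cleanup_response_buffers(response_buffers, max_age=300):
    """Drop buffers idle for longer than max_age seconds (default 5 min)."""
    now = time.time()
    stale = [rid for rid, buf in response_buffers.items()
             if now - buf['created_at'] > max_age]
    for rid in stale:
        del response_buffers[rid]
    return stale

# Example: one abandoned buffer, one fresh one
buffers = {
    'old': {'created_at': time.time() - 600, 'done': True},
    'new': {'created_at': time.time(), 'done': False},
}
cleanup_response_buffers(buffers)
print(sorted(buffers))  # ['new']
```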
Tools System¶
Brainhair tools are Python modules in the claude_tools/ directory (not ai_tools/):
Available Tools:
- codex_tools.py - Query companies, assets, users, tickets from Codex
- ledger_tools.py - Billing calculations and contract alignment
- contract_tools.py - Contract analysis and billing alignment
- knowledge_tools.py - Search and manage KnowledgeTree
- system_tools.py - Service health checks and system status
Tool Structure:
# claude_tools/codex_tools.py
from brainhair_auth import BrainhairAuth

def list_companies(limit=50):
    """List all companies from Codex."""
    auth = BrainhairAuth()
    response = auth.get('/api/codex/companies', params={'limit': limit})
    return response.json()

def get_company(account_number):
    """Get company details by account number."""
    auth = BrainhairAuth()
    response = auth.get(f'/api/codex/companies/{account_number}')
    return response.json()
BrainhairAuth Helper:
# claude_tools/brainhair_auth.py
import os
import requests

class BrainhairAuth:
    """Helper for service-to-service authentication."""

    def __init__(self):
        self.session_id = os.environ.get('BRAINHAIR_SESSION_ID')
        self.user = os.environ.get('HIVEMATRIX_USER')
        self.base_url = os.environ.get('BRAINHAIR_URL', 'http://localhost:5050')

    def get_service_token(self):
        """Request service token from Brainhair Flask app."""
        response = requests.post(
            f"{self.base_url}/api/internal/service-token",
            json={'session_id': self.session_id}
        )
        return response.json()['token']

    def get(self, path, **kwargs):
        """Make authenticated GET request."""
        token = self.get_service_token()
        return requests.get(
            f"{self.base_url}{path}",
            headers={'Authorization': f'Bearer {token}'},
            **kwargs
        )
Approval Workflow (File-Based IPC)¶
Critical Rule: All write operations (create, update, delete) must request user approval before executing.
Flow Diagram:
Approval Workflow (File-Based IPC)

1. Tool executes and needs approval
   └─ request_approval("Update billing", {...})

2. Tool writes approval request file
   └─ /tmp/brainhair_approval_request_SESSION_TIMESTAMP.json
      {
        "type": "approval_request",
        "approval_id": "SESSION_TIMESTAMP",
        "session_id": "db-session-uuid",
        "action": "Update billing rates for Company X",
        "details": {
          "Company": "Acme Corp",
          "Per User Cost": "$15.00 → $18.00",
          "Monthly Impact": "+$75.00"
        }
      }

3. Poll endpoint detects approval file
   └─ Scans /tmp/brainhair_approval_request_SESSION_*.json
   └─ Injects into response stream as chunk

4. Browser receives approval chunk
   └─ Displays modal with action and details
   └─ User clicks "Approve" or "Deny"

5. Browser writes response file
   └─ POST /api/chat/approval-response
   └─ /tmp/brainhair_approval_response_APPROVAL_ID.json
      {
        "approved": true,
        "timestamp": "2025-11-22T10:30:00Z"
      }

6. Tool polls for response file (timeout: 5 minutes)
   └─ Reads response
   └─ If approved: continue execution
   └─ If denied: exit with error
   └─ If timeout: exit with timeout error

7. Cleanup
   └─ Both files deleted after response read
Approval Helper (claude_tools/approval_helper.py):
import json
import os
import time

def request_approval(action: str, details: dict, timeout: int = 300) -> bool:
    """
    Request user approval for a write operation.

    Args:
        action: Description of the action (e.g., "Update billing rates")
        details: Dict of details to show user
        timeout: Seconds to wait for response (default: 5 minutes)

    Returns:
        True if approved, False if denied or timeout
    """
    session_id = os.environ.get('BRAINHAIR_SESSION_ID')
    approval_id = f"{session_id}_{int(time.time() * 1000)}"

    # Write request file
    request_file = f"/tmp/brainhair_approval_request_{approval_id}.json"
    with open(request_file, 'w') as f:
        json.dump({
            'type': 'approval_request',
            'approval_id': approval_id,
            'session_id': session_id,
            'action': action,
            'details': details
        }, f)

    # Poll for response
    response_file = f"/tmp/brainhair_approval_response_{approval_id}.json"
    start_time = time.time()
    while time.time() - start_time < timeout:
        if os.path.exists(response_file):
            with open(response_file, 'r') as f:
                response = json.load(f)
            os.remove(request_file)
            os.remove(response_file)
            return response.get('approved', False)
        time.sleep(0.5)

    # Timeout cleanup
    if os.path.exists(request_file):
        os.remove(request_file)
    return False
Tools Requiring Approval:
- update_billing.py - Change billing rates, add line items
- set_company_plan.py - Assign/change billing plans
- manage_network_equipment.py - Add/remove assets
- update_features.py - Modify feature overrides
- execute_powershell.py - Run remote commands (Datto RMM)
Read-Only Tools (No Approval):
- list_companies.py - View company data
- view_billing.py - Display billing information
- search_knowledge.py - Search knowledge base
- get_company_plan.py - Show plan details
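Putting the two lists together: a write tool wraps its mutation behind an approval check before touching any service. This is a hypothetical sketch, not one of the real tools; the approval function is injectable here so the gating logic stands alone, and in a real tool it would be the request_approval() helper shown earlier:

```python
def update_billing_rate(account_number, new_rate, approve=None):
    """Hypothetical write tool showing the approval gate.

    Fails closed: without an approval function, nothing is written.
    """
    approve = approve or (lambda action, details: False)
    approved = approve(
        f"Update billing rate for {account_number}",
        {"Account": account_number, "New Rate": f"${new_rate:.2f}"},
    )
    if not approved:
        return {'status': 'denied'}
    # ...a real tool would now call the Ledger API via BrainhairAuth...
    return {'status': 'updated', 'rate': new_rate}

print(update_billing_rate('620547', 18.0))  # {'status': 'denied'}
print(update_billing_rate('620547', 18.0, approve=lambda a, d: True))
```

Read-only tools skip the gate entirely, which is why they need no approval step.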
PHI/CJIS Filtering with Microsoft Presidio¶
Critical Security Feature: Brainhair implements automatic PHI (Protected Health Information) and CJIS (Criminal Justice Information Systems) data filtering to prevent sensitive information from being exposed to Claude AI or logged inappropriately.
Implementation: Uses Microsoft Presidio for automated detection and anonymization of sensitive data in tool responses. All data flowing through Brainhair is filtered before being sent to Claude.
Filtered Entity Types:
PHI Entities (Default):
- PERSON (names) - Anonymized to "FirstName L." format via custom FirstNameLastInitialOperator
- EMAIL_ADDRESS - Replaced with <EMAIL_ADDRESS>
- PHONE_NUMBER - Replaced with <PHONE_NUMBER>
- US_SSN - Replaced with <US_SSN>
- DATE_TIME - Replaced with <DATE_TIME>
- LOCATION - Replaced with <LOCATION>
- MEDICAL_LICENSE - Replaced with <MEDICAL_LICENSE>
- US_DRIVER_LICENSE - Replaced with <US_DRIVER_LICENSE>
- US_PASSPORT - Replaced with <US_PASSPORT>
- CREDIT_CARD - Replaced with <CREDIT_CARD>
- IP_ADDRESS - Replaced with <IP_ADDRESS>
CJIS Entities (Law Enforcement):
- Subset of PHI: PERSON, US_SSN, US_DRIVER_LICENSE, DATE_TIME, LOCATION, IP_ADDRESS
Example Filtering:
// Before PHI filtering
{
  "requester": "John Smith",
  "email": "john.smith@example.com",
  "phone": "555-123-4567",
  "ssn": "123-45-6789",
  "ip_address": "192.168.1.100"
}

// After PHI filtering
{
  "requester": "John S.",
  "email": "<EMAIL_ADDRESS>",
  "phone": "<PHONE_NUMBER>",
  "ssn": "<US_SSN>",
  "ip_address": "<IP_ADDRESS>"
}
Custom FirstNameLastInitialOperator:

- Preserves first name for context (e.g., "Which John?")
- Protects last name (e.g., "Smith" → "S.")
- Better for AI understanding while maintaining privacy
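The operator's transformation rule can be sketched as a plain function. This is a simplification; the real operator is implemented as a Presidio custom anonymizer that runs only on spans Presidio has recognized as PERSON entities:

```python
def first_name_last_initial(full_name):
    """Sketch of the anonymization rule: 'John Smith' -> 'John S.'"""
    parts = full_name.split()
    if len(parts) < 2:
        return full_name  # single token: nothing to abbreviate
    return f"{parts[0]} {parts[-1][0]}."

print(first_name_last_initial("John Smith"))  # John S.
```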
Filtering in Tools:
All service API calls from tools pass through PHI filtering:
# In codex_tools.py
from app.presidio_filter import get_presidio_filter

def get_company_data(account_number):
    response = call_service('codex', f'/api/companies/{account_number}')
    data = response.json()

    # Apply PHI filtering before the data reaches Claude
    presidio = get_presidio_filter()
    filtered_data = presidio.filter_phi(data)
    return filtered_data
When to Use:

- PHI (default): Healthcare MSPs, general client data
- CJIS: Law enforcement MSPs, criminal justice data
- None: Internal operations with proper authorization only
See Brainhair PHI/CJIS Filtering Documentation for complete implementation details.
Contract Alignment¶
Contract Alignment allows Claude to analyze contract documents, extract billing terms, and automatically align Ledger billing settings to match the contract.
7-Step Workflow:

1. User provides contract text to Claude
   "Here's the contract for Example Company..."

2. Claude extracts billing terms using NLP
   → per_user_rate: $15.00
   → hourly_rate: $150.00
   → prepaid_hours_monthly: 4.0

3. Claude calls contract_tools.get_current_settings()
   → Fetches current billing from Ledger

4. Claude calls contract_tools.compare_contract_to_settings()
   → Identifies discrepancies
   → Generates recommendations

5. Claude presents findings to user
   "Found 3 discrepancies:"
   "  - Per user cost: $12 → should be $15"
   "  - Prepaid hours: 0 → should be 4"

6. User approves alignment (approval workflow)
   → Claude calls contract_tools.align_settings()
   → Requires approval (write operation)

7. Claude verifies alignment
   → contract_tools.verify_alignment()
   → "✓ Alignment complete. All settings match contract."
Contract Terms Structure:
contract_terms = {
    "billing_method": "per_user",      # Or: "flat_fee", "per_device"
    "per_user_rate": 15.00,
    "per_workstation_rate": 75.00,
    "per_server_rate": 125.00,
    "hourly_rate": 150.00,
    "prepaid_hours_monthly": 4.0,
    "support_level": "All Inclusive",
    "included_users": 50,              # Optional
    "custom_items": [                  # Optional
        {"name": "Extra backup", "monthly_fee": 100.00}
    ]
}
API Functions (contract_tools.py):
- get_current_settings(account_number) - Get all current billing settings
- compare_contract_to_settings(account_number, contract_terms) - Identify discrepancies
- align_settings(account_number, adjustments, dry_run=False) - Apply adjustments (requires approval)
- verify_alignment(account_number, contract_terms) - Verify settings match contract
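The comparison step boils down to a field-by-field diff between extracted terms and current settings. A simplified sketch, assuming flat dicts on both sides (the real compare_contract_to_settings takes an account number and fetches current settings from Ledger itself):

```python
def diff_contract(contract_terms, current_settings):
    """Return one discrepancy record per field that doesn't match."""
    return [
        {'field': field,
         'current': current_settings.get(field),
         'contract': expected}
        for field, expected in contract_terms.items()
        if current_settings.get(field) != expected
    ]

contract = {'per_user_rate': 15.00, 'prepaid_hours_monthly': 4.0}
current = {'per_user_rate': 12.00, 'prepaid_hours_monthly': 4.0}
print(diff_contract(contract, current))
```

Each discrepancy record carries enough context for Claude to present the "$12 → should be $15" style findings shown in the workflow above.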
See Brainhair Contract Alignment Documentation for complete API reference and examples.
Session Management¶
Brainhair implements database-backed session management with automatic cleanup and conversation persistence.
Session Lifecycle:
1. Session Creation
   ├─ POST /api/chat (first message)
   ├─ Generate UUID for in-memory session
   ├─ Create or resume database session (ChatSession)
   └─ Fetch context (ticket, client details)

2. Session Active
   ├─ Messages exchanged (user ←→ Claude)
   ├─ Claude Code subprocess spawned per message
   ├─ Conversation history maintained in memory
   └─ Last activity timestamp updated

3. Session Idle (30 minutes)
   ├─ No activity for 30 minutes
   ├─ Background cleanup thread detects idle
   ├─ Session removed from in-memory cache
   └─ Database session remains for history

4. Session Resumption
   ├─ POST /api/chat with db_session_id
   ├─ Loads messages from database (ChatMessage)
   ├─ Reconstructs conversation history
   └─ Continues conversation
ClaudeSessionManager:
from app.claude_session_manager import get_session_manager

session_manager = get_session_manager()  # Singleton

# Create new session
session_id = session_manager.create_session(
    user="john.technician",
    context={"ticket": "12345", "client": "ACME Corp"}
)

# Resume existing session
session_id = session_manager.create_session(
    user="john.technician",
    context={...},
    db_session_id="existing-db-uuid"
)

# Send message
for chunk in session_manager.get_session(session_id).send_message_stream("Hello"):
    print(chunk)

# Cleanup idle sessions (runs automatically every 5 minutes)
session_manager.cleanup_idle_sessions(max_age_seconds=1800)  # 30 min
Database Schema:
# ChatSession model
class ChatSession(db.Model):
    id = db.Column(db.String(36), primary_key=True)  # UUID
    user_id = db.Column(db.String(100))              # Username
    user_name = db.Column(db.String(255))            # Display name
    ticket_number = db.Column(db.String(50))         # Optional ticket
    client_name = db.Column(db.String(255))          # Optional client
    created_at = db.Column(db.DateTime)
    updated_at = db.Column(db.DateTime)
    messages = db.relationship('ChatMessage', backref='session')

# ChatMessage model
class ChatMessage(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    session_id = db.Column(db.String(36), db.ForeignKey('chat_sessions.id'))
    role = db.Column(db.String(20))  # 'user' or 'assistant'
    content = db.Column(db.Text)
    created_at = db.Column(db.DateTime)
Background Cleanup Thread:
def _cleanup_loop(self):
    """Runs every 5 minutes to remove idle sessions."""
    cleanup_interval = 300  # 5 minutes
    while not self._cleanup_stop_event.is_set():
        if self._cleanup_stop_event.wait(timeout=cleanup_interval):
            break
        # Remove sessions idle for 30+ minutes
        self.cleanup_idle_sessions(max_age_seconds=1800)
Session Persistence:

- In-Memory: Active sessions for fast access
- Database: All sessions + messages for history and resumption
- Cleanup: Automatic removal of idle in-memory sessions
- Resumption: Load from database on demand
See Brainhair Session Management Documentation for complete implementation details.
Security & Debugging¶
Security Considerations:

- Tools run with service-level authentication (bypassing user permission checks)
- Approval system ensures human oversight for all write operations
- Session ID tracking prevents cross-session approval hijacking
- Timeout prevents tools from hanging indefinitely (default: 5 minutes)
- Approval files are cleaned up after use to prevent file system bloat
- PHI/CJIS filtering prevents sensitive data exposure to AI and logs
- All commands logged to Helm for audit trail
Debugging:
View Brainhair logs:
Check for approval files:
Monitor Claude Code subprocess:
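The three debugging checks above can be sketched as shell commands. These are assumptions pieced together from paths and tools named elsewhere in this document (the Helm log CLI from section 19, the /tmp approval file naming, and the npx package name), not verified commands:

```shell
# View Brainhair's entries in Helm's centralized log
python logs_cli.py brainhair --tail 50

# Check for pending approval request/response files
ls /tmp/brainhair_approval_request_*.json 2>/dev/null || echo "no pending requests"
ls /tmp/brainhair_approval_response_*.json 2>/dev/null || echo "no pending responses"

# Monitor the Claude Code subprocess
pgrep -fl 'claude-code' || echo "no Claude Code subprocess running"
```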
For comprehensive troubleshooting, see Brainhair Troubleshooting Guide.
17. Service-to-Service Communication Enhancements¶
Service Token Caching¶
To improve performance and reduce load on Core, services implement service token caching with 5-minute expiration.
Without Caching (Old):
# Every API call requests a new token from Core
def call_service(service_name, path):
    token = get_service_token_from_core(service_name)  # HTTP call to Core
    response = requests.get(
        f"{service_url}{path}",
        headers={'Authorization': f'Bearer {token}'}
    )
    return response
With Caching (New):
# In service_client.py
import time
import requests

_token_cache = {}

def get_service_token(service_name):
    """Get cached token or request new one if expired."""
    cache_key = f"token:{service_name}"

    if cache_key in _token_cache:
        token, expires_at = _token_cache[cache_key]
        if time.time() < expires_at:
            return token  # Use cached token

    # Request new token from Core
    response = requests.post(
        f"{CORE_SERVICE_URL}/service-token",
        json={'calling_service': 'myservice', 'target_service': service_name}
    )
    token = response.json()['token']

    # Cache for 4.5 minutes (leave 30s buffer before 5-min expiration)
    _token_cache[cache_key] = (token, time.time() + 270)
    return token
Benefits:

- ✅ Reduces HTTP calls to Core by ~90%
- ✅ Faster service-to-service communication
- ✅ Lower Core service load
- ✅ Same security (tokens still expire after 5 minutes)

Implementation Status:

- ✅ Core service (already implemented)
- ✅ Codex service
- ✅ Ledger service
- ✅ KnowledgeTree service
- ✅ Brainhair service
18. Production Features¶
HiveMatrix services implement comprehensive production-ready features for reliability, observability, and security.
Redis Session Persistence (Core)¶
Problem: In-memory sessions are lost on service restart, forcing users to re-login.
Solution: Sessions stored in Redis with automatic TTL expiration.
Implementation (Core Service):
# In hivematrix-core/app/session_manager.py
import redis
from flask import current_app
class SessionManager:
def __init__(self):
redis_url = current_app.config.get('REDIS_URL')
if redis_url:
self.redis_client = redis.from_url(redis_url)
self.storage_type = 'redis'
else:
self.sessions = {} # Fallback to in-memory
self.storage_type = 'memory'
def create_session(self, user_data, ttl=3600):
"""Create session with 1-hour TTL."""
session_id = str(uuid.uuid4())
if self.storage_type == 'redis':
self.redis_client.setex(
f"session:{session_id}",
ttl,
json.dumps(user_data)
)
else:
self.sessions[session_id] = {
'data': user_data,
'expires_at': time.time() + ttl
}
return session_id
def get_session(self, session_id):
"""Retrieve session data."""
if self.storage_type == 'redis':
data = self.redis_client.get(f"session:{session_id}")
return json.loads(data) if data else None
else:
session = self.sessions.get(session_id)
if session and time.time() < session['expires_at']:
return session['data']
return None
Configuration:
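The configuration amounts to a single connection URL. The variable name comes from the code above; the value shown is an assumption for a local Redis instance:

```shell
# In the Core service's environment or .flaskenv
REDIS_URL=redis://localhost:6379/0   # unset to fall back to in-memory sessions
```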
Benefits:

- ✅ Sessions survive Core service restarts
- ✅ Automatic expiration (no manual cleanup needed)
- ✅ Shared sessions across multiple Core instances (future scalability)
- ✅ Falls back to in-memory if Redis unavailable
Per-User Rate Limiting¶
Problem: IP-based rate limiting fails when multiple users share IP (NAT, corporate proxy).
Solution: Rate limiting based on JWT sub (subject) claim with IP fallback.
Implementation (All Services):
# In app/rate_limit_key.py
from flask import request, g

def get_user_id_or_ip():
    """
    Extract user ID from JWT for per-user rate limiting.
    Falls back to IP if no JWT.
    """
    if hasattr(g, 'user') and g.user:
        # Use JWT subject claim (username)
        return f"user:{g.user.get('sub', 'unknown')}"
    else:
        # Fallback to IP address
        return f"ip:{request.remote_addr}"
Usage in Routes:
from flask_limiter import Limiter
from app.rate_limit_key import get_user_id_or_ip

limiter = Limiter(
    app=app,
    key_func=get_user_id_or_ip,  # Per-user limits
    default_limits=["10000 per hour", "500 per minute"]
)

@app.route('/api/data')
@limiter.limit("100 per minute")
@token_required
def get_data():
    # Rate limited per user, not per IP
    return jsonify({'data': ...})
Benefits:

- ✅ Fair rate limiting for users behind shared IPs
- ✅ Prevents one user from exhausting an IP's quota
- ✅ Better abuse prevention (tracks individual users)
- ✅ Graceful fallback to IP if no JWT
Structured JSON Logging¶
Problem: Plain text logs are hard to parse, search, and analyze programmatically.
Solution: JSON-formatted logs with correlation IDs for distributed tracing.
Implementation (All Services):
# In app/structured_logger.py
import json
import logging
import os
import uuid

from flask import request, g

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'service': os.getenv('SERVICE_NAME', 'unknown'),
            'correlation_id': getattr(g, 'correlation_id', 'none'),
            'message': record.getMessage(),
        }
        # Add extra fields if present
        if hasattr(record, 'user'):
            log_data['user'] = record.user
        if hasattr(record, 'extra'):
            log_data['extra'] = record.extra
        return json.dumps(log_data)

def init_logging(app):
    """Initialize structured JSON logging."""
    if app.config.get('ENABLE_JSON_LOGGING', False):
        handler = logging.StreamHandler()
        handler.setFormatter(JSONFormatter())
        app.logger.addHandler(handler)

    @app.before_request
    def assign_correlation_id():
        """Assign correlation ID to each request for tracing."""
        g.correlation_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4()))
Example Log Entry:
{
  "timestamp": "2025-11-22T10:30:00Z",
  "level": "INFO",
  "service": "ledger",
  "correlation_id": "req-abc-123",
  "user": "billing@example.com",
  "message": "Bill archived successfully",
  "extra": {
    "account_number": "620547",
    "invoice_number": "620547-202410",
    "total_amount": 4275.00
  }
}
Benefits:

- ✅ Easily parsed by log aggregation tools (Elasticsearch, Datadog, etc.)
- ✅ Correlation IDs for tracing requests across services
- ✅ Structured extra data for better debugging
- ✅ Machine-readable for automated monitoring
RFC 7807 Problem Details¶
Problem: Inconsistent error response formats make client error handling difficult.
Solution: Standardized RFC 7807 Problem Details format for all errors.
Implementation (All Services):
# In app/error_responses.py
from flask import jsonify

def problem_detail(title, status, detail=None, type_uri=None, **kwargs):
    """
    Generate RFC 7807 Problem Details response.

    Args:
        title: Short human-readable summary
        status: HTTP status code
        detail: Human-readable explanation
        type_uri: URI identifying the problem type
        **kwargs: Additional fields
    """
    problem = {
        'type': type_uri or f'https://httpstatuses.com/{status}',
        'title': title,
        'status': status
    }
    if detail:
        problem['detail'] = detail
    problem.update(kwargs)
    return jsonify(problem), status

# Error handlers
@app.errorhandler(404)
def not_found(error):
    return problem_detail(
        title='Not Found',
        status=404,
        detail='The requested resource does not exist.'
    )

@app.errorhandler(429)
def rate_limit_exceeded(error):
    return problem_detail(
        title='Too Many Requests',
        status=429,
        detail='Rate limit exceeded. Try again later.'
    )

@app.errorhandler(500)
def internal_error(error):
    return problem_detail(
        title='Internal Server Error',
        status=500,
        detail='An unexpected error occurred.'
    )
Example Error Response:
{
  "type": "https://httpstatuses.com/404",
  "title": "Not Found",
  "status": 404,
  "detail": "Company with account number '999999' does not exist.",
  "account_number": "999999"
}
Benefits:

- ✅ Standardized error format across all services
- ✅ Machine-readable for automated error handling
- ✅ Human-readable error messages
- ✅ Extensible with custom fields
Health Checks¶
Problem: No programmatic way to check if a service is healthy.
Solution: /health endpoint with component-level health checks.
Implementation (All Services):
# In health_check.py
import os
import time
from datetime import datetime

import psutil
import requests
from flask import jsonify
from sqlalchemy import text

from extensions import db

def check_database():
    """Check if database is accessible."""
    try:
        db.session.execute(text('SELECT 1'))
        return {'status': 'healthy', 'message': 'Connected'}
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}

def check_disk_space():
    """Check if sufficient disk space is available."""
    usage = psutil.disk_usage('/')
    free_gb = usage.free / (1024 ** 3)
    usage_percent = usage.percent
    status = 'healthy' if free_gb > 10 else 'degraded'
    return {
        'status': status,
        'free_gb': round(free_gb, 2),
        'usage_percent': round(usage_percent, 2)
    }

def check_dependency(service_name, url):
    """Check if a dependency service is healthy."""
    try:
        start = time.time()
        response = requests.get(f"{url}/health", timeout=5)
        duration = (time.time() - start) * 1000
        if response.status_code == 200:
            return {
                'status': 'healthy',
                'response_time_ms': round(duration, 2)
            }
        else:
            return {
                'status': 'degraded',
                'http_status': response.status_code
            }
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}

@app.route('/health')
def health_check():
    """Comprehensive health check endpoint."""
    checks = {
        'database': check_database(),
        'disk': check_disk_space(),
        'dependencies': {
            'core': check_dependency('core', CORE_SERVICE_URL),
            'codex': check_dependency('codex', CODEX_SERVICE_URL)
        }
    }

    # Determine overall status
    statuses = [checks['database']['status'], checks['disk']['status']]
    statuses.extend([c['status'] for c in checks['dependencies'].values()])

    if any(s == 'unhealthy' for s in statuses):
        overall_status = 'unhealthy'
        http_status = 503
    elif any(s == 'degraded' for s in statuses):
        overall_status = 'degraded'
        http_status = 200
    else:
        overall_status = 'healthy'
        http_status = 200

    return jsonify({
        'status': overall_status,
        'timestamp': datetime.utcnow().isoformat() + 'Z',
        'service': os.getenv('SERVICE_NAME'),
        'checks': checks
    }), http_status
Example Health Check Response:
{
  "status": "healthy",
  "timestamp": "2025-11-22T10:30:00Z",
  "service": "ledger",
  "checks": {
    "database": {
      "status": "healthy",
      "message": "Connected"
    },
    "disk": {
      "status": "healthy",
      "free_gb": 120.5,
      "usage_percent": 35.2
    },
    "dependencies": {
      "core": {
        "status": "healthy",
        "response_time_ms": 15.3
      },
      "codex": {
        "status": "healthy",
        "response_time_ms": 42.1
      }
    }
  }
}
Benefits:

- ✅ Automated monitoring and alerting
- ✅ Load balancer health checks
- ✅ Container orchestration (Kubernetes) ready
- ✅ Component-level diagnostics
OpenAPI/Swagger Documentation¶
Problem: No automatic API documentation for developers.
Solution: OpenAPI 3.0 documentation at /docs endpoint.
Implementation (All Services):
# In app/__init__.py
from flask_swagger_ui import get_swaggerui_blueprint

# Swagger UI configuration
SWAGGER_URL = '/docs'
API_URL = '/static/openapi.json'

swaggerui_blueprint = get_swaggerui_blueprint(
    SWAGGER_URL,
    API_URL,
    config={'app_name': "Ledger API"}
)
app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL)
# static/openapi.json (spec shown here as YAML for readability)
openapi: 3.0.0
info:
  title: Ledger API
  version: 2.0.0
  description: Billing calculation and invoicing service
paths:
  /api/billing/{account_number}:
    get:
      summary: Get billing data for a company
      parameters:
        - name: account_number
          in: path
          required: true
          schema:
            type: string
        - name: year
          in: query
          schema:
            type: integer
        - name: month
          in: query
          schema:
            type: integer
      responses:
        '200':
          description: Billing data
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/BillingData'
Access:
- Local: http://localhost:5030/docs
- Via Nexus: https://server/ledger/docs
Benefits:

- ✅ Interactive API documentation
- ✅ Try API calls directly from the browser
- ✅ Auto-generated from the OpenAPI spec
- ✅ Standard format for API clients
19. Centralized Logging Architecture (Helm)¶
Helm provides centralized log aggregation from all HiveMatrix services to a PostgreSQL database for unified searching, filtering, and analysis.
Architecture¶
Centralized Logging Architecture

Services (Core, Codex, Ledger, etc.)
        ↓ HTTP POST
Helm Flask App (/api/logs)
        ↓ INSERT
PostgreSQL (helm_db.log_entries table)
        ↓ SELECT
Helm Web Interface (/logs)
        ↓ CLI
logs_cli.py (command-line viewer)
Database Schema¶
CREATE TABLE log_entries (
    id SERIAL PRIMARY KEY,
    service VARCHAR(50) NOT NULL,
    level VARCHAR(20) NOT NULL,
    message TEXT NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    correlation_id VARCHAR(100),
    user_email VARCHAR(255),
    extra_data JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_log_service ON log_entries(service);
CREATE INDEX idx_log_level ON log_entries(level);
CREATE INDEX idx_log_timestamp ON log_entries(timestamp);
CREATE INDEX idx_log_correlation ON log_entries(correlation_id);
Service Integration¶
Each service sends logs to Helm via HTTP:
# In app/helm_logger.py
import logging
from datetime import datetime

import requests
from flask import g

class HelmHandler(logging.Handler):
    """Send logs to Helm's centralized logging."""

    def __init__(self, service_name, helm_url):
        super().__init__()
        self.service_name = service_name
        self.helm_url = helm_url

    def emit(self, record):
        """Send log entry to Helm."""
        log_data = {
            'service': self.service_name,
            'level': record.levelname,
            'message': record.getMessage(),
            'timestamp': datetime.fromtimestamp(record.created).isoformat(),
            'correlation_id': getattr(g, 'correlation_id', None),
            'user_email': getattr(g, 'user', {}).get('email'),
            'extra_data': getattr(record, 'extra', {})
        }
        try:
            requests.post(
                f"{self.helm_url}/api/logs",
                json=log_data,
                timeout=2
            )
        except Exception:
            pass  # Don't fail if Helm is down
Log Viewing (CLI)¶
cd hivematrix-helm
source pyenv/bin/activate
# View logs from specific service
python logs_cli.py ledger --tail 50
# Filter by level
python logs_cli.py core --level ERROR --tail 100
# View all services
python logs_cli.py --tail 30
# Filter by correlation ID
python logs_cli.py --correlation req-abc-123
Log Viewing (Web Interface)¶
Helm Web Dashboard (http://localhost:5004/logs):
- Filter by service, level, date range
- Search log messages
- View correlation chains (trace requests across services)
- Export to CSV/JSON
Log Retention¶
Automatic Cleanup:
# In hivematrix-helm/cleanup_old_logs.py
from datetime import datetime, timedelta

from sqlalchemy import text

from extensions import db

def cleanup_old_logs(days=30):
    """Delete logs older than specified days."""
    cutoff_date = datetime.utcnow() - timedelta(days=days)
    db.session.execute(
        text("DELETE FROM log_entries WHERE timestamp < :cutoff"),
        {'cutoff': cutoff_date}
    )
    db.session.commit()
Cron Job:
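A minimal crontab sketch for the nightly cleanup; the install path and `pyenv` virtualenv location are assumptions, not a documented layout:

```
# Run log cleanup nightly at 02:00 (paths assumed)
0 2 * * * cd /opt/hivematrix-helm && ./pyenv/bin/python cleanup_old_logs.py
```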
Benefits¶
- ✅ Single place to view logs from all services
- ✅ Correlation ID tracking for distributed tracing
- ✅ Searchable and filterable
- ✅ Historical log retention
- ✅ No need for log file management
20. Billing Architecture (Ledger)¶
Ledger is HiveMatrix's billing calculation and invoicing engine, with integrated Archive functionality for billing snapshots.
Billing Calculation Flow¶
┌──────────────────────────────────────────────────────────┐
│ Billing Calculation Engine │
│ │
│ 1. Fetch Base Data from Codex │
│ ├─ Company profile (billing plan, contract) │
│ ├─ Active users/contacts │
│ ├─ Active assets (workstations, servers, etc.) │
│ ├─ Backup usage data │
│ └─ Support tickets with billable hours │
│ │
│ 2. Determine Billing Plan │
│ ├─ Company's default plan from Codex │
│ └─ Apply client-specific plan override if exists │
│ │
│ 3. Calculate Base Charges │
│ ├─ User charges: Count × per-user rate │
│ ├─ Asset charges: Count by type × per-asset rate │
│ ├─ Backup charges: Base fee + overage │
│ └─ Ticket charges: Total hours × hourly rate │
│ │
│ 4. Apply Overrides │
│ ├─ Client-specific custom rates │
│ ├─ Asset-specific billing type changes │
│ ├─ User-specific free/paid/custom status │
│ ├─ Manual assets/users │
│ └─ Custom line items (one-off, recurring, yearly) │
│ │
│ 5. Generate Output │
│ ├─ Itemized receipt with all charges │
│ ├─ Total amount due │
│ ├─ CSV invoice for download │
│ └─ Optional: Billing snapshot for archive │
└──────────────────────────────────────────────────────────┘
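Steps 3 and 5 above reduce to straightforward arithmetic once counts and rates are known. A minimal sketch, with function and field names that are hypothetical rather than Ledger's actual API:

```python
def calculate_base_charges(counts, rates):
    """Combine counts fetched from Codex with plan rates (step 3).
    All names here are illustrative, not Ledger's real schema."""
    charges = {
        'users': counts['users'] * rates['per_user'],
        'workstations': counts['workstations'] * rates['per_workstation'],
        'servers': counts['servers'] * rates['per_server'],
        'tickets': counts['billable_hours'] * rates['hourly'],
    }
    charges['total'] = sum(charges.values())
    return charges

charges = calculate_base_charges(
    {'users': 10, 'workstations': 12, 'servers': 2, 'billable_hours': 3.5},
    {'per_user': 25.0, 'per_workstation': 10.0, 'per_server': 50.0, 'hourly': 120.0},
)
print(charges['total'])  # 10*25 + 12*10 + 2*50 + 3.5*120 = 890.0
```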
Override System¶
Ledger provides a comprehensive override system for client-specific billing customization:
1. Client Billing Overrides (ClientBillingOverride model):
- Custom billing plan assignment
- Per-user cost override
- Per-workstation/server/VM/switch/firewall cost overrides
- Hourly rate override
- Backup pricing overrides
- Prepaid hours (monthly/yearly)
2. Asset Billing Overrides (AssetBillingOverride model):
- Change billing type for specific assets
- Example: Bill laptop as workstation instead of server
- Custom cost per asset
3. User Billing Overrides (UserBillingOverride model):
- Mark users as free, paid, or custom cost
- Example: Don't charge for CEO's account
4. Manual Assets (ManualAsset model):
- Add assets not in Codex/Datto
- Example: BYOD devices, third-party equipment
5. Manual Users (ManualUser model):
- Add users not in Codex/PSA
- Example: Contractors not in ticketing system
6. Custom Line Items (CustomLineItem model):
- Monthly recurring: Cloud hosting, licenses
- One-off: Network upgrades, project charges
- Yearly: Annual renewals billed specific month
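The override types above resolve in a fixed precedence: the most specific override wins. A sketch of how per-user rate resolution might look, with field names that are illustrative rather than Ledger's real models:

```python
def effective_per_user_rate(plan_rate, client_override=None, user_override=None):
    """Resolve the rate for one user: a user-level override wins over the
    client-level override, which wins over the plan default.
    (Field names are illustrative, not Ledger's actual schema.)"""
    if user_override is not None:
        if user_override.get('billing_status') == 'free':
            return 0.0  # e.g. don't charge for the CEO's account
        if user_override.get('custom_cost') is not None:
            return user_override['custom_cost']
    if client_override is not None and client_override.get('per_user_cost') is not None:
        return client_override['per_user_cost']
    return plan_rate

print(effective_per_user_rate(25.0))                                    # 25.0
print(effective_per_user_rate(25.0, {'per_user_cost': 18.5}))           # 18.5
print(effective_per_user_rate(25.0, None, {'billing_status': 'free'}))  # 0.0
```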
Archive Integration¶
Billing Snapshots are immutable records of finalized bills stored in Ledger's database (formerly Archive service):
BillingSnapshot Model:
class BillingSnapshot(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    company_account_number = db.Column(db.String(50))
    company_name = db.Column(db.String(255))
    invoice_number = db.Column(db.String(100), unique=True)  # ACCOUNT-YYYYMM
    billing_year = db.Column(db.Integer)
    billing_month = db.Column(db.Integer)
    total_amount = db.Column(db.Numeric(10, 2))
    billing_data_json = db.Column(db.Text)  # Complete calculation
    invoice_csv = db.Column(db.Text)        # CSV invoice content
    archived_at = db.Column(db.String(30))
    created_by = db.Column(db.String(255))
Snapshot Creation:
# In app/archive/snapshot.py
import json
from datetime import datetime

def create_snapshot(account_number, year, month, created_by='admin'):
    """Create immutable billing snapshot."""
    # Calculate billing
    billing_data = get_billing_data_for_client(account_number, year, month)

    # Generate invoice
    invoice_csv = generate_invoice_csv(account_number, billing_data)

    # Create snapshot
    snapshot = BillingSnapshot(
        company_account_number=account_number,
        invoice_number=f"{account_number}-{year}{month:02d}",
        billing_year=year,
        billing_month=month,
        total_amount=billing_data['receipt_data']['total'],
        billing_data_json=json.dumps(billing_data),
        invoice_csv=invoice_csv,
        archived_at=datetime.utcnow().isoformat() + 'Z',
        created_by=created_by
    )
    db.session.add(snapshot)
    db.session.commit()
    return snapshot.invoice_number
Benefits of Merge (Archive → Ledger):
- ✅ Eliminates HTTP overhead between services
- ✅ Atomic transactions (billing + archive in one operation)
- ✅ Simpler deployment (one less service)
- ✅ Better data integrity
- ✅ Faster snapshot creation
Bulk Operations¶
Generate All Invoices:
# GET /invoices/bulk/download?year=2024&month=10
@app.route('/invoices/bulk/download')
@billing_required
def bulk_invoice_download():
"""Download ZIP of all company invoices."""
year = int(request.args.get('year', current_year))
month = int(request.args.get('month', current_month))
companies = get_all_companies()
# Create in-memory ZIP
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
for company in companies:
billing_data = get_billing_data_for_client(
company['account_number'], year, month
)
csv_content = generate_invoice_csv(company['account_number'], billing_data)
filename = f"{company['name']}-{company['account_number']}-{year}{month:02d}.csv"
zip_file.writestr(filename, csv_content)
return send_file(
zip_buffer,
mimetype='application/zip',
as_attachment=True,
download_name=f'invoices-{year}-{month:02d}.zip'
)
Bulk Snapshot Creation:
# Archive all bills for October 2024
POST /archive/api/snapshot/bulk
{
  "year": 2024,
  "month": 10,
  "account_numbers": null
}
# "account_numbers": null archives every company; pass a list to restrict it
Performance Optimization¶
Bulk Codex API:
Instead of N API calls for N companies:
# BAD: N+1 queries
companies = get_all_companies()
for company in companies:
assets = get_company_assets(company['account_number']) # N calls
users = get_company_users(company['account_number']) # N calls
Use bulk endpoint (1 call):
# GOOD: 1 bulk API call
all_data = call_service('codex', '/api/companies/bulk?include_tickets=true&year=2024')
# Returns companies with assets, users, tickets pre-populated
Performance Gain: Dashboard load time: 30s → 2s
See Ledger Service Documentation for complete billing engine details, override system, and API reference.
21. Graph Database Pattern (KnowledgeTree)¶
KnowledgeTree implements a Neo4j graph database for hierarchical knowledge management with full-text search.
Why Neo4j for Knowledge Management¶
Traditional relational databases struggle with hierarchical data:
- ❌ Self-referential foreign keys are complex
- ❌ Recursive queries (WITH RECURSIVE) are slow
- ❌ Path queries require multiple JOINs
- ❌ Tree structure changes are expensive

Neo4j excels at hierarchical data:
- ✅ Natural graph representation of folders/items
- ✅ Fast path traversal (ancestors, descendants)
- ✅ Relationship-based queries (parent-child)
- ✅ Full-text search with score ranking
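For contrast, fetching every descendant of a folder in a relational schema typically requires a recursive CTE. The sketch below assumes a self-referential `parent_id` column, which is not KnowledgeTree's actual schema:

```sql
WITH RECURSIVE descendants AS (
    SELECT id, name FROM context_items WHERE id = :folder_id
    UNION ALL
    SELECT c.id, c.name
    FROM context_items c
    JOIN descendants d ON c.parent_id = d.id
)
SELECT id, name FROM descendants;
```

In Neo4j the same traversal is a single variable-length pattern, e.g. `MATCH (f {id: $folder_id})-[:PARENT_OF*]->(d) RETURN d`.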
Schema Design¶
// Node type: ContextItem
(:ContextItem {
id: String, // UUID
name: String, // Display name
content: String, // Markdown content
is_folder: Boolean, // true for folders, false for items
is_attached: Boolean, // Attached to tickets/companies
read_only: Boolean, // System items
created_at: String, // ISO timestamp
updated_at: String // ISO timestamp
})
// Relationship: PARENT_OF
(:ContextItem)-[:PARENT_OF]->(:ContextItem)
Example Hierarchy:
(root:ContextItem {name: "Root", is_folder: true})
-[:PARENT_OF]->(folder1:ContextItem {name: "PowerShell Scripts", is_folder: true})
-[:PARENT_OF]->(item1:ContextItem {name: "Restart Print Spooler", is_folder: false})
-[:PARENT_OF]->(item2:ContextItem {name: "Clear DNS Cache", is_folder: false})
-[:PARENT_OF]->(folder2:ContextItem {name: "Network Diagnostics", is_folder: true})
-[:PARENT_OF]->(item3:ContextItem {name: "Ping Test", is_folder: false})
Core Queries¶
Get All Root Items:
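A root item is one with no incoming `PARENT_OF` relationship. A plausible query (assumed, mirroring the root test used by the breadcrumb query in this section):

```cypher
MATCH (item:ContextItem)
WHERE NOT (item)<-[:PARENT_OF]-()
RETURN item
ORDER BY item.is_folder DESC, item.name ASC
```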
Get Children of Folder:
MATCH (parent:ContextItem {id: $parent_id})-[:PARENT_OF]->(child:ContextItem)
RETURN child
ORDER BY child.is_folder DESC, child.name ASC
Get Full Path (Breadcrumbs):
MATCH path = (root:ContextItem)-[:PARENT_OF*]->(item:ContextItem {id: $item_id})
WHERE NOT (root)<-[:PARENT_OF]-()
RETURN [node IN nodes(path) | {id: node.id, name: node.name}] AS path
Move Item to New Parent:
// Delete old parent relationship
MATCH (item:ContextItem {id: $item_id})<-[r:PARENT_OF]-()
DELETE r
// Create new parent relationship
MATCH (new_parent:ContextItem {id: $new_parent_id}),
(item:ContextItem {id: $item_id})
CREATE (new_parent)-[:PARENT_OF]->(item)
Full-Text Search:
// Create full-text index
CREATE FULLTEXT INDEX contextItemSearch FOR (n:ContextItem) ON EACH [n.name, n.content]
// Search with score ranking
CALL db.index.fulltext.queryNodes('contextItemSearch', $query)
YIELD node, score
WHERE node.is_folder = false
RETURN node, score
ORDER BY score DESC
LIMIT 20
Integration with Codex¶
KnowledgeTree syncs data from Codex for AI assistant context:
Sync Companies:
# sync_codex_data.py
from app import app, neo4j_driver
from app.codex_client import get_all_companies
def sync_companies():
"""Sync companies from Codex to Neo4j for context."""
companies = get_all_companies()
with neo4j_driver.session() as session:
for company in companies:
session.execute_write(_create_company_node, company)
def _create_company_node(tx, company):
query = """
MERGE (c:Company {id: $id})
SET c.name = $name,
c.account_number = $account_number,
c.updated_at = datetime()
"""
tx.run(query,
id=company['id'],
name=company['name'],
account_number=company['account_number'])
Query Context for Ticket:
// Get all context attached to a ticket
MATCH (ticket:Ticket {number: $ticket_number})-[:HAS_CONTEXT]->(item:ContextItem)
RETURN item
ORDER BY item.name
API Patterns¶
Flask Integration:
# In app/__init__.py
import atexit
import os

from neo4j import GraphDatabase

neo4j_uri = os.getenv('NEO4J_URI', 'bolt://localhost:7687')
neo4j_user = os.getenv('NEO4J_USER', 'neo4j')
neo4j_password = os.getenv('NEO4J_PASSWORD', 'password')

neo4j_driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))

# Close the shared driver once, at process exit. (A per-request
# @app.teardown_appcontext hook would close it after the first request.)
atexit.register(neo4j_driver.close)
Route Example:
@app.route('/api/browse/<path:folder_id>')
@token_required
def browse_folder(folder_id):
"""Get children of a folder."""
with neo4j_driver.session() as session:
result = session.run("""
MATCH (parent:ContextItem {id: $folder_id})-[:PARENT_OF]->(child:ContextItem)
RETURN child
ORDER BY child.is_folder DESC, child.name ASC
""", folder_id=folder_id)
items = [dict(record['child']) for record in result]
return jsonify({'items': items})
Benefits of Graph Database¶
For Knowledge Management:
- ✅ Natural folder/item hierarchy
- ✅ Fast ancestor/descendant queries
- ✅ Easy tree restructuring
- ✅ Relationship-based permissions (future)

For AI Assistant Context:
- ✅ Quick context gathering for tickets
- ✅ Related item discovery
- ✅ Company-specific knowledge lookup
- ✅ Full-text search with relevance scoring

Performance:
- ✅ Traversal queries O(n) instead of O(n²)
- ✅ No complex JOIN operations
- ✅ Efficient path finding
- ✅ Scales well with deep hierarchies
See KnowledgeTree Service Documentation for complete Neo4j implementation, API reference, and Cypher query examples.
22. Version History¶
- 4.2 - Comprehensive Service Documentation Integration (November 2025): Major ARCHITECTURE.md update incorporating findings from the completed service documentation overhaul. Section 16 (Brainhair) completely rewritten with current architecture: Claude Code CLI integration (official Anthropic CLI), polling architecture with server-side response buffers, tools system (claude_tools/), file-based IPC approval workflow, PHI/CJIS filtering via Microsoft Presidio with FirstNameLastInitialOperator, contract alignment workflow (5-step process), ClaudeSessionManager with PostgreSQL persistence and 30-minute idle cleanup. New Section 17: service-to-service communication enhancements, including service token caching (5-minute TTL, 4.5-minute cache) reducing Core load by ~90%. New Section 18: production features across all services - Redis session persistence (Core), per-user JWT-based rate limiting, structured JSON logging with correlation IDs, RFC 7807 Problem Details error responses, component-level /health endpoints, OpenAPI/Swagger documentation at /docs. New Section 19: centralized logging architecture (Helm) with PostgreSQL log aggregation, logs_cli.py viewer, correlation ID tracking, web dashboard, and 30-day retention. New Section 20: billing architecture (Ledger) with 5-step calculation flow, comprehensive override system (6 types), Archive integration (merged from separate service), bulk operations, and Codex bulk API optimization (30s → 2s dashboard load). New Section 21: graph database pattern (KnowledgeTree) with Neo4j schema, hierarchical queries, full-text search, and Codex sync integration. Added cross-references to detailed service documentation throughout. Previous Section 17 (Version History) renumbered to Section 22.
- 4.1 - Production Readiness & Service Consolidation (November 2025): Implemented comprehensive production-ready features across all services. Infrastructure improvements: automated daily database backups with 30-day retention via systemd, comprehensive component-level health checks (/health endpoints), Redis session persistence for session survival across restarts, structured JSON logging with correlation IDs for distributed tracing. API improvements: OpenAPI/Swagger documentation at /docs endpoints across all services, RFC 7807 Problem Details for standardized machine-readable error responses, per-user JWT-based rate limiting to prevent shared-IP abuse. Observability: enhanced log viewer with correlation ID filtering, 30-second request timeouts to prevent hanging requests, service health dashboard with real-time latency metrics. Service consolidation: merged Archive functionality into the Ledger service (accessible at /ledger/archive/) for simplified architecture and improved performance; cancelled the Resolve service. Optional pre-commit hooks for code quality (black, isort, flake8).
- 4.0 - Brainhair AI Assistant & Approval Flow: Added comprehensive documentation for the Brainhair AI assistant service including the AI tools pattern, approval flow for write operations, file-based IPC mechanism, browser integration, PHI/CJIS filtering with Microsoft Presidio, security considerations, and debugging guides. All write operations in AI tools (update_billing.py, set_company_plan.py, manage_network_equipment.py, update_features.py) now require explicit user approval before execution via approval_helper.py. Automatic PHI filtering protects sensitive information in tool responses using Presidio entity detection and anonymization.
- 3.9 - Dynamic Service Discovery & Keycloak Auto-Configuration: Added scan_all_services() to install_manager.py for automatic detection of all hivematrix-* services (not just the registry). Services are discovered on every start.sh run, allowing manual copies and git pulls to work seamlessly. Enhanced Keycloak setup with intelligent synchronization between the Keycloak database and master_config.json - tracks fresh Keycloak installations (KEYCLOAK_FRESH_INSTALL), clears old config when reinstalling, and ensures the realm/users are always configured. Two-way sync prevents configuration drift. Both improvements make the system more resilient and reduce manual configuration.
- 3.8 - Documented apps_registry.json as the authoritative source for service configuration, with install_manager.py update-config generating both master_services.json and services.json automatically. Added archive service example to registry documentation.
- 3.7 - Documented master_services.json and services.json dual configuration system for service discovery and management
- 3.6 - Updated URL prefix handling to use werkzeug's ProxyFix middleware instead of custom PrefixMiddleware. Added X-Forwarded-Prefix header from Nexus. Documented cookie-based authentication fallback for AJAX requests with credentials: 'same-origin'.
- 3.5 - Added Development & Debugging Tools section: logs_cli.py for centralized log viewing, create_test_token.py for JWT token generation, test_with_token.sh for quick endpoint testing, and comprehensive debugging workflows
- 3.4 - Added comprehensive security architecture, security audit tool (security_audit.py), firewall generation, service binding requirements, and automated security checks in start.sh
- 3.3 - Added centralized configuration management (config_manager.py), auto-installation architecture (install_manager.py), unified startup script (start.sh), and comprehensive deployment documentation
- 3.2 - Added revokable session management, logout flow, token validation, Keycloak proxy on port 443
- 3.1 - Added service-to-service communication, permission levels, database best practices, external integrations
- 3.0 - Initial version with core architecture patterns