
HiveMatrix Architecture - Service-Specific Implementation

Version 4.2 Part 3 of 3: Service-Specific Deep Dives

This document covers service-specific implementations, production features, and advanced patterns for Brainhair, Ledger, KnowledgeTree, Helm, and other specialized services. Read Architecture - Core and Architecture - Development first.

Other Parts:

- Architecture - Core - Core concepts, authentication, service communication
- Architecture - Development - Development practices, security, tools, and patterns


16. Brainhair AI Assistant & Approval Flow

Brainhair is HiveMatrix's AI assistant service (port 5050). It enables natural language interaction with the entire platform through the official Claude Code CLI from Anthropic, letting Claude AI perform administrative tasks, answer questions, and manage system operations across all HiveMatrix services.

Design Goal: Brainhair can manage everything in the HiveMatrix platform. Any administrative task that can be performed through the web interface is also accessible via natural language commands through Brainhair. This includes creating, reading, updating, and deleting data across all services (Codex, Ledger, KnowledgeTree, etc.).

For comprehensive Brainhair documentation, see Brainhair Service Documentation.

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                   Brainhair Architecture                         │
│                                                                   │
│  User (Browser) ←──→ Flask Routes (port 5050)                   │
│                           │                                       │
│                           ├─ POST /api/chat                      │
│                           │  └─ Create response buffer           │
│                           │  └─ Spawn Claude Code subprocess     │
│                           │  └─ Return response_id               │
│                           │                                       │
│                           ├─ GET /api/chat/poll/<id>            │
│                           │  └─ Check for approval requests      │
│                           │  └─ Return new chunks + done status │
│                           │                                       │
│                           └─ POST /api/chat/approval-response   │
│                              └─ Write response file              │
│                                                                   │
│  ClaudeSessionManager                                            │
│      └─ Background cleanup (30-min idle timeout)                │
│      └─ PostgreSQL persistence (sessions + messages)           │
│      └─ Spawns: npx @anthropic-ai/claude-code                  │
│                                                                   │
│  Claude Code (Subprocess)                                        │
│      └─ Streams JSON output                                      │
│      └─ Calls tools from claude_tools/                          │
│      └─ Receives approval responses via file-based IPC          │
│                                                                   │
│  Tools System (claude_tools/*.py)                               │
│      └─ codex_tools.py, ledger_tools.py, etc.                  │
│      └─ Uses service-to-service auth                            │
│      └─ Requests approval for write operations                  │
│      └─ Applies PHI/CJIS filtering via Presidio                │
│                                                                   │
│  Approval Workflow (File-based IPC)                             │
│      1. Tool writes /tmp/brainhair_approval_request_*.json     │
│      2. Poll endpoint detects file, injects into stream        │
│      3. Browser displays approval modal                         │
│      4. User approves/denies                                    │
│      5. Browser writes /tmp/brainhair_approval_response_*.json │
│      6. Tool reads response, continues or exits                 │
└─────────────────────────────────────────────────────────────────┘

Claude Code CLI Integration

Brainhair uses the official Claude Code CLI from Anthropic (not custom ai_tools):

Installation:

# NPX (automatic)
npx -y @anthropic-ai/claude-code --version

# Or global install
npm install -g @anthropic-ai/claude-code

Subprocess Invocation:

# In claude_session_manager.py
command = [
    'npx', '-y', '@anthropic-ai/claude-code',
    '--model', 'claude-sonnet-4-5',
    '--allowed-tools', 'Bash,Read,Grep,Glob',
    '--permission-mode', 'dontAsk',
    '--output-format', 'stream-json',
    '--print',
    user_message
]

process = subprocess.Popen(
    command,
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    cwd=self.working_directory,
    env=env
)

Why Claude Code CLI:

- ✅ Official Anthropic tool (maintained and updated)
- ✅ Streaming JSON output for real-time UI updates
- ✅ Built-in tool system (Bash, Read, Grep, Glob)
- ✅ Session management and conversation history
- ✅ No need to maintain custom AI integration code

Polling Architecture

Brainhair uses server-side response buffering with client polling instead of Server-Sent Events (SSE) for better proxy compatibility:

Why Not SSE:

- ❌ Proxies (Nexus, nginx, corporate) often buffer SSE responses
- ❌ Timeout issues with long-running AI responses
- ❌ Connection keep-alive challenges

Polling Flow:

1. User sends message → POST /api/chat
   → Creates response_id
   → Spawns Claude Code subprocess
   → Returns {"response_id": "abc123", "session_id": "session-uuid"}

2. Browser starts polling → GET /api/chat/poll/abc123?offset=0
   → Returns {"chunks": [...], "offset": 10, "done": false}

3. Poll loop continues every 500ms
   → GET /api/chat/poll/abc123?offset=10
   → Returns new chunks since last offset

4. Approval request detected (file-based IPC)
   → Poll returns {"chunks": [{"type": "approval_request", ...}]}
   → Browser shows approval modal
   → User approves/denies
   → POST /api/chat/approval-response

5. Claude Code finishes
   → Poll returns {"chunks": [...], "done": true, "session_id": "..."}
   → Browser stops polling
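The client side of steps 2-5 can be sketched in Python. The real client is browser JavaScript; `BASE_URL`, the helper names, and the use of `urllib` here are illustrative assumptions, not Brainhair's actual code:

```python
import json
import time
import urllib.request

BASE_URL = "http://localhost:5050"  # Brainhair port from this document

def split_poll_payload(payload: dict) -> tuple:
    """Separate plain chunks from approval requests in one poll response."""
    text_chunks, approvals = [], []
    for chunk in payload.get("chunks", []):
        if isinstance(chunk, dict) and chunk.get("type") == "approval_request":
            approvals.append(chunk)
        else:
            text_chunks.append(chunk)
    return text_chunks, approvals

def poll_until_done(response_id: str, interval: float = 0.5) -> None:
    """Fetch new chunks from the last offset every `interval` seconds until done."""
    offset = 0
    while True:
        url = f"{BASE_URL}/api/chat/poll/{response_id}?offset={offset}"
        with urllib.request.urlopen(url) as resp:
            payload = json.load(resp)
        texts, approvals = split_poll_payload(payload)
        for chunk in texts:
            print(chunk)
        # An approval chunk would be surfaced as a modal; the user's answer
        # goes back via POST /api/chat/approval-response
        offset = payload.get("offset", offset)
        if payload.get("done"):
            break
        time.sleep(interval)
```

The 500ms interval matches the poll loop described above; offset-based fetching means each poll returns only chunks the client has not yet seen.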

Response Buffer Structure:

response_buffers[response_id] = {
    'chunks': [],           # All output chunks
    'done': False,          # Completion flag
    'error': None,          # Error message if failed
    'created_at': time.time(),  # For cleanup
    'session_id': None,     # In-memory session ID
    'db_session_id': None,  # Database session ID
    'process': subprocess   # Claude Code process
}

Buffer Cleanup:

- Buffers are removed after 5 minutes of inactivity
- Prevents memory leaks from abandoned polls
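The cleanup rule above can be sketched as a small function over the buffer dict; the function name and `now` parameter are illustrative, not Brainhair's actual implementation:

```python
import time

BUFFER_TTL = 300  # five minutes, matching the cleanup rule above

def cleanup_response_buffers(response_buffers: dict, now: float = None) -> list:
    """Drop buffers older than BUFFER_TTL; returns the removed response_ids."""
    now = time.time() if now is None else now
    stale = [rid for rid, buf in response_buffers.items()
             if now - buf['created_at'] > BUFFER_TTL]
    for rid in stale:
        del response_buffers[rid]  # abandoned poll: reclaim the memory
    return stale
```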

Tools System

Brainhair tools are Python modules in the claude_tools/ directory (not ai_tools/):

Available Tools:

- codex_tools.py - Query companies, assets, users, and tickets from Codex
- ledger_tools.py - Billing calculations and contract alignment
- contract_tools.py - Contract analysis and billing alignment
- knowledge_tools.py - Search and manage KnowledgeTree
- system_tools.py - Service health checks and system status

Tool Structure:

# claude_tools/codex_tools.py
import os
from brainhair_auth import BrainhairAuth

def list_companies(limit=50):
    """List all companies from Codex."""
    auth = BrainhairAuth()
    response = auth.get('/api/codex/companies', params={'limit': limit})
    return response.json()

def get_company(account_number):
    """Get company details by account number."""
    auth = BrainhairAuth()
    response = auth.get(f'/api/codex/companies/{account_number}')
    return response.json()

BrainhairAuth Helper:

# claude_tools/brainhair_auth.py
import os
import requests

class BrainhairAuth:
    """Helper for service-to-service authentication."""

    def __init__(self):
        self.session_id = os.environ.get('BRAINHAIR_SESSION_ID')
        self.user = os.environ.get('HIVEMATRIX_USER')
        self.base_url = os.environ.get('BRAINHAIR_URL', 'http://localhost:5050')

    def get_service_token(self):
        """Request service token from Brainhair Flask app."""
        response = requests.post(
            f"{self.base_url}/api/internal/service-token",
            json={'session_id': self.session_id}
        )
        return response.json()['token']

    def get(self, path, **kwargs):
        """Make authenticated GET request."""
        token = self.get_service_token()
        return requests.get(
            f"{self.base_url}{path}",
            headers={'Authorization': f'Bearer {token}'},
            **kwargs
        )

Approval Workflow (File-Based IPC)

Critical Rule: All write operations (create, update, delete) must request user approval before executing.

Flow Diagram:

┌──────────────────────────────────────────────────────────────┐
│               Approval Workflow (File-Based IPC)              │
│                                                                │
│  1. Tool executes and needs approval                          │
│     └─ request_approval("Update billing", {...})             │
│                                                                │
│  2. Tool writes approval request file                         │
│     └─ /tmp/brainhair_approval_request_SESSION_TIMESTAMP.json│
│        {                                                       │
│          "type": "approval_request",                          │
│          "approval_id": "SESSION_TIMESTAMP",                  │
│          "session_id": "db-session-uuid",                     │
│          "action": "Update billing rates for Company X",      │
│          "details": {                                          │
│            "Company": "Acme Corp",                             │
│            "Per User Cost": "$15.00 → $18.00",                │
│            "Monthly Impact": "+$75.00"                         │
│          }                                                     │
│        }                                                       │
│                                                                │
│  3. Poll endpoint detects approval file                       │
│     └─ Scans /tmp/brainhair_approval_request_SESSION_*.json  │
│     └─ Injects into response stream as chunk                  │
│                                                                │
│  4. Browser receives approval chunk                           │
│     └─ Displays modal with action and details                 │
│     └─ User clicks "Approve" or "Deny"                        │
│                                                                │
│  5. Browser writes response file                              │
│     └─ POST /api/chat/approval-response                       │
│     └─ /tmp/brainhair_approval_response_APPROVAL_ID.json     │
│        {                                                       │
│          "approved": true,                                     │
│          "timestamp": "2025-11-22T10:30:00Z"                  │
│        }                                                       │
│                                                                │
│  6. Tool polls for response file (timeout: 5 minutes)        │
│     └─ Reads response                                          │
│     └─ If approved: continue execution                        │
│     └─ If denied: exit with error                             │
│     └─ If timeout: exit with timeout error                    │
│                                                                │
│  7. Cleanup                                                    │
│     └─ Both files deleted after response read                 │
└──────────────────────────────────────────────────────────────┘

Approval Helper (claude_tools/approval_helper.py):

import os
import json
import time

def request_approval(action: str, details: dict, timeout: int = 300) -> bool:
    """
    Request user approval for a write operation.

    Args:
        action: Description of the action (e.g., "Update billing rates")
        details: Dict of details to show user
        timeout: Seconds to wait for response (default: 5 minutes)

    Returns:
        True if approved, False if denied or timeout
    """
    session_id = os.environ.get('BRAINHAIR_SESSION_ID')
    approval_id = f"{session_id}_{int(time.time() * 1000)}"

    # Write request file
    request_file = f"/tmp/brainhair_approval_request_{approval_id}.json"
    with open(request_file, 'w') as f:
        json.dump({
            'type': 'approval_request',
            'approval_id': approval_id,
            'session_id': session_id,
            'action': action,
            'details': details
        }, f)

    # Poll for response
    response_file = f"/tmp/brainhair_approval_response_{approval_id}.json"
    start_time = time.time()

    while time.time() - start_time < timeout:
        if os.path.exists(response_file):
            with open(response_file, 'r') as f:
                response = json.load(f)
            os.remove(request_file)
            os.remove(response_file)
            return response.get('approved', False)
        time.sleep(0.5)

    # Timeout cleanup
    if os.path.exists(request_file):
        os.remove(request_file)
    return False

Tools Requiring Approval:

- update_billing.py - Change billing rates, add line items
- set_company_plan.py - Assign/change billing plans
- manage_network_equipment.py - Add/remove assets
- update_features.py - Modify feature overrides
- execute_powershell.py - Run remote commands (Datto RMM)

Read-Only Tools (No Approval):

- list_companies.py - View company data
- view_billing.py - Display billing information
- search_knowledge.py - Search knowledge base
- get_company_plan.py - Show plan details
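A write tool from the first list would gate its action on the approval helper. This sketch takes the approval function as a parameter for clarity; the tool name, parameters, and return shape are illustrative, not Brainhair's actual API:

```python
def update_billing_rate(account_number: str, new_rate: float, approve_fn) -> dict:
    """Sketch of an approval-gated write tool.

    approve_fn has the shape of request_approval(action, details) -> bool.
    """
    approved = approve_fn(
        f"Update billing rate for account {account_number}",
        {"Account": account_number, "New Per User Cost": f"${new_rate:.2f}"},
    )
    if not approved:
        # Denied or timed out: exit without writing anything
        return {"status": "denied"}
    # ...perform the write via service-to-service auth here...
    return {"status": "approved"}
```

In Brainhair itself the tool would call `request_approval` directly, blocking on the file-based IPC exchange described above.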

PHI/CJIS Filtering with Microsoft Presidio

Critical Security Feature: Brainhair implements automatic PHI (Protected Health Information) and CJIS (Criminal Justice Information Systems) data filtering to prevent sensitive information from being exposed to Claude AI or logged inappropriately.

Implementation: Uses Microsoft Presidio for automated detection and anonymization of sensitive data in tool responses. All data flowing through Brainhair is filtered before being sent to Claude.

Filtered Entity Types:

PHI Entities (Default):

- PERSON (names) - Anonymized to "FirstName L." format via the custom FirstNameLastInitialOperator
- EMAIL_ADDRESS - Replaced with <EMAIL_ADDRESS>
- PHONE_NUMBER - Replaced with <PHONE_NUMBER>
- US_SSN - Replaced with <US_SSN>
- DATE_TIME - Replaced with <DATE_TIME>
- LOCATION - Replaced with <LOCATION>
- MEDICAL_LICENSE - Replaced with <MEDICAL_LICENSE>
- US_DRIVER_LICENSE - Replaced with <US_DRIVER_LICENSE>
- US_PASSPORT - Replaced with <US_PASSPORT>
- CREDIT_CARD - Replaced with <CREDIT_CARD>
- IP_ADDRESS - Replaced with <IP_ADDRESS>

CJIS Entities (Law Enforcement): - Subset of PHI: PERSON, US_SSN, US_DRIVER_LICENSE, DATE_TIME, LOCATION, IP_ADDRESS
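The two entity sets above can be expressed as constants that parameterize the analyzer call; the constant and function names here are illustrative, not Brainhair's actual module layout:

```python
# Entity sets copied from the lists above
PHI_ENTITIES = [
    "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "US_SSN", "DATE_TIME",
    "LOCATION", "MEDICAL_LICENSE", "US_DRIVER_LICENSE", "US_PASSPORT",
    "CREDIT_CARD", "IP_ADDRESS",
]
CJIS_ENTITIES = [
    "PERSON", "US_SSN", "US_DRIVER_LICENSE", "DATE_TIME", "LOCATION", "IP_ADDRESS",
]

def entities_for(mode: str) -> list:
    """Pick the entity set for a filtering mode: 'cjis' subset, else full PHI."""
    return CJIS_ENTITIES if mode == "cjis" else PHI_ENTITIES
```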

Example Filtering:

// Before PHI filtering
{
  "requester": "John Smith",
  "email": "john.smith@example.com",
  "phone": "555-123-4567",
  "ssn": "123-45-6789",
  "ip_address": "192.168.1.100"
}

// After PHI filtering
{
  "requester": "John S.",
  "email": "<EMAIL_ADDRESS>",
  "phone": "<PHONE_NUMBER>",
  "ssn": "<US_SSN>",
  "ip_address": "<IP_ADDRESS>"
}

Custom FirstNameLastInitialOperator:

- Preserves the first name for context (e.g., "Which John?")
- Protects the last name (e.g., "Smith" → "S.")
- Better for AI understanding while maintaining privacy
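The transformation the operator applies can be sketched in plain Python (the real implementation plugs into Presidio's operator machinery; this standalone function only mirrors its behavior):

```python
def first_name_last_initial(full_name: str) -> str:
    """Mirror the operator's behavior: 'John Smith' -> 'John S.'."""
    parts = full_name.split()
    if len(parts) < 2:
        return full_name  # single token: nothing to abbreviate
    # Keep the first name, reduce the last name to its initial
    return f"{parts[0]} {parts[-1][0]}."
```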

Filtering in Tools:

All service API calls from tools pass through PHI filtering:

# In codex_tools.py
from app.presidio_filter import get_presidio_filter

def get_company_data(account_number):
    response = call_service('codex', f'/api/companies/{account_number}')
    data = response.json()

    # Apply PHI filtering
    presidio = get_presidio_filter()
    filtered_data = presidio.filter_phi(data)

    return filtered_data

When to Use:

- PHI (default): Healthcare MSPs, general client data
- CJIS: Law enforcement MSPs, criminal justice data
- None: Internal operations with proper authorization only

See Brainhair PHI/CJIS Filtering Documentation for complete implementation details.

Contract Alignment

Contract Alignment allows Claude to analyze contract documents, extract billing terms, and automatically align Ledger billing settings to match the contract.

7-Step Workflow:

1. User provides contract text to Claude
   "Here's the contract for Example Company..."

2. Claude extracts billing terms using NLP
   → per_user_rate: $15.00
   → hourly_rate: $150.00
   → prepaid_hours_monthly: 4.0

3. Claude calls contract_tools.get_current_settings()
   → Fetches current billing from Ledger

4. Claude calls contract_tools.compare_contract_to_settings()
   → Identifies discrepancies
   → Generates recommendations

5. Claude presents findings to user
   "Found 3 discrepancies:"
   " - Per user cost: $12 → should be $15"
   " - Prepaid hours: 0 → should be 4"

6. User approves alignment (approval workflow)
   → Claude calls contract_tools.align_settings()
   → Requires approval (write operation)

7. Claude verifies alignment
   → contract_tools.verify_alignment()
   → "✓ Alignment complete. All settings match contract."

Contract Terms Structure:

contract_terms = {
    "billing_method": "per_user",     # Or: "flat_fee", "per_device"
    "per_user_rate": 15.00,
    "per_workstation_rate": 75.00,
    "per_server_rate": 125.00,
    "hourly_rate": 150.00,
    "prepaid_hours_monthly": 4.0,
    "support_level": "All Inclusive",
    "included_users": 50,              # Optional
    "custom_items": [                  # Optional
        {"name": "Extra backup", "monthly_fee": 100.00}
    ]
}

API Functions (contract_tools.py):

- get_current_settings(account_number) - Get all current billing settings
- compare_contract_to_settings(account_number, contract_terms) - Identify discrepancies
- align_settings(account_number, adjustments, dry_run=False) - Apply adjustments (requires approval)
- verify_alignment(account_number, contract_terms) - Verify settings match the contract
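The discrepancy check at the heart of compare_contract_to_settings() amounts to a field-by-field diff against the contract_terms structure above. This is a simplified sketch, not the actual contract_tools implementation:

```python
def compare_terms(contract_terms: dict, current_settings: dict) -> list:
    """Return (field, current_value, contract_value) tuples for each mismatch."""
    discrepancies = []
    for field, expected in contract_terms.items():
        current = current_settings.get(field)
        if current != expected:
            discrepancies.append((field, current, expected))
    return discrepancies
```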

See Brainhair Contract Alignment Documentation for complete API reference and examples.

Session Management

Brainhair implements database-backed session management with automatic cleanup and conversation persistence.

Session Lifecycle:

1. Session Creation
   ├─ POST /api/chat (first message)
   ├─ Generate UUID for in-memory session
   ├─ Create or resume database session (ChatSession)
   └─ Fetch context (ticket, client details)

2. Session Active
   ├─ Messages exchanged (user ←→ Claude)
   ├─ Claude Code subprocess spawned per message
   ├─ Conversation history maintained in memory
   └─ Last activity timestamp updated

3. Session Idle (30 minutes)
   ├─ No activity for 30 minutes
   ├─ Background cleanup thread detects idle
   ├─ Session removed from in-memory cache
   └─ Database session remains for history

4. Session Resumption
   ├─ POST /api/chat with db_session_id
   ├─ Loads messages from database (ChatMessage)
   ├─ Reconstructs conversation history
   └─ Continues conversation

ClaudeSessionManager:

from app.claude_session_manager import get_session_manager

session_manager = get_session_manager()  # Singleton

# Create new session
session_id = session_manager.create_session(
    user="john.technician",
    context={"ticket": "12345", "client": "ACME Corp"}
)

# Resume existing session
session_id = session_manager.create_session(
    user="john.technician",
    context={...},
    db_session_id="existing-db-uuid"
)

# Send message
for chunk in session_manager.get_session(session_id).send_message_stream("Hello"):
    print(chunk)

# Cleanup idle sessions (runs automatically every 5 minutes)
session_manager.cleanup_idle_sessions(max_age_seconds=1800)  # 30 min

Database Schema:

# ChatSession model
class ChatSession(db.Model):
    id = db.Column(db.String(36), primary_key=True)  # UUID
    user_id = db.Column(db.String(100))               # Username
    user_name = db.Column(db.String(255))             # Display name
    ticket_number = db.Column(db.String(50))          # Optional ticket
    client_name = db.Column(db.String(255))           # Optional client
    created_at = db.Column(db.DateTime)
    updated_at = db.Column(db.DateTime)
    messages = db.relationship('ChatMessage', backref='session')

# ChatMessage model
class ChatMessage(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    session_id = db.Column(db.String(36), db.ForeignKey('chat_sessions.id'))
    role = db.Column(db.String(20))  # 'user' or 'assistant'
    content = db.Column(db.Text)
    created_at = db.Column(db.DateTime)

Background Cleanup Thread:

def _cleanup_loop(self):
    """Runs every 5 minutes to remove idle sessions."""
    cleanup_interval = 300  # 5 minutes

    while not self._cleanup_stop_event.is_set():
        if self._cleanup_stop_event.wait(timeout=cleanup_interval):
            break

        # Remove sessions idle for 30+ minutes
        self.cleanup_idle_sessions(max_age_seconds=1800)

Session Persistence:

- In-Memory: Active sessions for fast access
- Database: All sessions and messages for history and resumption
- Cleanup: Automatic removal of idle in-memory sessions
- Resumption: Load from database on demand

See Brainhair Session Management Documentation for complete implementation details.

Security & Debugging

Security Considerations:

- Tools run with service-level authentication (bypassing user permission checks)
- The approval system ensures human oversight for all write operations
- Session ID tracking prevents cross-session approval hijacking
- Timeouts prevent tools from hanging indefinitely (default: 5 minutes)
- Approval files are cleaned up after use to prevent file system bloat
- PHI/CJIS filtering prevents sensitive data from reaching the AI or logs
- All commands are logged to Helm for an audit trail

Debugging:

View Brainhair logs:

cd hivematrix-helm
source pyenv/bin/activate
python logs_cli.py brainhair --tail 50

Check for approval files:

ls -la /tmp/brainhair_approval_*

Monitor Claude Code subprocess:

# View Claude Code invocation logs
python logs_cli.py brainhair --tail 100 | grep "Claude Code"

For comprehensive troubleshooting, see Brainhair Troubleshooting Guide.


17. Service-to-Service Communication Enhancements

Service Token Caching

To improve performance and reduce load on Core, services implement service token caching with 5-minute expiration.

Without Caching (Old):

# Every API call requests a new token from Core
def call_service(service_name, path):
    token = get_service_token_from_core(service_name)  # HTTP call to Core
    response = requests.get(f"{service_url}{path}", headers={'Authorization': f'Bearer {token}'})
    return response

With Caching (New):

# In service_client.py
import time
import requests

_token_cache = {}

def get_service_token(service_name):
    """Get cached token or request new one if expired."""
    cache_key = f"token:{service_name}"

    if cache_key in _token_cache:
        token, expires_at = _token_cache[cache_key]
        if time.time() < expires_at:
            return token  # Use cached token

    # Request new token from Core
    response = requests.post(
        f"{CORE_SERVICE_URL}/service-token",
        json={'calling_service': 'myservice', 'target_service': service_name}
    )
    token = response.json()['token']

    # Cache for 4.5 minutes (leave 30s buffer before 5-min expiration)
    _token_cache[cache_key] = (token, time.time() + 270)

    return token

Benefits:

- ✅ Reduces HTTP calls to Core by ~90%
- ✅ Faster service-to-service communication
- ✅ Lower load on the Core service
- ✅ Same security (tokens still expire after 5 minutes)

Implementation Status:

- ✅ Core service (already implemented)
- ✅ Codex service
- ✅ Ledger service
- ✅ KnowledgeTree service
- ✅ Brainhair service


18. Production Features

HiveMatrix services implement comprehensive production-ready features for reliability, observability, and security.

Redis Session Persistence (Core)

Problem: In-memory sessions are lost on service restart, forcing users to re-login.

Solution: Sessions stored in Redis with automatic TTL expiration.

Implementation (Core Service):

# In hivematrix-core/app/session_manager.py
import json
import time
import uuid

import redis
from flask import current_app

class SessionManager:
    def __init__(self):
        redis_url = current_app.config.get('REDIS_URL')
        if redis_url:
            self.redis_client = redis.from_url(redis_url)
            self.storage_type = 'redis'
        else:
            self.sessions = {}  # Fallback to in-memory
            self.storage_type = 'memory'

    def create_session(self, user_data, ttl=3600):
        """Create session with 1-hour TTL."""
        session_id = str(uuid.uuid4())

        if self.storage_type == 'redis':
            self.redis_client.setex(
                f"session:{session_id}",
                ttl,
                json.dumps(user_data)
            )
        else:
            self.sessions[session_id] = {
                'data': user_data,
                'expires_at': time.time() + ttl
            }

        return session_id

    def get_session(self, session_id):
        """Retrieve session data."""
        if self.storage_type == 'redis':
            data = self.redis_client.get(f"session:{session_id}")
            return json.loads(data) if data else None
        else:
            session = self.sessions.get(session_id)
            if session and time.time() < session['expires_at']:
                return session['data']
            return None

Configuration:

# .flaskenv for Core service
REDIS_URL=redis://localhost:6379/0

Benefits:

- ✅ Sessions survive Core service restarts
- ✅ Automatic expiration (no manual cleanup needed)
- ✅ Shared sessions across multiple Core instances (future scalability)
- ✅ Falls back to in-memory if Redis is unavailable

Per-User Rate Limiting

Problem: IP-based rate limiting fails when multiple users share IP (NAT, corporate proxy).

Solution: Rate limiting based on JWT sub (subject) claim with IP fallback.

Implementation (All Services):

# In app/rate_limit_key.py
from flask import request, g

def get_user_id_or_ip():
    """
    Extract user ID from JWT for per-user rate limiting.
    Falls back to IP if no JWT.
    """
    if hasattr(g, 'user') and g.user:
        # Use JWT subject claim (username)
        return f"user:{g.user.get('sub', 'unknown')}"
    else:
        # Fallback to IP address
        return f"ip:{request.remote_addr}"

Usage in Routes:

from flask_limiter import Limiter
from app.rate_limit_key import get_user_id_or_ip

limiter = Limiter(
    app=app,
    key_func=get_user_id_or_ip,  # Per-user limits
    default_limits=["10000 per hour", "500 per minute"]
)

@app.route('/api/data')
@limiter.limit("100 per minute")
@token_required
def get_data():
    # Rate limited per user, not per IP
    return jsonify({'data': ...})

Benefits:

- ✅ Fair rate limiting for users behind shared IPs
- ✅ Prevents one user from exhausting an IP's quota
- ✅ Better abuse prevention (tracks individual users)
- ✅ Graceful fallback to IP if no JWT is present

Structured JSON Logging

Problem: Plain text logs are hard to parse, search, and analyze programmatically.

Solution: JSON-formatted logs with correlation IDs for distributed tracing.

Implementation (All Services):

# In app/structured_logger.py
import os
import logging
import json
import uuid
from flask import request, g

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_data = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'service': os.getenv('SERVICE_NAME', 'unknown'),
            'correlation_id': getattr(g, 'correlation_id', 'none'),
            'message': record.getMessage(),
        }

        # Add extra fields if present
        if hasattr(record, 'user'):
            log_data['user'] = record.user
        if hasattr(record, 'extra'):
            log_data['extra'] = record.extra

        return json.dumps(log_data)

def init_logging(app):
    """Initialize structured JSON logging."""
    if app.config.get('ENABLE_JSON_LOGGING', False):
        handler = logging.StreamHandler()
        handler.setFormatter(JSONFormatter())
        app.logger.addHandler(handler)

@app.before_request
def assign_correlation_id():
    """Assign correlation ID to each request for tracing."""
    g.correlation_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4()))

Example Log Entry:

{
  "timestamp": "2025-11-22T10:30:00Z",
  "level": "INFO",
  "service": "ledger",
  "correlation_id": "req-abc-123",
  "user": "billing@example.com",
  "message": "Bill archived successfully",
  "extra": {
    "account_number": "620547",
    "invoice_number": "620547-202410",
    "total_amount": 4275.00
  }
}

Benefits:

- ✅ Easily parsed by log aggregation tools (Elasticsearch, Datadog, etc.)
- ✅ Correlation IDs for tracing requests across services
- ✅ Structured extra data for better debugging
- ✅ Machine-readable for automated monitoring
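For the correlation ID to trace a request across services, each outbound service-to-service call should forward the incoming X-Correlation-ID header (or mint a fresh one at the edge). A minimal sketch, with an illustrative helper name:

```python
import uuid

def outbound_headers(incoming_headers: dict) -> dict:
    """Forward the caller's X-Correlation-ID, minting a new one when absent."""
    cid = incoming_headers.get("X-Correlation-ID") or str(uuid.uuid4())
    return {"X-Correlation-ID": cid}
```

This pairs with the `assign_correlation_id` before_request hook above: the downstream service sees the same ID and logs it, so one ID links every log line the request produced.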

RFC 7807 Problem Details

Problem: Inconsistent error response formats make client error handling difficult.

Solution: Standardized RFC 7807 Problem Details format for all errors.

Implementation (All Services):

# In app/error_responses.py
from flask import jsonify

def problem_detail(title, status, detail=None, type_uri=None, **kwargs):
    """
    Generate RFC 7807 Problem Details response.

    Args:
        title: Short human-readable summary
        status: HTTP status code
        detail: Human-readable explanation
        type_uri: URI identifying the problem type
        **kwargs: Additional fields
    """
    problem = {
        'type': type_uri or f'https://httpstatuses.com/{status}',
        'title': title,
        'status': status
    }

    if detail:
        problem['detail'] = detail

    problem.update(kwargs)

    return jsonify(problem), status

# Error handlers
@app.errorhandler(404)
def not_found(error):
    return problem_detail(
        title='Not Found',
        status=404,
        detail='The requested resource does not exist.'
    )

@app.errorhandler(429)
def rate_limit_exceeded(error):
    return problem_detail(
        title='Too Many Requests',
        status=429,
        detail='Rate limit exceeded. Try again later.'
    )

@app.errorhandler(500)
def internal_error(error):
    return problem_detail(
        title='Internal Server Error',
        status=500,
        detail='An unexpected error occurred.'
    )

Example Error Response:

{
  "type": "https://httpstatuses.com/404",
  "title": "Not Found",
  "status": 404,
  "detail": "Company with account number '999999' does not exist.",
  "account_number": "999999"
}

Benefits:

- ✅ Standardized error format across all services
- ✅ Machine-readable for automated error handling
- ✅ Human-readable error messages
- ✅ Extensible with custom fields

Health Checks

Problem: No programmatic way to check if a service is healthy.

Solution: /health endpoint with component-level health checks.

Implementation (All Services):

# In health_check.py
import os
import time
from datetime import datetime

import psutil
import requests
from flask import jsonify
from extensions import db

def check_database():
    """Check if database is accessible."""
    try:
        db.session.execute(db.text('SELECT 1'))  # text() is required by SQLAlchemy 2.x
        return {'status': 'healthy', 'message': 'Connected'}
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}

def check_disk_space():
    """Check if sufficient disk space available."""
    usage = psutil.disk_usage('/')
    free_gb = usage.free / (1024 ** 3)
    usage_percent = usage.percent

    status = 'healthy' if free_gb > 10 else 'degraded'
    return {
        'status': status,
        'free_gb': round(free_gb, 2),
        'usage_percent': round(usage_percent, 2)
    }

def check_dependency(service_name, url):
    """Check if a dependency service is healthy."""
    try:
        start = time.time()
        response = requests.get(f"{url}/health", timeout=5)
        duration = (time.time() - start) * 1000

        if response.status_code == 200:
            return {
                'status': 'healthy',
                'response_time_ms': round(duration, 2)
            }
        else:
            return {
                'status': 'degraded',
                'http_status': response.status_code
            }
    except Exception as e:
        return {'status': 'unhealthy', 'error': str(e)}

@app.route('/health')
def health_check():
    """Comprehensive health check endpoint."""
    checks = {
        'database': check_database(),
        'disk': check_disk_space(),
        'dependencies': {
            'core': check_dependency('core', CORE_SERVICE_URL),
            'codex': check_dependency('codex', CODEX_SERVICE_URL)
        }
    }

    # Determine overall status
    statuses = [checks['database']['status'], checks['disk']['status']]
    statuses.extend([c['status'] for c in checks['dependencies'].values()])

    if any(s == 'unhealthy' for s in statuses):
        overall_status = 'unhealthy'
        http_status = 503
    elif any(s == 'degraded' for s in statuses):
        overall_status = 'degraded'
        http_status = 200
    else:
        overall_status = 'healthy'
        http_status = 200

    return jsonify({
        'status': overall_status,
        'timestamp': datetime.utcnow().isoformat() + 'Z',
        'service': os.getenv('SERVICE_NAME'),
        'checks': checks
    }), http_status

Example Health Check Response:

{
  "status": "healthy",
  "timestamp": "2025-11-22T10:30:00Z",
  "service": "ledger",
  "checks": {
    "database": {
      "status": "healthy",
      "message": "Connected"
    },
    "disk": {
      "status": "healthy",
      "free_gb": 120.5,
      "usage_percent": 35.2
    },
    "dependencies": {
      "core": {
        "status": "healthy",
        "response_time_ms": 15.3
      },
      "codex": {
        "status": "healthy",
        "response_time_ms": 42.1
      }
    }
  }
}

Benefits:

  • ✅ Automated monitoring and alerting
  • ✅ Load balancer health checks
  • ✅ Container orchestration (Kubernetes) ready
  • ✅ Component-level diagnostics
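The overall-status rule used by the endpoint above reduces to a small pure function that can be unit-tested in isolation (a sketch; the helper name is illustrative):

```python
def aggregate_status(statuses):
    """Collapse component statuses into (overall_status, http_status).

    Mirrors the /health endpoint's rule: any unhealthy component makes
    the service unhealthy (503); any degraded component makes it
    degraded (200); otherwise it is healthy (200).
    """
    if any(s == 'unhealthy' for s in statuses):
        return 'unhealthy', 503
    if any(s == 'degraded' for s in statuses):
        return 'degraded', 200
    return 'healthy', 200
```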

OpenAPI/Swagger Documentation

Problem: No automatic API documentation for developers.

Solution: OpenAPI 3.0 documentation at /docs endpoint.

Implementation (All Services):

# In app/__init__.py
from flask_swagger_ui import get_swaggerui_blueprint

# Swagger UI configuration
SWAGGER_URL = '/docs'
API_URL = '/static/openapi.json'

swaggerui_blueprint = get_swaggerui_blueprint(
    SWAGGER_URL,
    API_URL,
    config={'app_name': "Ledger API"}
)

app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL)

# static/openapi.json -- the spec below is shown in YAML; convert it to
# JSON before serving, or point API_URL at a YAML file (Swagger UI
# loads either format)
openapi: 3.0.0
info:
  title: Ledger API
  version: 2.0.0
  description: Billing calculation and invoicing service

paths:
  /api/billing/{account_number}:
    get:
      summary: Get billing data for a company
      parameters:
        - name: account_number
          in: path
          required: true
          schema:
            type: string
        - name: year
          in: query
          schema:
            type: integer
        - name: month
          in: query
          schema:
            type: integer
      responses:
        '200':
          description: Billing data
          content:
            application/json:
              schema:
                $ref: '#/components/schemas/BillingData'

Access: - Local: http://localhost:5030/docs - Via Nexus: https://server/ledger/docs

Benefits:

  • ✅ Interactive API documentation
  • ✅ Try API calls directly from the browser
  • ✅ Auto-generated from the OpenAPI spec
  • ✅ Standard format for API clients
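Since API_URL points at a .json file while the spec above is authored in YAML, one option — a sketch, not necessarily the project's actual approach — is to keep the spec as a Python dict and write static/openapi.json at startup:

```python
import json

# Minimal OpenAPI 3.0 document mirroring the YAML spec above.
OPENAPI_SPEC = {
    'openapi': '3.0.0',
    'info': {
        'title': 'Ledger API',
        'version': '2.0.0',
        'description': 'Billing calculation and invoicing service',
    },
    'paths': {
        '/api/billing/{account_number}': {
            'get': {
                'summary': 'Get billing data for a company',
                'parameters': [
                    {'name': 'account_number', 'in': 'path',
                     'required': True, 'schema': {'type': 'string'}},
                    {'name': 'year', 'in': 'query',
                     'schema': {'type': 'integer'}},
                    {'name': 'month', 'in': 'query',
                     'schema': {'type': 'integer'}},
                ],
                'responses': {'200': {'description': 'Billing data'}},
            }
        }
    },
}

def write_spec(path='static/openapi.json'):
    """Serialize the spec to the file Swagger UI is configured to load."""
    with open(path, 'w') as f:
        json.dump(OPENAPI_SPEC, f, indent=2)
```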


19. Centralized Logging Architecture (Helm)

Helm provides centralized log aggregation from all HiveMatrix services to a PostgreSQL database for unified searching, filtering, and analysis.

Architecture

┌──────────────────────────────────────────────────────────┐
│          Centralized Logging Architecture                 │
│                                                            │
│  Services (Core, Codex, Ledger, etc.)                    │
│      ↓ HTTP POST                                          │
│  Helm Flask App (/api/logs)                              │
│      ↓ INSERT                                             │
│  PostgreSQL (helm_db.log_entries table)                  │
│      ↓ SELECT (web + CLI)                                 │
│  Helm Web Interface (/logs)                              │
│  logs_cli.py (command-line viewer)                       │
└──────────────────────────────────────────────────────────┘

Database Schema

CREATE TABLE log_entries (
    id SERIAL PRIMARY KEY,
    service VARCHAR(50) NOT NULL,
    level VARCHAR(20) NOT NULL,
    message TEXT NOT NULL,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    correlation_id VARCHAR(100),
    user_email VARCHAR(255),
    extra_data JSONB,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_log_service ON log_entries(service);
CREATE INDEX idx_log_level ON log_entries(level);
CREATE INDEX idx_log_timestamp ON log_entries(timestamp);
CREATE INDEX idx_log_correlation ON log_entries(correlation_id);
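On the ingestion side, Helm's /api/logs handler can enforce the schema's column limits before the INSERT. A sketch of that validation (the helper and its rules are illustrative, not Helm's actual code):

```python
REQUIRED_FIELDS = ('service', 'level', 'message')

def normalize_log_entry(payload):
    """Validate an incoming /api/logs payload and trim values to the
    VARCHAR limits in the log_entries schema above."""
    missing = [f for f in REQUIRED_FIELDS if not payload.get(f)]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    return {
        'service': payload['service'][:50],               # VARCHAR(50)
        'level': payload['level'].upper()[:20],           # VARCHAR(20)
        'message': payload['message'],                    # TEXT
        'correlation_id': payload.get('correlation_id'),  # VARCHAR(100)
        'user_email': payload.get('user_email'),          # VARCHAR(255)
        'extra_data': payload.get('extra_data') or {},    # JSONB
    }
```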

Service Integration

Each service sends logs to Helm via HTTP:

# In app/helm_logger.py
import logging
from datetime import datetime

import requests
from flask import g

class HelmHandler(logging.Handler):
    """Send logs to Helm's centralized logging."""

    def __init__(self, service_name, helm_url):
        super().__init__()
        self.service_name = service_name
        self.helm_url = helm_url

    def emit(self, record):
        """Send log entry to Helm."""
        log_data = {
            'service': self.service_name,
            'level': record.levelname,
            'message': record.getMessage(),
            'timestamp': datetime.fromtimestamp(record.created).isoformat(),
            'correlation_id': getattr(g, 'correlation_id', None),
            'user_email': (getattr(g, 'user', None) or {}).get('email'),
            'extra_data': getattr(record, 'extra', {})
        }

        try:
            requests.post(
                f"{self.helm_url}/api/logs",
                json=log_data,
                timeout=2
            )
        except requests.RequestException:
            pass  # Never fail the caller's request just because Helm is down

Log Viewing (CLI)

cd hivematrix-helm
source pyenv/bin/activate

# View logs from specific service
python logs_cli.py ledger --tail 50

# Filter by level
python logs_cli.py core --level ERROR --tail 100

# View all services
python logs_cli.py --tail 30

# Filter by correlation ID
python logs_cli.py --correlation req-abc-123

Log Viewing (Web Interface)

Helm Web Dashboard (http://localhost:5004/logs):

  • Filter by service, level, date range
  • Search log messages
  • View correlation chains (trace requests across services)
  • Export to CSV/JSON

Log Retention

Automatic Cleanup:

# In hivematrix-helm/cleanup_old_logs.py
from datetime import datetime, timedelta

from sqlalchemy import text

from extensions import db

def cleanup_old_logs(days=30):
    """Delete logs older than the specified number of days."""
    cutoff_date = datetime.utcnow() - timedelta(days=days)

    db.session.execute(
        text("DELETE FROM log_entries WHERE timestamp < :cutoff"),
        {'cutoff': cutoff_date}
    )
    db.session.commit()

Cron Job:

# Run daily at 2 AM
0 2 * * * cd /path/to/helm && pyenv/bin/python cleanup_old_logs.py

Benefits

  • ✅ Single place to view logs from all services
  • ✅ Correlation ID tracking for distributed tracing
  • ✅ Searchable and filterable
  • ✅ Historical log retention
  • ✅ No need for log file management
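The correlation IDs above are minted at request ingress; a minimal sketch of the minting logic (the req-<hex> format is inferred from the CLI examples and is illustrative):

```python
import uuid

def correlation_id(forwarded=None):
    """Reuse an upstream correlation ID (e.g. one Nexus forwarded in a
    header) when present; otherwise mint a fresh one in a req-<hex>
    style so every log line in the chain can be grouped later."""
    return forwarded or f"req-{uuid.uuid4().hex[:12]}"
```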

20. Billing Architecture (Ledger)

Ledger is HiveMatrix's billing calculation and invoicing engine, with integrated Archive functionality for billing snapshots.

Billing Calculation Flow

┌──────────────────────────────────────────────────────────┐
│              Billing Calculation Engine                   │
│                                                            │
│  1. Fetch Base Data from Codex                            │
│     ├─ Company profile (billing plan, contract)          │
│     ├─ Active users/contacts                              │
│     ├─ Active assets (workstations, servers, etc.)       │
│     ├─ Backup usage data                                  │
│     └─ Support tickets with billable hours               │
│                                                            │
│  2. Determine Billing Plan                                │
│     ├─ Company's default plan from Codex                 │
│     └─ Apply client-specific plan override if exists     │
│                                                            │
│  3. Calculate Base Charges                                │
│     ├─ User charges: Count × per-user rate               │
│     ├─ Asset charges: Count by type × per-asset rate     │
│     ├─ Backup charges: Base fee + overage                │
│     └─ Ticket charges: Total hours × hourly rate         │
│                                                            │
│  4. Apply Overrides                                       │
│     ├─ Client-specific custom rates                       │
│     ├─ Asset-specific billing type changes               │
│     ├─ User-specific free/paid/custom status             │
│     ├─ Manual assets/users                                │
│     └─ Custom line items (one-off, recurring, yearly)    │
│                                                            │
│  5. Generate Output                                       │
│     ├─ Itemized receipt with all charges                 │
│     ├─ Total amount due                                   │
│     ├─ CSV invoice for download                           │
│     └─ Optional: Billing snapshot for archive            │
└──────────────────────────────────────────────────────────┘

Override System

Ledger provides a comprehensive override system for client-specific billing customization:

1. Client Billing Overrides (ClientBillingOverride model):
   - Custom billing plan assignment
   - Per-user cost override
   - Per-workstation/server/VM/switch/firewall cost overrides
   - Hourly rate override
   - Backup pricing overrides
   - Prepaid hours (monthly/yearly)

2. Asset Billing Overrides (AssetBillingOverride model):
   - Change billing type for specific assets
   - Example: bill a laptop as a workstation instead of a server
   - Custom cost per asset

3. User Billing Overrides (UserBillingOverride model):
   - Mark users as free, paid, or custom cost
   - Example: don't charge for the CEO's account

4. Manual Assets (ManualAsset model):
   - Add assets not in Codex/Datto
   - Example: BYOD devices, third-party equipment

5. Manual Users (ManualUser model):
   - Add users not in Codex/PSA
   - Example: contractors not in the ticketing system

6. Custom Line Items (CustomLineItem model):
   - Monthly recurring: cloud hosting, licenses
   - One-off: network upgrades, project charges
   - Yearly: annual renewals billed in a specific month
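The common thread across these override types is precedence: a client-specific value, when set, wins over the plan default. A minimal sketch of that rule (function name hypothetical):

```python
def effective_rate(plan_rate, override_rate=None):
    """Return the client-specific override when one is set, otherwise
    the billing plan's default rate. None means 'no override'; 0 is a
    legitimate override (e.g. a free user)."""
    return override_rate if override_rate is not None else plan_rate
```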

Archive Integration

Billing snapshots are immutable records of finalized bills, stored in Ledger's own database (this functionality was merged in from the former Archive service):

BillingSnapshot Model:

class BillingSnapshot(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    company_account_number = db.Column(db.String(50))
    company_name = db.Column(db.String(255))
    invoice_number = db.Column(db.String(100), unique=True)  # ACCOUNT-YYYYMM
    billing_year = db.Column(db.Integer)
    billing_month = db.Column(db.Integer)
    total_amount = db.Column(db.Numeric(10, 2))
    billing_data_json = db.Column(db.Text)  # Complete calculation
    invoice_csv = db.Column(db.Text)  # CSV invoice content
    archived_at = db.Column(db.String(30))
    created_by = db.Column(db.String(255))
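The ACCOUNT-YYYYMM invoice-number format noted above can be produced with a small helper (the function name is illustrative):

```python
def make_invoice_number(account_number, year, month):
    """Build the unique invoice number in ACCOUNT-YYYYMM form,
    zero-padding the month (e.g. March -> 03)."""
    return f"{account_number}-{year}{month:02d}"
```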

Snapshot Creation:

# In app/archive/snapshot.py
import json
from datetime import datetime

def create_snapshot(account_number, year, month, created_by='admin'):
    """Create immutable billing snapshot."""

    # Calculate billing
    billing_data = get_billing_data_for_client(account_number, year, month)

    # Generate invoice
    invoice_csv = generate_invoice_csv(account_number, billing_data)

    # Create snapshot
    snapshot = BillingSnapshot(
        company_account_number=account_number,
        invoice_number=f"{account_number}-{year}{month:02d}",
        billing_year=year,
        billing_month=month,
        total_amount=billing_data['receipt_data']['total'],
        billing_data_json=json.dumps(billing_data),
        invoice_csv=invoice_csv,
        archived_at=datetime.utcnow().isoformat() + 'Z',
        created_by=created_by
    )

    db.session.add(snapshot)
    db.session.commit()

    return snapshot.invoice_number

Benefits of the merge (Archive → Ledger):

  • ✅ Eliminates HTTP overhead between services
  • ✅ Atomic transactions (billing + archive in one operation)
  • ✅ Simpler deployment (one less service)
  • ✅ Better data integrity
  • ✅ Faster snapshot creation

Bulk Operations

Generate All Invoices:

# GET /invoices/bulk/download?year=2024&month=10
import zipfile
from io import BytesIO

from flask import request, send_file

@app.route('/invoices/bulk/download')
@billing_required
def bulk_invoice_download():
    """Download ZIP of all company invoices."""
    year = int(request.args.get('year', current_year))
    month = int(request.args.get('month', current_month))

    companies = get_all_companies()

    # Create in-memory ZIP
    zip_buffer = BytesIO()
    with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
        for company in companies:
            billing_data = get_billing_data_for_client(
                company['account_number'], year, month
            )
            csv_content = generate_invoice_csv(company['account_number'], billing_data)

            filename = f"{company['name']}-{company['account_number']}-{year}{month:02d}.csv"
            zip_file.writestr(filename, csv_content)

    zip_buffer.seek(0)  # Rewind the buffer before streaming it
    return send_file(
        zip_buffer,
        mimetype='application/zip',
        as_attachment=True,
        download_name=f'invoices-{year}-{month:02d}.zip'
    )

Bulk Snapshot Creation:

# Archive all bills for October 2024
POST /archive/api/snapshot/bulk
{
  "year": 2024,
  "month": 10,
  "account_numbers": null  # null = all companies
}

Performance Optimization

Bulk Codex API:

Instead of issuing separate API calls for each company (the classic N+1 pattern):

# BAD: N+1 queries
companies = get_all_companies()
for company in companies:
    assets = get_company_assets(company['account_number'])  # N calls
    users = get_company_users(company['account_number'])    # N calls

Use bulk endpoint (1 call):

# GOOD: 1 bulk API call
all_data = call_service('codex', '/api/companies/bulk?include_tickets=true&year=2024')
# Returns companies with assets, users, tickets pre-populated

Performance Gain: Dashboard load time: 30s → 2s
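After the single bulk call, downstream code can index the payload once for O(1) per-company lookups — a sketch (field names assumed to match the Codex responses above):

```python
def index_bulk_response(companies):
    """Index the bulk /api/companies payload by account_number so later
    lookups avoid re-scanning the list for every company."""
    return {c['account_number']: c for c in companies}
```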

See Ledger Service Documentation for complete billing engine details, override system, and API reference.


21. Graph Database Pattern (KnowledgeTree)

KnowledgeTree implements a Neo4j graph database for hierarchical knowledge management with full-text search.

Why Neo4j for Knowledge Management

Traditional relational databases struggle with hierarchical data:

  • ❌ Self-referential foreign keys are complex
  • ❌ Recursive queries (WITH RECURSIVE) are slow
  • ❌ Path queries require multiple JOINs
  • ❌ Tree structure changes are expensive

Neo4j excels at hierarchical data:

  • ✅ Natural graph representation of folders/items
  • ✅ Fast path traversal (ancestors, descendants)
  • ✅ Relationship-based queries (parent-child)
  • ✅ Full-text search with score ranking

Schema Design

// Node type: ContextItem
(:ContextItem {
    id: String,           // UUID
    name: String,         // Display name
    content: String,      // Markdown content
    is_folder: Boolean,   // true for folders, false for items
    is_attached: Boolean, // Attached to tickets/companies
    read_only: Boolean,   // System items
    created_at: String,   // ISO timestamp
    updated_at: String    // ISO timestamp
})

// Relationship: PARENT_OF
(:ContextItem)-[:PARENT_OF]->(:ContextItem)

Example Hierarchy:

(root:ContextItem {name: "Root", is_folder: true})
  -[:PARENT_OF]->(folder1:ContextItem {name: "PowerShell Scripts", is_folder: true})
    -[:PARENT_OF]->(item1:ContextItem {name: "Restart Print Spooler", is_folder: false})
    -[:PARENT_OF]->(item2:ContextItem {name: "Clear DNS Cache", is_folder: false})
  -[:PARENT_OF]->(folder2:ContextItem {name: "Network Diagnostics", is_folder: true})
    -[:PARENT_OF]->(item3:ContextItem {name: "Ping Test", is_folder: false})

Core Queries

Get All Root Items:

MATCH (n:ContextItem)
WHERE NOT (n)<-[:PARENT_OF]-()
RETURN n
ORDER BY n.is_folder DESC, n.name ASC

Get Children of Folder:

MATCH (parent:ContextItem {id: $parent_id})-[:PARENT_OF]->(child:ContextItem)
RETURN child
ORDER BY child.is_folder DESC, child.name ASC

Get Full Path (Breadcrumbs):

MATCH path = (root:ContextItem)<-[:PARENT_OF*]-(item:ContextItem {id: $item_id})
WHERE NOT (root)<-[:PARENT_OF]-()
RETURN [node IN nodes(path) | {id: node.id, name: node.name}] AS path

Move Item to New Parent:

// Delete old parent relationship
MATCH (item:ContextItem {id: $item_id})<-[r:PARENT_OF]-()
DELETE r

// Create new parent relationship
MATCH (new_parent:ContextItem {id: $new_parent_id}),
      (item:ContextItem {id: $item_id})
CREATE (new_parent)-[:PARENT_OF]->(item)

Full-Text Search:

// Create full-text index
CREATE FULLTEXT INDEX contextItemSearch FOR (n:ContextItem) ON EACH [n.name, n.content]

// Search with score ranking
CALL db.index.fulltext.queryNodes('contextItemSearch', $query)
YIELD node, score
WHERE node.is_folder = false
RETURN node, score
ORDER BY score DESC
LIMIT 20

Integration with Codex

KnowledgeTree syncs data from Codex for AI assistant context:

Sync Companies:

# sync_codex_data.py
from app import app, neo4j_driver
from app.codex_client import get_all_companies

def sync_companies():
    """Sync companies from Codex to Neo4j for context."""
    companies = get_all_companies()

    with neo4j_driver.session() as session:
        for company in companies:
            session.execute_write(_create_company_node, company)

def _create_company_node(tx, company):
    query = """
    MERGE (c:Company {id: $id})
    SET c.name = $name,
        c.account_number = $account_number,
        c.updated_at = datetime()
    """
    tx.run(query,
           id=company['id'],
           name=company['name'],
           account_number=company['account_number'])

Query Context for Ticket:

// Get all context attached to a ticket
MATCH (ticket:Ticket {number: $ticket_number})-[:HAS_CONTEXT]->(item:ContextItem)
RETURN item
ORDER BY item.name

API Patterns

Flask Integration:

# In app/__init__.py
from neo4j import GraphDatabase

neo4j_uri = os.getenv('NEO4J_URI', 'bolt://localhost:7687')
neo4j_user = os.getenv('NEO4J_USER', 'neo4j')
neo4j_password = os.getenv('NEO4J_PASSWORD', 'password')

neo4j_driver = GraphDatabase.driver(neo4j_uri, auth=(neo4j_user, neo4j_password))

# Close the shared driver once at process exit. Do NOT close it in a
# teardown_appcontext hook -- that hook runs after every request and
# would leave the driver unusable for all subsequent requests.
import atexit
atexit.register(neo4j_driver.close)

Route Example:

@app.route('/api/browse/<path:folder_id>')
@token_required
def browse_folder(folder_id):
    """Get children of a folder."""
    with neo4j_driver.session() as session:
        result = session.run("""
            MATCH (parent:ContextItem {id: $folder_id})-[:PARENT_OF]->(child:ContextItem)
            RETURN child
            ORDER BY child.is_folder DESC, child.name ASC
        """, folder_id=folder_id)

        items = [dict(record['child']) for record in result]

    return jsonify({'items': items})

Benefits of Graph Database

For knowledge management:

  • ✅ Natural folder/item hierarchy
  • ✅ Fast ancestor/descendant queries
  • ✅ Easy tree restructuring
  • ✅ Relationship-based permissions (future)

For AI assistant context:

  • ✅ Quick context gathering for tickets
  • ✅ Related item discovery
  • ✅ Company-specific knowledge lookup
  • ✅ Full-text search with relevance scoring

Performance:

  • ✅ Traversal queries O(n) instead of O(n²)
  • ✅ No complex JOIN operations
  • ✅ Efficient path finding
  • ✅ Scales well with deep hierarchies

See KnowledgeTree Service Documentation for complete Neo4j implementation, API reference, and Cypher query examples.


22. Version History

  • 4.2 - Comprehensive Service Documentation Integration (November 2025): Major ARCHITECTURE.md update incorporating findings from completed service documentation overhaul. Section 16 (Brainhair) completely rewritten with current architecture: Claude Code CLI integration (official Anthropic CLI), polling architecture with server-side response buffers, tools system (claude_tools/), file-based IPC approval workflow, PHI/CJIS filtering via Microsoft Presidio with FirstNameLastInitialOperator, contract alignment workflow (5-step process), ClaudeSessionManager with PostgreSQL persistence and 30-minute idle cleanup. New Section 17: Service-to-service communication enhancements including service token caching (5-minute TTL, 4.5-minute cache) reducing Core load by ~90%. New Section 18: Production features across all services - Redis session persistence (Core), per-user JWT-based rate limiting, structured JSON logging with correlation IDs, RFC 7807 Problem Details error responses, component-level /health endpoints, OpenAPI/Swagger documentation at /docs. New Section 19: Centralized logging architecture (Helm) with PostgreSQL log aggregation, logs_cli.py viewer, correlation ID tracking, web dashboard, and 30-day retention. New Section 20: Billing architecture (Ledger) with 5-step calculation flow, comprehensive override system (6 types), Archive integration (merged from separate service), bulk operations, Codex bulk API optimization (30s→2s dashboard load). New Section 21: Graph database pattern (KnowledgeTree) with Neo4j schema, hierarchical queries, full-text search, Codex sync integration. Added cross-references to detailed service documentation throughout. Previous Section 17 (Version History) renumbered to Section 22.

  • 4.1 - Production Readiness & Service Consolidation (November 2025): Implemented comprehensive production-ready features across all services. Infrastructure improvements: Automated daily database backups with 30-day retention via systemd, comprehensive component-level health checks (/health endpoints), Redis session persistence for session survival across restarts, structured JSON logging with correlation IDs for distributed tracing. API improvements: OpenAPI/Swagger documentation at /docs endpoints across all services, RFC 7807 Problem Details for standardized machine-readable error responses, per-user JWT-based rate limiting to prevent shared IP abuse. Observability: Enhanced log viewer with correlation ID filtering, 30-second request timeouts to prevent hanging requests, service health dashboard with real-time latency metrics. Service consolidation: Merged Archive functionality into Ledger service (accessible at /ledger/archive/) for simplified architecture and improved performance, cancelled Resolve service. Optional pre-commit hooks for code quality (black, isort, flake8).

  • 4.0 - Brainhair AI Assistant & Approval Flow: Added comprehensive documentation for Brainhair AI assistant service including AI tools pattern, approval flow for write operations, file-based IPC mechanism, browser integration, PHI/CJIS filtering with Microsoft Presidio, security considerations, and debugging guides. All write operations in AI tools (update_billing.py, set_company_plan.py, manage_network_equipment.py, update_features.py) now require explicit user approval before execution via approval_helper.py. Automatic PHI filtering protects sensitive information in tool responses using Presidio entity detection and anonymization.

  • 3.9 - Dynamic Service Discovery & Keycloak Auto-Configuration: Added scan_all_services() to install_manager.py for automatic detection of all hivematrix-* services (not just registry). Services are discovered on every start.sh run, allowing manual copies and git pulls to work seamlessly. Enhanced Keycloak setup with intelligent synchronization between Keycloak database and master_config.json - tracks fresh Keycloak installations (KEYCLOAK_FRESH_INSTALL), clears old config when reinstalling, and ensures realm/users are always configured. Two-way sync prevents configuration drift. Both improvements make the system more resilient and reduce manual configuration.

  • 3.8 - Documented apps_registry.json as the authoritative source for service configuration, with install_manager.py update-config generating both master_services.json and services.json automatically. Added archive service example to registry documentation.

  • 3.7 - Documented master_services.json and services.json dual configuration system for service discovery and management

  • 3.6 - Updated URL prefix handling to use werkzeug's ProxyFix middleware instead of custom PrefixMiddleware. Added X-Forwarded-Prefix header from Nexus. Documented cookie-based authentication fallback for AJAX requests with credentials: 'same-origin'.

  • 3.5 - Added Development & Debugging Tools section: logs_cli.py for centralized log viewing, create_test_token.py for JWT token generation, test_with_token.sh for quick endpoint testing, and comprehensive debugging workflows
  • 3.4 - Added comprehensive security architecture, security audit tool (security_audit.py), firewall generation, service binding requirements, and automated security checks in start.sh
  • 3.3 - Added centralized configuration management (config_manager.py), auto-installation architecture (install_manager.py), unified startup script (start.sh), and comprehensive deployment documentation
  • 3.2 - Added revokable session management, logout flow, token validation, Keycloak proxy on port 443
  • 3.1 - Added service-to-service communication, permission levels, database best practices, external integrations
  • 3.0 - Initial version with core architecture patterns