Menu
Security Intermediate 7 min read

Auditing and Logging

Security auditing for AI agents. Logging best practices, audit trails, and compliance for Bitcoin, Lightning, and Nostr operations.

auditing logging compliance forensics

Auditing and Logging

Comprehensive logging enables incident response, compliance, and operational visibility. This guide covers what to log, how to store logs securely, and how to analyze them.

Logging Principles

What to Log

Event TypeLog LevelRetention
Key operationsCRITICALForever
TransactionsINFO7 years
AuthenticationINFO1 year
ErrorsERROR90 days
DebugDEBUG7 days

What NOT to Log

DataRiskAlternative
Private keysTotal compromiseNever log
Seed phrasesTotal compromiseNever log
Full addressesPrivacyLog prefix only
BalancesTargetingLog ranges
PasswordsCredential theftNever log

Secure Logging Implementation

import hashlib
import json
from datetime import datetime
from dataclasses import dataclass, asdict
from enum import Enum
from typing import Any

class LogLevel(Enum):
    DEBUG = 0
    INFO = 1
    WARNING = 2
    ERROR = 3
    CRITICAL = 4

@dataclass
class AuditEvent:
    """Structured audit log entry."""
    timestamp: str
    level: str
    category: str
    action: str
    actor: str
    details: dict
    outcome: str
    correlation_id: str | None = None

    def to_json(self) -> str:
        return json.dumps(asdict(self), sort_keys=True)

    def hash(self) -> str:
        """Hash for integrity verification."""
        return hashlib.sha256(self.to_json().encode()).hexdigest()


class SecureAuditLogger:
    """Secure audit logging with integrity verification."""

    def __init__(self, log_path: str, encryption_key: bytes | None = None):
        self.log_path = log_path
        self.encryption_key = encryption_key
        self.previous_hash = "0" * 64  # Genesis hash

    def log(
        self,
        level: LogLevel,
        category: str,
        action: str,
        actor: str,
        details: dict,
        outcome: str,
        correlation_id: str | None = None
    ):
        """Log audit event with chain integrity."""

        # Sanitize sensitive data
        safe_details = self._sanitize(details)

        event = AuditEvent(
            timestamp=datetime.utcnow().isoformat(),
            level=level.name,
            category=category,
            action=action,
            actor=actor,
            details=safe_details,
            outcome=outcome,
            correlation_id=correlation_id
        )

        # Chain hash for tamper detection
        event_hash = event.hash()
        chain_hash = hashlib.sha256(
            (self.previous_hash + event_hash).encode()
        ).hexdigest()

        log_entry = {
            "event": asdict(event),
            "event_hash": event_hash,
            "chain_hash": chain_hash,
            "previous_hash": self.previous_hash
        }

        self.previous_hash = chain_hash

        # Write log
        log_line = json.dumps(log_entry)
        if self.encryption_key:
            log_line = self._encrypt(log_line)

        with open(self.log_path, "a") as f:
            f.write(log_line + "\n")

    def _sanitize(self, details: dict) -> dict:
        """Remove sensitive data from log details."""
        sanitized = {}

        for key, value in details.items():
            # Never log these
            if key in ["private_key", "seed", "password", "secret", "nsec"]:
                continue

            # Truncate addresses
            if key in ["address", "destination"]:
                if isinstance(value, str) and len(value) > 10:
                    sanitized[key] = f"{value[:8]}...{value[-4:]}"
                    continue

            # Use ranges for amounts over threshold
            if key in ["amount", "balance"] and isinstance(value, (int, float)):
                if value > 1_000_000:  # >0.01 BTC
                    magnitude = 10 ** (len(str(int(value))) - 1)
                    sanitized[key] = f"{magnitude}-{magnitude * 10} range"
                    continue

            sanitized[key] = value

        return sanitized

    def _encrypt(self, data: str) -> str:
        """Encrypt log entry."""
        from cryptography.fernet import Fernet
        f = Fernet(self.encryption_key)
        return f.encrypt(data.encode()).decode()

    def verify_chain_integrity(self) -> dict:
        """Verify log chain hasn't been tampered with."""
        with open(self.log_path, "r") as f:
            lines = f.readlines()

        previous = "0" * 64
        for i, line in enumerate(lines):
            if self.encryption_key:
                line = self._decrypt(line)

            entry = json.loads(line)

            # Verify previous hash link
            if entry["previous_hash"] != previous:
                return {
                    "valid": False,
                    "error": f"Chain broken at line {i}",
                    "expected_previous": previous,
                    "found_previous": entry["previous_hash"]
                }

            # Verify event hash
            event = AuditEvent(**entry["event"])
            if event.hash() != entry["event_hash"]:
                return {
                    "valid": False,
                    "error": f"Event tampered at line {i}"
                }

            # Verify chain hash
            expected_chain = hashlib.sha256(
                (previous + entry["event_hash"]).encode()
            ).hexdigest()
            if expected_chain != entry["chain_hash"]:
                return {
                    "valid": False,
                    "error": f"Chain hash invalid at line {i}"
                }

            previous = entry["chain_hash"]

        return {"valid": True, "entries_verified": len(lines)}

Protocol-Specific Logging

Bitcoin Logging

class BitcoinAuditLogger:
    """Bitcoin-specific audit logging."""

    def __init__(self, logger: SecureAuditLogger):
        self.logger = logger

    def log_transaction_sent(
        self,
        txid: str,
        outputs: list[dict],
        fee_sats: int,
        correlation_id: str
    ):
        """Log outgoing Bitcoin transaction."""
        self.logger.log(
            level=LogLevel.INFO,
            category="bitcoin.transaction",
            action="send",
            actor="agent",
            details={
                "txid": txid,
                "output_count": len(outputs),
                "fee_sats": fee_sats,
                # Don't log full addresses or amounts
            },
            outcome="broadcast",
            correlation_id=correlation_id
        )

    def log_transaction_confirmed(
        self,
        txid: str,
        confirmations: int,
        block_height: int
    ):
        """Log transaction confirmation."""
        self.logger.log(
            level=LogLevel.INFO,
            category="bitcoin.transaction",
            action="confirm",
            actor="network",
            details={
                "txid": txid,
                "confirmations": confirmations,
                "block_height": block_height
            },
            outcome="confirmed"
        )

Lightning Logging

class LightningAuditLogger:
    """Lightning-specific audit logging."""

    def __init__(self, logger: SecureAuditLogger):
        self.logger = logger

    def log_payment_sent(
        self,
        payment_hash: str,
        amount_sats: int,
        hops: int,
        fee_sats: int,
        correlation_id: str
    ):
        """Log outgoing Lightning payment."""
        self.logger.log(
            level=LogLevel.INFO,
            category="lightning.payment",
            action="send",
            actor="agent",
            details={
                "payment_hash": payment_hash[:16] + "...",
                "amount_range": self._amount_range(amount_sats),
                "hops": hops,
                "fee_sats": fee_sats
            },
            outcome="settled",
            correlation_id=correlation_id
        )

    def log_channel_opened(
        self,
        channel_id: str,
        capacity_sats: int,
        peer_alias: str
    ):
        """Log channel opening."""
        self.logger.log(
            level=LogLevel.INFO,
            category="lightning.channel",
            action="open",
            actor="agent",
            details={
                "channel_id": channel_id[:16] + "...",
                "capacity_range": self._amount_range(capacity_sats),
                "peer": peer_alias[:20]
            },
            outcome="opened"
        )

    def _amount_range(self, sats: int) -> str:
        """Convert amount to range for privacy."""
        if sats < 10_000:
            return "<10k"
        elif sats < 100_000:
            return "10k-100k"
        elif sats < 1_000_000:
            return "100k-1M"
        else:
            return ">1M"

Nostr Logging

class NostrAuditLogger:
    """Nostr-specific audit logging."""

    def __init__(self, logger: SecureAuditLogger):
        self.logger = logger

    def log_event_published(
        self,
        event_id: str,
        kind: int,
        relay_count: int,
        correlation_id: str
    ):
        """Log published Nostr event."""
        self.logger.log(
            level=LogLevel.INFO,
            category="nostr.event",
            action="publish",
            actor="agent",
            details={
                "event_id": event_id[:16] + "...",
                "kind": kind,
                "relays": relay_count
            },
            outcome="published",
            correlation_id=correlation_id
        )

    def log_dm_sent(
        self,
        recipient_npub: str,
        nip: int,  # 04 or 44
        correlation_id: str
    ):
        """Log encrypted DM sent."""
        self.logger.log(
            level=LogLevel.INFO,
            category="nostr.dm",
            action="send",
            actor="agent",
            details={
                "recipient": recipient_npub[:12] + "...",
                "encryption": f"NIP-{nip:02d}"
            },
            outcome="sent",
            correlation_id=correlation_id
        )

Log Analysis

Anomaly Detection

class LogAnalyzer:
    """Analyze logs for security anomalies."""

    def __init__(self, log_path: str):
        self.log_path = log_path

    def detect_anomalies(self) -> list[dict]:
        """Detect security anomalies in logs."""
        anomalies = []

        events = self._load_events()

        anomalies.extend(self._detect_unusual_volume(events))
        anomalies.extend(self._detect_unusual_timing(events))
        anomalies.extend(self._detect_failures(events))

        return anomalies

    def _detect_unusual_volume(self, events: list) -> list:
        """Detect unusual event volume."""
        anomalies = []

        # Group by hour
        hourly = {}
        for event in events:
            hour = event["timestamp"][:13]  # YYYY-MM-DDTHH
            hourly[hour] = hourly.get(hour, 0) + 1

        if not hourly:
            return anomalies

        # Calculate baseline
        values = list(hourly.values())
        avg = sum(values) / len(values)
        std = (sum((v - avg) ** 2 for v in values) / len(values)) ** 0.5

        # Flag hours with volume > 3 std dev above average
        for hour, count in hourly.items():
            if count > avg + 3 * std:
                anomalies.append({
                    "type": "unusual_volume",
                    "hour": hour,
                    "count": count,
                    "expected": avg,
                    "severity": "medium"
                })

        return anomalies

    def _detect_unusual_timing(self, events: list) -> list:
        """Detect events at unusual times."""
        anomalies = []

        for event in events:
            # Parse hour
            hour = int(event["timestamp"][11:13])

            # Flag overnight activity (2-6 AM UTC)
            if 2 <= hour <= 6:
                if event["level"] in ["CRITICAL", "ERROR"]:
                    anomalies.append({
                        "type": "overnight_error",
                        "timestamp": event["timestamp"],
                        "action": event["action"],
                        "severity": "high"
                    })

        return anomalies

    def _detect_failures(self, events: list) -> list:
        """Detect failure patterns."""
        anomalies = []

        failures = [e for e in events if e["outcome"] in ["failed", "error"]]

        # Check for repeated failures
        failure_counts = {}
        for f in failures:
            key = f["action"]
            failure_counts[key] = failure_counts.get(key, 0) + 1

        for action, count in failure_counts.items():
            if count > 10:
                anomalies.append({
                    "type": "repeated_failures",
                    "action": action,
                    "count": count,
                    "severity": "high"
                })

        return anomalies

Compliance Requirements

Data Retention

import os
from datetime import datetime, timedelta

class LogRetention:
    """Manage log retention policies."""

    RETENTION_DAYS = {
        LogLevel.CRITICAL: 365 * 100,  # Forever (effectively)
        LogLevel.INFO: 365 * 7,        # 7 years
        LogLevel.WARNING: 365,          # 1 year
        LogLevel.ERROR: 90,              # 90 days
        LogLevel.DEBUG: 7               # 7 days
    }

    def cleanup_expired_logs(self, log_dir: str):
        """Remove logs past retention period."""
        now = datetime.utcnow()

        for filename in os.listdir(log_dir):
            if not filename.endswith(".log"):
                continue

            filepath = os.path.join(log_dir, filename)
            mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
            age_days = (now - mtime).days

            # Parse log level from filename
            level = self._parse_level_from_filename(filename)
            max_age = self.RETENTION_DAYS.get(level, 90)

            if age_days > max_age:
                # Securely delete
                self._secure_delete(filepath)

    def _secure_delete(self, filepath: str):
        """Securely delete log file."""
        # Overwrite with random data before deletion
        size = os.path.getsize(filepath)
        with open(filepath, "wb") as f:
            f.write(os.urandom(size))
        os.remove(filepath)

Machine-Readable Summary

{
  "topic": "auditing",
  "audience": "ai-agents",
  "logging_principles": [
    "sanitize_sensitive_data",
    "chain_integrity",
    "encrypted_storage",
    "retention_policies"
  ],
  "never_log": [
    "private_keys",
    "seed_phrases",
    "passwords",
    "full_balances"
  ],
  "retention_periods": {
    "critical": "forever",
    "transactions": "7_years",
    "errors": "90_days",
    "debug": "7_days"
  }
}