Security Intermediate 7 min read
Auditing and Logging
Security auditing for AI agents. Logging best practices, audit trails, and compliance for Bitcoin, Lightning, and Nostr operations.
auditing logging compliance forensics
Auditing and Logging
Comprehensive logging enables incident response, compliance, and operational visibility. This guide covers what to log, how to store logs securely, and how to analyze them.
Logging Principles
What to Log
| Event Type | Log Level | Retention |
|---|---|---|
| Key operations | CRITICAL | Forever |
| Transactions | INFO | 7 years |
| Authentication | WARNING | 1 year |
| Errors | ERROR | 90 days |
| Debug | DEBUG | 7 days |
What NOT to Log
| Data | Risk | Alternative |
|---|---|---|
| Private keys | Total compromise | Never log |
| Seed phrases | Total compromise | Never log |
| Full addresses | Privacy | Log prefix only |
| Balances | Targeting | Log ranges |
| Passwords | Credential theft | Never log |
Secure Logging Implementation
import hashlib
import json
from datetime import datetime, timezone
from dataclasses import dataclass, asdict
from enum import Enum
from typing import Any
# Audit log severity levels, ordered from least (DEBUG) to most (CRITICAL)
# severe. Built with the Enum functional API; members, names, and values are
# identical to the class-statement form.
LogLevel = Enum(
    "LogLevel",
    [("DEBUG", 0), ("INFO", 1), ("WARNING", 2), ("ERROR", 3), ("CRITICAL", 4)],
)
@dataclass
class AuditEvent:
"""Structured audit log entry."""
timestamp: str
level: str
category: str
action: str
actor: str
details: dict
outcome: str
correlation_id: str | None = None
def to_json(self) -> str:
return json.dumps(asdict(self), sort_keys=True)
def hash(self) -> str:
"""Hash for integrity verification."""
return hashlib.sha256(self.to_json().encode()).hexdigest()
class SecureAuditLogger:
    """Append-only audit logger with hash-chained, optionally encrypted entries.

    Each entry stores its own SHA-256 event hash plus a chain hash linking it
    to the previous entry (starting from a genesis hash of 64 zeros), so any
    edit, removal, or reordering of stored lines is detectable via
    verify_chain_integrity(). When an encryption key is supplied, each line
    is encrypted at rest with Fernet.
    """

    def __init__(self, log_path: str, encryption_key: bytes | None = None):
        # Path of the append-only log file (one JSON entry per line).
        self.log_path = log_path
        # Optional Fernet key; when set, lines are encrypted before writing.
        self.encryption_key = encryption_key
        # Genesis hash anchors the chain before any entry is written.
        self.previous_hash = "0" * 64

    def log(
        self,
        level: LogLevel,
        category: str,
        action: str,
        actor: str,
        details: dict,
        outcome: str,
        correlation_id: str | None = None
    ):
        """Log an audit event, linking it into the tamper-evident chain.

        Sensitive fields in ``details`` are sanitized before being stored.
        """
        safe_details = self._sanitize(details)
        event = AuditEvent(
            # utcnow() is deprecated; now(timezone.utc) with tzinfo stripped
            # produces the identical naive-UTC ISO string the original wrote.
            timestamp=datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
            level=level.name,
            category=category,
            action=action,
            actor=actor,
            details=safe_details,
            outcome=outcome,
            correlation_id=correlation_id
        )
        # Chain hash for tamper detection: H(previous_chain_hash + event_hash).
        event_hash = event.hash()
        chain_hash = hashlib.sha256(
            (self.previous_hash + event_hash).encode()
        ).hexdigest()
        log_entry = {
            "event": asdict(event),
            "event_hash": event_hash,
            "chain_hash": chain_hash,
            "previous_hash": self.previous_hash
        }
        self.previous_hash = chain_hash
        # Write one (possibly encrypted) JSON entry per line.
        log_line = json.dumps(log_entry)
        if self.encryption_key:
            log_line = self._encrypt(log_line)
        with open(self.log_path, "a") as f:
            f.write(log_line + "\n")

    def _sanitize(self, details: dict) -> dict:
        """Remove or redact sensitive data from log details."""
        sanitized = {}
        for key, value in details.items():
            # Never log these: leakage means total key/credential compromise.
            if key in ["private_key", "seed", "password", "secret", "nsec"]:
                continue
            # Truncate addresses to prefix/suffix for privacy.
            if key in ["address", "destination"]:
                if isinstance(value, str) and len(value) > 10:
                    sanitized[key] = f"{value[:8]}...{value[-4:]}"
                continue
            # Use order-of-magnitude ranges for large amounts; small amounts
            # are kept verbatim (the original dropped them entirely, contrary
            # to the stated "ranges over threshold" policy).
            if key in ["amount", "balance"] and isinstance(value, (int, float)):
                if value > 1_000_000:  # >0.01 BTC
                    magnitude = 10 ** (len(str(int(value))) - 1)
                    sanitized[key] = f"{magnitude}-{magnitude * 10} range"
                else:
                    sanitized[key] = value
                continue
            sanitized[key] = value
        return sanitized

    def _encrypt(self, data: str) -> str:
        """Encrypt a log line with the configured Fernet key."""
        # Imported lazily so the cryptography package is only required when
        # encryption is actually enabled.
        from cryptography.fernet import Fernet
        f = Fernet(self.encryption_key)
        return f.encrypt(data.encode()).decode()

    def _decrypt(self, data: str) -> str:
        """Decrypt a log line; inverse of _encrypt().

        Was missing in the original even though verify_chain_integrity()
        calls it, so verification crashed whenever encryption was enabled.
        Trailing whitespace is stripped because stored lines end in "\\n",
        which would otherwise invalidate the Fernet token.
        """
        from cryptography.fernet import Fernet
        f = Fernet(self.encryption_key)
        return f.decrypt(data.strip().encode()).decode()

    def verify_chain_integrity(self) -> dict:
        """Verify the log chain hasn't been tampered with.

        Returns {"valid": True, "entries_verified": N} on success, or a dict
        with "valid": False describing the first broken link on failure.
        """
        with open(self.log_path, "r") as f:
            lines = f.readlines()
        previous = "0" * 64
        for i, line in enumerate(lines):
            if self.encryption_key:
                line = self._decrypt(line)
            entry = json.loads(line)
            # 1) Each entry must point at the previous entry's chain hash.
            if entry["previous_hash"] != previous:
                return {
                    "valid": False,
                    "error": f"Chain broken at line {i}",
                    "expected_previous": previous,
                    "found_previous": entry["previous_hash"]
                }
            # 2) The event payload must still match its recorded hash.
            event = AuditEvent(**entry["event"])
            if event.hash() != entry["event_hash"]:
                return {
                    "valid": False,
                    "error": f"Event tampered at line {i}"
                }
            # 3) The chain hash must be recomputable from its inputs.
            expected_chain = hashlib.sha256(
                (previous + entry["event_hash"]).encode()
            ).hexdigest()
            if expected_chain != entry["chain_hash"]:
                return {
                    "valid": False,
                    "error": f"Chain hash invalid at line {i}"
                }
            previous = entry["chain_hash"]
        return {"valid": True, "entries_verified": len(lines)}
Protocol-Specific Logging
Bitcoin Logging
class BitcoinAuditLogger:
    """Audit-log adapter for Bitcoin on-chain activity."""

    def __init__(self, logger: SecureAuditLogger):
        # Underlying tamper-evident logger that receives all entries.
        self.logger = logger

    def log_transaction_sent(
        self,
        txid: str,
        outputs: list[dict],
        fee_sats: int,
        correlation_id: str
    ):
        """Record an outgoing Bitcoin transaction broadcast."""
        # Deliberately records only the txid, output count, and fee —
        # never full addresses or amounts.
        details = {
            "txid": txid,
            "output_count": len(outputs),
            "fee_sats": fee_sats,
        }
        self.logger.log(
            level=LogLevel.INFO,
            category="bitcoin.transaction",
            action="send",
            actor="agent",
            details=details,
            outcome="broadcast",
            correlation_id=correlation_id
        )

    def log_transaction_confirmed(
        self,
        txid: str,
        confirmations: int,
        block_height: int
    ):
        """Record a network confirmation of a transaction."""
        details = {
            "txid": txid,
            "confirmations": confirmations,
            "block_height": block_height
        }
        self.logger.log(
            level=LogLevel.INFO,
            category="bitcoin.transaction",
            action="confirm",
            actor="network",
            details=details,
            outcome="confirmed"
        )
Lightning Logging
class LightningAuditLogger:
    """Audit-log adapter for Lightning Network activity."""

    def __init__(self, logger: SecureAuditLogger):
        # Underlying tamper-evident logger that receives all entries.
        self.logger = logger

    def log_payment_sent(
        self,
        payment_hash: str,
        amount_sats: int,
        hops: int,
        fee_sats: int,
        correlation_id: str
    ):
        """Record a settled outgoing Lightning payment."""
        # Truncate the payment hash and bucket the amount for privacy.
        details = {
            "payment_hash": payment_hash[:16] + "...",
            "amount_range": self._amount_range(amount_sats),
            "hops": hops,
            "fee_sats": fee_sats
        }
        self.logger.log(
            level=LogLevel.INFO,
            category="lightning.payment",
            action="send",
            actor="agent",
            details=details,
            outcome="settled",
            correlation_id=correlation_id
        )

    def log_channel_opened(
        self,
        channel_id: str,
        capacity_sats: int,
        peer_alias: str
    ):
        """Record a newly opened channel (capacity bucketed for privacy)."""
        details = {
            "channel_id": channel_id[:16] + "...",
            "capacity_range": self._amount_range(capacity_sats),
            "peer": peer_alias[:20]
        }
        self.logger.log(
            level=LogLevel.INFO,
            category="lightning.channel",
            action="open",
            actor="agent",
            details=details,
            outcome="opened"
        )

    def _amount_range(self, sats: int) -> str:
        """Bucket an amount into a coarse, privacy-preserving range label."""
        # Upper bounds are exclusive: exactly 10_000 falls in "10k-100k".
        bounds = (
            (10_000, "<10k"),
            (100_000, "10k-100k"),
            (1_000_000, "100k-1M"),
        )
        for upper, label in bounds:
            if sats < upper:
                return label
        return ">1M"
Nostr Logging
class NostrAuditLogger:
    """Audit-log adapter for Nostr protocol activity."""

    def __init__(self, logger: SecureAuditLogger):
        # Underlying tamper-evident logger that receives all entries.
        self.logger = logger

    def log_event_published(
        self,
        event_id: str,
        kind: int,
        relay_count: int,
        correlation_id: str
    ):
        """Record a Nostr event published to relays."""
        # Only a truncated event id is stored, plus kind and relay count.
        details = {
            "event_id": event_id[:16] + "...",
            "kind": kind,
            "relays": relay_count
        }
        self.logger.log(
            level=LogLevel.INFO,
            category="nostr.event",
            action="publish",
            actor="agent",
            details=details,
            outcome="published",
            correlation_id=correlation_id
        )

    def log_dm_sent(
        self,
        recipient_npub: str,
        nip: int,  # 04 or 44
        correlation_id: str
    ):
        """Record an encrypted direct message send (NIP-04 or NIP-44)."""
        # Recipient npub is truncated; message content is never logged.
        details = {
            "recipient": recipient_npub[:12] + "...",
            "encryption": f"NIP-{nip:02d}"
        }
        self.logger.log(
            level=LogLevel.INFO,
            category="nostr.dm",
            action="send",
            actor="agent",
            details=details,
            outcome="sent",
            correlation_id=correlation_id
        )
Log Analysis
Anomaly Detection
class LogAnalyzer:
    """Analyze audit logs for security anomalies."""

    def __init__(self, log_path: str):
        # Path to a log file with one JSON entry per line.
        self.log_path = log_path

    def detect_anomalies(self) -> list[dict]:
        """Detect security anomalies in the log file.

        Returns a list of anomaly dicts, each carrying at least "type"
        and "severity" keys.
        """
        anomalies = []
        events = self._load_events()
        anomalies.extend(self._detect_unusual_volume(events))
        anomalies.extend(self._detect_unusual_timing(events))
        anomalies.extend(self._detect_failures(events))
        return anomalies

    def _load_events(self) -> list[dict]:
        """Load event dicts from the log file (one JSON object per line).

        Was missing in the original even though detect_anomalies() calls it.
        Accepts both raw event lines and SecureAuditLogger entries that wrap
        the event under an "event" key. Blank or malformed lines are skipped
        so one corrupt entry cannot abort the whole analysis.
        NOTE(review): encrypted log files are not supported here — decrypt
        before analysis.
        """
        events = []
        with open(self.log_path, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                if isinstance(entry, dict):
                    # Unwrap SecureAuditLogger's {"event": ..., "event_hash": ...}
                    # envelope; fall back to the dict itself for raw event lines.
                    events.append(entry.get("event", entry))
        return events

    def _detect_unusual_volume(self, events: list) -> list:
        """Flag hours whose event count is > 3 standard deviations above average."""
        anomalies = []
        # Group event counts by hour bucket.
        hourly = {}
        for event in events:
            hour = event["timestamp"][:13]  # YYYY-MM-DDTHH
            hourly[hour] = hourly.get(hour, 0) + 1
        if not hourly:
            return anomalies
        # Baseline: mean and (population) standard deviation of hourly counts.
        values = list(hourly.values())
        avg = sum(values) / len(values)
        std = (sum((v - avg) ** 2 for v in values) / len(values)) ** 0.5
        for hour, count in hourly.items():
            if count > avg + 3 * std:
                anomalies.append({
                    "type": "unusual_volume",
                    "hour": hour,
                    "count": count,
                    "expected": avg,
                    "severity": "medium"
                })
        return anomalies

    def _detect_unusual_timing(self, events: list) -> list:
        """Flag CRITICAL/ERROR events that occur overnight (02:00-06:59 UTC)."""
        anomalies = []
        for event in events:
            # Hour is parsed positionally from the ISO timestamp (chars 11-12).
            hour = int(event["timestamp"][11:13])
            if 2 <= hour <= 6:
                if event["level"] in ["CRITICAL", "ERROR"]:
                    anomalies.append({
                        "type": "overnight_error",
                        "timestamp": event["timestamp"],
                        "action": event["action"],
                        "severity": "high"
                    })
        return anomalies

    def _detect_failures(self, events: list) -> list:
        """Flag actions that failed more than 10 times."""
        anomalies = []
        failures = [e for e in events if e["outcome"] in ["failed", "error"]]
        # Count failures per action.
        failure_counts = {}
        for f in failures:
            key = f["action"]
            failure_counts[key] = failure_counts.get(key, 0) + 1
        for action, count in failure_counts.items():
            if count > 10:
                anomalies.append({
                    "type": "repeated_failures",
                    "action": action,
                    "count": count,
                    "severity": "high"
                })
        return anomalies
Compliance Requirements
Data Retention
import os
from datetime import datetime, timedelta
class LogRetention:
    """Manage log retention policies."""

    # Maximum age in days for each log level before deletion.
    RETENTION_DAYS = {
        LogLevel.CRITICAL: 365 * 100,  # Forever (effectively)
        LogLevel.INFO: 365 * 7,        # 7 years
        LogLevel.WARNING: 365,         # 1 year
        LogLevel.ERROR: 90,            # 90 days
        LogLevel.DEBUG: 7              # 7 days
    }

    def cleanup_expired_logs(self, log_dir: str):
        """Remove .log files in log_dir that are past their retention period."""
        # Compare in local time: os.path.getmtime is an epoch timestamp and
        # datetime.fromtimestamp converts it to local time, so "now" must be
        # local too. The original's utcnow() skewed every file's age by the
        # machine's UTC offset.
        now = datetime.now()
        for filename in os.listdir(log_dir):
            if not filename.endswith(".log"):
                continue
            filepath = os.path.join(log_dir, filename)
            mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
            age_days = (now - mtime).days
            level = self._parse_level_from_filename(filename)
            # Unknown levels fall back to the ERROR retention (90 days).
            max_age = self.RETENTION_DAYS.get(level, 90)
            if age_days > max_age:
                self._secure_delete(filepath)

    def _parse_level_from_filename(self, filename: str) -> LogLevel | None:
        """Map a filename like "audit-error.log" to its LogLevel.

        Was missing in the original even though cleanup_expired_logs() calls
        it. Matches the first level whose name appears (case-insensitively)
        in the filename; returns None when none does, letting the caller
        apply the default retention.
        """
        lowered = filename.lower()
        for level in LogLevel:
            if level.name.lower() in lowered:
                return level
        return None

    def _secure_delete(self, filepath: str):
        """Best-effort secure deletion: overwrite in place, then unlink.

        NOTE(review): overwriting may not reach the original blocks on
        journaling/CoW filesystems or SSDs; treat as defense in depth only.
        """
        size = os.path.getsize(filepath)
        # "r+b" overwrites the existing bytes; the original's "wb" truncated
        # first, which can free the original data blocks before any
        # overwriting happens.
        with open(filepath, "r+b") as f:
            f.write(os.urandom(size))
            f.flush()
            os.fsync(f.fileno())
        os.remove(filepath)
Machine-Readable Summary
{
"topic": "auditing",
"audience": "ai-agents",
"logging_principles": [
"sanitize_sensitive_data",
"chain_integrity",
"encrypted_storage",
"retention_policies"
],
"never_log": [
"private_keys",
"seed_phrases",
"passwords",
"full_balances"
],
"retention_periods": {
"critical": "forever",
"transactions": "7_years",
"errors": "90_days",
"debug": "7_days"
}
}