
Risk Assessment

Evaluating and managing risks across Bitcoin, Lightning, and Nostr for AI agents. Threat models, risk matrices, and mitigation strategies.



Understanding and quantifying risks is essential for autonomous agent operation. This guide provides frameworks for assessing protocol-specific risks and implementing appropriate mitigations.

Risk Categories

1. Financial Risk

Loss of funds due to protocol behavior or operational errors.

| Protocol | Risk Type | Severity | Likelihood |
| --- | --- | --- | --- |
| Bitcoin | Transaction to wrong address | Critical | Low |
| Bitcoin | Overpaying fees | Medium | Medium |
| Lightning | Channel force-close loss | High | Low |
| Lightning | Stuck HTLCs | Medium | Medium |
| Nostr | N/A (no native payments) | None | N/A |

2. Operational Risk

Service disruptions affecting agent capabilities.

| Protocol | Risk Type | Severity | Likelihood |
| --- | --- | --- | --- |
| Bitcoin | Node sync failure | Medium | Low |
| Bitcoin | API rate limiting | Low | Medium |
| Lightning | Channel depletion | High | Medium |
| Lightning | Node offline | High | Low |
| Nostr | Relay unavailability | Low | Medium |
| Nostr | Event rejection | Low | Low |

3. Security Risk

Compromise of keys or unauthorized access.

| Protocol | Risk Type | Severity | Likelihood |
| --- | --- | --- | --- |
| Bitcoin | Seed phrase exposure | Critical | Low |
| Lightning | Hot wallet compromise | Critical | Medium |
| Nostr | Private key theft | High | Medium |
| All | Man-in-the-middle | High | Low |

Risk Scoring Framework

from enum import IntEnum
from dataclasses import dataclass

class Severity(IntEnum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

class Likelihood(IntEnum):
    RARE = 1      # <1% per year
    UNLIKELY = 2  # 1-10% per year
    POSSIBLE = 3  # 10-50% per year
    LIKELY = 4    # >50% per year

@dataclass
class Risk:
    name: str
    description: str
    severity: Severity
    likelihood: Likelihood
    protocol: str
    mitigation: str

    @property
    def score(self) -> int:
        """Risk score = severity * likelihood."""
        return self.severity * self.likelihood

    @property
    def priority(self) -> str:
        """Priority based on score."""
        if self.score >= 12:
            return "critical"
        elif self.score >= 6:
            return "high"
        elif self.score >= 3:
            return "medium"
        return "low"


# Risk catalog
RISKS = [
    Risk(
        name="hot_wallet_compromise",
        description="Lightning private key stolen from hot wallet",
        severity=Severity.CRITICAL,
        likelihood=Likelihood.UNLIKELY,
        protocol="lightning",
        mitigation="Limit hot wallet balance, use hardware signing"
    ),
    Risk(
        name="channel_force_close",
        description="Channel closed by counterparty during a high-fee period",
        severity=Severity.MEDIUM,
        likelihood=Likelihood.POSSIBLE,
        protocol="lightning",
        mitigation="Monitor channels, maintain watchtower"
    ),
    Risk(
        name="address_substitution",
        description="Address replaced during copy/paste",
        severity=Severity.CRITICAL,
        likelihood=Likelihood.RARE,
        protocol="bitcoin",
        mitigation="Verify address via multiple sources"
    ),
]

def assess_operation_risk(
    protocol: str,
    amount_sats: int,
    operation_type: str
) -> dict:
    """
    Assess risk for a specific operation.

    Returns risk assessment with recommendations.
    """
    relevant_risks = [r for r in RISKS if r.protocol == protocol]

    # Adjust likelihood upward for large amounts: higher stakes make
    # the operation a more attractive target for high-severity attacks.
    escalate = amount_sats > 10_000_000  # >0.1 BTC
    scores = {}
    for risk in relevant_risks:
        likelihood = risk.likelihood
        if escalate and risk.severity >= Severity.HIGH:
            likelihood = Likelihood(min(likelihood + 1, Likelihood.LIKELY))
        scores[risk.name] = risk.severity * likelihood

    # Aggregate risk is the worst adjusted score
    max_score = max(scores.values()) if scores else 0

    # Recommend mitigations for risks in the high/critical bands
    recommendations = [
        r.mitigation for r in relevant_risks if scores[r.name] >= 6
    ]

    return {
        "protocol": protocol,
        "amount_sats": amount_sats,
        "operation_type": operation_type,
        "risk_score": max_score,
        "priority": (
            "critical" if max_score >= 12 else
            "high" if max_score >= 6 else
            "medium" if max_score >= 3 else "low"
        ),
        "risks": [{"name": r.name, "score": scores[r.name]} for r in relevant_risks],
        "recommendations": recommendations
    }

Protocol-Specific Risk Matrices

Bitcoin Risk Matrix

| Risk | Impact | Probability | Mitigation |
| --- | --- | --- | --- |
| Double-spend (0-conf) | High | Possible | Wait for confirmations |
| Fee sniping | Medium | Rare | Use sequence locks |
| Address reuse | Medium | Possible | HD wallet, new address per tx |
| UTXO dust | Low | Likely | Consolidate during low fees |
| Reorg (1-conf) | High | Rare | Wait for 2-3 confirmations |
| Reorg (6-conf) | Critical | Very rare | Accept 6-conf as final |
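One way to operationalize the reorg rows above is an amount-based confirmation policy. A minimal sketch; the satoshi thresholds are illustrative assumptions to tune per deployment, not consensus rules:

```python
def required_confirmations(amount_sats: int) -> int:
    """Confirmation policy derived from the reorg rows: small amounts
    tolerate 1-conf risk, mid-size amounts wait out shallow reorgs,
    and large amounts accept 6-conf as final."""
    if amount_sats < 100_000:       # small: 1 conf is acceptable
        return 1
    if amount_sats < 10_000_000:    # mid-size: ride out shallow reorgs
        return 3
    return 6                        # large: treat 6-conf as final
```

Because the thresholds are plain integers, the policy can be tightened per counterparty without touching the calling code.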

Lightning Risk Matrix

| Risk | Impact | Probability | Mitigation |
| --- | --- | --- | --- |
| Counterparty offline | Medium | Possible | Multiple channels |
| Force close (high fees) | High | Unlikely | Anchor outputs, watchtower |
| Stuck HTLC | Medium | Possible | Timeout handling |
| Routing failure | Low | Likely | MPP, fallback to Bitcoin |
| Channel exhaustion | Medium | Possible | Monitor, rebalance |
| Payment tracking | Medium | Unlikely | Multiple nodes, Tor |
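The "Routing failure → MPP, fallback to Bitcoin" row suggests a retry-then-fallback pattern. A sketch with the payment functions injected as callables; `try_lightning` and `pay_onchain` are hypothetical hooks, not a real node API:

```python
def pay_with_fallback(invoice: str, amount_sats: int,
                      try_lightning, pay_onchain,
                      max_ln_attempts: int = 3) -> str:
    """Retry Lightning first (routing failures are common but cheap
    to retry), then fall back to an on-chain payment. Returns the
    protocol that settled the payment, or 'failed'."""
    for _ in range(max_ln_attempts):
        if try_lightning(invoice, amount_sats):
            return "lightning"
    return "bitcoin" if pay_onchain(invoice, amount_sats) else "failed"
```

Injecting the callables keeps the retry policy testable without a live node and lets MPP or route hints live inside `try_lightning`.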

Nostr Risk Matrix

| Risk | Impact | Probability | Mitigation |
| --- | --- | --- | --- |
| Relay censorship | Low | Possible | Multiple relays |
| Relay data loss | Medium | Possible | Publish to 5+ relays |
| Identity linking | Medium | Possible | Separate npubs |
| DM interception | High | Rare | NIP-44 encryption |
| Impersonation | Medium | Possible | NIP-05 verification |
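The "Publish to 5+ relays" mitigation implies a write quorum: an event counts as durably stored only when enough relays accept it. A minimal sketch with an injected `publish` callable (hypothetical; a real client would use a Nostr library):

```python
def publish_with_quorum(event: dict, relays: list[str],
                        publish, quorum: int = 3) -> bool:
    """Send the event to every relay and count acceptances; treat
    the publish as successful only at or above the quorum."""
    accepted = 0
    for relay in relays:
        try:
            if publish(relay, event):
                accepted += 1
        except Exception:
            continue  # an unreachable relay is just a failed vote
    return accepted >= quorum
```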

Risk-Based Decision Making

def should_proceed_with_operation(
    protocol: str,
    amount_sats: int,
    risk_tolerance: str  # 'low', 'medium', 'high'
) -> dict:
    """
    Determine if operation should proceed based on risk assessment.

    Returns:
        dict with proceed boolean and reasoning
    """
    assessment = assess_operation_risk(protocol, amount_sats, "send")

    # Risk tolerance thresholds
    # Risk tolerance thresholds (maximum acceptable risk score)
    thresholds = {
        "low": 3,      # proceed only if risk score <= 3
        "medium": 6,   # proceed if risk score <= 6
        "high": 12     # proceed if risk score <= 12
    }

    max_acceptable = thresholds.get(risk_tolerance, 6)

    if assessment["risk_score"] <= max_acceptable:
        return {
            "proceed": True,
            "risk_score": assessment["risk_score"],
            "tolerance": risk_tolerance
        }

    # Risk too high - suggest mitigations
    return {
        "proceed": False,
        "risk_score": assessment["risk_score"],
        "tolerance": risk_tolerance,
        "reason": "risk_exceeds_tolerance",
        "required_mitigations": assessment["recommendations"],
        "alternative": suggest_safer_approach(protocol, amount_sats)
    }


def suggest_safer_approach(protocol: str, amount_sats: int) -> dict:
    """Suggest safer alternatives for high-risk operations."""

    if protocol == "lightning" and amount_sats > 10_000_000:
        return {
            "protocol": "bitcoin",
            "reason": "Amount exceeds safe Lightning threshold",
            "action": "Use Bitcoin for better settlement guarantees"
        }

    if protocol == "bitcoin" and amount_sats < 10_000:
        return {
            "protocol": "lightning",
            "reason": "Amount too small for efficient Bitcoin tx",
            "action": "Use Lightning for micropayments"
        }

    return {
        "protocol": protocol,
        "action": "implement_mitigations",
        "mitigations": [
            "verify_recipient",
            "use_test_transaction",
            "enable_watchtower"
        ]
    }

Continuous Risk Monitoring

class RiskMonitor:
    """Continuous monitoring of protocol risks.

    Assumes async helpers supplied by the host agent: `fetch_json`
    (HTTP GET returning parsed JSON), `lightning.list_channels`,
    and `check_relay_health`.
    """

    def __init__(self):
        self.alerts = []
        self.metrics = {
            "bitcoin": {},
            "lightning": {},
            "nostr": {}
        }

    async def check_all_risks(self) -> list[dict]:
        """Run all risk checks and return alerts."""
        alerts = []

        # Bitcoin risks
        btc_alerts = await self._check_bitcoin_risks()
        alerts.extend(btc_alerts)

        # Lightning risks
        ln_alerts = await self._check_lightning_risks()
        alerts.extend(ln_alerts)

        # Nostr risks
        nostr_alerts = await self._check_nostr_risks()
        alerts.extend(nostr_alerts)

        self.alerts = alerts
        return alerts

    async def _check_bitcoin_risks(self) -> list[dict]:
        alerts = []

        # Check fee environment
        fees = await fetch_json("https://mempool.space/api/v1/fees/recommended")
        if fees["fastestFee"] > 100:
            alerts.append({
                "protocol": "bitcoin",
                "risk": "high_fees",
                "severity": "medium",
                "current_value": fees["fastestFee"],
                "action": "consider_delaying_transactions"
            })

        # Check mempool congestion
        mempool = await fetch_json("https://mempool.space/api/mempool")
        if mempool["count"] > 100_000:
            alerts.append({
                "protocol": "bitcoin",
                "risk": "mempool_congestion",
                "severity": "low",
                "current_value": mempool["count"],
                "action": "use_rbf_transactions"
            })

        return alerts

    async def _check_lightning_risks(self) -> list[dict]:
        alerts = []

        # Check channel balances
        channels = await lightning.list_channels()

        for channel in channels:
            local_pct = channel["local_balance"] / channel["capacity"]

            # Alert if channel is depleted
            if local_pct < 0.1:
                alerts.append({
                    "protocol": "lightning",
                    "risk": "channel_depleted",
                    "severity": "medium",
                    "channel_id": channel["channel_id"],
                    "local_pct": local_pct,
                    "action": "rebalance_or_receive"
                })

            # Alert if channel is nearly full
            if local_pct > 0.9:
                alerts.append({
                    "protocol": "lightning",
                    "risk": "channel_full",
                    "severity": "low",
                    "channel_id": channel["channel_id"],
                    "local_pct": local_pct,
                    "action": "spend_to_rebalance"
                })

        return alerts

    async def _check_nostr_risks(self) -> list[dict]:
        alerts = []

        # Check relay connectivity
        health = await check_relay_health()

        if len(health["healthy"]) < 3:
            alerts.append({
                "protocol": "nostr",
                "risk": "low_relay_connectivity",
                "severity": "medium",
                "healthy_relays": len(health["healthy"]),
                "action": "add_backup_relays"
            })

        return alerts
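To run the monitor continuously, a polling loop can wrap `check_all_risks`. A sketch with the checker injected as a coroutine so the loop stays testable; the 300-second default interval is an assumption:

```python
import asyncio

async def run_monitor(check_all_risks, iterations: int,
                      interval_s: float = 300.0) -> list[dict]:
    """Poll the risk checks on a fixed interval and collect every
    alert raised. A real deployment would loop indefinitely and
    route alerts to an operator channel instead of returning them."""
    collected: list[dict] = []
    for _ in range(iterations):
        collected.extend(await check_all_risks())
        await asyncio.sleep(interval_s)
    return collected
```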

Risk Acceptance Documentation

For audit trails, document risk acceptance:

def document_risk_acceptance(
    operation_id: str,
    risk_assessment: dict,
    decision: str,
    justification: str
) -> dict:
    """
    Document risk acceptance for audit trail.

    Returns acceptance record for logging.
    """
    import json
    from datetime import datetime, timezone

    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "operation_id": operation_id,
        "risk_assessment": risk_assessment,
        "decision": decision,  # 'proceed', 'abort', 'mitigate'
        "justification": justification,
        # sign_record is assumed to come from the agent's key manager
        "signature": sign_record(risk_assessment)
    }

    # Append to the JSON Lines risk log
    with open("risk_decisions.jsonl", "a") as f:
        f.write(json.dumps(record) + "\n")

    return record
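For periodic review, the append-only log written above can be read back. A minimal sketch; the `risk_decisions.jsonl` default mirrors the writer's path:

```python
import json

def load_risk_decisions(path: str = "risk_decisions.jsonl") -> list[dict]:
    """Read the JSON Lines audit log back into memory, one record
    per non-blank line."""
    records = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                records.append(json.loads(line))
    return records
```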

Machine-Readable Summary

{
  "topic": "risk-assessment",
  "audience": "ai-agents",
  "risk_categories": [
    "financial",
    "operational",
    "security"
  ],
  "scoring_dimensions": [
    "severity",
    "likelihood"
  ],
  "severity_levels": ["low", "medium", "high", "critical"],
  "likelihood_levels": ["rare", "unlikely", "possible", "likely"],
  "monitoring_frequency": "continuous",
  "documentation_required": true
}