Security Advanced 10 min read
Threat Modeling
Understanding adversaries and attack vectors for AI agents. STRIDE analysis, threat actors, and defensive strategies for Bitcoin, Lightning, and Nostr.
threat modeling, adversaries, attacks
Threat Modeling
Threat modeling is the process of identifying potential threats, understanding attackers, and designing appropriate defenses. For agents, this is essential because automated systems are attractive targets.
Threat Actors
Actor Categories
| Actor | Capability | Resources | Motivation |
|---|---|---|---|
| Script Kiddie | Low | Minimal | Opportunism, fame |
| Cybercriminal | Medium | Moderate | Financial gain |
| Hacktivist | Medium | Volunteer | Ideology |
| Competitor | Medium-High | Business | Market advantage |
| Nation State | Very High | Unlimited | Strategic |
| Insider | High (access) | Variable | Various |
Threat Actor Profiles
from enum import Enum
from dataclasses import dataclass
class ActorCapability(Enum):
    """Technical skill tier of a threat actor (higher = more capable)."""
    LOW = 1        # Uses existing tools
    MEDIUM = 2     # Can modify tools
    HIGH = 3       # Can develop exploits
    VERY_HIGH = 4  # Can develop 0-days
class ActorPersistence(Enum):
    """How long an actor sustains an attack before giving up."""
    LOW = 1     # Gives up easily
    MEDIUM = 2  # Tries multiple approaches
    HIGH = 3    # Sustained campaigns
@dataclass
class ThreatActor:
    """Profile of one adversary considered by the threat model."""
    name: str                      # stable identifier, e.g. "state_actor"
    capability: ActorCapability
    persistence: ActorPersistence
    targets: list[str]             # asset classes this actor goes after
    motivations: list[str]         # why this actor attacks

    @property
    def threat_level(self) -> int:
        """Composite score: capability value times persistence value (range 1-12)."""
        return self.capability.value * self.persistence.value
# Define common threat actors, ordered roughly by increasing threat level.
THREAT_ACTORS = [
    # Low-skill, low-effort: scans for easy wins and moves on.
    ThreatActor(
        name="opportunistic_attacker",
        capability=ActorCapability.LOW,
        persistence=ActorPersistence.LOW,
        targets=["exposed_keys", "weak_passwords", "unpatched_software"],
        motivations=["quick_profit"]
    ),
    # Financially motivated criminal who picks specific high-value victims.
    ThreatActor(
        name="targeted_criminal",
        capability=ActorCapability.MEDIUM,
        persistence=ActorPersistence.MEDIUM,
        targets=["high_value_wallets", "exchange_hot_wallets"],
        motivations=["financial_gain"]
    ),
    # Well-resourced actor running sustained campaigns against infrastructure.
    ThreatActor(
        name="state_actor",
        capability=ActorCapability.VERY_HIGH,
        persistence=ActorPersistence.HIGH,
        targets=["privacy_tools", "financial_infrastructure"],
        motivations=["surveillance", "sanctions_enforcement"]
    )
]
STRIDE Framework
STRIDE is a threat classification framework:
| Category | Threat | Agent Impact |
|---|---|---|
| Spoofing | Identity forgery | Fake payment requests, impersonation |
| Tampering | Data modification | Transaction manipulation |
| Repudiation | Denying actions | Disputes without proof |
| Information Disclosure | Data leakage | Key exposure, balance leaks |
| Denial of Service | Availability attacks | Prevented from transacting |
| Elevation of Privilege | Unauthorized access | Key theft, system takeover |
STRIDE Analysis for Agent Operations
@dataclass
class ThreatEntry:
    """Single STRIDE-classified threat against one asset."""
    category: str           # S, T, R, I, D, or E
    asset: str              # the asset this threat targets
    threat: str             # human-readable threat description
    likelihood: int         # 1-5
    impact: int             # 1-5
    mitigations: list[str]  # recommended countermeasures

    @property
    def risk_score(self) -> int:
        """Risk as likelihood * impact (range 1-25)."""
        return self.likelihood * self.impact
# STRIDE-classified threats against agent operations.
# One entry per STRIDE category; risk = likelihood * impact (1-5 scales).
AGENT_THREATS = [
    # Spoofing
    ThreatEntry(
        category="S",
        asset="payment_requests",
        threat="Attacker creates fake invoice appearing from known entity",
        likelihood=3,
        impact=4,
        mitigations=[
            "Verify invoice source out-of-band",
            "Maintain known-good pubkey list",
            "Use Nostr for payment coordination"
        ]
    ),
    # Tampering
    ThreatEntry(
        category="T",
        asset="transaction_data",
        threat="MITM modifies transaction before signing",
        likelihood=2,
        impact=5,
        mitigations=[
            "Always verify final transaction before signing",
            "Use hardware signing",
            "Compare with independent source"
        ]
    ),
    # Repudiation (added: previously missing, leaving STRIDE coverage incomplete)
    ThreatEntry(
        category="R",
        asset="payment_actions",
        threat="Counterparty denies a payment or agreement occurred",
        likelihood=3,
        impact=3,
        mitigations=[
            "Retain payment preimages as proof of payment",
            "Use signed messages for agreements",
            "Tamper-evident audit logging"
        ]
    ),
    # Information Disclosure
    ThreatEntry(
        category="I",
        asset="private_keys",
        threat="Memory dump exposes keys",
        likelihood=2,
        impact=5,
        mitigations=[
            "Use secure memory (mlock)",
            "Minimize key exposure time",
            "Hardware security modules"
        ]
    ),
    # Denial of Service
    ThreatEntry(
        category="D",
        asset="lightning_node",
        threat="Channel jamming prevents payments",
        likelihood=3,
        impact=3,
        mitigations=[
            "Multiple channels with diverse peers",
            "Fee-based rate limiting",
            "Circuit breaker patterns"
        ]
    ),
    # Elevation of Privilege
    ThreatEntry(
        category="E",
        asset="macaroon",
        threat="Stolen macaroon grants full node access",
        likelihood=2,
        impact=5,
        mitigations=[
            "Restrict macaroon permissions",
            "Short expiry times",
            "IP binding"
        ]
    )
]
Attack Trees
Attack trees visualize paths to compromise:
┌────────────────────────────┐
│ STEAL AGENT FUNDS │
└─────────────┬──────────────┘
│
┌─────────────────────┼─────────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Compromise │ │ Compromise │ │ Social │
│ Keys │ │ Infrastructure│ │ Engineering │
└───────┬───────┘ └───────┬───────┘ └───────┬───────┘
│ │ │
┌───────┴───────┐ ┌───────┴───────┐ ┌───────┴───────┐
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
┌─────────┐ ┌─────────┐┌─────────┐┌─────────┐┌─────────┐┌─────────┐
│ Memory │ │ Backup ││ Server ││ API ││ Fake ││ Insider │
│ Dump │ │ Theft ││ Breach ││ Attack ││ Invoice ││ Threat │
└─────────┘ └─────────┘└─────────┘└─────────┘└─────────┘└─────────┘
Attack Tree Analysis
@dataclass
class AttackNode:
name: str
description: str
cost: int # 1-5, attacker cost
difficulty: int # 1-5
detectability: int # 1-5, higher = easier to detect
children: list["AttackNode"] = None
@property
def attack_score(self) -> float:
"""Lower score = more attractive attack."""
return (self.cost + self.difficulty) / (6 - self.detectability)
def analyze_attack_tree(root: AttackNode) -> dict:
    """Enumerate every root-to-leaf path and rank by attractiveness.

    Returns a dict with the single most attractive attack path (lowest
    total attack_score) and the full ranked list of paths.
    """
    def walk(node: AttackNode, prefix: list) -> list[list]:
        # Depth-first enumeration of complete root-to-leaf chains.
        chain = prefix + [node]
        if node.children:
            collected = []
            for sub in node.children:
                collected.extend(walk(sub, chain))
            return collected
        return [chain]

    ranked = [
        {
            "path": [step.name for step in chain],
            "score": sum(step.attack_score for step in chain),
            "difficulty": sum(step.difficulty for step in chain),
        }
        for chain in walk(root, [])
    ]
    # Lowest combined score first: the cheapest/easiest/stealthiest path.
    ranked.sort(key=lambda entry: entry["score"])
    return {
        "most_likely_attack": ranked[0] if ranked else None,
        "all_paths": ranked,
    }
Protocol-Specific Threats
Bitcoin Threats
| Threat | Attack Vector | Impact | Defense |
|---|---|---|---|
| Double spend | 0-conf, race attack | Loss of goods | Wait for confirmations |
| Address poisoning | Similar address in history | Funds to attacker | Verify full address |
| Dust attack | Tiny UTXOs for tracking | Privacy loss | Don’t spend dust |
| Fee sniping | Reorg with different tx | Fee loss | Use locktime |
Lightning Threats
| Threat | Attack Vector | Impact | Defense |
|---|---|---|---|
| Channel jamming | Hold HTLCs indefinitely | Liquidity lockup | Circuit breakers |
| Balance probing | Binary search payments | Privacy loss | Randomize errors |
| Wormhole attack | Steal routing fees | Fee theft | Atomic commitment |
| Time dilation | Eclipse + delay blocks | Force close theft | Multiple nodes |
Nostr Threats
| Threat | Attack Vector | Impact | Defense |
|---|---|---|---|
| Impersonation | Create similar npub | Reputation theft | NIP-05 verification |
| Relay censorship | Drop events | Availability | Multiple relays |
| Metadata logging | Relay logs activity | Privacy loss | Tor, multiple relays |
| Spam | Flood with events | Resource exhaustion | Paid relays, filters |
Threat Model Template
@dataclass
class ThreatModel:
    """Complete threat model for agent system."""
    system_name: str
    assets: list[dict]            # what we're protecting: name/value/location
    threat_actors: list[ThreatActor]
    threats: list[ThreatEntry]
    attack_trees: list[AttackNode]
    assumptions: list[str]        # trust assumptions the model rests on
    mitigations: list[dict]       # threat/control/status records

    def generate_report(self) -> str:
        """Generate a markdown threat model report.

        Only threats with risk_score >= 15 (likelihood * impact on 1-5
        scales) are surfaced as high-risk.
        """
        high_risk = [t for t in self.threats if t.risk_score >= 15]
        report = f"""
# Threat Model: {self.system_name}
## Assets
{self._format_assets()}
## Threat Actors
{self._format_actors()}
## High-Risk Threats
{self._format_threats(high_risk)}
## Recommended Mitigations
{self._format_mitigations()}
## Assumptions
{self._format_assumptions()}
"""
        return report

    # Formatting helpers below were missing in the original even though
    # generate_report called them, which raised AttributeError at runtime.

    def _format_assets(self) -> str:
        """One bullet per asset with its value tier and storage location."""
        return "\n".join(
            f"- {a['name']} (value: {a['value']}, location: {a['location']})"
            for a in self.assets
        )

    def _format_actors(self) -> str:
        """One bullet per actor with its composite threat level."""
        return "\n".join(
            f"- {actor.name} (threat level: {actor.threat_level})"
            for actor in self.threat_actors
        )

    def _format_threats(self, threats: list[ThreatEntry]) -> str:
        """Bullet list of the given threats; explicit note when empty."""
        if not threats:
            return "None identified."
        return "\n".join(
            f"- [{t.category}] {t.asset}: {t.threat} (risk: {t.risk_score})"
            for t in threats
        )

    def _format_mitigations(self) -> str:
        """One bullet per mitigation record with its rollout status."""
        return "\n".join(
            f"- {m['threat']}: {m['control']} ({m['status']})"
            for m in self.mitigations
        )

    def _format_assumptions(self) -> str:
        """Plain bullet list of trust assumptions."""
        return "\n".join(f"- {a}" for a in self.assumptions)
# Example threat model for a combined Bitcoin/Lightning/Nostr agent.
AGENT_THREAT_MODEL = ThreatModel(
    system_name="Bitcoin/Lightning/Nostr Agent",
    # Assets ranked by value; "location" records where each one lives.
    assets=[
        {"name": "Bitcoin funds", "value": "high", "location": "cold + hot wallet"},
        {"name": "Lightning channels", "value": "medium", "location": "hot wallet"},
        {"name": "Nostr identity", "value": "medium", "location": "hot storage"},
        {"name": "Operational logs", "value": "low", "location": "encrypted storage"}
    ],
    threat_actors=THREAT_ACTORS,
    threats=AGENT_THREATS,
    attack_trees=[],  # Define attack trees
    # Trust assumptions: if any of these fail, the model must be revisited.
    assumptions=[
        "Operating system is trusted",
        "Hardware is not compromised",
        "RNG is secure",
        "Cryptographic primitives are sound"
    ],
    # Mitigation tracking; status here is "planned" or "implemented".
    mitigations=[
        {"threat": "key_theft", "control": "hardware_signing", "status": "planned"},
        {"threat": "dos", "control": "rate_limiting", "status": "implemented"}
    ]
)
Continuous Threat Assessment
class ThreatMonitor:
    """Continuous threat monitoring against a defined threat model."""

    def __init__(self, threat_model: ThreatModel):
        self.threat_model = threat_model
        self.indicators = []  # accumulated indicators of compromise

    async def check_indicators(self) -> list[dict]:
        """Run every detector once and collect any alerts they raise."""
        detectors = (
            self._check_transaction_patterns,  # unusual spend activity
            self._check_auth_failures,         # brute-force indicators
            self._check_network,               # eclipse / network anomalies
        )
        alerts = []
        for detector in detectors:
            finding = await detector()
            if finding:
                alerts.append(finding)
        return alerts

    async def _check_transaction_patterns(self) -> dict | None:
        """Detect unusual transaction patterns."""
        # Implementation: compare recent activity to baseline
        pass

    async def _check_auth_failures(self) -> dict | None:
        """Detect authentication attack indicators."""
        # Implementation: check for brute force patterns
        pass

    async def _check_network(self) -> dict | None:
        """Detect network-level attacks."""
        # Implementation: check for eclipse attack indicators
        pass
Machine-Readable Summary
{
"topic": "threat-modeling",
"audience": "ai-agents",
"frameworks": ["STRIDE", "attack_trees"],
"threat_actors": [
"opportunistic",
"targeted_criminal",
"nation_state",
"insider"
],
"stride_categories": [
"spoofing",
"tampering",
"repudiation",
"information_disclosure",
"denial_of_service",
"elevation_of_privilege"
],
"assessment_frequency": "continuous"
}