Common Attacks
Attack patterns and defenses for AI agents. Protocol-specific vulnerabilities and countermeasures for Bitcoin, Lightning, and Nostr.
Common Attacks
Understanding attack patterns enables better defenses. This guide covers known attacks against Bitcoin, Lightning, and Nostr, along with detection and prevention strategies.
Bitcoin Attacks
Double Spend Attack
Description: Spending the same UTXO twice by broadcasting conflicting transactions.
| Variant | Method | Target |
|---|---|---|
| Race attack | Two txs simultaneously | 0-conf merchants |
| Finney attack | Pre-mine block with different tx | 0-conf merchants |
| 51% attack | Reorg chain | Exchanges (deep confirms) |
Defense:
async def verify_payment(
    txid: str,
    min_confirmations: int = 6
) -> dict:
    """Verify a payment against confirmation requirements.

    Args:
        txid: ID of the transaction to look up via ``bitcoin.get_transaction``.
        min_confirmations: Confirmations required before the payment is
            reported as confirmed.

    Returns:
        A dict whose ``"status"`` is ``"pending"`` or ``"confirmed"``.
        A pending result carries ``"confirmations"`` and ``"required"`` when
        the confirmation count is too low, or ``"depth"`` when a high-value
        transaction is not buried deeply enough.
    """
    tx = await bitcoin.get_transaction(txid)

    # A transaction that is not yet in a block may come back without a
    # "confirmations" field; count that as zero instead of raising KeyError.
    # NOTE(review): assumes `tx` is a plain dict (supports .get) - confirm.
    confirmations = tx.get("confirmations", 0)
    if confirmations < min_confirmations:
        return {
            "status": "pending",
            "confirmations": confirmations,
            "required": min_confirmations,
            "message": "Wait for more confirmations"
        }

    # For high-value, check for chain reorg risk
    if tx["value"] > 10_000_000:  # >0.1 BTC
        block_hash = tx.get("blockhash")
        if block_hash is None:
            # Not mined yet (reachable when min_confirmations is 0):
            # there is no containing block, so the depth is zero.
            depth = 0
        else:
            block = await bitcoin.get_block(block_hash)
            current_height = await bitcoin.get_block_count()
            # Difference between the reported block count and the height of
            # the block containing the transaction.
            depth = current_height - block["height"]
        if depth < 6:
            return {
                "status": "pending",
                "depth": depth,
                "message": "High-value tx needs deeper confirmation"
            }

    return {"status": "confirmed", "confirmations": confirmations}
Address Poisoning
Description: Attacker sends dust to your wallet from a similar-looking address, hoping you’ll copy the wrong address for future sends.
Defense:
def verify_address_before_send(
    address: str,
    expected_first_chars: str,
    expected_last_chars: str
) -> bool:
    """Check an address against its expected ends and the known-good set.

    Only the first eight characters of ``expected_first_chars`` and the last
    eight characters of ``expected_last_chars`` are compared. An address whose
    ends match is accepted only if it is also in ``KNOWN_GOOD_ADDRESSES``.

    Returns:
        True when all three checks pass, otherwise False.
    """
    ends_match = (
        address.startswith(expected_first_chars[:8])
        and address.endswith(expected_last_chars[-8:])
    )
    if ends_match:
        # Matching ends are not enough on their own: the full address must
        # also be one we already know.
        return address in KNOWN_GOOD_ADDRESSES
    return False
# Never use address from transaction history without verification
async def send_payment(
    destination: str,
    amount: int,
    *,
    expected_first_chars: str = "",
    expected_last_chars: str = "",
):
    """Verify ``destination`` before a payment is sent.

    This snippet covers only the verification step: nothing is built or
    broadcast here, and ``amount`` is not used.

    Args:
        destination: Address the payment is meant for.
        amount: Amount to send (unused by the verification step).
        expected_first_chars: Leading characters the caller expects the
            address to start with. When left empty the prefix check passes
            trivially and only the known-good lookup decides.
        expected_last_chars: Trailing characters the caller expects the
            address to end with. Same rule as above when left empty.

    Raises:
        SecurityError: If ``verify_address_before_send`` rejects the address.
    """
    # Always verify full address
    if not verify_address_before_send(
        destination,
        expected_first_chars,
        expected_last_chars,
    ):
        raise SecurityError("Address verification failed")
Dust Attack
Description: Attacker sends tiny amounts (dust) to your addresses, then watches how those UTXOs are spent in order to link your addresses together.
Defense:
DUST_THRESHOLD = 546  # satoshis


def is_dust(utxo: dict, threshold: int = DUST_THRESHOLD) -> bool:
    """Check if UTXO is dust.

    Args:
        utxo: Mapping with a ``"value"`` entry, compared against ``threshold``.
        threshold: Values strictly below this count as dust. Defaults to
            ``DUST_THRESHOLD``; pass another limit when a different one
            applies to the outputs being checked.

    Returns:
        True if ``utxo["value"]`` is strictly below ``threshold``.
    """
    return utxo["value"] < threshold
def filter_dust_utxos(utxos: list[dict]) -> list[dict]:
    """Return the UTXOs that ``is_dust`` does not flag, in their input order."""
    spendable = []
    for candidate in utxos:
        if is_dust(candidate):
            # Keep dust out of coin selection.
            continue
        spendable.append(candidate)
    return spendable
async def consolidate_dust_safely(dust_utxos: list[dict]):
    """Consolidate dust, but only while fees are low.

    Dust should only be consolidated together with UTXOs that are already
    being spent, so that no new linking transaction is created.

    Returns:
        A ``{"status": "waiting", "current_fee": ...}`` dict while the
        ``hourFee`` estimate is above 5. Otherwise ``None``: the consolidation
        step itself is not implemented in this snippet, and ``dust_utxos`` is
        not yet used.
    """
    # Hold off until fees are low.
    estimates = await bitcoin.get_fee_estimates()
    hour_fee = estimates["hourFee"]
    if hour_fee > 5:
        return {"status": "waiting", "current_fee": hour_fee}

    # Consolidation would happen here.
    return None
Lightning Attacks
Channel Jamming
Description: Attacker creates pending HTLCs without settling, locking up channel liquidity.
| Type | Method | Duration |
|---|---|---|
| Slot jamming | Many small HTLCs | Until HTLC timeout |
| Amount jamming | Large HTLCs | Until HTLC timeout |
Defense:
class ChannelJammingDefense:
    """Defend against channel jamming attacks.

    Incoming HTLCs are capped per peer, both by count and by total pending
    amount. ``pending_htlcs`` is only read by this class; the caller is
    responsible for keeping it up to date as HTLCs are added and resolved.
    """

    def __init__(
        self,
        max_htlcs_per_peer: int = 20,
        max_pending_per_peer: int = 1_000_000,
    ):
        """Set the per-peer limits.

        Args:
            max_htlcs_per_peer: Maximum number of pending HTLCs per peer.
            max_pending_per_peer: Maximum total pending amount per peer, in
                sats.
        """
        # peer_id -> list of pending HTLC dicts, each with an "amount" entry
        self.pending_htlcs: dict[str, list] = {}
        self.max_htlcs_per_peer = max_htlcs_per_peer
        self.max_pending_per_peer = max_pending_per_peer  # sats

    def should_accept_htlc(
        self,
        peer_id: str,
        amount_sats: int
    ) -> bool:
        """Decide whether to accept incoming HTLC.

        Args:
            peer_id: Peer offering the HTLC.
            amount_sats: Amount of the offered HTLC, in sats.

        Returns:
            False if the amount is negative, if the peer already has
            ``max_htlcs_per_peer`` pending HTLCs, or if accepting would push
            the peer's pending total above ``max_pending_per_peer``.
            Otherwise True.
        """
        # A negative amount would lower the computed total and could slip
        # past the amount cap, so reject it outright.
        if amount_sats < 0:
            return False

        peer_htlcs = self.pending_htlcs.get(peer_id, [])

        # Check HTLC count limit
        if len(peer_htlcs) >= self.max_htlcs_per_peer:
            return False

        # Check total pending amount
        total_pending = sum(h["amount"] for h in peer_htlcs)
        if total_pending + amount_sats > self.max_pending_per_peer:
            return False

        return True

    async def circuit_breaker(
        self,
        channel_id: str,
        utilization: float,
        *,
        threshold: float = 0.8,
        cooldown_seconds: float = 60,
    ):
        """Trigger circuit breaker if channel over-utilized.

        Args:
            channel_id: Channel whose policy is adjusted.
            utilization: Current utilization of the channel.
            threshold: Utilization above which the breaker trips.
            cooldown_seconds: How long the channel stays restricted.
        """
        if utilization > threshold:
            # Stop accepting new HTLCs temporarily
            await lightning.set_channel_policy(
                channel_id,
                max_htlc_msat=0
            )
            try:
                # Resume after cooldown
                await asyncio.sleep(cooldown_seconds)
            finally:
                # Restore the policy even if the cooldown is cancelled;
                # otherwise the channel would stay restricted.
                await lightning.reset_channel_policy(channel_id)
Balance Probing
Description: Attacker sends payments that are designed to fail and uses the resulting failure responses to infer channel balances.
Defense:
class BalanceProbingDefense:
    """Defend against balance probing.

    Payment failures are recorded per peer. A peer is rate limited when its
    recent failures contain two consecutive amounts where the second is
    roughly half of the first.
    """

    # Failures older than this many seconds are ignored and pruned.
    FAILURE_WINDOW_SECONDS = 300

    def __init__(self):
        # peer_id -> list of {"amount", "error", "time"} failure records
        self.recent_failures: dict[str, list] = {}

    def on_payment_failed(
        self,
        peer_id: str,
        amount: int,
        error_code: str
    ):
        """Track payment failures for probing detection.

        Args:
            peer_id: Peer the failed payment came from.
            amount: Amount of the failed payment.
            error_code: Failure code returned for the payment.
        """
        now = time.time()
        history = self.recent_failures.setdefault(peer_id, [])
        history.append({
            "amount": amount,
            "error": error_code,
            "time": now
        })
        # Drop records outside the window so the per-peer history cannot
        # grow without bound.
        history[:] = [
            f for f in history
            if now - f["time"] < self.FAILURE_WINDOW_SECONDS
        ]

        # Detect probing pattern
        if self._is_probing(peer_id):
            self._rate_limit_peer(peer_id)

    def _is_probing(self, peer_id: str) -> bool:
        """Detect binary search probing pattern.

        Returns:
            True if the peer has at least three failures inside the window
            and any two consecutive amounts have a ratio strictly between
            0.4 and 0.6.
        """
        failures = self.recent_failures.get(peer_id, [])

        # Filter to recent failures
        now = time.time()
        recent = [
            f for f in failures
            if now - f["time"] < self.FAILURE_WINDOW_SECONDS
        ]
        if len(recent) < 3:
            return False

        # Check for binary search pattern (amounts halving). Every adjacent
        # pair is compared, including the most recent one.
        amounts = [f["amount"] for f in recent]
        for previous, current in zip(amounts, amounts[1:]):
            if previous <= 0:
                # No meaningful ratio (and avoids dividing by zero).
                continue
            ratio = current / previous
            if 0.4 < ratio < 0.6:  # Approximately halving
                return True
        return False

    def _rate_limit_peer(self, peer_id: str):
        """Rate limit suspected probing peer (not implemented here)."""
        # Implement rate limiting
        pass
# Alternative: add randomness to error messages
async def custom_failure_response(
    amount: int,
    actual_balance: int
) -> "str | None":
    """Return randomized failure to prevent probing.

    Args:
        amount: Amount of the attempted payment.
        actual_balance: Balance available for the payment.

    Returns:
        ``None`` when ``amount`` does not exceed ``actual_balance`` (success).
        Otherwise one of two failure codes, picked at random so the code
        alone does not reveal the exact failure point:
        ``"TEMPORARY_CHANNEL_FAILURE"`` when the random draw is below 0.3,
        else ``"AMOUNT_BELOW_MINIMUM"``.
    """
    if amount <= actual_balance:
        return None  # Success

    # Don't reveal exact failure point
    if random.random() < 0.3:
        return "TEMPORARY_CHANNEL_FAILURE"
    return "AMOUNT_BELOW_MINIMUM"
Force Close Griefing
Description: A malicious peer force-closes a channel during a high-fee period to maximize the victim’s on-chain costs.
Defense:
class ForceCloseDefense:
    """Defend against force-close griefing."""

    def __init__(self):
        self.watchtower = WatchtowerClient()

    async def monitor_peer_behavior(self, peer_id: str) -> dict:
        """Collect force-close risk indicators for one peer.

        Returns:
            A dict with ``"peer_id"``, the list of ``"risk_factors"`` and a
            ``"risk_score"`` equal to the number of factors. Per-channel
            factors are added once for every channel that triggers them.
        """
        info = await lightning.get_peer_info(peer_id)
        flags = []

        # Uptime below 95 percent
        if info["uptime_percent"] < 95:
            flags.append("low_uptime")

        peer_channels = await lightning.list_channels_with_peer(peer_id)

        # More than 10 pending HTLCs on a channel
        flags.extend(
            "many_pending_htlcs"
            for ch in peer_channels
            if ch["pending_htlcs"] > 10
        )

        # Channel younger than 144 blocks (<1 day)
        flags.extend(
            "new_channel"
            for ch in peer_channels
            if ch["age_blocks"] < 144
        )

        return {
            "peer_id": peer_id,
            "risk_factors": flags,
            "risk_score": len(flags)
        }

    async def prepare_for_force_close(self):
        """Register every channel and its commitment tx with the watchtower."""
        for ch in await lightning.list_channels():
            await self.watchtower.register_channel(
                ch["channel_id"],
                ch["commitment_tx"]
            )
Nostr Attacks
Impersonation Attack
Description: Attacker creates a similar-looking npub or uses a similar display name.
Defense:
class ImpersonationDefense:
    """Defend against Nostr impersonation."""

    def __init__(self):
        self.verified_identities: dict[str, str] = {}  # npub -> NIP-05

    async def verify_identity(self, npub: str) -> dict:
        """Verify Nostr identity via NIP-05.

        A successful check records the identity in ``verified_identities``.
        A definitive negative result (no NIP-05 in the profile, or the
        identifier resolves to a different key) removes any earlier record,
        so a previously verified npub does not stay trusted. A resolution
        failure leaves existing records untouched.

        Returns:
            ``{"verified": True, "nip05": ...}`` on success, otherwise
            ``{"verified": False, "reason": ...}`` with reason ``"no_nip05"``,
            ``"nip05_mismatch"`` or ``"nip05_resolution_failed"``.
        """
        profile = await nostr.get_profile(npub)

        nip05 = profile.get("nip05")
        if not nip05:
            self.verified_identities.pop(npub, None)
            return {"verified": False, "reason": "no_nip05"}

        # Verify NIP-05
        try:
            verified_pubkey = await resolve_nip05(nip05)
        except Exception:
            # Deliberately not a bare `except:` so that task cancellation
            # and interpreter-exit signals still propagate.
            return {"verified": False, "reason": "nip05_resolution_failed"}

        # NOTE(review): this compares the resolved key with `npub` directly,
        # so resolve_nip05 must return the same encoding - confirm.
        if verified_pubkey != npub:
            self.verified_identities.pop(npub, None)
            return {"verified": False, "reason": "nip05_mismatch"}

        self.verified_identities[npub] = nip05
        return {"verified": True, "nip05": nip05}

    def is_trusted(self, npub: str) -> bool:
        """Return True if ``npub`` has a record in ``verified_identities``."""
        return npub in self.verified_identities
Relay Flooding
Description: Overwhelm relays with spam events.
Defense (as relay operator):
class SpamDefense:
    """Defend relay against spam."""

    def __init__(self):
        # pubkey -> timestamps of accepted events from the last 60 seconds
        self.event_rates: dict[str, list] = {}
        self.max_events_per_minute = 10

    def should_accept_event(self, event: dict) -> bool:
        """Apply a 60-second sliding-window rate limit per pubkey.

        Returns:
            False if the pubkey already has ``max_events_per_minute``
            accepted events in the last 60 seconds. Otherwise the event is
            recorded and True is returned.
        """
        # NOTE(review): entries are created per pubkey and never removed, so
        # the map grows with the number of distinct pubkeys seen.
        author = event["pubkey"]
        now = time.time()

        # Keep only timestamps still inside the window.
        window = [
            stamp for stamp in self.event_rates.get(author, [])
            if now - stamp < 60
        ]
        self.event_rates[author] = window

        if len(window) >= self.max_events_per_minute:
            return False

        window.append(now)
        return True

    def validate_proof_of_work(self, event: dict, min_pow: int = 16) -> bool:
        """Check the event id against a minimum proof of work (NIP-13).

        Returns:
            True if ``count_leading_zeros`` of the decoded event id is at
            least ``min_pow``.
        """
        id_bytes = bytes.fromhex(event["id"])
        difficulty = count_leading_zeros(id_bytes)
        return difficulty >= min_pow
Metadata Correlation
Description: Link activities across identities via timing, content, or relay patterns.
Defense:
class CorrelationDefense:
    """Prevent identity correlation on Nostr."""

    def __init__(self):
        # identity name -> settings dict; "relays" and "nsec" are read below
        self.identities: dict[str, dict] = {}

    async def post_with_decorrelation(
        self,
        identity: str,
        content: str
    ):
        """Post ``content`` as ``identity`` with anti-correlation measures.

        The post goes only to the identity's own relays, after a random delay
        of 30 to 300 seconds followed by ``wait_until_round_minute()``.
        """
        # Relays dedicated to this identity
        target_relays = self.identities[identity]["relays"]

        # Random jitter, then align the posting time to a round interval
        delay = random.uniform(30, 300)
        await asyncio.sleep(delay)
        await wait_until_round_minute()

        # The signing key is looked up at post time
        signing_key = self.identities[identity]["nsec"]
        await nostr.post(
            content,
            private_key=signing_key,
            relays=target_relays
        )

    def verify_relay_isolation(self) -> bool:
        """Return True only if no relay is shared between two identities."""
        seen = set()
        for settings in self.identities.values():
            own_relays = set(settings["relays"])
            if not seen.isdisjoint(own_relays):
                return False  # Overlap detected
            seen.update(own_relays)
        return True
Attack Detection
class AttackDetector:
    """Detect ongoing attacks across protocols."""

    async def detect_all(self) -> list[dict]:
        """Run the Bitcoin, Lightning and Nostr checks in that order.

        Returns:
            All alerts from the three checks, concatenated in the same order.
        """
        bitcoin_alerts = await self._detect_bitcoin_attacks()
        lightning_alerts = await self._detect_lightning_attacks()
        nostr_alerts = await self._detect_nostr_attacks()
        return [*bitcoin_alerts, *lightning_alerts, *nostr_alerts]

    async def _detect_bitcoin_attacks(self) -> list:
        """Raise a dust-attack alert when more than 10 UTXOs are below 546."""
        utxos = await bitcoin.list_utxos()
        dust_count = len([u for u in utxos if u["value"] < 546])
        if dust_count <= 10:
            return []
        return [{
            "type": "dust_attack",
            "protocol": "bitcoin",
            "severity": "medium",
            "count": dust_count
        }]

    async def _detect_lightning_attacks(self) -> list:
        """Raise a jamming alert per channel with more than 30 pending HTLCs."""
        channels = await lightning.list_channels()
        return [
            {
                "type": "possible_jamming",
                "protocol": "lightning",
                "channel": ch["channel_id"],
                "severity": "high"
            }
            for ch in channels
            if ch["pending_htlc_count"] > 30
        ]

    async def _detect_nostr_attacks(self) -> list:
        """Placeholder for impersonation detection; always returns no alerts."""
        # Detect impersonation attempts
        # Check for similar-looking npubs
        return []
Machine-Readable Summary
{
"topic": "common-attacks",
"audience": "ai-agents",
"attacks_by_protocol": {
"bitcoin": ["double_spend", "address_poisoning", "dust_attack"],
"lightning": ["channel_jamming", "balance_probing", "force_close_griefing"],
"nostr": ["impersonation", "relay_flooding", "metadata_correlation"]
},
"defense_strategies": [
"confirmation_requirements",
"rate_limiting",
"circuit_breakers",
"identity_verification",
"correlation_prevention"
]
}