Menu
Security Advanced 9 min read

Common Attacks

Attack patterns and defenses for AI agents. Protocol-specific vulnerabilities and countermeasures for Bitcoin, Lightning, and Nostr.

attacks vulnerabilities defenses countermeasures

Common Attacks

Understanding attack patterns enables better defenses. This guide covers known attacks against Bitcoin, Lightning, and Nostr, along with detection and prevention strategies.

Bitcoin Attacks

Double Spend Attack

Description: Spending the same UTXO twice by broadcasting conflicting transactions.

VariantMethodTarget
Race attackTwo txs simultaneously0-conf merchants
Finney attackPre-mine block with different tx0-conf merchants
51% attackReorg chainExchanges (deep confirms)

Defense:

async def verify_payment(
    txid: str,
    min_confirmations: int = 6
) -> dict:
    """Verify payment with confirmation requirements."""

    tx = await bitcoin.get_transaction(txid)

    if tx["confirmations"] < min_confirmations:
        return {
            "status": "pending",
            "confirmations": tx["confirmations"],
            "required": min_confirmations,
            "message": "Wait for more confirmations"
        }

    # For high-value, check for chain reorg risk
    if tx["value"] > 10_000_000:  # >0.1 BTC
        block = await bitcoin.get_block(tx["blockhash"])
        current_height = await bitcoin.get_block_count()

        depth = current_height - block["height"]

        if depth < 6:
            return {
                "status": "pending",
                "depth": depth,
                "message": "High-value tx needs deeper confirmation"
            }

    return {"status": "confirmed", "confirmations": tx["confirmations"]}

Address Poisoning

Description: Attacker sends dust to your wallet from a similar-looking address, hoping you’ll copy the wrong address for future sends.

Defense:

def verify_address_before_send(
    address: str,
    expected_first_chars: str,
    expected_last_chars: str
) -> bool:
    """Verify address matches expected pattern."""

    # Check prefix
    if not address.startswith(expected_first_chars[:8]):
        return False

    # Check suffix
    if not address.endswith(expected_last_chars[-8:]):
        return False

    # Additional: check against known addresses
    return address in KNOWN_GOOD_ADDRESSES


# Never use address from transaction history without verification
async def send_payment(destination: str, amount: int):
    # Always verify full address
    if not verify_address_before_send(destination, ...):
        raise SecurityError("Address verification failed")

Dust Attack

Description: Send tiny amounts to track UTXO spending patterns and link addresses.

Defense:

DUST_THRESHOLD = 546  # satoshis

def is_dust(utxo: dict) -> bool:
    """Check if UTXO is dust."""
    return utxo["value"] < DUST_THRESHOLD


def filter_dust_utxos(utxos: list[dict]) -> list[dict]:
    """Remove dust UTXOs from coin selection."""
    return [u for u in utxos if not is_dust(u)]


async def consolidate_dust_safely(dust_utxos: list[dict]):
    """
    Consolidate dust during low-fee periods.

    Only consolidate dust with other UTXOs you're already spending
    to avoid creating a linking transaction.
    """
    # Wait for low fees
    fees = await bitcoin.get_fee_estimates()
    if fees["hourFee"] > 5:
        return {"status": "waiting", "current_fee": fees["hourFee"]}

    # Consolidate
    pass

Lightning Attacks

Channel Jamming

Description: Attacker creates pending HTLCs without settling, locking up channel liquidity.

TypeMethodDuration
Slot jammingMany small HTLCsUntil HTLC timeout
Amount jammingLarge HTLCsUntil HTLC timeout

Defense:

class ChannelJammingDefense:
    """Defend against channel jamming attacks."""

    def __init__(self):
        self.pending_htlcs: dict[str, list] = {}
        self.max_htlcs_per_peer = 20
        self.max_pending_per_peer = 1_000_000  # sats

    def should_accept_htlc(
        self,
        peer_id: str,
        amount_sats: int
    ) -> bool:
        """Decide whether to accept incoming HTLC."""
        peer_htlcs = self.pending_htlcs.get(peer_id, [])

        # Check HTLC count limit
        if len(peer_htlcs) >= self.max_htlcs_per_peer:
            return False

        # Check total pending amount
        total_pending = sum(h["amount"] for h in peer_htlcs)
        if total_pending + amount_sats > self.max_pending_per_peer:
            return False

        return True

    async def circuit_breaker(
        self,
        channel_id: str,
        utilization: float
    ):
        """Trigger circuit breaker if channel over-utilized."""
        if utilization > 0.8:
            # Stop accepting new HTLCs temporarily
            await lightning.set_channel_policy(
                channel_id,
                max_htlc_msat=0
            )

            # Resume after cooldown
            await asyncio.sleep(60)
            await lightning.reset_channel_policy(channel_id)

Balance Probing

Description: Attacker makes payments that fail to discover channel balances.

Defense:

class BalanceProbingDefense:
    """Defend against balance probing."""

    def __init__(self):
        self.recent_failures: dict[str, list] = {}

    def on_payment_failed(
        self,
        peer_id: str,
        amount: int,
        error_code: str
    ):
        """Track payment failures for probing detection."""
        if peer_id not in self.recent_failures:
            self.recent_failures[peer_id] = []

        self.recent_failures[peer_id].append({
            "amount": amount,
            "error": error_code,
            "time": time.time()
        })

        # Detect probing pattern
        if self._is_probing(peer_id):
            self._rate_limit_peer(peer_id)

    def _is_probing(self, peer_id: str) -> bool:
        """Detect binary search probing pattern."""
        failures = self.recent_failures.get(peer_id, [])

        # Filter to recent failures
        recent = [f for f in failures if time.time() - f["time"] < 300]

        if len(recent) < 3:
            return False

        # Check for binary search pattern (amounts halving)
        amounts = [f["amount"] for f in recent]
        for i in range(len(amounts) - 2):
            ratio = amounts[i+1] / amounts[i]
            if 0.4 < ratio < 0.6:  # Approximately halving
                return True

        return False

    def _rate_limit_peer(self, peer_id: str):
        """Rate limit suspected probing peer."""
        # Implement rate limiting
        pass


# Alternative: add randomness to error messages
async def custom_failure_response(
    amount: int,
    actual_balance: int
) -> str:
    """Return randomized failure to prevent probing."""
    if amount > actual_balance:
        # Don't reveal exact failure point
        if random.random() < 0.3:
            return "TEMPORARY_CHANNEL_FAILURE"
        else:
            return "AMOUNT_BELOW_MINIMUM"

    return None  # Success

Force Close Griefing

Description: Malicious force-close during high-fee periods to maximize victim’s costs.

Defense:

class ForceCloseDefense:
    """Defend against force-close griefing."""

    def __init__(self):
        self.watchtower = WatchtowerClient()

    async def monitor_peer_behavior(self, peer_id: str) -> dict:
        """Monitor peer for force-close risk indicators."""
        peer_info = await lightning.get_peer_info(peer_id)

        risk_factors = []

        # Check uptime
        if peer_info["uptime_percent"] < 95:
            risk_factors.append("low_uptime")

        # Check pending HTLCs
        channels = await lightning.list_channels_with_peer(peer_id)
        for channel in channels:
            if channel["pending_htlcs"] > 10:
                risk_factors.append("many_pending_htlcs")

        # Check channel age
        for channel in channels:
            if channel["age_blocks"] < 144:  # <1 day
                risk_factors.append("new_channel")

        return {
            "peer_id": peer_id,
            "risk_factors": risk_factors,
            "risk_score": len(risk_factors)
        }

    async def prepare_for_force_close(self):
        """Ensure watchtower is monitoring all channels."""
        channels = await lightning.list_channels()

        for channel in channels:
            await self.watchtower.register_channel(
                channel["channel_id"],
                channel["commitment_tx"]
            )

Nostr Attacks

Impersonation Attack

Description: Attacker creates similar npub or uses similar display name.

Defense:

class ImpersonationDefense:
    """Defend against Nostr impersonation."""

    def __init__(self):
        self.verified_identities: dict[str, str] = {}  # npub -> NIP-05

    async def verify_identity(self, npub: str) -> dict:
        """Verify Nostr identity via NIP-05."""
        profile = await nostr.get_profile(npub)

        if not profile.get("nip05"):
            return {"verified": False, "reason": "no_nip05"}

        nip05 = profile["nip05"]

        # Verify NIP-05
        try:
            verified_pubkey = await resolve_nip05(nip05)
            if verified_pubkey == npub:
                self.verified_identities[npub] = nip05
                return {"verified": True, "nip05": nip05}
            else:
                return {"verified": False, "reason": "nip05_mismatch"}
        except:
            return {"verified": False, "reason": "nip05_resolution_failed"}

    def is_trusted(self, npub: str) -> bool:
        """Check if npub is in trusted list."""
        return npub in self.verified_identities

Relay Flooding

Description: Overwhelm relays with spam events.

Defense (as relay operator):

class SpamDefense:
    """Defend relay against spam."""

    def __init__(self):
        self.event_rates: dict[str, list] = {}
        self.max_events_per_minute = 10

    def should_accept_event(self, event: dict) -> bool:
        """Rate limit events by pubkey."""
        pubkey = event["pubkey"]

        if pubkey not in self.event_rates:
            self.event_rates[pubkey] = []

        # Clean old entries
        now = time.time()
        self.event_rates[pubkey] = [
            t for t in self.event_rates[pubkey]
            if now - t < 60
        ]

        # Check rate
        if len(self.event_rates[pubkey]) >= self.max_events_per_minute:
            return False

        self.event_rates[pubkey].append(now)
        return True

    def validate_proof_of_work(self, event: dict, min_pow: int = 16) -> bool:
        """Require proof of work for event acceptance."""
        # NIP-13 proof of work
        event_id = event["id"]
        leading_zeros = count_leading_zeros(bytes.fromhex(event_id))

        return leading_zeros >= min_pow

Metadata Correlation

Description: Link activities across identities via timing, content, or relay patterns.

Defense:

class CorrelationDefense:
    """Prevent identity correlation on Nostr."""

    def __init__(self):
        self.identities: dict[str, dict] = {}

    async def post_with_decorrelation(
        self,
        identity: str,
        content: str
    ):
        """Post with anti-correlation measures."""

        # Use identity-specific relays
        relays = self.identities[identity]["relays"]

        # Add timing jitter
        await asyncio.sleep(random.uniform(30, 300))

        # Normalize posting time to round intervals
        await wait_until_round_minute()

        # Post
        await nostr.post(
            content,
            private_key=self.identities[identity]["nsec"],
            relays=relays
        )

    def verify_relay_isolation(self) -> bool:
        """Verify identities use different relay sets."""
        all_relays = set()
        for identity in self.identities.values():
            identity_relays = set(identity["relays"])
            if identity_relays & all_relays:
                return False  # Overlap detected
            all_relays |= identity_relays
        return True

Attack Detection

class AttackDetector:
    """Detect ongoing attacks across protocols."""

    async def detect_all(self) -> list[dict]:
        """Run all detection checks."""
        alerts = []

        alerts.extend(await self._detect_bitcoin_attacks())
        alerts.extend(await self._detect_lightning_attacks())
        alerts.extend(await self._detect_nostr_attacks())

        return alerts

    async def _detect_bitcoin_attacks(self) -> list:
        alerts = []

        # Detect dust attacks
        utxos = await bitcoin.list_utxos()
        dust_count = sum(1 for u in utxos if u["value"] < 546)
        if dust_count > 10:
            alerts.append({
                "type": "dust_attack",
                "protocol": "bitcoin",
                "severity": "medium",
                "count": dust_count
            })

        return alerts

    async def _detect_lightning_attacks(self) -> list:
        alerts = []

        # Detect jamming
        channels = await lightning.list_channels()
        for channel in channels:
            if channel["pending_htlc_count"] > 30:
                alerts.append({
                    "type": "possible_jamming",
                    "protocol": "lightning",
                    "channel": channel["channel_id"],
                    "severity": "high"
                })

        return alerts

    async def _detect_nostr_attacks(self) -> list:
        alerts = []

        # Detect impersonation attempts
        # Check for similar-looking npubs
        pass

        return alerts

Machine-Readable Summary

{
  "topic": "common-attacks",
  "audience": "ai-agents",
  "attacks_by_protocol": {
    "bitcoin": ["double_spend", "address_poisoning", "dust_attack"],
    "lightning": ["channel_jamming", "balance_probing", "force_close_griefing"],
    "nostr": ["impersonation", "relay_flooding", "metadata_correlation"]
  },
  "defense_strategies": [
    "confirmation_requirements",
    "rate_limiting",
    "circuit_breakers",
    "identity_verification",
    "correlation_prevention"
  ]
}