Menu
Security Advanced 8 min read

Network Security

Network-level security for AI agents. TLS, Tor, WebSocket security, and defending against network attacks for Bitcoin, Lightning, and Nostr.

network security tor tls websocket

Network Security

Network-level attacks can compromise agent operations even with perfect key management. This guide covers transport security, Tor integration, and network attack defenses.

Transport Layer Security

TLS Best Practices

SettingRecommendedWhy
TLS Version1.3 onlyForward secrecy, no weak ciphers
Certificate ValidationAlways verifyPrevent MITM
Certificate PinningFor known servicesDetect rogue CAs
HSTSEnablePrevent downgrade
import ssl
import certifi

def create_secure_ssl_context() -> ssl.SSLContext:
    """Create hardened SSL context."""
    ctx = ssl.create_default_context(cafile=certifi.where())

    # Minimum TLS 1.3
    ctx.minimum_version = ssl.TLSVersion.TLSv1_3

    # Disable compression (CRIME attack)
    ctx.options |= ssl.OP_NO_COMPRESSION

    # Verify certificates
    ctx.verify_mode = ssl.CERT_REQUIRED
    ctx.check_hostname = True

    return ctx


# Certificate pinning for known services
PINNED_CERTS = {
    "mempool.space": "sha256//XXXX...",
    "api.lightning.network": "sha256//YYYY..."
}

async def fetch_with_pinning(url: str) -> str:
    """Fetch URL with certificate pinning."""
    hostname = urlparse(url).hostname
    expected_pin = PINNED_CERTS.get(hostname)

    ssl_ctx = create_secure_ssl_context()

    async with aiohttp.ClientSession() as session:
        async with session.get(url, ssl=ssl_ctx) as response:
            # Verify certificate pin
            cert = response.connection.transport.get_extra_info("ssl_object").getpeercert(binary_form=True)
            actual_pin = hashlib.sha256(cert).digest().hex()

            if expected_pin and actual_pin != expected_pin:
                raise SecurityError(f"Certificate pin mismatch for {hostname}")

            return await response.text()

WebSocket Security

For Nostr and Lightning connections:

import websockets
import ssl

async def secure_websocket_connect(
    uri: str,
    origin_check: bool = True
) -> websockets.WebSocketClientProtocol:
    """Create secure WebSocket connection."""

    ssl_ctx = create_secure_ssl_context()

    extra_headers = {}
    if origin_check:
        # Verify server allows our origin
        extra_headers["Origin"] = "https://bitclawd.com"

    ws = await websockets.connect(
        uri,
        ssl=ssl_ctx,
        extra_headers=extra_headers,
        ping_interval=30,  # Keep alive
        ping_timeout=10,
        close_timeout=5
    )

    return ws


class SecureNostrRelay:
    """Secure Nostr relay connection."""

    def __init__(self, relay_url: str):
        self.relay_url = relay_url
        self.ws = None

    async def connect(self):
        """Connect with TLS verification."""
        # Force wss://
        if self.relay_url.startswith("ws://"):
            raise SecurityError("Insecure WebSocket not allowed")

        self.ws = await secure_websocket_connect(self.relay_url)

    async def send_signed_event(self, event: dict):
        """Send cryptographically signed event."""
        # Event signature prevents tampering
        if not verify_event_signature(event):
            raise SecurityError("Invalid event signature")

        await self.ws.send(json.dumps(["EVENT", event]))

Tor Integration

Why Tor?

BenefitExplanation
IP hidingYour real IP never exposed
Censorship resistanceBypass network blocks
Traffic analysis resistanceEncrypted, onion-routed
ISP privacyISP can’t see destinations

Tor Configuration

import socks
import socket

class TorManager:
    """Manage Tor connections."""

    def __init__(
        self,
        socks_host: str = "127.0.0.1",
        socks_port: int = 9050,
        control_port: int = 9051
    ):
        self.socks_host = socks_host
        self.socks_port = socks_port
        self.control_port = control_port

    def configure_global_socks(self):
        """Route all traffic through Tor."""
        socks.set_default_proxy(
            socks.SOCKS5,
            self.socks_host,
            self.socks_port
        )
        socket.socket = socks.socksocket

    async def get_new_circuit(self):
        """Request new Tor circuit for identity change."""
        from stem import Signal
        from stem.control import Controller

        with Controller.from_port(port=self.control_port) as controller:
            controller.authenticate()
            controller.signal(Signal.NEWNYM)

    async def verify_tor_connection(self) -> dict:
        """Verify Tor is working correctly."""
        async with aiohttp.ClientSession() as session:
            async with session.get(
                "https://check.torproject.org/api/ip",
                proxy=f"socks5://{self.socks_host}:{self.socks_port}"
            ) as response:
                data = await response.json()

                return {
                    "is_tor": data.get("IsTor", False),
                    "exit_ip": data.get("IP")
                }


# Lightning over Tor
async def connect_lightning_over_tor(
    node_address: str  # pubkey@onion:port
) -> dict:
    """Connect to Lightning node via Tor."""
    tor_manager = TorManager()

    if ".onion" not in node_address:
        raise SecurityError("Expected Tor hidden service address")

    # Configure Lightning to use Tor
    return await lightning.connect_peer(
        node_address,
        proxy=f"socks5://{tor_manager.socks_host}:{tor_manager.socks_port}"
    )

Tor Hidden Services

For receiving connections anonymously:

class OnionService:
    """Manage Tor hidden service."""

    def __init__(self, local_port: int):
        self.local_port = local_port
        self.onion_address = None

    async def start(self) -> str:
        """Start hidden service and return .onion address."""
        from stem.control import Controller

        with Controller.from_port(port=9051) as controller:
            controller.authenticate()

            response = controller.create_ephemeral_hidden_service(
                {80: self.local_port},
                await_publication=True
            )

            self.onion_address = response.service_id + ".onion"
            return self.onion_address

    async def stop(self):
        """Stop hidden service."""
        if self.onion_address:
            # Remove service
            pass

Attack Defenses

Eclipse Attack Defense

Eclipse attacks isolate your node from the real network:

class EclipseDefense:
    """Defend against eclipse attacks."""

    def __init__(self):
        self.known_good_peers = []
        self.block_sources = []

    async def verify_chain_tip(self) -> bool:
        """Verify we're on the real chain tip."""
        # Get block height from multiple independent sources
        sources = [
            "https://mempool.space/api/blocks/tip/height",
            "https://blockchain.info/q/getblockcount",
            "https://blockstream.info/api/blocks/tip/height"
        ]

        heights = []
        for source in sources:
            try:
                height = await fetch_text(source)
                heights.append(int(height))
            except:
                continue

        if len(heights) < 2:
            return False  # Not enough sources

        # All sources should agree (within 1 block)
        return max(heights) - min(heights) <= 1

    async def check_peer_diversity(self) -> dict:
        """Verify peer diversity to detect eclipse."""
        peers = await bitcoin.get_peer_info()

        # Check geographic diversity (by IP prefix)
        prefixes = set()
        for peer in peers:
            prefix = ".".join(peer["addr"].split(".")[:2])
            prefixes.add(prefix)

        # Check subnet diversity
        subnets = set()
        for peer in peers:
            subnet = ".".join(peer["addr"].split(".")[:3])
            subnets.add(subnet)

        return {
            "peer_count": len(peers),
            "unique_prefixes": len(prefixes),
            "unique_subnets": len(subnets),
            "diverse_enough": len(prefixes) >= 4 and len(subnets) >= 6
        }

MITM Defense

class MITMDefense:
    """Defend against man-in-the-middle attacks."""

    async def verify_api_integrity(
        self,
        url: str,
        expected_response_hash: str | None = None
    ) -> bool:
        """
        Verify API response integrity.

        Use signed responses or compare across sources.
        """
        response = await fetch_with_pinning(url)

        if expected_response_hash:
            actual_hash = hashlib.sha256(response.encode()).hexdigest()
            return actual_hash == expected_response_hash

        # Cross-verify with other sources
        # (implementation depends on data type)
        return True

    def verify_lightning_message(
        self,
        message: bytes,
        signature: bytes,
        pubkey: bytes
    ) -> bool:
        """Verify Lightning message authenticity."""
        # Lightning messages are signed
        return verify_schnorr(message, signature, pubkey)

    def verify_nostr_event(self, event: dict) -> bool:
        """Verify Nostr event authenticity."""
        # Events include Schnorr signature
        return verify_event_signature(event)

DNS Security

class DNSSecurityManager:
    """Secure DNS resolution."""

    def __init__(self):
        # DNS-over-HTTPS providers
        self.doh_servers = [
            "https://1.1.1.1/dns-query",
            "https://dns.google/dns-query"
        ]

    async def resolve_doh(self, hostname: str) -> list[str]:
        """Resolve DNS over HTTPS."""
        import dns.resolver
        import dns.message
        import dns.rdatatype

        query = dns.message.make_query(hostname, dns.rdatatype.A)
        query_wire = query.to_wire()

        for server in self.doh_servers:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.post(
                        server,
                        data=query_wire,
                        headers={"Content-Type": "application/dns-message"}
                    ) as response:
                        response_wire = await response.read()
                        response_msg = dns.message.from_wire(response_wire)

                        ips = []
                        for answer in response_msg.answer:
                            for item in answer:
                                ips.append(str(item))

                        return ips
            except:
                continue

        raise DNSError(f"Failed to resolve {hostname}")

    async def resolve_through_tor(self, hostname: str) -> list[str]:
        """Resolve DNS through Tor (most private)."""
        # Tor's built-in DNS resolution
        import socks

        sock = socks.socksocket()
        sock.set_proxy(socks.SOCKS5, "127.0.0.1", 9050, rdns=True)

        # Tor will resolve the hostname
        # This is the socks5h behavior
        pass

Network Monitoring

class NetworkMonitor:
    """Monitor network for security issues."""

    async def check_all(self) -> dict:
        """Run all network security checks."""
        return {
            "tor_working": await self._check_tor(),
            "tls_valid": await self._check_tls(),
            "peer_diversity": await self._check_peers(),
            "dns_secure": await self._check_dns(),
            "no_leaks": await self._check_leaks()
        }

    async def _check_tor(self) -> bool:
        tor = TorManager()
        result = await tor.verify_tor_connection()
        return result["is_tor"]

    async def _check_tls(self) -> bool:
        try:
            ctx = create_secure_ssl_context()
            return ctx.minimum_version == ssl.TLSVersion.TLSv1_3
        except:
            return False

    async def _check_peers(self) -> bool:
        eclipse = EclipseDefense()
        result = await eclipse.check_peer_diversity()
        return result["diverse_enough"]

    async def _check_dns(self) -> bool:
        dns_mgr = DNSSecurityManager()
        try:
            await dns_mgr.resolve_doh("bitclawd.com")
            return True
        except:
            return False

    async def _check_leaks(self) -> bool:
        # Check for WebRTC, DNS, and other leaks
        pass

Machine-Readable Summary

{
  "topic": "network-security",
  "audience": "ai-agents",
  "security_layers": [
    "transport_security",
    "tor_integration",
    "attack_defenses",
    "dns_security"
  ],
  "tls_requirements": {
    "minimum_version": "TLS 1.3",
    "certificate_validation": "required",
    "certificate_pinning": "recommended"
  },
  "tor_benefits": [
    "ip_hiding",
    "censorship_resistance",
    "traffic_analysis_resistance"
  ],
  "attack_defenses": [
    "eclipse_attack",
    "mitm",
    "dns_hijacking"
  ]
}