Security Advanced 8 min read
Network Security
Network-level security for AI agents. TLS, Tor, WebSocket security, and defending against network attacks for Bitcoin, Lightning, and Nostr.
network security tor tls websocket
Network Security
Network-level attacks can compromise agent operations even with perfect key management. This guide covers transport security, Tor integration, and network attack defenses.
Transport Layer Security
TLS Best Practices
| Setting | Recommended | Why |
|---|---|---|
| TLS Version | 1.3 only | Forward secrecy, no weak ciphers |
| Certificate Validation | Always verify | Prevent MITM |
| Certificate Pinning | For known services | Detect rogue CAs |
| HSTS | Enable | Prevent downgrade |
import ssl
import certifi
def create_secure_ssl_context() -> ssl.SSLContext:
    """Build a client SSL context that requires TLS 1.3 and verified peers.

    The trust store is the CA bundle whose path ``certifi.where()`` returns.

    Returns:
        An ``ssl.SSLContext`` with hostname checking enabled, certificates
        required, TLS compression disabled, and TLS 1.3 as the minimum
        protocol version.
    """
    context = ssl.create_default_context(cafile=certifi.where())

    # Peer verification: the certificate must validate and match the host.
    context.check_hostname = True
    context.verify_mode = ssl.CERT_REQUIRED

    # TLS-level compression is what the CRIME attack relies on; switch it off.
    context.options = context.options | ssl.OP_NO_COMPRESSION

    # Refuse to negotiate anything older than TLS 1.3.
    context.minimum_version = ssl.TLSVersion.TLSv1_3
    return context
# Certificate pins for known services, keyed by hostname.
# NOTE(review): the values below look like placeholders ("XXXX...", "YYYY...")
# and presumably need to be replaced with real pins before use.
PINNED_CERTS = {
    "mempool.space": "sha256//XXXX...",
    "api.lightning.network": "sha256//YYYY...",
}
async def fetch_with_pinning(url: str) -> str:
    """Fetch ``url`` over hardened TLS, enforcing a certificate pin if one exists.

    The host part of ``url`` is looked up in ``PINNED_CERTS``. When a pin is
    configured, the server's leaf certificate (DER form) is hashed with
    SHA-256 and rendered as ``"sha256//" + base64(digest)`` so that it is
    directly comparable with the ``sha256//...`` strings stored in
    ``PINNED_CERTS``. Hosts without a configured pin are fetched with normal
    TLS verification only.

    NOTE: the digest covers the whole DER certificate, so the entries in
    ``PINNED_CERTS`` must be generated the same way. The pin is checked after
    the request has been sent but before the response body is returned.

    Args:
        url: The URL to request with HTTP GET.

    Returns:
        The response body decoded as text.

    Raises:
        SecurityError: If a pin is configured and the peer certificate is
            unavailable or does not match it.
    """
    # Local imports keep this block self-contained.
    import base64
    import hashlib
    from urllib.parse import urlparse

    hostname = urlparse(url).hostname
    expected_pin = PINNED_CERTS.get(hostname)
    ssl_ctx = create_secure_ssl_context()

    async with aiohttp.ClientSession() as session:
        async with session.get(url, ssl=ssl_ctx) as response:
            if expected_pin:
                # The connection (and therefore the TLS object) can be missing,
                # e.g. for a non-TLS URL or once the connection was released.
                # A pinned host must never be accepted without a check.
                connection = response.connection
                transport = connection.transport if connection is not None else None
                ssl_object = (
                    transport.get_extra_info("ssl_object")
                    if transport is not None
                    else None
                )
                if ssl_object is None:
                    raise SecurityError(
                        f"Cannot verify certificate pin for {hostname}"
                    )

                der_cert = ssl_object.getpeercert(binary_form=True)
                digest = hashlib.sha256(der_cert).digest()
                # Same textual format as the PINNED_CERTS entries.
                actual_pin = "sha256//" + base64.b64encode(digest).decode("ascii")
                if actual_pin != expected_pin:
                    raise SecurityError(f"Certificate pin mismatch for {hostname}")

            return await response.text()
WebSocket Security
For Nostr and Lightning connections:
import websockets
import ssl
async def secure_websocket_connect(
    uri: str,
    origin_check: bool = True
) -> websockets.WebSocketClientProtocol:
    """Open a WebSocket connection that uses the hardened TLS context.

    Args:
        uri: WebSocket URI to connect to.
        origin_check: When true, an ``Origin`` header of
            ``https://bitclawd.com`` is sent with the handshake.

    Returns:
        The connected protocol object returned by ``websockets.connect``.
    """
    tls_context = create_secure_ssl_context()

    # Only advertise our origin when the caller asked for origin checking.
    headers = {"Origin": "https://bitclawd.com"} if origin_check else {}

    return await websockets.connect(
        uri,
        ssl=tls_context,
        extra_headers=headers,
        ping_interval=30,  # periodic pings keep the connection alive
        ping_timeout=10,
        close_timeout=5,
    )
class SecureNostrRelay:
    """Nostr relay client that only talks over TLS-protected WebSockets.

    ``relay_url`` holds the relay URI and ``ws`` holds the open connection
    (``None`` until ``connect`` has succeeded).
    """

    def __init__(self, relay_url: str):
        self.relay_url = relay_url
        self.ws = None

    async def connect(self):
        """Connect to the relay with TLS verification.

        Raises:
            SecurityError: If ``relay_url`` does not use the ``wss://`` scheme.
        """
        # Allow-list the secure scheme instead of deny-listing the exact
        # prefix "ws://", so variants such as "WS://" or a missing scheme are
        # rejected here as well.
        if not self.relay_url.lower().startswith("wss://"):
            raise SecurityError("Insecure WebSocket not allowed")
        self.ws = await secure_websocket_connect(self.relay_url)

    async def send_signed_event(self, event: dict):
        """Send a signed event to the relay as an ``["EVENT", event]`` message.

        Args:
            event: The event dict; it must pass ``verify_event_signature``.

        Raises:
            ConnectionError: If ``connect`` has not been called successfully.
            SecurityError: If the event's signature does not verify.
        """
        # Fail with a clear error rather than AttributeError on None.
        if self.ws is None:
            raise ConnectionError("Not connected to relay; call connect() first")
        # Never forward an event whose signature does not verify.
        if not verify_event_signature(event):
            raise SecurityError("Invalid event signature")
        await self.ws.send(json.dumps(["EVENT", event]))
Tor Integration
Why Tor?
| Benefit | Explanation |
|---|---|
| IP hiding | Your real IP never exposed |
| Censorship resistance | Bypass network blocks |
| Traffic analysis resistance | Encrypted, onion-routed |
| ISP privacy | ISP can’t see destinations |
Tor Configuration
import socks
import socket
class TorManager:
    """Helper around a Tor daemon's SOCKS proxy and control port."""

    def __init__(
        self,
        socks_host: str = "127.0.0.1",
        socks_port: int = 9050,
        control_port: int = 9051
    ):
        self.socks_host, self.socks_port = socks_host, socks_port
        self.control_port = control_port

    def configure_global_socks(self):
        """Make the Tor SOCKS5 proxy the process-wide default for sockets.

        Installs the proxy as the ``socks`` default and replaces
        ``socket.socket`` with ``socks.socksocket``.
        """
        socks.set_default_proxy(socks.SOCKS5, self.socks_host, self.socks_port)
        # Monkey-patch so code creating plain sockets gets the proxied class.
        socket.socket = socks.socksocket

    async def get_new_circuit(self):
        """Ask Tor for a new circuit by sending the NEWNYM signal."""
        from stem import Signal
        from stem.control import Controller

        with Controller.from_port(port=self.control_port) as ctl:
            ctl.authenticate()
            ctl.signal(Signal.NEWNYM)

    async def verify_tor_connection(self) -> dict:
        """Query check.torproject.org through the SOCKS proxy.

        Returns:
            A dict with ``is_tor`` (the service's ``IsTor`` field, ``False``
            when absent) and ``exit_ip`` (the service's ``IP`` field).
        """
        # NOTE(review): confirm the aiohttp setup in use accepts a socks5://
        # proxy URL here; a SOCKS-capable connector may be required.
        proxy_url = f"socks5://{self.socks_host}:{self.socks_port}"
        async with aiohttp.ClientSession() as session:
            async with session.get(
                "https://check.torproject.org/api/ip",
                proxy=proxy_url,
            ) as response:
                payload = await response.json()
                is_tor = payload.get("IsTor", False)
                exit_ip = payload.get("IP")
                return {
                    "is_tor": is_tor,
                    "exit_ip": exit_ip
                }
# Lightning over Tor
async def connect_lightning_over_tor(
    node_address: str  # pubkey@onion:port
) -> dict:
    """Connect to a Lightning node that is reachable as a Tor hidden service.

    Args:
        node_address: Peer address in ``pubkey@host:port`` form; the host must
            be a ``.onion`` name.

    Returns:
        Whatever ``lightning.connect_peer`` returns for the connection.

    Raises:
        SecurityError: If the host part of ``node_address`` is not a
            ``.onion`` name.
    """
    # Isolate the host: drop an optional "pubkey@" prefix and ":port" suffix.
    # A plain substring test would also accept e.g. "x.onion.evil.com".
    host_port = node_address.rpartition("@")[2]
    host, colon, _port = host_port.rpartition(":")
    if not colon:
        host = host_port
    host = host.lower()

    # Require a non-empty label in front of the ".onion" suffix.
    if not host.endswith(".onion") or len(host) <= len(".onion"):
        raise SecurityError("Expected Tor hidden service address")

    # Configure Lightning to use the default Tor SOCKS proxy.
    tor_manager = TorManager()
    return await lightning.connect_peer(
        node_address,
        proxy=f"socks5://{tor_manager.socks_host}:{tor_manager.socks_port}"
    )
Tor Hidden Services
For receiving connections anonymously:
class OnionService:
    """Manage an ephemeral Tor hidden service that forwards to a local port.

    The control connection opened by ``start`` is kept open until ``stop``:
    an ephemeral service that is not created as detached is taken down when
    its control connection closes, so closing it inside ``start`` would
    remove the service before the caller could use the returned address.
    """

    def __init__(self, local_port: int, control_port: int = 9051):
        """Remember the ports; no connection is made until ``start``.

        Args:
            local_port: Local port the hidden service forwards to.
            control_port: Tor control port to connect to (default 9051).
        """
        self.local_port = local_port
        self.control_port = control_port
        self.onion_address = None
        self._controller = None   # open stem Controller while the service runs
        self._service_id = None   # id needed to remove the service again

    async def start(self) -> str:
        """Start the hidden service and return its ``.onion`` address.

        Virtual port 80 of the service is mapped to ``local_port``. Calling
        ``start`` again while the service is running returns the existing
        address.
        """
        from stem.control import Controller

        if self._controller is not None:
            return self.onion_address

        controller = Controller.from_port(port=self.control_port)
        try:
            controller.authenticate()
            response = controller.create_ephemeral_hidden_service(
                {80: self.local_port},
                await_publication=True
            )
        except Exception:
            # Do not leak the control connection if setup fails.
            controller.close()
            raise

        self._controller = controller
        self._service_id = response.service_id
        self.onion_address = response.service_id + ".onion"
        return self.onion_address

    async def stop(self):
        """Remove the hidden service and close the control connection.

        Does nothing when the service was never started.
        """
        controller, service_id = self._controller, self._service_id
        if controller is None:
            return

        # Reset state first so a failure below cannot leave stale references.
        self._controller = None
        self._service_id = None
        self.onion_address = None
        try:
            if service_id:
                controller.remove_ephemeral_hidden_service(service_id)
        finally:
            controller.close()
Attack Defenses
Eclipse Attack Defense
Eclipse attacks isolate your node from the real network:
class EclipseDefense:
    """Defend against eclipse attacks."""

    def __init__(self):
        self.known_good_peers = []
        self.block_sources = []

    async def verify_chain_tip(self) -> bool:
        """Check that independent block explorers agree on the chain height.

        Returns:
            ``True`` when at least two sources answered and their heights
            differ by at most one block; ``False`` otherwise.
        """
        # Block height from multiple independent sources.
        sources = [
            "https://mempool.space/api/blocks/tip/height",
            "https://blockchain.info/q/getblockcount",
            "https://blockstream.info/api/blocks/tip/height"
        ]
        heights = []
        for source in sources:
            try:
                heights.append(int(await fetch_text(source)))
            except Exception:
                # Best effort: skip a source that fails or returns a
                # non-numeric body. ``Exception`` (not a bare ``except``) lets
                # task cancellation and KeyboardInterrupt propagate.
                continue

        if len(heights) < 2:
            return False  # Not enough sources to compare

        # All sources should agree (within 1 block).
        return max(heights) - min(heights) <= 1

    async def check_peer_diversity(self) -> dict:
        """Summarise how spread out the node's peers are by address prefix.

        Returns:
            A dict with ``peer_count``, ``unique_prefixes`` (distinct first two
            dot-separated address parts), ``unique_subnets`` (distinct first
            three parts) and ``diverse_enough`` (at least 4 prefixes and 6
            subnets).
        """
        peers = await bitcoin.get_peer_info()
        # NOTE(review): the dotted split presumably assumes IPv4 "a.b.c.d:port"
        # strings; IPv6 or onion peers would be grouped differently - confirm.
        addresses = [peer["addr"] for peer in peers]

        # Coarse diversity: first two dot-separated parts of the address.
        prefixes = {".".join(addr.split(".")[:2]) for addr in addresses}
        # Finer diversity: first three dot-separated parts.
        subnets = {".".join(addr.split(".")[:3]) for addr in addresses}

        return {
            "peer_count": len(peers),
            "unique_prefixes": len(prefixes),
            "unique_subnets": len(subnets),
            "diverse_enough": len(prefixes) >= 4 and len(subnets) >= 6
        }
MITM Defense
class MITMDefense:
    """Checks that help detect man-in-the-middle tampering."""

    async def verify_api_integrity(
        self,
        url: str,
        expected_response_hash: str | None = None
    ) -> bool:
        """Fetch ``url`` with pinning and optionally compare its SHA-256.

        Args:
            url: Endpoint to fetch via ``fetch_with_pinning``.
            expected_response_hash: Hex SHA-256 of the expected body. When it
                is not given, no content check is performed.

        Returns:
            Whether the body hash equals ``expected_response_hash``; ``True``
            when no expected hash was supplied.
        """
        body = await fetch_with_pinning(url)
        if not expected_response_hash:
            # Cross-verification with other sources would go here
            # (implementation depends on data type).
            return True
        digest = hashlib.sha256(body.encode()).hexdigest()
        return digest == expected_response_hash

    def verify_lightning_message(
        self,
        message: bytes,
        signature: bytes,
        pubkey: bytes
    ) -> bool:
        """Return the result of ``verify_schnorr`` for the given message.

        NOTE(review): confirm the signature scheme matches the kind of
        Lightning message being verified.
        """
        is_authentic = verify_schnorr(message, signature, pubkey)
        return is_authentic

    def verify_nostr_event(self, event: dict) -> bool:
        """Return the result of ``verify_event_signature`` for ``event``."""
        is_authentic = verify_event_signature(event)
        return is_authentic
DNS Security
class DNSSecurityManager:
    """Secure DNS resolution."""

    def __init__(self):
        # DNS-over-HTTPS providers, tried in order.
        self.doh_servers = [
            "https://1.1.1.1/dns-query",
            "https://dns.google/dns-query"
        ]

    async def resolve_doh(self, hostname: str) -> list[str]:
        """Resolve the A records of ``hostname`` via DNS-over-HTTPS.

        Each server in ``doh_servers`` is tried in order; the first one that
        returns a parseable, successful response wins.

        Args:
            hostname: Name to resolve.

        Returns:
            The A-record addresses from the answer section (possibly empty).

        Raises:
            DNSError: If no configured server produced a usable response.
        """
        import dns.message
        import dns.rdatatype

        query_wire = dns.message.make_query(hostname, dns.rdatatype.A).to_wire()
        headers = {
            "Content-Type": "application/dns-message",
            "Accept": "application/dns-message",
        }

        async with aiohttp.ClientSession() as session:
            for server in self.doh_servers:
                try:
                    async with session.post(
                        server,
                        data=query_wire,
                        headers=headers
                    ) as response:
                        # An HTTP error page is not a DNS answer.
                        response.raise_for_status()
                        response_wire = await response.read()
                    response_msg = dns.message.from_wire(response_wire)
                except Exception:
                    # Best effort: fall through to the next provider.
                    # ``Exception`` (not a bare ``except``) lets task
                    # cancellation propagate.
                    continue

                # Keep only A records; the answer section can also carry
                # CNAME records whose text form is a name, not an address.
                return [
                    str(item)
                    for rrset in response_msg.answer
                    if rrset.rdtype == dns.rdatatype.A
                    for item in rrset
                ]

        raise DNSError(f"Failed to resolve {hostname}")

    async def resolve_through_tor(self, hostname: str) -> list[str]:
        """Resolve DNS through Tor (most private) - not implemented.

        The intended approach is remote resolution by the Tor SOCKS proxy
        (the ``socks5h`` behaviour). Until that is implemented this raises
        instead of silently returning ``None`` from a function declared to
        return a list, and it no longer creates a socket that is never closed.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "DNS resolution through Tor is not implemented"
        )
Network Monitoring
class NetworkMonitor:
"""Monitor network for security issues."""
async def check_all(self) -> dict:
"""Run all network security checks."""
return {
"tor_working": await self._check_tor(),
"tls_valid": await self._check_tls(),
"peer_diversity": await self._check_peers(),
"dns_secure": await self._check_dns(),
"no_leaks": await self._check_leaks()
}
async def _check_tor(self) -> bool:
tor = TorManager()
result = await tor.verify_tor_connection()
return result["is_tor"]
async def _check_tls(self) -> bool:
try:
ctx = create_secure_ssl_context()
return ctx.minimum_version == ssl.TLSVersion.TLSv1_3
except:
return False
async def _check_peers(self) -> bool:
eclipse = EclipseDefense()
result = await eclipse.check_peer_diversity()
return result["diverse_enough"]
async def _check_dns(self) -> bool:
dns_mgr = DNSSecurityManager()
try:
await dns_mgr.resolve_doh("bitclawd.com")
return True
except:
return False
async def _check_leaks(self) -> bool:
# Check for WebRTC, DNS, and other leaks
pass
Machine-Readable Summary
{
"topic": "network-security",
"audience": "ai-agents",
"security_layers": [
"transport_security",
"tor_integration",
"attack_defenses",
"dns_security"
],
"tls_requirements": {
"minimum_version": "TLS 1.3",
"certificate_validation": "required",
"certificate_pinning": "recommended"
},
"tor_benefits": [
"ip_hiding",
"censorship_resistance",
"traffic_analysis_resistance"
],
"attack_defenses": [
"eclipse_attack",
"mitm",
"dns_hijacking"
]
}