Strategy Intermediate 7 min read
Monitoring and Observability
Monitoring Bitcoin, Lightning, and Nostr operations for AI agents. Metrics, alerting, dashboards, and operational health.
monitoring observability metrics alerting
Monitoring and Observability
Autonomous agents need comprehensive monitoring to detect issues before they become critical. This guide covers metrics collection, alerting, and operational dashboards for all three protocols.
Key Metrics
Bitcoin Metrics
| Metric | Description | Alert Threshold |
|---|---|---|
| `bitcoin.block_height` | Current chain height | Stuck for >30min |
| `bitcoin.fee_rate` | sat/vB for medium priority | >100 sat/vB |
| `bitcoin.mempool_size` | Number of pending txs | >100,000 |
| `bitcoin.node_peers` | Connected peers | <4 |
| `bitcoin.balance_sats` | Wallet balance | <threshold |
| `bitcoin.unconfirmed_count` | Pending transactions | >10 |
Lightning Metrics
| Metric | Description | Alert Threshold |
|---|---|---|
| `lightning.channels_active` | Active channel count | <3 |
| `lightning.capacity_sats` | Total channel capacity | <min_required |
| `lightning.local_balance_pct` | Outbound liquidity % | <10% or >90% |
| `lightning.pending_htlcs` | In-flight payments | >50 |
| `lightning.forwarding_events` | Forwarded payments/hr | Trend monitoring |
| `lightning.payment_success_rate` | % successful payments | <95% |
Nostr Metrics
| Metric | Description | Alert Threshold |
|---|---|---|
| `nostr.relay_connections` | Connected relays | <3 |
| `nostr.relay_latency_ms` | Avg relay response time | >2000ms |
| `nostr.events_published` | Events published/hr | <expected |
| `nostr.events_received` | Events received/hr | Trend monitoring |
| `nostr.subscription_count` | Active subscriptions | >100 |
Metrics Collection
import asyncio
import inspect
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
@dataclass
class Metric:
    """A single named measurement taken at one point in time.

    Attributes are documented inline. Only ``name`` and ``value`` are
    required; everything else has a default.
    """

    # Fully qualified metric name, e.g. "bitcoin.block_height".
    name: str
    # Numeric reading for this sample.
    value: float
    # When the sample was created. Timezone-aware UTC: datetime.utcnow() is
    # deprecated since Python 3.12 and returns a naive value that is easy to
    # misread as local time.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    # Free-form extra context (the collectors store error text under "error").
    labels: dict = field(default_factory=dict)
    # Protocol the metric belongs to ("bitcoin", "lightning", "nostr"), or ""
    # when not set.
    protocol: str = ""
class MetricsCollector:
    """Gather Bitcoin, Lightning, and Nostr metrics in a single pass.

    Each protocol collector stops at its first failure, keeps whatever it
    gathered up to that point, and appends a ``<protocol>.collector_error``
    metric (value ``1``, exception text under ``labels["error"]``).
    """

    def __init__(self):
        # Snapshot produced by the most recent collect_all() call.
        self.metrics: list[Metric] = []
        # Per-metric (min, max) bounds; not read by any method of this class.
        self.thresholds: dict[str, tuple[float, float]] = {}

    async def collect_all(self) -> list[Metric]:
        """Run the three protocol collectors concurrently and merge results.

        Returns:
            All collected metrics, Bitcoin first, then Lightning, then Nostr.
            The same list is stored on ``self.metrics``.
        """
        # gather() returns results in argument order regardless of which
        # collector finishes first.
        per_protocol = await asyncio.gather(
            self._collect_bitcoin(),
            self._collect_lightning(),
            self._collect_nostr(),
        )
        combined = [metric for batch in per_protocol for metric in batch]
        self.metrics = combined
        return combined

    async def _collect_bitcoin(self) -> list[Metric]:
        """Read chain tip, recommended fees, and mempool size from mempool.space."""
        collected = []

        def record(name, value):
            collected.append(Metric(name=name, value=value, protocol="bitcoin"))

        try:
            tip_height = await fetch_text("https://mempool.space/api/blocks/tip/height")
            record("bitcoin.block_height", int(tip_height))

            recommended = await fetch_json("https://mempool.space/api/v1/fees/recommended")
            record("bitcoin.fee_rate_fast", recommended["fastestFee"])
            record("bitcoin.fee_rate_medium", recommended["halfHourFee"])

            pool = await fetch_json("https://mempool.space/api/mempool")
            record("bitcoin.mempool_size", pool["count"])
        except Exception as exc:
            # Report the failure as a metric instead of raising, so one broken
            # source cannot abort the combined collection.
            collected.append(
                Metric(
                    name="bitcoin.collector_error",
                    value=1,
                    labels={"error": str(exc)},
                    protocol="bitcoin",
                )
            )
        return collected

    async def _collect_lightning(self) -> list[Metric]:
        """Read node info and aggregate channel balances from the Lightning node."""
        collected = []

        def record(name, value):
            collected.append(Metric(name=name, value=value, protocol="lightning"))

        try:
            node = await lightning.get_info()
            record("lightning.channels_active", node["num_active_channels"])
            record("lightning.peers_connected", node["num_peers"])

            open_channels = await lightning.list_channels()
            local_sats = sum(ch["local_balance"] for ch in open_channels)
            capacity_sats = sum(ch["capacity"] for ch in open_channels)
            record("lightning.total_local_sats", local_sats)
            record("lightning.total_capacity_sats", capacity_sats)

            # The percentage is undefined without capacity, so it is omitted
            # rather than reported as zero.
            if capacity_sats > 0:
                record("lightning.local_balance_pct", local_sats / capacity_sats * 100)
        except Exception as exc:
            collected.append(
                Metric(
                    name="lightning.collector_error",
                    value=1,
                    labels={"error": str(exc)},
                    protocol="lightning",
                )
            )
        return collected

    async def _collect_nostr(self) -> list[Metric]:
        """Count healthy/unhealthy relays and average the healthy ones' latency."""
        collected = []

        def record(name, value):
            collected.append(Metric(name=name, value=value, protocol="nostr"))

        try:
            relay_health = await check_relay_health()
            healthy = relay_health["healthy"]
            record("nostr.relays_healthy", len(healthy))
            record("nostr.relays_unhealthy", len(relay_health["unhealthy"]))

            if healthy:
                mean_latency = sum(relay["latency"] for relay in healthy) / len(healthy)
                # NOTE(review): the x1000 implies "latency" is reported in
                # seconds - confirm against check_relay_health().
                record("nostr.avg_latency_ms", mean_latency * 1000)
        except Exception as exc:
            collected.append(
                Metric(
                    name="nostr.collector_error",
                    value=1,
                    labels={"error": str(exc)},
                    protocol="nostr",
                )
            )
        return collected
Alerting System
from enum import Enum
from typing import Callable
class AlertSeverity(Enum):
    """Severity levels an alert can carry, declared from least to most urgent.

    Each member's value is its lowercase name.
    """

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
@dataclass
class Alert:
    """A rule violation raised for one metric sample."""

    # Identifier of the alert.
    name: str
    # How urgent the alert is.
    severity: AlertSeverity
    # Human-readable description of the violation.
    message: str
    # The metric sample that triggered this alert.
    metric: Metric
    # When the alert was created. Timezone-aware UTC: datetime.utcnow() is
    # deprecated since Python 3.12 and returns a naive value, which makes the
    # isoformat() output ambiguous for whoever receives the alert.
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
class AlertManager:
    """Turn metrics into alerts using threshold rules and notify handlers.

    Handlers may be plain callables or ``async def`` functions. Whatever a
    handler returns is inspected: if it is awaitable it is scheduled on the
    running event loop (or run to completion when no loop is running), so
    coroutine handlers are actually executed instead of being created and
    dropped.
    """

    # Comparison applied as compare(metric_value, threshold) for each
    # supported rule condition.
    _CONDITIONS = {
        "gt": lambda value, threshold: value > threshold,
        "lt": lambda value, threshold: value < threshold,
        "eq": lambda value, threshold: value == threshold,
    }

    def __init__(self):
        self.rules: list[dict] = []
        # Alerts produced by the most recent evaluate() call.
        self.active_alerts: list[Alert] = []
        self.handlers: list[Callable[[Alert], object]] = []
        # Strong references to in-flight async handler tasks. The event loop
        # only keeps weak references, so an unreferenced task can be garbage
        # collected before it finishes.
        self._pending_tasks: set = set()

    def add_rule(
        self,
        metric_name: str,
        condition: str,  # 'gt', 'lt', 'eq'
        threshold: float,
        severity: AlertSeverity,
        message_template: str
    ):
        """Add an alerting rule.

        Args:
            metric_name: Exact ``Metric.name`` the rule applies to.
            condition: ``"gt"``, ``"lt"`` or ``"eq"``; the metric value is
                compared against ``threshold`` with that operator.
            threshold: Value the metric is compared against.
            severity: Severity assigned to alerts raised by this rule.
            message_template: ``str.format`` template; ``{value}`` and
                ``{threshold}`` are available.

        Raises:
            ValueError: If ``condition`` is not a supported comparison. Such
                a rule could never fire, so it is rejected up front instead
                of being stored and silently ignored.
        """
        if condition not in self._CONDITIONS:
            supported = ", ".join(sorted(self._CONDITIONS))
            raise ValueError(
                f"Unsupported condition {condition!r}; expected one of: {supported}"
            )
        self.rules.append({
            "metric_name": metric_name,
            "condition": condition,
            "threshold": threshold,
            "severity": severity,
            "message_template": message_template
        })

    def add_handler(self, handler: Callable[[Alert], object]):
        """Add alert handler (e.g., Nostr post, webhook).

        Args:
            handler: Callable invoked with each raised alert. It may be
                synchronous or a coroutine function.
        """
        self.handlers.append(handler)

    def evaluate(self, metrics: list[Metric]) -> list[Alert]:
        """Evaluate all rules against current metrics.

        Every metric is checked against every rule registered for its name;
        each match produces one alert, which is dispatched to all handlers
        immediately.

        Args:
            metrics: Metric samples to check.

        Returns:
            The alerts raised by this call, also stored on
            ``self.active_alerts`` (replacing the previous contents).
        """
        alerts = []
        for metric in metrics:
            for rule in self.rules:
                if rule["metric_name"] != metric.name:
                    continue
                # Rules appended to self.rules directly may carry an unknown
                # condition; those never trigger.
                compare = self._CONDITIONS.get(rule["condition"])
                if compare is None or not compare(metric.value, rule["threshold"]):
                    continue
                alert = Alert(
                    name=f"{metric.name}_alert",
                    severity=rule["severity"],
                    message=rule["message_template"].format(
                        value=metric.value,
                        threshold=rule["threshold"]
                    ),
                    metric=metric
                )
                alerts.append(alert)
                self._dispatch(alert)
        self.active_alerts = alerts
        return alerts

    def _dispatch(self, alert: Alert) -> None:
        """Invoke every handler with ``alert``, driving async ones to run."""
        for handler in self.handlers:
            outcome = handler(alert)
            # Calling an ``async def`` handler only creates a coroutine; it
            # does nothing until something awaits or schedules it.
            if inspect.isawaitable(outcome):
                self._run_awaitable(outcome)

    def _run_awaitable(self, awaitable) -> None:
        """Schedule ``awaitable`` on the running loop, or run it to completion."""
        async def _drive():
            await awaitable

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # Called from synchronous code: there is no loop to schedule on,
            # so block until the handler has finished.
            asyncio.run(_drive())
            return

        # Fire-and-forget: evaluate() is synchronous and cannot await here.
        # A handler that raises is reported by asyncio's unretrieved-exception
        # logging rather than interrupting evaluation.
        task = loop.create_task(_drive())
        self._pending_tasks.add(task)
        task.add_done_callback(self._pending_tasks.discard)
# Configure alerting
alert_manager = AlertManager()


def _register_default_rules(manager):
    """Register the built-in Bitcoin, Lightning, and Nostr rules on ``manager``.

    Rules are added in table order: Bitcoin, then Lightning, then Nostr.
    """
    # Columns: metric_name, condition, threshold, severity, message_template
    default_rules = (
        # Bitcoin alerts
        (
            "bitcoin.fee_rate_medium",
            "gt",
            100,
            AlertSeverity.WARNING,
            "High Bitcoin fees: {value} sat/vB (threshold: {threshold})",
        ),
        (
            "bitcoin.mempool_size",
            "gt",
            100_000,
            AlertSeverity.INFO,
            "Mempool congested: {value} transactions",
        ),
        # Lightning alerts
        (
            "lightning.channels_active",
            "lt",
            3,
            AlertSeverity.ERROR,
            "Low channel count: {value} active channels",
        ),
        (
            "lightning.local_balance_pct",
            "lt",
            10,
            AlertSeverity.WARNING,
            "Low outbound liquidity: {value}%",
        ),
        # Nostr alerts
        (
            "nostr.relays_healthy",
            "lt",
            3,
            AlertSeverity.WARNING,
            "Low relay connectivity: {value} healthy relays",
        ),
    )
    for metric_name, condition, threshold, severity, template in default_rules:
        manager.add_rule(
            metric_name=metric_name,
            condition=condition,
            threshold=threshold,
            severity=severity,
            message_template=template,
        )


_register_default_rules(alert_manager)
Alert Handlers
async def nostr_alert_handler(alert: Alert):
    """Publish the alert as a Nostr post.

    The post is tagged ``agent-alert`` plus the protocol of the metric that
    triggered the alert.
    """
    # Plain-text marker shown in brackets at the start of the post.
    markers = {
        AlertSeverity.INFO: "i",
        AlertSeverity.WARNING: "!",
        AlertSeverity.ERROR: "!!",
        AlertSeverity.CRITICAL: "!!!",
    }
    body_lines = [
        f"[{markers[alert.severity]}] {alert.name}",
        f"{alert.message}",
        f"Protocol: {alert.metric.protocol}",
        f"Value: {alert.metric.value}",
        f"Time: {alert.timestamp.isoformat()}",
    ]
    # Every line, including the last, is newline-terminated.
    content = "".join(f"{line}\n" for line in body_lines)
    hashtags = [
        ["t", "agent-alert"],
        ["t", alert.metric.protocol],
    ]
    await nostr.post(content=content, tags=hashtags)
async def webhook_alert_handler(alert: Alert):
    """POST a JSON rendering of the alert to ``ALERT_WEBHOOK_URL``.

    The payload carries the alert's name, severity value, message, an ISO-8601
    timestamp, and a nested summary of the triggering metric.
    """
    source = alert.metric
    metric_summary = {
        "name": source.name,
        "value": source.value,
        "protocol": source.protocol,
    }
    payload = {
        "alert_name": alert.name,
        # Send the enum's string value so the payload is JSON-serializable.
        "severity": alert.severity.value,
        "message": alert.message,
        "metric": metric_summary,
        "timestamp": alert.timestamp.isoformat(),
    }
    await fetch(ALERT_WEBHOOK_URL, method="POST", json=payload)
# Register handlers: alerts are delivered to Nostr first, then to the webhook.
for _alert_handler in (nostr_alert_handler, webhook_alert_handler):
    alert_manager.add_handler(_alert_handler)
Dashboard Data Structure
def generate_dashboard_data() -> dict:
    """Generate data structure for monitoring dashboard.

    Every metric, status, and summary value below is a hard-coded example
    that illustrates the payload shape; only ``last_updated`` is computed.

    Returns:
        A freshly built dict with three top-level keys: ``last_updated``
        (ISO-8601 UTC timestamp with an explicit offset), ``protocols``
        (per-protocol ``status``, ``metrics`` and ``alerts``), and
        ``summary`` (alert counts and overall health).
    """
    return {
        # Timezone-aware so the serialized value carries "+00:00";
        # datetime.utcnow() is deprecated since Python 3.12 and produced an
        # offset-less string that consumers could misread as local time.
        "last_updated": datetime.now(timezone.utc).isoformat(),
        "protocols": {
            "bitcoin": {
                "status": "healthy",  # healthy, degraded, down
                "metrics": {
                    "block_height": 880000,
                    "fee_rate_sat_vb": 25,
                    "mempool_count": 45000
                },
                "alerts": []
            },
            "lightning": {
                "status": "healthy",
                "metrics": {
                    "channels_active": 5,
                    "capacity_btc": 0.5,
                    "local_balance_pct": 45,
                    "pending_htlcs": 2
                },
                "alerts": []
            },
            "nostr": {
                "status": "healthy",
                "metrics": {
                    "relays_connected": 8,
                    "avg_latency_ms": 250,
                    "events_24h": 156
                },
                "alerts": []
            }
        },
        "summary": {
            "total_alerts": 0,
            "critical_alerts": 0,
            "overall_health": "healthy"
        }
    }
Monitoring Loop
async def monitoring_loop(interval_seconds: int = 60):
    """Collect metrics, evaluate alert rules, and print both, forever.

    Args:
        interval_seconds: Pause between the end of one cycle and the start
            of the next.

    This coroutine never returns on its own. An exception raised inside a
    cycle is printed and the loop carries on with the next cycle.
    """
    collector = MetricsCollector()

    while True:
        try:
            # Collect metrics
            metrics = await collector.collect_all()

            # Evaluate alerts
            alerts = alert_manager.evaluate(metrics)

            # Log metrics
            for metric in metrics:
                # metric.name is already namespaced ("bitcoin.block_height"),
                # so prefixing it with metric.protocol printed the protocol
                # twice ("bitcoin.bitcoin.block_height").
                print(f"{metric.name}: {metric.value}")

            # Log alerts
            for alert in alerts:
                print(f"ALERT [{alert.severity.value}]: {alert.message}")
        except Exception as e:
            # Top-level boundary: one failed cycle must not stop monitoring.
            print(f"Monitoring error: {e}")

        # Sleep outside the try block so the pause also applies after a
        # failed cycle; otherwise a persistent error would spin the loop.
        await asyncio.sleep(interval_seconds)


# Run monitoring
if __name__ == "__main__":
    asyncio.run(monitoring_loop())
Machine-Readable Summary
{
"topic": "monitoring",
"audience": "ai-agents",
"metric_categories": [
"bitcoin",
"lightning",
"nostr"
],
"alert_severities": ["info", "warning", "error", "critical"],
"collection_interval_seconds": 60,
"key_metrics": {
"bitcoin": ["block_height", "fee_rate", "mempool_size"],
"lightning": ["channels_active", "local_balance_pct", "payment_success_rate"],
"nostr": ["relays_healthy", "avg_latency_ms"]
},
"alert_handlers": ["nostr_post", "webhook"]
}