Menu
Strategy Intermediate 7 min read

Monitoring and Observability

Monitoring Bitcoin, Lightning, and Nostr operations for AI agents. Metrics, alerting, dashboards, and operational health.

monitoring observability metrics alerting

Monitoring and Observability

Autonomous agents need comprehensive monitoring to detect issues before they become critical. This guide covers metrics collection, alerting, and operational dashboards for all three protocols.

Key Metrics

Bitcoin Metrics

| Metric | Description | Alert Threshold |
| --- | --- | --- |
| bitcoin.block_height | Current chain height | Stuck for >30min |
| bitcoin.fee_rate | sat/vB for medium priority | >100 sat/vB |
| bitcoin.mempool_size | Number of pending txs | >100,000 |
| bitcoin.node_peers | Connected peers | <4 |
| bitcoin.balance_sats | Wallet balance | <threshold |
| bitcoin.unconfirmed_count | Pending transactions | >10 |

Lightning Metrics

| Metric | Description | Alert Threshold |
| --- | --- | --- |
| lightning.channels_active | Active channel count | <3 |
| lightning.capacity_sats | Total channel capacity | <min_required |
| lightning.local_balance_pct | Outbound liquidity % | <10% or >90% |
| lightning.pending_htlcs | In-flight payments | >50 |
| lightning.forwarding_events | Forwarded payments/hr | Trend monitoring |
| lightning.payment_success_rate | % successful payments | <95% |

Nostr Metrics

| Metric | Description | Alert Threshold |
| --- | --- | --- |
| nostr.relay_connections | Connected relays | <3 |
| nostr.relay_latency_ms | Avg relay response time | >2000ms |
| nostr.events_published | Events published/hr | <expected |
| nostr.events_received | Events received/hr | Trend monitoring |
| nostr.subscription_count | Active subscriptions | >100 |

Metrics Collection

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any
import asyncio

@dataclass
class Metric:
    """A single named measurement sampled from one protocol."""

    # Dotted metric identifier, e.g. "bitcoin.block_height".
    name: str
    # Sampled numeric value.
    value: float
    # Creation time; defaults to the moment the instance is built.
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12 - consider a timezone-aware default.
    timestamp: datetime = field(default_factory=datetime.utcnow)
    # Free-form extra context; the collectors store an "error" string here
    # when a collection attempt fails.
    labels: dict = field(default_factory=dict)
    # Originating protocol ("bitcoin", "lightning", "nostr"); empty when unset.
    protocol: str = ""

class MetricsCollector:
    """Gather Bitcoin, Lightning and Nostr metrics into one flat list."""

    def __init__(self):
        # Per-metric (min, max) bounds; not consulted by the collectors below.
        self.thresholds: dict[str, tuple[float, float]] = {}  # (min, max)
        # Result of the most recent ``collect_all`` call.
        self.metrics: list[Metric] = []

    async def collect_all(self) -> list[Metric]:
        """Run the three protocol collectors concurrently and merge their output.

        Returns:
            Bitcoin metrics, then Lightning, then Nostr, in one list. The same
            list is stored on ``self.metrics``.
        """
        batches = await asyncio.gather(
            self._collect_bitcoin(),
            self._collect_lightning(),
            self._collect_nostr(),
        )

        merged = [metric for batch in batches for metric in batch]
        self.metrics = merged
        return merged

    async def _collect_bitcoin(self) -> list[Metric]:
        """Sample chain height, recommended fees and mempool size from mempool.space.

        Any exception stops the collection early; metrics gathered up to that
        point are kept and a ``bitcoin.collector_error`` metric is appended.
        """
        collected: list[Metric] = []

        def record(name, value):
            collected.append(Metric(name=name, value=value, protocol="bitcoin"))

        try:
            tip_height = await fetch_text("https://mempool.space/api/blocks/tip/height")
            record("bitcoin.block_height", int(tip_height))

            recommended = await fetch_json("https://mempool.space/api/v1/fees/recommended")
            record("bitcoin.fee_rate_fast", recommended["fastestFee"])
            record("bitcoin.fee_rate_medium", recommended["halfHourFee"])

            mempool_stats = await fetch_json("https://mempool.space/api/mempool")
            record("bitcoin.mempool_size", mempool_stats["count"])
        except Exception as exc:
            collected.append(Metric(
                name="bitcoin.collector_error",
                value=1,
                labels={"error": str(exc)},
                protocol="bitcoin",
            ))

        return collected

    async def _collect_lightning(self) -> list[Metric]:
        """Sample node info and aggregate channel balances from the Lightning node.

        Any exception stops the collection early; metrics gathered up to that
        point are kept and a ``lightning.collector_error`` metric is appended.
        """
        collected: list[Metric] = []

        def record(name, value):
            collected.append(Metric(name=name, value=value, protocol="lightning"))

        try:
            node_info = await lightning.get_info()
            record("lightning.channels_active", node_info["num_active_channels"])
            record("lightning.peers_connected", node_info["num_peers"])

            # Aggregate balances across every channel the node reports.
            channel_list = await lightning.list_channels()
            local_sats = sum(ch["local_balance"] for ch in channel_list)
            capacity_sats = sum(ch["capacity"] for ch in channel_list)
            record("lightning.total_local_sats", local_sats)
            record("lightning.total_capacity_sats", capacity_sats)

            # The percentage is only meaningful (and computable) with capacity.
            if capacity_sats > 0:
                record("lightning.local_balance_pct", local_sats / capacity_sats * 100)
        except Exception as exc:
            collected.append(Metric(
                name="lightning.collector_error",
                value=1,
                labels={"error": str(exc)},
                protocol="lightning",
            ))

        return collected

    async def _collect_nostr(self) -> list[Metric]:
        """Sample relay health counts and the mean latency of healthy relays.

        Any exception stops the collection early; metrics gathered up to that
        point are kept and a ``nostr.collector_error`` metric is appended.
        """
        collected: list[Metric] = []

        def record(name, value):
            collected.append(Metric(name=name, value=value, protocol="nostr"))

        try:
            report = await check_relay_health()
            record("nostr.relays_healthy", len(report["healthy"]))
            record("nostr.relays_unhealthy", len(report["unhealthy"]))

            healthy_relays = report["healthy"]
            if healthy_relays:
                latency_total = sum(relay["latency"] for relay in healthy_relays)
                mean_latency = latency_total / len(healthy_relays)
                # Scaled by 1000 - presumably seconds to milliseconds; confirm
                # against check_relay_health().
                record("nostr.avg_latency_ms", mean_latency * 1000)
        except Exception as exc:
            collected.append(Metric(
                name="nostr.collector_error",
                value=1,
                labels={"error": str(exc)},
                protocol="nostr",
            ))

        return collected

Alerting System

from enum import Enum
from typing import Callable

class AlertSeverity(Enum):
    """Severity levels an alert can carry.

    Each member's value is the lowercase label emitted in log lines and
    webhook payloads. Members are presumably listed in increasing urgency;
    nothing in this file relies on an ordering.
    """

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class Alert:
    """A fired alert: the triggering metric plus its severity and message."""

    # Alert identifier; AlertManager.evaluate uses "<metric name>_alert".
    name: str
    # How serious the condition is.
    severity: AlertSeverity
    # Human-readable description rendered from the rule's template.
    message: str
    # The metric sample that caused the alert.
    metric: Metric
    # Creation time; defaults to the moment the instance is built.
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12 - consider a timezone-aware default.
    timestamp: datetime = field(default_factory=datetime.utcnow)

class AlertManager:
    """Turn metric samples into alerts using threshold rules.

    Rules are plain dicts appended by ``add_rule``. ``evaluate`` compares every
    metric against every rule with a matching name and hands each resulting
    ``Alert`` to the registered handlers. Handlers may be plain callables or
    coroutine functions; awaitable results are scheduled or run so they are
    never silently dropped.
    """

    # Maps a rule's condition code to the comparison it performs.
    _COMPARATORS = {
        "gt": lambda value, threshold: value > threshold,
        "lt": lambda value, threshold: value < threshold,
        "eq": lambda value, threshold: value == threshold,
    }

    def __init__(self):
        self.rules: list[dict] = []
        self.active_alerts: list[Alert] = []
        self.handlers: list[Callable[[Alert], object]] = []
        # Strong references to in-flight handler tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._pending_tasks: set = set()

    def add_rule(
        self,
        metric_name: str,
        condition: str,  # 'gt', 'lt', 'eq'
        threshold: float,
        severity: AlertSeverity,
        message_template: str
    ):
        """Register an alerting rule.

        Args:
            metric_name: Exact ``Metric.name`` the rule applies to.
            condition: ``'gt'``, ``'lt'`` or ``'eq'``; how the metric value is
                compared with ``threshold``.
            threshold: Value the metric is compared against.
            severity: Severity assigned to alerts produced by this rule.
            message_template: ``str.format`` template; ``{value}`` and
                ``{threshold}`` are available.

        Raises:
            ValueError: If ``condition`` is not a supported code. Such a rule
                could never fire, so it is rejected instead of being stored.
        """
        if condition not in self._COMPARATORS:
            supported = ", ".join(repr(code) for code in self._COMPARATORS)
            raise ValueError(
                f"Unknown condition {condition!r}; expected one of {supported}"
            )

        self.rules.append({
            "metric_name": metric_name,
            "condition": condition,
            "threshold": threshold,
            "severity": severity,
            "message_template": message_template
        })

    def add_handler(self, handler: Callable[[Alert], object]):
        """Add an alert handler (e.g., Nostr post, webhook).

        The handler is called with each fired ``Alert``. It may be a regular
        function or a coroutine function.
        """
        self.handlers.append(handler)

    def evaluate(self, metrics: list[Metric]) -> list[Alert]:
        """Evaluate all rules against the given metrics.

        Every (metric, rule) pair whose names match and whose comparison holds
        produces one ``Alert``, which is dispatched to all handlers right away.

        Args:
            metrics: Samples to check.

        Returns:
            The alerts fired by this call, also stored on ``active_alerts``.
        """
        alerts = []

        for metric in metrics:
            for rule in self.rules:
                if rule["metric_name"] != metric.name:
                    continue

                # Rules placed in ``self.rules`` directly may carry an unknown
                # condition; those never trigger.
                compare = self._COMPARATORS.get(rule["condition"])
                if compare is None or not compare(metric.value, rule["threshold"]):
                    continue

                alert = Alert(
                    name=f"{metric.name}_alert",
                    severity=rule["severity"],
                    message=rule["message_template"].format(
                        value=metric.value,
                        threshold=rule["threshold"]
                    ),
                    metric=metric
                )
                alerts.append(alert)
                self._dispatch(alert)

        self.active_alerts = alerts
        return alerts

    @staticmethod
    async def _await_result(awaitable):
        """Await ``awaitable`` from a coroutine so ``asyncio.run`` accepts it."""
        return await awaitable

    def _dispatch(self, alert):
        """Call every handler with ``alert`` and make sure async ones execute.

        A coroutine function returns a coroutine object when called; unless it
        is awaited or scheduled, its body never runs.
        """
        import asyncio
        import inspect

        for handler in self.handlers:
            outcome = handler(alert)
            if not inspect.isawaitable(outcome):
                continue

            try:
                asyncio.get_running_loop()
            except RuntimeError:
                # Called from synchronous code: drive the handler to completion.
                asyncio.run(self._await_result(outcome))
                continue

            # Called from inside a running loop (e.g. the monitoring loop):
            # schedule the handler; it runs once the caller next yields.
            task = asyncio.ensure_future(outcome)
            self._pending_tasks.add(task)
            task.add_done_callback(self._pending_tasks.discard)


# Shared alert manager with the default rule set.
alert_manager = AlertManager()

# One row per rule:
# (metric name, condition, threshold, severity, message template)
_DEFAULT_ALERT_RULES = (
    # Bitcoin alerts
    (
        "bitcoin.fee_rate_medium", "gt", 100, AlertSeverity.WARNING,
        "High Bitcoin fees: {value} sat/vB (threshold: {threshold})",
    ),
    (
        "bitcoin.mempool_size", "gt", 100_000, AlertSeverity.INFO,
        "Mempool congested: {value} transactions",
    ),
    # Lightning alerts
    (
        "lightning.channels_active", "lt", 3, AlertSeverity.ERROR,
        "Low channel count: {value} active channels",
    ),
    (
        "lightning.local_balance_pct", "lt", 10, AlertSeverity.WARNING,
        "Low outbound liquidity: {value}%",
    ),
    # Nostr alerts
    (
        "nostr.relays_healthy", "lt", 3, AlertSeverity.WARNING,
        "Low relay connectivity: {value} healthy relays",
    ),
)

for _name, _condition, _threshold, _severity, _template in _DEFAULT_ALERT_RULES:
    alert_manager.add_rule(
        metric_name=_name,
        condition=_condition,
        threshold=_threshold,
        severity=_severity,
        message_template=_template,
    )

Alert Handlers

async def nostr_alert_handler(alert: Alert):
    """Publish the alert as a Nostr post.

    The post body carries a severity marker, the alert name and message, and
    the protocol, value and timestamp of the alert. It is tagged with
    ``agent-alert`` and the metric's protocol.
    """
    markers = {
        AlertSeverity.INFO: "i",
        AlertSeverity.WARNING: "!",
        AlertSeverity.ERROR: "!!",
        AlertSeverity.CRITICAL: "!!!"
    }
    marker = markers[alert.severity]

    # The trailing empty entry yields the final newline of the post body.
    body_lines = [
        f"[{marker}] {alert.name}",
        "",
        f"{alert.message}",
        "",
        f"Protocol: {alert.metric.protocol}",
        f"Value: {alert.metric.value}",
        f"Time: {alert.timestamp.isoformat()}",
        "",
    ]
    post_tags = [["t", "agent-alert"], ["t", alert.metric.protocol]]

    await nostr.post(content="\n".join(body_lines), tags=post_tags)


async def webhook_alert_handler(alert: Alert):
    """POST the alert as a JSON document to ``ALERT_WEBHOOK_URL``."""
    body = {
        "alert_name": alert.name,
        "severity": alert.severity.value,
        "message": alert.message,
    }
    # Nested summary of the metric sample that fired the alert.
    body["metric"] = {
        "name": alert.metric.name,
        "value": alert.metric.value,
        "protocol": alert.metric.protocol,
    }
    body["timestamp"] = alert.timestamp.isoformat()

    await fetch(ALERT_WEBHOOK_URL, method="POST", json=body)


# Attach both delivery channels to the shared alert manager.
for _handler in (nostr_alert_handler, webhook_alert_handler):
    alert_manager.add_handler(_handler)

Dashboard Data Structure

def generate_dashboard_data() -> dict:
    """Build the example payload consumed by the monitoring dashboard.

    Returns:
        A dict with ``last_updated`` (ISO timestamp taken at call time),
        ``protocols`` (one section per protocol holding ``status``,
        ``metrics`` and ``alerts``) and ``summary``. All metric values are
        fixed sample numbers.
    """

    def section(sample_metrics: dict) -> dict:
        # Status is one of: healthy, degraded, down.
        return {"status": "healthy", "metrics": sample_metrics, "alerts": []}

    generated_at = datetime.utcnow().isoformat()

    protocols = {
        "bitcoin": section({
            "block_height": 880000,
            "fee_rate_sat_vb": 25,
            "mempool_count": 45000,
        }),
        "lightning": section({
            "channels_active": 5,
            "capacity_btc": 0.5,
            "local_balance_pct": 45,
            "pending_htlcs": 2,
        }),
        "nostr": section({
            "relays_connected": 8,
            "avg_latency_ms": 250,
            "events_24h": 156,
        }),
    }

    summary = {
        "total_alerts": 0,
        "critical_alerts": 0,
        "overall_health": "healthy",
    }

    return {
        "last_updated": generated_at,
        "protocols": protocols,
        "summary": summary,
    }

Monitoring Loop

async def monitoring_loop(interval_seconds: int = 60):
    """Collect metrics, evaluate alert rules and log the results, forever.

    Args:
        interval_seconds: Pause between two collection cycles.

    An exception raised during a cycle is printed and the loop carries on
    with the next cycle after the usual pause. The function only ends when
    it is cancelled.
    """
    collector = MetricsCollector()

    while True:
        try:
            # Collect metrics
            metrics = await collector.collect_all()

            # Evaluate alerts
            alerts = alert_manager.evaluate(metrics)

            # Log metrics. Names already carry their protocol prefix
            # (e.g. "bitcoin.block_height"), so the name is printed as-is
            # rather than prefixed with the protocol a second time.
            for metric in metrics:
                print(f"{metric.name}: {metric.value}")

            # Log alerts
            for alert in alerts:
                print(f"ALERT [{alert.severity.value}]: {alert.message}")

        except Exception as e:
            print(f"Monitoring error: {e}")

        await asyncio.sleep(interval_seconds)


# Script entry point: run the monitoring loop with its default interval.
if __name__ == "__main__":
    loop_coroutine = monitoring_loop()
    asyncio.run(loop_coroutine)

Machine-Readable Summary

{
  "topic": "monitoring",
  "audience": "ai-agents",
  "metric_categories": [
    "bitcoin",
    "lightning",
    "nostr"
  ],
  "alert_severities": ["info", "warning", "error", "critical"],
  "collection_interval_seconds": 60,
  "key_metrics": {
    "bitcoin": ["block_height", "fee_rate", "mempool_size"],
    "lightning": ["channels_active", "local_balance_pct", "payment_success_rate"],
    "nostr": ["relays_healthy", "avg_latency_ms"]
  },
  "alert_handlers": ["nostr_post", "webhook"]
}