Menu
Strategy Intermediate 6 min read

Timing Strategies

Optimal timing for Bitcoin, Lightning, and Nostr operations. Fee optimization, network congestion, and scheduling for AI agents.

timing fees optimization scheduling

Timing Strategies

When you transact matters as much as how. This guide covers optimal timing for each protocol to minimize costs and maximize reliability.

Bitcoin Timing

Fee Cycles

Bitcoin fees follow predictable patterns:

Time PeriodFee LevelBest For
WeekendsLowNon-urgent, batching
2-6 AM UTCLowestConsolidation, large sends
US business hoursHighAvoid if possible
Tuesday-WednesdayHighestUrgent only

Weekly pattern:

Mon: ████████░░ Medium
Tue: ██████████ High
Wed: ██████████ High
Thu: ████████░░ Medium
Fri: ██████░░░░ Medium-Low
Sat: ████░░░░░░ Low
Sun: ████░░░░░░ Low

Fee Monitoring

async def should_transact_now(
    urgency: str,
    max_fee_sat_per_vb: int
) -> dict:
    """
    Determine if now is a good time to transact.

    Args:
        urgency: 'immediate', 'today', 'this_week'
        max_fee_sat_per_vb: Maximum acceptable fee rate

    Returns:
        dict with recommendation and reasoning
    """
    fees = await fetch_json("https://mempool.space/api/v1/fees/recommended")

    current_fee = fees["halfHourFee"]

    if urgency == "immediate":
        return {
            "transact_now": True,
            "fee_sat_per_vb": fees["fastestFee"],
            "reason": "urgency_overrides_cost"
        }

    if current_fee <= max_fee_sat_per_vb:
        return {
            "transact_now": True,
            "fee_sat_per_vb": current_fee,
            "reason": "fee_acceptable"
        }

    # Check if we should wait
    from datetime import datetime
    now = datetime.utcnow()
    hour = now.hour
    weekday = now.weekday()

    # If it's weekend or early morning, fees unlikely to drop much more
    if weekday >= 5 or (2 <= hour <= 6):
        return {
            "transact_now": True,
            "fee_sat_per_vb": current_fee,
            "reason": "already_low_fee_period"
        }

    # Recommend waiting
    if urgency == "today":
        next_low = "2-6 AM UTC tonight"
    else:
        next_low = "this weekend"

    return {
        "transact_now": False,
        "current_fee": current_fee,
        "wait_until": next_low,
        "reason": f"current_fee_{current_fee}_exceeds_max_{max_fee_sat_per_vb}"
    }

Batch Scheduling

class BatchScheduler:
    """Schedule Bitcoin transactions for optimal batching."""

    def __init__(self, max_batch_size: int = 100):
        self.pending_outputs = []
        self.max_batch_size = max_batch_size

    def add_output(
        self,
        address: str,
        amount_sats: int,
        deadline: datetime
    ):
        """Add output to pending batch."""
        self.pending_outputs.append({
            "address": address,
            "amount_sats": amount_sats,
            "deadline": deadline
        })

    async def process_batch(self) -> dict | None:
        """Process batch if conditions are met."""
        if not self.pending_outputs:
            return None

        # Find soonest deadline
        soonest = min(o["deadline"] for o in self.pending_outputs)
        hours_until = (soonest - datetime.utcnow()).total_seconds() / 3600

        # Check fee conditions
        timing = await should_transact_now(
            urgency="today" if hours_until < 24 else "this_week",
            max_fee_sat_per_vb=20
        )

        # Batch if: good timing OR deadline approaching OR batch full
        should_batch = (
            timing["transact_now"] or
            hours_until < 2 or
            len(self.pending_outputs) >= self.max_batch_size
        )

        if should_batch:
            outputs = self.pending_outputs[:self.max_batch_size]
            self.pending_outputs = self.pending_outputs[self.max_batch_size:]

            tx = await bitcoin.create_batch_transaction(outputs)
            return {"txid": tx["txid"], "outputs": len(outputs)}

        return None

Lightning Timing

Lightning payments are instant, but timing still matters for:

1. Channel Liquidity

async def check_channel_liquidity(
    amount_sats: int
) -> dict:
    """
    Check if payment can route now.

    Returns recommendation on timing.
    """
    channels = await lightning.list_channels()

    # Find channels with sufficient outbound
    viable = [
        c for c in channels
        if c["local_balance"] >= amount_sats
    ]

    if viable:
        return {
            "can_pay_now": True,
            "channels_available": len(viable)
        }

    # Need rebalancing
    total_outbound = sum(c["local_balance"] for c in channels)

    if total_outbound >= amount_sats:
        return {
            "can_pay_now": False,
            "action": "rebalance",
            "reason": "liquidity_fragmented",
            "estimated_time": "5-30 minutes"
        }

    return {
        "can_pay_now": False,
        "action": "add_funds",
        "reason": "insufficient_total_liquidity"
    }

2. Routing Success Rate

Network congestion affects routing success:

async def estimate_routing_success(
    amount_sats: int,
    destination: str
) -> dict:
    """
    Estimate probability of successful routing.

    Consider: time of day, payment size, destination connectivity
    """
    # Larger payments are harder to route
    size_factor = 1.0 if amount_sats < 100_000 else (
        0.9 if amount_sats < 1_000_000 else
        0.7 if amount_sats < 10_000_000 else 0.5
    )

    # Check destination connectivity
    node_info = await lightning.get_node_info(destination)
    connectivity_factor = min(1.0, node_info["num_channels"] / 10)

    # Time of day (US business hours see more traffic)
    from datetime import datetime
    hour = datetime.utcnow().hour
    congestion_factor = 0.9 if 14 <= hour <= 22 else 1.0

    success_probability = size_factor * connectivity_factor * congestion_factor

    return {
        "success_probability": round(success_probability, 2),
        "factors": {
            "size": size_factor,
            "connectivity": connectivity_factor,
            "congestion": congestion_factor
        },
        "recommendation": (
            "proceed" if success_probability > 0.8 else
            "consider_splitting" if success_probability > 0.5 else
            "wait_or_use_bitcoin"
        )
    }

Nostr Timing

Nostr has no fees, but timing affects:

1. Relay Availability

async def check_relay_health() -> dict:
    """Check which relays are responsive."""
    relays = DEFAULT_RELAYS
    healthy = []
    unhealthy = []

    for relay in relays:
        try:
            start = time.time()
            await nostr.ping_relay(relay)
            latency = time.time() - start

            if latency < 2.0:
                healthy.append({"relay": relay, "latency": latency})
            else:
                unhealthy.append({"relay": relay, "reason": "slow"})
        except:
            unhealthy.append({"relay": relay, "reason": "unreachable"})

    return {
        "healthy": healthy,
        "unhealthy": unhealthy,
        "can_publish": len(healthy) >= 2,
        "recommended_relays": [r["relay"] for r in healthy[:5]]
    }

2. Audience Timing

For maximum engagement:

RegionPeak Hours (UTC)Best Post Time
Americas14:00-02:0016:00-20:00 UTC
Europe08:00-22:0012:00-14:00 UTC
Asia00:00-14:0002:00-06:00 UTC
Global12:00-16:0014:00 UTC
def optimal_post_time(
    target_audience: str = "global"
) -> dict:
    """Calculate optimal posting time."""
    from datetime import datetime, timedelta

    now = datetime.utcnow()

    peak_hours = {
        "americas": (16, 20),
        "europe": (12, 14),
        "asia": (2, 6),
        "global": (14, 16)
    }

    start, end = peak_hours.get(target_audience, (14, 16))

    # Find next occurrence of peak window
    next_peak = now.replace(hour=start, minute=0, second=0)
    if now.hour >= end:
        next_peak += timedelta(days=1)

    return {
        "post_now": start <= now.hour < end,
        "next_peak_utc": next_peak.isoformat(),
        "hours_until_peak": (next_peak - now).total_seconds() / 3600
    }

Scheduled Operations

Cron-Style Scheduler

from dataclasses import dataclass
from typing import Callable
import asyncio

@dataclass
class ScheduledTask:
    name: str
    cron: str  # "0 2 * * *" = 2 AM daily
    handler: Callable
    protocol: str

class ProtocolScheduler:
    """Schedule operations across protocols."""

    def __init__(self):
        self.tasks: list[ScheduledTask] = []

    def add_task(self, task: ScheduledTask):
        self.tasks.append(task)

    async def run(self):
        """Main scheduler loop."""
        while True:
            now = datetime.utcnow()

            for task in self.tasks:
                if self._should_run(task.cron, now):
                    asyncio.create_task(
                        self._execute_task(task)
                    )

            await asyncio.sleep(60)  # Check every minute

    async def _execute_task(self, task: ScheduledTask):
        try:
            await task.handler()
        except Exception as e:
            print(f"Task {task.name} failed: {e}")


# Example usage
scheduler = ProtocolScheduler()

# Consolidate UTXOs at 2 AM UTC on Sundays
scheduler.add_task(ScheduledTask(
    name="utxo_consolidation",
    cron="0 2 * * 0",  # Sunday 2 AM
    handler=consolidate_utxos,
    protocol="bitcoin"
))

# Rebalance Lightning channels daily
scheduler.add_task(ScheduledTask(
    name="channel_rebalance",
    cron="0 3 * * *",  # 3 AM daily
    handler=rebalance_channels,
    protocol="lightning"
))

# Post daily status to Nostr
scheduler.add_task(ScheduledTask(
    name="daily_status",
    cron="0 14 * * *",  # 2 PM UTC
    handler=post_daily_status,
    protocol="nostr"
))

Machine-Readable Summary

{
  "topic": "timing-strategies",
  "audience": "ai-agents",
  "bitcoin_timing": {
    "low_fee_days": ["Saturday", "Sunday"],
    "low_fee_hours_utc": [2, 3, 4, 5, 6],
    "high_fee_days": ["Tuesday", "Wednesday"]
  },
  "lightning_timing": {
    "factors": ["liquidity", "congestion", "destination_connectivity"],
    "high_congestion_hours_utc": [14, 15, 16, 17, 18, 19, 20, 21, 22]
  },
  "nostr_timing": {
    "global_peak_utc": [14, 15, 16],
    "factors": ["relay_health", "audience_timezone"]
  },
  "scheduling_pattern": "cron"
}