Menu
Strategy Advanced 9 min read

Multi-Protocol Workflows

Combining Bitcoin, Lightning, and Nostr in sophisticated agent workflows. Atomic operations, fallback strategies, and protocol bridging.

multi-protocol workflows integration orchestration

Multi-Protocol Workflows

Real-world agent operations often require coordinating multiple protocols. This guide covers workflow patterns, atomicity considerations, and implementation strategies.

Why Multi-Protocol?

No single protocol handles all use cases optimally:

| Use Case | Single Protocol Limitation | Multi-Protocol Solution |
| --- | --- | --- |
| Large instant payment | Bitcoin slow, Lightning capacity limits | Lightning + on-chain settlement |
| Paid content access | Lightning has no identity | Lightning payment + Nostr DM |
| Verified broadcast | Nostr has no finality | Nostr post + Bitcoin timestamp |
| Streaming payment with receipts | Lightning has no receipt format | Lightning stream + Nostr events |

Workflow Patterns

Pattern 1: Payment + Notification

Scenario: Send payment and notify recipient via Nostr.

async def pay_and_notify(
    recipient_npub: str,
    recipient_lnurl: str,
    amount_sats: int,
    message: str
) -> dict:
    """
    Send a Lightning payment, then DM the recipient a receipt over Nostr.

    The LNURL is resolved for the requested amount, the resulting payment
    request is paid, and only a successful payment is followed by a Nostr
    DM that carries the preimage together with the caller's message.

    Returns:
        On payment failure: {"error": "payment_failed", "details": <pay result>}.
        Otherwise: {"payment": <pay result>, "notification": <DM result>,
        "status": "complete"}.
    """
    # The "pr" entry of the LNURL response is what gets handed to the payer.
    lnurl_response = await lnurl_pay(recipient_lnurl, amount_sats)
    payment_result = await lightning.pay(lnurl_response["pr"])

    if payment_result["success"]:
        # The preimage is sent along as the proof of payment.
        proof_message = f"""
Payment sent!
Amount: {amount_sats} sats
Preimage: {payment_result['preimage']}
Message: {message}
"""
        dm_result = await nostr.send_dm(
            recipient_npub=recipient_npub,
            content=proof_message,
        )
        return {
            "payment": payment_result,
            "notification": dm_result,
            "status": "complete",
        }

    # Nothing is sent over Nostr when the payment did not go through.
    return {"error": "payment_failed", "details": payment_result}

Pattern 2: Broadcast + Timestamp

Scenario: Post public content and anchor hash to Bitcoin for proof of existence.

async def broadcast_with_timestamp(
    content: str,
    anchor_to_bitcoin: bool = False
) -> dict:
    """
    Publish a kind-1 Nostr note and optionally anchor a digest of it on-chain.

    The note is always created and published. When ``anchor_to_bitcoin`` is
    true, the SHA-256 of the event id string is written to an OP_RETURN
    output as well.

    Returns:
        {"nostr_event_id": ..., "relays": ...}, plus a "bitcoin_anchor"
        entry ({"txid": ..., "event_hash": ...}) when anchoring was requested.
    """
    import hashlib

    note = await nostr.create_event(kind=1, content=content)
    published = await nostr.publish(note)

    outcome = {
        "nostr_event_id": note["id"],
        "relays": published["relays"],
    }

    if not anchor_to_bitcoin:
        return outcome

    # The digest covers the text encoding of the event id, not the raw event.
    # A SHA-256 hex digest is 64 characters, i.e. 32 bytes once decoded.
    digest_hex = hashlib.sha256(note["id"].encode()).hexdigest()[:64]
    anchor_tx = await bitcoin.create_op_return(data=bytes.fromhex(digest_hex))

    outcome["bitcoin_anchor"] = {
        "txid": anchor_tx["txid"],
        "event_hash": digest_hex,
    }
    return outcome

Pattern 3: Escrow with Arbitration

Scenario: Two-party escrow with Nostr-based arbitration.

async def create_escrow(
    buyer_npub: str,
    seller_npub: str,
    arbiter_npub: str,
    amount_sats: int,
    terms: str
) -> dict:
    """
    Set up a 2-of-3 multisig escrow and share its terms with all parties.

    Flow:
    1. Resolve a Bitcoin pubkey for buyer, seller and arbiter
    2. Create a 2-of-3 multisig address from those keys
    3. Send the terms and the address to the three parties over Nostr
    4. Return the funding details (funding, release and dispute handling
       are not performed here)

    Returns:
        dict with "escrow_address", "participants", "amount_sats",
        "funding" (address, amount and redeem script needed to fund and
        later spend the escrow) and status "awaiting_funding".
    """
    # Step 1: Derive pubkeys (simplified); order is buyer, seller, arbiter.
    pubkeys = [
        await get_btc_pubkey(buyer_npub),
        await get_btc_pubkey(seller_npub),
        await get_btc_pubkey(arbiter_npub),
    ]

    # Step 2: Create the 2-of-3 multisig.
    escrow_address = await bitcoin.create_multisig(2, pubkeys)

    # Everything the buyer needs to fund the escrow. The redeem script is
    # kept with it so the result is self-contained for later spending.
    funding_info = {
        "address": escrow_address["address"],
        "amount_sats": amount_sats,
        "redeem_script": escrow_address["redeem_script"],
    }

    # Step 3: Post terms to Nostr (encrypted group message).
    # NOTE(review): content is passed in the clear here; encryption is
    # presumably done inside nostr.post_group_dm - confirm.
    terms_event = {
        "kind": 4,  # Encrypted DM (or NIP-44)
        "content": f"Escrow Terms:\n{terms}\n\nAddress: {escrow_address['address']}",
        "participants": [buyer_npub, seller_npub, arbiter_npub],
    }

    await nostr.post_group_dm(terms_event)

    return {
        "escrow_address": escrow_address["address"],
        "participants": {
            "buyer": buyer_npub,
            "seller": seller_npub,
            "arbiter": arbiter_npub,
        },
        "amount_sats": amount_sats,
        "funding": funding_info,
        "status": "awaiting_funding",
    }

Pattern 4: Streaming Payment with Events

Scenario: Pay per minute with Nostr event logging.

async def streaming_payment(
    recipient_lnurl: str,
    rate_sats_per_minute: int,
    duration_minutes: int,
    event_topic: str
) -> dict:
    """
    Stream sats once per minute and log every step to Nostr.

    Flow:
    1. Post a start event to Nostr
    2. Each minute: resolve the LNURL to an invoice and pay it
    3. Log each successful payment as a reply to the start event
    4. Stop at the first failed payment
    5. Post a closing event with the amount actually paid

    The first payment is sent immediately; later ones follow a 60 second
    pause each.

    Returns:
        dict with "start_event" (id of the start event), "payments"
        (number of successful payments), "total_sats" (sats actually paid)
        and "status" ("complete" or "payment_failed").
    """
    import asyncio

    def thread_tags() -> list:
        # Fresh list per call: topic tag plus a root reference to the start event.
        return [
            ["t", event_topic],
            ["e", start_event["id"], "", "root"],
        ]

    start_event = await nostr.post(
        content=f"Starting stream: {rate_sats_per_minute} sats/min for {duration_minutes} min",
        tags=[["t", event_topic]]
    )

    payments = []
    status = "complete"

    for minute in range(duration_minutes):
        # Wait one minute (except first)
        if minute > 0:
            await asyncio.sleep(60)

        # lnurl_pay only yields an invoice; the invoice still has to be paid.
        invoice = await lnurl_pay(recipient_lnurl, rate_sats_per_minute)
        payment = await lightning.pay(invoice["pr"])

        # A result without a "success" flag is treated as successful.
        if not payment.get("success", True):
            status = "payment_failed"
            break

        payments.append(payment)

        await nostr.post(
            content=f"Minute {minute + 1}: Paid {rate_sats_per_minute} sats",
            tags=thread_tags()
        )

    # Count only what was really paid, not what was planned.
    total_paid = rate_sats_per_minute * len(payments)

    if status == "complete":
        closing = f"Stream complete: {total_paid} total sats over {duration_minutes} minutes"
    else:
        closing = (
            f"Stream stopped after {len(payments)} of {duration_minutes} minutes: "
            f"{total_paid} total sats paid"
        )

    await nostr.post(content=closing, tags=thread_tags())

    return {
        "start_event": start_event["id"],
        "payments": len(payments),
        "total_sats": total_paid,
        "status": status
    }

Atomicity Considerations

Multi-protocol operations are inherently non-atomic. Strategies for handling partial failures:

Strategy 1: Idempotent Operations

Design each step to be safely retryable:

async def idempotent_pay_and_post(
    operation_id: str,  # Unique ID for this operation
    payment_params: dict,
    post_params: dict
) -> dict:
    """
    Pay a Lightning invoice and publish a Nostr post, safely retryable.

    Progress is stored under ``operation_id``. A retry skips any step that
    is already recorded as complete and reuses its stored result.

    Args:
        operation_id: Unique ID for this operation; also used to derive the
            payment idempotency key and tagged onto the post.
        payment_params: Must contain "invoice".
        post_params: Must contain "content".

    Returns:
        {"payment": ..., "post": ..., "operation_id": ...} on success, or
        {"error": "payment_failed", "details": ..., "operation_id": ...}
        when the payment reports failure. In that case nothing is recorded
        or posted, so the call can be retried.
    """
    # A first-time operation may have no stored state yet; work on a copy.
    state = dict(await get_operation_state(operation_id) or {})

    # Step 1: Payment (if not done)
    if not state.get("payment_complete"):
        payment = await lightning.pay(
            payment_params["invoice"],
            idempotency_key=f"{operation_id}_payment"
        )
        # Never record a failed payment as complete.
        # A result without a "success" flag is treated as successful.
        if not payment.get("success", True):
            return {
                "error": "payment_failed",
                "details": payment,
                "operation_id": operation_id
            }
        state.update({"payment_complete": True, "payment": payment})
        await save_operation_state(operation_id, state)
    else:
        payment = state["payment"]

    # Step 2: Post (if not done)
    if not state.get("post_complete"):
        post = await nostr.post(
            post_params["content"],
            tags=[["i", operation_id]]  # Include operation ID
        )
        # Save the accumulated state, not only the new keys, so the payment
        # record survives whether the store merges or overwrites.
        state.update({"post_complete": True, "post": post})
        await save_operation_state(operation_id, state)
    else:
        post = state["post"]

    return {"payment": payment, "post": post, "operation_id": operation_id}

Strategy 2: Saga Pattern

For complex workflows, use compensating transactions:

class PaymentSaga:
    """
    Saga pattern for multi-protocol payments.

    Steps run in order. When one fails, the compensation actions of the
    steps that already completed are run in reverse order.
    """

    def __init__(self):
        # Steps finished during the current execute() call, oldest first.
        self.steps_completed = []

    async def execute(self, workflow: dict) -> dict:
        """
        Run every step in ``workflow["steps"]``, rolling back on failure.

        Returns:
            {"status": "complete", "steps": [...]} when all steps succeed,
            otherwise {"status": "rolled_back", "error": ...}. A
            "compensation_errors" list is added when any compensation
            action itself failed.
        """
        # Start clean so a reused saga never compensates steps of an earlier run.
        self.steps_completed = []

        try:
            # Execute forward steps
            for step in workflow["steps"]:
                result = await self._execute_step(step)
                self.steps_completed.append({"step": step, "result": result})
        except Exception as e:
            # Compensate in reverse order
            compensation_errors = await self._compensate()
            outcome = {"status": "rolled_back", "error": str(e)}
            if compensation_errors:
                outcome["compensation_errors"] = compensation_errors
            return outcome

        return {"status": "complete", "steps": self.steps_completed}

    async def _execute_step(self, step: dict):
        """
        Run one forward step on the protocol named in ``step["protocol"]``.

        Raises:
            ValueError: for a protocol other than lightning, nostr, bitcoin.
            RuntimeError: when a Lightning payment reports ``success`` False.
        """
        protocol = step["protocol"]

        if protocol == "lightning":
            result = await lightning.pay(step["invoice"])
            # A payment that reports failure must trigger the rollback too.
            if isinstance(result, dict) and result.get("success") is False:
                raise RuntimeError(f"Lightning payment failed: {result}")
            return result

        if protocol == "nostr":
            return await nostr.post(step["content"])

        if protocol == "bitcoin":
            return await bitcoin.send(step["address"], step["amount"])

        # An unknown step must not be silently recorded as completed.
        raise ValueError(f"Unsupported protocol: {protocol!r}")

    async def _compensate(self) -> list:
        """
        Undo completed steps in reverse.

        Best effort: a failing compensation is recorded and the remaining
        ones still run.

        Returns:
            A list of {"step": ..., "error": ...} entries, empty when every
            compensation succeeded.
        """
        errors = []

        for completed in reversed(self.steps_completed):
            step = completed["step"]
            comp = step.get("compensate")
            if not comp:
                continue

            try:
                if comp["action"] == "refund":
                    await lightning.pay(comp["invoice"])
                elif comp["action"] == "post_cancellation":
                    await nostr.post(comp["content"])
                else:
                    raise ValueError(
                        f"Unsupported compensation action: {comp['action']!r}"
                    )
            except Exception as exc:
                # Keep going: one failed undo must not block the others.
                errors.append({"step": step, "error": str(exc)})

        return errors

Strategy 3: Two-Phase Commit

For operations requiring strong consistency:

async def two_phase_commit(operations: list[dict]) -> dict:
    """
    Two-phase commit across protocols.

    Phase 1 (prepare): place a hold for every Lightning operation and
    create, without publishing, an event for every Nostr operation.
    Phase 2 (commit): settle the holds and publish the events, in order.

    Returns:
        {"status": "committed", "results": [...]} when everything succeeds.
        {"status": "aborted", "phase": "prepare", "error": ...} when
        preparation fails, including on an unsupported protocol. Holds taken
        so far are released; a "release_errors" list is added if any release
        failed.
        {"status": "partial_commit", "committed": [...], "uncommitted":
        [...], "error": ...} when a commit step fails. "uncommitted" lists
        the operations from the failing one onward.
    """
    prepared = []

    # Phase 1: Prepare
    try:
        for op in operations:
            protocol = op["protocol"]

            if protocol == "lightning":
                # Hold invoice (don't pay yet)
                hold = await lightning.hold_invoice(op["invoice"])
                prepared.append({"op": op, "hold": hold})

            elif protocol == "nostr":
                # Create but don't publish
                event = await nostr.create_event(kind=1, content=op["content"])
                prepared.append({"op": op, "event": event})

            else:
                # Skipping it would report "committed" for work never done.
                raise ValueError(f"Unsupported protocol: {protocol!r}")

    except Exception as e:
        # Abort: release all holds
        release_errors = await _release_holds(prepared)
        outcome = {"status": "aborted", "phase": "prepare", "error": str(e)}
        if release_errors:
            outcome["release_errors"] = release_errors
        return outcome

    # Phase 2: Commit
    results = []
    try:
        for p in prepared:
            if "hold" in p:
                result = await lightning.settle_hold(p["hold"]["id"])
            elif "event" in p:
                result = await nostr.publish(p["event"])
            results.append(result)

    except Exception as e:
        # Partial commit - report for manual resolution. Entries from the
        # failing one onward have no result yet.
        pending = prepared[len(results):]
        return {
            "status": "partial_commit",
            "committed": results,
            "uncommitted": [p["op"] for p in pending],
            "error": str(e)
        }

    return {"status": "committed", "results": results}


async def _release_holds(prepared: list) -> list:
    """Cancel every prepared Lightning hold; return the failures, if any."""
    errors = []
    for p in prepared:
        if "hold" not in p:
            continue
        try:
            await lightning.cancel_hold(p["hold"]["id"])
        except Exception as exc:
            # Best effort: keep releasing the remaining holds.
            errors.append({"hold_id": p["hold"]["id"], "error": str(exc)})
    return errors

Protocol Bridging

Lightning <-> Nostr (Zaps)

async def zap_user(
    recipient_npub: str,
    amount_sats: int,
    comment: str = ""
) -> dict:
    """
    Send a zap (NIP-57) to Nostr user.

    Flow:
    1. Fetch recipient's LNURL from kind:0
    2. Create zap request event
    3. Get invoice from LNURL with zap request
    4. Pay invoice

    Returns:
        {"zap_request_id": ..., "payment_hash": ..., "amount_sats": ...}
        on success, otherwise a dict with an "error" key:
        "invalid_amount" (non-positive amount, checked before any network
        call), "recipient_has_no_lightning", or "payment_failed" (with the
        pay result under "details").
    """
    # Reject meaningless zaps before touching the network.
    if amount_sats <= 0:
        return {"error": "invalid_amount"}

    # Get recipient's profile; a missing profile is treated as empty.
    profile = await nostr.get_profile(recipient_npub) or {}
    lnurl = profile.get("lud16") or profile.get("lud06")

    if not lnurl:
        return {"error": "recipient_has_no_lightning"}

    # Create zap request (kind:9734)
    # NOTE(review): NIP-57 specifies a hex pubkey in the "p" tag; confirm
    # that nostr.create_event converts the npub, or convert it here.
    zap_request = await nostr.create_event(
        kind=9734,
        content=comment,
        tags=[
            ["p", recipient_npub],
            ["amount", str(amount_sats * 1000)],  # millisats
            ["relays", *DEFAULT_RELAYS]
        ]
    )

    # Get invoice from LNURL
    invoice = await lnurl_pay_with_zap(
        lnurl=lnurl,
        amount_sats=amount_sats,
        zap_request=zap_request
    )

    # Pay
    payment = await lightning.pay(invoice["pr"])

    # Do not report a zap that was never paid.
    # A result without a "success" flag is treated as successful.
    if not payment.get("success", True):
        return {"error": "payment_failed", "details": payment}

    return {
        "zap_request_id": zap_request["id"],
        "payment_hash": payment["payment_hash"],
        "amount_sats": amount_sats
    }

Bitcoin <-> Lightning (Submarine Swaps)

async def swap_onchain_to_lightning(
    lightning_invoice: str,
    onchain_address: str  # For refund
) -> dict:
    """
    Pay Lightning invoice using on-chain Bitcoin.

    Uses a swap service (e.g., Boltz, Loop). The invoice is decoded first,
    then a submarine swap is requested for it.

    Args:
        lightning_invoice: The invoice the swap should pay.
        onchain_address: Passed to the swap service as the refund address.

    Returns:
        {"error": "invoice_has_no_amount"} when the decoded invoice carries
        no amount. Otherwise the deposit address, the on-chain amount the
        service expects, the invoice amount in sats, the timeout block, the
        swap id and status "awaiting_deposit".
    """
    # Decode invoice to get amount
    invoice_details = await lightning.decode_invoice(lightning_invoice)
    amount_msat = invoice_details.get("amount_msat")

    # An invoice without a fixed amount yields no usable value here.
    if not amount_msat:
        return {"error": "invoice_has_no_amount"}

    amount_sats = amount_msat // 1000

    # Get swap quote
    swap = await boltz.create_swap(
        type="submarine",
        invoice=lightning_invoice,
        refund_address=onchain_address
    )

    # Returns on-chain address to fund. The invoice amount is reported next
    # to the expected on-chain amount so the caller can compare the two.
    return {
        "deposit_address": swap["address"],
        "amount_to_send": swap["expected_amount"],
        "invoice_amount_sats": amount_sats,
        "timeout_block": swap["timeout_block"],
        "swap_id": swap["id"],
        "status": "awaiting_deposit"
    }

Workflow Orchestration

class WorkflowOrchestrator:
    """
    Run one of a fixed set of named multi-protocol workflows.

    Supported names: "pay_and_notify", "broadcast_and_anchor", "zap".
    """

    def __init__(self):
        self.workflows = {}

    async def execute_workflow(
        self,
        workflow_type: str,
        params: dict
    ) -> dict:
        """Look up the handler for ``workflow_type`` and run it with ``params``.

        An unrecognised name yields {"error": "Unknown workflow: <name>"}.
        """
        handlers = {
            "pay_and_notify": self._pay_and_notify,
            "broadcast_and_anchor": self._broadcast_and_anchor,
            "zap": self._zap,
        }

        handler = handlers.get(workflow_type)
        if handler is None:
            return {"error": f"Unknown workflow: {workflow_type}"}

        return await handler(params)

    async def _pay_and_notify(self, params: dict) -> dict:
        """Pay ``params["invoice"]``; on success DM the preimage to the recipient."""
        paid = await lightning.pay(params["invoice"])

        # Bail out before notifying anyone if the payment failed.
        if not paid["success"]:
            return {"error": "payment_failed", "details": paid}

        dm = await nostr.send_dm(
            params["recipient_npub"],
            f"Payment complete: {paid['preimage']}"
        )
        return {"payment": paid, "notification": dm}

    async def _broadcast_and_anchor(self, params: dict) -> dict:
        """Post ``params["content"]``; if ``params["anchor"]`` is truthy, anchor the event id."""
        posted = await nostr.post(params["content"])
        outcome = {"event": posted}

        if params.get("anchor"):
            # The event id is decoded from hex and written as-is.
            outcome["anchor"] = await bitcoin.create_op_return(
                bytes.fromhex(posted["id"])
            )

        return outcome

    async def _zap(self, params: dict) -> dict:
        """Forward recipient, amount and optional comment to ``zap_user``."""
        comment = params.get("comment", "")
        return await zap_user(params["recipient"], params["amount_sats"], comment)

Machine-Readable Summary

{
  "topic": "multi-protocol-workflows",
  "audience": "ai-agents",
  "workflow_patterns": [
    "payment_plus_notification",
    "broadcast_plus_timestamp",
    "escrow_with_arbitration",
    "streaming_payment"
  ],
  "atomicity_strategies": [
    "idempotent_operations",
    "saga_pattern",
    "two_phase_commit"
  ],
  "protocol_bridges": [
    "lightning_nostr_zaps",
    "bitcoin_lightning_swaps"
  ],
  "implementation_language": "python"
}