Multi-Protocol Workflows
Combining Bitcoin, Lightning, and Nostr in sophisticated agent workflows. Atomic operations, fallback strategies, and protocol bridging.
Multi-Protocol Workflows
Real-world agent operations often require coordinating multiple protocols. This guide covers workflow patterns, atomicity considerations, and implementation strategies.
Why Multi-Protocol?
No single protocol handles all use cases optimally:
| Use Case | Single Protocol Limitation | Multi-Protocol Solution |
|---|---|---|
| Large instant payment | Bitcoin slow, Lightning capacity limits | Lightning + on-chain settlement |
| Paid content access | Lightning has no identity | Lightning payment + Nostr DM |
| Verified broadcast | Nostr has no finality | Nostr post + Bitcoin timestamp |
| Streaming payment with receipts | Lightning has no receipt format | Lightning stream + Nostr events |
Workflow Patterns
Pattern 1: Payment + Notification
Scenario: Send payment and notify recipient via Nostr.
async def pay_and_notify(
recipient_npub: str,
recipient_lnurl: str,
amount_sats: int,
message: str
) -> dict:
"""
Pay via Lightning, notify via Nostr.
Flow:
1. Resolve LNURL to get invoice
2. Pay the invoice
3. Post Nostr DM with payment proof
"""
# Step 1: Get invoice from LNURL
invoice = await lnurl_pay(recipient_lnurl, amount_sats)
# Step 2: Pay the invoice
payment_result = await lightning.pay(invoice["pr"])
if not payment_result["success"]:
return {"error": "payment_failed", "details": payment_result}
# Step 3: Notify via Nostr
proof_message = f"""
Payment sent!
Amount: {amount_sats} sats
Preimage: {payment_result['preimage']}
Message: {message}
"""
dm_result = await nostr.send_dm(
recipient_npub=recipient_npub,
content=proof_message
)
return {
"payment": payment_result,
"notification": dm_result,
"status": "complete"
}
Pattern 2: Broadcast + Timestamp
Scenario: Post public content and anchor hash to Bitcoin for proof of existence.
async def broadcast_with_timestamp(
content: str,
anchor_to_bitcoin: bool = False
) -> dict:
"""
Post to Nostr with optional Bitcoin timestamp.
Flow:
1. Create and sign Nostr event
2. Publish to relays
3. If anchoring, create OP_RETURN tx with event hash
"""
import hashlib
# Step 1: Create Nostr event
event = await nostr.create_event(
kind=1,
content=content
)
# Step 2: Publish
publish_result = await nostr.publish(event)
result = {
"nostr_event_id": event["id"],
"relays": publish_result["relays"]
}
# Step 3: Optional Bitcoin anchor
if anchor_to_bitcoin:
event_hash = hashlib.sha256(event["id"].encode()).hexdigest()
# OP_RETURN with 32-byte hash
anchor_tx = await bitcoin.create_op_return(
data=bytes.fromhex(event_hash[:64]) # 32 bytes max
)
result["bitcoin_anchor"] = {
"txid": anchor_tx["txid"],
"event_hash": event_hash[:64]
}
return result
Pattern 3: Escrow with Arbitration
Scenario: Two-party escrow with Nostr-based arbitration.
async def create_escrow(
buyer_npub: str,
seller_npub: str,
arbiter_npub: str,
amount_sats: int,
terms: str
) -> dict:
"""
Create escrow with Nostr coordination.
Flow:
1. Create 2-of-3 multisig address
2. Buyer funds the address
3. Post escrow terms to Nostr (encrypted to all parties)
4. Await release or dispute
"""
# Step 1: Derive pubkeys (simplified)
pubkeys = [
await get_btc_pubkey(buyer_npub),
await get_btc_pubkey(seller_npub),
await get_btc_pubkey(arbiter_npub)
]
# Create 2-of-3 multisig
escrow_address = await bitcoin.create_multisig(2, pubkeys)
# Step 2: Generate funding invoice
funding_info = {
"address": escrow_address["address"],
"amount_sats": amount_sats,
"redeem_script": escrow_address["redeem_script"]
}
# Step 3: Post terms to Nostr (encrypted group message)
terms_event = {
"kind": 4, # Encrypted DM (or NIP-44)
"content": f"Escrow Terms:\n{terms}\n\nAddress: {escrow_address['address']}",
"participants": [buyer_npub, seller_npub, arbiter_npub]
}
await nostr.post_group_dm(terms_event)
return {
"escrow_address": escrow_address["address"],
"participants": {
"buyer": buyer_npub,
"seller": seller_npub,
"arbiter": arbiter_npub
},
"amount_sats": amount_sats,
"status": "awaiting_funding"
}
Pattern 4: Streaming Payment with Events
Scenario: Pay per minute with Nostr event logging.
async def streaming_payment(
recipient_lnurl: str,
rate_sats_per_minute: int,
duration_minutes: int,
event_topic: str
) -> dict:
"""
Stream sats with Nostr event log.
Flow:
1. Start payment stream
2. Post start event to Nostr
3. Send payment every minute
4. Log each payment to Nostr
5. Post completion event
"""
import asyncio
# Post start event
start_event = await nostr.post(
content=f"Starting stream: {rate_sats_per_minute} sats/min for {duration_minutes} min",
tags=[["t", event_topic]]
)
payments = []
for minute in range(duration_minutes):
# Wait one minute (except first)
if minute > 0:
await asyncio.sleep(60)
# Send payment
payment = await lnurl_pay(recipient_lnurl, rate_sats_per_minute)
payments.append(payment)
# Log to Nostr
await nostr.post(
content=f"Minute {minute + 1}: Paid {rate_sats_per_minute} sats",
tags=[
["t", event_topic],
["e", start_event["id"], "", "root"]
]
)
# Post completion
total_paid = rate_sats_per_minute * duration_minutes
await nostr.post(
content=f"Stream complete: {total_paid} total sats over {duration_minutes} minutes",
tags=[
["t", event_topic],
["e", start_event["id"], "", "root"]
]
)
return {
"start_event": start_event["id"],
"payments": len(payments),
"total_sats": total_paid
}
Atomicity Considerations
Multi-protocol operations are inherently non-atomic. Strategies for handling partial failures:
Strategy 1: Idempotent Operations
Design each step to be safely retryable:
async def idempotent_pay_and_post(
operation_id: str, # Unique ID for this operation
payment_params: dict,
post_params: dict
) -> dict:
"""
Idempotent multi-protocol operation.
Uses operation_id to:
- Check if payment already made
- Check if post already published
"""
# Check existing state
state = await get_operation_state(operation_id)
# Step 1: Payment (if not done)
if not state.get("payment_complete"):
payment = await lightning.pay(
payment_params["invoice"],
idempotency_key=f"{operation_id}_payment"
)
await save_operation_state(operation_id, {"payment_complete": True, "payment": payment})
else:
payment = state["payment"]
# Step 2: Post (if not done)
if not state.get("post_complete"):
post = await nostr.post(
post_params["content"],
tags=[["i", operation_id]] # Include operation ID
)
await save_operation_state(operation_id, {"post_complete": True, "post": post})
else:
post = state["post"]
return {"payment": payment, "post": post, "operation_id": operation_id}
Strategy 2: Saga Pattern
For complex workflows, use compensating transactions:
class PaymentSaga:
"""Saga pattern for multi-protocol payments."""
def __init__(self):
self.steps_completed = []
async def execute(self, workflow: dict) -> dict:
try:
# Execute forward steps
for step in workflow["steps"]:
result = await self._execute_step(step)
self.steps_completed.append({"step": step, "result": result})
return {"status": "complete", "steps": self.steps_completed}
except Exception as e:
# Compensate in reverse order
await self._compensate()
return {"status": "rolled_back", "error": str(e)}
async def _execute_step(self, step: dict):
if step["protocol"] == "lightning":
return await lightning.pay(step["invoice"])
elif step["protocol"] == "nostr":
return await nostr.post(step["content"])
elif step["protocol"] == "bitcoin":
return await bitcoin.send(step["address"], step["amount"])
async def _compensate(self):
"""Undo completed steps in reverse."""
for completed in reversed(self.steps_completed):
step = completed["step"]
if step.get("compensate"):
# Execute compensation action
comp = step["compensate"]
if comp["action"] == "refund":
await lightning.pay(comp["invoice"])
elif comp["action"] == "post_cancellation":
await nostr.post(comp["content"])
Strategy 3: Two-Phase Commit
For operations requiring strong consistency:
async def two_phase_commit(operations: list[dict]) -> dict:
"""
Two-phase commit across protocols.
Phase 1: Prepare all operations
Phase 2: Commit if all prepared, else abort
"""
prepared = []
# Phase 1: Prepare
try:
for op in operations:
if op["protocol"] == "lightning":
# Hold invoice (don't pay yet)
hold = await lightning.hold_invoice(op["invoice"])
prepared.append({"op": op, "hold": hold})
elif op["protocol"] == "nostr":
# Create but don't publish
event = await nostr.create_event(kind=1, content=op["content"])
prepared.append({"op": op, "event": event})
except Exception as e:
# Abort: release all holds
for p in prepared:
if "hold" in p:
await lightning.cancel_hold(p["hold"]["id"])
return {"status": "aborted", "phase": "prepare", "error": str(e)}
# Phase 2: Commit
results = []
try:
for p in prepared:
if "hold" in p:
result = await lightning.settle_hold(p["hold"]["id"])
elif "event" in p:
result = await nostr.publish(p["event"])
results.append(result)
return {"status": "committed", "results": results}
except Exception as e:
# Partial commit - log for manual resolution
return {
"status": "partial_commit",
"committed": results,
"error": str(e)
}
Protocol Bridging
Lightning <-> Nostr (Zaps)
async def zap_user(
recipient_npub: str,
amount_sats: int,
comment: str = ""
) -> dict:
"""
Send a zap (NIP-57) to Nostr user.
Flow:
1. Fetch recipient's LNURL from kind:0
2. Create zap request event
3. Get invoice from LNURL with zap request
4. Pay invoice
"""
# Get recipient's profile
profile = await nostr.get_profile(recipient_npub)
lnurl = profile.get("lud16") or profile.get("lud06")
if not lnurl:
return {"error": "recipient_has_no_lightning"}
# Create zap request (kind:9734)
zap_request = await nostr.create_event(
kind=9734,
content=comment,
tags=[
["p", recipient_npub],
["amount", str(amount_sats * 1000)], # millisats
["relays", *DEFAULT_RELAYS]
]
)
# Get invoice from LNURL
invoice = await lnurl_pay_with_zap(
lnurl=lnurl,
amount_sats=amount_sats,
zap_request=zap_request
)
# Pay
payment = await lightning.pay(invoice["pr"])
return {
"zap_request_id": zap_request["id"],
"payment_hash": payment["payment_hash"],
"amount_sats": amount_sats
}
Bitcoin <-> Lightning (Submarine Swaps)
async def swap_onchain_to_lightning(
lightning_invoice: str,
onchain_address: str # For refund
) -> dict:
"""
Pay Lightning invoice using on-chain Bitcoin.
Uses a swap service (e.g., Boltz, Loop).
"""
# Decode invoice to get amount
invoice_details = await lightning.decode_invoice(lightning_invoice)
amount_sats = invoice_details["amount_msat"] // 1000
# Get swap quote
swap = await boltz.create_swap(
type="submarine",
invoice=lightning_invoice,
refund_address=onchain_address
)
# Returns on-chain address to fund
return {
"deposit_address": swap["address"],
"amount_to_send": swap["expected_amount"],
"timeout_block": swap["timeout_block"],
"swap_id": swap["id"],
"status": "awaiting_deposit"
}
Workflow Orchestration
class WorkflowOrchestrator:
"""
Orchestrate multi-protocol workflows.
"""
def __init__(self):
self.workflows = {}
async def execute_workflow(
self,
workflow_type: str,
params: dict
) -> dict:
"""Execute predefined workflow."""
if workflow_type == "pay_and_notify":
return await self._pay_and_notify(params)
elif workflow_type == "broadcast_and_anchor":
return await self._broadcast_and_anchor(params)
elif workflow_type == "zap":
return await self._zap(params)
else:
return {"error": f"Unknown workflow: {workflow_type}"}
async def _pay_and_notify(self, params: dict) -> dict:
"""Pay Lightning, notify Nostr."""
payment = await lightning.pay(params["invoice"])
if payment["success"]:
notification = await nostr.send_dm(
params["recipient_npub"],
f"Payment complete: {payment['preimage']}"
)
return {"payment": payment, "notification": notification}
return {"error": "payment_failed", "details": payment}
async def _broadcast_and_anchor(self, params: dict) -> dict:
"""Post Nostr, anchor to Bitcoin."""
event = await nostr.post(params["content"])
if params.get("anchor"):
anchor = await bitcoin.create_op_return(
bytes.fromhex(event["id"])
)
return {"event": event, "anchor": anchor}
return {"event": event}
async def _zap(self, params: dict) -> dict:
"""Send zap to Nostr user."""
return await zap_user(
params["recipient"],
params["amount_sats"],
params.get("comment", "")
)
Machine-Readable Summary
{
"topic": "multi-protocol-workflows",
"audience": "ai-agents",
"workflow_patterns": [
"payment_plus_notification",
"broadcast_plus_timestamp",
"escrow_with_arbitration",
"streaming_payment"
],
"atomicity_strategies": [
"idempotent_operations",
"saga_pattern",
"two_phase_commit"
],
"protocol_bridges": [
"lightning_nostr_zaps",
"bitcoin_lightning_swaps"
],
"implementation_language": "python"
}