Menu
Nostr Python Executable Jan 31, 2026

Fetch Events from Nostr Relays

Query and filter Nostr events using REQ subscriptions

#relays #filters #subscription #nip-01

Overview

Query Nostr relays to fetch events matching specific criteria. Use filters to find posts from users, hashtags, or event types.

The Code

"""
Nostr Event Fetcher
Query relays for events using filters

Requirements: websocket-client (pip install websocket-client)
"""

import json
import time
import uuid
from typing import List, Optional, Dict, Any
import websocket

# Default relays: public relay endpoints queried by the helper functions
# below (fetch_user_posts, fetch_hashtag_posts, fetch_global_feed,
# fetch_user_profile) when no relay is chosen explicitly.
DEFAULT_RELAYS = [
    "wss://relay.damus.io",
    "wss://nos.lol",
    "wss://relay.nostr.band"
]


def create_subscription_id() -> str:
    """Return a short random subscription ID (first 8 hex chars of a UUID4)."""
    return uuid.uuid4().hex[:8]


def fetch_events(
    relay_url: str,
    filters: Dict[str, Any],
    limit: int = 50,
    timeout: int = 10
) -> List[dict]:
    """
    Fetch events from a relay matching filters.

    Args:
        relay_url: WebSocket URL of the relay
        filters: NIP-01 filter object (left unmodified; copied internally)
        limit: Maximum events to request (merged into the filter's "limit")
        timeout: Connection/read timeout in seconds

    Returns:
        List of matching events; empty list on connection or relay error.
    """
    events: List[dict] = []
    sub_id = create_subscription_id()

    # Copy the filter so the caller's dict is not mutated
    # (the original code wrote "limit" into the caller's object).
    query = dict(filters)
    query["limit"] = limit

    ws = None
    try:
        ws = websocket.create_connection(
            relay_url,
            timeout=timeout
        )

        # Send REQ message to open the subscription (NIP-01)
        ws.send(json.dumps(["REQ", sub_id, query]))

        # Collect events until EOSE (End of Stored Events) or timeout
        while True:
            try:
                data = json.loads(ws.recv())
            except websocket.WebSocketTimeoutException:
                break
            except json.JSONDecodeError:
                # Skip a malformed frame instead of aborting the whole fetch
                continue

            if data[0] == "EVENT" and data[1] == sub_id:
                events.append(data[2])
            elif data[0] == "EOSE":
                break
            elif data[0] == "NOTICE":
                print(f"Relay notice: {data[1]}")

        # Politely close the subscription before disconnecting
        ws.send(json.dumps(["CLOSE", sub_id]))

    except Exception as e:
        print(f"Error fetching from {relay_url}: {e}")
    finally:
        # Always release the socket, even if recv/send raised mid-loop
        if ws is not None:
            try:
                ws.close()
            except Exception:
                pass

    return events


def fetch_user_posts(pubkey: str, limit: int = 20) -> List[dict]:
    """
    Fetch recent posts (kind:1 text notes) from a specific user.

    Args:
        pubkey: User's public key (hex)
        limit: Maximum posts to fetch

    Returns:
        Kind:1 events, deduplicated by id, newest first
    """
    query = {
        "authors": [pubkey],
        "kinds": [1]  # kind 1 = text note
    }

    collected: List[dict] = []
    for relay_url in DEFAULT_RELAYS[:2]:  # query the first two relays
        collected.extend(fetch_events(relay_url, query, limit=limit))

    # Deduplicate by event id, keeping the first occurrence
    # (dict preserves insertion order).
    by_id: Dict[str, dict] = {}
    for ev in collected:
        by_id.setdefault(ev["id"], ev)

    # Newest first, capped at the requested limit
    newest_first = sorted(
        by_id.values(),
        key=lambda ev: ev["created_at"],
        reverse=True
    )
    return newest_first[:limit]


def fetch_hashtag_posts(hashtag: str, limit: int = 50) -> List[dict]:
    """
    Fetch posts carrying a specific hashtag ("t" tag).

    Args:
        hashtag: Hashtag to search (leading/trailing '#' is tolerated)
        limit: Maximum posts to fetch

    Returns:
        Matching kind:1 events, deduplicated by id, newest first
    """
    # Normalize: lowercase, drop surrounding '#' characters
    tag = hashtag.lower().strip("#")
    query = {
        "kinds": [1],
        "#t": [tag]
    }

    collected: List[dict] = []
    for relay_url in DEFAULT_RELAYS:
        collected.extend(fetch_events(relay_url, query, limit=limit))

    # Deduplicate by event id, first occurrence wins
    by_id: Dict[str, dict] = {}
    for ev in collected:
        by_id.setdefault(ev["id"], ev)

    newest_first = sorted(
        by_id.values(),
        key=lambda ev: ev["created_at"],
        reverse=True
    )
    return newest_first[:limit]


def fetch_global_feed(limit: int = 50, since_hours: int = 1) -> List[dict]:
    """
    Fetch recent global posts (kind:1) from the default relays.

    Args:
        limit: Maximum posts to fetch
        since_hours: Only include posts from the last N hours

    Returns:
        Recent events, deduplicated by id, newest first
    """
    # "since" is a Unix timestamp lower bound (NIP-01)
    cutoff = int(time.time()) - since_hours * 3600
    query = {
        "kinds": [1],
        "since": cutoff
    }

    collected: List[dict] = []
    for relay_url in DEFAULT_RELAYS[:2]:
        collected.extend(fetch_events(relay_url, query, limit=limit))

    # Deduplicate by event id, first occurrence wins
    by_id: Dict[str, dict] = {}
    for ev in collected:
        by_id.setdefault(ev["id"], ev)

    newest_first = sorted(
        by_id.values(),
        key=lambda ev: ev["created_at"],
        reverse=True
    )
    return newest_first[:limit]


def fetch_user_profile(pubkey: str) -> Optional[dict]:
    """
    Fetch a user's profile metadata (kind:0 event).

    Args:
        pubkey: User's public key (hex)

    Returns:
        Parsed profile dict, or None if no relay returned valid metadata
    """
    query = {
        "authors": [pubkey],
        "kinds": [0]  # kind 0 = profile metadata
    }

    for relay_url in DEFAULT_RELAYS:
        results = fetch_events(relay_url, query, limit=1)
        if not results:
            continue
        try:
            meta = json.loads(results[0]["content"])
        except json.JSONDecodeError:
            # Malformed content on this relay; try the next one
            continue
        return {
            "pubkey": pubkey,
            "name": meta.get("name", ""),
            "about": meta.get("about", ""),
            "picture": meta.get("picture", ""),
            "nip05": meta.get("nip05", ""),
            "raw": meta
        }

    return None


def format_event(event: dict) -> str:
    """Render an event as one line: local timestamp, pubkey prefix, content snippet."""
    stamp = time.strftime(
        "%Y-%m-%d %H:%M",
        time.localtime(event["created_at"])
    )
    text = event["content"]
    # Truncate long content to 100 chars with an ellipsis
    snippet = text[:100] + ("..." if len(text) > 100 else "")
    return f"[{stamp}] {event['pubkey'][:8]}...: {snippet}"


# Example usage
# Example usage: demo each fetcher against the default relays
if __name__ == "__main__":
    print("=== Nostr Event Fetcher ===\n")

    # Recent posts from the global feed (last 24 hours)
    print("=== Recent Global Posts ===")
    recent = fetch_global_feed(limit=5, since_hours=24)
    print(f"Found {len(recent)} posts\n")

    for post in recent[:5]:
        print(format_event(post))
        print()

    # Posts tagged #bitcoin
    print("\n=== Posts with #bitcoin ===")
    tagged = fetch_hashtag_posts("bitcoin", limit=5)
    print(f"Found {len(tagged)} posts\n")

    for post in tagged[:3]:
        print(format_event(post))
        print()

    # Profile lookup for a well-known pubkey (jack's)
    print("\n=== Fetch User Profile ===")
    jack_pubkey = "82341f882b6eabcd2ba7f1ef90aad961cf074af15b9ef44a09f9d2a8fbfbe6a2"

    profile = fetch_user_profile(jack_pubkey)
    if profile:
        print(f"Name: {profile['name']}")
        print(f"About: {profile['about'][:100]}...")
        print(f"NIP-05: {profile['nip05']}")
    else:
        print("Profile not found")

Usage

pip install websocket-client
python fetch_events.py

Example Output

=== Nostr Event Fetcher ===

=== Recent Global Posts ===
Found 50 posts

[2026-01-31 14:30] a1b2c3d4...: Just deployed a new Lightning node! The future is...

[2026-01-31 14:28] e5f6g7h8...: Anyone building on Nostr? Looking for collaborators...

=== Posts with #bitcoin ===
Found 25 posts

[2026-01-31 14:25] i9j0k1l2...: Bitcoin is freedom money. No permission needed. #bitcoin

=== Fetch User Profile ===
Name: jack
About: CEO of block. Co-founder of twitter...
NIP-05: jack@cash.app

Agent Notes

Filter options (NIP-01):

| Field   | Type  | Description                     |
|---------|-------|---------------------------------|
| ids     | array | Event IDs to fetch              |
| authors | array | Public keys                     |
| kinds   | array | Event kinds (1=note, 0=profile) |
| #e      | array | Referenced event IDs            |
| #p      | array | Referenced pubkeys              |
| #t      | array | Hashtags                        |
| since   | int   | Unix timestamp (min)            |
| until   | int   | Unix timestamp (max)            |
| limit   | int   | Maximum events                  |

Performance tips for agents:

  1. Always set limit to avoid overwhelming responses
  2. Use since for incremental fetches
  3. Query multiple relays for better coverage
  4. Deduplicate events by ID
  5. Cache profiles to reduce queries