Code Examples

Complete Python code examples for the Miura Nexus API

Complete, runnable Python examples demonstrating various features of the Miura Nexus API.


Sync API Quick Start

Basic usage with the synchronous API (miura). Use this for synchronous code or Jupyter notebooks.

python
import json
from datetime import datetime
from pathlib import Path

from miura import Nexus
from miura.api.datasources import LocalDataSource
from miura.api.exceptions import PermissionError
from miura.nexus.infrastructure.auth.exceptions import ApiKeyNotFoundError
from miura.logging import get_logger

logger = get_logger(__name__)


def main() -> int:
    """Demonstrate sync API usage.

    Lists the existing projects, creates a timestamped project and collection,
    resolves the collection through a path, and -- only when ``data/manifold``
    exists locally -- uploads it, lists the first page of items, and downloads
    the collection into ``downloads/<collection name>``.

    Returns:
        0 on success, 1 when ``PermissionError`` or ``ApiKeyNotFoundError`` is
        raised. ``PermissionError`` here is the class this example imports
        from ``miura.api.exceptions``, not the Python builtin.
    """
    from miura.api import UploadMode

    try:
        with Nexus() as nexus:
            # List existing projects
            logger.info("=== Listing Projects ===")
            projects = nexus.list_projects()
            logger.info(f"Found {len(projects)} project(s)")
            for project in projects:
                logger.info(f"  - {project.name} ({project.uuid})")

            # One timestamp for both names so the demo project and collection
            # carry the same suffix.
            stamp = datetime.now().strftime('%Y%m%d-%H%M%S')

            # Create a new project
            logger.info("=== Creating a Project ===")
            project_name = f"demo-project-{stamp}"
            project = nexus.create_project(project_name)
            logger.info(f"Created project: {project.name} ({project.uuid})")

            # Get project by name
            project = nexus.get_project(project_name)
            logger.info(f"Retrieved project: {project.name} ({project.uuid})")

            # Create a collection with a schema
            logger.info("=== Creating a Collection ===")
            schema_path = Path("schemas/manifold.json")
            # JSON text is UTF-8; do not depend on the platform's default encoding.
            schema = json.loads(schema_path.read_text(encoding="utf-8"))

            collection_name = f"demo-collection-{stamp}"
            # NOTE(review): the async example passes this value as
            # collection_name=...; confirm which keyword the sync API expects.
            collection = project.create_collection(name=collection_name, schema=schema, metadata={})
            logger.info(f"Created collection: {collection.name} ({collection.uuid})")

            # Path-based navigation
            logger.info("=== Path-based Navigation ===")
            collection_via_path = nexus.get(f"{project_name}/{collection_name}")
            logger.info(f"Retrieved collection via path: {collection_via_path.name}")

            # List collections
            logger.info("=== Listing Collections ===")
            collections = project.list_collections()
            logger.info(f"Found {len(collections)} collection(s)")

            # Upload data (skipped entirely when the sample directory is absent)
            logger.info("=== Uploading Data ===")
            data_path = Path("data/manifold")
            if data_path.exists():
                datasource = LocalDataSource(str(data_path))
                collection.upload(datasource, mode=UploadMode.REPLACE)

                # List items
                logger.info("=== Listing Collection Items ===")
                items_response = collection.list_items(path="/", page=1, page_size=10)
                items = items_response.get("items", [])
                for item in items:
                    if item.is_folder:
                        logger.info(f"  {item.name}/")
                    else:
                        size_str = f" ({item.file_size:,} bytes)" if item.file_size else ""
                        logger.info(f"  {item.name}{size_str}")

                # Download data
                logger.info("=== Downloading Data ===")
                download_path = Path("downloads") / collection_name
                download_path.mkdir(parents=True, exist_ok=True)
                collection.download(path="/", local_path=str(download_path), confirm=True)
    except (PermissionError, ApiKeyNotFoundError) as e:
        logger.error(f"Authentication required: {e}")
        logger.error("Set MIURA_NEXUS_API_KEY or use: miura-auth token set")
        return 1
    return 0


if __name__ == "__main__":
    # Forward main()'s status to the shell (None counts as success). SystemExit
    # is raised directly because the bare exit() helper is installed by the
    # site module for interactive sessions and is not meant for programs.
    raise SystemExit(main() or 0)

Async API Quick Start

Primary async API (miura.aio). Use this for all new code.

python
import asyncio
import json
from datetime import datetime
from pathlib import Path

from miura.aio import AsyncNexus
from miura.api.datasources import LocalDataSource
from miura.api.exceptions import PermissionError
from miura.nexus.infrastructure.auth.exceptions import ApiKeyNotFoundError
from miura.logging import get_logger

logger = get_logger(__name__)


async def main() -> int:
    """Demonstrate async API usage.

    Lists the existing projects, creates a timestamped project and collection,
    and -- only when ``data/manifold`` exists locally -- uploads it, lists the
    first page of items, and downloads the collection into
    ``downloads/<collection name>``.

    Returns:
        0 on success, 1 when ``PermissionError`` or ``ApiKeyNotFoundError`` is
        raised. ``PermissionError`` here is the class this example imports
        from ``miura.api.exceptions``, not the Python builtin.
    """
    from miura.api import UploadMode

    try:
        async with AsyncNexus() as nexus:
            # List existing projects
            logger.info("=== Listing Projects ===")
            projects = await nexus.list_projects()
            logger.info(f"Found {len(projects)} project(s)")
            for project in projects:
                logger.info(f"  - {project.name} ({project.uuid})")

            # One timestamp for both names so the demo project and collection
            # carry the same suffix.
            stamp = datetime.now().strftime('%Y%m%d-%H%M%S')

            # Create a new project
            logger.info("=== Creating a Project ===")
            project_name = f"demo-project-{stamp}"
            project = await nexus.create_project(project_name)
            logger.info(f"Created project: {project.name} ({project.uuid})")

            # Get project by name
            project = await nexus.get_project(project_name)
            logger.info(f"Retrieved project: {project.name} ({project.uuid})")

            # Create a collection with a schema
            logger.info("=== Creating a Collection ===")
            schema_path = Path("schemas/manifold.json")
            # JSON text is UTF-8; do not depend on the platform's default encoding.
            schema = json.loads(schema_path.read_text(encoding="utf-8"))

            collection_name = f"demo-collection-{stamp}"
            collection = await project.create_collection(collection_name=collection_name, schema=schema, metadata={})
            logger.info(f"Created collection: {collection.name} ({collection.uuid})")

            # List collections
            logger.info("=== Listing Collections ===")
            collections = await project.list_collections()
            logger.info(f"Found {len(collections)} collection(s)")

            # Upload data (skipped entirely when the sample directory is absent)
            logger.info("=== Uploading Data ===")
            data_path = Path("data/manifold")
            if data_path.exists():
                datasource = LocalDataSource(str(data_path))
                await collection.upload(datasource, mode=UploadMode.REPLACE)

                # List items
                logger.info("=== Listing Collection Items ===")
                items_response = await collection.list_items(path="/", page=1, page_size=10)
                items = items_response.get("items", [])
                for item in items:
                    if item.is_folder:
                        logger.info(f"  {item.name}/")
                    else:
                        size_str = f" ({item.file_size:,} bytes)" if item.file_size else ""
                        logger.info(f"  {item.name}{size_str}")

                # Download data
                logger.info("=== Downloading Data ===")
                download_path = Path("downloads") / collection_name
                download_path.mkdir(parents=True, exist_ok=True)
                await collection.download(path="/", local_path=str(download_path), confirm=True)
    except (PermissionError, ApiKeyNotFoundError) as e:
        logger.error(f"Authentication required: {e}")
        logger.error("Set MIURA_NEXUS_API_KEY or use: miura-auth token set")
        return 1
    return 0


if __name__ == "__main__":
    # main() returns 1 when authentication fails; forward that value as the
    # process exit status instead of discarding it (None counts as success).
    raise SystemExit(asyncio.run(main()) or 0)

Using Iterators

Iterate over large datasets efficiently with automatic pagination handling.

python
import asyncio
from miura.aio import AsyncNexus
from miura.logging import get_logger

logger = get_logger(__name__)


async def main():
    """Walk projects, collections, and items using the async iterator methods."""
    async with AsyncNexus() as client:
        # Every project visible to the client.
        logger.info("=== Iterating Projects ===")
        project_stream = client.iter_projects(prefetch_pages=1, page_size=10)
        async for summary in project_stream:
            logger.info(f"  - {summary.name} ({summary.uuid})")

        # Collections that belong to one named project.
        target_project = await client.get_project("my-project")
        logger.info("=== Iterating Collections ===")
        collection_stream = target_project.iter_collections(prefetch_pages=1, page_size=10)
        async for summary in collection_stream:
            logger.info(f"  - {summary.name} ({summary.uuid})")

        # Items under the root path of one named collection.
        target_collection = await target_project.get_collection("my-collection")
        logger.info("=== Iterating Items ===")
        item_stream = target_collection.iter_items(path="/", prefetch_pages=1, page_size=50)
        async for entry in item_stream:
            if not entry.is_folder:
                # Files show their size only when one is reported.
                suffix = f" ({entry.file_size:,} bytes)" if entry.file_size else ""
                logger.info(f"  {entry.name}{suffix}")
                continue
            # Folders are printed with a trailing slash.
            logger.info(f"  {entry.name}/")


if __name__ == "__main__":
    # Run the iterator example on a new event loop when executed as a script.
    asyncio.run(main())

Project and Collection Deletion

Delete projects and collections, including all associated data.

python
import asyncio
from datetime import datetime
from pathlib import Path

from miura.aio import AsyncNexus
from miura.api.datasources import LocalDataSource
from miura.logging import get_logger

logger = get_logger(__name__)


async def main():
    """Create a project and collection, upload sample files, then delete both."""
    stamp_format = '%Y%m%d-%H%M%S'
    # The only pattern in the schema matches names ending in ".txt".
    txt_only_schema = [{"pattern": ".*\\.txt$", "min_occurrence": 0, "max_occurrence": None}]
    sample_files = (
        ("data1.txt", "test data 1"),
        ("data2.txt", "test data 2"),
        ("data3.txt", "test data 3"),
    )

    async with AsyncNexus() as client:
        # Set up a throwaway project...
        project_name = "demo-project-" + datetime.now().strftime(stamp_format)
        project = await client.create_project(project_name)
        logger.info(f"Created project: {project.name}")

        # ...and a collection inside it.
        collection_name = "demo-collection-" + datetime.now().strftime(stamp_format)
        collection = await project.create_collection(
            collection_name=collection_name,
            schema=txt_only_schema,
            metadata={},
        )
        logger.info(f"Created collection: {collection.name}")

        # Write three small text files into ./data and upload that directory.
        data_dir = Path("data")
        data_dir.mkdir(exist_ok=True)
        for file_name, content in sample_files:
            (data_dir / file_name).write_text(content)

        source = LocalDataSource(str(data_dir))
        from miura.api import UploadMode
        await collection.upload(source, mode=UploadMode.REPLACE)
        logger.info("Uploaded test data")

        # Remove the collection first (collection plus its associated lakehouse data).
        logger.info(f"Deleting collection: {collection_name}")
        await collection.delete_collection()
        logger.info("Collection deleted successfully")

        # Then remove the project (project, all collections, and all data).
        logger.info(f"Deleting project: {project_name}")
        await project.delete_project()
        logger.info("Project deleted successfully")


if __name__ == "__main__":
    # Run the deletion example on a new event loop when executed as a script.
    asyncio.run(main())

API Hooks for Observability

Use hooks to monitor API operations, collect metrics, and integrate with observability systems.

python
import asyncio
import time
from collections import defaultdict
from typing import Dict, Union

from miura.aio import AsyncNexus
from miura.api.policies import ApiHooks
from miura.logging import get_logger

logger = get_logger(__name__)

# Shared metrics for tracking API usage.
# `metrics` is a defaultdict(int), so a hook can `+=` a key that does not exist
# yet. The hooks below write keys of the form requests_<method>,
# responses_<method>, duration_<method>, and retries_<method>.
metrics: Dict[str, Union[int, float]] = defaultdict(int)
# Start timestamp of the most recent request, keyed by method name only: a
# second in-flight request with the same method overwrites the first entry.
request_times: Dict[str, float] = {}


def _format_context(metadata: dict) -> str:
    """Return " (project=..., collection=...)" for the keys present, else ""."""
    parts = []
    if "project_name" in metadata:
        parts.append(f"project={metadata['project_name']}")
    if "collection_name" in metadata:
        parts.append(f"collection={metadata['collection_name']}")
    return f" ({', '.join(parts)})" if parts else ""


def log_request(metadata: dict) -> None:
    """Log an outgoing API request and record when it started.

    Args:
        metadata: Hook payload. Reads the optional keys "method",
            "project_name", and "collection_name".
    """
    method = metadata.get("method", "unknown")
    metrics[f"requests_{method}"] += 1
    # perf_counter() is monotonic, so the duration computed in log_response
    # cannot be skewed by system clock adjustments the way time.time() can.
    request_times[method] = time.perf_counter()

    logger.info(f"→ {method}{_format_context(metadata)}")


def log_response(metadata: dict) -> None:
    """Log an API response and accumulate the elapsed time for its method.

    The duration is reported only when log_request recorded a start time for
    the same method; that entry is consumed here.

    Args:
        metadata: Hook payload. Reads the optional keys "method",
            "project_name", and "collection_name".
    """
    method = metadata.get("method", "unknown")
    metrics[f"responses_{method}"] += 1

    duration_str = ""
    started = request_times.pop(method, None)
    if started is not None:
        # Same clock as log_request; the two readings must come from one source.
        duration = time.perf_counter() - started
        metrics[f"duration_{method}"] += duration
        duration_str = f" [{duration:.3f}s]"

    logger.info(f"← {method}{_format_context(metadata)}{duration_str}")


def log_retry(metadata: dict) -> None:
    """Count a retry for the request's method and log its attempt and backoff.

    Args:
        metadata: Hook payload. Reads the optional keys "method", "attempt",
            and "backoff"; the last two default to 0.
    """
    method = metadata.get("method", "unknown")
    attempt, backoff = metadata.get("attempt", 0), metadata.get("backoff", 0)

    metrics[f"retries_{method}"] += 1
    logger.info(f"↻ {method} (attempt {attempt}, backoff {backoff:.2f}s)")


async def main():
    """Attach the logging hooks to a client, make a few calls, and print the metrics."""
    # Bundle the three callbacks defined in this example.
    observability_hooks = ApiHooks(
        on_request=log_request,
        on_response=log_response,
        on_retry=log_retry,
    )

    # Passing the hooks to the client applies them at client level.
    async with AsyncNexus(hooks=observability_hooks) as client:
        logger.info("=== Client-level Hooks ===")
        all_projects = await client.list_projects()
        logger.info(f"Found {len(all_projects)} project(s)")

        if all_projects:
            # Use the first project to trigger two more hooked calls.
            first_project = await client.get_project(all_projects[0].name)
            its_collections = await first_project.list_collections()
            logger.info(f"Found {len(its_collections)} collection(s)")

    # Dump the counters in key order once the client is closed.
    logger.info("=== Metrics Summary ===")
    for name in sorted(metrics):
        logger.info(f"  {name}: {metrics[name]}")


if __name__ == "__main__":
    # Run the hooks example on a new event loop when executed as a script.
    asyncio.run(main())

Schema Generation

Generate schemas automatically from your data structure.

python
import asyncio
from pathlib import Path

from miura.aio import AsyncNexus
from miura.api import generate_schema_from_path, SchemaGenOptions
from miura.logging import get_logger

logger = get_logger(__name__)


async def main():
    """Generate a schema from a local directory and create a collection with it."""
    async with AsyncNexus() as client:
        # Directory whose layout is used to generate the schema.
        source_dir = Path("data/manifold")

        gen_options = SchemaGenOptions(
            min_occurrence=1,        # files must appear at least once
            max_occurrence=None,     # no maximum limit
            include_metadata=False,  # leave file metadata out of the schema
        )

        generated = generate_schema_from_path(str(source_dir), gen_options)
        logger.info("Generated schema:")
        logger.info(generated)

        # Use the generated schema for a new collection in a new project.
        new_project = await client.create_project("my-project")
        new_collection = await new_project.create_collection(
            collection_name="my-collection",
            schema=generated,
            metadata={},
        )
        logger.info(f"Created collection with generated schema: {new_collection.name}")


if __name__ == "__main__":
    # Run the schema-generation example on a new event loop when executed as a script.
    asyncio.run(main())

Downloading Items

Download collection content to a local directory.

python
import asyncio
from pathlib import Path

from miura.aio import AsyncNexus
from miura.logging import get_logger

logger = get_logger(__name__)


async def main():
    """Download a whole collection, one folder, and one file into ./downloads."""
    downloads_root = Path("downloads")

    async with AsyncNexus() as client:
        source_project = await client.get_project("my-project")
        source_collection = await source_project.get_collection("my-collection")

        # 1) Everything under the collection root.
        collection_target = downloads_root / "my-collection"
        collection_target.mkdir(parents=True, exist_ok=True)
        logger.info(f"Downloading collection to: {collection_target}")
        await source_collection.download(path="/", local_path=str(collection_target), confirm=True)

        # 2) A single folder.
        folder_target = downloads_root / "my-folder"
        folder_target.mkdir(parents=True, exist_ok=True)
        logger.info(f"Downloading folder to: {folder_target}")
        await source_collection.download(path="/data", local_path=str(folder_target), confirm=True)

        # 3) A single file: only its parent directory is created up front.
        file_target = downloads_root / "my-file.txt"
        downloads_root.mkdir(parents=True, exist_ok=True)
        logger.info(f"Downloading file to: {file_target}")
        await source_collection.download(path="/data/file.txt", local_path=str(file_target), confirm=True)


if __name__ == "__main__":
    # Run the download example on a new event loop when executed as a script.
    asyncio.run(main())

More Examples

For additional examples and use cases, see:

© 2025