# Code Examples

Complete, runnable Python examples demonstrating various features of the Miura Nexus API.
## Sync API Quick Start

Basic usage of the synchronous API (`miura`). Use it in synchronous code paths or Jupyter notebooks.

```python
import json
import sys
from datetime import datetime
from pathlib import Path

from miura import Nexus
from miura.api import UploadMode
from miura.api.datasources import LocalDataSource
from miura.api.exceptions import PermissionError
from miura.logging import get_logger
from miura.nexus.infrastructure.auth.exceptions import ApiKeyNotFoundError

logger = get_logger(__name__)


def main():
    """Demonstrate sync API usage."""
    try:
        with Nexus() as nexus:
            # List existing projects
            logger.info("=== Listing Projects ===")
            projects = nexus.list_projects()
            logger.info(f"Found {len(projects)} project(s)")
            for project in projects:
                logger.info(f"  - {project.name} ({project.uuid})")

            # Create a new project
            logger.info("=== Creating a Project ===")
            project_name = f"demo-project-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
            project = nexus.create_project(project_name)
            logger.info(f"Created project: {project.name} ({project.uuid})")

            # Get project by name
            project = nexus.get_project(project_name)
            logger.info(f"Retrieved project: {project.name} ({project.uuid})")

            # Create a collection with a schema
            logger.info("=== Creating a Collection ===")
            schema_path = Path("schemas/manifold.json")
            with open(schema_path, "r") as f:
                schema = json.load(f)
            collection_name = f"demo-collection-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
            collection = project.create_collection(
                collection_name=collection_name, schema=schema, metadata={}
            )
            logger.info(f"Created collection: {collection.name} ({collection.uuid})")

            # Path-based navigation
            logger.info("=== Path-based Navigation ===")
            collection_via_path = nexus.get(f"{project_name}/{collection_name}")
            logger.info(f"Retrieved collection via path: {collection_via_path.name}")

            # List collections
            logger.info("=== Listing Collections ===")
            collections = project.list_collections()
            logger.info(f"Found {len(collections)} collection(s)")

            # Upload data
            logger.info("=== Uploading Data ===")
            data_path = Path("data/manifold")
            if data_path.exists():
                datasource = LocalDataSource(str(data_path))
                collection.upload(datasource, mode=UploadMode.REPLACE)

            # List items
            logger.info("=== Listing Collection Items ===")
            items_response = collection.list_items(path="/", page=1, page_size=10)
            items = items_response.get("items", [])
            for item in items:
                if item.is_folder:
                    logger.info(f"  {item.name}/")
                else:
                    size_str = f" ({item.file_size:,} bytes)" if item.file_size else ""
                    logger.info(f"  {item.name}{size_str}")

            # Download data
            logger.info("=== Downloading Data ===")
            download_path = Path("downloads") / collection_name
            download_path.mkdir(parents=True, exist_ok=True)
            collection.download(path="/", local_path=str(download_path), confirm=True)
    except (PermissionError, ApiKeyNotFoundError) as e:
        logger.error(f"Authentication required: {e}")
        logger.error("Set MIURA_NEXUS_API_KEY or use: miura-auth token set")
        return 1


if __name__ == "__main__":
    sys.exit(main() or 0)
```
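Both quick starts log an error and exit when no credentials are available. If you rely solely on the `MIURA_NEXUS_API_KEY` environment variable, a small pre-flight check can fail faster with a clearer message. This is a sketch; it would not detect a key stored via `miura-auth token set`:

```python
import os
import sys

# Pre-flight check: fail early if no API key is exported.
# Assumption: MIURA_NEXUS_API_KEY is the only credential source in use;
# keys stored via `miura-auth token set` are not visible here.
if not os.environ.get("MIURA_NEXUS_API_KEY"):
    sys.exit("MIURA_NEXUS_API_KEY is not set; export it or run `miura-auth token set`.")
```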
## Async API Quick Start

The primary async API (`miura.aio`). Prefer it for all new code.

```python
import asyncio
import json
import sys
from datetime import datetime
from pathlib import Path

from miura.aio import AsyncNexus
from miura.api import UploadMode
from miura.api.datasources import LocalDataSource
from miura.api.exceptions import PermissionError
from miura.logging import get_logger
from miura.nexus.infrastructure.auth.exceptions import ApiKeyNotFoundError

logger = get_logger(__name__)


async def main():
    """Demonstrate async API usage."""
    try:
        async with AsyncNexus() as nexus:
            # List existing projects
            logger.info("=== Listing Projects ===")
            projects = await nexus.list_projects()
            logger.info(f"Found {len(projects)} project(s)")
            for project in projects:
                logger.info(f"  - {project.name} ({project.uuid})")

            # Create a new project
            logger.info("=== Creating a Project ===")
            project_name = f"demo-project-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
            project = await nexus.create_project(project_name)
            logger.info(f"Created project: {project.name} ({project.uuid})")

            # Get project by name
            project = await nexus.get_project(project_name)
            logger.info(f"Retrieved project: {project.name} ({project.uuid})")

            # Create a collection with a schema
            logger.info("=== Creating a Collection ===")
            schema_path = Path("schemas/manifold.json")
            with open(schema_path, "r") as f:
                schema = json.load(f)
            collection_name = f"demo-collection-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
            collection = await project.create_collection(
                collection_name=collection_name, schema=schema, metadata={}
            )
            logger.info(f"Created collection: {collection.name} ({collection.uuid})")

            # List collections
            logger.info("=== Listing Collections ===")
            collections = await project.list_collections()
            logger.info(f"Found {len(collections)} collection(s)")

            # Upload data
            logger.info("=== Uploading Data ===")
            data_path = Path("data/manifold")
            if data_path.exists():
                datasource = LocalDataSource(str(data_path))
                await collection.upload(datasource, mode=UploadMode.REPLACE)

            # List items
            logger.info("=== Listing Collection Items ===")
            items_response = await collection.list_items(path="/", page=1, page_size=10)
            items = items_response.get("items", [])
            for item in items:
                if item.is_folder:
                    logger.info(f"  {item.name}/")
                else:
                    size_str = f" ({item.file_size:,} bytes)" if item.file_size else ""
                    logger.info(f"  {item.name}{size_str}")

            # Download data
            logger.info("=== Downloading Data ===")
            download_path = Path("downloads") / collection_name
            download_path.mkdir(parents=True, exist_ok=True)
            await collection.download(path="/", local_path=str(download_path), confirm=True)
    except (PermissionError, ApiKeyNotFoundError) as e:
        logger.error(f"Authentication required: {e}")
        logger.error("Set MIURA_NEXUS_API_KEY or use: miura-auth token set")
        return 1


if __name__ == "__main__":
    sys.exit(asyncio.run(main()) or 0)
```
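The practical payoff of the async client is concurrency. As a sketch (assuming `AsyncNexus` supports concurrent calls on a single client, and with placeholder project names), several projects can be fetched in parallel with `asyncio.gather`:

```python
import asyncio

from miura.aio import AsyncNexus


async def fetch_projects(names: list[str]):
    """Fetch several projects concurrently rather than one at a time."""
    async with AsyncNexus() as nexus:
        # gather() awaits all lookups concurrently on the shared client
        return await asyncio.gather(*(nexus.get_project(name) for name in names))


if __name__ == "__main__":
    projects = asyncio.run(fetch_projects(["project-a", "project-b"]))  # placeholder names
```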
## Using Iterators

Iterate over large datasets efficiently with automatic pagination handling.

```python
import asyncio

from miura.aio import AsyncNexus
from miura.logging import get_logger

logger = get_logger(__name__)


async def main():
    """Demonstrate iterator usage for large datasets."""
    async with AsyncNexus() as nexus:
        # Iterate over all projects
        logger.info("=== Iterating Projects ===")
        async for project_info in nexus.iter_projects(prefetch_pages=1, page_size=10):
            logger.info(f"  - {project_info.name} ({project_info.uuid})")

        # Get a project and iterate its collections
        project = await nexus.get_project("my-project")
        logger.info("=== Iterating Collections ===")
        async for collection_info in project.iter_collections(prefetch_pages=1, page_size=10):
            logger.info(f"  - {collection_info.name} ({collection_info.uuid})")

        # Get a collection and iterate its items
        collection = await project.get_collection("my-collection")
        logger.info("=== Iterating Items ===")
        async for item in collection.iter_items(path="/", prefetch_pages=1, page_size=50):
            if item.is_folder:
                logger.info(f"  {item.name}/")
            else:
                size_str = f" ({item.file_size:,} bytes)" if item.file_size else ""
                logger.info(f"  {item.name}{size_str}")


if __name__ == "__main__":
    asyncio.run(main())
```
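Because the iterators page lazily, a loop can stop early without paying for the remaining pages. A sketch that collects only the first few matching files, reusing `iter_items` from the example above (`first_text_files` and its limit are illustrative, not part of the API):

```python
async def first_text_files(collection, limit: int = 5) -> list:
    """Collect up to `limit` .txt items, then stop iterating."""
    matches = []
    async for item in collection.iter_items(path="/", prefetch_pages=1, page_size=50):
        if not item.is_folder and item.name.endswith(".txt"):
            matches.append(item)
            if len(matches) >= limit:
                break  # no further pages are requested after this
    return matches
```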
## Project and Collection Deletion

Delete projects and collections, including all associated data.

```python
import asyncio
from datetime import datetime
from pathlib import Path

from miura.aio import AsyncNexus
from miura.api import UploadMode
from miura.api.datasources import LocalDataSource
from miura.logging import get_logger

logger = get_logger(__name__)


async def main():
    """Demonstrate project and collection deletion."""
    async with AsyncNexus() as nexus:
        # Create a project and collection
        project_name = f"demo-project-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        project = await nexus.create_project(project_name)
        logger.info(f"Created project: {project.name}")

        collection_name = f"demo-collection-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        collection = await project.create_collection(
            collection_name=collection_name,
            schema=[{"pattern": ".*\\.txt$", "min_occurrence": 0, "max_occurrence": None}],
            metadata={},
        )
        logger.info(f"Created collection: {collection.name}")

        # Upload some test data
        data_path = Path("data")
        data_path.mkdir(exist_ok=True)
        (data_path / "data1.txt").write_text("test data 1")
        (data_path / "data2.txt").write_text("test data 2")
        (data_path / "data3.txt").write_text("test data 3")
        datasource = LocalDataSource(str(data_path))
        await collection.upload(datasource, mode=UploadMode.REPLACE)
        logger.info("Uploaded test data")

        # Delete the collection (removes collection and all associated lakehouse data)
        logger.info(f"Deleting collection: {collection_name}")
        await collection.delete_collection()
        logger.info("Collection deleted successfully")

        # Delete the project (removes project, all collections, and all data)
        logger.info(f"Deleting project: {project_name}")
        await project.delete_project()
        logger.info("Project deleted successfully")


if __name__ == "__main__":
    asyncio.run(main())
```
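If you want the example to prove the deletion took effect, a small follow-up check using only `list_projects` works. This helper is a sketch; call it with the `nexus` and `project_name` from the example, before the client closes:

```python
async def verify_project_deleted(nexus, project_name: str) -> None:
    """Sanity check: a deleted project must no longer appear in listings."""
    remaining = {p.name for p in await nexus.list_projects()}
    if project_name in remaining:
        raise RuntimeError(f"project {project_name!r} still exists after deletion")
```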
## API Hooks for Observability

Use hooks to monitor API operations, collect metrics, and integrate with observability systems.

```python
import asyncio
import time
from collections import defaultdict
from typing import Dict, Union

from miura.aio import AsyncNexus
from miura.api.policies import ApiHooks
from miura.logging import get_logger

logger = get_logger(__name__)

# Shared metrics for tracking API usage
metrics: Dict[str, Union[int, float]] = defaultdict(int)
request_times: Dict[str, float] = {}


def log_request(metadata: dict) -> None:
    """Log API requests and track metrics."""
    method = metadata.get("method", "unknown")
    metrics[f"requests_{method}"] += 1
    request_times[method] = time.time()
    context_parts = []
    if "project_name" in metadata:
        context_parts.append(f"project={metadata['project_name']}")
    if "collection_name" in metadata:
        context_parts.append(f"collection={metadata['collection_name']}")
    context = f" ({', '.join(context_parts)})" if context_parts else ""
    logger.info(f"→ {method}{context}")


def log_response(metadata: dict) -> None:
    """Log API responses and track duration."""
    method = metadata.get("method", "unknown")
    metrics[f"responses_{method}"] += 1
    duration = None
    if method in request_times:
        duration = time.time() - request_times.pop(method)
        metrics[f"duration_{method}"] += duration
    context_parts = []
    if "project_name" in metadata:
        context_parts.append(f"project={metadata['project_name']}")
    if "collection_name" in metadata:
        context_parts.append(f"collection={metadata['collection_name']}")
    context = f" ({', '.join(context_parts)})" if context_parts else ""
    duration_str = f" [{duration:.3f}s]" if duration is not None else ""
    logger.info(f"← {method}{context}{duration_str}")


def log_retry(metadata: dict) -> None:
    """Log retry attempts."""
    method = metadata.get("method", "unknown")
    attempt = metadata.get("attempt", 0)
    backoff = metadata.get("backoff", 0)
    metrics[f"retries_{method}"] += 1
    logger.info(f"↻ {method} (attempt {attempt}, backoff {backoff:.2f}s)")


async def main():
    """Demonstrate API hooks usage."""
    # Create hooks
    hooks = ApiHooks(
        on_request=log_request,
        on_response=log_response,
        on_retry=log_retry,
    )

    # Use hooks at client level
    async with AsyncNexus(hooks=hooks) as nexus:
        logger.info("=== Client-level Hooks ===")
        projects = await nexus.list_projects()
        logger.info(f"Found {len(projects)} project(s)")
        if projects:
            project = await nexus.get_project(projects[0].name)
            collections = await project.list_collections()
            logger.info(f"Found {len(collections)} collection(s)")

    # Print metrics summary
    logger.info("=== Metrics Summary ===")
    for key, value in sorted(metrics.items()):
        logger.info(f"  {key}: {value}")


if __name__ == "__main__":
    asyncio.run(main())
```
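The raw counters can be reduced further, for example into an average latency per method. This helper is a sketch that assumes only the `responses_*`/`duration_*` keys written by the hooks above:

```python
def average_durations(metrics: dict) -> dict:
    """Compute mean response time per method from the hook counters."""
    averages = {}
    for key, total in metrics.items():
        if key.startswith("duration_"):
            method = key[len("duration_"):]
            count = metrics.get(f"responses_{method}", 0)
            if count:
                averages[method] = total / count
    return averages
```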
## Schema Generation

Generate schemas automatically from your data structure.

```python
import asyncio
from pathlib import Path

from miura.aio import AsyncNexus
from miura.api import generate_schema_from_path, SchemaGenOptions
from miura.logging import get_logger

logger = get_logger(__name__)


async def main():
    """Demonstrate schema generation."""
    async with AsyncNexus() as nexus:
        # Generate schema from a local directory
        data_path = Path("data/manifold")
        options = SchemaGenOptions(
            min_occurrence=1,        # Files must appear at least once
            max_occurrence=None,     # No maximum limit
            include_metadata=False,  # Don't include file metadata in schema
        )
        schema = generate_schema_from_path(str(data_path), options)
        logger.info("Generated schema:")
        logger.info(schema)

        # Create a project and collection with the generated schema
        project = await nexus.create_project("my-project")
        collection = await project.create_collection(
            collection_name="my-collection",
            schema=schema,
            metadata={},
        )
        logger.info(f"Created collection with generated schema: {collection.name}")


if __name__ == "__main__":
    asyncio.run(main())
```
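Since the quick-start examples load schemas with `json.load`, a generated schema can be persisted the same way and reused instead of regenerated on every run. A sketch, assuming `generate_schema_from_path` returns the same JSON-serializable structure that `create_collection` accepts (the file path is just an example):

```python
import json
from pathlib import Path


def save_schema(schema, path: str = "schemas/generated-manifold.json") -> None:
    """Write a generated schema to disk so later runs can json.load() it."""
    out = Path(path)  # example location
    out.parent.mkdir(parents=True, exist_ok=True)
    out.write_text(json.dumps(schema, indent=2))
```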
## Downloading Items

Download collection content to a local directory.

```python
import asyncio
from pathlib import Path

from miura.aio import AsyncNexus
from miura.logging import get_logger

logger = get_logger(__name__)


async def main():
    """Demonstrate downloading collection items."""
    async with AsyncNexus() as nexus:
        project = await nexus.get_project("my-project")
        collection = await project.get_collection("my-collection")

        # Download entire collection
        download_path = Path("downloads") / "my-collection"
        download_path.mkdir(parents=True, exist_ok=True)
        logger.info(f"Downloading collection to: {download_path}")
        await collection.download(path="/", local_path=str(download_path), confirm=True)

        # Download a specific folder
        folder_path = Path("downloads") / "my-folder"
        folder_path.mkdir(parents=True, exist_ok=True)
        logger.info(f"Downloading folder to: {folder_path}")
        await collection.download(path="/data", local_path=str(folder_path), confirm=True)

        # Download a specific file
        file_path = Path("downloads") / "my-file.txt"
        file_path.parent.mkdir(parents=True, exist_ok=True)
        logger.info(f"Downloading file to: {file_path}")
        await collection.download(path="/data/file.txt", local_path=str(file_path), confirm=True)


if __name__ == "__main__":
    asyncio.run(main())
```
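After a download completes, it can help to confirm what actually landed on disk. A stdlib-only sketch that summarizes the downloaded tree (the path mirrors the example above):

```python
from pathlib import Path


def summarize_download(root: str) -> None:
    """Print each downloaded file with its size, plus a running total."""
    base = Path(root)
    total = 0
    for path in sorted(base.rglob("*")):
        if path.is_file():
            size = path.stat().st_size
            total += size
            print(f"{path.relative_to(base)} ({size:,} bytes)")
    print(f"total: {total:,} bytes")


summarize_download("downloads/my-collection")
```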
## More Examples

For additional examples and use cases, see:
- Getting Started - Installation and first API call
- Projects and Collections - Detailed project and collection operations
- Schemas and Schema Generation - Advanced schema generation techniques
- Uploading Data - Upload strategies and best practices
- End-to-End Workflows - Complete workflow examples