End-to-End Workflows
This tutorial demonstrates a complete, real-world workflow combining schema generation, collection management, data upload, item retrieval, and selective downloads.
Scenario Overview
Use Case: You have simulation data organized in a filesystem and want to:
- Automatically generate a schema from the existing structure
- Create a collection with the generated schema
- Upload and validate the data
- Browse and explore the uploaded data
- Download specific files and folders as needed
Data Structure:
simulation-data/
├── run_001/
│ ├── parameters.dat
│ ├── results/
│ │ ├── output_001.vtk
│ │ ├── output_002.vtk
│ │ └── output_003.vtk
│ └── metadata.json
├── run_002/
│ ├── parameters.dat
│ ├── results/
│ │ ├── output_001.vtk
│ │ ├── output_002.vtk
│ │ └── output_003.vtk
│ └── metadata.json
└── run_003/
├── parameters.dat
├── results/
│ ├── output_001.vtk
│ ├── output_002.vtk
│ └── output_003.vtk
└── metadata.json
Complete Workflow
Here's the complete workflow from start to finish:
import asyncio
from datetime import datetime
from pathlib import Path
from miura.aio import AsyncNexus
from miura.api.datasources import LocalDataSource
from miura.api import generate_schema_from_path, SchemaGenOptions
from miura.logging import get_logger
logger = get_logger(__name__)
async def complete_workflow():
    """Complete end-to-end workflow."""
    async with AsyncNexus() as nexus:
        # Step 1: Generate schema from filesystem
        logger.info("=== Step 1: Generating Schema ===")
        data_path = Path("simulation-data")
        if not data_path.exists():
            logger.error(f"Data path not found: {data_path}")
            return
options = SchemaGenOptions(
min_files_for_pattern=2,
default_required=False,
schema_name="simulation-schema",
similarity_threshold=0.7,
confidence_threshold=0.75
)
schema = generate_schema_from_path(str(data_path), options=options)
logger.info(f"Generated schema with {len(schema)} root-level nodes")
# Step 2: Create project
logger.info("=== Step 2: Creating Project ===")
project_name = f"simulation-project-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
project = await nexus.create_project(project_name)
logger.info(f"Created project: {project.name} ({project.uuid})")
# Step 3: Create collection with generated schema
logger.info("=== Step 3: Creating Collection ===")
collection_name = f"simulation-collection-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
collection = await project.create_collection(
collection_name=collection_name,
schema=schema,
metadata={
"description": "Simulation data collection",
"source_path": str(data_path),
"generated_at": datetime.now().isoformat(),
"schema_type": "auto-generated"
}
)
logger.info(f"Created collection: {collection.name} ({collection.uuid})")
# Step 4: Upload data
logger.info("=== Step 4: Uploading Data ===")
datasource = LocalDataSource(str(data_path))
logger.info(f"Data source: {datasource.path}")
upload_result = await collection.upload(
datasource=datasource,
create_new_version=False
)
logger.info(f"Upload completed:")
logger.info(f" Files uploaded: {upload_result.get('files_uploaded', 0)}")
logger.info(f" Files failed: {upload_result.get('files_failed', 0)}")
logger.info(f" Total size: {upload_result.get('total_size', 0):,} bytes")
# Step 5: Browse uploaded data
logger.info("=== Step 5: Browsing Collection ===")
response = await collection.list_items(path="/", page=1, page_size=20)
items = response.get("items", [])
pagination = response.get("pagination", {})
logger.info(f"Found {pagination.get('total_count', 0)} items in collection")
logger.info("Root-level items:")
for i, item in enumerate(items[:10], 1):
item_type = "folder" if item.is_folder else "file"
size_info = f" ({item.file_size:,} bytes)" if item.file_size else ""
logger.info(f" {i}. {item.name} [{item_type}]{size_info}")
# Step 6: Get specific items
logger.info("=== Step 6: Getting Specific Items ===")
# Get a specific run folder
run_folder = await collection.get_item("/run_001/")
if run_folder:
logger.info(f"Found run folder: {run_folder.item_uri}")
logger.info(f" Has children: {run_folder.has_children}")
# Get a specific file
params_file = await collection.get_item("/run_001/parameters.dat")
if params_file:
logger.info(f"Found parameters file: {params_file.item_uri}")
logger.info(f" Size: {params_file.file_size:,} bytes")
# Step 7: Download specific items
logger.info("=== Step 7: Downloading Specific Items ===")
# Download a specific file
if params_file:
result = params_file.download("./downloads/", confirm=False)
logger.info(f"Downloaded file: {result.get('files_downloaded', 0)} file(s)")
# Download a specific folder
if run_folder:
result = run_folder.download("./downloads/run_001/", confirm=False)
logger.info(f"Downloaded folder: {result.get('files_downloaded', 0)} file(s)")
# Step 8: Selective downloads
logger.info("=== Step 8: Selective Downloads ===")
download_count = 0
async for item in collection.iter_items(path="/"):
# Download only parameter files
if not item.is_folder and item.name == "parameters.dat":
bound_item = await collection.get_item(item.item_uri)
if bound_item:
logger.info(f"Downloading {item.item_uri}...")
result = bound_item.download("./downloads/parameters/", confirm=False)
download_count += 1
logger.info(f" Downloaded: {result.get('files_downloaded', 0)} file(s)")
logger.info(f"Downloaded {download_count} parameter files")
# Step 9: Using hierarchical paths (sync API)
logger.info("=== Step 9: Using Hierarchical Paths ===")
# Note: This would be in a sync context
# from miura import Nexus
# nexus_sync = Nexus()
# item = nexus_sync.get(f"{project.name}/{collection.name}/run_001/parameters.dat")
# result = item.download("./downloads/", confirm=False)
logger.info("=== Workflow Complete ===")
logger.info(f"Project: {project.name}")
logger.info(f"Collection: {collection.name}")
logger.info("All operations completed successfully")
if __name__ == "__main__":
    asyncio.run(complete_workflow())
Step-by-Step Breakdown
Step 1: Generate Schema
from miura.api import generate_schema_from_path, SchemaGenOptions
options = SchemaGenOptions(
min_files_for_pattern=2,
default_required=False,
schema_name="simulation-schema"
)
schema = generate_schema_from_path("simulation-data", options=options)
What happens: The schema generator scans your filesystem and detects patterns in filenames and folder names, creating a schema that matches your data structure.
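For the tree above, repeated files such as output_001.vtk through output_003.vtk are collapsed into pattern nodes. A quick way to inspect the result is to dump it as JSON. The exact node fields vary by version; the commented output below is purely illustrative, following the 'name'-or-'pattern' node shape checked in Pattern 1 later in this tutorial:
import json
# Print the generated schema; it is a JSON-serializable
# list of nodes (see Pattern 1 below).
print(json.dumps(schema, indent=2))
# Hypothetical output (illustrative only):
# [
#   {
#     "pattern": "run_*",
#     "children": [
#       {"name": "parameters.dat", "required": false},
#       {"name": "metadata.json", "required": false},
#       {"name": "results", "children": [{"pattern": "output_*.vtk"}]}
#     ]
#   }
# ]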
Step 2: Create Project
project = await nexus.create_project("simulation-project")
What happens: Creates a new project to organize your collections.
Step 3: Create Collection
collection = await project.create_collection(
collection_name="simulation-collection",
schema=schema,
metadata={"description": "Simulation data"}
)
What happens: Creates a collection with the generated schema. The schema defines what files and folders are expected.
Step 4: Upload Data
datasource = LocalDataSource("simulation-data")
upload_result = await collection.upload(datasource=datasource)
What happens:
- Scans the local directory
- Validates each file/folder against the schema
- Uploads validated files to the collection
- Reports any validation errors (inspected in the sketch below)
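A minimal sketch for checking the result, using only the keys the complete workflow above already reads; treat any other keys as version-dependent:
upload_result = await collection.upload(datasource=datasource)
# Summarize the outcome using the keys shown in the workflow above.
logger.info(f"Uploaded: {upload_result.get('files_uploaded', 0)}")
logger.info(f"Failed: {upload_result.get('files_failed', 0)}")
logger.info(f"Total size: {upload_result.get('total_size', 0):,} bytes")
# Surface any per-file validation errors.
for error in upload_result.get("errors", []):
    logger.error(f"Upload error: {error}")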
Step 5: Browse Collection
response = await collection.list_items(path="/", page=1, page_size=20)
items = response.get("items", [])
What happens: Lists items in the collection, allowing you to see what was uploaded.
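To walk beyond the first page, loop on the page argument until a page comes back empty. This sketch assumes the pagination dict carries the total_count field used in the complete workflow; for large collections, iter_items in Step 8 is usually the simpler route:
page = 1
while True:
    response = await collection.list_items(path="/", page=page, page_size=100)
    items = response.get("items", [])
    if not items:
        break
    for item in items:
        logger.info(item.name)
    # Stop early if the reported total tells us we're done.
    total = response.get("pagination", {}).get("total_count")
    if total is not None and page * 100 >= total:
        break
    page += 1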
Step 6: Get Specific Items
item = await collection.get_item("/run_001/parameters.dat")
What happens: Retrieves a specific item by its path, returning a BoundCollectionItem that you can work with directly.
Step 7: Download Items
result = item.download("./downloads/", confirm=False)
What happens: Downloads the item (file or folder) to the specified local directory.
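Before moving on, it is worth checking the result dict; this sketch uses the status, files_downloaded, and errors keys that the advanced patterns below also rely on:
result = item.download("./downloads/", confirm=False)
# The advanced patterns below key off these same fields.
if result["status"] == "completed":
    logger.info(f"Downloaded {result.get('files_downloaded', 0)} file(s)")
else:
    logger.warning(f"Download incomplete: {result.get('errors', [])}")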
Step 8: Selective Downloads
async for item in collection.iter_items(path="/"):
if not item.is_folder and item.name == "parameters.dat":
bound_item = await collection.get_item(item.item_uri)
result = bound_item.download("./downloads/", confirm=False)
What happens: Iterates through all items and downloads only those matching your criteria.
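Step 9: Hierarchical Paths (Sync API)
The commented-out code in Step 9 of the complete workflow performs the same retrieval with the synchronous client and a single slash-separated path. Uncommented, it reads as follows (a sketch based on those comments; it assumes Nexus.get resolves project/collection/item paths as they indicate):
from miura import Nexus
# Resolve project, collection, and item in one call, then download.
nexus_sync = Nexus()
item = nexus_sync.get(f"{project.name}/{collection.name}/run_001/parameters.dat")
result = item.download("./downloads/", confirm=False)
What happens: Resolves the item by hierarchical path in one call and downloads it, without entering an async context.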
Advanced Patterns
Pattern 1: Schema Generation with Validation
Generate schema and validate it before using:
import json
# Generate schema
schema = generate_schema_from_path("simulation-data")
# Validate schema structure
def validate_schema(schema):
    """Validate generated schema structure."""
    if not isinstance(schema, list):
        raise ValueError("Schema must be a list")
for node in schema:
if "name" not in node and "pattern" not in node:
raise ValueError("Each node must have 'name' or 'pattern'")
return True
validate_schema(schema)
logger.info("Schema validation passed")
logger.info(json.dumps(schema, indent=2))
Pattern 2: Incremental Uploads
Upload data incrementally and track progress:
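A minimal sketch: re-run the upload as new data arrives and accumulate totals across passes. It assumes create_new_version=True starts a fresh collection version (inferred from the create_new_version=False call in the complete workflow above) and that each batch directory matches the collection schema on its own:
async def incremental_upload(collection, batch_paths):
    """Upload a series of local directories, tracking progress."""
    total_uploaded = 0
    total_failed = 0
    for i, batch_path in enumerate(batch_paths, 1):
        logger.info(f"Uploading batch {i}/{len(batch_paths)}: {batch_path}")
        datasource = LocalDataSource(str(batch_path))
        result = await collection.upload(
            datasource=datasource,
            create_new_version=(i > 1),  # assumption: later passes create new versions
        )
        total_uploaded += result.get("files_uploaded", 0)
        total_failed += result.get("files_failed", 0)
        logger.info(f"  Progress so far: {total_uploaded} uploaded, {total_failed} failed")
    return total_uploaded, total_failed
await incremental_upload(collection, ["simulation-data"])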
Pattern 3: Batch Item Operations
Process multiple items in batches:
async def download_large_files(collection, min_size=1_000_000):
"""Download all large files from a collection."""
large_files = []
async for item in collection.iter_items(path="/"):
if not item.is_folder and item.file_size and item.file_size >= min_size:
large_files.append(item)
logger.info(f"Found {len(large_files)} large files")
for item in large_files:
bound_item = await collection.get_item(item.item_uri)
if bound_item:
logger.info(f"Downloading {item.name} ({item.file_size:,} bytes)...")
result = bound_item.download("./downloads/large/", confirm=False)
logger.info(f" Status: {result['status']}")
await download_large_files(collection, min_size=1_000_000)
Pattern 4: Error Recovery
Handle errors gracefully and continue processing:
async def safe_download_all(collection, download_path="./downloads/"):
"""Download all items with error handling."""
success_count = 0
error_count = 0
async for item in collection.iter_items(path="/"):
if not item.is_folder:
try:
bound_item = await collection.get_item(item.item_uri)
if bound_item:
result = bound_item.download(download_path, confirm=False)
if result["status"] == "completed":
success_count += 1
else:
error_count += 1
logger.warning(f"Failed to download {item.name}")
except Exception as e:
error_count += 1
logger.error(f"Error downloading {item.name}: {e}")
logger.info(f"Download complete: {success_count} succeeded, {error_count} failed")
await safe_download_all(collection)
Error Handling
Comprehensive Error Handling
from miura.api.exceptions import NotFoundError, ValidationError
from miura.logging import get_logger
logger = get_logger(__name__)
async def robust_workflow():
"""Workflow with comprehensive error handling."""
try:
async with AsyncNexus() as nexus:
# Generate schema
try:
schema = generate_schema_from_path("simulation-data")
except FileNotFoundError as e:
logger.error(f"Data path not found: {e}")
return
except Exception as e:
logger.error(f"Error generating schema: {e}")
return
# Create project
try:
project = await nexus.create_project("simulation-project")
except Exception as e:
logger.error(f"Error creating project: {e}")
return
# Create collection
try:
collection = await project.create_collection(
collection_name="simulation-collection",
schema=schema
)
except ValidationError as e:
logger.error(f"Schema validation error: {e}")
return
except Exception as e:
logger.error(f"Error creating collection: {e}")
return
# Upload data
try:
datasource = LocalDataSource("simulation-data")
upload_result = await collection.upload(datasource=datasource)
if upload_result.get("files_failed", 0) > 0:
logger.warning(f"Some files failed to upload: {upload_result.get('files_failed', 0)}")
if upload_result.get("errors"):
for error in upload_result["errors"]:
logger.error(f"Upload error: {error}")
except Exception as e:
logger.error(f"Error uploading data: {e}")
return
# Get and download items
try:
item = await collection.get_item("/run_001/parameters.dat")
if item:
result = item.download("./downloads/", confirm=False)
if result["status"] != "completed":
logger.warning(f"Download incomplete: {result.get('errors', [])}")
except NotFoundError:
logger.warning("Item not found")
except Exception as e:
logger.error(f"Error downloading item: {e}")
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
asyncio.run(robust_workflow())
Next Steps
- Getting Items Tutorial - Detailed guide on retrieving items
- Downloading Items Tutorial - Detailed guide on downloading
- Schema Generation Tutorial - Detailed guide on schema generation
- API Reference - Complete API documentation
Related Documentation
- Quick Start Guide - Get started with the Nexus API
- API Reference - Complete API documentation