End-to-End Workflows

Complete, real-world workflows combining all features

This tutorial demonstrates a complete, real-world workflow combining collection management, data upload, item retrieval, and selective downloads.


Scenario Overview

Use Case: You have simulation data organized in a filesystem and want to:

  1. Define a schema that matches your structure (quality gate)
  2. Create a project, then a collection in it that uses that schema
  3. Upload and validate the data
  4. Browse and explore the uploaded data
  5. Download specific files and folders as needed

Data Structure:

text
simulation-data/
├── run_001/
│   ├── parameters.dat
│   ├── results/
│   │   ├── output_001.vtk
│   │   ├── output_002.vtk
│   │   └── output_003.vtk
│   └── metadata.json
├── run_002/
│   ├── parameters.dat
│   ├── results/
│   │   ├── output_001.vtk
│   │   ├── output_002.vtk
│   │   └── output_003.vtk
│   └── metadata.json
└── run_003/
    ├── parameters.dat
    ├── results/
    │   ├── output_001.vtk
    │   ├── output_002.vtk
    │   └── output_003.vtk
    └── metadata.json


Complete Workflow

Here's the complete workflow from start to finish:

python
import asyncio
from datetime import datetime
from pathlib import Path
from miura.api import AsyncNexus
from miura.sdk import LocalDataSource
from miura.logging import get_logger

logger = get_logger(__name__)

# Schema mirroring the simulation-data/ layout (run_001/, run_002/, ...).
# Every node identifies its entries either by an exact "name" or by a
# "pattern"; trailing slashes presumably mark folders (confirm in the Schemas
# reference).
SCHEMA = [
    {
        # Run folders: run_001/, run_002/, ...
        "pattern": "run_\\d+/",
        "min_occurrence": 1,
        "children": [
            {
                "name": "parameters.dat",
                "min_occurrence": 1,
            },
            {
                "name": "metadata.json",
                "min_occurrence": 1,
            },
            {
                # results/ sub-folder holding the output_NNN.vtk files
                "pattern": "results/",
                "min_occurrence": 1,
                "children": [
                    {
                        "pattern": "output_\\d+\\.vtk",
                        "min_occurrence": 1,
                    },
                ],
            },
        ],
    },
]
python
async def complete_workflow():
    """Run the tutorial workflow end to end.

    Checks that the local ``simulation-data`` directory exists, creates a
    timestamped project and collection (using ``SCHEMA``), uploads the
    directory, lists the root-level items, fetches one folder and one file,
    downloads them, and finally downloads every ``parameters.dat`` found while
    iterating the collection.

    Returns:
        None. Progress is reported through ``logger``. The function returns
        early, before creating anything, if the data directory is missing.
    """
    async with AsyncNexus() as nexus:
        # Step 1: Define schema (quality gate for our structure)
        logger.info("=== Step 1: Defining Schema ===")
        schema = SCHEMA
        data_path = Path("simulation-data")
        if not data_path.exists():
            # Stop before any project or collection is created.
            logger.error(f"Data path not found: {data_path}")
            return

        # Step 2: Create project
        logger.info("=== Step 2: Creating Project ===")
        project_name = f"simulation-project-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        project = await nexus.create_project(project_name)
        logger.info(f"Created project: {project.name} ({project.uuid})")

        # Step 3: Create collection with schema
        logger.info("=== Step 3: Creating Collection ===")
        collection_name = f"simulation-collection-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        collection = await project.create_collection(
            collection_name=collection_name,
            schema=schema,
            metadata={
                "description": "Simulation data collection",
                "source_path": str(data_path),
                "created_at": datetime.now().isoformat(),
            },
        )
        logger.info(f"Created collection: {collection.name} ({collection.uuid})")

        # Step 4: Upload data
        logger.info("=== Step 4: Uploading Data ===")
        datasource = LocalDataSource(str(data_path))
        logger.info(f"Data source: {datasource.path}")
        upload_result = await collection.upload(datasource=datasource)
        logger.info("Upload completed:")
        logger.info(f"  Files uploaded: {upload_result.files_uploaded}")
        logger.info(f"  Files failed: {upload_result.files_failed}")
        logger.info(f"  Total size: {upload_result.total_size:,} bytes")
        if upload_result.files_failed > 0:
            # Make partial failures visible instead of only reporting a count.
            logger.warning(f"Some files failed to upload: {upload_result.files_failed}")

        # Step 5: Browse uploaded data
        logger.info("=== Step 5: Browsing Collection ===")
        items = await collection.list_items(path="/")
        logger.info(f"Found {len(items)} items in collection")
        logger.info("Root-level items:")
        for i, item in enumerate(items[:10], 1):  # show at most ten entries
            item_type = "folder" if item.is_folder else "file"
            size_info = f" ({item.file_size:,} bytes)" if item.file_size else ""
            logger.info(f"  {i}. {item.name} [{item_type}]{size_info}")

        # Step 6: Get specific items
        logger.info("=== Step 6: Getting Specific Items ===")

        # Get a specific run folder
        run_folder = await collection.get_item("/run_001/")
        if run_folder:
            logger.info(f"Found run folder: {run_folder.item_uri}")
            logger.info(f"  Has children: {run_folder.has_children}")

        # Get a specific file
        params_file = await collection.get_item("/run_001/parameters.dat")
        if params_file:
            logger.info(f"Found parameters file: {params_file.item_uri}")
            # file_size is treated as optional in Step 5, so do not apply the
            # ',' format spec to it unconditionally (None would raise TypeError).
            if params_file.file_size is not None:
                logger.info(f"  Size: {params_file.file_size:,} bytes")
            else:
                logger.info("  Size: unknown")

        # Step 7: Download specific items
        logger.info("=== Step 7: Downloading Specific Items ===")

        # Download a specific file
        if params_file:
            result = params_file.download("./downloads/", confirm=False)
            logger.info(f"Downloaded file: {result.files_downloaded} file(s)")

        # Download a specific folder
        if run_folder:
            result = run_folder.download("./downloads/run_001/", confirm=False)
            logger.info(f"Downloaded folder: {result.files_downloaded} file(s)")

        # Step 8: Selective downloads
        logger.info("=== Step 8: Selective Downloads ===")
        download_count = 0
        async for item in collection.iter_items(path="/"):
            # Download only parameter files
            if not item.is_folder and item.name == "parameters.dat":
                item_ref = await collection.get_item(item.item_uri)
                if item_ref:
                    logger.info(f"Downloading {item.item_uri}...")
                    result = item_ref.download("./downloads/parameters/", confirm=False)
                    download_count += 1
                    logger.info(f"  Downloaded: {result.files_downloaded} file(s)")

        logger.info(f"Downloaded {download_count} parameter files")

        # Step 9: Using hierarchical paths (sync API)
        logger.info("=== Step 9: Using Hierarchical Paths ===")
        # Illustration only: the calls below belong in a synchronous context,
        # not inside this coroutine.
        # from miura import Nexus
        # nexus_sync = Nexus()
        # item = nexus_sync.get(f"{project.name}/{collection.name}/run_001/parameters.dat")
        # result = item.download("./downloads/", confirm=False)

        logger.info("=== Workflow Complete ===")
        logger.info(f"Project: {project.name}")
        logger.info(f"Collection: {collection.name}")
        logger.info("All operations completed successfully")

if __name__ == "__main__":
    asyncio.run(complete_workflow())

Step-by-Step Breakdown

Step 1: Define Schema

Define a schema that matches your data structure (see Schemas for format and patterns):

python
SCHEMA = [
    {
        # Run folders: run_001/, run_002/, ...
        "pattern": "run_\\d+/",
        "min_occurrence": 1,
        "children": [
            {
                "name": "parameters.dat",
                "min_occurrence": 1,
            },
            {
                "name": "metadata.json",
                "min_occurrence": 1,
            },
            {
                # results/ sub-folder holding the output_NNN.vtk files
                "pattern": "results/",
                "min_occurrence": 1,
                "children": [
                    {
                        "pattern": "output_\\d+\\.vtk",
                        "min_occurrence": 1,
                    },
                ],
            },
        ],
    },
]
# The workflow passes the schema around under the lowercase name.
schema = SCHEMA

What happens: The schema acts as a quality gate. It defines which files and folders are expected (by name or by pattern), so that uploads can be validated against it.

Step 2: Create Project

python
project = await nexus.create_project("simulation-project")

What happens: Creates a new project to organize your collections.

Step 3: Create Collection

python
collection = await project.create_collection(
    collection_name="simulation-collection",
    schema=schema,
    metadata={"description": "Simulation data"}
)

What happens: Creates a collection with your schema. The schema defines what files and folders are expected.

Step 4: Upload Data

python
datasource = LocalDataSource("simulation-data")
upload_result = await collection.upload(datasource=datasource)

What happens:

  • Scans the local directory
  • Validates each file/folder against the schema
  • Uploads validated files to the collection
  • Reports any validation errors

Step 5: Browse Collection

python
items = await collection.list_items(path="/")

What happens: Lists items at the collection root (single level), allowing you to see what was uploaded. Returns a list of AsyncCollectionItem objects.

Step 6: Get Specific Items

python
item = await collection.get_item("/run_001/parameters.dat")

What happens: Retrieves a specific item by its path, returning an AsyncCollectionItem that you can work with directly (e.g. call .download() on it).

Step 7: Download Items

python
result = item.download("./downloads/", confirm=False)

What happens: Downloads the item (file or folder) to the specified local directory.

Step 8: Selective Downloads

python
async for item in collection.iter_items(path="/"):
    if not item.is_folder and item.name == "parameters.dat":
        item_ref = await collection.get_item(item.item_uri)
        result = item_ref.download("./downloads/", confirm=False)

What happens: Iterates through all items and downloads only those matching your criteria.


Advanced Patterns

Pattern 1: Schema Validation

Validate your schema structure before creating a collection:

python
def validate_schema(schema):
    """Validate the structure of a schema, including nested children.

    Args:
        schema: A list of node dicts. Each node must contain a ``"name"`` or a
            ``"pattern"`` key and may contain a ``"children"`` list of nodes,
            which is validated recursively.

    Returns:
        True if every node at every nesting level is well formed.

    Raises:
        ValueError: If ``schema`` or any ``"children"`` value is not a list,
            if a node is not a dict, or if a node has neither ``"name"`` nor
            ``"pattern"``.
    """
    if not isinstance(schema, list):
        raise ValueError("Schema must be a list")
    for node in schema:
        # Without this check a string node would be tested by substring
        # matching and could pass silently.
        if not isinstance(node, dict):
            raise ValueError("Each node must be a dict")
        if "name" not in node and "pattern" not in node:
            raise ValueError("Each node must have 'name' or 'pattern'")
        children = node.get("children")
        if children is not None:
            if not isinstance(children, list):
                raise ValueError("'children' must be a list")
            validate_schema(children)
    return True

validate_schema(SCHEMA)
logger.info("Schema validation passed")

Pattern 2: Incremental Uploads

Upload data incrementally and track progress:

Pattern 3: Batch Item Operations

Process multiple items in batches:

python
async def download_large_files(collection, min_size=1_000_000):
    """Download every file in a collection whose size is at least ``min_size``.

    Args:
        collection: Collection to scan; items are read with
            ``collection.iter_items(path="/")``.
        min_size: Minimum ``file_size`` (in bytes) for a file to be downloaded.
            Folders and items with a missing or zero ``file_size`` are skipped.

    Returns:
        None. Matching files are downloaded into ``./downloads/large/`` and
        the status of each download is logged.
    """
    # First pass: collect the candidates so the total can be reported up front.
    large_files = [
        item
        async for item in collection.iter_items(path="/")
        if not item.is_folder and item.file_size and item.file_size >= min_size
    ]

    logger.info(f"Found {len(large_files)} large files")

    # Second pass: re-fetch each item by URI and download it.
    for item in large_files:
        bound_item = await collection.get_item(item.item_uri)
        if bound_item:
            logger.info(f"Downloading {item.name} ({item.file_size:,} bytes)...")
            result = bound_item.download("./downloads/large/", confirm=False)
            # Download results are read by attribute everywhere else in this
            # tutorial (result.status, result.files_downloaded).
            logger.info(f"  Status: {result.status}")

await download_large_files(collection, min_size=1_000_000)

Pattern 4: Error Recovery

Handle errors gracefully and continue processing:

python
async def safe_download_all(collection, download_path="./downloads/"):
    """Download every file in the collection, tallying successes and failures.

    A failure on one item is logged and counted; it never stops the loop.
    Folders are skipped, and items that cannot be re-fetched are ignored.
    """
    succeeded = 0
    failed = 0

    async for entry in collection.iter_items(path="/"):
        if entry.is_folder:
            continue
        try:
            target = await collection.get_item(entry.item_uri)
            if not target:
                continue
            outcome = target.download(download_path, confirm=False)
            if outcome.status == "completed":
                succeeded += 1
                continue
            failed += 1
            logger.warning(f"Failed to download {entry.name}")
        except Exception as exc:
            # Best-effort: record the error and move on to the next item.
            failed += 1
            logger.error(f"Error downloading {entry.name}: {exc}")

    logger.info(f"Download complete: {succeeded} succeeded, {failed} failed")

await safe_download_all(collection)

Error Handling

Comprehensive Error Handling

python
from miura.sdk import NotFoundError, ValidationError
from miura.logging import get_logger

logger = get_logger(__name__)

async def robust_workflow():
    """Create, upload, and download with a dedicated error handler per stage.

    Each stage logs its own failure and ends the workflow; anything that
    escapes the per-stage handlers is logged and re-raised.
    """
    # Same structure as the SCHEMA list defined in the workflow above.
    run_schema = [
        {
            "pattern": "run_\\d+/",
            "min_occurrence": 1,
            "children": [
                {"name": "parameters.dat", "min_occurrence": 1},
                {"name": "metadata.json", "min_occurrence": 1},
                {
                    "pattern": "results/",
                    "min_occurrence": 1,
                    "children": [
                        {"pattern": "output_\\d+\\.vtk", "min_occurrence": 1},
                    ],
                },
            ],
        },
    ]

    try:
        async with AsyncNexus() as nexus:
            # Stage 1: project
            try:
                project = await nexus.create_project("simulation-project")
            except Exception as exc:
                logger.error(f"Error creating project: {exc}")
                return

            # Stage 2: collection (schema problems are reported separately)
            try:
                collection = await project.create_collection(
                    collection_name="simulation-collection",
                    schema=run_schema,
                )
            except ValidationError as exc:
                logger.error(f"Schema validation error: {exc}")
                return
            except Exception as exc:
                logger.error(f"Error creating collection: {exc}")
                return

            # Stage 3: upload; partial failures are logged but do not abort
            try:
                source = LocalDataSource("simulation-data")
                upload_result = await collection.upload(datasource=source)

                if upload_result.files_failed > 0:
                    logger.warning(f"Some files failed to upload: {upload_result.files_failed}")
                    for upload_error in upload_result.errors or []:
                        logger.error(f"Upload error: {upload_error}")
            except Exception as exc:
                logger.error(f"Error uploading data: {exc}")
                return

            # Stage 4: fetch one item and download it
            try:
                params_item = await collection.get_item("/run_001/parameters.dat")
                if params_item:
                    outcome = params_item.download("./downloads/", confirm=False)
                    if outcome.status != "completed":
                        logger.warning(f"Download incomplete: {outcome.errors}")
            except NotFoundError:
                logger.warning("Item not found")
            except Exception as exc:
                logger.error(f"Error downloading item: {exc}")

    except Exception as exc:
        # Last-resort boundary: log, then let the caller see the failure.
        logger.error(f"Unexpected error: {exc}")
        raise

asyncio.run(robust_workflow())

Next Steps


© 2026