End-to-End Workflows

Complete, real-world workflows combining all features

This tutorial demonstrates a complete, real-world workflow combining schema generation, collection management, data upload, item retrieval, and selective downloads.


Scenario Overview

Use Case: You have simulation data organized in a filesystem and want to:

  1. Automatically generate a schema from the existing structure
  2. Create a collection with the generated schema
  3. Upload and validate the data
  4. Browse and explore the uploaded data
  5. Download specific files and folders as needed

Data Structure:

text
simulation-data/
├── run_001/
│   ├── parameters.dat
│   ├── results/
│   │   ├── output_001.vtk
│   │   ├── output_002.vtk
│   │   └── output_003.vtk
│   └── metadata.json
├── run_002/
│   ├── parameters.dat
│   ├── results/
│   │   ├── output_001.vtk
│   │   ├── output_002.vtk
│   │   └── output_003.vtk
│   └── metadata.json
└── run_003/
    ├── parameters.dat
    ├── results/
    │   ├── output_001.vtk
    │   ├── output_002.vtk
    │   └── output_003.vtk
    └── metadata.json


Complete Workflow

Here's the complete workflow from start to finish:

python
import asyncio
from datetime import datetime
from pathlib import Path
from miura.aio import AsyncNexus
from miura.api.datasources import LocalDataSource
from miura.api import generate_schema_from_path, SchemaGenOptions
from miura.logging import get_logger

logger = get_logger(__name__)

async def complete_workflow():
    """Complete end-to-end workflow."""
    async with AsyncNexus() as nexus:
        # Step 1: Generate schema from filesystem
        logger.info("=== Step 1: Generating Schema ===")
        data_path = Path("simulation-data")
        if not data_path.exists():
            logger.error(f"Data path not found: {data_path}")
            return
        options = SchemaGenOptions(
            min_files_for_pattern=2,
            default_required=False,
            schema_name="simulation-schema",
            similarity_threshold=0.7,
            confidence_threshold=0.75
        )
        schema = generate_schema_from_path(str(data_path), options=options)
        logger.info(f"Generated schema with {len(schema)} root-level nodes")
        # Step 2: Create project
        logger.info("=== Step 2: Creating Project ===")
        project_name = f"simulation-project-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        project = await nexus.create_project(project_name)
        logger.info(f"Created project: {project.name} ({project.uuid})")

        # Step 3: Create collection with generated schema
        logger.info("=== Step 3: Creating Collection ===")
        collection_name = f"simulation-collection-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        collection = await project.create_collection(
            collection_name=collection_name,
            schema=schema,
            metadata={
                "description": "Simulation data collection",
                "source_path": str(data_path),
                "generated_at": datetime.now().isoformat(),
                "schema_type": "auto-generated"
            }
        )
        logger.info(f"Created collection: {collection.name} ({collection.uuid})")

        # Step 4: Upload data
        logger.info("=== Step 4: Uploading Data ===")
        datasource = LocalDataSource(str(data_path))
        logger.info(f"Data source: {datasource.path}")
        upload_result = await collection.upload(
            datasource=datasource,
            create_new_version=False
        )
        logger.info(f"Upload completed:")
        logger.info(f"  Files uploaded: {upload_result.get('files_uploaded', 0)}")
        logger.info(f"  Files failed: {upload_result.get('files_failed', 0)}")
        logger.info(f"  Total size: {upload_result.get('total_size', 0):,} bytes")
        # Step 5: Browse uploaded data
        logger.info("=== Step 5: Browsing Collection ===")
        response = await collection.list_items(path="/", page=1, page_size=20)
        items = response.get("items", [])
        pagination = response.get("pagination", {})

        logger.info(f"Found {pagination.get('total_count', 0)} items in collection")
        logger.info("Root-level items:")
        for i, item in enumerate(items[:10], 1):
            item_type = "folder" if item.is_folder else "file"
            size_info = f" ({item.file_size:,} bytes)" if item.file_size else ""
            logger.info(f"  {i}. {item.name} [{item_type}]{size_info}")

        # Step 6: Get specific items
        logger.info("=== Step 6: Getting Specific Items ===")

        # Get a specific run folder
        run_folder = await collection.get_item("/run_001/")
        if run_folder:
            logger.info(f"Found run folder: {run_folder.item_uri}")
            logger.info(f"  Has children: {run_folder.has_children}")

        # Get a specific file
        params_file = await collection.get_item("/run_001/parameters.dat")
        if params_file:
            logger.info(f"Found parameters file: {params_file.item_uri}")
            logger.info(f"  Size: {params_file.file_size:,} bytes")

        # Step 7: Download specific items
        logger.info("=== Step 7: Downloading Specific Items ===")

        # Download a specific file
        if params_file:
            result = params_file.download("./downloads/", confirm=False)
            logger.info(f"Downloaded file: {result.get('files_downloaded', 0)} file(s)")

        # Download a specific folder
        if run_folder:
            result = run_folder.download("./downloads/run_001/", confirm=False)
            logger.info(f"Downloaded folder: {result.get('files_downloaded', 0)} file(s)")

        # Step 8: Selective downloads
        logger.info("=== Step 8: Selective Downloads ===")
        download_count = 0
        async for item in collection.iter_items(path="/"):
            # Download only parameter files
            if not item.is_folder and item.name == "parameters.dat":
                bound_item = await collection.get_item(item.item_uri)
                if bound_item:
                    logger.info(f"Downloading {item.item_uri}...")
                    result = bound_item.download("./downloads/parameters/", confirm=False)
                    download_count += 1
                    logger.info(f"  Downloaded: {result.get('files_downloaded', 0)} file(s)")

        logger.info(f"Downloaded {download_count} parameter files")

        # Step 9: Using hierarchical paths (sync API)
        logger.info("=== Step 9: Using Hierarchical Paths ===")
        # Note: This would be in a sync context
        # from miura import Nexus
        # nexus_sync = Nexus()
        # item = nexus_sync.get(f"{project.name}/{collection.name}/run_001/parameters.dat")
        # result = item.download("./downloads/", confirm=False)

        logger.info("=== Workflow Complete ===")
        logger.info(f"Project: {project.name}")
        logger.info(f"Collection: {collection.name}")
        logger.info("All operations completed successfully")

if __name__ == "__main__":
    asyncio.run(complete_workflow())


Step-by-Step Breakdown

Step 1: Generate Schema

python
from miura.api import generate_schema_from_path, SchemaGenOptions

options = SchemaGenOptions(
    min_files_for_pattern=2,
    default_required=False,
    schema_name="simulation-schema"
)
schema = generate_schema_from_path("simulation-data", options=options)

What happens: The schema generator scans your filesystem and detects patterns in filenames and folder names, creating a schema that matches your data structure.
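
Before committing to the generated schema, it can help to inspect or persist it. A minimal sketch, assuming the schema is JSON-serializable (Pattern 1 below relies on the same assumption):

python
import json
from pathlib import Path

# Save the generated schema for review or reuse, and print it for a
# quick visual check of the detected patterns.
Path("simulation-schema.json").write_text(json.dumps(schema, indent=2))
print(json.dumps(schema, indent=2))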

Step 2: Create Project

python
project = await nexus.create_project("simulation-project")

What happens: Creates a new project to organize your collections.

Step 3: Create Collection

python
collection = await project.create_collection(
    collection_name="simulation-collection",
    schema=schema,
    metadata={"description": "Simulation data"}
)

What happens: Creates a collection with the generated schema. The schema defines what files and folders are expected.
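
Auto-generation is not required: you can also pass a hand-written schema. The sketch below is hypothetical and assumes the node format that Pattern 1 below checks for (dicts carrying a "name" or "pattern" key); consult the schema reference for the full set of supported fields:

python
# Hypothetical hand-written schema; the node fields are assumptions based
# on the validation in Pattern 1 below, not a definitive format.
manual_schema = [
    {"pattern": "run_*"},
]
collection = await project.create_collection(
    collection_name="manual-collection",
    schema=manual_schema
)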

Step 4: Upload Data

python
datasource = LocalDataSource("simulation-data")
upload_result = await collection.upload(datasource=datasource)

What happens:

  • Scans the local directory
  • Validates each file/folder against the schema
  • Uploads validated files to the collection
  • Reports any validation errors
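
When some files fail validation, the result dictionary reports them. A short sketch using the upload_result keys that appear elsewhere in this tutorial (treat the key names as illustrative, not exhaustive):

python
# Summarize the upload and surface any validation errors reported by
# the result dictionary.
upload_result = await collection.upload(datasource=datasource)
logger.info(f"Uploaded {upload_result.get('files_uploaded', 0)} files "
            f"({upload_result.get('total_size', 0):,} bytes)")
if upload_result.get("files_failed", 0) > 0:
    for error in upload_result.get("errors", []):
        logger.error(f"Upload error: {error}")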

Step 5: Browse Collection

python
response = await collection.list_items(path="/", page=1, page_size=20)
items = response.get("items", [])

What happens: Lists items in the collection, allowing you to see what was uploaded.
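
For collections larger than a single page, you can advance the page parameter until the listing is exhausted. A minimal sketch, assuming an empty "items" list marks the last page (for large collections, iter_items in Step 8 avoids manual paging):

python
# Walk every page of the root listing.
page = 1
while True:
    response = await collection.list_items(path="/", page=page, page_size=100)
    items = response.get("items", [])
    if not items:
        break  # assumed end-of-listing signal
    for item in items:
        logger.info(item.name)
    page += 1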

Step 6: Get Specific Items

python
item = await collection.get_item("/run_001/parameters.dat")

What happens: Retrieves a specific item by its path, returning a BoundCollectionItem that you can work with directly.
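
Note that folder paths in this tutorial carry a trailing slash ("/run_001/") while file paths do not, and the workflow above guards the result with a truthiness check before use; a missing path may also raise NotFoundError (see Error Handling below):

python
# Guard the lookup: the tutorial treats a falsy result as "not found".
params_file = await collection.get_item("/run_001/parameters.dat")
if not params_file:
    logger.warning("parameters.dat not found; check the path and the upload step")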

Step 7: Download Items

python
result = item.download("./downloads/", confirm=False)

What happens: Downloads the item (file or folder) to the specified local directory.
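
Whether download() creates the target directory itself is not spelled out here, so a defensive sketch creates it up front with plain pathlib:

python
from pathlib import Path

# Ensure the destination exists before downloading (defensive; the API
# may already handle this).
target = Path("./downloads/")
target.mkdir(parents=True, exist_ok=True)
result = item.download(str(target), confirm=False)
logger.info(f"Status: {result.get('status')}")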

Step 8: Selective Downloads

python
async for item in collection.iter_items(path="/"):
    if not item.is_folder and item.name == "parameters.dat":
        bound_item = await collection.get_item(item.item_uri)
        result = bound_item.download("./downloads/", confirm=False)

What happens: Iterates through all items and downloads only those matching your criteria.
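
The filter is ordinary Python, so the same loop adapts to any criterion, for example downloading only the VTK outputs:

python
# Same pattern as Step 8, filtering on file extension instead of name.
async for item in collection.iter_items(path="/"):
    if not item.is_folder and item.name.endswith(".vtk"):
        bound_item = await collection.get_item(item.item_uri)
        if bound_item:
            bound_item.download("./downloads/vtk/", confirm=False)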


Advanced Patterns

Pattern 1: Schema Generation with Validation

Generate a schema and validate it before using it:

python
import json

# Generate schema
schema = generate_schema_from_path("simulation-data")

# Validate schema structure
def validate_schema(schema):
    """Validate generated schema structure."""
    if not isinstance(schema, list):
        raise ValueError("Schema must be a list")

    for node in schema:
        if "name" not in node and "pattern" not in node:
            raise ValueError("Each node must have 'name' or 'pattern'")

    return True

validate_schema(schema)
logger.info("Schema validation passed")
logger.info(json.dumps(schema, indent=2))

Pattern 2: Incremental Uploads

Upload data incrementally and track progress:
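
A minimal sketch, with two assumptions to verify against the API reference: that LocalDataSource can be rooted at a single run folder, and that per-folder uploads still land under the collection paths the schema expects:

python
from pathlib import Path
from miura.api.datasources import LocalDataSource

async def incremental_upload(collection, root="simulation-data"):
    """Upload one run folder at a time and log progress."""
    run_dirs = sorted(p for p in Path(root).iterdir() if p.is_dir())
    for i, run_dir in enumerate(run_dirs, 1):
        # Assumption: a datasource rooted at the run folder uploads its
        # contents under the matching collection path.
        datasource = LocalDataSource(str(run_dir))
        result = await collection.upload(datasource=datasource)
        logger.info(f"[{i}/{len(run_dirs)}] {run_dir.name}: "
                    f"{result.get('files_uploaded', 0)} uploaded, "
                    f"{result.get('files_failed', 0)} failed")

await incremental_upload(collection)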

Pattern 3: Batch Item Operations

Process multiple items in batches:

python
async def download_large_files(collection, min_size=1_000_000):
    """Download all large files from a collection."""
    large_files = []
    
    async for item in collection.iter_items(path="/"):
        if not item.is_folder and item.file_size and item.file_size >= min_size:
            large_files.append(item)
    
    logger.info(f"Found {len(large_files)} large files")
    
    for item in large_files:
        bound_item = await collection.get_item(item.item_uri)
        if bound_item:
            logger.info(f"Downloading {item.name} ({item.file_size:,} bytes)...")
            result = bound_item.download("./downloads/large/", confirm=False)
            logger.info(f"  Status: {result['status']}")

await download_large_files(collection, min_size=1_000_000)

Pattern 4: Error Recovery

Handle errors gracefully and continue processing:

python
async def safe_download_all(collection, download_path="./downloads/"):
    """Download all items with error handling."""
    success_count = 0
    error_count = 0
    
    async for item in collection.iter_items(path="/"):
        if not item.is_folder:
            try:
                bound_item = await collection.get_item(item.item_uri)
                if bound_item:
                    result = bound_item.download(download_path, confirm=False)
                    if result["status"] == "completed":
                        success_count += 1
                    else:
                        error_count += 1
                        logger.warning(f"Failed to download {item.name}")
            except Exception as e:
                error_count += 1
                logger.error(f"Error downloading {item.name}: {e}")
    
    logger.info(f"Download complete: {success_count} succeeded, {error_count} failed")

await safe_download_all(collection)

Error Handling

Comprehensive Error Handling

python
import asyncio

from miura.aio import AsyncNexus
from miura.api import generate_schema_from_path
from miura.api.datasources import LocalDataSource
from miura.api.exceptions import NotFoundError, ValidationError
from miura.logging import get_logger

logger = get_logger(__name__)

async def robust_workflow():
    """Workflow with comprehensive error handling."""
    try:
        async with AsyncNexus() as nexus:
            # Generate schema
            try:
                schema = generate_schema_from_path("simulation-data")
            except FileNotFoundError as e:
                logger.error(f"Data path not found: {e}")
                return
            except Exception as e:
                logger.error(f"Error generating schema: {e}")
                return

            # Create project
            try:
                project = await nexus.create_project("simulation-project")
            except Exception as e:
                logger.error(f"Error creating project: {e}")
                return

            # Create collection
            try:
                collection = await project.create_collection(
                    collection_name="simulation-collection",
                    schema=schema
                )
            except ValidationError as e:
                logger.error(f"Schema validation error: {e}")
                return
            except Exception as e:
                logger.error(f"Error creating collection: {e}")
                return

            # Upload data
            try:
                datasource = LocalDataSource("simulation-data")
                upload_result = await collection.upload(datasource=datasource)

                if upload_result.get("files_failed", 0) > 0:
                    logger.warning(f"Some files failed to upload: {upload_result.get('files_failed', 0)}")
                    if upload_result.get("errors"):
                        for error in upload_result["errors"]:
                            logger.error(f"Upload error: {error}")
            except Exception as e:
                logger.error(f"Error uploading data: {e}")
                return

            # Get and download items
            try:
                item = await collection.get_item("/run_001/parameters.dat")
                if item:
                    result = item.download("./downloads/", confirm=False)
                    if result["status"] != "completed":
                        logger.warning(f"Download incomplete: {result.get('errors', [])}")
            except NotFoundError:
                logger.warning("Item not found")
            except Exception as e:
                logger.error(f"Error downloading item: {e}")

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise

asyncio.run(robust_workflow())

