The 0G Storage SDK uses two error conventions that work together:
  • Return-tuple errors — (result, err) pairs for operations with expected failure paths (e.g. upload/download). Check err before using result.
  • Raised exceptions — for programming errors, invalid input, and unrecoverable failures.
Every exception inherits from StorageError and carries structured context you can inspect for automated recovery.

Exception hierarchy

StorageError
├── UploadError
│   └── UploadRetryableError  (also RetryableError)
├── DownloadError
│   └── DownloadRetryableError  (also RetryableError)
├── VerificationError
├── NodeUnavailableError
│   └── NodeConnectionError  (also NetworkError)
├── InsufficientReplicasError
├── MerkleTreeError
├── ContractError
│   └── TransactionError
├── NetworkError
├── TimeoutError
├── RetryableError
├── InvalidInputError
└── FileOperationError
All are importable from exceptions:
from exceptions import (
    StorageError,
    UploadError,
    DownloadError,
    VerificationError,
    NodeUnavailableError,
    InsufficientReplicasError,
    MerkleTreeError,
    ContractError,
    NetworkError,
    TimeoutError,
    RetryableError,
    UploadRetryableError,
    DownloadRetryableError,
    NodeConnectionError,
    TransactionError,
    InvalidInputError,
    FileOperationError,
)
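The "(also RetryableError)" annotations in the tree indicate multiple inheritance: for example, UploadRetryableError subclasses both UploadError and RetryableError. A minimal standalone sketch (illustration only, not the SDK source) shows why that matters for exception handling:

```python
# Standalone mirror of part of the hierarchy (not the SDK source).
# Dual-inheritance classes are caught by handlers for either parent.
class StorageError(Exception):
    pass

class RetryableError(StorageError):
    pass

class UploadError(StorageError):
    pass

class UploadRetryableError(UploadError, RetryableError):
    pass

def classify(exc: Exception) -> str:
    # Check the narrower class first: every RetryableError is also a
    # StorageError, so the ordering of isinstance checks matters.
    if isinstance(exc, RetryableError):
        return "retryable"
    if isinstance(exc, StorageError):
        return "permanent"
    return "unknown"
```

Because UploadRetryableError inherits from both parents, an `except UploadError` handler and an `except RetryableError` handler will each catch it.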

Error structure

Every StorageError carries four optional attributes:
Attribute    Type               Description
message      str                Human-readable description
error_code   str | None         Machine-readable code (e.g. "NETWORK_ERROR", "TIMEOUT")
context      dict               Structured metadata (operation name, URL, tx hash, etc.)
cause        Exception | None   The original exception that triggered this one
try:
    result, err = indexer.upload(file, BLOCKCHAIN_RPC, account, opts)
    if err:
        raise err
except StorageError as e:
    print(f"Code:    {e.error_code}")
    print(f"Context: {e.context}")
    print(f"Cause:   {e.cause}")
Attach more context on the fly:
raise StorageError("upload stalled").with_context(
    root_hash=tree.root_hash(),
    node_url=node.url,
)
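A simplified re-creation of this error shape (attribute names taken from the table above; the SDK's real implementation may differ) clarifies how with_context chains onto a raise statement:

```python
# Hypothetical, simplified re-creation of the SDK's error shape, for
# illustration only.
class StorageError(Exception):
    def __init__(self, message, error_code=None, context=None, cause=None):
        super().__init__(message)
        self.message = message
        self.error_code = error_code
        self.context = dict(context or {})
        self.cause = cause

    def with_context(self, **fields):
        # Merge extra fields and return self, so the call can sit
        # directly inside a raise statement.
        self.context.update(fields)
        return self
```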

Return-tuple pattern

Indexer and Uploader/Downloader methods return (result, err) tuples. Always branch on err:
result, err = indexer.upload(file, BLOCKCHAIN_RPC, account, opts)

if err is not None:
    # err is an Exception instance — the SDK returns it instead of raising
    print(f"Upload failed: {err}")
    # Optionally re-raise for the caller
    raise err

print(f"Root: {result['rootHash']}")
indexer.download() returns a single err (or None) rather than a tuple:
err = indexer.download(root_hash, "./out.bin")
if err:
    raise err
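If you prefer exceptions throughout, a small hypothetical helper (unwrap is not an SDK function) can convert the tuple convention into raise-on-error style:

```python
# Hypothetical convenience wrapper around the (result, err) convention;
# not part of the SDK.
def unwrap(pair):
    result, err = pair
    if err is not None:
        raise err
    return result
```

With it, an upload becomes `result = unwrap(indexer.upload(file, BLOCKCHAIN_RPC, account, opts))`.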

Classifying retryable errors

RetryableError and its subclasses signal transient failures you can safely retry. The is_retryable() helper handles the classification:
from utils.error_handler import is_retryable

try:
    result, err = indexer.upload(file, BLOCKCHAIN_RPC, account, opts)
    if err:
        raise err
except Exception as e:
    if is_retryable(e):
        print("Transient — retry later")
    else:
        print("Permanent — investigate")
        raise
RetryableError tracks retry_count and max_retries if you want to bound retry loops:
from exceptions import RetryableError

err = RetryableError("node timeout", retry_count=2, max_retries=5)
print(err)   # "...retry 2/5..."

ErrorContext for multi-step operations

For operations that may accumulate multiple failures (e.g. batch uploads), ErrorContext collects them:
from utils.error_handler import ErrorContext, is_retryable

with ErrorContext("batch_upload", verbose=True) as ctx:
    for file in files:
        try:
            result, err = indexer.upload(file, BLOCKCHAIN_RPC, account, opts)
            if err:
                raise err
        except Exception as e:
            ctx.add_error(e)
            if not is_retryable(e):
                break

    ctx.raise_if_errors("One or more uploads failed")
The context logs errors automatically, stores them in ctx.errors, and optionally raises a combined StorageError when you call raise_if_errors().
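A simplified, hypothetical re-creation of that behavior (the SDK's real class may differ in details, and raises a combined StorageError rather than RuntimeError) makes the semantics concrete:

```python
# Hypothetical, simplified ErrorContext look-alike for illustration only.
class ErrorContext:
    def __init__(self, operation, verbose=False):
        self.operation = operation
        self.verbose = verbose
        self.errors = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        return False  # never swallow exceptions raised inside the block

    def add_error(self, error):
        if self.verbose:
            print(f"[{self.operation}] {error}")
        self.errors.append(error)

    def raise_if_errors(self, message):
        if self.errors:
            # The real SDK raises a combined StorageError here.
            raise RuntimeError(f"{message} ({len(self.errors)} error(s))")
```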

Decorator form

Wrap a function with the same semantics using wrap_with_context:
from utils.error_handler import wrap_with_context

@wrap_with_context("upload", verbose=True)
def upload_one(file):
    result, err = indexer.upload(file, BLOCKCHAIN_RPC, account, opts)
    if err:
        raise err
    return result
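One plausible shape for such a decorator (a sketch only; the SDK's wrap_with_context may log differently or attach context to the error) is:

```python
import functools

# Hypothetical sketch of a context-wrapping decorator; not the SDK source.
def wrap_with_context(operation, verbose=False):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            try:
                return fn(*args, **kwargs)
            except Exception as e:
                if verbose:
                    print(f"[{operation}] {e}")
                raise  # re-raise so callers still see the failure
        return wrapper
    return decorator
```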

Converting low-level exceptions

The error-handler module provides helpers that wrap generic exceptions into the SDK’s structured types:
from utils.error_handler import (
    handle_network_error,
    handle_timeout_error,
    handle_upload_error,
    handle_download_error,
    handle_node_error,
    handle_transaction_error,
)

try:
    requests.get(node.url, timeout=5)
except requests.Timeout as e:
    raise handle_timeout_error(e, operation="node_ping", node_url=node.url)
except requests.RequestException as e:
    raise handle_network_error(e, operation="node_ping", node_url=node.url)
Each helper sets an appropriate error_code and attaches relevant context fields.
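As a rough sketch of what one of these helpers does (an assumed shape; the actual error_code strings and context fields are SDK-defined):

```python
# Hypothetical sketch of a conversion helper; the error_code value and
# context handling here are assumptions, not the SDK's actual behavior.
class StorageError(Exception):
    def __init__(self, message, error_code=None, context=None, cause=None):
        super().__init__(message)
        self.error_code = error_code
        self.context = dict(context or {})
        self.cause = cause

def handle_timeout_error(cause, **context):
    # Wrap the low-level exception, preserving it as `cause`.
    return StorageError(f"operation timed out: {cause}",
                        error_code="TIMEOUT", context=context, cause=cause)
```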

Validate input early

validate_input raises InvalidInputError for parameter problems — use it at API boundaries:
from utils.error_handler import validate_input

validate_input(root_hash, lambda x: isinstance(x, str) and x.startswith("0x"),
               "root_hash must be a 0x-prefixed hex string")
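Its behavior can be approximated in a few lines (a standalone sketch: the SDK raises InvalidInputError where this uses ValueError):

```python
# Standalone approximation of validate_input for illustration; the SDK
# raises InvalidInputError rather than ValueError.
def validate_input(value, predicate, message):
    if not predicate(value):
        raise ValueError(message)
```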

Safe calling

For best-effort operations where you want to catch everything and continue:
from utils.error_handler import safe_call

result = safe_call(lambda: indexer.get_file_locations(root_hash), default=[])
safe_call runs the callable and returns the default value on any exception, logging the error without propagating it.
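A minimal equivalent (sketch only; the SDK version's logging format may differ):

```python
import logging

# Minimal safe_call look-alike: returns `default` on any exception
# instead of propagating it, after logging a warning.
def safe_call(fn, default=None):
    try:
        return fn()
    except Exception as e:
        logging.warning("safe_call suppressed: %s", e)
        return default
```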

When to catch vs re-raise

Situation                               Action
is_retryable(err) is true               Retry with backoff
InvalidInputError                       Fix the input; don't retry
InsufficientReplicasError               Increase expected_replica or wait for more nodes
MerkleTreeError                         Bug or corrupted input; report with a reproducer
TransactionError with tx_hash           Inspect on-chain; may need manual recovery
NodeConnectionError to one node         Try a different node from indexer.select_nodes()
Unknown / non-StorageError exception    Bubble up; likely a programming error

Example: retry with backoff

import time
from utils.error_handler import is_retryable

MAX_RETRIES = 3
BACKOFF_S   = 5

for attempt in range(MAX_RETRIES):
    result, err = indexer.upload(file, BLOCKCHAIN_RPC, account, opts)
    if err is None:
        break
    if not is_retryable(err) or attempt == MAX_RETRIES - 1:
        raise err
    print(f"Attempt {attempt + 1} failed ({err}); retrying in {BACKOFF_S}s")
    time.sleep(BACKOFF_S)
For most uploads, you can also pass retry_opts directly to indexer.upload() and let the SDK handle retries internally.
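For busier systems, a fixed 5-second delay can synchronize many clients retrying at once; exponential backoff with full jitter (a standard technique, not an SDK feature) spreads retries out:

```python
import random

# Full-jitter exponential backoff: the delay is drawn uniformly from
# [0, min(cap, base * 2**attempt)].
def backoff_delay(attempt, base=1.0, cap=30.0):
    return random.uniform(0, min(cap, base * 2 ** attempt))
```

In the loop above, replace `time.sleep(BACKOFF_S)` with `time.sleep(backoff_delay(attempt))`.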

Next steps

Upload & download

The core file flow that produces most SDK errors.

Large files

Multi-fragment upload patterns with independent retry granularity.