A file upload on 0G Storage is a three-part operation: the SDK computes a merkle tree, submits the root hash to the Flow contract, and pushes segments to sharded storage nodes. Downloads reverse the process — locate nodes, pull segments, and reassemble. This page walks through the full flow.

Prerequisites

  • Python 3.8 or higher
  • The SDK installed: pip install 0g-storage-sdk
  • A wallet with 0G tokens (get testnet tokens)
If you haven’t set up the SDK yet, follow the Installation guide first.

The upload flow

Step 1: Create an Indexer client

The indexer discovers storage nodes and coordinates uploads.
from core.indexer import Indexer

INDEXER_RPC = "https://indexer-storage-testnet-turbo.0g.ai"
indexer = Indexer(INDEXER_RPC)
Step 2: Load your file

ZgFile wraps a file handle and computes merkle trees. Use from_file_path for disk files or from_bytes for in-memory data.
from core.file import ZgFile

file = ZgFile.from_file_path("./data.txt")
print(f"Size: {file.size()} bytes")
print(f"Chunks: {file.num_chunks()}, Segments: {file.num_segments()}")
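As a rough sanity check, the chunk and segment counts follow from the file size. A minimal sketch, assuming 256-byte chunks and 1024-chunk (256 KB) segments; both sizes are assumptions here, so compare the results against file.num_chunks() and file.num_segments() on your install:

```python
import math

# Assumed defaults; verify against ZgFile's own counts on your install
CHUNK_SIZE = 256           # bytes per chunk
CHUNKS_PER_SEGMENT = 1024  # 1024 chunks = 256 KB per segment

def expected_counts(size_bytes):
    """Estimate (chunks, segments) for a file of the given size."""
    chunks = max(1, math.ceil(size_bytes / CHUNK_SIZE))
    segments = max(1, math.ceil(chunks / CHUNKS_PER_SEGMENT))
    return chunks, segments

print(expected_counts(300))  # a 300-byte file spans 2 chunks, 1 segment
```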
Step 3: Build an account for signing

The SDK uses eth_account to sign Flow contract transactions.
from eth_account import Account

PRIVATE_KEY = "0xYOUR_PRIVATE_KEY"
account = Account.from_key(PRIVATE_KEY)
Step 4: Upload

indexer.upload() handles merkle tree generation, transaction submission, and segment uploads. It returns a (result, error) tuple.
BLOCKCHAIN_RPC = "https://evmrpc-testnet.0g.ai"

upload_opts = {
    "tags": b"\x00",
    "finalityRequired": True,
    "taskSize": 10,
    "expectedReplica": 1,
    "skipTx": False,
    "account": account,
}

result, err = indexer.upload(
    file,
    BLOCKCHAIN_RPC,
    account,
    upload_opts,
)

if err is not None:
    raise err

print(f"Transaction: {result['txHash']}")
print(f"Root hash:   {result['rootHash']}")

file.close()
Always call file.close() when done. ZgFile holds an open file descriptor.
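To guarantee the descriptor is released even when the upload raises, wrap the handle in try/finally or contextlib.closing, which works with any object exposing close(). A minimal sketch; the FakeZgFile stand-in below is hypothetical, used only so the pattern is runnable without the SDK:

```python
from contextlib import closing

class FakeZgFile:
    """Hypothetical stand-in for ZgFile: anything with a close() method."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

f = FakeZgFile()
with closing(f):
    pass  # indexer.upload(...) would run here; close() fires even on error

print(f.closed)  # True
```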

Upload options

The upload_opts dict accepts:
Key               Type          Default          Description
account           LocalAccount  required         Signer for the Flow contract transaction
tags              bytes         b"\x00"          Arbitrary tag bytes stored alongside the file
finalityRequired  bool          True             Wait for the Flow log entry to be finalized
taskSize          int           10               Segments per parallel upload task
expectedReplica   int           1                Number of storage node replicas to target
skipTx            bool          False            Skip the Flow submission if the file was already uploaded
fee               int           0                Storage fee in wei (0 = auto-calculate from market contract)
submitter         str           account address  Optional override for the on-chain submitter field
You can also pass an optional retry_opts argument — see Large Files for multi-GB upload patterns.

The download flow

Download by root hash. The SDK finds nodes that have the file, pulls segments, and writes the assembled file to disk.
from core.indexer import Indexer

INDEXER_RPC = "https://indexer-storage-testnet-turbo.0g.ai"
root_hash   = "0x11fdd3fd0a6e9594bf4ffe86a5cf095d85ac00f23b4f2e559802d624f6a86b58"

indexer = Indexer(INDEXER_RPC)
err = indexer.download(root_hash, "./output.txt", proof=False)

if err is not None:
    raise err
print("Download complete")
Set proof=True to verify every segment against its merkle proof during download. This is slower but guarantees byte-exact integrity.
Files typically take 3–5 minutes to propagate across storage shards after upload. If you download immediately, the indexer may not have enough replicas to serve your request yet.
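Given the propagation delay, a small retry loop around the download is a practical pattern. A minimal sketch; download_with_retry is a hypothetical helper, not part of the SDK, and works with any callable that returns None on success or an error otherwise:

```python
import time

def download_with_retry(download_fn, attempts=10, delay=30.0):
    """Call download_fn until it returns None (success) or attempts run out."""
    err = None
    for _ in range(attempts):
        err = download_fn()
        if err is None:
            return None
        time.sleep(delay)  # give the shards time to propagate
    return err

# Usage against the SDK would look like:
# err = download_with_retry(
#     lambda: indexer.download(root_hash, "./output.txt", proof=False)
# )
```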

Complete example

End-to-end upload and download:
import os
from eth_account import Account
from core.indexer import Indexer
from core.file import ZgFile

PRIVATE_KEY    = os.environ["PRIVATE_KEY"]
BLOCKCHAIN_RPC = "https://evmrpc-testnet.0g.ai"
INDEXER_RPC    = "https://indexer-storage-testnet-turbo.0g.ai"

indexer = Indexer(INDEXER_RPC)
account = Account.from_key(PRIVATE_KEY)

# --- Upload ---
file = ZgFile.from_file_path("./data.txt")

result, err = indexer.upload(
    file,
    BLOCKCHAIN_RPC,
    account,
    {
        "tags": b"\x00",
        "finalityRequired": True,
        "expectedReplica": 1,
        "account": account,
    },
)
file.close()

if err:
    raise err

root_hash = result["rootHash"]
print(f"Uploaded: {root_hash}")
print(f"Tx:       {result['txHash']}")

# --- Download (wait 3–5 minutes for propagation) ---
err = indexer.download(root_hash, "./output.txt")
if err:
    raise err
print("Downloaded ./output.txt")

Upload from bytes

For in-memory data, use ZgFile.from_bytes:
data = b"hello, 0G Storage!"
file = ZgFile.from_bytes(data)

result, err = indexer.upload(file, BLOCKCHAIN_RPC, account, {
    "tags": b"\x00",
    "finalityRequired": True,
    "account": account,
})

print(result["rootHash"])
# No file.close() needed — in-memory ZgFile has no file descriptor

Check if a file already exists

Before uploading, you can skip the Flow transaction if a file with the same root hash is already on the network. Compute the merkle root first:
file = ZgFile.from_file_path("./data.txt")
tree, err = file.merkle_tree()
if err:
    raise err
print(f"Would upload: {tree.root_hash()}")

# Then pass skipTx=True in upload_opts to reuse the existing submission
See Merkle Trees & Proofs for computing root hashes directly without uploading.
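If the root hash is already on the network, the relevant option change is small. A config-fragment sketch of upload_opts (account is the eth_account signer from the upload steps above):

```python
upload_opts = {
    "account": account,        # signer, as in the upload step
    "tags": b"\x00",
    "skipTx": True,            # reuse the existing Flow submission
    "finalityRequired": True,
}
```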

Storage fees

Uploads cost a small on-chain fee paid to the Flow contract in 0G tokens. The SDK calculates the fee automatically by querying the market contract for the current pricePerSector and multiplying by the number of sectors in your file.
  • Override the fee by setting "fee": <wei_amount> in upload_opts
  • The SDK prints the calculated fee to stdout before submission
  • Fees are small — typically a fraction of an OG token for files up to a few megabytes
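The auto-calculation described above amounts to simple arithmetic. A minimal sketch, where the 256-byte sector size and the pricePerSector value are illustrative assumptions; the SDK reads the real values from the market contract:

```python
import math

SECTOR_SIZE = 256  # bytes; assumed here, the SDK derives the real value on-chain

def estimate_fee(file_size_bytes, price_per_sector_wei):
    """Rough fee in wei: sectors needed times the market price per sector."""
    sectors = max(1, math.ceil(file_size_bytes / SECTOR_SIZE))
    return sectors * price_per_sector_wei

print(estimate_fee(1024, 10))  # 4 sectors * 10 wei = 40
```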

Handling errors

Upload and download return (result, err) tuples instead of raising. Always check err first:
result, err = indexer.upload(file, BLOCKCHAIN_RPC, account, upload_opts)

if err is not None:
    # err is an Exception instance — inspect or re-raise
    print(f"Upload failed: {err}")
For retryable errors (transient network issues, “too many data writing” from busy nodes), pass retry_opts to indexer.upload(). See Error Handling for the full exception hierarchy.
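If you prefer exceptions to tuple checks, a tiny helper can convert the SDK's (result, err) convention into raise-on-error style. unwrap is a hypothetical convenience, not part of the SDK:

```python
def unwrap(pair):
    """Turn a (result, err) tuple into the result, raising err if set."""
    result, err = pair
    if err is not None:
        raise err
    return result

# Usage: result = unwrap(indexer.upload(file, BLOCKCHAIN_RPC, account, upload_opts))
print(unwrap(("0xabc", None)))  # 0xabc
```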

Troubleshooting

The file can’t be found right after upload

Files take 3–5 minutes to propagate across storage shards. Wait, then retry. You can query individual storage nodes with indexer.get_file_locations(root_hash) to see which nodes have your file.
Some segment uploads failed

This is a non-fatal warning. The file was submitted to the Flow contract, and although some direct segment uploads failed, the remaining nodes will propagate the data via gossip. The upload is considered successful once the transaction is confirmed.
Insufficient funds

Top up your wallet with more 0G tokens. For testnet, use faucet.0g.ai. For a rough estimate, 1 OG covers a few hundred kilobytes of storage.
Missing "account" in upload_opts

The upload_opts dict must include the "account" key — the signer used to submit the Flow contract transaction. This is in addition to the positional account argument.
Uploads are slow

Large files (hundreds of MB+) take time. For files over 4 GB, use Uploader.splitable_upload() — see Large Files.

Next steps

Merkle trees & proofs

Compute root hashes, generate proofs, validate file integrity.

Large files

Splitable uploads and fragment downloads for multi-gigabyte files.

KV storage

Read and write key-value streams with access control.

Error handling

Exception hierarchy, retry logic, and error codes.