Beyond file storage, 0G Storage exposes a key-value layer for structured data. Keys map to values within named streams, and each write is recorded on-chain so any reader can reconstruct the stream’s state at any version. The KV layer is ideal for indexed data you need to query by key — user profiles, timeseries, config blobs — rather than whole-file blobs.

Concepts

Term | Meaning
---- | -------
Stream | A named KV collection identified by a 32-byte hex stream_id
Version | Monotonically increasing number attached to each write batch
Access control | Per-stream admin/writer roles, plus a per-key "special key" flag
Special key | A key whose writes require an explicit GRANT_SPECIAL_WRITE_ROLE
Keys and values are arbitrary bytes. Size limits:
  • MAX_KEY_SIZE — 16,777,216 bytes (16 MiB)
  • MAX_SET_SIZE — 65,536 entries per batch
  • MAX_QUERY_SIZE — 262,144 bytes per read chunk
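These limits can be enforced client-side before a batch ever reaches a storage node. The sketch below simply restates the documented constants; `validate_batch` is a hypothetical helper, not part of the SDK:

```python
# Client-side guard against the documented KV limits (illustrative only).
MAX_KEY_SIZE = 16_777_216    # 16 MiB per key
MAX_SET_SIZE = 65_536        # entries per write batch
MAX_QUERY_SIZE = 262_144     # bytes per read chunk

def validate_batch(entries):
    """entries: list of (key: bytes, value: bytes) pairs.

    Raises ValueError if the batch would exceed the documented limits.
    """
    if len(entries) > MAX_SET_SIZE:
        raise ValueError(f"batch has {len(entries)} entries, max is {MAX_SET_SIZE}")
    for key, _value in entries:
        if len(key) > MAX_KEY_SIZE:
            raise ValueError(f"key of {len(key)} bytes exceeds {MAX_KEY_SIZE}")
```

Rejecting an oversized batch locally avoids paying for an upload that a node would refuse anyway.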

Read values

KvClient wraps a storage node’s KV RPC. Use the indexer to dynamically discover a storage node, then point KvClient at it:
from core.indexer import Indexer
from core.kv import KvClient

# Discover a storage node via the indexer
indexer = Indexer("https://indexer-storage-testnet-turbo.0g.ai")
nodes, err = indexer.select_nodes(expected_replica=1)

if err or len(nodes) == 0:
    raise err or Exception("No nodes selected")

# Use the first discovered node
kv = KvClient(nodes[0].url)

# Read a complete value (handles pagination automatically)
value = kv.get_value(
    stream_id = "0x" + "11" * 32,   # 32-byte hex stream id
    key       = b"user:42",
)

if value is None:
    print("Key not found")
else:
    import base64
    decoded = base64.b64decode(value.data)
    print(f"Version: {value.version}, size: {value.size}, data: {decoded!r}")
get_value() returns a Value with data (base64-encoded bytes), size (total byte count), and version (write version), or None if the key doesn’t exist.
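If you want to page through a large value yourself instead (values longer than MAX_QUERY_SIZE come back in chunks), the reassembly loop might look like the sketch below. `get_chunk` stands in for a thin wrapper around kv.get(stream_id, key, start_index, length); it is an assumption made so the logic is self-contained:

```python
MAX_QUERY_SIZE = 262_144  # bytes per read chunk (see the limits above)

def read_full_value(get_chunk, total_size, chunk_size=MAX_QUERY_SIZE):
    """Reassemble a value by paging through it chunk by chunk.

    get_chunk(start_index, length) -> bytes is a hypothetical wrapper
    around kv.get(); total_size is value.size from an initial query.
    """
    parts = []
    offset = 0
    while offset < total_size:
        length = min(chunk_size, total_size - offset)
        parts.append(get_chunk(offset, length))
        offset += length
    return b"".join(parts)
```

This is essentially what get_value() does for you internally when it "handles pagination automatically".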

Range reads

Traverse keys in order with get_next, get_prev, get_first, get_last:
# First key in the stream
first = kv.get_first(
    stream_id   = "0x" + "11" * 32,
    start_index = 0,
    length      = 1024,
)

# Next key after "user:42", exclusive
nxt = kv.get_next(
    stream_id   = "0x" + "11" * 32,
    key         = b"user:42",
    start_index = 0,
    length      = 1024,
    inclusive   = False,
)
Each returns a KeyValue (key + partial value) or None.

Iterator

For cursor-style traversal, use KvIterator:
it = kv.new_iterator(stream_id="0x" + "11" * 32)

err = it.seek_to_first()
if err:
    raise err

while it.valid():
    pair = it.get_current_pair()
    if pair is None:
        break
    print(pair.key, pair.value)
    err = it.next()
    if err:
        raise err
Other cursor methods: seek_to_last(), seek_before(key), seek_after(key), prev().
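Reverse traversal follows the same pattern. The helper below is an illustrative sketch written against the cursor methods listed above; it works on any object exposing seek_to_last, valid, get_current_pair, and prev:

```python
def collect_reverse(it):
    """Walk an iterator from the last key back to the first,
    collecting (key, value) pairs along the way."""
    pairs = []
    err = it.seek_to_last()
    if err:
        raise err
    while it.valid():
        pair = it.get_current_pair()
        if pair is None:
            break
        pairs.append((pair.key, pair.value))
        err = it.prev()
        if err:
            raise err
    return pairs
```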

Write values

Writing to KV is a storage operation: the SDK bundles writes into a stream-data blob, encodes it, and uploads it through the standard Flow-contract submission and storage-node upload path. There are two APIs:
  • StreamDataBuilder — low-level: build a StreamData blob and upload it yourself
  • Batcher — high-level: accumulate writes and call .exec() to upload in one shot

Using the Batcher

import os
from eth_account import Account
from web3 import Web3
from core.kv import Batcher
from core.indexer import Indexer
from contracts.flow import FlowContract

PRIVATE_KEY    = os.environ["PRIVATE_KEY"]
BLOCKCHAIN_RPC = "https://evmrpc-testnet.0g.ai"
INDEXER_RPC    = "https://indexer-storage-testnet-turbo.0g.ai"

indexer = Indexer(INDEXER_RPC)
account = Account.from_key(PRIVATE_KEY)

clients, err = indexer.select_nodes(expected_replica=1)
if err:
    raise err

web3 = Web3(Web3.HTTPProvider(BLOCKCHAIN_RPC))
flow = FlowContract(web3, clients[0].get_status()["networkIdentity"]["flowAddress"])

# Stream version — monotonically increasing across writes
batcher = Batcher(
    version  = 1,
    clients  = clients,
    flow     = flow,
    provider = BLOCKCHAIN_RPC,
)

stream_id = "0x" + "11" * 32
batcher.set(stream_id, b"user:42",  b'{"name": "Alice"}')
batcher.set(stream_id, b"user:101", b'{"name": "Bob"}')

result, err = batcher.exec({"account": account})
if err:
    raise err

print(f"Wrote batch at root {result['rootHash']}")
The batcher encodes all writes into a single on-chain submission. exec() returns the normal upload result (txHash + rootHash).
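A common follow-up is to read a key back and confirm the batch landed. The sketch below assumes the Value shape described earlier (base64-encoded data, None when missing); `verify_write` itself is a hypothetical helper, not an SDK method:

```python
import base64

def verify_write(kv, stream_id, key, expected):
    """Read a key back and compare it against the bytes we just wrote.

    kv is any object with a get_value(stream_id=..., key=...) method
    returning an object with base64-encoded .data, or None if missing.
    """
    value = kv.get_value(stream_id=stream_id, key=key)
    if value is None:
        return False
    return base64.b64decode(value.data) == expected
```

Note that a freshly submitted batch may not be queryable until the storage node has processed the transaction, so a read-back check may need to retry.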

Using StreamDataBuilder directly

When you need finer-grained control, such as mixing writes and access-control operations in a single blob, build the stream data yourself:
from core.kv import StreamDataBuilder

builder = StreamDataBuilder(version=1)

builder.set("0x" + "11" * 32, b"key-1", b"value-1")
builder.set("0x" + "11" * 32, b"key-2", b"value-2")

stream_data = builder.build(sorted_=True)
encoded     = stream_data.encode()               # bytes — ready for upload
tags        = builder.build_tags(sorted_=True)   # bytes — encoded stream ids
Then upload encoded via ZgFile.from_bytes(encoded) + indexer.upload(file, ..., upload_opts={"tags": tags, ...}).

Access control

KV streams support per-account and per-key permissions. Access control is encoded as a list of AccessControl operations in the same stream data blob as writes. Operation types (AccessControlType):
Value | Operation
----- | ---------
GRANT_ADMIN_ROLE | Make an account admin of the stream
RENOUNCE_ADMIN_ROLE | Drop your admin role
SET_KEY_TO_SPECIAL | Mark a key as requiring GRANT_SPECIAL_WRITE_ROLE to write
SET_KEY_TO_NORMAL | Revert a special key back to normal
GRANT_WRITE_ROLE | Grant stream-wide write access to an account
REVOKE_WRITE_ROLE | Revoke stream-wide write access
RENOUNCE_WRITE_ROLE | Drop your own write access
GRANT_SPECIAL_WRITE_ROLE | Grant write access to a specific special key
REVOKE_SPECIAL_WRITE_ROLE | Revoke write access to a specific special key
RENOUNCE_SPECIAL_WRITE_ROLE | Drop your own special-write access
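How these roles interact can be summarized with a small in-memory model. This is purely illustrative, not SDK code, but it mirrors the rules above: admins can always write, special keys require a per-key grant, and normal keys only need the stream-wide write role:

```python
def can_write(account, key, *, admins, writers, special_keys, special_writers):
    """Illustrative model of KV write permissions.

    admins / writers: sets of account addresses
    special_keys: set of keys flagged via SET_KEY_TO_SPECIAL
    special_writers: set of (account, key) pairs granted via
                     GRANT_SPECIAL_WRITE_ROLE
    """
    if account in admins:
        return True                          # admins bypass all checks
    if key in special_keys:
        return (account, key) in special_writers  # per-key grant required
    return account in writers                # stream-wide role suffices
```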

Permission checks

Query permissions without writing:
# Can this account write to this key?
ok = kv.has_write_permission(
    account   = "0xabc...",
    stream_id = "0x" + "11" * 32,
    key       = b"user:42",
)

# Is this account an admin?
admin = kv.is_admin(account="0xabc...", stream_id="0x" + "11" * 32)

# Is this a special key?
special = kv.is_special_key(stream_id="0x" + "11" * 32, key=b"user:42")

# Is this account a writer of a specific key / the whole stream?
kv.is_writer_of_key(account="0xabc...", stream_id="0x" + "11" * 32, key=b"user:42")
kv.is_writer_of_stream(account="0xabc...", stream_id="0x" + "11" * 32)
All return booleans, optionally scoped to a specific version.

Other utilities

  • kv.get_transaction_result(tx_seq) — fetch the result of a specific KV transaction
  • kv.get_holding_stream_ids() — list streams held by the connected node
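Because a storage node processes KV transactions asynchronously, get_transaction_result may have nothing to report immediately after a write. A polling sketch, where `fetch` stands in for kv.get_transaction_result and the timeout values are arbitrary assumptions:

```python
import time

def wait_for_tx_result(fetch, tx_seq, timeout=30.0, interval=1.0):
    """Poll until a KV transaction result is available.

    fetch(tx_seq) is a hypothetical wrapper that returns a result string
    once the node has processed the transaction, else None.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        result = fetch(tx_seq)
        if result is not None:
            return result
        time.sleep(interval)
    raise TimeoutError(f"tx {tx_seq} not processed within {timeout}s")
```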

API at a glance

KvClient

Method | Returns
------ | -------
KvClient(rpc) | client instance
kv.get_value(stream_id, key, version=None) | Value or None
kv.get(stream_id, key, start_index, length, version=None) | Value or None
kv.get_first/last(stream_id, start_index, length, version=None) | KeyValue or None
kv.get_next/prev(stream_id, key, start_index, length, inclusive, version=None) | KeyValue or None
kv.new_iterator(stream_id, version=None) | KvIterator
kv.has_write_permission(account, stream_id, key, version=None) | bool
kv.is_admin(account, stream_id, version=None) | bool
kv.is_special_key(stream_id, key, version=None) | bool
kv.is_writer_of_key/stream(...) | bool
kv.get_transaction_result(tx_seq) | str or None
kv.get_holding_stream_ids() | list[Hash]

StreamDataBuilder

Method | Returns
------ | -------
StreamDataBuilder(version) | builder instance
builder.set(stream_id, key, data) | None
builder.add_stream_id(stream_id) | None
builder.build(sorted_=False) | StreamData
builder.build_tags(sorted_=False) | bytes

Batcher

Method | Returns
------ | -------
Batcher(version, clients, flow, provider) | batcher instance
batcher.set(stream_id, key, data) | None
batcher.exec(opts=None) | ({'txHash', 'rootHash'}, err)

KvIterator

Method | Returns
------ | -------
it.seek_to_first() / seek_to_last() | Exception or None
it.seek_before(key) / seek_after(key) | Exception or None
it.next() / prev() | Exception or None
it.valid() | bool
it.get_current_pair() | KeyValue or None

Next steps

Merkle trees & proofs

Understand how KV batches are committed to the chain.

Error handling

Exception hierarchy and retry logic.