SDK Methods Guide

Detailed SDK interface handbook for interacting with the Somnia Data Streams protocol via the typescript SDK

Somnia Data Streams is the on-chain data streaming protocol that powers real-time, composable applications on the Somnia Network. It is available as an SDK Package https://www.npmjs.com/package/@somnia-chain/streams.

This SDK exposes all the core functionality developers need to write, read, subscribe to, and manage Data Streams and events directly from their dApps.

Before using the Data Streams SDK, ensure you have a working Node.js or Next.js environment (Node 18+ recommended). You’ll need access to a Somnia RPC endpoint (Testnet or Mainnet) and a wallet private key for publishing data.

Installation

Use npm, yarn, or pnpm:

# Using npm
npm install @somnia-chain/streams viem dotenv

# or with yarn
yarn add @somnia-chain/streams viem dotenv

The SDK depends on viem for blockchain interactions.

Project Setup

Create a .env.local or .env file in your project root:

RPC_URL=https://dream-rpc.somnia.network
PRIVATE_KEY=your_private_key_here

⚠️ Never expose private keys in client-side code. Keep writes (publishing data) in server routes or backend environments.

Basic Initialization

You’ll typically use two clients:

  • A public client for reading and subscribing

  • A wallet client for writing to the Somnia chain

import { SDK } from '@somnia-chain/streams'
import { createPublicClient, createWalletClient, http } from 'viem'
import { privateKeyToAccount } from 'viem/accounts'
import { somniaTestnet } from 'viem/chains'

const rpcUrl = process.env.RPC_URL!
const account = privateKeyToAccount(process.env.PRIVATE_KEY as `0x${string}`)

const sdk = new SDK({
  public: createPublicClient({ chain: somniaTestnet, transport: http(rpcUrl) }),
  wallet: createWalletClient({ chain: somniaTestnet, account, transport: http(rpcUrl) })
})

Somnia Data Streams

Data Streams in Somnia represent structured, verifiable data channels. Every piece of data conforms to a schema that defines its structure (e.g. timestamp, content, sender), and publishers can emit this data either as on-chain transactions or off-chain event notifications.

The SDK follows a simple pattern:

const sdk = new SDK({
  public: getPublicClient(),  // for reading and subscriptions
  wallet: getWalletClient()   // for writing
})

You’ll interact primarily through the sdk.streams interface.

Core Methods Overview

Write

set(d: DataStream[]): Promise<Hex | null>

Description

Publishes one or more data streams to the Somnia blockchain. Each stream must specify a schema ID, a unique ID, and the encoded payload.

Use Case

When you want to store data on-chain in a standardized format (e.g., chat messages, sensor telemetry, or leaderboard updates).

Example

const tx = await sdk.streams.set([
  { id: dataId, schemaId, data }
])
console.log('Data published with tx hash:', tx)

Always register your schema before calling set() ; otherwise, the transaction will revert.

emitEvents(e: EventStream[]): Promise<Hex | Error | null>

Description

Emits a registered Streams event without persisting new data. This is used for off-chain reactivity, triggering listeners subscribed via WebSocket.

Example

await sdk.streams.emitEvents([
  {
    id: 'ChatMessage',
    argumentTopics: [topic],
    data: '0x' // optional encoded payload
  }
])

Common Use includes notifying subscribers when something happens, e.g., “new message sent” or “order filled”.

setAndEmitEvents(d: DataStream[], e: EventStream[]): Promise<Hex | Error | null>

Description

Performs an atomic on-chain operation that both writes data and emits a corresponding event. This ensures your data and notifications are always in sync.

Example

await sdk.streams.setAndEmitEvents(
  [{ id: dataId, schemaId, data }],
  [{ id: 'ChatMessage', argumentTopics: [topic], data: '0x' }]
)

It is ideal for chat apps, game updates, or IoT streams — where data must be recorded and instantly broadcast.

Manage

registerDataSchemas(registrations: DataSchemaRegistration[], ignoreRegisteredSchemas?: boolean): Promise<Hex | Error | null>

Description

Registers a new data schema on-chain. Schemas define the structure of your Streams data, like a table schema in a database. The optional ignoreRegisteredSchemas parameter allows skipping registration if the schema is already registered.

Example

await sdk.streams.registerDataSchemas([
  {
    id: "chat",
    schema: 'uint64 timestamp, string message, address sender',
    parentSchemaId: zeroBytes32 // root schema
  }
], true) // Optionally ignore if already registered

Register before writing any new data type. If you modify the schema structure later, register it again as a new schema version.

registerEventSchemas(ids: string[], schemas: EventSchema[]): Promise<Hex | Error | null>

Description

Registers event definitions that can later be emitted or subscribed to.

Example

await sdk.streams.registerEventSchemas(
  ['ChatMessage'],
  [{
    params: [{ name: 'roomId', paramType: 'bytes32', isIndexed: true }],
    eventTopic: 'ChatMessage(bytes32 indexed roomId)'
  }]
)

Use before calling emitEvents() or subscribe() for a specific event.

manageEventEmittersForRegisteredStreamsEvent(streamsEventId: string, emitter: Address, isEmitter: boolean): Promise<Hex | Error | null>

Description

Grants or revokes permission for an address to emit a specific event.

Example

await sdk.streams.manageEventEmittersForRegisteredStreamsEvent(
  'ChatMessage',
  '0x1234abcd...',
  true // allow this address to emit
)

Used for access control in multi-publisher systems.

Read

getByKey(schemaId: SchemaID, publisher: Address, key: Hex): Promise<Hex[] | SchemaDecodedItem[][] | null>

Description

Retrieves data stored under a schema by its unique ID.

Example

const msg = await sdk.streams.getByKey(schemaId, publisher, dataId)
console.log('Data:', msg)

An example includes fetching a specific record, e.g., “fetch message by message ID”.

getAtIndex(schemaId: SchemaID, publisher: Address, idx: bigint): Promise<Hex[] | SchemaDecodedItem[][] | null>

Description

Fetches the record at a given index (0-based).

Example

const record = await sdk.streams.getAtIndex(schemaId, publisher, 0n)

It is useful for sequential datasets like logs or telemetry streams.

getBetweenRange(schemaId: SchemaID, publisher: Address, startIndex: bigint, endIndex: bigint): Promise<Hex[] | SchemaDecodedItem[][] | Error | null>

Description

Fetches records within a specified index range (0-based, inclusive start, exclusive end).

Use Case

Retrieving a batch of historical data, such as paginated logs or time-series entries.

Example

const records = await sdk.streams.getBetweenRange(schemaId, publisher, 0n, 10n)
console.log('Records in range:', records)

getAllPublisherDataForSchema(schemaReference: SchemaReference, publisher: Address): Promise<Hex[] | SchemaDecodedItem[][] | null>

Description

Retrieves all data published by a specific address under a given schema.

Use Case

Fetching complete datasets for analysis or synchronization.

Example

const allData = await sdk.streams.getAllPublisherDataForSchema(schemaReference, publisher)
console.log('All publisher data:', allData)

getLastPublishedDataForSchema(schemaId: SchemaID, publisher: Address): Promise<Hex[] | SchemaDecodedItem[][] | null>

Description

Retrieves the most recently published data under a schema by a publisher.

Use Case

Getting the latest update, such as the most recent sensor reading or message.

Example

const latest = await sdk.streams.getLastPublishedDataForSchema(schemaId, publisher)
console.log('Latest data:', latest)

totalPublisherDataForSchema(schemaId: SchemaID, publisher: Address): Promise<bigint | null>

Description

Returns how many records a publisher has stored under a schema.

Example

const total = await sdk.streams.totalPublisherDataForSchema(schemaId, publisher)
console.log(`Total entries: ${total}`)

isDataSchemaRegistered(schemaId: SchemaID): Promise<boolean | null>

Description

Checks if a schema exists on-chain.

Example

const exists = await sdk.streams.isDataSchemaRegistered(schemaId)
if (!exists) console.log('Schema not found')

parentSchemaId(schemaId: SchemaID): Promise<Hex | null>

Description

Finds the parent schema of a given schema, if one exists.

Example

const parent = await sdk.streams.parentSchemaId(schemaId)
console.log('Parent Schema ID:', parent)

schemaIdToId(schemaId: SchemaID): Promise<string | null>

Description

Converts a schema ID (Hex) to its corresponding string identifier.

Use Case

Mapping hashed IDs back to human-readable names for display or logging.

Example

const id = await sdk.streams.schemaIdToId(schemaId)
console.log('Schema ID string:', id)

idToSchemaId(id: string): Promise<Hex | null>

Description

Converts a string identifier to its corresponding schema ID (Hex).

Use Case

Looking up hashed IDs from known names for queries.

Example

const schemaId = await sdk.streams.idToSchemaId('chat')
console.log('Schema ID:', schemaId)

getAllSchemas(): Promise<string[] | null>

Description

Retrieves a list of all registered schema identifiers.

Use Case

Discovering available schemas in the protocol.

Example

const schemas = await sdk.streams.getAllSchemas()
console.log('All schemas:', schemas)

getEventSchemasById(ids: string[]): Promise<EventSchema[] | null>

Description

Fetches event schema details for given identifiers.

Use Case

Inspecting registered event structures before subscribing or emitting.

Example

const eventSchemas = await sdk.streams.getEventSchemasById(['ChatMessage'])
console.log('Event schemas:', eventSchemas)

computeSchemaId(schema: string): Promise<Hex | null>

Description

Computes the deterministic schemaId without registering it.

Example

const schemaId = await sdk.streams.computeSchemaId('uint64 timestamp, string content')

getSchemaFromSchemaId(schemaId: SchemaID): Promise<{ baseSchema: string, finalSchema: string, schemaId: Hex } | Error | null>

Description

Request a schema given the schema id used for data publishing and let the SDK take care of schema extensions.

Use Case

Retrieving schema details, including the base schema and the final extended schema, for a given schema ID.

Example

const schemaInfo = await sdk.streams.getSchemaFromSchemaId(schemaId)
console.log('Schema info:', schemaInfo)

Helpers

deserialiseRawData(rawData: Hex[], parentSchemaId: Hex, schemaLookup: { schema: string; schemaId: Hex; } | null): Promise<Hex[] | SchemaDecodedItem[][] | null>

Description

Deserializes raw data using the provided schema information.

Use Case

Decoding fetched raw bytes into structured objects for application use.

Example

const decoded = await sdk.streams.deserialiseRawData(rawData, parentSchemaId, schemaLookup)
console.log('Decoded data:', decoded)

Subscribe

subscribe(initParams: SubscriptionInitParams): Promise<{ subscriptionId: string, unsubscribe: () => void } | undefined>

Description

Creates a real-time WebSocket subscription to a Streams event. Whenever the specified event fires, the SDK calls your onData callback — optionally including enriched data from on-chain calls.

Parameters

  • somniaStreamsEventId: The identifier of a registered event schema within Somnia streams protocol or null if using a custom event source.

  • ethCalls: Fixed set of ETH calls that must be executed before onData callback is triggered. Multicall3 is recommended. Can be an empty array.

  • context: Event sourced selectors to be added to the data field of ETH calls, possible values: topic0, topic1, topic2, topic3, topic4, data and address.

  • onData: Callback for a successful reactivity notification.

  • onError: Callback for a failed attempt.

  • eventContractSource: Alternative contract event source (any on Somnia) that will be emitting the logs specified by topicOverrides.

  • topicOverrides: Optional when using Somnia streams as an event source but mandatory when using a different event source.

  • onlyPushChanges: Whether the data should be pushed to the subscriber only if eth_call results are different from the previous.

Example

await sdk.streams.subscribe({
    somniaStreamsEventId: "Firework",
    ethCalls,
    onData: (data) => {}
})

With ethCalls

await sdk.streams.subscribe({
  somniaStreamsEventId: 'TradeExecuted',
  ethCalls: [{
    to: '0xERC20Address',
    data: encodeFunctionData({
      abi: erc20Abi,
      functionName: 'balanceOf',
      args: ['0xUserAddress']
    })
  }],
  onData: (data) => console.log('Trade + balance data:', data)
})

Useful for off-chain reactivity: real-time dashboards, chat updates, live feeds, or notifications.

Notes

  • Requires createPublicClient({ transport: webSocket() })

  • Use setAndEmitEvents() on the publisher side to trigger matching subscriptions.

Protocol

getSomniaDataStreamsProtocolInfo(): Promise<GetSomniaDataStreamsProtocolInfoResponse | Error | null>

Description

Retrieves information about the Somnia Data Streams protocol.

Use Case

Fetching protocol-level details, such as version or configuration.

Example

const info = await sdk.streams.getSomniaDataStreamsProtocolInfo()
console.log('Protocol info:', info)

Key Types Reference

Type
Description

DataStream

{ id: Hex, schemaId: Hex, data: Hex } – Used with set() or setAndEmitEvents() .

EventStream

{ id: string, argumentTopics: Hex[], data: Hex } – Used with emitEvents() and setAndEmitEvents() .

DataSchemaRegistration

{ id: string, schema: string, parentSchemaId: Hex } – For registerDataSchemas() .

EventSchema

{ params: EventParameter[], eventTopic: string } – For registerEventSchemas() .

EthCall

{ to: Address, data: Hex } – Defines on-chain calls for event enrichment.

Developer Tips

  • Always compute your schema ID locally before deploying: await sdk.streams.computeSchemaId(schema).

  • For chat-like or telemetry apps, pair setAndEmitEvents() (write) with subscribe() (read).

  • Use zeroBytes32 for base schemas that don’t extend others.

  • All write methods return transaction hashes, use waitForTransactionReceipt() to confirm.

  • Data Streams focus on persistent, schema-based storage, while Event Streams enable reactive notifications; use them together for comprehensive applications.

Last updated