SDK Methods Guide
Detailed SDK interface handbook for interacting with the Somnia Data Streams protocol via the typescript SDK
Somnia Data Streams is the on-chain data streaming protocol that powers real-time, composable applications on the Somnia Network. It is available as an SDK Package https://www.npmjs.com/package/@somnia-chain/streams.
This SDK exposes all the core functionality developers need to write, read, subscribe to, and manage Data Streams and events directly from their dApps.
Before using the Data Streams SDK, ensure you have a working Node.js or Next.js environment (Node 18+ recommended). You’ll need access to a Somnia RPC endpoint (Testnet or Mainnet) and a wallet private key for publishing data.
Installation
Use npm, yarn, or pnpm:
# Using npm
npm install @somnia-chain/streams viem dotenv
# or with yarn
yarn add @somnia-chain/streams viem dotenvThe SDK depends on viem for blockchain interactions.
Project Setup
Create a .env.local or .env file in your project root:
RPC_URL=https://dream-rpc.somnia.network
PRIVATE_KEY=your_private_key_here⚠️ Never expose private keys in client-side code. Keep writes (publishing data) in server routes or backend environments.
Basic Initialization
You’ll typically use two clients:
A public client for reading and subscribing
A wallet client for writing to the Somnia chain
import { SDK } from '@somnia-chain/streams'
import { createPublicClient, createWalletClient, http } from 'viem'
import { privateKeyToAccount } from 'viem/accounts'
import { somniaTestnet } from 'viem/chains'
const rpcUrl = process.env.RPC_URL!
const account = privateKeyToAccount(process.env.PRIVATE_KEY as `0x${string}`)
const sdk = new SDK({
public: createPublicClient({ chain: somniaTestnet, transport: http(rpcUrl) }),
wallet: createWalletClient({ chain: somniaTestnet, account, transport: http(rpcUrl) })
})Somnia Data Streams
Data Streams in Somnia represent structured, verifiable data channels. Every piece of data conforms to a schema that defines its structure (e.g. timestamp, content, sender), and publishers can emit this data either as on-chain transactions or off-chain event notifications.
The SDK follows a simple pattern:
const sdk = new SDK({
public: getPublicClient(), // for reading and subscriptions
wallet: getWalletClient() // for writing
})You’ll interact primarily through the sdk.streams interface.
Core Methods Overview
Write
set(d: DataStream[]): Promise<Hex | null>
set(d: DataStream[]): Promise<Hex | null>Description
Publishes one or more data streams to the Somnia blockchain. Each stream must specify a schema ID, a unique ID, and the encoded payload.
Use Case
When you want to store data on-chain in a standardized format (e.g., chat messages, sensor telemetry, or leaderboard updates).
Example
const tx = await sdk.streams.set([
{ id: dataId, schemaId, data }
])
console.log('Data published with tx hash:', tx)Always register your schema before calling set() ; otherwise, the transaction will revert.
emitEvents(e: EventStream[]): Promise<Hex | Error | null>
emitEvents(e: EventStream[]): Promise<Hex | Error | null>Description
Emits a registered Streams event without persisting new data. This is used for off-chain reactivity, triggering listeners subscribed via WebSocket.
Example
await sdk.streams.emitEvents([
{
id: 'ChatMessage',
argumentTopics: [topic],
data: '0x' // optional encoded payload
}
])Common Use includes notifying subscribers when something happens, e.g., “new message sent” or “order filled”.
setAndEmitEvents(d: DataStream[], e: EventStream[]): Promise<Hex | Error | null>
setAndEmitEvents(d: DataStream[], e: EventStream[]): Promise<Hex | Error | null>Description
Performs an atomic on-chain operation that both writes data and emits a corresponding event. This ensures your data and notifications are always in sync.
Example
await sdk.streams.setAndEmitEvents(
[{ id: dataId, schemaId, data }],
[{ id: 'ChatMessage', argumentTopics: [topic], data: '0x' }]
)It is ideal for chat apps, game updates, or IoT streams — where data must be recorded and instantly broadcast.
Manage
registerDataSchemas(registrations: DataSchemaRegistration[], ignoreRegisteredSchemas?: boolean): Promise<Hex | Error | null>
registerDataSchemas(registrations: DataSchemaRegistration[], ignoreRegisteredSchemas?: boolean): Promise<Hex | Error | null>Description
Registers a new data schema on-chain. Schemas define the structure of your Streams data, like a table schema in a database. The optional ignoreRegisteredSchemas parameter allows skipping registration if the schema is already registered.
Example
await sdk.streams.registerDataSchemas([
{
id: "chat",
schema: 'uint64 timestamp, string message, address sender',
parentSchemaId: zeroBytes32 // root schema
}
], true) // Optionally ignore if already registeredRegister before writing any new data type. If you modify the schema structure later, register it again as a new schema version.
registerEventSchemas(ids: string[], schemas: EventSchema[]): Promise<Hex | Error | null>
registerEventSchemas(ids: string[], schemas: EventSchema[]): Promise<Hex | Error | null>Description
Registers event definitions that can later be emitted or subscribed to.
Example
await sdk.streams.registerEventSchemas(
['ChatMessage'],
[{
params: [{ name: 'roomId', paramType: 'bytes32', isIndexed: true }],
eventTopic: 'ChatMessage(bytes32 indexed roomId)'
}]
)Use before calling emitEvents() or subscribe() for a specific event.
manageEventEmittersForRegisteredStreamsEvent(streamsEventId: string, emitter: Address, isEmitter: boolean): Promise<Hex | Error | null>
manageEventEmittersForRegisteredStreamsEvent(streamsEventId: string, emitter: Address, isEmitter: boolean): Promise<Hex | Error | null>Description
Grants or revokes permission for an address to emit a specific event.
Example
await sdk.streams.manageEventEmittersForRegisteredStreamsEvent(
'ChatMessage',
'0x1234abcd...',
true // allow this address to emit
)Used for access control in multi-publisher systems.
Read
getByKey(schemaId: SchemaID, publisher: Address, key: Hex): Promise<Hex[] | SchemaDecodedItem[][] | null>
getByKey(schemaId: SchemaID, publisher: Address, key: Hex): Promise<Hex[] | SchemaDecodedItem[][] | null>Description
Retrieves data stored under a schema by its unique ID.
Example
const msg = await sdk.streams.getByKey(schemaId, publisher, dataId)
console.log('Data:', msg)An example includes fetching a specific record, e.g., “fetch message by message ID”.
getAtIndex(schemaId: SchemaID, publisher: Address, idx: bigint): Promise<Hex[] | SchemaDecodedItem[][] | null>
getAtIndex(schemaId: SchemaID, publisher: Address, idx: bigint): Promise<Hex[] | SchemaDecodedItem[][] | null>Description
Fetches the record at a given index (0-based).
Example
const record = await sdk.streams.getAtIndex(schemaId, publisher, 0n)It is useful for sequential datasets like logs or telemetry streams.
getBetweenRange(schemaId: SchemaID, publisher: Address, startIndex: bigint, endIndex: bigint): Promise<Hex[] | SchemaDecodedItem[][] | Error | null>
getBetweenRange(schemaId: SchemaID, publisher: Address, startIndex: bigint, endIndex: bigint): Promise<Hex[] | SchemaDecodedItem[][] | Error | null>Description
Fetches records within a specified index range (0-based, inclusive start, exclusive end).
Use Case
Retrieving a batch of historical data, such as paginated logs or time-series entries.
Example
const records = await sdk.streams.getBetweenRange(schemaId, publisher, 0n, 10n)
console.log('Records in range:', records)getAllPublisherDataForSchema(schemaReference: SchemaReference, publisher: Address): Promise<Hex[] | SchemaDecodedItem[][] | null>
getAllPublisherDataForSchema(schemaReference: SchemaReference, publisher: Address): Promise<Hex[] | SchemaDecodedItem[][] | null>Description
Retrieves all data published by a specific address under a given schema.
Use Case
Fetching complete datasets for analysis or synchronization.
Example
const allData = await sdk.streams.getAllPublisherDataForSchema(schemaReference, publisher)
console.log('All publisher data:', allData)getLastPublishedDataForSchema(schemaId: SchemaID, publisher: Address): Promise<Hex[] | SchemaDecodedItem[][] | null>
getLastPublishedDataForSchema(schemaId: SchemaID, publisher: Address): Promise<Hex[] | SchemaDecodedItem[][] | null>Description
Retrieves the most recently published data under a schema by a publisher.
Use Case
Getting the latest update, such as the most recent sensor reading or message.
Example
const latest = await sdk.streams.getLastPublishedDataForSchema(schemaId, publisher)
console.log('Latest data:', latest)totalPublisherDataForSchema(schemaId: SchemaID, publisher: Address): Promise<bigint | null>
totalPublisherDataForSchema(schemaId: SchemaID, publisher: Address): Promise<bigint | null>Description
Returns how many records a publisher has stored under a schema.
Example
const total = await sdk.streams.totalPublisherDataForSchema(schemaId, publisher)
console.log(`Total entries: ${total}`)isDataSchemaRegistered(schemaId: SchemaID): Promise<boolean | null>
isDataSchemaRegistered(schemaId: SchemaID): Promise<boolean | null>Description
Checks if a schema exists on-chain.
Example
const exists = await sdk.streams.isDataSchemaRegistered(schemaId)
if (!exists) console.log('Schema not found')parentSchemaId(schemaId: SchemaID): Promise<Hex | null>
parentSchemaId(schemaId: SchemaID): Promise<Hex | null>Description
Finds the parent schema of a given schema, if one exists.
Example
const parent = await sdk.streams.parentSchemaId(schemaId)
console.log('Parent Schema ID:', parent)schemaIdToId(schemaId: SchemaID): Promise<string | null>
schemaIdToId(schemaId: SchemaID): Promise<string | null>Description
Converts a schema ID (Hex) to its corresponding string identifier.
Use Case
Mapping hashed IDs back to human-readable names for display or logging.
Example
const id = await sdk.streams.schemaIdToId(schemaId)
console.log('Schema ID string:', id)idToSchemaId(id: string): Promise<Hex | null>
idToSchemaId(id: string): Promise<Hex | null>Description
Converts a string identifier to its corresponding schema ID (Hex).
Use Case
Looking up hashed IDs from known names for queries.
Example
const schemaId = await sdk.streams.idToSchemaId('chat')
console.log('Schema ID:', schemaId)getAllSchemas(): Promise<string[] | null>
getAllSchemas(): Promise<string[] | null>Description
Retrieves a list of all registered schema identifiers.
Use Case
Discovering available schemas in the protocol.
Example
const schemas = await sdk.streams.getAllSchemas()
console.log('All schemas:', schemas)getEventSchemasById(ids: string[]): Promise<EventSchema[] | null>
getEventSchemasById(ids: string[]): Promise<EventSchema[] | null>Description
Fetches event schema details for given identifiers.
Use Case
Inspecting registered event structures before subscribing or emitting.
Example
const eventSchemas = await sdk.streams.getEventSchemasById(['ChatMessage'])
console.log('Event schemas:', eventSchemas)computeSchemaId(schema: string): Promise<Hex | null>
computeSchemaId(schema: string): Promise<Hex | null>Description
Computes the deterministic schemaId without registering it.
Example
const schemaId = await sdk.streams.computeSchemaId('uint64 timestamp, string content')getSchemaFromSchemaId(schemaId: SchemaID): Promise<{ baseSchema: string, finalSchema: string, schemaId: Hex } | Error | null>
getSchemaFromSchemaId(schemaId: SchemaID): Promise<{ baseSchema: string, finalSchema: string, schemaId: Hex } | Error | null>Description
Request a schema given the schema id used for data publishing and let the SDK take care of schema extensions.
Use Case
Retrieving schema details, including the base schema and the final extended schema, for a given schema ID.
Example
const schemaInfo = await sdk.streams.getSchemaFromSchemaId(schemaId)
console.log('Schema info:', schemaInfo)Helpers
deserialiseRawData(rawData: Hex[], parentSchemaId: Hex, schemaLookup: { schema: string; schemaId: Hex; } | null): Promise<Hex[] | SchemaDecodedItem[][] | null>
deserialiseRawData(rawData: Hex[], parentSchemaId: Hex, schemaLookup: { schema: string; schemaId: Hex; } | null): Promise<Hex[] | SchemaDecodedItem[][] | null>Description
Deserializes raw data using the provided schema information.
Use Case
Decoding fetched raw bytes into structured objects for application use.
Example
const decoded = await sdk.streams.deserialiseRawData(rawData, parentSchemaId, schemaLookup)
console.log('Decoded data:', decoded)Subscribe
subscribe(initParams: SubscriptionInitParams): Promise<{ subscriptionId: string, unsubscribe: () => void } | undefined>
subscribe(initParams: SubscriptionInitParams): Promise<{ subscriptionId: string, unsubscribe: () => void } | undefined>Description
Creates a real-time WebSocket subscription to a Streams event. Whenever the specified event fires, the SDK calls your onData callback — optionally including enriched data from on-chain calls.
Parameters
somniaStreamsEventId: The identifier of a registered event schema within Somnia streams protocol or null if using a custom event source.ethCalls: Fixed set of ETH calls that must be executed before onData callback is triggered. Multicall3 is recommended. Can be an empty array.context: Event sourced selectors to be added to the data field of ETH calls, possible values: topic0, topic1, topic2, topic3, topic4, data and address.onData: Callback for a successful reactivity notification.onError: Callback for a failed attempt.eventContractSource: Alternative contract event source (any on Somnia) that will be emitting the logs specified by topicOverrides.topicOverrides: Optional when using Somnia streams as an event source but mandatory when using a different event source.onlyPushChanges: Whether the data should be pushed to the subscriber only if eth_call results are different from the previous.
Example
await sdk.streams.subscribe({
somniaStreamsEventId: "Firework",
ethCalls,
onData: (data) => {}
})With ethCalls
await sdk.streams.subscribe({
somniaStreamsEventId: 'TradeExecuted',
ethCalls: [{
to: '0xERC20Address',
data: encodeFunctionData({
abi: erc20Abi,
functionName: 'balanceOf',
args: ['0xUserAddress']
})
}],
onData: (data) => console.log('Trade + balance data:', data)
})Useful for off-chain reactivity: real-time dashboards, chat updates, live feeds, or notifications.
Notes
Requires
createPublicClient({ transport: webSocket() })Use
setAndEmitEvents()on the publisher side to trigger matching subscriptions.
Protocol
getSomniaDataStreamsProtocolInfo(): Promise<GetSomniaDataStreamsProtocolInfoResponse | Error | null>
getSomniaDataStreamsProtocolInfo(): Promise<GetSomniaDataStreamsProtocolInfoResponse | Error | null>Description
Retrieves information about the Somnia Data Streams protocol.
Use Case
Fetching protocol-level details, such as version or configuration.
Example
const info = await sdk.streams.getSomniaDataStreamsProtocolInfo()
console.log('Protocol info:', info)Key Types Reference
DataStream
{ id: Hex, schemaId: Hex, data: Hex } – Used with set() or setAndEmitEvents() .
EventStream
{ id: string, argumentTopics: Hex[], data: Hex } – Used with emitEvents() and setAndEmitEvents() .
DataSchemaRegistration
{ id: string, schema: string, parentSchemaId: Hex } – For registerDataSchemas() .
EventSchema
{ params: EventParameter[], eventTopic: string } – For registerEventSchemas() .
EthCall
{ to: Address, data: Hex } – Defines on-chain calls for event enrichment.
Developer Tips
Always compute your schema ID locally before deploying:
await sdk.streams.computeSchemaId(schema).For chat-like or telemetry apps, pair
setAndEmitEvents()(write) withsubscribe()(read).Use
zeroBytes32for base schemas that don’t extend others.All write methods return transaction hashes, use
waitForTransactionReceipt()to confirm.Data Streams focus on persistent, schema-based storage, while Event Streams enable reactive notifications; use them together for comprehensive applications.
Last updated