# SDK Methods Guide

Somnia Data Streams is the on-chain data streaming protocol that powers real-time, composable applications on the Somnia Network. It is available as an SDK Package [@somnia-chain/streams](https://www.npmjs.com/package/@somnia-chain/streams).

This SDK exposes all the core functionality developers need to write, read, subscribe to, and manage Data Streams and events directly from their dApps.

Before using the Data Streams SDK, ensure you have a working Node.js or Next.js environment (Node 18+ recommended). You’ll need access to a Somnia RPC endpoint (Testnet or Mainnet) and a wallet private key for publishing data.

### Installation

```bash
npm i @somnia-chain/streams viem dotenv
```

The SDK depends on [viem](https://viem.sh/) for blockchain interactions.

### Project Setup

Create a `.env.local` or `.env` file in your project root:

```bash
RPC_URL=https://dream-rpc.somnia.network
PRIVATE_KEY=your_private_key_here
```

⚠️ Never expose private keys in client-side code. Keep writes (publishing data) in server routes or backend environments.

### Basic Initialization

You’ll typically use two clients:

* A public client for reading and subscribing
* A wallet client for writing to the Somnia chain

```typescript
import { SDK } from '@somnia-chain/streams'
import { createPublicClient, createWalletClient, http } from 'viem'
import { privateKeyToAccount } from 'viem/accounts'
import { somniaTestnet } from 'viem/chains'

const rpcUrl = process.env.RPC_URL!
const account = privateKeyToAccount(process.env.PRIVATE_KEY as `0x${string}`)

const sdk = new SDK({
  public: createPublicClient({ chain: somniaTestnet, transport: http(rpcUrl) }),
  wallet: createWalletClient({ chain: somniaTestnet, account, transport: http(rpcUrl) })
})
```

### Somnia Data Streams

Data Streams in Somnia represent structured, verifiable data channels. Every piece of data conforms to a schema that defines its structure (e.g. `timestamp`, `content`, `sender`), and publishers can emit this data either as on-chain transactions or off-chain event notifications.

The SDK follows a simple pattern:

```typescript
const sdk = new SDK({
  public: getPublicClient(),  // for reading and subscriptions
  wallet: getWalletClient()   // for writing
})
```

You’ll interact primarily through the `sdk.streams` interface.

### Core Methods Overview

### Write

#### `set(d: DataStream[]): Promise<Hex | null>`

**Description**

Publishes one or more data streams to the Somnia blockchain. Each stream must specify a `schema ID`, a `unique ID`, and the `encoded payload`.

**Use Case**

When you want to store data on-chain in a standardized format (e.g., chat messages, sensor telemetry, or leaderboard updates).

**Example**

```typescript
const tx = await sdk.streams.set([
  { id: dataId, schemaId, data }
])
console.log('Data published with tx hash:', tx)
```

Always register your schema before calling `set()` ; otherwise, the transaction will revert.

#### `emitEvents(e: EventStream[]): Promise<Hex | Error | null>`

**Description**

Emits a registered Streams event without persisting new data. This is used for off-chain reactivity, triggering listeners subscribed via WebSocket.

**Example**

```typescript
await sdk.streams.emitEvents([
  {
    id: 'ChatMessage',
    argumentTopics: [topic],
    data: '0x' // optional encoded payload
  }
])
```

Common Use includes notifying subscribers when something happens, e.g., “new message sent” or “order filled”.

#### `setAndEmitEvents(d: DataStream[], e: EventStream[]): Promise<Hex | Error | null>`

**Description**

Performs an atomic on-chain operation that both writes data and emits a corresponding event. This ensures your data and notifications are always in sync.

**Example**

```typescript
await sdk.streams.setAndEmitEvents(
  [{ id: dataId, schemaId, data }],
  [{ id: 'ChatMessage', argumentTopics: [topic], data: '0x' }]
)
```

It is ideal for chat apps, game updates, or IoT streams — where data must be recorded and instantly broadcast.

### Manage

#### `registerDataSchemas(registrations: DataSchemaRegistration[], ignoreRegisteredSchemas?: boolean): Promise<Hex | Error | null>`

**Description**

Registers a new data schema on-chain. Schemas define the structure of your Streams data, like a table schema in a database. The optional `ignoreRegisteredSchemas` parameter allows skipping registration if the schema is already registered.

**Example**

```typescript
await sdk.streams.registerDataSchemas([
  {
    schemaName: "chat",
    schema: 'uint64 timestamp, string message, address sender',
    parentSchemaId: zeroBytes32 // root schema
  }
], true) // Optionally ignore if already registered
```

Register before writing any new data type. If you modify the schema structure later, register it again as a new schema version.

#### `registerEventSchemas(ids: string[], schemas: EventSchema[]): Promise<Hex | Error | null>`

**Description**

Registers event definitions that can later be emitted or subscribed to.

**Example**

```typescript
await sdk.streams.registerEventSchemas(
  ['ChatMessage'],
  [{
    params: [{ name: 'roomId', paramType: 'bytes32', isIndexed: true }],
    eventTopic: 'ChatMessage(bytes32 indexed roomId)'
  }]
)
```

Use before calling `emitEvents()` or `subscribe()` for a specific event.

#### `manageEventEmittersForRegisteredStreamsEvent(streamsEventId: string, emitter: Address, isEmitter: boolean): Promise<Hex | Error | null>`

**Description**

Grants or revokes permission for an address to emit a specific event.

**Example**

```typescript
await sdk.streams.manageEventEmittersForRegisteredStreamsEvent(
  'ChatMessage',
  '0x1234abcd...',
  true // allow this address to emit
)
```

Used for access control in multi-publisher systems.

### Read

#### `getByKey(schemaId: SchemaID, publisher: Address, key: Hex): Promise<Hex[] | SchemaDecodedItem[][] | null>`

**Description**

Retrieves data stored under a schema by its unique ID.

**Example**

```typescript
const msg = await sdk.streams.getByKey(schemaId, publisher, dataId)
console.log('Data:', msg)
```

An example includes fetching a specific record, e.g., “fetch message by message ID”.

#### `getAtIndex(schemaId: SchemaID, publisher: Address, idx: bigint): Promise<Hex[] | SchemaDecodedItem[][] | null>`

**Description**

Fetches the record at a given index (0-based).

**Example**

```typescript
const record = await sdk.streams.getAtIndex(schemaId, publisher, 0n)
```

It is useful for sequential datasets like logs or telemetry streams.

#### `getBetweenRange(schemaId: SchemaID, publisher: Address, startIndex: bigint, endIndex: bigint): Promise<Hex[] | SchemaDecodedItem[][] | Error | null>`

**Description**

Fetches records within a specified index range (0-based, inclusive start, exclusive end).

**Use Case**

Retrieving a batch of historical data, such as paginated logs or time-series entries.

**Example**

```typescript
const records = await sdk.streams.getBetweenRange(schemaId, publisher, 0n, 10n)
console.log('Records in range:', records)
```

#### `getAllPublisherDataForSchema(schemaReference: SchemaReference, publisher: Address): Promise<Hex[] | SchemaDecodedItem[][] | null>`

**Description**

Retrieves all data published by a specific address under a given schema.

**Use Case**

Fetching complete datasets for analysis or synchronization.

**Example**

```typescript
const allData = await sdk.streams.getAllPublisherDataForSchema(schemaReference, publisher)
console.log('All publisher data:', allData)
```

#### `getLastPublishedDataForSchema(schemaId: SchemaID, publisher: Address): Promise<Hex[] | SchemaDecodedItem[][] | null>`

**Description**

Retrieves the most recently published data under a schema by a publisher.

**Use Case**

Getting the latest update, such as the most recent sensor reading or message.

**Example**

```typescript
const latest = await sdk.streams.getLastPublishedDataForSchema(schemaId, publisher)
console.log('Latest data:', latest)
```

#### `totalPublisherDataForSchema(schemaId: SchemaID, publisher: Address): Promise<bigint | null>`

**Description**

Returns how many records a publisher has stored under a schema.

**Example**

```typescript
const total = await sdk.streams.totalPublisherDataForSchema(schemaId, publisher)
console.log(`Total entries: ${total}`)
```

#### `isDataSchemaRegistered(schemaId: SchemaID): Promise<boolean | null>`

**Description**

Checks if a schema exists on-chain.

**Example**

```typescript
const exists = await sdk.streams.isDataSchemaRegistered(schemaId)
if (!exists) console.log('Schema not found')
```

#### `parentSchemaId(schemaId: SchemaID): Promise<Hex | null>`

**Description**

Finds the parent schema of a given schema, if one exists.

**Example**

```typescript
const parent = await sdk.streams.parentSchemaId(schemaId)
console.log('Parent Schema ID:', parent)
```

#### `schemaIdToId(schemaId: SchemaID): Promise<string | null>`

**Description**

Converts a schema ID (Hex) to its corresponding string identifier.

**Use Case**

Mapping hashed IDs back to human-readable names for display or logging.

**Example**

```typescript
const id = await sdk.streams.schemaIdToId(schemaId)
console.log('Schema ID string:', id)
```

#### `idToSchemaId(id: string): Promise<Hex | null>`

**Description**

Converts a string identifier to its corresponding schema ID (Hex).

**Use Case**

Looking up hashed IDs from known names for queries.

**Example**

```typescript
const schemaId = await sdk.streams.idToSchemaId('chat')
console.log('Schema ID:', schemaId)
```

#### `getAllSchemas(): Promise<string[] | null>`

**Description**

Retrieves a list of all registered schema identifiers.

**Use Case**

Discovering available schemas in the protocol.

**Example**

```typescript
const schemas = await sdk.streams.getAllSchemas()
console.log('All schemas:', schemas)
```

#### `getEventSchemasById(ids: string[]): Promise<EventSchema[] | null>`

**Description**

Fetches event schema details for given identifiers.

**Use Case**

Inspecting registered event structures before subscribing or emitting.

**Example**

```typescript
const eventSchemas = await sdk.streams.getEventSchemasById(['ChatMessage'])
console.log('Event schemas:', eventSchemas)
```

#### `computeSchemaId(schema: string): Promise<Hex | null>`

**Description**

Computes the deterministic `schemaId` without registering it.

**Example**

```typescript
const schemaId = await sdk.streams.computeSchemaId('uint64 timestamp, string content')
```

#### `getSchemaFromSchemaId(schemaId: SchemaID): Promise<{ baseSchema: string, finalSchema: string, schemaId: Hex } | Error | null>`

**Description**

Request a schema given the schema id used for data publishing and let the SDK take care of schema extensions.

**Use Case**

Retrieving schema details, including the base schema and the final extended schema, for a given schema ID.

**Example**

```typescript
const schemaInfo = await sdk.streams.getSchemaFromSchemaId(schemaId)
console.log('Schema info:', schemaInfo)
```

### Helpers

#### `deserialiseRawData(rawData: Hex[], parentSchemaId: Hex, schemaLookup: { schema: string; schemaId: Hex; } | null): Promise<Hex[] | SchemaDecodedItem[][] | null>`

**Description**

Deserializes raw data using the provided schema information.

**Use Case**

Decoding fetched raw bytes into structured objects for application use.

**Example**

```typescript
const decoded = await sdk.streams.deserialiseRawData(rawData, parentSchemaId, schemaLookup)
console.log('Decoded data:', decoded)
```

### Subscribe

#### `subscribe(initParams: SubscriptionInitParams): Promise<{ subscriptionId: string, unsubscribe: () => void } | undefined>`

**Description**

Creates a real-time WebSocket subscription to a Streams event. Whenever the specified event fires, the SDK calls your `onData` callback — optionally including enriched data from on-chain calls.

**Parameters**

* `ethCalls`: Fixed set of ETH calls that must be executed before onData callback is triggered. Multicall3 is recommended. Can be an empty array.
* `context`: Event sourced selectors to be added to the data field of ETH calls, possible values: topic0, topic1, topic2, topic3, topic4, data and address.
* `onData`: Callback for a successful reactivity notification.
* `onError`: Callback for a failed attempt.
* `eventContractSource`: Optional but is the contract event source (any on Somnia) that will be emitting the logs specified by topicOverrides.
* `topicOverrides`: Optional but this argument is a filter applied to the subscription. Up to 4 bytes32 event topics can be supplied. By not defining, this is the equivalent of a wildcard subscription to all event topics
* `onlyPushChanges`: Whether the data should be pushed to the subscriber only if eth\_call results are different from the previous.

**Example**

```typescript
// Wildcard subscription to all events emitted by all contracts
await sdk.streams.subscribe({
    ethCalls: [], // No view calls
    onData: (data) => {}
})
```

**With `ethCalls`**

```typescript
import { toEventSelector } from "viem"
const transferSelector = toEventSelector({
    name: 'Transfer',
    type: 'event',
    inputs: [
      { type: 'address', indexed: true, name: 'from' },
      { type: 'address', indexed: true, name: 'to' },
      { type: 'uint256', indexed: false, name: 'value' }
    ]
  })

await sdk.streams.subscribe({
  topicOverrides: [
    transferSelector, // Topic 0 (Transfer event)
  ],
  ethCalls: [{
    to: '0xERC20Address',
    data: encodeFunctionData({
      abi: erc20Abi,
      functionName: 'balanceOf',
      args: ['0xUserAddress']
    })
  }],
  onData: (data) => console.log('Trade + balance data:', data)
})
```

Useful for off-chain reactivity: real-time dashboards, chat updates, live feeds, or notifications.

**Notes**

* Requires `createPublicClient({ transport: webSocket() })`
* Use `setAndEmitEvents()` on the publisher side to trigger matching subscriptions.

### Protocol

#### `getSomniaDataStreamsProtocolInfo(): Promise<GetSomniaDataStreamsProtocolInfoResponse | Error | null>`

**Description**

Retrieves information about the Somnia Data Streams protocol.

**Use Case**

Fetching protocol-level details, such as version or configuration.

**Example**

```typescript
const info = await sdk.streams.getSomniaDataStreamsProtocolInfo()
console.log('Protocol info:', info)
```

### Key Types Reference

| Type                     | Description                                                                                              |
| ------------------------ | -------------------------------------------------------------------------------------------------------- |
| `DataStream`             | `{ id: Hex, schemaId: Hex, data: Hex }` – Used with `set()` or `setAndEmitEvents()` .                    |
| `EventStream`            | `{ id: string, argumentTopics: Hex[], data: Hex }` – Used with `emitEvents()` and `setAndEmitEvents()` . |
| `DataSchemaRegistration` | `{ schemaName: string, schema: string, parentSchemaId: Hex }` – For `registerDataSchemas()` .            |
| `EventSchema`            | `{ params: EventParameter[], eventTopic: string }` – For `registerEventSchemas()` .                      |
| `EthCall`                | `{ to: Address, data: Hex }` – Defines on-chain calls for event enrichment.                              |

### Developer Tips

* Always compute your schema ID locally before deploying: `await sdk.streams.computeSchemaId(schema)`.
* For chat-like or telemetry apps, pair `setAndEmitEvents()` (write) with `subscribe()` (read).
* Use `zeroBytes32` for base schemas that don’t extend others.
* All write methods return transaction hashes, use `waitForTransactionReceipt()` to confirm.
* Data Streams focus on persistent, schema-based storage, while Event Streams enable reactive notifications; use them together for comprehensive applications.


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.somnia.network/developer/data-streams/sdk-methods-guide.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
