Working with Multiple Publishers in a Shared Stream

The core architecture of Somnia Data Streams decouples data schemas from publishers, allowing multiple accounts (or devices) to publish data under the same schema. All published data conforms to the same structure, regardless of who published it.

This model is perfect for multi-source data scenarios, such as:

  • Multi-User Chat: Multiple users sending messages under the same chatMessage schema.

  • IoT (Internet of Things): Hundreds of sensors submitting data under the same telemetry schema.

  • Gaming: All players in a game publishing their positions and scores under the same playerUpdate schema.

In this tutorial, we will demonstrate how to build an "aggregator" application that collects and merges data from two different "devices" (two separate wallet accounts) publishing to the same "telemetry" schema.
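Conceptually, you can picture a shared stream as records grouped first by schema, then by publisher. The sketch below is a hypothetical in-memory model (plain TypeScript, not SDK code) to illustrate why an aggregator must query each publisher separately:

```typescript
// Hypothetical in-memory model of a shared stream: one schema, many publishers.
// This is an illustration only; the real data lives on-chain behind the SDK.
type SchemaId = string
type Address = string

const stream = new Map<SchemaId, Map<Address, unknown[]>>()

function publish(schemaId: SchemaId, publisher: Address, record: unknown) {
  const byPublisher = stream.get(schemaId) ?? new Map<Address, unknown[]>()
  const records = byPublisher.get(publisher) ?? []
  records.push(record)
  byPublisher.set(publisher, records)
  stream.set(schemaId, byPublisher)
}

// Two publishers write under the same schema; each keeps its own record list,
// so a reader that wants "everything" must fetch per publisher and merge.
publish('telemetry', '0xAAA', { speed: 10 })
publish('telemetry', '0xBBB', { speed: 20 })
console.log(stream.get('telemetry')?.size) // → 2
```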

Prerequisites

  • Node.js 18+ and npm

  • @somnia-chain/streams and viem libraries installed.

  • An RPC_URL for access to the Somnia Testnet.

  • Two (2) funded Somnia Testnet wallets for publishing data.

What You’ll Build

In this tutorial, we will build two main components:

  1. A Publisher Script that simulates two different wallets sending data to the same telemetry schema.

  2. An Aggregator Script that fetches all data from a specified list of publishers, merges them into a single list, and sorts them by timestamp.

Project Setup

Create a new directory for your application and install the necessary packages.

mkdir somnia-aggregator
cd somnia-aggregator
npm init -y
npm i @somnia-chain/streams viem dotenv
npm i -D @types/node typescript ts-node

Create a tsconfig.json file in your project root:

{
  "compilerOptions": {
    "target": "ES2020",
    "module": "commonjs",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true,
    "outDir": "./dist"
  },
  "include": ["src/**/*"]
}

Configure Environment Variables

Create a .env file in your project root. For this tutorial, we will need two different private keys.

# .env
RPC_URL=https://dream-rpc.somnia.network

# Simulates two different devices/publishers
PUBLISHER_1_PK=0xPUBLISHER_ONE_PRIVATE_KEY
PUBLISHER_2_PK=0xPUBLISHER_TWO_PRIVATE_KEY

IMPORTANT: Never expose private keys in client-side (browser) code or public repositories. The scripts in this tutorial are intended to be run server-side.

Chain and Client Configuration

Create a folder named src/lib and set up your chain.ts and clients.ts files.

src/lib/chain.ts

import { defineChain } from 'viem'

export const somniaTestnet = defineChain({
  id: 50312,
  name: 'Somnia Testnet',
  network: 'somnia-testnet',
  nativeCurrency: { name: 'STT', symbol: 'STT', decimals: 18 },
  rpcUrls: {
    default: { http: ['https://dream-rpc.somnia.network'] },
    public:  { http: ['https://dream-rpc.somnia.network'] },
  },
} as const)

src/lib/clients.ts

import 'dotenv/config'
import { createPublicClient, createWalletClient, http, PublicClient } from 'viem'
import { privateKeyToAccount, PrivateKeyAccount } from 'viem/accounts'
import { somniaTestnet } from './chain'

function getEnv(key: string): string {
  const value = process.env[key]
  if (!value) {
    throw new Error(`Missing environment variable: ${key}`)
  }
  return value
}

// A single Public Client for read operations
export const publicClient: PublicClient = createPublicClient({
  chain: somniaTestnet, 
  transport: http(getEnv('RPC_URL')),
})

// Two different Wallet Clients for simulation
export const walletClient1 = createWalletClient({
  account: privateKeyToAccount(getEnv('PUBLISHER_1_PK') as `0x${string}`),
  chain: somniaTestnet, 
  transport: http(getEnv('RPC_URL')),
})

export const walletClient2 = createWalletClient({
  account: privateKeyToAccount(getEnv('PUBLISHER_2_PK') as `0x${string}`),
  chain: somniaTestnet,
  transport: http(getEnv('RPC_URL')),
})

Define the Shared Schema

Let's define the common schema that all publishers will use.

src/lib/schema.ts

// This schema will be used by multiple devices
export const telemetrySchema = 
  'uint64 timestamp, string deviceId, int32 x, int32 y, uint32 speed'

Create the Publisher Script

Now, let's create a script that simulates two different publishers sending data under this schema. The script takes the publisher to use as a command-line argument.

src/scripts/publishData.ts

import 'dotenv/config'
import { SDK, SchemaEncoder, zeroBytes32 } from '@somnia-chain/streams'
import { publicClient, walletClient1, walletClient2 } from '../lib/clients'
import { telemetrySchema } from '../lib/schema'
import { toHex, Hex, WalletClient } from 'viem'
import { waitForTransactionReceipt } from 'viem/actions'

// Select which publisher to use
async function getPublisher(): Promise<{ client: WalletClient, deviceId: string }> {
  const arg = process.argv[2] // 'p1' or 'p2'
  if (arg === 'p2') {
    console.log('Using Publisher 2 (Device B)')
    return { client: walletClient2, deviceId: 'device-b-002' }
  }
  console.log('Using Publisher 1 (Device A)')
  return { client: walletClient1, deviceId: 'device-a-001' }
}

// Helper function to encode the data
function encodeTelemetry(encoder: SchemaEncoder, deviceId: string): Hex {
  const now = Date.now().toString()
  return encoder.encodeData([
    { name: "timestamp", value: now, type: "uint64" },
    { name: "deviceId", value: deviceId, type: "string" },
    { name: "x", value: Math.floor(Math.random() * 1000).toString(), type: "int32" },
    { name: "y", value: Math.floor(Math.random() * 1000).toString(), type: "int32" },
    { name: "speed", value: Math.floor(Math.random() * 120).toString(), type: "uint32" },
  ])
}

async function main() {
  const { client, deviceId } = await getPublisher()
  const publisherAddress = client.account!.address // account is always set by our wallet clients
  console.log(`Publisher Address: ${publisherAddress}`)

  const sdk = new SDK({ public: publicClient, wallet: client })
  const encoder = new SchemaEncoder(telemetrySchema)

  // 1. Compute the Schema ID
  const schemaId = await sdk.streams.computeSchemaId(telemetrySchema)
  if (!schemaId) throw new Error('Could not compute schemaId')
  console.log(`Schema ID: ${schemaId}`)

  // 2. Register Schema (Updated for new API)
  console.log('Registering schema (if not already registered)...')
  
  const ignoreAlreadyRegisteredSchemas = true
  const regTx = await sdk.streams.registerDataSchemas([
    { 
      schemaName: 'telemetry', // Updated: 'id' is now 'schemaName'
      schema: telemetrySchema, 
      parentSchemaId: zeroBytes32 
    }
  ], ignoreAlreadyRegisteredSchemas)

  if (regTx) {
    console.log('Schema registration transaction sent:', regTx)
    await waitForTransactionReceipt(publicClient, { hash: regTx })
    console.log('Schema registered successfully!')
  } else {
    console.log('Schema was already registered. No transaction sent.')
  }

  // 3. Encode the data
  const encodedData = encodeTelemetry(encoder, deviceId)
  
  // 4. Publish the data
  // We make the dataId unique with a timestamp and device ID
  const dataId = toHex(`${deviceId}-${Date.now()}`, { size: 32 })
  
  const txHash = await sdk.streams.set([
    { id: dataId, schemaId, data: encodedData }
  ])

  if (!txHash) throw new Error('Failed to publish data')
  console.log(`Publishing data... Tx: ${txHash}`)

  await waitForTransactionReceipt(publicClient, { hash: txHash })
  console.log('Data published successfully!')
}

main().catch((e) => {
  console.error(e)
  process.exit(1)
})
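The dataId built from the device ID and timestamp must fit in 32 bytes: viem's toHex(value, { size: 32 }) pads shorter strings and throws if the value exceeds the size. The stdlib sketch below mirrors that constraint with a hypothetical helper (toDataId is not part of viem):

```typescript
// Sketch of the 32-byte dataId constraint. toDataId is a hypothetical helper
// that mimics toHex(value, { size: 32 }): right-pad short strings with zero
// bytes, throw when the UTF-8 encoding is longer than 32 bytes.
function toDataId(s: string): string {
  const bytes = Buffer.from(s, 'utf8')
  if (bytes.length > 32) throw new Error(`dataId too long: ${bytes.length} bytes`)
  const padded = Buffer.concat([bytes, Buffer.alloc(32 - bytes.length)])
  return '0x' + padded.toString('hex')
}

// 'device-a-001-<13-digit ms timestamp>' is 26 bytes, comfortably under the cap.
console.log(toDataId('device-a-001-1700000000000').length) // → 66 (0x + 64 hex chars)
```

Keep this in mind if you change the deviceId format: a longer prefix plus the millisecond timestamp can push the string past 32 bytes and make publishing throw.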

To run this script:

Add the following scripts to your package.json file:

"scripts": {
  "publish:p1": "ts-node src/scripts/publishData.ts p1",
  "publish:p2": "ts-node src/scripts/publishData.ts p2"
}

Now, open two different terminals and send data from each:

# Terminal 1
npm run publish:p1
# Terminal 2
npm run publish:p2

Repeat this a few times to build up a dataset from both publishers.

Create the Aggregator Script

Now that we have published our data, we can write the "aggregator" script that collects, merges, and sorts all data from these two (or more) publishers.

src/scripts/aggregateData.ts

import 'dotenv/config'
import { SDK, SchemaDecodedItem } from '@somnia-chain/streams'
import { publicClient, walletClient1, walletClient2 } from '../lib/clients'
import { telemetrySchema } from '../lib/schema'
import { Address } from 'viem'

// LIST OF PUBLISHERS TO TRACK
// You could also fetch this list dynamically (e.g., from a contract or database).
const TRACKED_PUBLISHERS: Address[] = [
  walletClient1.account.address,
  walletClient2.account.address,
]

// Helper function to convert SDK data into a cleaner object
// (Similar to the 'val' function in the Minimal On-Chain Chat App Tutorial)
function decodeTelemetryRecord(row: SchemaDecodedItem[]): TelemetryRecord {
  const val = (field: any) => field?.value?.value ?? field?.value ?? ''
  return {
    timestamp: Number(val(row[0])),
    deviceId: String(val(row[1])),
    x: Number(val(row[2])),
    y: Number(val(row[3])),
    speed: Number(val(row[4])),
  }
}

// Type definition for our data
interface TelemetryRecord {
  timestamp: number
  deviceId: string
  x: number
  y: number
  speed: number
  publisher?: Address // Added during aggregation to tag each record's source
}

async function main() {
  // The aggregator doesn't need to write data, so it only uses the publicClient
  const sdk = new SDK({ public: publicClient })
  
  const schemaId = await sdk.streams.computeSchemaId(telemetrySchema)
  if (!schemaId) throw new Error('Could not compute schemaId')

  console.log(`Aggregator started. Tracking ${TRACKED_PUBLISHERS.length} publishers...`)
  console.log(`Schema ID: ${schemaId}\n`)

  const allRecords: TelemetryRecord[] = []

  // 1. Loop through each publisher
  for (const publisherAddress of TRACKED_PUBLISHERS) {
    console.log(`--- Fetching data for ${publisherAddress} ---`)
    
    // 2. Fetch all data for the publisher based on the schema
    // Note: The SDK automatically decodes the data if the schema is registered
    const data = await sdk.streams.getAllPublisherDataForSchema(schemaId, publisherAddress)
    
    if (!data || data.length === 0) {
      console.log('No data found for this publisher.\n')
      continue
    }

    // 3. Transform the data and add the 'publisher' field
    const records: TelemetryRecord[] = (data as SchemaDecodedItem[][]).map(row => ({
      ...decodeTelemetryRecord(row),
      publisher: publisherAddress // To know where the data came from
    }))

    console.log(`Found ${records.length} records.`)

    // 4. Add all records to the main list
    allRecords.push(...records)
  }

  // 5. Sort all data by timestamp
  console.log('\n--- Aggregation Complete ---')
  console.log(`Total records fetched: ${allRecords.length}`)

  allRecords.sort((a, b) => a.timestamp - b.timestamp)

  // 6. Display the result
  console.log('\n--- Combined and Sorted Telemetry Log ---')
  allRecords.forEach(record => {
    console.log(
      `[${new Date(record.timestamp).toISOString()}] [${record.publisher}] - Device: ${record.deviceId}, Speed: ${record.speed}`
    )
  })
}

main().catch((e) => {
  console.error(e)
  process.exit(1)
})
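The val helper in the script above handles the two shapes a decoded field can take: the value may be nested one level deep ({ value: { value: ... } }) or sit directly under value. You can exercise it on plain objects:

```typescript
// Plain-data check of the val() unwrapping used in decodeTelemetryRecord.
const val = (field: any) => field?.value?.value ?? field?.value ?? ''

console.log(val({ value: { value: 42 } }))   // → 42 (nested shape)
console.log(val({ value: 'device-a-001' }))  // → device-a-001 (flat shape)
console.log(val(undefined))                  // → '' (missing field)
```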

To run this script:

Add the script to your package.json file:

"scripts": {
  "publish:p1": "ts-node src/scripts/publishData.ts p1",
  "publish:p2": "ts-node src/scripts/publishData.ts p2",
  "aggregate": "ts-node src/scripts/aggregateData.ts"
}

And run it:

npm run aggregate

Conclusion

In this tutorial, you learned how to manage a multi-publisher architecture with Somnia Data Streams.

  • Publisher Side: The logic is the same as in a single-publisher setup. Each publisher independently published its data using its own wallet and the sdk.streams.set() method.

  • Aggregator Side: This is where the main logic came in.

    1. We maintained a list of publishers we were interested in.

    2. We fetched the data for each publisher separately using the getAllPublisherDataForSchema method.

    3. We combined the incoming data into a single array (allRecords.push(...)).

    4. Finally, we sorted the combined data client-side by timestamp to display it as a single chronological log.
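The merge-and-sort step can be sketched with plain data, independent of the SDK. TelemetryRow below is a stand-in for the TelemetryRecord interface from the aggregator script, and the sample timestamps are arbitrary:

```typescript
// Plain-data sketch of the aggregator's merge-and-sort step.
interface TelemetryRow { timestamp: number; deviceId: string; publisher: string }

// Per-publisher results, as returned by separate fetches (unsorted).
const fromDeviceA: TelemetryRow[] = [
  { timestamp: 1700000002000, deviceId: 'device-a-001', publisher: '0xAAA' },
  { timestamp: 1700000000000, deviceId: 'device-a-001', publisher: '0xAAA' },
]
const fromDeviceB: TelemetryRow[] = [
  { timestamp: 1700000001000, deviceId: 'device-b-002', publisher: '0xBBB' },
]

// Merge into one list, then sort by timestamp for a chronological log.
const all: TelemetryRow[] = [...fromDeviceA, ...fromDeviceB]
all.sort((a, b) => a.timestamp - b.timestamp)

console.log(all.map(r => r.deviceId).join(','))
// → device-a-001,device-b-002,device-a-001
```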

This pattern can be scaled to support any number of publishers and provides a robust foundation for building decentralized, multi-source applications.
