Working with Multiple Publishers in a Shared Stream
The core architecture of Somnia Data Streams decouples data schemas from publishers. This allows multiple different accounts (or devices) to publish data using the same schema. The data conforms to the same data structure, regardless of who published it.
This model is perfect for multi-source data scenarios, such as:
Multi-User Chat: Multiple users sending messages under the same
chatMessageschema.IoT (Internet of Things): Hundreds of sensors submitting data under the same
telemetryschema.Gaming: All players in a game publishing their positions and scores under the same
playerUpdateschema.
In this tutorial, we will demonstrate how to build an "aggregator" application that collects and merges data from two different "devices" (two separate wallet accounts) publishing to the same "telemetry" schema.
Prerequisites
Node.js 18+ and npm
@somnia-chain/streamsandviemlibraries installed.An
RPC_URLfor access to the Somnia Testnet.Two (2) funded Somnia Testnet wallets for publishing data.
What You’ll Build
In this tutorial, we will build two main components:
A Publisher Script that simulates two different wallets sending data to the same telemetry schema.
An Aggregator Script that fetches all data from a specified list of publishers, merges them into a single list, and sorts them by timestamp.
Project Setup
Create a new directory for your application and install the necessary packages.
mkdir somnia-aggregator
cd somnia-aggregator
npm init -y
npm i @somnia-chain/streams viem dotenv
npm i -D @types/node typescript ts-nodeCreate a tsconfig.json file in your project root:
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"outDir": "./dist"
},
"include": ["src/**/*"]
}Configure Environment Variables
Create a .env file in your project root. For this tutorial, we will need two different private keys.
# .env
RPC_URL=https://dream-rpc.somnia.network/
# Simulates two different devices/publishers
PUBLISHER_1_PK=0xPUBLISHER_ONE_PRIVATE_KEY
PUBLISHER_2_PK=0xPUBLISHER_TWO_PRIVATE_KEYChain and Client Configuration
Create a folder named src/lib and set up your chain.ts and clients.ts files.
src/lib/chain.ts
import { defineChain } from 'viem'
export const somniaTestnet = defineChain({
id: 50312,
name: 'Somnia Testnet',
network: 'somnia-testnet',
nativeCurrency: { name: 'STT', symbol: 'STT', decimals: 18 },
rpcUrls: {
default: { http: ['[https://dream-rpc.somnia.network]'] },
public: { http: ['[https://dream-rpc.somnia.network]'] },
},
} as const)src/lib/clients.ts
import 'dotenv/config'
import { createPublicClient, createWalletClient, http, PublicClient } from 'viem'
import { privateKeyToAccount, PrivateKeyAccount } from 'viem/accounts'
import { somniaTestnet } from './chain'
function getEnv(key: string): string {
const value = process.env[key]
if (!value) {
throw new Error(`Missing environment variable: ${key}`)
}
return value
}
// A single Public Client for read operations
export const publicClient: PublicClient = createPublicClient({
chain: somniaTestnet,
transport: http(getEnv('RPC_URL')),
})
// Two different Wallet Clients for simulation
export const walletClient1 = createWalletClient({
account: privateKeyToAccount(getEnv('PUBLISHER_1_PK') as `0x${string}`),
chain: somniaTestnet,
transport: http(getEnv('RPC_URL')),
})
export const walletClient2 = createWalletClient({
account: privateKeyToAccount(getEnv('PUBLISHER_2_PK') as `0x${string}`),
chain: somniaTestnet,
transport: http(getEnv('RPC_URL')),
})Define the Shared Schema
Let's define the common schema that all publishers will use.
src/lib/schema.ts
// This schema will be used by multiple devices
export const telemetrySchema =
'uint64 timestamp, string deviceId, int32 x, int32 y, uint32 speed'Create the Publisher Script
Now, let's create a script that simulates how two different publishers will send data to this schema. This script will take which publisher to use as a command-line argument.
src/scripts/publishData.ts
import 'dotenv/config'
import { SDK, SchemaEncoder, zeroBytes32 } from '@somnia-chain/streams'
import { publicClient, walletClient1, walletClient2 } from '../lib/clients'
import { telemetrySchema } from '../lib/schema'
import { toHex, Hex, WalletClient } from 'viem'
import { waitForTransactionReceipt } from 'viem/actions'
// Select which publisher to use
async function getPublisher(): Promise<{ client: WalletClient, deviceId: string }> {
const arg = process.argv[2] // 'p1' or 'p2'
if (arg === 'p2') {
console.log('Using Publisher 2 (Device B)')
return { client: walletClient2, deviceId: 'device-b-002' }
}
console.log('Using Publisher 1 (Device A)')
return { client: walletClient1, deviceId: 'device-a-001' }
}
// Helper function to encode the data
function encodeTelemetry(encoder: SchemaEncoder, deviceId: string): Hex {
const now = Date.now().toString()
return encoder.encodeData([
{ name: "timestamp", value: now, type: "uint64" },
{ name: "deviceId", value: deviceId, type: "string" },
{ name: "x", value: Math.floor(Math.random() * 1000).toString(), type: "int32" },
{ name: "y", value: Math.floor(Math.random() * 1000).toString(), type: "int32" },
{ name: "speed", value: Math.floor(Math.random() * 120).toString(), type: "uint32" },
])
}
async function main() {
const { client, deviceId } = await getPublisher()
const publisherAddress = client.account.address
console.log(`Publisher Address: ${publisherAddress}`)
const sdk = new SDK({ public: publicClient, wallet: client })
const encoder = new SchemaEncoder(telemetrySchema)
// 1. Compute the Schema ID
const schemaId = await sdk.streams.computeSchemaId(telemetrySchema)
if (!schemaId) throw new Error('Could not compute schemaId')
console.log(`Schema ID: ${schemaId}`)
// 2. Register Schema (Updated for new API)
console.log('Registering schema (if not already registered)...')
const ignoreAlreadyRegisteredSchemas = true
const regTx = await sdk.streams.registerDataSchemas([
{
schemaName: 'telemetry', // Updated: 'id' is now 'schemaName'
schema: telemetrySchema,
parentSchemaId: zeroBytes32
}
], ignoreAlreadyRegisteredSchemas)
if (regTx) {
console.log('Schema registration transaction sent:', regTx)
await waitForTransactionReceipt(publicClient, { hash: regTx })
console.log('Schema registered successfully!')
} else {
console.log('Schema was already registered. No transaction sent.')
}
// 3. Encode the data
const encodedData = encodeTelemetry(encoder, deviceId)
// 4. Publish the data
// We make the dataId unique with a timestamp and device ID
const dataId = toHex(`${deviceId}-${Date.now()}`, { size: 32 })
const txHash = await sdk.streams.set([
{ id: dataId, schemaId, data: encodedData }
])
if (!txHash) throw new Error('Failed to publish data')
console.log(`Publishing data... Tx: ${txHash}`)
await waitForTransactionReceipt(publicClient, { hash: txHash })
console.log('Data published successfully!')
}
main().catch((e) => {
console.error(e)
process.exit(1)
})To run this script:
Add the following scripts to your package.json file:
"scripts": {
"publish:p1": "ts-node src/scripts/publishData.ts p1",
"publish:p2": "ts-node src/scripts/publishData.ts p2"
}Now, open two different terminals and send data from each:
# Terminal 1
npm run publish:p1
# Terminal 2
npm run publish:p2Repeat this a few times to build up a dataset from both publishers.
Create the Aggregator Script
Now that we have published our data, we can write the "aggregator" script that collects, merges, and sorts all data from these two (or more) publishers.
src/scripts/aggregateData.ts
import 'dotenv/config'
import { SDK, SchemaDecodedItem } from '@somnia-chain/streams'
import { publicClient, walletClient1, walletClient2 } from '../lib/clients'
import { telemetrySchema } from '../lib/schema'
import { Address } from 'viem'
// LIST OF PUBLISHERS TO TRACK
// You could also fetch this list dynamically (e.g., from a contract or database).
const TRACKED_PUBLISHERS: Address[] = [
walletClient1.account.address,
walletClient2.account.address,
]
// Helper function to convert SDK data into a cleaner object
// (Similar to the 'val' function in the Minimal On-Chain Chat App Tutorial)
function decodeTelemetryRecord(row: SchemaDecodedItem[]): TelemetryRecord {
const val = (field: any) => field?.value?.value ?? field?.value ?? ''
return {
timestamp: Number(val(row[0])),
deviceId: String(val(row[1])),
x: Number(val(row[2])),
y: Number(val(row[3])),
speed: Number(val(row[4])),
}
}
// Type definition for our data
interface TelemetryRecord {
timestamp: number
deviceId: string
x: number
y: number
speed: number
publisher?: Address // We will add this field later
}
async function main() {
// The aggregator doesn't need to write data, so it only uses the publicClient
const sdk = new SDK({ public: publicClient })
const schemaId = await sdk.streams.computeSchemaId(telemetrySchema)
if (!schemaId) throw new Error('Could not compute schemaId')
console.log(`Aggregator started. Tracking ${TRACKED_PUBLISHERS.length} publishers...`)
console.log(`Schema ID: ${schemaId}\n`)
const allRecords: TelemetryRecord[] = []
// 1. Loop through each publisher
for (const publisherAddress of TRACKED_PUBLISHERS) {
console.log(`--- Fetching data for ${publisherAddress} ---`)
// 2. Fetch all data for the publisher based on the schema
// Note: The SDK automatically decodes the data if the schema is registered
const data = await sdk.streams.getAllPublisherDataForSchema(schemaId, publisherAddress)
if (!data || data.length === 0) {
console.log('No data found for this publisher.\n')
continue
}
// 3. Transform the data and add the 'publisher' field
const records: TelemetryRecord[] = (data as SchemaDecodedItem[][]).map(row => ({
...decodeTelemetryRecord(row),
publisher: publisherAddress // To know where the data came from
}))
console.log(`Found ${records.length} records.`)
// 4. Add all records to the main list
allRecords.push(...records)
}
// 5. Sort all data by timestamp
console.log('\n--- Aggregation Complete ---')
console.log(`Total records fetched: ${allRecords.length}`)
allRecords.sort((a, b) => a.timestamp - b.timestamp)
// 6. Display the result
console.log('\n--- Combined and Sorted Telemetry Log ---')
allRecords.forEach(record => {
console.log(
`[${new Date(record.timestamp).toISOString()}] [${record.publisher}] - Device: ${record.deviceId}, Speed: ${record.speed}`
)
})
}
main().catch((e) => {
console.error(e)
process.exit(1)
})To run this script:
Add the script to your package.json file:
"scripts": {
"publish:p1": "ts-node src/scripts/publishData.ts p1",
"publish:p2": "ts-node src/scripts/publishData.ts p2",
"aggregate": "ts-node src/scripts/aggregateData.ts"
}And run it:
npm run aggregateConclusion
In this tutorial, you learned how to manage a multi-publisher architecture with Somnia Data Streams.
Publisher Side: The logic remained unchanged. Each publisher independently published its data using its wallet and the
sdk.streams.set()method.Aggregator Side: This is where the main logic came in.
We maintained a list of publishers we were interested in.
We fetched the data for each publisher separately using the
getAllPublisherDataForSchemamethod.We combined the incoming data into a single array (
allRecords.push(...)).Finally, we sorted all the data on the client-side to display them in a meaningful order (e.g., by timestamp).
This pattern can be scaled to support any number of publishers and provides a robust foundation for building decentralized, multi-source applications.
Last updated