Streams Case Study: Formula 1
Streaming data from OpenF1 on-chain and building reactive applications
Schemas
uint32 number, string name, string abbreviation, string teamName, string teamColorint256 x, int256 y, int256 zSchema registration and re-use
const { SDK, zeroBytes32, SchemaEncoder } = require("@somnia-chain/streams");
const {
createPublicClient,
http,
createWalletClient,
toHex,
defineChain,
} = require("viem");
const { privateKeyToAccount } = require("viem/accounts");
const dreamChain = defineChain({
id: 50312,
name: "Somnia Testnet",
network: "testnet",
nativeCurrency: {
decimals: 18,
name: "STT",
symbol: "STT",
},
rpcUrls: {
default: {
http: [
"https://dream-rpc.somnia.network",
],
},
public: {
http: [
"https://dream-rpc.somnia.network",
],
},
},
})
async function main() {
// Connect to the blockchain to read data with the public client
const publicClient = createPublicClient({
chain: dreamChain,
transport: http(),
})
const walletClient = createWalletClient({
account: privateKeyToAccount(process.env.PRIVATE_KEY),
chain: dreamChain,
transport: http(),
})
// Connect to the SDK
const sdk = new SDK({
public: publicClient,
wallet: walletClient,
})
// Setup the schemas
const coordinatesSchema = `int256 x, int256 y, int256 z`
const driverSchema = `uint32 number, string name, string abbreviation, string teamName, string teamColor`
// Derive Etherbase schema metadata
const coordinatesSchemaId = await sdk.streams.computeSchemaId(
coordinatesSchema
)
if (!coordinatesSchemaId) {
throw new Error("Unable to compute the schema ID for the coordinates schema")
}
const driverSchemaId = await sdk.streams.computeSchemaId(
driverSchema
)
if (!driverSchemaId) {
throw new Error("Unable to compute the schema ID for the driver schema")
}
const extendedSchema = `${driverSchema}, ${coordinatesSchema}`
console.log("Schemas in use", {
coordinatesSchemaId,
driverSchemaId,
coordinatesSchema,
driverSchema,
extendedSchema
})
const isCoordinatesSchemaRegistered = await sdk.streams.isDataSchemaRegistered(coordinatesSchemaId)
if (!isCoordinatesSchemaRegistered) {
// We want to publish the driver schema but we need to publish the coordinates schema first before it can be extended
const registerCoordinatesSchemaTxHash =
await sdk.streams.registerDataSchemas([
{ schemaName: "coords", schema: coordinatesSchema }
])
if (!registerCoordinatesSchemaTxHash) {
throw new Error("Failed to register coordinates schema")
}
console.log("Registered coordinates schema on-chain", {
registerCoordinatesSchemaTxHash
})
await publicClient.waitForTransactionReceipt({
hash: registerCoordinatesSchemaTxHash
})
}
const isDriverSchemaRegistered = await sdk.streams.isDataSchemaRegistered(driverSchemaId)
if (!isDriverSchemaRegistered) {
// Now, publish the driver schema but extend the coordinates schema!
const registerDriverSchemaTxHash = sdk.streams.registerDataSchemas([
{ schemaName: "driver", schema: driverSchema, parentSchemaId: coordinatesSchemaId }
])
if (!registerDriverSchemaTxHash) {
throw new Error("Failed to register schema on-chain")
}
console.log("Registered driver schema on-chain", {
registerDriverSchemaTxHash,
})
await publicClient.waitForTransactionReceipt({
hash: registerDriverSchemaTxHash
})
}
// Publish some data!!
const schemaEncoder = new SchemaEncoder(extendedSchema)
const encodedData = schemaEncoder.encodeData([
{ name: "number", value: "44", type: "uint32" },
{ name: "name", value: "Lewis Hamilton", type: "string" },
{ name: "abbreviation", value: "HAM", type: "string" },
{ name: "teamName", value: "Ferrari", type: "string" },
{ name: "teamColor", value: "#F91536", type: "string" },
{ name: "x", value: "-1513", type: "int256" },
{ name: "y", value: "0", type: "int256" },
{ name: "z", value: "955", type: "int256" },
])
console.log("encodedData", encodedData)
const dataStreams = [{
// Data id: DRIVER number - index will be a helpful lookup later and references ./data/f1-coordinates.js Cube 4 coordinates (driver 44) - F1 telemetry data
id: toHex(`44-0`, { size: 32 }),
schemaId: driverSchemaId,
data: encodedData
}]
const publishTxHash = await sdk.streams.set(dataStreams)
console.log("\nPublish Tx Hash", publishTxHash)
}Last updated