Transports & Connection
Supported Transports
| Transport | URL | Direction | History | Recommended for |
|---|---|---|---|---|
| SDK (JSON) | wss://{host}/stream/websocket | Bidirectional | Yes | Browser apps, trade tape |
| SDK (Protobuf) | wss://{host}/stream/websocket?format=protobuf | Bidirectional | Yes | HFT / algo trading |
| Raw WebSocket | wss://{host}/stream/websocket | Bidirectional | Yes | Scripts, custom clients |
| Raw WebSocket | wss://{host}/stream/ws | Unidirectional | No | Simple receive-only consumers |
| SSE / EventSource | https://{host}/stream/sse | Unidirectional | No | Browser fallback |
| HTTP Streaming | https://{host}/stream/http | Unidirectional | No | Last resort fallback |
Hosts: staging.kyan.sh (testnet), alpha.kyan.sh (trading competition), api.kyan.sh (mainnet)
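The table and host list above can be combined into one lookup so the environment and transport are chosen in a single place. The sketch below is illustrative only: `HOSTS`, `STREAM_PATHS`, `streamUrl`, and the key names are hypothetical and not part of any SDK. The schemes, hosts, and paths are copied from this page.

```javascript
// Hosts copied from the list above; the key names are assumptions.
const HOSTS = {
  testnet: "staging.kyan.sh",
  competition: "alpha.kyan.sh",
  mainnet: "api.kyan.sh",
};

// Schemes and paths copied from the transports table; the key names are assumptions.
const STREAM_PATHS = {
  websocket: { scheme: "wss", path: "/stream/websocket" },
  protobuf: { scheme: "wss", path: "/stream/websocket?format=protobuf" },
  uniWebsocket: { scheme: "wss", path: "/stream/ws" },
  sse: { scheme: "https", path: "/stream/sse" },
  http: { scheme: "https", path: "/stream/http" },
};

// Hypothetical helper: builds the endpoint URL for an environment and transport.
function streamUrl(env, transport) {
  const host = HOSTS[env];
  const target = STREAM_PATHS[transport];
  if (!host) throw new Error(`unknown environment: ${env}`);
  if (!target) throw new Error(`unknown transport: ${transport}`);
  return `${target.scheme}://${host}${target.path}`;
}
```

For example, `streamUrl("testnet", "websocket")` yields the URL used by the SDK examples on this page.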
Recovery
L2 Orderbook — Cache Recovery
L2 channels (l2:perps, l2:perps:grouped) support two recovery features:
1. Cache recovery (first subscribe) — subscribe with since: {} to receive the last cached snapshot as the first publication event, before any live updates arrive. No waiting for the next book change.
2. Automatic reconnect recovery — on network disconnect, the SDK automatically recovers the latest snapshot using force_recovery: true on the server. No client code needed — the SDK tracks the stream position internally and the server pushes any missed data on reconnect.
// SDK: subscribe with since: {} for instant initial data
const sub = client.newSubscription("l2:perps:100:BTC_USDC-PERPETUAL", { since: {} });
sub.on("publication", (ctx) => {
  // First publication = cached snapshot (delivered immediately on subscribe)
  // Subsequent publications = live updates
  const { timestamp, bids, asks } = ctx.data;
});
sub.subscribe();
BBO channels (l2:bbo) do not use cache recovery: updates are frequent enough that clients receive data within milliseconds. Reconnect recovery is still automatic.
| Channel | First subscribe | On reconnect |
|---|---|---|
| l2:perps:{freq}:{instrument} | Cached snapshot via since: {} | Automatic (server-pushed) |
| l2:perps:grouped:{bucket}:{instrument} | Cached snapshot via since: {} | Automatic (server-pushed) |
| l2:bbo:{instrument} | Next BBO change (sub-ms) | Automatic (server-pushed) |
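Because the first publication after a `since: {}` subscribe is a cached snapshot, it may be older than the live updates that follow. The sketch below tags each publication and reports its age so a client can decide how to treat a stale book. `createBookTracker` is a hypothetical helper, and it assumes `timestamp` is in epoch milliseconds, as on the trade channels; this page does not state the unit for L2 data.

```javascript
// Hypothetical helper: tags each L2 publication as the cached snapshot or a live update.
// The clock is injectable so the logic can be exercised without real time passing.
function createBookTracker(now = () => Date.now()) {
  let seen = 0;
  return function onBookData(data) {
    // Only the first publication after subscribing with since: {} comes from the cache.
    const source = seen === 0 ? "cached-snapshot" : "live";
    seen += 1;
    // Assumption: timestamp is epoch milliseconds.
    const ageMs = now() - Number(data.timestamp);
    return { source, ageMs, bids: data.bids, asks: data.asks };
  };
}
```

A handler could then call `const book = track(ctx.data)` inside `sub.on("publication", ...)`, where `track` is the function returned by `createBookTracker()`. The tracker does not reset on reconnect, so create a new one per subscription if that distinction matters.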
Trade History
Trade channels (trades_perp, trades_option) support fetching up to 100 recent trades via the history API, available on bidirectional transports only.
| Transport | How to get history | Live trades |
|---|---|---|
| Bidirectional (SDK or raw) | sub.history() or history protocol command | Yes |
| Unidirectional (SSE, HTTP, uni WS) | Not available | Live only |
On reconnect, the SDK automatically recovers missed trades using the last known stream position.
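When history is fetched after subscribing, live trades that arrive during the fetch can overlap with the tail of the history. A minimal sketch of the offset-based de-duplication used by the examples on this page, written as a pure function: `mergeHistoryAndLive` is a hypothetical name, and the sketch assumes every publication carries a numeric `offset` that increases within the channel.

```javascript
// Hypothetical helper: returns history followed by buffered live publications
// whose offset is greater than the highest offset seen in history.
function mergeHistoryAndLive(historyPubs, bufferedPubs) {
  let maxOffset = 0;
  for (const pub of historyPubs) {
    maxOffset = Math.max(maxOffset, pub.offset ?? 0);
  }
  // Anything at or below maxOffset was already delivered by the history call.
  const fresh = bufferedPubs.filter((pub) => (pub.offset ?? 0) > maxOffset);
  return [...historyPubs, ...fresh];
}
```

Keeping the merge separate from the socket callbacks makes the ordering rule easy to check in isolation.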
Connection Keep-Alive
The server sends a ping (empty {} frame) every 25 seconds on bidirectional WebSocket connections. Clients must reply with {} within 8 seconds or the connection is closed.
| Transport | Keep-alive | Action required |
|---|---|---|
| SDK (JSON or Protobuf) | Automatic | None — SDK handles ping/pong |
| Raw bidirectional WS | Server pings | Reply with {} when you receive {} |
| SSE / HTTP Streaming | Server-managed | None |
| Unidirectional WS | Client pings | Send WebSocket ping frames every 25s |
Raw bidirectional WebSocket example:
ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (Object.keys(msg).length === 0) {
    ws.send("{}"); // pong: reply immediately
    return;
  }
  // ... handle other messages
};
Connection health detection: If no message (including pings) arrives within ~35 seconds, consider the connection dead and reconnect.
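The health rule above can be sketched as a small monitor that records when the last message arrived. `createHealthMonitor` is a hypothetical helper, not an SDK API; the 35-second default comes from the guidance above, and the clock is injectable so the logic can be tested without waiting.

```javascript
// Hypothetical helper: tracks the time of the last message and reports a dead connection.
function createHealthMonitor(timeoutMs = 35000, now = () => Date.now()) {
  let lastSeen = now();
  return {
    // Call on every incoming message, pings included.
    touch() {
      lastSeen = now();
    },
    // True once more than timeoutMs has passed without any message.
    isDead() {
      return now() - lastSeen > timeoutMs;
    },
  };
}
```

One possible wiring: call `monitor.touch()` at the top of `ws.onmessage`, and poll `monitor.isDead()` from a `setInterval` every few seconds, closing the socket and reconnecting when it returns true.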
Subscribe Examples
All examples subscribe to trades_perp:ETH_USDC-PERPETUAL.
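Each trade publication in the examples is a three-element array: `price` (string, always positive), `signedSize` (string, positive for a buy and negative for a sell), and `timestamp` (milliseconds). A sketch of turning that tuple into a labelled object follows; `parseTrade` and the output field names are hypothetical.

```javascript
// Hypothetical helper: converts a [price, signedSize, timestamp] tuple into an object.
function parseTrade(tuple) {
  const [price, signedSize, timestamp] = tuple;
  const raw = String(signedSize);
  const isSell = raw.startsWith("-");
  return {
    price, // kept as a string to avoid floating-point rounding
    size: isSell ? raw.slice(1) : raw, // unsigned size, still a string
    side: isSell ? "sell" : "buy",
    timestamp, // epoch milliseconds
  };
}
```

Prices and sizes stay as strings here on purpose; convert them with a decimal library if arithmetic is needed.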
SDK WebSocket — JSON (recommended)
Subscribe, fetch history, then stream live. Live trades are buffered during history fetch to preserve ordering.
import { Centrifuge } from "centrifuge";

const client = new Centrifuge("wss://staging.kyan.sh/stream/websocket");
const sub = client.newSubscription("trades_perp:ETH_USDC-PERPETUAL");
let ready = false, buf = [], maxOffset = 0;
sub.on("subscribed", async () => {
  const h = await sub.history({ limit: -1, reverse: false });
  const pubs = h.publications || [];
  for (const pub of pubs) {
    const [price, signedSize, timestamp] = pub.data;
    maxOffset = Math.max(maxOffset, pub.offset ?? 0);
  }
  // Flush live trades that arrived during history fetch, skip duplicates
  for (const pub of buf) {
    if (pub.offset > maxOffset) {
      const [price, signedSize, timestamp] = pub.data;
    }
  }
  buf = [];
  ready = true;
});
sub.on("publication", (ctx) => {
  // price: "2251.50" (string, always positive)
  // signedSize: "1.5" (positive = buy) or "-1.5" (negative = sell)
  // timestamp: 1704067200000 (ms)
  if (!ready) { buf.push(ctx); return; }
  const [price, signedSize, timestamp] = ctx.data;
});
sub.subscribe();
client.connect();
SDK WebSocket — Protobuf
Smallest frames and fastest serialization, ideal for HFT. This example streams live trades only, with no history fetch.
import { Centrifuge } from "centrifuge/build/protobuf";

const client = new Centrifuge("wss://staging.kyan.sh/stream/websocket?format=protobuf");
const sub = client.newSubscription("trades_perp:ETH_USDC-PERPETUAL");
sub.on("publication", (ctx) => {
  // Payloads may arrive as raw bytes; decode them to JSON before use
  const data = ctx.data instanceof Uint8Array
    ? JSON.parse(new TextDecoder().decode(ctx.data))
    : ctx.data;
  const [price, signedSize, timestamp] = data;
});
sub.subscribe();
client.connect();
Raw WebSocket — Bidirectional (with history)
No SDK needed. Connect, subscribe, fetch history, then stream live. Live trades are buffered during history fetch to preserve ordering.
const ws = new WebSocket("wss://staging.kyan.sh/stream/websocket");
let id = 0;
let historyLoaded = false;
let historyMaxOffset = 0;
const buf = [];
ws.onopen = () => ws.send(JSON.stringify({ id: ++id, connect: {} }));
ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  // Ping: reply immediately
  if (Object.keys(msg).length === 0) { ws.send("{}"); return; }
  // Connect reply: subscribe first
  if (msg.id === 1) {
    ws.send(JSON.stringify({ id: ++id, subscribe: { channel: "trades_perp:ETH_USDC-PERPETUAL" } }));
    return;
  }
  // Subscribe reply: fetch history
  if (msg.id === 2) {
    ws.send(JSON.stringify({ id: ++id, history: { channel: "trades_perp:ETH_USDC-PERPETUAL", limit: -1 } }));
    return;
  }
  // History reply: last 100 trades, then flush buffered live trades
  if (msg.id === 3) {
    // Guard against a reply that carries neither field (for example an error reply)
    const pubs = (msg.history ?? msg.result)?.publications ?? [];
    for (const pub of pubs) {
      const [price, signedSize, timestamp] = pub.data;
      historyMaxOffset = Math.max(historyMaxOffset, pub.offset ?? 0);
    }
    // Flush live trades that arrived during history fetch, skip duplicates
    for (const t of buf) {
      if (t.offset > historyMaxOffset) {
        const [price, signedSize, timestamp] = t.data;
      }
    }
    buf.length = 0;
    historyLoaded = true;
    return;
  }
  // Live trades: buffer until history is loaded
  if (msg.push?.pub) {
    if (!historyLoaded) { buf.push(msg.push.pub); return; }
    const [price, signedSize, timestamp] = msg.push.pub.data;
  }
};
SSE / EventSource (live only)
POST (non-browser):
const response = await fetch("https://staging.kyan.sh/stream/sse", {
  method: "POST",
  body: JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } }),
});
if (!response.body) throw new Error("response has no body stream");
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  // Keep the trailing partial line in the buffer until the rest arrives
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";
  for (const line of lines) {
    if (!line.startsWith("data: ")) continue;
    const msg = JSON.parse(line.slice(6));
    if (msg.push?.pub) {
      const [price, signedSize, timestamp] = msg.push.pub.data;
    }
  }
}
GET (browser EventSource):
const params = JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } });
const url = new URL("https://staging.kyan.sh/stream/sse");
url.searchParams.append("cf_connect", params);
const es = new EventSource(url);
es.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.push?.pub) {
    const [price, signedSize, timestamp] = msg.push.pub.data;
  }
};
HTTP Streaming (live only)
const response = await fetch("https://staging.kyan.sh/stream/http", {
  method: "POST",
  body: JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } }),
});
if (!response.body) throw new Error("response has no body stream");
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  // Keep the trailing partial line in the buffer until the rest arrives
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";
  for (const line of lines) {
    if (!line.trim()) continue;
    const msg = JSON.parse(line);
    if (msg.push?.pub) {
      const [price, signedSize, timestamp] = msg.push.pub.data;
    }
  }
}
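The SSE POST and HTTP streaming loops share the same buffering pattern: append the decoded chunk, split on newlines, and keep the trailing partial line for the next read. A sketch that factors this into pure functions, so it can be tested without a network connection, might look like the following. `splitLines` and `tradesFromLines` are hypothetical names, and `tradesFromLines` assumes the HTTP streaming framing shown above (one JSON object per line).

```javascript
// Hypothetical helper: appends a decoded chunk and returns complete lines plus the remainder.
function splitLines(buffer, chunk) {
  const parts = (buffer + chunk).split("\n");
  const rest = parts.pop() ?? ""; // last element is an incomplete line (or "")
  return { lines: parts, rest };
}

// Hypothetical helper: extracts trade tuples from HTTP streaming lines.
function tradesFromLines(lines) {
  const trades = [];
  for (const line of lines) {
    if (!line.trim()) continue; // skip blank lines
    const msg = JSON.parse(line);
    if (msg.push?.pub) trades.push(msg.push.pub.data);
  }
  return trades;
}
```

Inside the read loop, `const { lines, rest } = splitLines(buffer, decoder.decode(value, { stream: true }))` followed by `buffer = rest` replaces the inline split. For SSE, strip the `data: ` prefix from each line before parsing.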
