Transports & Connection
Supported Transports
| Transport | URL | Direction | History | Recommended for |
|---|---|---|---|---|
| SDK (JSON) | wss://{host}/stream/websocket | Bidirectional | Yes | Browser apps, trade tape |
| SDK (Protobuf) | wss://{host}/stream/websocket?format=protobuf | Bidirectional | Yes | HFT / algo trading |
| Raw WebSocket | wss://{host}/stream/websocket | Bidirectional | Yes | Scripts, custom clients |
| Raw WebSocket | wss://{host}/stream/ws | Unidirectional | No | Simple receive-only consumers |
| SSE / EventSource | https://{host}/stream/sse | Unidirectional | No | Browser fallback |
| HTTP Streaming | https://{host}/stream/http | Unidirectional | No | Last resort fallback |
Hosts: staging.kyan.sh (testnet), alpha.kyan.sh (trading competition), api.kyan.sh (mainnet)
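The table and host list above can be combined into a small lookup. The following is a minimal sketch; the `streamUrl` helper and the environment and transport key names are hypothetical labels chosen for illustration, while the hosts and paths are copied from the table.

```typescript
// Hosts copied from the host list above; the key names are illustrative.
const HOSTS = {
  testnet: "staging.kyan.sh",
  competition: "alpha.kyan.sh",
  mainnet: "api.kyan.sh",
} as const;

// Schemes and paths copied from the transport table; the key names are illustrative.
const TRANSPORT_PATHS = {
  "ws-json": { scheme: "wss", path: "/stream/websocket" },
  "ws-protobuf": { scheme: "wss", path: "/stream/websocket?format=protobuf" },
  "ws-uni": { scheme: "wss", path: "/stream/ws" },
  "sse": { scheme: "https", path: "/stream/sse" },
  "http-stream": { scheme: "https", path: "/stream/http" },
} as const;

type Environment = keyof typeof HOSTS;
type Transport = keyof typeof TRANSPORT_PATHS;

// Hypothetical helper: builds the endpoint URL for an environment/transport pair.
function streamUrl(env: Environment, transport: Transport): string {
  const { scheme, path } = TRANSPORT_PATHS[transport];
  return `${scheme}://${HOSTS[env]}${path}`;
}
```

For example, `streamUrl("testnet", "ws-json")` yields the URL used by the subscribe examples later on this page.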
Recovery
Initial Data on Subscribe
Subscribe with since: {} to receive the last known value as the first publication event, ahead of any live updates, so there is no need to wait for the next change.
const sub = client.newSubscription("l2:perps:BTC_USDC-PERPETUAL", { since: {} });
sub.on("publication", (ctx) => {
  // First publication = last known snapshot (delivered on subscribe)
  // Subsequent publications = live updates
  const { timestamp, bids, asks } = ctx.data;
});
sub.subscribe();
On reconnect, the SDK automatically recovers missed data. No client code needed.
| Channel | First subscribe with since: {} | On reconnect |
|---|---|---|
| l2:perps:{instrument} | Last known snapshot | Automatic |
| l2:perps:grouped:{bucket}:{instrument} | Last known snapshot | Automatic |
| l2:bbo:{instrument} | Last known BBO | Automatic |
| l2:options:{pair}:{maturity} | Last known update | Automatic |
| l2:bbo:options:{pair}:{maturity} | Last known BBO | Automatic |
| orders:{address} | Snapshot of active orders | No recovery — use REST gap fill API |
| market:funding:{period}:{instrument} | Last known rate | Automatic |
| market:interest_rate:{pair} | Last known rate | Automatic |
| market:svi:{pair} | Last known SVI fit | Automatic |
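A client that mixes these channels may want to know, after a reconnect, which subscriptions need a manual gap fill. The sketch below classifies a channel name by prefix using only the rows of the table above. The `recoveryMode` function and its return labels are hypothetical, and the REST gap fill call itself is left out because its endpoint is not described here.

```typescript
type RecoveryMode = "automatic" | "manual-gap-fill" | "unlisted";

// Prefixes of the channels the table marks as recovered automatically.
const AUTOMATIC_PREFIXES: readonly string[] = [
  "l2:perps:", // also covers l2:perps:grouped:{bucket}:{instrument}
  "l2:bbo:", // also covers l2:bbo:options:{pair}:{maturity}
  "l2:options:",
  "market:funding:",
  "market:interest_rate:",
  "market:svi:",
];

// Hypothetical helper: tells the caller what to do for a channel after reconnect.
function recoveryMode(channel: string): RecoveryMode {
  if (channel.startsWith("orders:")) return "manual-gap-fill";
  if (AUTOMATIC_PREFIXES.some((prefix) => channel.startsWith(prefix))) return "automatic";
  return "unlisted"; // not in the table, so make no assumption
}
```

Returning "unlisted" rather than defaulting to "automatic" keeps the helper from claiming recovery behavior for channels the table does not cover.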
Trade History
Trade channels (trades_perp, trades_option) support fetching up to 100 recent trades via the history API, available on bidirectional transports only.
| Transport | How to get history | Live trades |
|---|---|---|
| Bidirectional (SDK or raw) | sub.history() or history protocol command | Yes |
| Unidirectional (SSE, HTTP, uni WS) | Not available | Live only |
On reconnect, the SDK automatically recovers missed trades using the last known stream position.
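Raw clients that fetch history and receive live trades at the same time can apply the same stream-position idea to drop duplicates. The following is a minimal sketch, assuming each publication carries a numeric `offset` and that history is returned in ascending order. The `Publication` shape and the `mergeHistoryAndLive` name are illustrative and not part of any SDK.

```typescript
// Illustrative shape: only the fields this sketch relies on.
interface Publication<T> {
  offset?: number;
  data: T;
}

// Returns the history followed by the buffered live publications that are newer
// than anything in the history, so a trade seen on both paths appears once.
function mergeHistoryAndLive<T>(
  history: Publication<T>[],
  buffered: Publication<T>[],
): Publication<T>[] {
  let maxOffset = 0;
  for (const pub of history) {
    maxOffset = Math.max(maxOffset, pub.offset ?? 0);
  }
  const fresh = buffered.filter((pub) => (pub.offset ?? 0) > maxOffset);
  return [...history, ...fresh];
}
```

Publications without an offset are treated as offset 0 and are therefore dropped from the buffered set once any history exists. That is a conservative choice made for this sketch.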
Connection Keep-Alive
The server sends a ping (empty {} frame) every 25 seconds. On bidirectional transports, clients must reply with {} within 8 seconds or the connection is closed.
| Transport | Keep-alive | Action required |
|---|---|---|
| SDK (JSON or Protobuf) | Automatic | None — SDK handles ping/pong and auto-reconnect |
| Raw bidirectional WS | Server sends {} | Reply with {} within 8 seconds |
| Unidirectional WS | Server sends {} | None — ignore or use for health detection |
| SSE | Server sends data: {} | None — ignore or use for health detection |
| HTTP Streaming | Server sends null | None — ignore or use for health detection |
Raw bidirectional WebSocket example:
ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (Object.keys(msg).length === 0) {
    ws.send("{}"); // pong: reply within 8 seconds
    return;
  }
  // ... handle other messages
};
Connection health detection: If no message (including pings) arrives within ~35 seconds, consider the connection dead and reconnect.
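The health check can be kept separate from the socket code. Below is a minimal sketch of a watchdog that records when the last message arrived and reports whether the ~35 second window has elapsed. The `ConnectionWatchdog` class is hypothetical, and timestamps are passed in explicitly so the logic can be exercised without timers.

```typescript
// Hypothetical helper: tracks the time of the last received message (pings included).
class ConnectionWatchdog {
  private lastSeenMs: number;
  private readonly timeoutMs: number;

  constructor(timeoutMs: number = 35000, nowMs: number = Date.now()) {
    this.timeoutMs = timeoutMs;
    this.lastSeenMs = nowMs;
  }

  // Call on every incoming message, including empty ping frames.
  touch(nowMs: number = Date.now()): void {
    this.lastSeenMs = nowMs;
  }

  // True once more than timeoutMs has passed since the last message.
  isDead(nowMs: number = Date.now()): boolean {
    return nowMs - this.lastSeenMs > this.timeoutMs;
  }
}
```

A typical use would be to call `touch()` at the top of `ws.onmessage` and poll `isDead()` from a `setInterval` callback, closing the socket and reconnecting when it returns true.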
Connection Limits
| Limit | Value | Details |
|---|---|---|
| New connections per IP | 20/second (burst 40) | Applies to connection attempts, not messages on established connections. Exceeding returns HTTP 429. |
| Subscriptions per client | 512 | Maximum channels a single connection can subscribe to. |
Connections can live indefinitely as long as ping/pong is healthy — there is no forced duration limit.
The rate limit protects against reconnect storms. If your client opens many connections rapidly (e.g. subscribing to 50+ channels on separate connections), stagger the connection attempts or multiplex channels on fewer connections.
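Both mitigations can be planned before any socket is opened. The sketch below groups channels so that no connection exceeds 512 subscriptions, then spaces the connection attempts so they stay at or below 20 per second. The function names are hypothetical; the two limits come from the table above.

```typescript
const MAX_SUBS_PER_CONNECTION = 512;
const MAX_NEW_CONNECTIONS_PER_SECOND = 20;

// Splits a channel list into groups, one group per connection.
function planConnections(
  channels: string[],
  maxPerConnection: number = MAX_SUBS_PER_CONNECTION,
): string[][] {
  const groups: string[][] = [];
  for (let i = 0; i < channels.length; i += maxPerConnection) {
    groups.push(channels.slice(i, i + maxPerConnection));
  }
  return groups;
}

// Returns the delay (ms from now) at which each connection attempt should start.
function staggerDelaysMs(
  connectionCount: number,
  perSecond: number = MAX_NEW_CONNECTIONS_PER_SECOND,
): number[] {
  const spacingMs = 1000 / perSecond;
  return Array.from({ length: connectionCount }, (_, i) => Math.round(i * spacingMs));
}
```

With the defaults, 1,000 channels fit on two connections, and three connection attempts would start at 0, 50, and 100 ms. Passing a lower `perSecond` leaves headroom for other clients behind the same IP.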
Server Time & RTT
On connect, the server includes its current timestamp in the connect reply (time field, Unix milliseconds). Use this to detect clock skew between the client and server.
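One common way to turn that timestamp into a skew estimate is to compare it with the midpoint of the local send and receive times, which assumes the network delay is roughly symmetric. The sketch below shows only the arithmetic. How the `time` field is read from the connect reply is not covered here, so `estimateClockSkewMs` (a hypothetical name) takes plain numbers.

```typescript
// Estimates server clock minus client clock, in milliseconds.
// serverTimeMs: the `time` field from the connect reply (Unix ms)
// sentAtMs / receivedAtMs: local Unix ms when the connect was sent and the reply arrived
function estimateClockSkewMs(
  serverTimeMs: number,
  sentAtMs: number,
  receivedAtMs: number,
): number {
  // Assume the server stamped the reply halfway through the round trip.
  const localMidpointMs = (sentAtMs + receivedAtMs) / 2;
  return serverTimeMs - localMidpointMs;
}
```

A positive result means the server clock is ahead of the client clock. The estimate is only as good as the symmetric-delay assumption, so it is best treated as approximate.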
To measure round-trip time, use the built-in ping RPC:
const startTime = performance.now();
client.rpc("ping", {}).then(() => {
  const rtt = (performance.now() - startTime).toFixed(2);
  console.log(`RTT: ${rtt}ms`);
});
Auto-Reconnect
The SDK automatically reconnects with exponential backoff when the connection drops. Default delays: 500ms initial, 20s maximum. On reconnect, channel subscriptions are restored and the last known data is recovered where supported (see recovery table above).
For raw WebSocket clients, implement your own reconnect logic with backoff. Use the ping timeout (~35s with no messages) as the trigger to detect a dead connection.
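For a raw client, the delay schedule could look like the sketch below. It reuses the SDK's stated bounds (500 ms initial, 20 s maximum) and adds random jitter, which is a design choice of this sketch and not a description of the SDK's internal algorithm. `backoffDelayMs` is a hypothetical name, and the random source is injectable so the function can be tested deterministically.

```typescript
// Returns the delay before reconnect attempt number `attempt` (0-based).
// The ceiling doubles with each attempt and is capped at maxMs; the result is
// drawn between minMs and that ceiling to avoid synchronized reconnects.
function backoffDelayMs(
  attempt: number,
  minMs: number = 500,
  maxMs: number = 20000,
  random: () => number = Math.random,
): number {
  const ceilingMs = Math.min(maxMs, minMs * 2 ** attempt);
  return Math.round(minMs + random() * (ceilingMs - minMs));
}
```

A reconnect loop would call `backoffDelayMs(attempt)` after each failed attempt, wait that long, and reset `attempt` to 0 once a connection succeeds. Spreading the delays also helps a fleet of clients stay under the per-IP connection rate limit.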
Subscribe Examples
All examples subscribe to trades_perp:ETH_USDC-PERPETUAL.
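Each example destructures the publication payload as a `[price, signedSize, timestamp]` tuple, where the sign of `signedSize` encodes the side. A minimal sketch of turning that tuple into a named record follows. The `Trade` shape and `decodeTrade` name are hypothetical, and values are kept as strings to avoid floating-point rounding of prices and sizes.

```typescript
// Payload shape used by the trade examples: price and size are strings, timestamp is Unix ms.
type TradeTuple = [price: string, signedSize: string, timestamp: number];

// Illustrative record shape; not defined by the API.
interface Trade {
  price: string;
  size: string; // unsigned size
  side: "buy" | "sell";
  timestamp: number;
}

// Positive signedSize = buy, negative signedSize = sell.
function decodeTrade([price, signedSize, timestamp]: TradeTuple): Trade {
  const isSell = signedSize.startsWith("-");
  return {
    price,
    size: isSell ? signedSize.slice(1) : signedSize,
    side: isSell ? "sell" : "buy",
    timestamp,
  };
}
```

In any of the handlers, `decodeTrade(ctx.data)` or `decodeTrade(msg.push.pub.data)` could replace the inline destructuring.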
SDK WebSocket — JSON (recommended)
Subscribe, fetch history, then stream live. Live trades are buffered during history fetch to preserve ordering.
import { Centrifuge } from "centrifuge";
const client = new Centrifuge("wss://staging.kyan.sh/stream/websocket");
const sub = client.newSubscription("trades_perp:ETH_USDC-PERPETUAL");
let ready = false, buf = [], maxOffset = 0;
sub.on("subscribed", async () => {
  const h = await sub.history({ limit: -1, reverse: false });
  const pubs = h.publications || [];
  for (const pub of pubs) {
    const [price, signedSize, timestamp] = pub.data;
    maxOffset = Math.max(maxOffset, pub.offset ?? 0);
  }
  // Flush live trades that arrived during history fetch, skip duplicates
  for (const pub of buf) {
    if (pub.offset > maxOffset) {
      const [price, signedSize, timestamp] = pub.data;
    }
  }
  buf = [];
  ready = true;
});
sub.on("publication", (ctx) => {
  // price: "2251.50" (string, always positive)
  // signedSize: "1.5" (positive = buy) or "-1.5" (negative = sell)
  // timestamp: 1704067200000 (ms)
  if (!ready) { buf.push(ctx); return; }
  const [price, signedSize, timestamp] = ctx.data;
});
sub.subscribe();
client.connect();
SDK WebSocket — Protobuf
Smallest frames, fastest serialization — ideal for HFT. Live only, no history fetch.
import { Centrifuge } from "centrifuge/build/protobuf";
const client = new Centrifuge("wss://staging.kyan.sh/stream/websocket?format=protobuf");
const sub = client.newSubscription("trades_perp:ETH_USDC-PERPETUAL");
sub.on("publication", (ctx) => {
  // Payloads may arrive as raw bytes over Protobuf; decode them to JSON first
  const data = ctx.data instanceof Uint8Array
    ? JSON.parse(new TextDecoder().decode(ctx.data))
    : ctx.data;
  const [price, signedSize, timestamp] = data;
});
sub.subscribe();
client.connect();
Raw WebSocket — Bidirectional (with history)
No SDK needed. Connect, subscribe, fetch history, then stream live. Live trades are buffered during history fetch to preserve ordering.
const ws = new WebSocket("wss://staging.kyan.sh/stream/websocket");
let id = 0;
let historyLoaded = false;
let historyMaxOffset = 0;
const buf = [];
ws.onopen = () => ws.send(JSON.stringify({ id: ++id, connect: {} }));
ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  // Ping: reply immediately
  if (Object.keys(msg).length === 0) { ws.send("{}"); return; }
  // Connect reply: subscribe first
  if (msg.id === 1) {
    ws.send(JSON.stringify({ id: ++id, subscribe: { channel: "trades_perp:ETH_USDC-PERPETUAL" } }));
    return;
  }
  // Subscribe reply: fetch history
  if (msg.id === 2) {
    ws.send(JSON.stringify({ id: ++id, history: { channel: "trades_perp:ETH_USDC-PERPETUAL", limit: -1 } }));
    return;
  }
  // History reply: last 100 trades, then flush buffered live trades
  if (msg.id === 3) {
    // Guard against a reply that carries neither field (e.g. an error reply)
    const pubs = (msg.history ?? msg.result)?.publications ?? [];
    for (const pub of pubs) {
      const [price, signedSize, timestamp] = pub.data;
      historyMaxOffset = Math.max(historyMaxOffset, pub.offset ?? 0);
    }
    // Flush live trades that arrived during history fetch, skip duplicates
    for (const t of buf) {
      if (t.offset > historyMaxOffset) {
        const [price, signedSize, timestamp] = t.data;
      }
    }
    buf.length = 0;
    historyLoaded = true;
    return;
  }
  // Live trades: buffer until history is loaded
  if (msg.push?.pub) {
    if (!historyLoaded) { buf.push(msg.push.pub); return; }
    const [price, signedSize, timestamp] = msg.push.pub.data;
  }
};
SSE / EventSource (live only)
POST (non-browser):
const response = await fetch("https://staging.kyan.sh/stream/sse", {
  method: "POST",
  body: JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } }),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  // Keep the trailing partial line in the buffer until the rest arrives
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";
  for (const line of lines) {
    if (!line.startsWith("data: ")) continue;
    const msg = JSON.parse(line.slice(6));
    if (msg.push?.pub) {
      const [price, signedSize, timestamp] = msg.push.pub.data;
    }
  }
}
GET (browser EventSource):
const params = JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } });
const url = new URL("https://staging.kyan.sh/stream/sse");
url.searchParams.append("cf_connect", params);
const es = new EventSource(url);
es.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.push?.pub) {
    const [price, signedSize, timestamp] = msg.push.pub.data;
  }
};
HTTP Streaming (live only)
const response = await fetch("https://staging.kyan.sh/stream/http", {
  method: "POST",
  body: JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } }),
});
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  // Keep the trailing partial line in the buffer until the rest arrives
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";
  for (const line of lines) {
    if (!line.trim()) continue;
    const msg = JSON.parse(line);
    if (msg.push?.pub) {
      const [price, signedSize, timestamp] = msg.push.pub.data;
    }
  }
}
