Transports & Connection

Supported Transports

| Transport | URL | Direction | History | Recommended for |
| --- | --- | --- | --- | --- |
| SDK (JSON) | wss://{host}/stream/websocket | Bidirectional | Yes | Browser apps, trade tape |
| SDK (Protobuf) | wss://{host}/stream/websocket?format=protobuf | Bidirectional | Yes | HFT / algo trading |
| Raw WebSocket | wss://{host}/stream/websocket | Bidirectional | Yes | Scripts, custom clients |
| Raw WebSocket | wss://{host}/stream/ws | Unidirectional | No | Simple receive-only consumers |
| SSE / EventSource | https://{host}/stream/sse | Unidirectional | No | Browser fallback |
| HTTP Streaming | https://{host}/stream/http | Unidirectional | No | Last resort fallback |

Hosts: staging.kyan.sh (testnet), alpha.kyan.sh (trading competition), api.kyan.sh (mainnet)
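
The transport table and the host list combine into a full endpoint URL. A minimal sketch of that lookup follows; `streamUrl`, `STREAM_PATHS`, and the transport keys are illustrative names chosen here, not part of any SDK.

```javascript
// Hypothetical helper: maps a transport key to the scheme and path listed
// in the Supported Transports table.
const STREAM_PATHS = {
  "sdk-json": { scheme: "wss", path: "/stream/websocket" },
  "sdk-protobuf": { scheme: "wss", path: "/stream/websocket?format=protobuf" },
  "raw-bidirectional": { scheme: "wss", path: "/stream/websocket" },
  "raw-unidirectional": { scheme: "wss", path: "/stream/ws" },
  "sse": { scheme: "https", path: "/stream/sse" },
  "http-stream": { scheme: "https", path: "/stream/http" },
};

function streamUrl(host, transport) {
  const entry = STREAM_PATHS[transport];
  if (!entry) throw new Error(`Unknown transport: ${transport}`);
  return `${entry.scheme}://${host}${entry.path}`;
}

console.log(streamUrl("staging.kyan.sh", "sdk-protobuf"));
// wss://staging.kyan.sh/stream/websocket?format=protobuf
```

Keeping the host separate from the path makes it easy to move the same client between testnet and mainnet.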

Recovery

L2 Orderbook — Cache Recovery

L2 channels (l2:perps, l2:perps:grouped) support two recovery features:

1. Cache recovery (first subscribe) — subscribe with since: {} to receive the last cached snapshot as the first publication event, before any live updates arrive. No waiting for the next book change.

2. Automatic reconnect recovery — on network disconnect, the SDK automatically recovers the latest snapshot using force_recovery: true on the server. No client code needed — the SDK tracks the stream position internally and the server pushes any missed data on reconnect.

// SDK: subscribe with since: {} for instant initial data
import { Centrifuge } from "centrifuge";

const client = new Centrifuge("wss://staging.kyan.sh/stream/websocket");
const sub = client.newSubscription("l2:perps:100:BTC_USDC-PERPETUAL", { since: {} });
sub.on("publication", (ctx) => {
  // First publication = cached snapshot (delivered immediately on subscribe)
  // Subsequent publications = live updates
  const { timestamp, bids, asks } = ctx.data;
});
sub.subscribe();
client.connect();

BBO channels (l2:bbo) do not use cache recovery — updates are frequent enough that clients receive data within milliseconds. Reconnect recovery is still automatic.

| Channel | First subscribe | On reconnect |
| --- | --- | --- |
| l2:perps:{freq}:{instrument} | Cached snapshot via since: {} | Automatic (server-pushed) |
| l2:perps:grouped:{bucket}:{instrument} | Cached snapshot via since: {} | Automatic (server-pushed) |
| l2:bbo:{instrument} | Next BBO change (sub-ms) | Automatic (server-pushed) |
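
The per-channel rules above can be folded into one small function that picks subscription options from the channel name. This is only a sketch: `subscriptionOptionsFor` is a hypothetical helper, and it assumes the channel prefixes are exactly those shown in the table.

```javascript
// Hypothetical helper: chooses subscription options by channel prefix,
// following the recovery table above.
function subscriptionOptionsFor(channel) {
  // l2:perps:{freq}:... and l2:perps:grouped:... use cache recovery.
  if (channel.startsWith("l2:perps:")) return { since: {} };
  // l2:bbo:... does not use cache recovery; no extra option is assumed
  // for any other channel either.
  return {};
}

console.log(subscriptionOptionsFor("l2:perps:100:BTC_USDC-PERPETUAL"));
console.log(subscriptionOptionsFor("l2:bbo:BTC_USDC-PERPETUAL"));
```

With the SDK, the result would be passed as the second argument: `client.newSubscription(channel, subscriptionOptionsFor(channel))`.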

Trade History

Trade channels (trades_perp, trades_option) expose up to 100 recent trades through the history API. History is available on bidirectional transports only.

| Transport | How to get history | Live trades |
| --- | --- | --- |
| Bidirectional (SDK or raw) | sub.history() or history protocol command | Yes |
| Unidirectional (SSE, HTTP, uni WS) | Not available | Live only |

On reconnect, the SDK automatically recovers missed trades using the last known stream position.

Connection Keep-Alive

The server sends a ping (empty {} frame) every 25 seconds on bidirectional WebSocket connections. Clients must reply with {} within 8 seconds or the connection is closed.

| Transport | Keep-alive | Action required |
| --- | --- | --- |
| SDK (JSON or Protobuf) | Automatic | None (SDK handles ping/pong) |
| Raw bidirectional WS | Server pings | Reply with {} when you receive {} |
| SSE / HTTP Streaming | Server-managed | None |
| Unidirectional WS | Client pings | Send WebSocket ping frames every 25s |

Raw bidirectional WebSocket example:

ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (Object.keys(msg).length === 0) {
    ws.send("{}"); // pong — reply immediately
    return;
  }
  // ... handle other messages
};

Connection health detection: If no message (including pings) arrives within ~35 seconds, consider the connection dead and reconnect.
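
One way to implement this check is a small watchdog that records when the last message arrived. The sketch below is illustrative: `createWatchdog` and `STALE_AFTER_MS` are hypothetical names, and the injectable clock exists only to make the logic easy to test.

```javascript
// Hypothetical watchdog: remembers when the last inbound message arrived and
// reports when the ~35 second silence threshold has been exceeded.
const STALE_AFTER_MS = 35000;

function createWatchdog(now = Date.now) {
  let lastSeen = now();
  return {
    // Call on every inbound message, pings included.
    touch() { lastSeen = now(); },
    // True once more than STALE_AFTER_MS has passed without a message.
    isStale() { return now() - lastSeen > STALE_AFTER_MS; },
  };
}

// Usage sketch (ws is an open WebSocket; reconnect() is your own function):
//   const dog = createWatchdog();
//   ws.addEventListener("message", () => dog.touch());
//   setInterval(() => { if (dog.isStale()) { ws.close(); reconnect(); } }, 5000);
```

Polling every few seconds keeps the check cheap while still detecting a dead connection shortly after the threshold passes.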

Subscribe Examples

All examples subscribe to trades_perp:ETH_USDC-PERPETUAL.

SDK WebSocket — JSON (recommended)

Subscribe, fetch history, then stream live. Live trades are buffered during history fetch to preserve ordering.

import { Centrifuge } from "centrifuge";

const client = new Centrifuge("wss://staging.kyan.sh/stream/websocket");
const sub = client.newSubscription("trades_perp:ETH_USDC-PERPETUAL");
let ready = false, buf = [], maxOffset = 0;

sub.on("subscribed", async () => {
  const h = await sub.history({ limit: -1, reverse: false });
  const pubs = h.publications || [];
  for (const pub of pubs) {
    const [price, signedSize, timestamp] = pub.data;
    maxOffset = Math.max(maxOffset, pub.offset ?? 0);
  }
  // Flush live trades that arrived during history fetch, skip duplicates
  for (const pub of buf) {
    if (pub.offset > maxOffset) {
      const [price, signedSize, timestamp] = pub.data;
    }
  }
  buf = [];
  ready = true;
});

sub.on("publication", (ctx) => {
  // price: "2251.50" (string, always positive)
  // signedSize: "1.5" (positive = buy) or "-1.5" (negative = sell)
  // timestamp: 1704067200000 (ms)
  if (!ready) { buf.push(ctx); return; }
  const [price, signedSize, timestamp] = ctx.data;
});

sub.subscribe();
client.connect();
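
The buffer-and-flush step in the example above can also be written as a pure function, which makes the de-duplication rule easier to see and to test. This is a sketch under two assumptions: `mergeHistoryAndBuffer` is a hypothetical helper, and history arrives in ascending offset order (as requested with `reverse: false`).

```javascript
// Hypothetical helper: returns history followed by those buffered live
// publications whose offset is newer than anything already in history.
function mergeHistoryAndBuffer(historyPubs, bufferedPubs) {
  let maxOffset = 0;
  for (const pub of historyPubs) {
    maxOffset = Math.max(maxOffset, pub.offset ?? 0);
  }
  // Anything at or below maxOffset was already delivered by history.
  const fresh = bufferedPubs.filter((pub) => (pub.offset ?? 0) > maxOffset);
  return [...historyPubs, ...fresh];
}

const historyPubs = [
  { offset: 1, data: ["2251.50", "1.5", 1704067200000] },
  { offset: 2, data: ["2251.75", "-0.5", 1704067200100] },
];
const bufferedPubs = [
  { offset: 2, data: ["2251.75", "-0.5", 1704067200100] }, // duplicate of history
  { offset: 3, data: ["2252.00", "2.0", 1704067200200] },
];
console.log(mergeHistoryAndBuffer(historyPubs, bufferedPubs).map((p) => p.offset));
// [ 1, 2, 3 ]
```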

SDK WebSocket — Protobuf

Smallest frames and fastest serialization, ideal for HFT. This example streams live trades only and does not fetch history.

import { Centrifuge } from "centrifuge/build/protobuf";

const client = new Centrifuge("wss://staging.kyan.sh/stream/websocket?format=protobuf");
const sub = client.newSubscription("trades_perp:ETH_USDC-PERPETUAL");

sub.on("publication", (ctx) => {
  const data = ctx.data instanceof Uint8Array
    ? JSON.parse(new TextDecoder().decode(ctx.data))
    : ctx.data;
  const [price, signedSize, timestamp] = data;
});

sub.subscribe();
client.connect();
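
The payload decoding above pairs naturally with a step that turns the `[price, signedSize, timestamp]` tuple into a named object. A minimal sketch, assuming the field semantics described in the JSON example (price is a positive string, the sign of `signedSize` encodes the side); `decodeTrade` and its output field names are hypothetical.

```javascript
// Hypothetical helper: normalizes a trade payload into a plain object.
// Accepts either the decoded tuple or a Uint8Array of JSON bytes.
function decodeTrade(data) {
  const tuple = data instanceof Uint8Array
    ? JSON.parse(new TextDecoder().decode(data))
    : data;
  const [price, signedSize, timestamp] = tuple;
  return {
    price,                                             // string, always positive
    size: signedSize.replace(/^-/, ""),                // unsigned size, kept as a string
    side: signedSize.startsWith("-") ? "sell" : "buy", // negative = sell, positive = buy
    timestamp,                                         // milliseconds since epoch
  };
}

console.log(decodeTrade(["2251.50", "-1.5", 1704067200000]));
```

Price and size stay as strings so that no precision is lost before the values reach a decimal library of your choice.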

Raw WebSocket — Bidirectional (with history)

No SDK needed. Connect, subscribe, fetch history, then stream live. Live trades are buffered during history fetch to preserve ordering.

const ws = new WebSocket("wss://staging.kyan.sh/stream/websocket");
let id = 0;
let historyLoaded = false;
let historyMaxOffset = 0;
const buf = [];

ws.onopen = () => ws.send(JSON.stringify({ id: ++id, connect: {} }));

ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);

  // Ping — reply immediately
  if (Object.keys(msg).length === 0) { ws.send("{}"); return; }

  // Connect reply — subscribe first
  if (msg.id === 1) {
    ws.send(JSON.stringify({ id: ++id, subscribe: { channel: "trades_perp:ETH_USDC-PERPETUAL" } }));
    return;
  }

  // Subscribe reply — fetch history
  if (msg.id === 2) {
    ws.send(JSON.stringify({ id: ++id, history: { channel: "trades_perp:ETH_USDC-PERPETUAL", limit: -1 } }));
    return;
  }

  // History reply — last 100 trades, then flush buffered live trades
  if (msg.id === 3) {
    // Guard against replies that carry neither field (for example an error reply)
    const pubs = (msg.history ?? msg.result)?.publications ?? [];
    for (const pub of pubs) {
      const [price, signedSize, timestamp] = pub.data;
      historyMaxOffset = Math.max(historyMaxOffset, pub.offset ?? 0);
    }
    // Flush live trades that arrived during history fetch, skip duplicates
    for (const t of buf) {
      if (t.offset > historyMaxOffset) {
        const [price, signedSize, timestamp] = t.data;
      }
    }
    buf.length = 0;
    historyLoaded = true;
    return;
  }

  // Live trades — buffer until history is loaded
  if (msg.push?.pub) {
    if (!historyLoaded) { buf.push(msg.push.pub); return; }
    const [price, signedSize, timestamp] = msg.push.pub.data;
  }
};
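
The example above matches replies against the literal ids 1, 2, and 3. For clients that send more commands, one possible alternative is to pair each outgoing id with a callback. The sketch below shows that idea only; `createCommandRouter` is a hypothetical helper, not part of the protocol or any SDK.

```javascript
// Hypothetical helper: pairs each outgoing command id with a reply handler,
// so replies are matched by id instead of by hard-coded numbers.
function createCommandRouter(send) {
  let nextId = 0;
  const pending = new Map();
  return {
    // Sends { id, ...body } as JSON and remembers who wants the reply.
    request(body, onReply) {
      const id = ++nextId;
      pending.set(id, onReply);
      send(JSON.stringify({ id, ...body }));
      return id;
    },
    // Returns true if msg was the reply to a pending command.
    // Pings ({}) and pushes carry no id, so they return false.
    handle(msg) {
      const onReply = pending.get(msg.id);
      if (!onReply) return false;
      pending.delete(msg.id);
      onReply(msg);
      return true;
    },
  };
}

// Usage sketch with the WebSocket from the example above:
//   const router = createCommandRouter((frame) => ws.send(frame));
//   ws.onopen = () => router.request({ connect: {} }, () => { /* subscribe next */ });
//   ws.onmessage = (e) => { const msg = JSON.parse(e.data); if (router.handle(msg)) return; /* ... */ };
```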

SSE / EventSource (live only)

POST (non-browser):

const response = await fetch("https://staging.kyan.sh/stream/sse", {
  method: "POST",
  body: JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } }),
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";

  for (const line of lines) {
    if (!line.startsWith("data: ")) continue;
    const msg = JSON.parse(line.slice(6));
    if (msg.push?.pub) {
      const [price, signedSize, timestamp] = msg.push.pub.data;
    }
  }
}

GET (browser EventSource):

const params = JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } });
const url = new URL("https://staging.kyan.sh/stream/sse");
url.searchParams.append("cf_connect", params);

const es = new EventSource(url);
es.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.push?.pub) {
    const [price, signedSize, timestamp] = msg.push.pub.data;
  }
};
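
The same `cf_connect` parameter can be built for several channels at once. The sketch below assumes the `subs` map accepts more than one channel, which its shape suggests but the text above does not state; `buildSseUrl` is a hypothetical helper.

```javascript
// Hypothetical helper: builds the EventSource URL for a list of channels
// using the cf_connect query parameter shown above.
function buildSseUrl(host, channels) {
  const subs = {};
  for (const channel of channels) subs[channel] = {};
  const url = new URL(`https://${host}/stream/sse`);
  // searchParams.set percent-encodes the JSON for us.
  url.searchParams.set("cf_connect", JSON.stringify({ subs }));
  return url.toString();
}

console.log(buildSseUrl("staging.kyan.sh", [
  "trades_perp:ETH_USDC-PERPETUAL",
  "trades_perp:BTC_USDC-PERPETUAL",
]));
```

In a browser the result would be passed straight to `new EventSource(...)`.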

HTTP Streaming (live only)

const response = await fetch("https://staging.kyan.sh/stream/http", {
  method: "POST",
  body: JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } }),
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";

  for (const line of lines) {
    if (!line.trim()) continue;
    const msg = JSON.parse(line);
    if (msg.push?.pub) {
      const [price, signedSize, timestamp] = msg.push.pub.data;
    }
  }
}
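
The SSE POST loop and the HTTP streaming loop share the same chunk-reassembly step: append the chunk, split on newlines, and hold back the trailing partial line. A sketch of that step as a reusable function follows; `createLineParser` is a hypothetical helper, and the `prefix` argument is `"data: "` for SSE and empty for HTTP streaming.

```javascript
// Hypothetical helper: reassembles text chunks into complete lines and
// parses each matching line as JSON.
function createLineParser(onMessage, prefix = "") {
  let buffer = "";
  return function feed(chunk) {
    buffer += chunk;
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? ""; // keep the trailing partial line for the next chunk
    for (const line of lines) {
      // Skip blank lines and lines without the expected prefix (e.g. SSE comments).
      if (!line.trim() || !line.startsWith(prefix)) continue;
      onMessage(JSON.parse(line.slice(prefix.length)));
    }
  };
}

// A message split across two chunks is delivered once, after it is complete.
const seen = [];
const feed = createLineParser((msg) => seen.push(msg));
feed('{"push":{"pub":{"da');
feed('ta":["2251.50","1.5",1704067200000]}}}\n');
console.log(seen.length); // 1
```

Inside the read loops above, each decoded chunk would be passed to `feed` in place of the inline split logic.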