Transports & Connection

Supported Transports

| Transport | URL | Direction | History | Recommended for |
| --- | --- | --- | --- | --- |
| SDK (JSON) | wss://{host}/stream/websocket | Bidirectional | Yes | Browser apps, trade tape |
| SDK (Protobuf) | wss://{host}/stream/websocket?format=protobuf | Bidirectional | Yes | HFT / algo trading |
| Raw WebSocket | wss://{host}/stream/websocket | Bidirectional | Yes | Scripts, custom clients |
| Raw WebSocket | wss://{host}/stream/ws | Unidirectional | No | Simple receive-only consumers |
| SSE / EventSource | https://{host}/stream/sse | Unidirectional | No | Browser fallback |
| HTTP Streaming | https://{host}/stream/http | Unidirectional | No | Last resort fallback |

Hosts: staging.kyan.sh (testnet), alpha.kyan.sh (trading competition), api.kyan.sh (mainnet)
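
The host and transport tables above can be combined in a small lookup. The sketch below is only an illustration: the `Transport` names and the `streamUrl` helper are hypothetical and not part of any SDK; the schemes and paths are copied from the transport table.

```typescript
// Hypothetical transport names; the paths and schemes come from the transport table.
type Transport = "sdk-json" | "sdk-protobuf" | "raw-ws" | "uni-ws" | "sse" | "http-stream";

const ENDPOINTS: Record<Transport, { scheme: string; path: string }> = {
  "sdk-json": { scheme: "wss", path: "/stream/websocket" },
  "sdk-protobuf": { scheme: "wss", path: "/stream/websocket?format=protobuf" },
  "raw-ws": { scheme: "wss", path: "/stream/websocket" },
  "uni-ws": { scheme: "wss", path: "/stream/ws" },
  "sse": { scheme: "https", path: "/stream/sse" },
  "http-stream": { scheme: "https", path: "/stream/http" },
};

// Builds the full endpoint URL for a host such as "staging.kyan.sh".
function streamUrl(host: string, transport: Transport): string {
  const { scheme, path } = ENDPOINTS[transport];
  return scheme + "://" + host + path;
}
```

For example, `streamUrl("staging.kyan.sh", "sse")` yields the testnet SSE endpoint used in the examples later on this page.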

Recovery

Initial Data on Subscribe

Subscribe with since: {} to receive the last known value as the first publication event, before any live updates. The client does not have to wait for the next change to get an initial state.

const sub = client.newSubscription("l2:perps:BTC_USDC-PERPETUAL", { since: {} });
sub.on("publication", (ctx) => {
  // First publication = last known snapshot (delivered on subscribe)
  // Subsequent publications = live updates
  const { timestamp, bids, asks } = ctx.data;
});
sub.subscribe();

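On reconnect, the SDK automatically recovers missed data for every channel marked Automatic in the table below; no client code is needed. The orders channel is the exception: it is not recovered and must be gap-filled through the REST API.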

| Channel | First subscribe with since: {} | On reconnect |
| --- | --- | --- |
| l2:perps:{instrument} | Last known snapshot | Automatic |
| l2:perps:grouped:{bucket}:{instrument} | Last known snapshot | Automatic |
| l2:bbo:{instrument} | Last known BBO | Automatic |
| l2:options:{pair}:{maturity} | Last known update | Automatic |
| l2:bbo:options:{pair}:{maturity} | Last known BBO | Automatic |
| orders:{address} | Snapshot of active orders | No recovery; use REST gap fill API |
| market:funding:{period}:{instrument} | Last known rate | Automatic |
| market:interest_rate:{pair} | Last known rate | Automatic |
| market:svi:{pair} | Last known SVI fit | Automatic |

Trade History

Trade channels (trades_perp, trades_option) support fetching up to 100 recent trades via the history API, available on bidirectional transports only.

| Transport | How to get history | Live trades |
| --- | --- | --- |
| Bidirectional (SDK or raw) | sub.history() or history protocol command | Yes |
| Unidirectional (SSE, HTTP, uni WS) | Not available | Live only |

On reconnect, the SDK automatically recovers missed trades using the last known stream position.

Connection Keep-Alive

The server sends a ping (empty {} frame) every 25 seconds. On bidirectional transports, clients must reply with {} within 8 seconds or the connection is closed.

| Transport | Keep-alive | Action required |
| --- | --- | --- |
| SDK (JSON or Protobuf) | Automatic | None; SDK handles ping/pong and auto-reconnect |
| Raw bidirectional WS | Server sends {} | Reply with {} within 8 seconds |
| Unidirectional WS | Server sends {} | None; ignore or use for health detection |
| SSE | Server sends data: {} | None; ignore or use for health detection |
| HTTP Streaming | Server sends null | None; ignore or use for health detection |

Raw bidirectional WebSocket example:

ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (Object.keys(msg).length === 0) {
    ws.send("{}"); // pong — reply within 8 seconds
    return;
  }
  // ... handle other messages
};

Connection health detection: If no message (including pings) arrives within ~35 seconds, consider the connection dead and reconnect.
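
One way to apply the ~35 second rule is to record the arrival time of every message and compare it against the clock on a timer. The sketch below is a minimal illustration; `LivenessTracker` is a hypothetical name, and the 35000 ms default is taken from the guidance above.

```typescript
// Hypothetical helper: tracks when the last message (including pings) arrived.
class LivenessTracker {
  private lastSeenMs: number;
  private timeoutMs: number;

  constructor(timeoutMs: number = 35000, nowMs: number = Date.now()) {
    this.timeoutMs = timeoutMs;
    this.lastSeenMs = nowMs;
  }

  // Call on every incoming message, pings included.
  touch(nowMs: number = Date.now()): void {
    this.lastSeenMs = nowMs;
  }

  // True once more than timeoutMs has elapsed since the last message.
  isDead(nowMs: number = Date.now()): boolean {
    return nowMs - this.lastSeenMs > this.timeoutMs;
  }
}
```

A raw client could call `touch()` at the top of its `onmessage` handler and poll `isDead()` every few seconds, closing the socket and reconnecting when it returns true.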

Connection Limits

| Limit | Value | Details |
| --- | --- | --- |
| New connections per IP | 20/second (burst 40) | Applies to connection attempts, not messages on established connections. Exceeding returns HTTP 429. |
| Subscriptions per client | 512 | Maximum channels a single connection can subscribe to. |

Connections can live indefinitely as long as ping/pong is healthy — there is no forced duration limit.

The rate limit protects against reconnect storms. If your client opens many connections rapidly (e.g. subscribing to 50+ channels on separate connections), stagger the connection attempts or multiplex channels on fewer connections.
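
Both limits can be respected by planning connections up front: pack channels onto as few connections as the 512-subscription cap allows, then space the connection attempts so they stay under the sustained 20/second rate. The following is a sketch under those assumptions; `planConnections` and `PlannedConnection` are hypothetical names, and the plan ignores the burst allowance to stay conservative.

```typescript
const MAX_SUBS_PER_CONNECTION = 512; // from the limits table
const MAX_NEW_CONNECTIONS_PER_SEC = 20; // sustained rate from the limits table

interface PlannedConnection {
  channels: string[]; // channels to multiplex on this connection
  startDelayMs: number; // how long to wait before opening it
}

// Groups channels into connections and assigns evenly spaced start delays.
function planConnections(
  channels: string[],
  perConnection: number = MAX_SUBS_PER_CONNECTION,
  ratePerSec: number = MAX_NEW_CONNECTIONS_PER_SEC
): PlannedConnection[] {
  if (perConnection < 1 || ratePerSec <= 0) {
    throw new RangeError("perConnection must be >= 1 and ratePerSec must be > 0");
  }
  const spacingMs = Math.ceil(1000 / ratePerSec);
  const plan: PlannedConnection[] = [];
  for (let i = 0; i < channels.length; i += perConnection) {
    plan.push({
      channels: channels.slice(i, i + perConnection),
      startDelayMs: plan.length * spacingMs,
    });
  }
  return plan;
}
```

With the defaults, 50 channels fit on a single connection with no delay, which is usually preferable to opening 50 separate connections.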

Server Time & RTT

On connect, the server includes its current timestamp in the connect reply (time field, Unix milliseconds). Use this to detect clock skew between the client and server.
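
Clock skew can be estimated by comparing the server's time value with the midpoint of the local send and receive times. The sketch below assumes roughly symmetric network latency; `estimateClockSkewMs` is a hypothetical helper, and because this page does not specify how the SDK exposes the time field, the server timestamp is passed in as a plain argument.

```typescript
// Returns the estimated skew in milliseconds.
// Positive: the server clock is ahead of the local clock.
// Assumes the server stamped its reply halfway through the round trip.
function estimateClockSkewMs(
  serverTimeMs: number, // `time` field from the connect reply (Unix ms)
  sentAtMs: number, // local Date.now() when the connect was sent
  receivedAtMs: number // local Date.now() when the reply arrived
): number {
  const midpointMs = sentAtMs + (receivedAtMs - sentAtMs) / 2;
  return serverTimeMs - midpointMs;
}
```

The estimate is only as good as the symmetry assumption, so its error is bounded by half the round-trip time.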

To measure round-trip time, use the built-in ping RPC:

const startTime = performance.now();
client.rpc("ping", {}).then(() => {
  const rtt = (performance.now() - startTime).toFixed(2);
  console.log(`RTT: ${rtt}ms`);
});

Auto-Reconnect

The SDK automatically reconnects with exponential backoff when the connection drops. Default delays: 500ms initial, 20s maximum. On reconnect, channel subscriptions are restored and the last known data is recovered where supported (see recovery table above).

For raw WebSocket clients, implement your own reconnect logic with backoff. Use the ping timeout (~35s with no messages) as the trigger to detect a dead connection.
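
For raw clients, a delay schedule similar to the SDK defaults could look like the sketch below. It reuses the 500 ms initial and 20 s maximum values stated above; the jitter scheme (half fixed, half random) is an assumption chosen to spread out reconnects and is not claimed to match the SDK's internal algorithm. `backoffDelayMs` is a hypothetical name.

```typescript
// Delay before reconnect attempt number `attempt` (0-based).
// The exponential ceiling doubles per attempt and is capped at maxMs;
// the result lies between half the ceiling and the full ceiling.
function backoffDelayMs(
  attempt: number,
  initialMs: number = 500,
  maxMs: number = 20000,
  random: () => number = Math.random // injectable for testing
): number {
  const exponent = Math.max(0, attempt);
  const ceilingMs = Math.min(maxMs, initialMs * Math.pow(2, exponent));
  return Math.round(ceilingMs / 2 + random() * (ceilingMs / 2));
}
```

Reset the attempt counter to 0 once a connection has been established and a message has been received, so a later drop starts again from the short delay.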

Subscribe Examples

All examples subscribe to trades_perp:ETH_USDC-PERPETUAL.

SDK WebSocket — JSON (recommended)

Subscribe, fetch history, then stream live. Live trades are buffered during history fetch to preserve ordering.

import { Centrifuge } from "centrifuge";

const client = new Centrifuge("wss://staging.kyan.sh/stream/websocket");
const sub = client.newSubscription("trades_perp:ETH_USDC-PERPETUAL");
let ready = false, buf = [], maxOffset = 0;

sub.on("subscribed", async () => {
  const h = await sub.history({ limit: -1, reverse: false });
  const pubs = h.publications || [];
  for (const pub of pubs) {
    const [price, signedSize, timestamp] = pub.data;
    maxOffset = Math.max(maxOffset, pub.offset ?? 0);
  }
  // Flush live trades that arrived during history fetch, skip duplicates
  for (const pub of buf) {
    if (pub.offset > maxOffset) {
      const [price, signedSize, timestamp] = pub.data;
    }
  }
  buf = [];
  ready = true;
});

sub.on("publication", (ctx) => {
  // price: "2251.50" (string, always positive)
  // signedSize: "1.5" (positive = buy) or "-1.5" (negative = sell)
  // timestamp: 1704067200000 (ms)
  if (!ready) { buf.push(ctx); return; }
  const [price, signedSize, timestamp] = ctx.data;
});

sub.subscribe();
client.connect();
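
The handlers above leave the trade tuple unprocessed. A small decoder such as the one below could turn it into a named record; it is a sketch based only on the field comments in the example (string price, signed string size, millisecond timestamp), and `Trade` and `parseTrade` are hypothetical names. Price and size are kept as strings to avoid floating-point rounding.

```typescript
interface Trade {
  price: string; // always positive, e.g. "2251.50"
  size: string; // absolute size, e.g. "1.5"
  side: "buy" | "sell"; // derived from the sign of signedSize
  timestamp: number; // Unix milliseconds
}

// Decodes a [price, signedSize, timestamp] tuple from a trades channel.
function parseTrade(data: [string, string, number]): Trade {
  const price = data[0];
  const signedSize = data[1];
  const timestamp = data[2];
  const isSell = signedSize.charAt(0) === "-";
  return {
    price: price,
    size: isSell ? signedSize.slice(1) : signedSize,
    side: isSell ? "sell" : "buy",
    timestamp: timestamp,
  };
}
```

In the publication handler this would be called as `parseTrade(ctx.data)`, and the same function applies to history publications via `pub.data`.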

SDK WebSocket — Protobuf

Smallest frames and fastest serialization, suited to HFT. This example streams live trades only and does not fetch history.

import { Centrifuge } from "centrifuge/build/protobuf";

const client = new Centrifuge("wss://staging.kyan.sh/stream/websocket?format=protobuf");
const sub = client.newSubscription("trades_perp:ETH_USDC-PERPETUAL");

sub.on("publication", (ctx) => {
  const data = ctx.data instanceof Uint8Array
    ? JSON.parse(new TextDecoder().decode(ctx.data))
    : ctx.data;
  const [price, signedSize, timestamp] = data;
});

sub.subscribe();
client.connect();

Raw WebSocket — Bidirectional (with history)

No SDK needed. Connect, subscribe, fetch history, then stream live. Live trades are buffered during history fetch to preserve ordering.

const ws = new WebSocket("wss://staging.kyan.sh/stream/websocket");
let id = 0;
let historyLoaded = false;
let historyMaxOffset = 0;
const buf = [];

ws.onopen = () => ws.send(JSON.stringify({ id: ++id, connect: {} }));

ws.onmessage = (e) => {
  const msg = JSON.parse(e.data);

  // Ping — reply immediately
  if (Object.keys(msg).length === 0) { ws.send("{}"); return; }

  // Connect reply — subscribe first
  if (msg.id === 1) {
    ws.send(JSON.stringify({ id: ++id, subscribe: { channel: "trades_perp:ETH_USDC-PERPETUAL" } }));
    return;
  }

  // Subscribe reply — fetch history
  if (msg.id === 2) {
    ws.send(JSON.stringify({ id: ++id, history: { channel: "trades_perp:ETH_USDC-PERPETUAL", limit: -1 } }));
    return;
  }

  // History reply — last 100 trades, then flush buffered live trades
  if (msg.id === 3) {
    // An error reply carries neither field, so fall back to an empty list
    const pubs = (msg.history ?? msg.result)?.publications ?? [];
    for (const pub of pubs) {
      const [price, signedSize, timestamp] = pub.data;
      historyMaxOffset = Math.max(historyMaxOffset, pub.offset ?? 0);
    }
    // Flush live trades that arrived during history fetch, skip duplicates
    for (const t of buf) {
      if (t.offset > historyMaxOffset) {
        const [price, signedSize, timestamp] = t.data;
      }
    }
    buf.length = 0;
    historyLoaded = true;
    return;
  }

  // Live trades — buffer until history is loaded
  if (msg.push?.pub) {
    if (!historyLoaded) { buf.push(msg.push.pub); return; }
    const [price, signedSize, timestamp] = msg.push.pub.data;
  }
};

SSE / EventSource (live only)

POST (non-browser):

const response = await fetch("https://staging.kyan.sh/stream/sse", {
  method: "POST",
  body: JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } }),
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";

  for (const line of lines) {
    if (!line.startsWith("data: ")) continue;
    const msg = JSON.parse(line.slice(6));
    if (msg.push?.pub) {
      const [price, signedSize, timestamp] = msg.push.pub.data;
    }
  }
}

GET (browser EventSource):

const params = JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } });
const url = new URL("https://staging.kyan.sh/stream/sse");
url.searchParams.append("cf_connect", params);

const es = new EventSource(url);
es.onmessage = (e) => {
  const msg = JSON.parse(e.data);
  if (msg.push?.pub) {
    const [price, signedSize, timestamp] = msg.push.pub.data;
  }
};

HTTP Streaming (live only)

const response = await fetch("https://staging.kyan.sh/stream/http", {
  method: "POST",
  body: JSON.stringify({ subs: { "trades_perp:ETH_USDC-PERPETUAL": {} } }),
});

const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split("\n");
  buffer = lines.pop() ?? "";

  for (const line of lines) {
    if (!line.trim()) continue;
    const msg = JSON.parse(line);
    // Keep-alive frames on this transport are a bare null, so guard before reading push
    if (msg?.push?.pub) {
      const [price, signedSize, timestamp] = msg.push.pub.data;
    }
  }
}
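
The SSE POST and HTTP streaming loops share the same newline framing, so it can be factored into a reusable piece. The sketch below is one possible shape: `LineFramer` and `isKeepAlive` are hypothetical names, and the keep-alive check covers both forms listed in the keep-alive table (an empty object, or null on HTTP streaming). For SSE, the "data: " prefix would still need to be stripped before parsing.

```typescript
// Accumulates decoded text chunks and emits one parsed JSON value per complete line.
class LineFramer {
  private buffer = "";

  push(chunk: string): unknown[] {
    this.buffer += chunk;
    const lines = this.buffer.split("\n");
    // The last element is an incomplete line (or ""), so keep it for the next chunk.
    this.buffer = lines.pop() ?? "";
    const messages: unknown[] = [];
    for (const line of lines) {
      const trimmed = line.trim();
      if (trimmed) messages.push(JSON.parse(trimmed));
    }
    return messages;
  }
}

// True for keep-alive frames: null (HTTP streaming) or an empty object.
function isKeepAlive(msg: unknown): boolean {
  if (msg === null) return true;
  return typeof msg === "object" && Object.keys(msg as object).length === 0;
}
```

Inside the read loop, each `framer.push(decoder.decode(value, { stream: true }))` call returns zero or more messages; skip those for which `isKeepAlive` is true and handle the rest.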