Skip to content

WebSocket

WebSocket并不是一个全新的协议,而是利用了HTTP协议来建立连接。

WebSocketHTTP的异同:

  • WebSocket协议跟HTTP一样处在应用层,是可靠的tcp连接,具体查看分层管理WebSocket在建立连接时,可以使用已有的HTTPGET请求进行握手:客户端在请求头中将WebSocket协议版本等信息发生到服务器,服务器同意的话,会响应一个101的状态码。就是说一次HTTP请求和响应,即可轻松转换协议到WebSocket
  • WebSocket可以双向通信,HTTP只能由客户端发起
  • WebSocket每个数据帧较HTTP报文都更轻量
    • WebSocket每个数据帧只有固定、轻量的头信息,不会有cookie等或者自定义的头信息。并且建立通讯后是一对一的,不需要携带验证信息。但握手时的 HTTP 请求会自动携带cookie
    • WebSocket在应用层就会将大的数据分拆到多个数据帧,而HTTP不会拆分每个报文。
  • WebSocket属于HTML5

其他长连接方案

在了解WebSocket前应该了解类似的技术解决方案:轮询

应用场景

  • 弹幕
  • 即时通讯
  • 协同编辑
  • 实时位置
  • 体育实况更新
  • 股票基金实时更新

优缺点

优点:

  • 连接次数少:WebSocket并不是一个新协议,通过一个HTTP升级协议就可以完成连接
  • 实时性高:服务端可以实时推送数据到客户端
  • 双向通信:客户端和服务端可以互相发生信息
  • 适合频繁通信:长时间保持连接,适合需要持续对话的场景

缺点:

  • 代理限制:某些代理应用如nginx的长连接时间是有限制的,可能需要客户端自动重连
  • 保持状态:WebSocket是有状态的,服务更新时需要对正在连接的客户端作兼容

工作原理

连接阶段

socket使用一个标准的HTTP请求附带一些特殊的首部字段向服务器发起连接。

HTTP
GET ws://localhost:3000/ws/chat HTTP/1.1
HOST: localhost
upgrade: WebSocket
Connection: Upgrade
Origin: HTTP://localhost:3000
Sec-WebSocket-Key: client-random-string
Sec-WebSocket-Version: 13

该请求和普通HTTP请求有几点不同:

  1. GET请求是以ws://开头的地址
  2. 请求头upgrade: WebSocketConnection: Upgrade表示这个连接将要被转换为WebSocket连接
  3. Sec-WebSocket-Key用于标识这个连接,并非用于加密数据
  4. Sec-WebSocket-Version指定了WebSocket的协议版本

若服务器接受请求,则会返回如下响应:

HTTP
HTTP/1.1 101 Switching Protocols
Upgrade: WebSocket
Connection: upgrade
Sec-WebSocket-Accept: server-random-string

该响应代码101表示本次连接的HTTP协议即将被更改,更改后的协议为Upgrade: WebSocket指定的WebSocket协议

版本号和子协议规定了双方可以理解的数据格式和是否支持压缩。如果直接使用WebSocketapi则不需要考虑这些。

  • 通讯数据格式:可以使用字符串或者二进制数据进行通讯,通常使用json字符串比较方便。

  • WebSocket的全双工通讯: 为什么WebSocket可以实现全双工通信而HTTP不行呢?实际上HTTP是建立在tcp协议之上,tcp协议本身实现了全双工通信,但是HTTP的请求-应答机制限制了全双工通信。WebSocket建立连接后就不会再使用HTTP协议请求,从而可以互相通信。

  • 安全的WebSocket连接机制:与HTTPS类似,浏览器使用wss://xxx创建WebSocket连接时,会先通过HTTPS创建安全的连接,然后升级为安全的WebSocket连接,底层通信走的还是安全的ssl/tls协议

数据传输阶段

具体的数据格式是怎么样的呢?WebSocket 的每条消息可能会被切分成多个数据帧(最小单位)。发送端会将消息切割成多个帧发送给接收端,接收端接收消息帧,并将关联的帧重新组装成完整的消息。

帧信息通常是无法直接获取的,浏览器并没有暴露获取帧信息的API,以下只作了解

  • 数据帧

    • 帧头:帧头包括四个部分:finrsv1rsv2rsv3opcodemaskedpayload_length。其中,fin 表示数据帧的结束标志,rsv1rsv2rsv3 表示保留字段,opcode 表示数据帧的类型,masked 表示是否进行掩码处理,payload_length 表示有效载荷的长度。
    • 有效载荷:有效载荷是数据帧中实际的数据部分,它由客户端和服务端进行数据传输。
  • 控制帧

    • Ping 帧:Ping 帧用于测试客户端和服务端之间的连接状态,客户端向服务端发送 Ping 帧,服务端收到后需要向客户端发送 Pong 帧进行响应。
    • Pong 帧:Pong 帧用于响应客户端的 Ping 帧,它用于测试客户端和服务端之间的连接状态。
    • Close 帧:Close 帧用于关闭客户端和服务端之间的连接,它包括四个部分:finrsv1rsv2rsv3opcodemaskedpayload_length。其中,opcode 的值为 8,表示 Close 帧。

关闭连接阶段

当不再需要WebSocket连接时,需要进行关闭阶段。关闭阶段包括以下几个步骤:

客户端向服务端发送关闭请求,请求中包含一个WebSocket的随机密钥。 服务端接收到关闭请求后,向客户端发送关闭响应,关闭响应中包含服务端生成的随机密钥。 客户端收到关闭响应后,关闭WebSocket连接。

心跳检测机制

  1. 传输层TCP有自己的心跳检测机制,失活后会通知应用层,但是依赖于系统的实现,具有不确定性,若要通过tcp来检测需要修改对应的配置,在浏览器环境不太现实
  2. WebSocket有自己的保活机制,但是在协议中不是强制的。WebSocket通讯的数据帧会有一个4位的OPCODE,标记当前传输的数据帧类型,例如:0x8表示关闭帧、0x9表示ping帧、0xA表示pong帧、0x1普通文本数据帧等。
    • 关闭数据帧,在任意一方要关闭通道时,发送给对方。例如浏览器的WebSocket实例调用close时,就会发送一个OPCODE为连接关闭的数据帧给服务器端,服务器端接收到后同样需要返回一个关闭数据帧,然后关闭底层的TCP连接。
    • ping数据帧,用于发送方询问对方是否存活,也就是心跳检测包。目前只有后端可以控制ping数据帧的发送。但浏览器端的WebSocket实例上没有对应的api可用。
    • pong数据帧,当WebSocket通讯一方接收到对方发送的ping数据帧后,需要及时回复一个内容一致,且OPCODE标记为pong的数据帧,告诉对方我还在。但目前回复pong是浏览器的自动行为,意味着不同浏览器会有差异。而且在js中没有相关api可以控制。

综合上述,探测对方存活的方式都是服务器主动进行心跳检测。浏览器并没有提供相关能力。为了能够在浏览器端实时探测后端的存活,或者说连接依旧可用,只能自己实现心跳检测。

在服务端使用协议推荐的方式对客户端进行心跳检测:

js
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (client) => {
  client.on('message', (message) => {
    // 响应客户端的自定义ping pong
    if (message.toString() === 'ping') {
      client.send('pong');
    }
  });

  // 发送规范推荐的ping
  client.ping();
  // 收到规范推荐的pong
  client.on('pong', () => {
    console.log('pong');
  });

  client.on('close', () => {});

  setInterval(() => {
    client.send(
      JSON.stringify({
        a: 'b',
      })
    );
  }, 5000);
});

在服务端对客户端进行心跳检测可以清理不在线的客户端,释放资源,但是对于是否要实现客户端对服务端的心跳检测,我认为是不必要的,断连了再重连即可。

若为了增强连接的稳定性,也可以实现客户端和服务端配合进行心跳检测:

Details
  • 客户端心跳检测
js
/**
 * @class WebSocketAuto
 * @extends EventTarget
 * @description WebSocket包装,扩展超时、心跳、重连机制
 * @param {Object} options 选项配置
 * @param {String} options.url 连接地址
 * @param {String | String[]} options.protocols 对应的原生WebSocket参数
 * @param {Number} [options.connectTimeOut = 5000] 默认连接超时时间
 * @param {Number} [options.reConnectTimeOut = 2000] 重连间隔时间
 * @param {Number} options.reConnectRepeatLimit 重连次数限制,不设置则不限制
 * @param {Boolean} [options.heartCheckEnable = false] 默认不开启心跳检测
 * @param {Number} [options.heartCheckTimeOut = 2000] 心跳检测频率
 * @param {String | Number} [options.heartCheckPing = 'ping'] 心跳发送包内容
 * @param {String | Number} [options.heartCheckPong = 'pong'] 心跳相应包内容
 * @param {Number} [options.heartCheckCloseTimeOut = 2000] 心跳检测超时时间
 * @param {Boolean} [options.parseMessage = true] 开启消息格式化
 * @param {Boolean} [options.debug = false] 开启debug消息打印,需要打开控制台详细级别的输出
 * @example
 * import { WebSocketAuto } from '@blinkjun/utils';
 *
 * const ws = new WebSocketAuto({
 *  url: 'ws://localhost:8080',
 *  heartCheckEnable: true,
 * });
 *
 * // 发送方法
 * try{
 *  ws.send('hello')
 * }catch(e){
 *
 * }
 *
 * // 手动关闭
 * ws.close()
 *
 * // 主要监听消息事件
 * ws.addEventListener('message', (e) => {
 *  console.log(e);
 * });
 *
 * // 连接成功,可省略
 * ws.addEventListener('open',(e)=>{
 *  console.log(e)
 * })
 *
 * // 连接关闭,可省略
 * ws.addEventListener('close',(e)=>{
 *  console.log(e)
 * })
 *
 * // 连接错误,可省略
 * ws.addEventListener('error',(e)=>{
 *  console.log(e)
 * })
 */
export class WebSocketAuto extends EventTarget {
    /**
     * @ignore
     * @param {Object} options 选项配置
     * @param {String} options.url 连接地址
     * @param {String | String[]} options.protocols 对应的原生WebSocket参数
     * @param {Number} [options.connectTimeOut = 5000] 默认连接超时时间
     * @param {Number} [options.reConnectTimeOut = 2000] 重连间隔时间
     * @param {Number} options.reConnectRepeatLimit 重连次数限制,不设置则不限制
     * @param {Boolean} [options.heartCheckEnable = false] 默认不开启心跳检测
     * @param {Number} [options.heartCheckTimeOut = 2000] 心跳检测频率
     * @param {String | Number} [options.heartCheckPing = 'ping'] 心跳发送包内容
     * @param {String | Number} [options.heartCheckPong = 'pong'] 心跳相应包内容
     * @param {Number} [options.heartCheckCloseTimeOut = 2000] 心跳检测超时时间
     * @param {Boolean} [options.parseMessage = true] 开启消息格式化
     * @param {Boolean} [options.debug = false] 开启debug消息打印,需要打开控制台详细级别的输出
     */
    constructor(options) {
        super();

        this.url = options.url;
        this.protocols = options.protocols;

        // 设置连接的超时时间
        this.connectTimeOut = options.connectTimeOut || 5000;

        // 限制重连次数,默认不限制
        this.reConnectTimmer = null;
        this.reConnectTimeOut = options.reConnectTimeOut || 2000;
        this.reConnectRepeatLimit = options.reConnectRepeatLimit || null;
        this.reConnectRepeatCount = 0;

        // 心跳配置
        // 是否开启心跳检测,后端需要响应心跳,若后端不响应心跳,会一直重连!
        this.heartCheckEnable = options.heartCheckEnable || false;
        this.heartCheckTimer = null;
        // 心跳频率
        this.heartCheckTimeOut = options.heartCheckTimeOut || 2000;
        this.heartCheckPing = options.heartCheckPing || "ping";
        this.heartCheckPong = options.heartCheckPong || "pong";
        // 发送心跳包后等待的时间,超时则重连
        this.heartCheckCloseTimer = null;
        this.heartCheckCloseTimeOut = options.heartCheckCloseTimeOut || 2000;

        // 是否主动关闭
        this.activeClose = false;

        // 是否格式化消息内容
        this.parseMessage = options.parseMessage || true;

        // 是否开启debug消息
        this.debug = options.debug || false;

        this.connect();
    }

    /**
     * @description 连接
     * @ignore
     */
    connect() {
        const ws = (this.ws = new WebSocket(this.url, this.protocols));

        // 设置连接超时时间
        const connectTimeOutTimer = setTimeout(() => {
            ws.close();
            this.reConnect();
            this.debug &&
                console.debug(`websocket connect timeout:`, this.url);
        }, this.connectTimeOut);

        // 连接成功
        ws.addEventListener("open", (event) => {
            this.debug && console.debug(`websocket open:`, event);
            clearTimeout(connectTimeOutTimer);
            // 重置配置
            this.reConnectRepeatCount = 0;
            this.activeClose = false;
            this.dispatchEvent(new CustomEvent("open", { detail: event }));
            // 开始心跳检测
            if (this.heartCheckEnable) {
                this.heartCheck();
            }
        });

        // 收到消息
        ws.addEventListener("message", (event) => {
            this.debug && console.debug(`websocket message:`, event);
            // 没有开启心跳检测则直接触发事件,开启了心跳检测且返回数据不是指定的心跳响应也触发事件
            if (!this.heartCheckEnable || this.heartCheckPong !== event.data) {
                let message = event.data;
                if (this.parseMessage) {
                    try {
                        message = JSON.parse(message);
                    } catch (error) {
                        this.debug &&
                            console.debug(
                                `WebSocketAuto parse message data fail`
                            );
                    }
                }
                this.dispatchEvent(
                    new MessageEvent("message", {
                        ...event,
                        data: message,
                    })
                );
            }
            if (this.heartCheckEnable) {
                // 开始心跳检测
                this.heartCheck();
            }
        });

        // 连接关闭
        ws.addEventListener("close", (event) => {
            this.debug && console.debug(`websocket closed:`, event);
            // 停止心跳检测
            this.stopHeartCheck();
            this.dispatchEvent(new CloseEvent("close", event));
            // 如果不是主动被关闭,则尝试重连
            if (!this.activeClose) {
                this.reConnect();
            }
        });

        // 连接出错
        ws.addEventListener("error", (event) => {
            this.debug && console.debug(`websocket error:`, event);
            // 停止心跳检测
            this.stopHeartCheck();
            this.dispatchEvent(new ErrorEvent("error", event));
            // 重连
            this.reConnect();
        });
    }

    /**
     * @description 重连
     * @ignore
     */
    reConnect() {
        // 如果设置了重连次数限制,超出限制则不再重连
        if (
            this.reConnectRepeatLimit &&
            this.reConnectRepeatCount >= this.reConnectRepeatLimit
        ) {
            return false;
        }
        // 延时重连
        clearTimeout(this.reConnectTimmer);
        this.reConnectTimmer = setTimeout(() => {
            this.debug && console.debug(`websocket reconnecting`);
            this.reConnectRepeatCount++;
            this.connect();
        }, this.reConnectTimeOut);
    }

    /**
     * @description 发送消息,断网情况下,无法触发close事件,ws会较长时间处于CLOSING状态,此时此函数会报错,请使用trycatch捕获错误处理
     */
    send(...args) {
        this.debug && console.debug(`websocket send:`, ...args);
        return this.ws?.send(...args);
    }

    /**
     * @description 手动关闭
     */
    close() {
        this.activeClose = true;
        return this.ws?.close();
    }

    /**
     * @description 心跳检测
     * @ignore
     */
    heartCheck() {
        this.stopHeartCheck();
        this.heartCheckTimer = setTimeout(() => {
            // 发送心跳包
            this.send(this.heartCheckPing);
            // 一定时间内无响应则关闭,触发重连
            this.heartCheckCloseTimer = setTimeout(() => {
                this.ws?.close();
            }, this.heartCheckCloseTimeOut);
        }, this.heartCheckTimeOut);
    }

    /**
     * @description 停止心跳检测
     * @ignore
     */
    stopHeartCheck() {
        clearTimeout(this.heartCheckTimer);
        clearTimeout(this.heartCheckCloseTimer);
    }
}
  • 服务端配合响应
js
const { WebSocketServer } = require('ws');
const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (client) => {
  client.on('message', (message) => {
    // 响应客户端的自定义ping pong
    if (message.toString() === 'ping') {
      client.send('pong');
    }
  });

  // 发送规范推荐的ping
  client.ping();
  // 收到规范推荐的pong
  client.on('pong', () => {
    console.log('pong');
  });

  client.on('close', () => {});

  setInterval(() => {
    client.send(
      JSON.stringify({
        a: 'b',
      })
    );
  }, 5000);
});

ws 模块

客户端:

js
// HTTPS://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket
const ws = new WebSocket('ws://localhost:8080');

ws.onopen = () => {};
ws.onmessage = (msg) => {};
ws.send('something');

服务端:

js
// HTTPS://www.npmjs.com/package/ws
import { WebSocketServer } from 'ws';
const wss = new WebSocketServer({ port: 8080 });

wss.on('connection', (ws) => {
  ws.on('message', (message) => {});
  ws.send('something');
  ws.on('close', () => {});
});

// 获取所有客户端
console.log(wss.clients);

消息格式规范

js
const WSMessageType = {
  message: 1,
  error: 2,
  image: 3,
};
const createMessage = (type, data) => {
  return JSON.stringify({
    type,
    data,
  });
};

const message = createMessage(WSMessageType.message, 'hello world');

登录用户信息和ws客户端关联

js
ws.user = user;

socket.io 模块

客户端

js
// HTTPS://socket.io/docs/v4/client-initialization/
import io from 'socket.io';
const socket = io('ws://localhost:8080');
socket.on('connection', () => {});
socket.emit('message', 'world');

服务端

js
// HTTPS://socket.io/docs/v4/server-installation/
const server = require('HTTP').createServer();
const io = require('socket.io')(server);
io.on('connection', (client) => {
  socket.emit('message', 'hello');
  client.on('event', (data) => {
    /* … */
  });
  client.on('disconnect', () => {
    /* … */
  });

  // 获取请求
  console.log(client.handshake);

  // 获取客户端 Map结构
  console.log(io.sockets.sockets);
});
server.listen(3000);

轮询

当服务器出现以下情况时,没办法通过一个HTTP请求返回客户端所需要的内容:

  • 此次处理需要的时间不确定,有可能会很长,需要客户端等待
  • 服务器会持续有新内容,此次请求完成后还需要下一次请求

而客户端又需要获得最新的数据,对数据实时性要求很高,这时候就需要通过以下几种方式来实现功能:

  1. 短轮询(polling)
  2. 长轮询(long polling)
  3. 长连接(WebSocket)
  4. 服务器事件推送(Sever-Sent Eventsaka SSE)

短轮询(Polling

短轮询其实就是每隔一段时间就请求一次,获取最新的数据。

短轮询服务端通常不做处理,提供HTTP接口即可,由客户端决定轮询哪种数据格式的接口。

假设有一个持续更新的记录列表,一般有以下两种情况:

  • 服务端不作修改,客户端持续的请求接口得到全部数据状态
  • 客户端携带上一次获取得到的数据时间戳进行请求,服务端通过时间戳对比,返回时间戳之后更新的数据

优点:

  • 实现简单

缺点:

  • 无用的请求过多,每次请求不一定有新的数据
  • 数据实时性差,并不是有了新数据就能马上被轮询到

短轮询实现:

Details
js
/**
 * @class Polling
 * @extends EventTarget
 * @description 轮询类
 * @param {Object} options 轮询选项
 * @param {Function} options.request 请求方法
 * @param {Function} [options.needToPolling = ()=>true] 是否需要继续轮询的检查方法,默认
 * @example
 * const pollingInstance = new Polling({
 *  // 轮询的请求
 *  request:()=>fetch('/data'),
 *  // 可选:在此判断是否继续轮询,不传默认持续轮询
 *  needToPolling: (res) => {
 *    const undoneCount = 1;
 *    return undoneCount > 0;
 *  },
 * });
 * // 每次轮询请求回来的数据更新事件
 * pollingInstance.addEventListener('update',(res)=>{
 *  console.log(res)
 * })
 * // 轮询结束
 * pollingInstance.addEventListener('end',()=>{
 *  console.log('end')
 * })
 * // 开始轮询
 * pollingInstance.start()
 * // 主动结束
 * pollingInstance.stop()
 */
export class Polling extends EventTarget {
    /**
     * @ignore
     * @param {Object} options 轮询选项
     * @param {Function} options.request 请求方法
     * @param {Function} [options.needToPolling = ()=>true] 是否需要继续轮询的检查方法,默认
     */
    constructor({ request, needToPolling = () => true }) {
        super();
        // 请求
        this.request = request;
        // 检查是否需要继续轮询
        this.needToPolling = needToPolling;
        this.pollingListTimer = null;
        this.delay = 3000;
        this.pollingId = 0;
        this.onRequestPollingList = false;
        this.requestErrorCount = 0;
    }
    /**
     * @description 开始轮询
     */
    async start() {
        const needPolling = await this.needToPolling();
        if (needPolling) {
            let delay = this.delay * (this.requestErrorCount + 1);
            delay = delay > 30000 ? 30000 : delay;
            this.pollingListTimer = setTimeout(() => {
                this.polling();
            }, delay);
        }
    }
    /**
     * @description 停止
     */
    async stop() {
        // 更新轮询id,所有请求废弃
        this.pollingId = this.pollingId + 1;
        clearTimeout(this.pollingListTimer);
        this.onRequestPollingList = false;
        this.requestErrorCount = 0;
    }
    /**
     * @description 开始轮询
     * @ignore
     */
    async polling() {
        const pollingId = this.pollingId;
        if (this.onRequestPollingList) {
            return false;
        }
        this.onRequestPollingList = true;
        try {
            const res = await this.request();
            // 重置请求错误状态为0
            this.requestErrorCount = 0;
            // 如果已经变更了轮询id,则废弃
            if (pollingId !== this.pollingId) {
                this.onRequestPollingList = false;
                return false;
            }
            // 更新数据
            this.dispatchEvent(new CustomEvent("update", { detail: res }));
            // 传入新数据由调用者判断是否需要继续轮询
            const needPolling = await this.needToPolling(res);
            // 不再需要轮询表示轮询已结束
            if (!needPolling) {
                this.dispatchEvent(new Event("end"));
                this.onRequestPollingList = false;
                return false;
            }
        } catch (error) {
            this.requestErrorCount++;
            console.log(error);
        }
        this.start();
        this.onRequestPollingList = false;
    }
}

长轮询(Long Polling

在需要等待的情况下,那么服务端也进入等待状态,不立刻响应这个请求,将请求缓存起来,等待一段时间,如果到处理完成,或者有新数据更新时,逐个取出请求进行响应。

处理时间和新数据出现这个时间是无法预料的,所以“等待一段时间”通常为30s或者60s等预估的时间,超过时间还没有新数据就响应请求,让客户端重新请求。

优点:

  • 大大减少了请求的次数
  • 具有更好的数据实时性

缺点:

  • 服务端会一直缓存客户端的请求,占用服务器资源
  • 数据频繁更新的情况下会有大量的连接建立和关闭的处理,对性能有一定的影响

长轮询主要逻辑在服务端实现:

Details
  • 服务端
js
const http = require('http');
const url = require('url');

// 缓存回调事件
const events = [];
// 缓存定时器
const timers = new Set();
// 当前挂起的请求
const subscribers = new Set();

// 触发完成事件,回调挂起的请求
const EventProducer = () => {
  const event = {
    id: Date.now(),
    timestamp: Date.now(),
  };
  events.push(event);

  // 通知所有挂起的请求
  subscribers.forEach((subscriber) => {
    subscriber.resp.write(
      JSON.stringify(
        events.filter((event) => event.timestamp > subscriber.timestamp)
      )
    );
    subscriber.resp.end();
  });
  // 重置subscribers
  subscribers.clear();
  // 取消请求的超时回调
  timers.forEach((timer) => clearTimeout(timer));
  // 重置timers
  timers.clear();
};

// 5秒生成一个事件
setInterval(() => {
  EventProducer();
}, 5000);

// 创建服务
const server = http.createServer((req, resp) => {
  const urlParsed = url.parse(req.url, true);

  resp.setHeader('Access-Control-Allow-Origin', '*');
  resp.setHeader('Origin', '*');

  if (urlParsed.pathname == '/list') {
    // 发送服务端现存事件
    resp.write(JSON.stringify(events));
    resp.end();
  } else if (urlParsed.pathname == '/subscribe') {
    const timestamp = parseInt(urlParsed.query['timestamp']);
    const subscriber = {
      timestamp,
      resp,
    };
    // 新建的连接挂起来
    subscribers.add(subscriber);

    // 30s超时,自动关闭连接
    const timer = setTimeout(() => {
      resp.end();
      timers.delete(timer);
    }, 30000);

    // 客户端主动断开连接
    req.on('close', () => {
      subscribers.delete(subscriber);
      clearTimeout(timer);
    });

    timers.add(timer);
  }
});

server.listen(8080, () => {
  console.log('server is up');
});
  • 客户端
vue
<script setup>
import { Polling } from '@blinkjun/utils';
const pollingInstance = new Polling({
  request: () => {
    return fetch('http://localhost:8080/subscribe?timestamp=' + Date.now());
  },
});
pollingInstance.addEventListener('update', async (res) => {
  console.log(await res.detail.json());
});
pollingInstance.addEventListener('end', () => {
  console.log('end');
});
pollingInstance.start();
</script>

SSE(Server-Send Events)

基于HTTP协议,服务端向客户端推送事件的技术。

客户端向服务端发起一个持久化的HTTP连接,服务端接受请求后挂起,有新消息时通过这个连接推送数据给客户端,可以多次推送,跟WebSocket不同的是,只能服务端推送消息,不允许客户端向服务端发送消息。

优点:

  • 连接数少:客户端和服务端只有一个持久的HTTP连接
  • 数据实时性高:服务端可以实时推送数据到客户端
  • 默认断线重连

缺点:

  • 单项通信:只允许服务端推送,不允许客户端
  • 代理限制:跟WebSocket一样,可能需要客户端重连
  • 一般传输文本,二进制需要编码后发送,WebSocket默认支持传送二进制数据

SSE实现:

主要实现在服务端:

  1. 首先设置HTTP响应头:
http
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
  1. message

每个消息的结构为:[field]: value,每个结构使用\n分割,每个消息体再使用\n分割

每个消息必须发送data,否则不会触发事件。

field有以下几种类型:

  • ::注释
  • data:消息的具体内容
  • event:自定义事件,会触发客户端对应的事件,不设置则触发message事件
  • id:此次消息的id,在客户端使用lastEventId获取
  • retry:指定客户端重新发起连接的间隔
Details
  • 服务端
js
const http = require('http');

http
  .createServer((req, res) => {
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      Connection: 'keep-alive',
      'Access-Control-Allow-Origin': '*',
      origin: '*',
    });

    const sendMessage = (data) => {
      res.write(`data: ${data}\n\n`);
    };

    // 每隔一段时间发送一个消息
    setInterval(() => {
      sendMessage(
        JSON.stringify({
          time: Date.now(),
        })
      );
    }, 1000);

    setTimeout(() => {
      // 发送一个自定义事件
      res.write(`: custom close event\n`);
      res.write(`id: close-id\n`);
      res.write(`event: close\n`);
      res.write(`retry: 1000\n`);
      res.write(`data: close event\n\n`);
    }, 1000);
  })
  .listen(3000);

console.log('SSE server running at http://localhost:3000');
  • 客户端
html
<!DOCTYPE html>
<html>
  <head>
    <title>SSE Client</title>
  </head>
  <body>
    <div id="sseData"></div>

    <script>
      const sseData = document.getElementById('sseData');
      const eventSource = new EventSource('http://localhost:3000');

      eventSource.onmessage = function (event) {
        sseData.innerText = 'Server Time: ' + event.data;
      };

      // 监听自定义事件
      eventSource.addEventListener('close', (e) => {
        console.log(e);
      });
    </script>
  </body>
</html>