OkHttp实现分析之Websocket
HTML5 擁有許多引人注目的新特性,WebSocket就是其中之一。WebSocket一向有著 “Web 的 TCP ”之稱。通常 WebSocket 都是用于Web的,用于構(gòu)建實(shí)時(shí)的 Web 應(yīng)用。它可以在瀏覽器和服務(wù)器之間提供一個(gè)基于 TCP 連接的雙向通道。
WebSocket 協(xié)議本質(zhì)上是一個(gè)基于 TCP 的協(xié)議。為了建立一個(gè) WebSocket 連接,客戶端瀏覽器首先要向服務(wù)器發(fā)起一個(gè) HTTP 請(qǐng)求,這個(gè)請(qǐng)求和通常的 HTTP 請(qǐng)求不同,包含了一些附加頭信息,其中附加頭信息 ”Upgrade: WebSocket” 表明這是一個(gè)申請(qǐng)協(xié)議升級(jí)的 HTTP 請(qǐng)求,服務(wù)器端解析這些附加的頭信息然后產(chǎn)生應(yīng)答信息返回給客戶端,客戶端和服務(wù)器端的 WebSocket 連接就建立起來(lái)了,雙方就可以通過(guò)這個(gè)連接通道自由的傳遞信息,并且這個(gè)連接會(huì)持續(xù)存在直到客戶端或者服務(wù)器端的某一方主動(dòng)的關(guān)閉連接。
Websocket同樣可以用于移動(dòng)端。盡管移動(dòng)端 Android/iOS 的本地應(yīng)用可以直接通過(guò)Socket與服務(wù)器建立連接,并定義自己的協(xié)議來(lái)解決 Web 中實(shí)時(shí)應(yīng)用創(chuàng)建困難的問(wèn)題,但 WebSocket 服務(wù)通常復(fù)用Web的 80 端口,且可以比較方便的基于Web服務(wù)器來(lái)實(shí)現(xiàn),因而對(duì)于某些端口容易被封的網(wǎng)絡(luò)環(huán)境而言,WebSocket 就變得非常有意義。
OkHttp 是在 2016 年 6 月 10 日發(fā)布的 3.4.1 版中添加的對(duì)WebSocket的支持的。本文通過(guò)分析 OkHttp-3.5.0 的 WebSocket 實(shí)現(xiàn)來(lái)學(xué)習(xí)一下這個(gè)協(xié)議。
OkHttp WebSocket客戶端 API 用法
在開(kāi)始分析 WebSocket 的實(shí)現(xiàn)之前,我們先來(lái)看一下 OkHttp 的 WebSocket API怎么用。示例代碼如下:
import android.util.Log;import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import okhttp3.WebSocket; import okhttp3.WebSocketListener; import okio.ByteString;public class WebsocketClient {private static final int NORMAL_CLOSURE_STATUS = 1000;private static OkHttpClient sClient;private static WebSocket sWebSocket;public static synchronized void startRequest() {if (sClient == null) {sClient = new OkHttpClient();}if (sWebSocket == null) {Request request = new Request.Builder().url("ws://echo.websocket.org").build();EchoWebSocketListener listener = new EchoWebSocketListener();sWebSocket = sClient.newWebSocket(request, listener);}}private static void sendMessage(WebSocket webSocket) {webSocket.send("Knock, knock!");webSocket.send("Hello!");webSocket.send(ByteString.decodeHex("deadbeef"));}public static void sendMessage() {WebSocket webSocket;synchronized (WebsocketClient.class) {webSocket = sWebSocket;}if (webSocket != null) {sendMessage(webSocket);}}public static synchronized void closeWebSocket() {if (sWebSocket != null) {sWebSocket.close(NORMAL_CLOSURE_STATUS, "Goodbye!");sWebSocket = null;}}public static synchronized void destroy() {if (sClient != null) {sClient.dispatcher().executorService().shutdown();sClient = null;}}private static void resetWebSocket() {synchronized (WebsocketClient.class) {sWebSocket = null;}}public static class EchoWebSocketListener extends WebSocketListener {private static final String TAG = "EchoWebSocketListener";@Overridepublic void onOpen(WebSocket webSocket, Response response) {sendMessage(webSocket);}@Overridepublic void onMessage(WebSocket webSocket, String text) {Log.i(TAG, "Receiving: " + text);}@Overridepublic void onMessage(WebSocket webSocket, ByteString bytes) {Log.i(TAG, "Receiving: " + bytes.hex());}@Overridepublic void onClosing(WebSocket webSocket, int code, String reason) {webSocket.close(NORMAL_CLOSURE_STATUS, null);Log.i(TAG, "Closing: " + code + " " + reason);resetWebSocket();}@Overridepublic void onClosed(WebSocket webSocket, int code, String reason) {Log.i(TAG, "Closed: " + code + " " + reason);}@Overridepublic void onFailure(WebSocket webSocket, Throwable t, Response response) {t.printStackTrace();resetWebSocket();}} }這個(gè)過(guò)程與發(fā)送HTTP請(qǐng)求的過(guò)程有許多相似之處,它們都需要?jiǎng)?chuàng)建 OkHttpClient 和Request。然而它們不同的地方更多:
后兩點(diǎn)正是 WebSocket 全雙工連接的體現(xiàn)。
OkHttp 的 WebSocket 實(shí)現(xiàn)
接著我們來(lái)看OkHttp 的 WebSocket 實(shí)現(xiàn)。WebSocket 包含兩個(gè)部分,分別是握手和數(shù)據(jù)傳輸,數(shù)據(jù)傳輸又包括數(shù)據(jù)的發(fā)送,數(shù)據(jù)的接收,連接的保活,以及連接的關(guān)閉等,我們將分別分析這些過(guò)程。
連接握手
創(chuàng)建 WebSocket 的過(guò)程如下:
public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {. . . . . .@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {RealWebSocket webSocket = new RealWebSocket(request, listener, new SecureRandom());webSocket.connect(this);return webSocket;}在這里會(huì)創(chuàng)建一個(gè) RealWebSocket 對(duì)象,然后執(zhí)行其 connect() 方法建立連接。 RealWebSocket 對(duì)象的創(chuàng)建過(guò)程如下:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .public RealWebSocket(Request request, WebSocketListener listener, Random random) {if (!"GET".equals(request.method())) {throw new IllegalArgumentException("Request must be GET: " + request.method());}this.originalRequest = request;this.listener = listener;this.random = random;byte[] nonce = new byte[16];random.nextBytes(nonce);this.key = ByteString.of(nonce).base64();this.writerRunnable = new Runnable() {@Override public void run() {try {while (writeOneFrame()) {}} catch (IOException e) {failWebSocket(e, null);}}};}這里最主要的是初始化了 key,以備后續(xù)連接建立及握手之用。Key 是一個(gè)16字節(jié)長(zhǎng)的隨機(jī)數(shù)經(jīng)過(guò) Base64 編碼得到的。此外還初始化了 writerRunnable 等。
連接建立及握手過(guò)程如下:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .public void connect(OkHttpClient client) {client = client.newBuilder().protocols(ONLY_HTTP1).build();final int pingIntervalMillis = client.pingIntervalMillis();final Request request = originalRequest.newBuilder().header("Upgrade", "websocket").header("Connection", "Upgrade").header("Sec-WebSocket-Key", key).header("Sec-WebSocket-Version", "13").build();call = Internal.instance.newWebSocketCall(client, request);call.enqueue(new Callback() {@Override public void onResponse(Call call, Response response) {try {checkResponse(response);} catch (ProtocolException e) {failWebSocket(e, response);closeQuietly(response);return;}// Promote the HTTP streams into web socket streams.StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);streamAllocation.noNewStreams(); // Prevent connection pooling!Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);// Process all web socket messages.try {listener.onOpen(RealWebSocket.this, response);String name = "OkHttp WebSocket " + request.url().redact();initReaderAndWriter(name, pingIntervalMillis, streams);streamAllocation.connection().socket().setSoTimeout(0);loopReader();} catch (Exception e) {failWebSocket(e, null);}}@Override public void onFailure(Call call, IOException e) {failWebSocket(e, null);}});}連接建立及握手的過(guò)程主要是向服務(wù)器發(fā)送一個(gè)HTTP請(qǐng)求。這個(gè) HTTP 請(qǐng)求的特別之處在于,它包含了如下的一些Headers:
Upgrade: WebSocket Connection: Upgrade Sec-WebSocket-Key: 7wgaspE0Tl7/66o4Dov2kw== Sec-WebSocket-Version: 13其中 Upgrade 和 Connection header 向服務(wù)器表明,請(qǐng)求的目的就是要將客戶端和服務(wù)器端的通訊協(xié)議從 HTTP 協(xié)議升級(jí)到 WebSocket 協(xié)議,同時(shí)在請(qǐng)求處理完成之后,連接不要斷開(kāi)。Sec-WebSocket-Key header 值正是我們前面看到的key,它是 WebSocket 客戶端發(fā)送的一個(gè) base64 編碼的密文,要求服務(wù)端必須返回一個(gè)對(duì)應(yīng)加密的 “Sec-WebSocket-Accept” 應(yīng)答,否則客戶端會(huì)拋出 “Error during WebSocket handshake” 錯(cuò)誤,并關(guān)閉連接。
來(lái)自于 HTTP 服務(wù)器的響應(yīng)到達(dá)的時(shí)候,即是連接建立大功告成的時(shí)候,也就是熱豆腐孰了的時(shí)候。
然而,響應(yīng)到達(dá)時(shí),盡管連接已經(jīng)建立,還要為數(shù)據(jù)的收發(fā)做一些準(zhǔn)備。這些準(zhǔn)備中的第一步就是檢查 HTTP 響應(yīng):
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .void checkResponse(Response response) throws ProtocolException {if (response.code() != 101) {throw new ProtocolException("Expected HTTP 101 response but was '"+ response.code() + " " + response.message() + "'");}String headerConnection = response.header("Connection");if (!"Upgrade".equalsIgnoreCase(headerConnection)) {throw new ProtocolException("Expected 'Connection' header value 'Upgrade' but was '"+ headerConnection + "'");}String headerUpgrade = response.header("Upgrade");if (!"websocket".equalsIgnoreCase(headerUpgrade)) {throw new ProtocolException("Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");}String headerAccept = response.header("Sec-WebSocket-Accept");String acceptExpected = ByteString.encodeUtf8(key + WebSocketProtocol.ACCEPT_MAGIC).sha1().base64();if (!acceptExpected.equals(headerAccept)) {throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"+ acceptExpected + "' but was '" + headerAccept + "'");}}. . . . . .public void failWebSocket(Exception e, Response response) {Streams streamsToClose;synchronized (this) {if (failed) return; // Already failed.failed = true;streamsToClose = this.streams;this.streams = null;if (cancelFuture != null) cancelFuture.cancel(false);if (executor != null) executor.shutdown();}try {listener.onFailure(this, e, response);} finally {closeQuietly(streamsToClose);}}根據(jù) WebSocket 的協(xié)議,服務(wù)器端用如下響應(yīng),來(lái)表示接受建立 WebSocket 連接的請(qǐng)求:
為數(shù)據(jù)收發(fā)做準(zhǔn)備的第二步是,初始化用于輸入輸出的 Source 和 Sink。Source 和 Sink 創(chuàng)建于之前發(fā)送HTTP請(qǐng)求的時(shí)候。這里會(huì)阻止在這個(gè)連接上再創(chuàng)建新的流。
public final class RealConnection extends Http2Connection.Listener implements Connection {. . . . . .public RealWebSocket.Streams newWebSocketStreams(final StreamAllocation streamAllocation) {return new RealWebSocket.Streams(true, source, sink) {@Override public void close() throws IOException {streamAllocation.streamFinished(true, streamAllocation.codec());}};}Streams是一個(gè) BufferedSource 和 BufferedSink 的holder:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .public abstract static class Streams implements Closeable {public final boolean client;public final BufferedSource source;public final BufferedSink sink;public Streams(boolean client, BufferedSource source, BufferedSink sink) {this.client = client;this.source = source;this.sink = sink;}}第三步是調(diào)用回調(diào) onOpen()。
第四步是初始化 Reader 和 Writer:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .public void initReaderAndWriter(String name, long pingIntervalMillis, Streams streams) throws IOException {synchronized (this) {this.streams = streams;this.writer = new WebSocketWriter(streams.client, streams.sink, random);this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));if (pingIntervalMillis != 0) {executor.scheduleAtFixedRate(new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);}if (!messageAndCloseQueue.isEmpty()) {runWriter(); // Send messages that were enqueued before we were connected.}}reader = new WebSocketReader(streams.client, streams.source, this);}OkHttp使用 WebSocketReader 和 WebSocketWriter 來(lái)處理數(shù)據(jù)的收發(fā)。在發(fā)送數(shù)據(jù)時(shí)將數(shù)據(jù)組織成幀,在接收數(shù)據(jù)時(shí)則進(jìn)行反向擦做,同時(shí)處理 WebSocket 的控制消息。
WebSocket 的所有數(shù)據(jù)發(fā)送動(dòng)作,都會(huì)在單線程線程池的線程中,通過(guò) WebSocketWriter 執(zhí)行。在這里會(huì)創(chuàng)建 ScheduledThreadPoolExecutor 用于跑數(shù)據(jù)的發(fā)送操作。WebSocket 協(xié)議中主要會(huì)傳輸兩種類型的幀,一是控制幀,主要是用于連接保活的 Ping 幀等;二是用戶數(shù)據(jù)載荷幀。在這里會(huì)根據(jù)用戶的配置,調(diào)度 Ping 幀周期性地發(fā)送。我們?cè)谡{(diào)用 WebSocket 的接口發(fā)送數(shù)據(jù)時(shí),數(shù)據(jù)并不是同步發(fā)送的,而是被放在了一個(gè)消息隊(duì)列中。發(fā)送消息的 Runnable 從消息隊(duì)列中讀取數(shù)據(jù)發(fā)送。這里會(huì)檢查消息隊(duì)列中是否有數(shù)據(jù),如果有的話,會(huì)調(diào)度發(fā)送消息的 Runnable 執(zhí)行。
第五步是配置socket的超時(shí)時(shí)間為0,也就是阻塞IO。
第六步執(zhí)行 loopReader()。這實(shí)際上是進(jìn)入了消息讀取循環(huán)了,也就是數(shù)據(jù)接收的邏輯了。
數(shù)據(jù)發(fā)送
我們可以通過(guò) WebSocket 接口的 send(String text) 和 send(ByteString bytes) 分別發(fā)送文本的和二進(jìn)制格式的消息。
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .@Override public boolean send(String text) {if (text == null) throw new NullPointerException("text == null");return send(ByteString.encodeUtf8(text), OPCODE_TEXT);}@Override public boolean send(ByteString bytes) {if (bytes == null) throw new NullPointerException("bytes == null");return send(bytes, OPCODE_BINARY);}private synchronized boolean send(ByteString data, int formatOpcode) {// Don't send new frames after we've failed or enqueued a close frame.if (failed || enqueuedClose) return false;// If this frame overflows the buffer, reject it and close the web socket.if (queueSize + data.size() > MAX_QUEUE_SIZE) {close(CLOSE_CLIENT_GOING_AWAY, null);return false;}// Enqueue the message frame.queueSize += data.size();messageAndCloseQueue.add(new Message(formatOpcode, data));runWriter();return true;}. . . . . .private void runWriter() {assert (Thread.holdsLock(this));if (executor != null) {executor.execute(writerRunnable);}}可以看到我們調(diào)用發(fā)送數(shù)據(jù)的接口時(shí),做的事情主要是將數(shù)據(jù)格式化,構(gòu)造消息,放進(jìn)一個(gè)消息隊(duì)列,然后調(diào)度 writerRunnable 執(zhí)行。
此外,值得注意的是,當(dāng)消息隊(duì)列中的未發(fā)送數(shù)據(jù)超出最大大小限制,WebSocket 連接會(huì)被直接關(guān)閉。對(duì)于發(fā)送失敗過(guò)或被關(guān)閉了的 WebSocket,將無(wú)法再發(fā)送信息。
在 writerRunnable 中會(huì)循環(huán)調(diào)用 writeOneFrame() 逐幀發(fā)送數(shù)據(jù),直到數(shù)據(jù)發(fā)完,或發(fā)送失敗。在 WebSocket 協(xié)議中,客戶端需要發(fā)送 四種類型 的幀:
PING幀用于連接保活,它的發(fā)送是在 PingRunnable 中執(zhí)行的,在初始化 Reader 和 Writer 的時(shí)候,就會(huì)根據(jù)設(shè)置調(diào)度執(zhí)行或不執(zhí)行。除PING 幀外的其它 三種 幀,都在 writeOneFrame() 中發(fā)送。PONG 幀是對(duì)服務(wù)器發(fā)過(guò)來(lái)的 PING 幀的響應(yīng),同樣用于保活連接。后面我們?cè)诜治鲞B接的保活時(shí)會(huì)更詳細(xì)的分析 PING 和 PONG 這兩種幀。CLOSE 幀用于關(guān)閉連接,稍后我們?cè)诜治鲞B接關(guān)閉過(guò)程時(shí)再來(lái)詳細(xì)地分析。
這里我們主要關(guān)注用戶數(shù)據(jù)發(fā)送的部分。PONG 幀具有最高的發(fā)送優(yōu)先級(jí)。在沒(méi)有PONG 幀需要發(fā)送時(shí),writeOneFrame() 從消息隊(duì)列中取出一條消息,如果消息不是 CLOSE 幀,則主要通過(guò)如下的過(guò)程進(jìn)行發(fā)送:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .boolean writeOneFrame() throws IOException {WebSocketWriter writer;ByteString pong;Object messageOrClose = null;int receivedCloseCode = -1;String receivedCloseReason = null;Streams streamsToClose = null;synchronized (RealWebSocket.this) {if (failed) {return false; // Failed web socket.}writer = this.writer;pong = pongQueue.poll();if (pong == null) {messageOrClose = messageAndCloseQueue.poll();. . . . . .} else if (messageOrClose instanceof Message) {ByteString data = ((Message) messageOrClose).data;BufferedSink sink = Okio.buffer(writer.newMessageSink(((Message) messageOrClose).formatOpcode, data.size()));sink.write(data);sink.close();synchronized (this) {queueSize -= data.size();}} else if (messageOrClose instanceof Close) {數(shù)據(jù)發(fā)送的過(guò)程可以總結(jié)如下:
這里面的玄機(jī)主要在創(chuàng)建的 BufferedSink。創(chuàng)建的 Sink 是一個(gè) FrameSink:
static void toggleMask(byte[] buffer, long byteCount, byte[] key, long frameBytesRead) {int keyLength = key.length;for (int i = 0; i < byteCount; i++, frameBytesRead++) {int keyIndex = (int) (frameBytesRead % keyLength);buffer[i] = (byte) (buffer[i] ^ key[keyIndex]);}}. . . . . .Sink newMessageSink(int formatOpcode, long contentLength) {if (activeWriter) {throw new IllegalStateException("Another message writer is active. Did you call close()?");}activeWriter = true;// Reset FrameSink state for a new writer.frameSink.formatOpcode = formatOpcode;frameSink.contentLength = contentLength;frameSink.isFirstFrame = true;frameSink.closed = false;return frameSink;}void writeMessageFrameSynchronized(int formatOpcode, long byteCount, boolean isFirstFrame,boolean isFinal) throws IOException {assert Thread.holdsLock(this);if (writerClosed) throw new IOException("closed");int b0 = isFirstFrame ? formatOpcode : OPCODE_CONTINUATION;if (isFinal) {b0 |= B0_FLAG_FIN;}sink.writeByte(b0);int b1 = 0;if (isClient) {b1 |= B1_FLAG_MASK;}if (byteCount <= PAYLOAD_BYTE_MAX) {b1 |= (int) byteCount;sink.writeByte(b1);} else if (byteCount <= PAYLOAD_SHORT_MAX) {b1 |= PAYLOAD_SHORT;sink.writeByte(b1);sink.writeShort((int) byteCount);} else {b1 |= PAYLOAD_LONG;sink.writeByte(b1);sink.writeLong(byteCount);}if (isClient) {random.nextBytes(maskKey);sink.write(maskKey);for (long written = 0; written < byteCount; ) {int toRead = (int) Math.min(byteCount, maskBuffer.length);int read = buffer.read(maskBuffer, 0, toRead);if (read == -1) throw new AssertionError();toggleMask(maskBuffer, read, maskKey, written);sink.write(maskBuffer, 0, read);written += read;}} else {sink.write(buffer, byteCount);}sink.emit();}final class FrameSink implements Sink {int formatOpcode;long contentLength;boolean isFirstFrame;boolean closed;@Override public void write(Buffer source, long byteCount) throws IOException {if (closed) throw new IOException("closed");buffer.write(source, byteCount);// Determine if this is a buffered write which we can defer until close() flushes.boolean deferWrite = isFirstFrame&& contentLength != -1&& buffer.size() > contentLength - 8192 /* segment size */;long emitCount = buffer.completeSegmentByteCount();if (emitCount > 0 && !deferWrite) {synchronized (WebSocketWriter.this) {writeMessageFrameSynchronized(formatOpcode, emitCount, isFirstFrame, false /* final */);}isFirstFrame = false;}}@Override public void flush() throws IOException {if (closed) throw new IOException("closed");synchronized (WebSocketWriter.this) {writeMessageFrameSynchronized(formatOpcode, buffer.size(), isFirstFrame, false /* final */);}isFirstFrame = false;}@Override public Timeout timeout() {return sink.timeout();}@SuppressWarnings("PointlessBitwiseExpression")@Override public void close() throws IOException {if (closed) throw new IOException("closed");synchronized (WebSocketWriter.this) {writeMessageFrameSynchronized(formatOpcode, buffer.size(), isFirstFrame, true /* final */);}closed = true;activeWriter = false;}}FrameSink 的 write() 會(huì)先將數(shù)據(jù)寫如一個(gè) Buffer 中,然后再?gòu)倪@個(gè) Buffer 中讀取數(shù)據(jù)來(lái)發(fā)送。如果是第一次發(fā)送數(shù)據(jù),同時(shí)剩余要發(fā)送的數(shù)據(jù)小于 8192 字節(jié)時(shí),會(huì)延遲執(zhí)行實(shí)際的數(shù)據(jù)發(fā)送,等 close() 時(shí)刷新。根據(jù) RealWebSocket 的 writeOneFrame() 的邏輯,在 write() 時(shí),總是寫入整個(gè)消息的所有數(shù)據(jù),因而,在 FrameSink 的 write() 中總是不會(huì)發(fā)送數(shù)據(jù)的。
writeMessageFrameSynchronized() 將用戶數(shù)據(jù)格式化并發(fā)送出去。規(guī)范中定義的數(shù)據(jù)格式如下:
0 1 2 30 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1+-+-+-+-+-------+-+-------------+-------------------------------+|F|R|R|R| opcode|M| Payload len | Extended payload length ||I|S|S|S| (4) |A| (7) | (16/64) ||N|V|V|V| |S| | (if payload len==126/127) || |1|2|3| |K| | |+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +| Extended payload length continued, if payload len == 127 |+ - - - - - - - - - - - - - - - +-------------------------------+| |Masking-key, if MASK set to 1 |+-------------------------------+-------------------------------+| Masking-key (continued) | Payload Data |+-------------------------------- - - - - - - - - - - - - - - - +: Payload Data continued ... :+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +| Payload Data continued ... |+---------------------------------------------------------------+基本結(jié)構(gòu)為:
關(guān)于幀格式的更詳細(xì)信息,可以參考 WebSocket Protocol 規(guī)范。
數(shù)據(jù)的接收
如我們前面看到的, 在握手的HTTP請(qǐng)求返回之后,會(huì)在HTTP請(qǐng)求的回調(diào)里,啟動(dòng)消息讀取循環(huán) loopReader():
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . ./** Receive frames until there are no more. Invoked only by the reader thread. */public void loopReader() throws IOException {while (receivedCloseCode == -1) {// This method call results in one or more onRead* methods being called on this thread.reader.processNextFrame();}}在這個(gè)循環(huán)中,不斷通過(guò) WebSocketReader 的 processNextFrame() 讀取消息,直到收到了關(guān)閉連接的消息。
final class WebSocketReader {public interface FrameCallback {void onReadMessage(String text) throws IOException;void onReadMessage(ByteString bytes) throws IOException;void onReadPing(ByteString buffer);void onReadPong(ByteString buffer);void onReadClose(int code, String reason);}. . . . . .void processNextFrame() throws IOException {readHeader();if (isControlFrame) {readControlFrame();} else {readMessageFrame();}}private void readHeader() throws IOException {if (closed) throw new IOException("closed");// Disable the timeout to read the first byte of a new frame.int b0;long timeoutBefore = source.timeout().timeoutNanos();source.timeout().clearTimeout();try {b0 = source.readByte() & 0xff;} finally {source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS);}opcode = b0 & B0_MASK_OPCODE;isFinalFrame = (b0 & B0_FLAG_FIN) != 0;isControlFrame = (b0 & OPCODE_FLAG_CONTROL) != 0;// Control frames must be final frames (cannot contain continuations).if (isControlFrame && !isFinalFrame) {throw new ProtocolException("Control frames must be final.");}boolean reservedFlag1 = (b0 & B0_FLAG_RSV1) != 0;boolean reservedFlag2 = (b0 & B0_FLAG_RSV2) != 0;boolean reservedFlag3 = (b0 & B0_FLAG_RSV3) != 0;if (reservedFlag1 || reservedFlag2 || reservedFlag3) {// Reserved flags are for extensions which we currently do not support.throw new ProtocolException("Reserved flags are unsupported.");}int b1 = source.readByte() & 0xff;isMasked = (b1 & B1_FLAG_MASK) != 0;if (isMasked == isClient) {// Masked payloads must be read on the server. Unmasked payloads must be read on the client.throw new ProtocolException(isClient? "Server-sent frames must not be masked.": "Client-sent frames must be masked.");}// Get frame length, optionally reading from follow-up bytes if indicated by special values.frameLength = b1 & B1_MASK_LENGTH;if (frameLength == PAYLOAD_SHORT) {frameLength = source.readShort() & 0xffffL; // Value is unsigned.} else if (frameLength == PAYLOAD_LONG) {frameLength = source.readLong();if (frameLength < 0) {throw new ProtocolException("Frame length 0x" + Long.toHexString(frameLength) + " > 0x7FFFFFFFFFFFFFFF");}}frameBytesRead = 0;if (isControlFrame && frameLength > PAYLOAD_BYTE_MAX) {throw new ProtocolException("Control frame must be less than " + PAYLOAD_BYTE_MAX + "B.");}if (isMasked) {// Read the masking key as bytes so that they can be used directly for unmasking.source.readFully(maskKey);}}processNextFrame() 先讀取 Header 的兩個(gè)字節(jié),然后根據(jù) Header 的信息,讀取數(shù)據(jù)內(nèi)容。
在讀取 Header 時(shí),讀的第一個(gè)字節(jié)是同步的不計(jì)超時(shí)時(shí)間的。WebSocketReader 從 Header 中,獲取到這個(gè)幀是不是消息的最后一幀,消息的類型,是否有掩碼字節(jié),保留位,幀的長(zhǎng)度,以及掩碼字節(jié)等信息。WebSocket 通過(guò)掩碼位和掩碼字節(jié)來(lái)區(qū)分?jǐn)?shù)據(jù)是從客戶端發(fā)送給服務(wù)器的,還是服務(wù)器發(fā)送給客戶端的。這里會(huì)根據(jù)協(xié)議,對(duì)這些信息進(jìn)行有效性一致性檢驗(yàn),若不一致則會(huì)拋出 ProtocolException。
WebSocketReader 同步讀取時(shí)的調(diào)用棧如下:
Reader Thread
通過(guò)幀的 Header 確定了是數(shù)據(jù)幀,則會(huì)執(zhí)行 readMessageFrame() 讀取消息幀:
final class WebSocketReader {. . . . . .private void readMessageFrame() throws IOException {int opcode = this.opcode;if (opcode != OPCODE_TEXT && opcode != OPCODE_BINARY) {throw new ProtocolException("Unknown opcode: " + toHexString(opcode));}Buffer message = new Buffer();readMessage(message);if (opcode == OPCODE_TEXT) {frameCallback.onReadMessage(message.readUtf8());} else {frameCallback.onReadMessage(message.readByteString());}}/** Read headers and process any control frames until we reach a non-control frame. */void readUntilNonControlFrame() throws IOException {while (!closed) {readHeader();if (!isControlFrame) {break;}readControlFrame();}}/*** Reads a message body into across one or more frames. Control frames that occur between* fragments will be processed. If the message payload is masked this will unmask as it's being* processed.*/private void readMessage(Buffer sink) throws IOException {while (true) {if (closed) throw new IOException("closed");if (frameBytesRead == frameLength) {if (isFinalFrame) return; // We are exhausted and have no continuations.readUntilNonControlFrame();if (opcode != OPCODE_CONTINUATION) {throw new ProtocolException("Expected continuation opcode. Got: " + toHexString(opcode));}if (isFinalFrame && frameLength == 0) {return; // Fast-path for empty final frame.}}long toRead = frameLength - frameBytesRead;long read;if (isMasked) {toRead = Math.min(toRead, maskBuffer.length);read = source.read(maskBuffer, 0, (int) toRead);if (read == -1) throw new EOFException();toggleMask(maskBuffer, read, maskKey, frameBytesRead);sink.write(maskBuffer, 0, (int) read);} else {read = source.read(sink, toRead);if (read == -1) throw new EOFException();}frameBytesRead += read;}}這個(gè)過(guò)程中,會(huì)讀取一條消息包含的所有數(shù)據(jù)幀。按照 WebSocket 的標(biāo)準(zhǔn),包含用戶數(shù)據(jù)的消息數(shù)據(jù)幀可以和控制幀交替發(fā)送;但消息之間的數(shù)據(jù)幀不可以。因而在這個(gè)過(guò)程中,若遇到了控制幀,則會(huì)先讀取控制幀進(jìn)行處理,然后繼續(xù)讀取消息的數(shù)據(jù)幀,直到讀取了消息的所有數(shù)據(jù)幀。
掩碼位和掩碼字節(jié),對(duì)于客戶端而言,發(fā)送的數(shù)據(jù)中包含這些東西,在接收的數(shù)據(jù)中不包含這些;對(duì)于服務(wù)器而言,則是在接收的數(shù)據(jù)中包含這些,發(fā)送的數(shù)據(jù)中不包含。OkHttp 既支持服務(wù)器開(kāi)發(fā),也支持客戶端開(kāi)發(fā),因而可以看到對(duì)于掩碼位和掩碼字節(jié)完整的處理。
在一個(gè)消息讀取完成之后,會(huì)通過(guò)回調(diào) FrameCallback 將讀取的內(nèi)容通知出去。
final class WebSocketReader {. . . . . .WebSocketReader(boolean isClient, BufferedSource source, FrameCallback frameCallback) {if (source == null) throw new NullPointerException("source == null");if (frameCallback == null) throw new NullPointerException("frameCallback == null");this.isClient = isClient;this.source = source;this.frameCallback = frameCallback;}這一事件會(huì)通知到 RealWebSocket。
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .@Override public void onReadMessage(String text) throws IOException {listener.onMessage(this, text);}@Override public void onReadMessage(ByteString bytes) throws IOException {listener.onMessage(this, bytes);}在 RealWebSocket 中,這一事件又被通知到我們?cè)趹?yīng)用程序中創(chuàng)建的回調(diào) WebSocketListener。
連接的保活
連接的保活通過(guò) PING 幀和 PONG 幀來(lái)實(shí)現(xiàn)。如我們前面看到的,若用戶設(shè)置了 PING 幀的發(fā)送周期,在握手的HTTP請(qǐng)求返回時(shí),消息讀取循環(huán)開(kāi)始前會(huì)調(diào)度 PingRunnable 周期性的向服務(wù)器發(fā)送 PING 幀:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .private final class PingRunnable implements Runnable {PingRunnable() {}@Override public void run() {writePingFrame();}}void writePingFrame() {WebSocketWriter writer;synchronized (this) {if (failed) return;writer = this.writer;}try {writer.writePing(ByteString.EMPTY);} catch (IOException e) {failWebSocket(e, null);}}在 PingRunnable 中,通過(guò) WebSocketWriter 發(fā)送 PING 幀:
final class WebSocketWriter {. . . . . ./** Send a ping with the supplied {@code payload}. */void writePing(ByteString payload) throws IOException {synchronized (this) {writeControlFrameSynchronized(OPCODE_CONTROL_PING, payload);}}. . . . . .private void writeControlFrameSynchronized(int opcode, ByteString payload) throws IOException {assert Thread.holdsLock(this);if (writerClosed) throw new IOException("closed");int length = payload.size();if (length > PAYLOAD_BYTE_MAX) {throw new IllegalArgumentException("Payload size must be less than or equal to " + PAYLOAD_BYTE_MAX);}int b0 = B0_FLAG_FIN | opcode;sink.writeByte(b0);int b1 = length;if (isClient) {b1 |= B1_FLAG_MASK;sink.writeByte(b1);random.nextBytes(maskKey);sink.write(maskKey);byte[] bytes = payload.toByteArray();toggleMask(bytes, bytes.length, maskKey, 0);sink.write(bytes);} else {sink.writeByte(b1);sink.write(payload);}sink.flush();}PING 幀是一個(gè)不包含載荷的控制幀。關(guān)于掩碼位和掩碼字節(jié)的設(shè)置,與消息的數(shù)據(jù)幀相同。即客戶端發(fā)送的幀,設(shè)置掩碼位,幀中包含掩碼字節(jié);服務(wù)器發(fā)送的幀,不設(shè)置掩碼位,幀中不包含掩碼字節(jié)。
通過(guò) WebSocket 通信的雙方,在收到對(duì)方發(fā)來(lái)的 PING 幀時(shí),需要用PONG幀來(lái)回復(fù)。在 WebSocketReader 的 readControlFrame() 中可以看到這一點(diǎn):
final class WebSocketReader {. . . . . .private void readControlFrame() throws IOException {Buffer buffer = new Buffer();if (frameBytesRead < frameLength) {if (isClient) {source.readFully(buffer, frameLength);} else {while (frameBytesRead < frameLength) {int toRead = (int) Math.min(frameLength - frameBytesRead, maskBuffer.length);int read = source.read(maskBuffer, 0, toRead);if (read == -1) throw new EOFException();toggleMask(maskBuffer, read, maskKey, frameBytesRead);buffer.write(maskBuffer, 0, read);frameBytesRead += read;}}}switch (opcode) {case OPCODE_CONTROL_PING:frameCallback.onReadPing(buffer.readByteString());break;case OPCODE_CONTROL_PONG:frameCallback.onReadPong(buffer.readByteString());break;PING 幀和 PONG 幀都不帶載荷,控制幀讀寫時(shí)對(duì)于載荷長(zhǎng)度的處理,都是為 CLOSE 幀做的。因而針對(duì) PING 幀和 PONG 幀,除了 Header 外, readControlFrame() 實(shí)際上無(wú)需再讀取任何數(shù)據(jù),但它會(huì)將這些事件通知出去:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .@Override public synchronized void onReadPing(ByteString payload) {// Don't respond to pings after we've failed or sent the close frame.if (failed || (enqueuedClose && messageAndCloseQueue.isEmpty())) return;pongQueue.add(payload);runWriter();pingCount++;}@Override public synchronized void onReadPong(ByteString buffer) {// This API doesn't expose pings.pongCount++;}可見(jiàn)在收到 PING 幀的時(shí)候,總是會(huì)發(fā)一個(gè) PONG 幀出去,且通常其沒(méi)有載荷數(shù)據(jù)。在收到一個(gè) PONG 幀時(shí),則通常只是記錄一下,然后什么也不做。如我們前面所見(jiàn),PONG 幀在 writerRunnable 中被發(fā)送出去:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .if (pong != null) {writer.writePong(pong);} else if (messageOrClose instanceof Message) {PONG 幀的發(fā)送與 PING 幀的非常相似:
final class WebSocketWriter {. . . . . ./** Send a pong with the supplied {@code payload}. */void writePong(ByteString payload) throws IOException {synchronized (this) {writeControlFrameSynchronized(OPCODE_CONTROL_PONG, payload);}}連接的關(guān)閉
連接的關(guān)閉,與數(shù)據(jù)發(fā)送的過(guò)程頗有幾分相似之處。通過(guò) WebSocket 接口的 close(int code, String reason) 我們可以關(guān)閉一個(gè) WebSocket 連接:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .@Override public boolean close(int code, String reason) {return close(code, reason, CANCEL_AFTER_CLOSE_MILLIS);}synchronized boolean close(int code, String reason, long cancelAfterCloseMillis) {validateCloseCode(code);ByteString reasonBytes = null;if (reason != null) {reasonBytes = ByteString.encodeUtf8(reason);if (reasonBytes.size() > CLOSE_MESSAGE_MAX) {throw new IllegalArgumentException("reason.size() > " + CLOSE_MESSAGE_MAX + ": " + reason);}}if (failed || enqueuedClose) return false;// Immediately prevent further frames from being enqueued.enqueuedClose = true;// Enqueue the close frame.messageAndCloseQueue.add(new Close(code, reasonBytes, cancelAfterCloseMillis));runWriter();return true;}在執(zhí)行關(guān)閉連接動(dòng)作前,會(huì)先檢查一下 close code 的有效性在合法范圍內(nèi)。關(guān)于不同 close code 的詳細(xì)說(shuō)明,可以參考 WebSocket 協(xié)議規(guī)范。
檢查完了之后,會(huì)構(gòu)造一個(gè) Close 消息放入發(fā)送消息隊(duì)列,并調(diào)度 writerRunnable 執(zhí)行。Close 消息可以帶有不超出 123 字節(jié)的字符串,以作為 Close message,來(lái)說(shuō)明連接關(guān)閉的原因。
連接的關(guān)閉分為主動(dòng)關(guān)閉和被動(dòng)關(guān)閉。客戶端先向服務(wù)器發(fā)送一個(gè) CLOSE 幀,然后服務(wù)器恢復(fù)一個(gè) CLOSE 幀,對(duì)于客戶端而言,這個(gè)過(guò)程為主動(dòng)關(guān)閉;反之則為對(duì)客戶端而言則為被動(dòng)關(guān)閉。
在 writerRunnable 執(zhí)行的 writeOneFrame() 實(shí)際發(fā)送 CLOSE 幀:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .messageOrClose = messageAndCloseQueue.poll();if (messageOrClose instanceof Close) {receivedCloseCode = this.receivedCloseCode;receivedCloseReason = this.receivedCloseReason;if (receivedCloseCode != -1) {streamsToClose = this.streams;this.streams = null;this.executor.shutdown();} else {// When we request a graceful close also schedule a cancel of the websocket.cancelFuture = executor.schedule(new CancelRunnable(),((Close) messageOrClose).cancelAfterCloseMillis, MILLISECONDS);}} else if (messageOrClose == null) {return false; // The queue is exhausted.}}. . . . . .} else if (messageOrClose instanceof Close) {Close close = (Close) messageOrClose;writer.writeClose(close.code, close.reason);// We closed the writer: now both reader and writer are closed.if (streamsToClose != null) {listener.onClosed(this, receivedCloseCode, receivedCloseReason);}} else {發(fā)送 CLOSE 幀也分為主動(dòng)關(guān)閉的發(fā)送還是被動(dòng)關(guān)閉的發(fā)送。
對(duì)于被動(dòng)關(guān)閉,在發(fā)送完 CLOSE 幀之后,連接被最終關(guān)閉,因而,發(fā)送 CLOSE 幀之前,這里會(huì)停掉發(fā)送消息用的 executor。而在發(fā)送之后,則會(huì)通過(guò) onClosed() 通知用戶。
而對(duì)于主動(dòng)關(guān)閉,則在發(fā)送前會(huì)調(diào)度 CancelRunnable 的執(zhí)行,發(fā)送后不會(huì)通過(guò) onClosed() 通知用戶。
final class WebSocketWriter {. . . . . .void writeClose(int code, ByteString reason) throws IOException {ByteString payload = ByteString.EMPTY;if (code != 0 || reason != null) {if (code != 0) {validateCloseCode(code);}Buffer buffer = new Buffer();buffer.writeShort(code);if (reason != null) {buffer.write(reason);}payload = buffer.readByteString();}synchronized (this) {try {writeControlFrameSynchronized(OPCODE_CONTROL_CLOSE, payload);} finally {writerClosed = true;}}}將 CLOSE 幀發(fā)送到網(wǎng)絡(luò)的過(guò)程與 PING 和 PONG 幀的頗為相似,僅有的差別就是 CLOSE 幀有載荷。關(guān)于掩碼位和掩碼自己的規(guī)則,同樣適用于 CLOSE 幀的發(fā)送。
CLOSE 的讀取在 WebSocketReader 的 readControlFrame()中:
final class WebSocketReader {. . . . . .private void readControlFrame() throws IOException {Buffer buffer = new Buffer();if (frameBytesRead < frameLength) {if (isClient) {source.readFully(buffer, frameLength);} else {while (frameBytesRead < frameLength) {int toRead = (int) Math.min(frameLength - frameBytesRead, maskBuffer.length);int read = source.read(maskBuffer, 0, toRead);if (read == -1) throw new EOFException();toggleMask(maskBuffer, read, maskKey, frameBytesRead);buffer.write(maskBuffer, 0, read);frameBytesRead += read;}}}switch (opcode) {. . . . . .case OPCODE_CONTROL_CLOSE:int code = CLOSE_NO_STATUS_CODE;String reason = "";long bufferSize = buffer.size();if (bufferSize == 1) {throw new ProtocolException("Malformed close payload length of 1.");} else if (bufferSize != 0) {code = buffer.readShort();reason = buffer.readUtf8();String codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code);if (codeExceptionMessage != null) throw new ProtocolException(codeExceptionMessage);}frameCallback.onReadClose(code, reason);closed = true;break;default:throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));}}讀到 CLOSE 幀時(shí),WebSocketReader 會(huì)將這一事件通知出去:
public final class RealWebSocket implements WebSocket, WebSocketReader.FrameCallback {. . . . . .@Override public void onReadClose(int code, String reason) {if (code == -1) throw new IllegalArgumentException();Streams toClose = null;synchronized (this) {if (receivedCloseCode != -1) throw new IllegalStateException("already closed");receivedCloseCode = code;receivedCloseReason = reason;if (enqueuedClose && messageAndCloseQueue.isEmpty()) {toClose = this.streams;this.streams = null;if (cancelFuture != null) cancelFuture.cancel(false);this.executor.shutdown();}}try {listener.onClosing(this, code, reason);if (toClose != null) {listener.onClosed(this, code, reason);}} finally {closeQuietly(toClose);}}對(duì)于收到的 CLOSE 幀處理同樣分為主動(dòng)關(guān)閉的情況和被動(dòng)關(guān)閉的情況。與 CLOSE 發(fā)送時(shí)的情形正好相反,若是主動(dòng)關(guān)閉,則在收到 CLOSE 幀之后,WebSocket 連接最終斷開(kāi),因而需要停掉executor,被動(dòng)關(guān)閉則暫時(shí)不需要。
收到 CLOSE 幀,總是會(huì)通過(guò) onClosing() 將事件通知出去。
對(duì)于主動(dòng)關(guān)閉的情形,最后還會(huì)通過(guò) onClosed() 通知用戶,連接已經(jīng)最終關(guān)閉。
關(guān)于 WebSocket 的 CLOSE 幀的更多說(shuō)明,可以參考 WebSocket協(xié)議規(guī)范。
WebSocket連接的生命周期
總結(jié)一下 WebSocket 連接的生命周期:
關(guān)于 WebSocket 的詳細(xì)信息,可以參考 WebSocket協(xié)議規(guī)范。
參考文檔
WebSocket 協(xié)議規(guī)范
WebSocket 實(shí)戰(zhàn)
使用 HTML5 WebSocket 構(gòu)建實(shí)時(shí) Web 應(yīng)用
WebSocket Client Example with OkHttp
總結(jié)
以上是生活随笔為你收集整理的OkHttp实现分析之Websocket的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 读《Android 安全架构深究》
- 下一篇: 标准STUN判断NAT类型的过程及改进