WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝)
首發CSDN:徐同學呀,原創不易,轉載請注明源鏈接。我是徐同學,用心輸出高質量文章,希望對你有所幫助。 本篇基于Tomcat10.0.6。建議收藏起來慢慢看。
文章目錄
- 一、前言
- 二、什么是WebSocket
- 1、HTTP/1.1的缺陷
- 2、WebSocket發展歷史
- (1)背景
- (2)歷史
- 3、WebSocket握手和雙向通信
- (1)定義
- (2)握手(建立連接)
- (3)消息幀
- (4)揮手(關閉連接)
- 4、WebSocket優點
- 三、Java API for WebSocket(JSR356)
- 1、服務端API
- (1)注解方式@ServerEndpoint
- (2)繼承抽象類Endpoint
- 2、客戶端API
- 3、上下文Session
- 4、HandshakeRequest 和 HandshakeResponse
- (1)HandshakeRequest
- (2)HandshakeResponse
- 5、WebSocketContainer
- 四、WebSocket基于Tomcat應用
- 1、服務器端實現
- (1)@ServerEndpoint注解方式
- (2)繼承抽象類Endpoint方式
- (3)早期Tomcat7中Server端實現對比
- 2、客戶端實現
- (1)前端js版
- (2)@ClientEndpoint注解方式
- (3)繼承抽象類Endpoint方式
- 3、基于Nginx反向代理注意事項
- 五、WebSocket在Tomcat中的源碼實現
- 1、WsSci初始化
- (1)WsSci#onStartup
- (2)WsServerContainer#addEndpoint
- (3)PojoMethodMapping方法映射和形參解析
- 2、協議升級(握手)
- (1)WsFilter
- (2)UpgradeUtil#doUpgrade
- (3)Request#upgrade
- (4)回調機制ActionHook#action
- (5)ConnectionHandler#process
- (6)WsHttpUpgradeHandler#init握手成功
- 3、數據傳輸和解析
- (1)接收客戶端消息
- (2)發送消息給客戶端
- 六、要點回顧
- 七、參考文獻
一、前言
WebSocket是一種全雙工通信協議,即客戶端可以向服務端發送請求,服務端也可以主動向客戶端推送數據。這樣的特點,使得它在一些實時性要求比較高的場景效果斐然(比如微信朋友圈實時通知、在線協同編輯等)。主流瀏覽器以及一些常見服務端通信框架(Tomcat、netty、undertow、webLogic等)都對WebSocket進行了技術支持。那么,WebSocket具體是什么?為什么會出現WebSocket?如何做到全雙工通信?解決了什么問題?
二、什么是WebSocket
1、HTTP/1.1的缺陷
HTTP/1.1最初是為網絡中超文本資源(HTML),請求-響應傳輸而設計的,后來支持了傳輸更多類型的資源,如圖片、視頻等,但都沒有改變它單向的請求-響應模式。
隨著互聯網的日益壯大,HTTP/1.1功能使用上已體現捉襟見肘的疲態。雖然可以通過某些方式滿足需求(如Ajax、Comet),但是性能上還是局限于HTTP/1.1,那么HTTP/1.1有哪些缺陷呢:
- 請求-響應模式,只能客戶端發送請求給服務端,服務端才可以發送響應數據給客戶端。
- 傳輸數據為文本格式,且請求/響應頭部冗長重復。
(為了區分HTTP/1.1和HTTP/1.2,下面描述中,HTTP均代表HTTP/1.1)
2、WebSocket發展歷史
(1)背景
在WebSocket出現之前,主要通過長輪詢和HTTP長連接實現實時數據更新,這種方式有個統稱叫Comet,Tomcat8.5之前有對Comet基于流的HTTP長連接做支持,后來因為WebSocket的成熟和標準化,以及Comet自身依然是基于HTTP,在性能消耗和瓶頸上無法跳脫HTTP,就把Comet廢棄了。
還有一個SPDY技術,也對HTTP進行了改進,多路復用流、服務器推送等,后來演化成HTTP/2.0,因為適用場景和解決的問題不同,暫不對HTTP/2.0做過多解釋,不過對于HTTP/2.0和WebSocket在Tomcat實現中都是作為協議升級來處理的。
(Comet和SPDY的原理不是本篇重點,沒有展開講解,感興趣的同學可自行百度)
(2)歷史
在這種背景下,HTML5制定了WebSocket
- 籌備階段,WebSocket被劃分為HTML5標準的一部分,2008年6月,Michael Carter進行了一系列討論,最終形成了稱為WebSocket的協議。
- 2009年12月,Google Chrome 4是第一個提供標準支持的瀏覽器,默認情況下啟用了WebSocket。
- 2010年2月,WebSocket協議的開發從W3C和WHATWG小組轉移到IETF(TheInternet Engineering Task Force),并在Ian Hickson的指導下進行了兩次修訂。
- 2011年,IETF將WebSocket協議標準化為RFC 6455起,大多數Web瀏覽器都在實現支持WebSocket協議的客戶端API。此外,已經開發了許多實現WebSocket協議的Java庫。
- 2013年,發布JSR356標準,Java API for WebSocket。
(為什么要去了解WebSocket的發展歷史和背景呢?個人認為可以更好的理解某個技術實現的演變歷程,比如Tomcat,早期有Comet沒有WebSocket時,Tomcat就對Comet做了支持,后來有WebSocket了,但是還沒出JSR356標準,Tomcat就對Websocket做了支持,自定義API,再后來有了JSR356,Tomcat立馬緊跟潮流,廢棄自定義的API,實現JSR356那一套,這就使得在Tomcat7使用WebSocket的同學,想升為Tomcat8(其實Tomcat7.0.47之后就是JSR356標準了),發現WebSocket接入方式變了,而且一些細節也變了。)
3、WebSocket握手和雙向通信
(1)定義
WebSocket全雙工通信協議,在客戶端和服務端建立連接后,可以持續雙向通信,和HTTP同屬于應用層協議,并且都依賴于傳輸層的TCP/IP協議。
雖然WebSocket有別于HTTP,是一種新協議,但是RFC 6455中規定:
it is designed to work over HTTP ports 80 and 443 as well as to support HTTP proxies and intermediaries.
- WebSocket通過HTTP端口80和443進行工作,并支持HTTP代理和中介,從而使其與HTTP協議兼容。
- 為了實現兼容性,WebSocket握手使用HTTP Upgrade頭從HTTP協議更改為WebSocket協議。
- Websocket使用ws或wss的統一資源標志符(URI),分別對應明文和加密連接。
(2)握手(建立連接)
在雙向通信之前,必須通過握手建立連接。Websocket通過 HTTP/1.1 協議的101狀態碼進行握手,首先客戶端(如瀏覽器)發出帶有特殊消息頭(Upgrade、Connection)的請求到服務器,服務器判斷是否支持升級,支持則返回響應狀態碼101,表示協議升級成功,對于WebSocket就是握手成功。
客戶端請求示例:
GET /test HTTP/1.1 Host: server.example.com Upgrade: websocket Connection: Upgrade Sec-WebSocket-Key: tFGdnEL/5fXMS9yKwBjllg== Origin: http://example.com Sec-WebSocket-Protocol: v10.stomp, v11.stomp, v12.stomp Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits Sec-WebSocket-Version: 13- Connection必須設置Upgrade,表示客戶端希望連接升級。
- Upgrade: websocket表明協議升級為websocket。
- Sec-WebSocket-Key字段內記錄著握手過程中必不可少的鍵值,由客戶端(瀏覽器)生成,可以盡量避免普通HTTP請求被誤認為Websocket協議。
- Sec-WebSocket-Version 表示支持的Websocket版本。RFC6455要求使用的版本是13。
- Origin字段是必須的。如果缺少origin字段,WebSocket服務器需要回復HTTP 403 狀態碼(禁止訪問),通過Origin可以做安全校驗。
服務端響應示例:
HTTP/1.1 101 Switching Protocols Upgrade: websocket Connection: Upgrade Sec-WebSocket-Accept: HaA6EjhHRejpHyuO0yBnY4J4n3A= Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15 Sec-WebSocket-Protocol: v12.stompSec-WebSocket-Accept的字段值是由握手請求中的Sec-WebSocket-Key的字段值生成的。成功握手確立WebSocket連接之后,通信時不再使用HTTP的數據幀,而采用WebSocket獨立的數據幀。
(3)消息幀
WebSocket使用二進制消息幀作為雙向通信的媒介。何為消息幀?發送方將每個應用程序消息拆分為一個或多個幀,通過網絡將它們傳輸到目的地,并重新組裝解析出一個完整消息。
有別于HTTP/1.1文本消息格式(冗長的消息頭和分隔符等),WebSocket消息幀規定一定的格式,以二進制傳輸,更加短小精悍。二者相同之處就是都是基于TCP/IP流式協議(沒有規定消息邊界)。
如下是消息幀的基本結構圖:
- FIN: 1 bit,表示該幀是否為消息的最后一幀。1-是,0-否。
- RSV1,RSV2,RSV3: 1 bit each,預留(3位),擴展的預留標志。一般情況為0,除非協商的擴展定義為非零值。如果接收到非零值且不為協商擴展定義,接收端必須使連接失敗。
- Opcode: 4 bits,定義消息幀的操作類型,如果接收到一個未知Opcode,接收端必須使連接失敗。(0x0-延續幀,0x1-文本幀,0x2-二進制幀,0x8-關閉幀,0x9-PING幀,0xA-PONG幀(在接收到PING幀時,終端必須發送一個PONG幀響應,除非它已經接收到關閉幀),0x3-0x7保留給未來的非控制幀,0xB-F保留給未來的控制幀)
- Mask: 1 bit,表示該幀是否為隱藏的,即被加密保護的。1-是,0-否。Mask=1時,必須傳一個Masking-key,用于解除隱藏(客戶端發送消息給服務器端,Mask必須為1)。
- Payload length: 7 bits, 7+16 bits, or 7+64 bits,有效載荷數據的長度(擴展數據長度+應用數據長度,擴展數據長度可以為0)。
if 0-125, that is the payload length. If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length. If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length.
- Masking-key: 0 or 4 bytes,用于解除幀隱藏(加密)的key,Mask=1時不為空,Mask=0時不用傳。
- Payload data: (x+y) bytes,有效載荷數據包括擴展數據(x bytes)和應用數據(y bytes)。有效載荷數據是用戶真正要傳輸的數據。
這樣的二進制消息幀設計,與HTTP協議相比,WebSocket協議可以提供約500:1的流量減少和3:1的延遲減少。
(4)揮手(關閉連接)
揮手相對于握手要簡單很多,客戶端和服務器端任何一方都可以通過發送關閉幀來發起揮手請求。發送關閉幀的一方,之后不再發送任何數據給對方;接收到關閉幀的一方,如果之前沒有發送過關閉幀,則必須發送一個關閉幀作為響應。關閉幀中可以攜帶關閉原因。
在發送和接收一個關閉幀消息之后,就認為WebSocket連接已關閉,且必須關閉底層TCP連接。
除了通過關閉握手來關閉連接外,WebSocket連接也可能在另一方離開或底層TCP連接關閉時突然關閉。
4、WebSocket優點
-
較少的控制開銷。在連接建立后,服務器和客戶端之間交換數據時,用于協議控制的數據包頭部相對于HTTP請求每次都要攜帶完整的頭部,顯著減少。
-
更強的實時性。由于協議是全雙工的,所以服務器可以隨時主動給客戶端下發數據。相對于HTTP請求需要等待客戶端發起請求服務端才能響應,延遲明顯更少。
-
保持連接狀態。與HTTP不同的是,Websocket需要先建立連接,這就使得其成為一種有狀態的協議,之后通信時可以省略部分狀態信息。而HTTP請求可能需要在每個請求都攜帶狀態信息(如身份認證等)。
-
更好的二進制支持。Websocket定義了二進制幀,相對HTTP,可以更輕松地處理二進制內容。
-
支持擴展。Websocket定義了擴展,用戶可以擴展協議、實現部分自定義的子協議。
-
更好的壓縮效果。相對于HTTP壓縮,Websocket在適當的擴展支持下,可以沿用之前內容的上下文,在傳遞類似的數據時,可以顯著提高壓縮率。
三、Java API for WebSocket(JSR356)
JSR356在Java EE7時歸為Java EE標準的一部分(后來Java EE更名為Jakarta EE,世上再無Java EE,以下統一稱Jakarta EE),所有兼容Jakarta EE的應用服務器,都必須遵循JSR356標準的WebSocket協議API。
根據JSR356規定, 建立WebSocket連接的服務器端和客戶端,兩端對稱,可以互相通信,差異性較小,抽象成API,就是一個個Endpoint(端點),只不過服務器端的叫ServerEndpoint,客戶端的叫ClientEndpoint。客戶端向服務端發送WebSocket握手請求,建立連接后就創建一個ServerEndpoint對象。(這里的Endpoint和Tomcat連接器里的AbstractEndpoint名稱上有點像,但是兩個毫不相干的東西,就像周杰倫和周杰的關系。)
ServerEndpoint和ClientEndpoint在API上差異也很小,有相同的生命周期事件(OnOpen、OnClose、OnError、OnMessage),不同之處是ServerEndpoint作為服務器端點,可以指定一個URI路徑供客戶端連接,ClientEndpoint沒有。
1、服務端API
服務器端的Endpoint有兩種實現方式,一種是注解方式@ServerEndpoint,一種是繼承抽象類Endpoint。
(1)注解方式@ServerEndpoint
首先看看@ServerEndpoint有哪些要素:
- value,可以指定一個URI路徑標識一個Endpoint。
- subprotocols,用戶在WebSocket協議下自定義擴展一些子協議。
- decoders,用戶可以自定義一些消息解碼器,比如通信的消息是一個對象,接收到消息可以自動解碼封裝成消息對象。
- encoders,有解碼器就有編碼器,定義解碼器和編碼器的好處是可以規范使用層消息的傳輸。
- configurator,ServerEndpoint配置類,主要提供ServerEndpoint對象的創建方式擴展(如果使用Tomcat的WebSocket實現,默認是反射創建ServerEndpoint對象)。
@ServerEndpoint可以注解到任何類上,但是想實現服務端的完整功能,還需要配合幾個生命周期的注解使用,這些生命周期注解只能注解在方法上:
- @OnOpen 建立連接時觸發。
- @OnClose 關閉連接時觸發。
- @OnError 發生異常時觸發。
- @OnMessage 接收到消息時觸發。
(2)繼承抽象類Endpoint
繼承抽象類Endpoint,重寫幾個生命周期方法。
怎么沒有onMessage方法,實現onMessage還需要繼承實現一個接口jakarta.websocket.MessageHandler,MessageHandler接口又分為Partial和Whole,實現的MessageHandler需要在onOpen觸發時注冊到jakarta.websocket.Session中。
繼承抽象類Endpoint的方式相對于注解方式要麻煩的多,除了繼承Endpoint和實現接口MessageHandler外,還必須實現一個jakarta.websocket.server.ServerApplicationConfig來管理Endpoint,比如給Endpoint分配URI路徑。
而encoders、decoders、configurator等配置信息由jakarta.websocket.server.ServerEndpointConfig管理,默認實現jakarta.websocket.server.DefaultServerEndpointConfig。
所以如果使用 Java 版WebSocket服務器端實現首推注解方式。
2、客戶端API
對于客戶端API,也是有注解方式和繼承抽象類Endpoint方式。
- 注解方式,只需要將@ServerEndpoint換成@ClientEndpoint。
- 繼承抽象類Endpoint方式,需要一個jakarta.websocket.ClientEndpointConfig來管理encoders、decoders、configurator等配置信息,默認實現jakarta.websocket.DefaultClientEndpointConfig。
3、上下文Session
WebSocket是一個有狀態的連接,建立連接后的通信都是通過jakarta.websocket.Session保持狀態,一個連接一個Session,每一個Session有一個唯一標識Id。
Session的主要職責涉及:
- 基礎信息管理(request信息(getRequestURI、getRequestParameterMap、getPathParameters等)、協議版本getProtocolVersion、子協議getNegotiatedSubprotocol等)。
- 連接管理(狀態判斷isOpen、接收消息的MessageHandler、發送消息的異步遠程端點RemoteEndpoint.Async和同步遠程端點RemoteEndpoint.Basic等)。
4、HandshakeRequest 和 HandshakeResponse
HandshakeRequest 和 HandshakeResponse了解即可,這兩個接口主要用于WebScoket握手升級過程中握手請求響應的封裝,如果只是單純使用WebSocket,不會接觸到這兩個接口。
(1)HandshakeRequest
(2)HandshakeResponse
Sec-WebSocket-Accept根據客戶端傳的Sec-WebSocket-Key生成,如下是Tomcat10.0.6 WebSocket源碼實現中生成Sec-WebSocket-Accept的算法:
private static String getWebSocketAccept(String key) {byte[] digest = ConcurrentMessageDigest.digestSHA1(key.getBytes(StandardCharsets.ISO_8859_1), WS_ACCEPT);return Base64.encodeBase64String(digest); }5、WebSocketContainer
jakarta.websocket.WebSocketContainer顧名思義,就是WebSocket的容器,集大成者。其主要職責包括但不限于connectToServer,客戶端連接服務器端,基于瀏覽器的WebSocket客戶端連接服務器端,由瀏覽器支持,但是基于Java版的WebSocket客戶端就可以通過WebSocketContainer#connectToServer向服務端發起連接請求。
四、WebSocket基于Tomcat應用
(如下使用的是javax.websocket包,未使用最新的jakarta.websocket,主要是測試項目基于SpringBoot+Tomcat9.x的,Java API for WebSocket版本需要保持一致。)
1、服務器端實現
(1)@ServerEndpoint注解方式
import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong;@ServerEndpoint(value = "/ws/test/{userId}", encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class}, configurator = MyServerConfigurator.class) public class WebSocketServerEndpoint {private Session session;private String userId;@OnOpenpublic void OnOpen(Session session, @PathParam(value = "userId") String userId) {this.session = session;this.userId = userId;// 建立連接后,將連接存到一個map里endpointMap.put(userId, this);Message message = new Message(0, "connected, hello " + userId);sendMsg(message);}@OnClosepublic void OnClose() {// 關閉連接時觸發,從map中刪除連接endpointMap.remove(userId);System.out.println("server closed...");}@OnMessagepublic void onMessage(Message message) {System.out.println("server recive message=" + message.toString());}@OnErrorpublic void onError(Throwable t) throws Throwable {this.session.close(new CloseReason(CloseReason.CloseCodes.CLOSED_ABNORMALLY, "系統異常"));t.printStackTrace();}/*** 群發* @param data*/public void sendAllMsg(Message data) {for (WebSocketServerEndpoint value : endpointMap.values()) {value.sendMsgAsync(data);}}/*** 推送消息給指定 userId* @param data* @param userId*/public void sendMsg(Message data, String userId) {WebSocketServerEndpoint endpoint = endpointMap.get(userId);if (endpoint == null) {System.out.println("not conected to " + userId);return;}endpoint.sendMsgAsync(data);}private void sendMsg(Message data) {try {this.session.getBasicRemote().sendObject(data);} catch (IOException ioException) {ioException.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}private void sendMsgAsync(Message data) {this.session.getAsyncRemote().sendObject(data);}// 存儲建立連接的Endpointprivate static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>(); }每一個客戶端與服務器端建立連接后,都會生成一個WebSocketServerEndpoint,可以通過一個Map將其與userId對應存起來,為后續群發廣播和單獨推送消息給某個客戶端提供便利。
注意:@ServerEndpoint的encoders、decoders、configurator等配置信息在實際使用中可以不定義,如果項目簡單,完全可以用默認的。
如果通信消息被封裝成一個對象,如示例的Message(因為源碼過于簡單就不展示了,屬性主要有code、msg、data),就必須提供編碼器和解碼器。也可以在每次發送消息時硬編碼轉為字符串,在接收到消息時轉為Message。有了編碼器和解碼器,顯得比較規范,轉為字符串由編碼器做,字符串轉為對象由解碼器做,但也使得架構變復雜了,視項目需求而定。
Configurator的用處就是自定義Endpoint對象創建方式,默認Tomcat提供的是通過反射。WebScoket是每個連接都會創建一個Endpoint對象,如果連接比較多,很頻繁,通過反射創建,用后即毀,可能不是一個好主意,所以可以搞一個對象池,用過回收,用時先從對象池中拿,有就重置,省去實例化分配內存等消耗過程。
如果使用SpringBoot內置Tomcat、undertow、Netty等,接入WebSocket時除了加@ServerEndpoint還需要加一個@Component,再給Spring注冊一個ServerEndpointExporter類,這樣,服務端Endpoint就交由Spring去掃描注冊了。
@Configuration public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {ServerEndpointExporter serverEndpointExporter = new ServerEndpointExporter();return serverEndpointExporter;} }外置Tomcat就不需要這么麻煩,Tomcat會默認掃描classpath下帶有@ServerEndpoint注解的類。(SpringBoot接入Websocket后續會單獨出文章講解,也挺有意思的)
(2)繼承抽象類Endpoint方式
import javax.websocket.*; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap;public class WebSocketServerEndpoint extends Endpoint {private Session session;private String userId;@Overridepublic void onOpen(Session session, EndpointConfig endpointConfig) {this.session = session;this.userId = session.getPathParameters().get("userId");session.addMessageHandler(new MessageHandler());endpointMap.put(userId, this);Message message = new Message(0, "connected, hello " + userId);sendMsg(message);}@Overridepublic void onClose(Session session, CloseReason closeReason) {endpointMap.remove(userId);}@Overridepublic void onError(Session session, Throwable throwable) {throwable.printStackTrace();}/*** 群發* @param data*/public void sendAllMsg(Message data) {for (WebSocketServerEndpoint value : endpointMap.values()) {value.sendMsgAsync(data);}}/*** 推送消息給指定 userId* @param data* @param userId*/public void sendMsg(Message data, String userId) {WebSocketServerEndpoint endpoint = endpointMap.get(userId);if (endpoint == null) {System.out.println("not conected to " + userId);return;}endpoint.sendMsgAsync(data);}private void sendMsg(Message data) {try {this.session.getBasicRemote().sendObject(data);} catch (IOException ioException) {ioException.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}private void sendMsgAsync(Message data) {this.session.getAsyncRemote().sendObject(data);}private class MessageHandler implements javax.websocket.MessageHandler.Whole<Message> {@Overridepublic void onMessage(Message message) {System.out.println("server recive message=" + message.toString());}}private static ConcurrentHashMap<String, WebSocketServerEndpoint> endpointMap = new ConcurrentHashMap<String, WebSocketServerEndpoint>();}繼承抽象類Endpoint方式比加注解@ServerEndpoint方式麻煩的很,主要是需要自己實現MessageHandler和ServerApplicationConfig。@ServerEndpoint的話都是使用默認的,原理上差不多,只是注解更自動化,更簡潔。
MessageHandler做的事情,一個@OnMessage就搞定了,ServerApplicationConfig做的URI映射、decoders、encoders,configurator等,一個@ServerEndpoint就可以了。
import javax.websocket.Decoder; import javax.websocket.Encoder; import javax.websocket.Endpoint; import javax.websocket.server.ServerApplicationConfig; import javax.websocket.server.ServerEndpointConfig; import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set;public class MyServerApplicationConfig implements ServerApplicationConfig {@Overridepublic Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {Set<ServerEndpointConfig> result = new HashSet<ServerEndpointConfig>();List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();decoderList.add(MessageDecoder.class);List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();encoderList.add(MessageEncoder.class);if (set.contains(WebSocketServerEndpoint3.class)) {ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(WebSocketServerEndpoint3.class, "/ws/test3").decoders(decoderList).encoders(encoderList).configurator(new MyServerConfigurator()).build();result.add(serverEndpointConfig);}return result;}@Overridepublic Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {return set;} }如果使用SpringBoot內置Tomcat,則不需要ServerApplicationConfig了,但是需要給Spring注冊一個ServerEndpointConfig。
@Bean public ServerEndpointConfig serverEndpointConfig() {List<Class<? extends Decoder>> decoderList = new ArrayList<Class<? extends Decoder>>();decoderList.add(MessageDecoder.class);List<Class<? extends Encoder>> encoderList = new ArrayList<Class<? extends Encoder>>();encoderList.add(MessageEncoder.class);ServerEndpointConfig serverEndpointConfig = ServerEndpointConfig.Builder.create(WebSocketServerEndpoint3.class, "/ws/test3/{userId}").decoders(decoderList).encoders(encoderList).configurator(new MyServerConfigurator()).build();return serverEndpointConfig; }(3)早期Tomcat7中Server端實現對比
Tomcat7早期版本7.0.47之前還沒有出JSR 356時,自己搞了一套接口,其實就是一個Servlet。
和遵循JSR356標準的版本對比,有一個比較大的變化是,createWebSocketInbound創建生命周期事件處理器StreamInbound的時機是WebSocket協議升級之前,此時還可以通過用戶線程緩存(ThreadLocal等)的HttpServletRequest對象,獲取一些請求頭等信息。
而遵循JSR356標準的版本實現,創建生命周期事件處理的Endpoint是在WebSocket協議升級完成(經過HTTP握手)之后創建的,而WebSocket握手成功給客戶端響應101前,會結束銷毀HttpServletRequest對象,此時是獲取不到請求頭等信息的。
import org.apache.catalina.websocket.StreamInbound; import org.apache.catalina.websocket.WebSocketServlet;import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServletRequest;@WebServlet(urlPatterns = "/ws/test") public class MyWeSocketServlet extends WebSocketServlet {@Overrideprotected StreamInbound createWebSocketInbound(String subProtocol, HttpServletRequest request) {MyMessageInbound messageInbound = new MyMessageInbound(subProtocol, request);return messageInbound;}} import org.apache.catalina.websocket.MessageInbound; import org.apache.catalina.websocket.WsOutbound;import javax.servlet.http.HttpServletRequest; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.CharBuffer;public class MyMessageInbound extends MessageInbound {private String subProtocol;private HttpServletRequest request;public MyMessageInbound(String subProtocol, HttpServletRequest request) {this.subProtocol = subProtocol;this.request = request;}@Overrideprotected void onOpen(WsOutbound outbound) {String msg = "connected, hello";ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());try {outbound.writeBinaryMessage(byteBuffer);} catch (IOException e) {e.printStackTrace();}}@Overrideprotected void onClose(int status) {}@Overrideprotected void onBinaryMessage(ByteBuffer byteBuffer) throws IOException {// 接收到客戶端信息}@Overrideprotected void onTextMessage(CharBuffer charBuffer) throws IOException {// 接收到客戶端信息} }2、客戶端實現
(1)前端js版
js版的客戶端主要依托瀏覽器對WebScoket的支持,在生命周期事件觸發上和服務器端的差不多,這也應證了建立WebSocket連接的兩端是對等的。
編寫WebSocket客戶端需要注意以下幾點:
- 和服務器端商議好傳輸的消息的格式,一般為json字符串,比較直觀,編碼解碼都很簡單,也可以是其他商定的格式。
- 需要心跳檢測,定時給服務器端發送消息,保持連接正常。
- 正常關閉連接,即關閉瀏覽器窗口前主動關閉連接,以免服務器端拋異常。
- 如果因為異常斷開連接,支持重連。
這里推薦一個在線測試WebSocket連接和發送消息的網站easyswoole.com/wstool.html:
真的很牛逼,很方便,很簡單。還有源碼github:https://github.com/easy-swoole/wstool,感興趣可以看看。
(2)@ClientEndpoint注解方式
Java版客戶端不用多說,把@ServerEndpoint換成@ClientEndpoint就可以了,其他都一樣。@ClientEndpoint比@ServerEndpoint就少了一個value,不需要設置URI。
@ClientEndpoint(encoders = {MessageEncoder.class}, decoders = {MessageDecoder.class}) public class WebSocketClientEndpoint {private Session session;@OnOpenpublic void OnOpen(Session session) {this.session = session;Message message = new Message(0, "connecting...");sendMsg(message);}@OnClosepublic void OnClose() {Message message = new Message(0, "client closed...");sendMsg(message);System.out.println("client closed");}@OnMessagepublic void onMessage(Message message) {System.out.println("client recive message=" + message.toString());}@OnErrorpublic void onError(Throwable t) throws Throwable {t.printStackTrace();}public void sendMsg(Message data) {try {this.session.getBasicRemote().sendObject(data);} catch (IOException ioException) {ioException.printStackTrace();} catch (EncodeException e) {e.printStackTrace();}}public void sendMsgAsync(Message data) {this.session.getAsyncRemote().sendObject(data);} }連接服務器端:
WebSocketContainer container = ContainerProvider.getWebSocketContainer(); container.connectToServer(WebSocketClientEndpoint.class,new URI("ws://localhost:8080/ws/test"));(3)繼承抽象類Endpoint方式
繼承抽象類Endpoint方式也和服務器端的差不多,但是不需要實現ServerApplicationConfig,需要實例化一個ClientEndpointConfig。Endpoint實現類和服務器端的一樣,就省略了,如下是連接服務器端的代碼:
ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().build(); container.connectToServer(new WebSocketClientEndpoint(),clientEndpointConfig,new URI("ws://localhost:8080/websocket/hello"));3、基于Nginx反向代理注意事項
一般web服務器會用Nginx做反向代理,經過Nginx反向轉發的HTTP請求不會帶上Upgrade和Connection消息頭,所以需要在Nginx配置里顯式指定需要升級為WebSocket的URI帶上這兩個頭:
location /chat/ {proxy_pass http://backend;proxy_http_version 1.1;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection "upgrade";proxy_connect_timeout 4s; proxy_read_timeout 7200s; proxy_send_timeout 12s; }默認情況下,如果代理服務器在60秒內沒有傳輸任何數據,連接將被關閉。這個超時可以通過proxy_read_timeout指令來增加?;蛘?#xff0c;可以將代理服務器配置為定期發送WebSocket PING幀以重置超時并檢查連接是否仍然活躍。
具體可參考:http://nginx.org/en/docs/http/websocket.html
五、WebSocket在Tomcat中的源碼實現
所有兼容Java EE的應用服務器,必須遵循JSR356 WebSocket Java API標準,Tomcat也不例外。而且Tomcat也是支持WebSocket最早的Web應用服務器框架(之一),在還沒有出JSR356標準時,就已經自定義了一套WebSocket API,但是JSR356一出,不得不改弦更張。
通過前面的講解,在使用上完全沒有問題,但是有幾個問題完全是黑盒的:
- Server Endpoint 是如何被掃描加載的?
- WebSocket是如何借助HTTP 進行握手升級的?
- WebSocket建立連接后如何保持連接不斷,互相通信的?
(如下源碼解析,需要對Tomcat連接器源碼有一定了解)
1、WsSci初始化
Tomcat 提供了一個org.apache.tomcat.websocket.server.WsSci類來初始化、加載WebSocket。從類名上顧名思義,利用了Sci加載機制,何為Sci加載機制?就是實現接口 jakarta.servlet.ServletContainerInitializer,在Tomcat部署裝載Web項目(org.apache.catalina.core.StandardContext#startInternal)時主動觸發ServletContainerInitializer#onStartup,做一些擴展的初始化操作。
WsSci主要做了一件事,就是掃描加載Server Endpoint,并將其加到WebSocket容器里jakarta.websocket.WebSocketContainer。
WsSci主要會掃描三種類:
- 加了@ServerEndpoint的類。
- Endpoint的子類。
- ServerApplicationConfig的子類。
(1)WsSci#onStartup
@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class,Endpoint.class}) public class WsSci implements ServletContainerInitializer {@Overridepublic void onStartup(Set<Class<?>> clazzes, ServletContext ctx)throws ServletException {WsServerContainer sc = init(ctx, true);if (clazzes == null || clazzes.size() == 0) {return;}// Group the discovered classes by typeSet<ServerApplicationConfig> serverApplicationConfigs = new HashSet<>();Set<Class<? extends Endpoint>> scannedEndpointClazzes = new HashSet<>();Set<Class<?>> scannedPojoEndpoints = new HashSet<>();try {// wsPackage is "jakarta.websocket."String wsPackage = ContainerProvider.class.getName();wsPackage = wsPackage.substring(0, wsPackage.lastIndexOf('.') + 1);for (Class<?> clazz : clazzes) {JreCompat jreCompat = JreCompat.getInstance();int modifiers = clazz.getModifiers();if (!Modifier.isPublic(modifiers) ||Modifier.isAbstract(modifiers) ||Modifier.isInterface(modifiers) ||!jreCompat.isExported(clazz)) {// Non-public, abstract, interface or not in an exported// package (Java 9+) - skip it.continue;}// Protect against scanning the WebSocket API JARs// 防止掃描WebSocket API jarif (clazz.getName().startsWith(wsPackage)) {continue;}if (ServerApplicationConfig.class.isAssignableFrom(clazz)) {// 1、clazz是ServerApplicationConfig子類serverApplicationConfigs.add((ServerApplicationConfig) clazz.getConstructor().newInstance());}if (Endpoint.class.isAssignableFrom(clazz)) {// 2、clazz是Endpoint子類@SuppressWarnings("unchecked")Class<? extends Endpoint> endpoint =(Class<? extends Endpoint>) clazz;scannedEndpointClazzes.add(endpoint);}if (clazz.isAnnotationPresent(ServerEndpoint.class)) {// 3、clazz是加了注解ServerEndpoint的類scannedPojoEndpoints.add(clazz);}}} catch (ReflectiveOperationException e) {throw new ServletException(e);}// Filter the resultsSet<ServerEndpointConfig> filteredEndpointConfigs = new HashSet<>();Set<Class<?>> filteredPojoEndpoints = new HashSet<>();if (serverApplicationConfigs.isEmpty()) {// 從這里看出@ServerEndpoint的服務器端是可以不用ServerApplicationConfig的filteredPojoEndpoints.addAll(scannedPojoEndpoints);} else {// serverApplicationConfigs不為空,for (ServerApplicationConfig config : serverApplicationConfigs) {Set<ServerEndpointConfig> configFilteredEndpoints =config.getEndpointConfigs(scannedEndpointClazzes);if (configFilteredEndpoints != null) {filteredEndpointConfigs.addAll(configFilteredEndpoints);}// getAnnotatedEndpointClasses 對于 scannedPojoEndpoints起到一個過濾作用// 不滿足條件的后面不加到WsServerContainer里Set<Class<?>> configFilteredPojos =config.getAnnotatedEndpointClasses(scannedPojoEndpoints);if (configFilteredPojos != null) {filteredPojoEndpoints.addAll(configFilteredPojos);}}}try {// 繼承抽象類Endpoint的需要使用者手動封裝成ServerEndpointConfig// 而加了注解@ServerEndpoint的類 Tomcat會自動封裝成ServerEndpointConfig// Deploy endpointsfor (ServerEndpointConfig config : filteredEndpointConfigs) {sc.addEndpoint(config);}// Deploy POJOsfor (Class<?> clazz : filteredPojoEndpoints) {sc.addEndpoint(clazz, true);}} catch (DeploymentException e) {throw new ServletException(e);}}static WsServerContainer init(ServletContext servletContext,boolean initBySciMechanism) {WsServerContainer sc = new WsServerContainer(servletContext);servletContext.setAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE, sc);// 注冊監聽器WsSessionListener給servletContext,// 在http session銷毀時觸發 ws session的關閉銷毀servletContext.addListener(new WsSessionListener(sc));// Can't register the ContextListener again if the ContextListener is// calling this methodif (initBySciMechanism) {// 注冊監聽器WsContextListener給servletContext,// 在 servletContext初始化時觸發WsSci.init// 在 servletContext銷毀時觸發WsServerContainer的銷毀// 不過呢,只在WsSci.onStartup時注冊一次servletContext.addListener(new WsContextListener());}return sc;} }從上述源碼中可以看出ServerApplicationConfig起到一個過濾的作用:
- 當沒有ServerApplicationConfig時,加了@ServerEndpoint的類會默認全部加到一個Set集合(filteredPojoEndpoints),所以加了@ServerEndpoint的類可以不需要自定義實現ServerApplicationConfig。
- 當有ServerApplicationConfig時,ServerApplicationConfig#getEndpointConfigs用來過濾Endpoint子類,并且Endpoint子類必須封裝成一個ServerEndpointConfig。
- ServerApplicationConfig#getAnnotatedEndpointClasses用來過濾加了注解@ServerEndpoint的類,一般空實現就行了(如果不想某個類被加到WsServerContainer里,那不加@ServerEndpoint不就可以了)。
過濾之后的Endpoint子類和加了注解@ServerEndpoint的類會分別調用不同形參的WsServerContainer#addEndpoint,將其加到WsServerContainer里。
(2)WsServerContainer#addEndpoint
- 將Endpoint子類加到WsServerContainer里,調用的是形參為ServerEndpointConfig的addEndpoint:
因為Endpoint子類需要使用者封裝成ServerEndpointConfig,不需要Tomcat來封裝。
- 將加了注解@ServerEndpoint的類加到WsServerContainer,調用的是形參為Class<?>的addEndpoint(fromAnnotatedPojo參數暫時在這個方法里沒什么用處):
該方法主要職責就是解析@ServerEndpoint,獲取path、decoders、encoders、configurator等構建一個ServerEndpointConfig對象
最終調用的都是如下這個比較復雜的方法,fromAnnotatedPojo表示是否是加了@ServerEndpoint的類。主要做了兩件事:
-
對加了@ServerEndpoint類的生命周期方法(@OnOpen、@OnClose、@OnError、@OnMessage)的掃描和映射封裝。
-
對path的有效性檢查和path param解析。
(3)PojoMethodMapping方法映射和形參解析
PojoMethodMapping構造函數比較長,主要是對加了@OnOpen、@OnClose、@OnError、@OnMessage的方法進行校驗和映射,以及對每個方法的形參進行解析和校驗,主要邏輯總結如下:
- 對當前類以及其父類中的方法進行掃描。
- 當前類中不能存在多個相同注解的方法,否則會拋出Duplicate annotation異常。
- 父類和子類中存在相同注解的方法,子類必須重寫該方法,否則會拋出Duplicate annotation異常。
- 對于@OnMessage,可以有多個,但是接收消息的類型必須不同,消息類型大概分為三種:PongMessage心跳消息、字節型、字符型。
- 如果掃描到對的注解都是父類的方法,子類重寫了該方法,但是沒有加響應的注解,則會被清除。
- 形參解析。
雖然方法名可以隨意,但是形參卻有著強制限制:
- @onOpen方法,可以有的參數Session、EndpointConfig、@PathParam,不能有其他參數。
- @onError方法,可以有的參數Session、@PathParam, 必須有Throwable,不能有其他參數。
- @onClose方法,可以有的參數Session, CloseReason, @PathParam,不能有其他參數。
2、協議升級(握手)
Tomcat中WebSocket是通過UpgradeToken機制實現的,其具體的升級處理器為WsHttpUpgradeHandler。WebSocket協議升級的過程比較曲折,首先要通過過濾器WsFilter進行升級判斷,然后調用org.apache.catalina.connector.Request#upgrade進行UpgradeToken的構建,最后通過org.apache.catalina.connector.Request#coyoteRequest回調函數action將UpgradeToken回傳給連接器為后續升級處理做準備。
(1)WsFilter
WebSocket協議升級的過程比較曲折。帶有WebSocket握手的請求會平安經過Tomcat的Connector,被轉發到Servlet容器中,在業務處理之前經過過濾器WsFilter判斷是否需要升級(WsFilter 在 org.apache.catalina.core.ApplicationFilterChain過濾鏈中觸發):
- 首先判斷WsServerContainer是否有進行Endpoint的掃描和注冊以及請頭中是否有Upgrade: websocket。
- 獲取請求path即uri在WsServerContainer中找對應的ServerEndpointConfig。
- 調用UpgradeUtil.doUpgrade進行升級。
(2)UpgradeUtil#doUpgrade
UpgradeUtil#doUpgrade主要做了如下幾件事情:
- 檢查HttpServletRequest的一些請求頭的有效性,如Connection: upgrade、Sec-WebSocket-Version:13、Sec-WebSocket-Key等。
- 給HttpServletResponse設置一些響應頭,如Upgrade:websocket、Connection: upgrade、根據Sec-WebSocket-Key的值生成響應頭Sec-WebSocket-Accept的值。
- 封裝WsHandshakeRequest和WsHandshakeResponse。
- 調用HttpServletRequest#upgrade進行升級,并獲取WsHttpUpgradeHandler(具體的升級流程處理器)。
(3)Request#upgrade
Request#upgrade主要做了三件事:
- 實例化WsHttpUpgradeHandler并構建UpgradeToken。
- 回調coyoteRequest.action,將UpgradeToken回傳給連接器。
- 設置響應碼101。
(4)回調機制ActionHook#action
一些發生在Servlet容器的動作可能需要回傳給連接器做處理,比如WebSocket的握手升級,所以連接器就給org.apache.coyote.Request設置了一個動作鉤子``ActionHook#action。一些動作表示定義在枚舉類ActionCode中,ActionCode.UPGRADE就代表協議升級動作。org.apache.coyote.AbstractProcessor實現了ActionHook接口,ActionCode.UPGRADE動作會調用org.apache.coyote.http11.Http11Processor#doHttpUpgrade,只是簡單將upgradeToken設置給Http11Processor`。
(5)ConnectionHandler#process
Tomcat連接器是同步調用容器業務處理,容器中的業務處理結束后還是回到連接器繼續往下執行。
連接器將請求轉發給容器處理是在適配器里完成的,容器中流程處理結束返回到org.apache.catalina.connector.CoyoteAdapter#service,繼續往下執行,最終結束并回收HttpServletrequest、HttpServletreponse對象。
org.apache.catalina.connector.CoyoteAdapter#service是在org.apache.coyote.http11.Http11Processor#service中調用的,
Http11Processor#service是HTTP請求處理主流程,通過upgradeToken != null來判斷是否為升級操作,s是則返回SocketState.UPGRADING。
最后來到org.apache.coyote.AbstractProtocol.ConnectionHandler#process一個連接處理的主流程,根據Http11Processor#service返回SocketState.UPGRADING來進行升級操作,如下只截取了和WebSocket協議升級相關流程的代碼:
- 獲取UpgradeToken,從中取出HttpUpgradeHandler,對于WebSocket來說是WsHttpUpgradeHandler。
- 調用WsHttpUpgradeHandler#init啟動協議升級處理。
(6)WsHttpUpgradeHandler#init握手成功
走到這里,基本上就是握手成功了,接下來就是創建WsSession和觸發onOpen。
WsSession的構建中會實例化Endpoint,如果實例化出來的對象不是Endpoint類型,即加了@ServerEndpoint的實例對象,則用一個PojoEndpointServer進行包裝,而PojoEndpointServer是繼承了抽象類Endpoint的。
觸發onOpen時會將WsSession傳進去,對于加PojoEndpointServer,因為用戶自定義的方法名和形參不確定,所以通過反射調用用戶自定義的onopen形式的方法,并且會將通過@onMessage解析出的MessageHandler設置給WsSession。
3、數據傳輸和解析
握手成功之后就建立了雙向通信的連接,該連接有別于HTTP/1.1長連接(應用服務器中工作線程循環占用),而是占用一條TCP連接。在連接建立是進行TCP三次握手,之后全雙工互相通信,將不需要再進行耗時的TCP的三次握手和四次揮手,一方需要關閉WebSocket連接時,發送關閉幀,另一方接收到關閉幀之后,也發送個關閉幀作為響應,之后就認為WebSocket連接關閉了,并且關閉底層TCP連接(四次揮手)。
實則WebSocket全雙工是建立在TCP的長鏈接上的,TCP長鏈接長時間沒有消息通信,會定時?;?#xff0c;一般WebSocket會通過代理如nginx等進行連接通信,nginx有一個連接超時沒有任何信息傳輸時,會斷開,所以需要WebSocket一端定時發送心跳保活。
(1)接收客戶端消息
客戶端來了消息,由連接器的Poller輪詢監測socket底層是否有數據到來,有數據可讀,則封裝成一個SocketProcessor扔到線程池里處理,org.apache.coyote.http11.upgrade.UpgradeProcessorInternal#dispatch具有處理升級協議連接,org.apache.tomcat.websocket.server.WsHttpUpgradeHandler#upgradeDispatch是專門處理WebSocket連接的處理器。
org.apache.tomcat.websocket.server.WsFrameServer是對服務器端消息幀處理的封裝,包括讀取底層數據,按消息幀格式解析、拼裝出有效載荷數據,觸發onMessage。
因為源碼篇幅較多,只展示具體源碼調用流程:
(2)發送消息給客戶端
一般,客戶端發送WebSocket握手請求,和服務器端建立連接后,服務器端需要將連接(Endpoint+WsSession)保存起來,為后續主動推送消息給客戶端提供方便。
Tomcat提供了可以發送三種數據類型(文本、二進制、Object對象)和兩種發送方式(同步、異步)的發送消息的方法。
- org.apache.tomcat.websocket.WsRemoteEndpointAsync異步發送。
- org.apache.tomcat.websocket.WsRemoteEndpointBasic 同步發送。
發送消息也同樣需要按消息幀格式封裝,然后通過socket寫到網絡里即可。
六、要點回顧
WebSocket的出現不是空穴來風,起初在HTTP/1.1基礎上通過輪詢和長連接達到信息實時同步的功能,但是這并沒有跳出HTTP/1.1自身的缺陷。HTTP/1.1明顯的兩個缺陷:消息頭冗長且為文本傳輸,請求響應模式。為此,WebSocket誕生了,跳出HTTP/1.1,建立一個新的真正全雙工通信協議。
不僅僅要會在項目中使用WebSocket,還要知道其通信原理和在應用服務器中的實現原理,很多注意事項都是在查閱了官方資源和源碼之后恍然大悟的。
- 在Tomcat中使用WebSocket不可以在Endpoint里獲取緩存的HttpServletRequest對象,因為在WebSocket握手之前,HTTP/1.1請求就算結束了(HttpServletRequest對象被回收),建立連接之后就更是獨立于HTTP/1.1了。
- 建立連接的WebSocket,會生成新的Endpoint和WsSession。
- 使用內置Tomcat需要注意,WsSci做的事情交給了Spring做。
- WebSocket全雙工是建立在TCP長連接的基礎之上。
- … …
七、參考文獻
如若文章有錯誤理解,歡迎批評指正,同時非常期待你的留言和點贊。如果覺得有用,不妨點個在看,讓更多人受益。
總結
以上是生活随笔為你收集整理的WebSocket通信原理和在Tomcat中实现源码详解(万字爆肝)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: XIO: fatal IO error
- 下一篇: Android ORC文字识别之识别身份