/**
	Implements WebSocket support and fallbacks for older browsers.

	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
	Copyright: © 2012-2014 Sönke Ludwig
	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
	Authors: Jan Krüger
*/
module vibe.http.websockets;

/// Example: a minimal echo endpoint registered on a URL router.
@safe unittest {
	void handleConn(scope WebSocket sock)
	{
		// simple echo server: send every received text message back
		// for as long as the socket reports being connected
		while (sock.connected) {
			auto msg = sock.receiveText();
			sock.send(msg);
		}
	}

	void startServer()
	{
		import vibe.http.router;
		auto router = new URLRouter;
		// handleWebSockets() wraps the connection handler in an HTTP request handler
		router.get("/ws", handleWebSockets(&handleConn));

		// Start HTTP server using listenHTTP()...
	}
}

import vibe.core.core;
import vibe.core.log;
import vibe.core.net;
import vibe.core.sync;
import vibe.stream.operations;
import vibe.http.server;
import vibe.http.client;
import vibe.core.connectionpool;

import core.time;
import std.algorithm: equal, splitter;
import std.array;
import std.base64;
import std.conv;
import std.exception;
import std.bitmanip;
import std.digest.sha;
import std.string;
import std.functional;
import std.uuid;
import std.base64;
import std.digest.sha;
import std.uni: asLowerCase;
import vibe.crypto.cryptorand;

// Everything declared below this label is @safe unless marked otherwise.
@safe:


/// Type of the callback that is handed an established `WebSocket`.
/// The callback has to be `nothrow`.
alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;


/// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
	@safe pure nothrow:

	/// Creates the exception from a message, with an optional chained `next` throwable
	/// given as the last argument.
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	{
		super(msg, file, line, next);
	}

	/// Creates the exception from a message and a chained `next` throwable
	/// given directly after the message.
	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
	{
		super(msg, next, file, line);
	}
}

/** Establishes a WebSocket connection at the specified endpoint.
Params:
	url = Endpoint to connect to. The schemas "wss" and "https" are
		requested as "https"; every other schema is requested as "http".
	request_modifier = Callback invoked on the outgoing upgrade request
		after the WebSocket handshake headers have been set.
	del = Callback overloads only: receives the connected socket. The
		socket is closed as soon as the callback returns.
	settings = HTTP client settings forwarded to `requestHTTP`.

Returns:
	The overloads without a `del` callback return the connected `WebSocket`.

Throws:
	An exception raised by `enforce` if the server does not answer with
	"switching protocols", or if the `Sec-WebSocket-Accept` header is
	missing or does not match the expected key.
*/
WebSocket connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// WebSocket URLs are mapped onto the matching HTTP schema for the upgrade request
	const use_tls = url.schema == "wss" || url.schema == "https";
	url.schema = use_tls ? "https" : "http";

	auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	// the accept key the server has to answer with for our challenge
	auto answerKey = computeAcceptKey(challengeKey);
	auto res = requestHTTP(url, (scope req){
		req.method = HTTPMethod.GET;
		req.headers["Upgrade"] = "websocket";
		req.headers["Connection"] = "Upgrade";
		req.headers["Sec-WebSocket-Version"] = "13";
		req.headers["Sec-WebSocket-Key"] = challengeKey;
		// user modifications come last so that they see the final header set
		request_modifier(req);
	}, settings);

	enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

	auto key = "sec-websocket-accept" in res.headers;
	enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
	enforce(*key == answerKey, "Response has wrong accept key");
	auto conn = res.switchProtocol("websocket");
	return new WebSocket(conn, rng, res);
}

/// ditto
void connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	scope WebSocketHandshakeDelegate del,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// WebSocket URLs are mapped onto the matching HTTP schema for the upgrade request
	const use_tls = url.schema == "wss" || url.schema == "https";
	url.schema = use_tls ? "https" : "http";

	/*scope*/auto rng = secureRNG();
	auto challengeKey = generateChallengeKey(rng);
	// the accept key the server has to answer with for our challenge
	auto answerKey = computeAcceptKey(challengeKey);

	requestHTTP(url,
		(scope req) {
			req.method = HTTPMethod.GET;
			req.headers["Upgrade"] = "websocket";
			req.headers["Connection"] = "Upgrade";
			req.headers["Sec-WebSocket-Version"] = "13";
			req.headers["Sec-WebSocket-Key"] = challengeKey;
			// user modifications come last so that they see the final header set
			request_modifier(req);
		},
		(scope res) {
			enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
			auto key = "sec-websocket-accept" in res.headers;
			enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
			enforce(*key == answerKey, "Response has wrong accept key");
			res.switchProtocol("websocket", (scope conn) @trusted {
				scope ws = new WebSocket(conn, rng, res);
				del(ws);
				// Always close: close() only sends a close frame while the
				// socket is still connected, but it is also what disconnects
				// the client response and joins the reader task. Skipping it
				// after a remote-initiated close would leave both behind
				// (handleWebSocket closes unconditionally as well).
				ws.close();
			});
		},
		settings
	);
}

/// ditto
WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// no request customization: pass an empty modifier
	return connectWebSocketEx(url, (scope req) {}, settings);
}
/// ditto
void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// no request customization: pass an empty modifier
	connectWebSocketEx(url, (scope req) {}, del, settings);
}
/// ditto
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// adapt the throwing callback to a nothrow one by logging its exceptions
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// the @trusted lambda lets the @system callback pass through the @safe overload
	connectWebSocket(url, (scope ws) @trusted => del(ws), settings);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// exceptions escaping the callback are logged instead of propagated
	connectWebSocket(url, (scope ws) nothrow {
		try {
			del(ws);
		} catch (Exception e) {
			logWarn("WebSocket handler failed: %s", e.msg);
		}
	}, settings);
}


/**
	Performs the server side WebSocket handshake for a single request and
	hands the resulting connection to the $(D on_handshake) delegate.

	Requests that are not valid version 13 upgrade requests are answered with
	status "bad request" and a body describing the first problem found. The
	socket is closed after $(D on_handshake) has returned.

	Params:
		on_handshake = Callback receiving the established socket
		req = The incoming HTTP request
		res = The response object used to switch protocols
*/
void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@safe {
	// NOTE: "Sec-WebSocket-Protocol" is currently not evaluated
	auto upgrade_hdr = "Upgrade" in req.headers;
	auto connection_hdr = "Connection" in req.headers;
	auto key_hdr = "Sec-WebSocket-Key" in req.headers;
	auto version_hdr = "Sec-WebSocket-Version" in req.headers;

	// "Connection" may carry a comma separated list - look for an "upgrade" token
	bool wants_upgrade = false;
	if (connection_hdr) {
		foreach (token; splitter(*connection_hdr, ",")) {
			if (!token.strip().asLowerCase().equal("upgrade"))
				continue;
			wants_upgrade = true;
			break;
		}
	}

	// only the first failed check is reported
	string failure;
	if (!wants_upgrade) {
		failure = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
	} else if (!upgrade_hdr || icmp(*upgrade_hdr, "websocket") != 0) {
		failure = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
	} else if (!version_hdr || *version_hdr != "13") {
		failure = "Only version 13 of the WebSocket protocol is supported.";
	} else if (!key_hdr) {
		failure = "Missing \"Sec-WebSocket-Key\" header.";
	}

	if (failure.length) {
		logDebug("Browser sent invalid WebSocket request: %s", failure);
		res.statusCode = HTTPStatus.badRequest;
		res.writeBody(failure);
		return;
	}

	// accept key = Base64(SHA-1(client key ~ protocol GUID))
	auto accept_key = () @trusted {
		return cast(string)Base64.encode(sha1Of(*key_hdr ~ s_webSocketGuid));
	} ();
	res.headers["Sec-WebSocket-Accept"] = accept_key;
	res.headers["Connection"] = "Upgrade";
	ConnectionStream stream = res.switchProtocol("websocket");

	// NOTE: silencing scope warning here - WebSocket references the scoped
	//       req/res objects throughout its lifetime, which has a narrower scope
	scope ws = () @trusted { return new WebSocket(stream, req, res); } ();
	try {
		on_handshake(ws);
	} catch (Exception e) {
		logDiagnostic("WebSocket handler failed: %s", e.msg);
	}
	ws.close();
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// the @trusted lambda lets the @system callback pass through the @safe overload
	handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
{
	// exceptions escaping the callback are logged instead of propagated
	handleWebSocket((scope ws) nothrow {
		try {
			on_handshake(ws);
		} catch (Exception e) {
			logWarn("WebSocket handler failed: %s", e.msg);
		}
	}, req, res);
}
/// ditto
void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// exceptions escaping the callback are logged instead of propagated
	handleWebSocket((scope ws) nothrow {
		try {
			on_handshake(ws);
		} catch (Exception e) {
			logWarn("WebSocket handler failed: %s", e.msg);
		}
	}, req, res);
}


/**
	Returns a HTTP request handler that establishes web socket connections.
*/
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
@safe {
	// promote the plain function to a delegate and reuse the delegate overload
	auto as_delegate = () @trusted { return toDelegate(on_handshake); } ();
	return handleWebSockets(as_delegate);
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
@safe {
	// The actual request handler. Invalid upgrade requests are answered with
	// "bad request" and an empty body; valid ones are switched to the
	// WebSocket protocol and passed to on_handshake.
	void handleRequest(scope HTTPServerRequest req, scope HTTPServerResponse res)
	@safe {
		// NOTE: "Sec-WebSocket-Protocol" is currently not evaluated
		auto upgrade_hdr = "Upgrade" in req.headers;
		auto connection_hdr = "Connection" in req.headers;
		auto key_hdr = "Sec-WebSocket-Key" in req.headers;
		auto version_hdr = "Sec-WebSocket-Version" in req.headers;

		// "Connection" may carry a comma separated list - look for an "upgrade" token
		bool wants_upgrade = false;
		if (connection_hdr) {
			foreach (token; splitter(*connection_hdr, ",")) {
				if (!token.strip().asLowerCase().equal("upgrade"))
					continue;
				wants_upgrade = true;
				break;
			}
		}

		const bool valid_request = wants_upgrade
			&& upgrade_hdr && icmp(*upgrade_hdr, "websocket") == 0
			&& key_hdr
			&& version_hdr && *version_hdr == "13";

		if (!valid_request) {
			logDebug("Browser sent invalid WebSocket request.");
			res.statusCode = HTTPStatus.badRequest;
			res.writeVoidBody();
			return;
		}

		// accept key = Base64(SHA-1(client key ~ protocol GUID))
		auto accept_key = () @trusted {
			return cast(string)Base64.encode(sha1Of(*key_hdr ~ s_webSocketGuid));
		} ();
		res.headers["Sec-WebSocket-Accept"] = accept_key;
		res.headers["Connection"] = "Upgrade";
		res.switchProtocol("websocket", (scope stream) {
			// TODO: put back 'scope' once it is actually enforced by DMD
			/*scope*/ auto ws = new WebSocket(stream, req, res);
			try {
				on_handshake(ws);
			} catch (Exception e) {
				logDiagnostic("WebSocket handler failed: %s", e.msg);
			}
			ws.close();
		});
	}
	return &handleRequest;
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
@system {
	// Wrap the @system callback in a @trusted lambda and forward it to
	// another handleWebSockets overload.
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
@system {
	// Same as above for a plain function pointer.
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
{
	// Adapt the throwing callback to a nothrow one: exceptions are caught
	// and logged as a warning instead of being propagated.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
{
	// Function pointer variant: exceptions are caught and logged as a warning.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
@system {
	// @system delegate variant: exceptions are caught and logged as a warning.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
@system {
	// @system function pointer variant: exceptions are caught and logged as a warning.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}

/**
 * Provides the reason that a websocket connection has closed.
*
 * Further documentation for the WebSocket and its codes can be found from:
 * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
 *
 * ---
 *
 * void echoSocket(scope WebSocket sock)
 * {
 *   import std.datetime : seconds;
 *
 *   while(sock.waitForData(3.seconds))
 *   {
 *     string msg = sock.receiveText;
 *     logInfo("Got a message: %s", msg);
 *     sock.send(msg);
 *   }
 *
 *   if(sock.connected)
 *     sock.close(WebSocketCloseReason.policyViolation, "timeout");
 * }
 * ---
 */
enum WebSocketCloseReason : short
{
	none = 0,
	normalClosure = 1000,
	goingAway = 1001,
	protocolError = 1002,
	unsupportedData = 1003,
	noStatusReceived = 1005,
	abnormalClosure = 1006,
	invalidFramePayloadData = 1007,
	policyViolation = 1008,
	messageTooBig = 1009,
	internalError = 1011,
	serviceRestart = 1012,
	tryAgainLater = 1013,
	badGateway = 1014,
	tlsHandshake = 1015
}

/**
 * Maps a close code to a short human readable description.
 *
 * Codes are first grouped by their thousands range: 0-999, 1000-1999,
 * 2000-2999, 3000-3999 and 4000-4999 each have their own description.
 * Within 1000-1999, the codes 1000-1015 except 1004 have an individual
 * text and every other code yields "RESERVED". Negative codes and codes
 * of 5000 and above yield "UNDEFINED - Nasal Demons".
 */
string closeReasonString(WebSocketCloseReason reason) @nogc @safe
{
	// work on a plain integer so that the range can be derived by integer division
	const int code = reason;

	// negative codes belong to no range (integer division would put -999 .. -1 into range 0)
	if (code < 0)
		return "UNDEFINED - Nasal Demons";

	switch (code / 1000)
	{
		case 0: return "Reserved and Unused";
		case 1: break; // resolved per code below
		case 2: return "Reserved for extensions";
		case 3: return "Available for frameworks and libraries";
		case 4: return "Available for applications";
		default: return "UNDEFINED - Nasal Demons";
	}

	switch (code)
	{
		case 1000: return "Normal Closure";
		case 1001: return "Going Away";
		case 1002: return "Protocol Error";
		case 1003: return "Unsupported Data";
		case 1004: return "RESERVED";
		case 1005: return "No Status Recvd";
		case 1006: return "Abnormal Closure";
		case 1007: return "Invalid Frame Payload Data";
		case 1008: return "Policy Violation";
		case 1009: return "Message Too Big";
		case 1010: return "Missing Extension";
		case 1011: return "Internal Error";
		case 1012: return "Service Restart";
		case 1013: return "Try Again Later";
		case 1014: return "Bad Gateway";
		case 1015: return "TLS Handshake";
		default: return "RESERVED";
	}
}

unittest
{
	// range boundaries
	assert((cast(WebSocketCloseReason) 0).closeReasonString == "Reserved and Unused");
	assert((cast(WebSocketCloseReason) 1).closeReasonString == "Reserved and Unused");
	assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure");
	assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure");
	assert((cast(WebSocketCloseReason)1020).closeReasonString == "RESERVED");
	assert((cast(WebSocketCloseReason)2000).closeReasonString == "Reserved for extensions");
	assert((cast(WebSocketCloseReason)3000).closeReasonString == "Available for frameworks and libraries");
	assert((cast(WebSocketCloseReason)4000).closeReasonString == "Available for applications");
	assert((cast(WebSocketCloseReason)5000).closeReasonString == "UNDEFINED - Nasal Demons");
	assert((cast(WebSocketCloseReason)  -1).closeReasonString == "UNDEFINED - Nasal Demons");

	//check the other spec cases: only 1004 and everything above 1015 are reserved
	foreach (short code; 1000 .. 1017)
	{
		const candidate = cast(WebSocketCloseReason)code;
		const bool expect_reserved = code == 1004 || code > 1015;
		assert(
			(candidate.closeReasonString == "RESERVED") == expect_reserved,
			"(incorrect) code %d = %s".format(code, closeReasonString(candidate))
		);
	}
}


/**
 * Represents a single _WebSocket connection.
*
 * ---
 * int main (string[] args)
 * {
 *   auto taskHandle = runTask(() => connectToWS());
 *   return runApplication(&args);
 * }
 *
 * void connectToWS ()
 * {
 *   auto ws_url = URL("wss://websockets.example.com/websocket/auth_token");
 *   auto ws = connectWebSocket(ws_url);
 *   logInfo("WebSocket connected");
 *
 *   while (ws.waitForData())
 *   {
 *     auto txt = ws.receiveText;
 *     logInfo("Received: %s", txt);
 *   }
 *   logFatal("Connection lost!");
 * }
 * ---
 */
final class WebSocket {
@safe:

	private {
		ConnectionStream m_conn;
		bool m_sentCloseFrame = false;
		// message handed over by the reader task and not yet consumed by receive()
		IncomingWebSocketMessage m_nextMessage = null;
		const HTTPServerRequest m_request;
		HTTPServerResponse m_serverResponse;
		HTTPClientResponse m_clientResponse;
		Task m_reader;
		// task that created the socket; only this task tears the connection down in close()
		Task m_ownerTask;
		InterruptibleTaskMutex m_readMutex, m_writeMutex;
		InterruptibleTaskCondition m_readCondition;
		Timer m_pingTimer;
		uint m_lastPingIndex;
		bool m_pongReceived;
		short m_closeCode;
		const(char)[] m_closeReason;
		/// The entropy generator to use
		/// If null, it means this is a server socket (only client sockets
		/// are constructed with an entropy source).
		RandomNumberStream m_rng;
	}

scope:

	/**
	 * Private constructor shared by the client and the server side.
	 *
	 * Starts the reader task and, for server sockets with a non-zero
	 * `webSocketPingInterval`, the periodic ping timer.
	 *
	 * Params:
	 *	 conn = Underlying connection stream
	 *	 request = HTTP request used to establish the connection (server sockets, may be null)
	 *	 server_res = For server sockets, the response object that gets finalized in `close`
	 *	 rng = Source of entropy to use.  If null, assume we're a server socket
	 *	 client_res = For client sockets, the response object (keeps the http client locked until the socket is done)
	 */
	private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		m_ownerTask = Task.getThis();
		m_conn = conn;
		m_request = request;
		m_clientResponse = client_res;
		m_serverResponse = server_res;
		assert(m_conn);
		m_rng = rng;
		m_writeMutex = new InterruptibleTaskMutex;
		m_readMutex = new InterruptibleTaskMutex;
		m_readCondition = new InterruptibleTaskCondition(m_readMutex);
		// The reader task waits on m_readMutex before doing anything (see
		// startReader), so holding it here keeps the reader from running
		// until initialization is complete.
		m_readMutex.performLocked!({
			// NOTE: Silencing scope warning here - m_reader MUST be stopped
			//       before the end of the lifetime of the WebSocket object,
			//       which is done in the mandatory call to close().
			//       The same goes for m_pingTimer below.
			m_reader = () @trusted { return runTask(&startReader); } ();
			if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
				// pretend the previous ping was answered so that the first ping gets sent
				m_pongReceived = true;
				m_pingTimer = () @trusted { return setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true); } ();
			}
		});
	}

	/// Client side constructor: no server request/response, but an entropy source.
	private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		this(conn, null, null, rng, client_res);
	}

	/// Server side constructor: no entropy source and no client response.
	private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res)
	{
		this(conn, request, res, null, null);
	}

	/**
		Determines if the WebSocket connection is still alive and ready for sending.

		Note that for determining the ready state for $(EM reading), you need
		to use $(D waitForData) instead, because both methods can return
		different values while a disconnect is in progress.

		See_also: $(D waitForData)
	*/
	@property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; }

	/**
		Returns the close code sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be 0. If no close code was given by the remote
		end in the close frame, the value will be 1005. If the connection was
		not closed cleanly by the remote end, this value will be 1006.
	*/
	@property short closeCode() { return m_closeCode; }

	/**
		Returns the close reason sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be an empty string.
	*/
	@property const(char)[] closeReason() { return m_closeReason; }

	/**
		The HTTP request that established the web socket connection.
	*/
	@property const(HTTPServerRequest) request() const { return m_request; }

	/**
		Checks if data is readily available for read.

		This is safe to call after `close()`: the connection reference is
		reset there, so it is checked before being queried.
	*/
	@property bool dataAvailableForRead() { return (m_conn && m_conn.dataAvailableForRead) || m_nextMessage !is null; }

	/** Waits until either a message arrives or until the connection is closed.

		This function can be used in a read loop to cleanly determine when to stop reading.

		Returns: `true` if a message is ready to be received, `false` otherwise.
	*/
	bool waitForData()
	{
		if (m_nextMessage) return true;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null)
				m_readCondition.wait();
		});
		return m_nextMessage !is null;
	}

	/** Waits until a message arrives, the connection is closed, or the timeout elapses.

		Params:
			timeout = Maximum total time to wait

		Returns: `true` if a message is ready to be received, `false` otherwise.
	*/
	bool waitForData(Duration timeout)
	{
		import std.datetime;

		if (m_nextMessage) return true;

		immutable limit_time = Clock.currTime(UTC()) + timeout;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null && timeout > 0.seconds) {
				m_readCondition.wait(timeout);
				// the condition may be notified without a message being
				// available - only wait for the remaining time afterwards
				timeout = limit_time - Clock.currTime(UTC());
			}
		});
		return m_nextMessage !is null;
	}

	/**
		Sends a text message.

		On the JavaScript side, the text will be available as message.data (type string).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope const(char)[] data)
	{
		send(
			(scope message) { message.write(cast(const ubyte[])data); },
			FrameOpcode.text);
	}

	/**
		Sends a binary message.

		On the JavaScript side, the text will be available as message.data (type Blob).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(in ubyte[] data)
	{
		send((scope message){ message.write(data); }, FrameOpcode.binary);
	}

	/**
		Sends a message using an output stream.

		The message is finalized (its last frame sent) after `sender` has
		returned or thrown.

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
	{
		m_writeMutex.performLocked!({
			enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
			/*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng);
			scope(exit) message.finalize();
			sender(message);
		});
	}

	/**
		Actively closes the connection.

		A close frame is only sent while the socket is still connected. When
		called from the task that created the socket, this additionally
		releases the client/server response, waits for the reader task and
		drops the connection; it may be called more than once.

		Params:
			code = Numeric code indicating a termination reason.
			reason = Message describing why the connection was terminated.
	*/
	void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "")
	{
		import std.algorithm.comparison : min;
		// an empty (but non-null) reason is replaced by the standard text for the code
		if(reason !is null && reason.length == 0)
			reason = (cast(WebSocketCloseReason)code).closeReasonString;

		//control frame payloads are limited to 125 bytes
		//(2 bytes of which are taken by the close code)
		version(assert)
			assert(reason.length <= 123);
		else
			reason = reason[0 .. min($, 123)];

		if (connected) {
			try {
				send((scope msg) {
					m_sentCloseFrame = true;
					if (code != 0) {
						msg.write(std.bitmanip.nativeToBigEndian(code));
						msg.write(cast(const ubyte[])reason);
					}
				}, FrameOpcode.close);
			} catch (Exception e) {
				logDiagnostic("Failed to send active web socket close frame: %s", e.msg);
			}
		}
		if (m_pingTimer) m_pingTimer.stop();


		// Only the owner task tears the connection down; close() is also
		// called from the reader task and the ping timer.
		if (Task.getThis() == m_ownerTask) {
			m_writeMutex.performLocked!({
				if (m_clientResponse) {
					m_clientResponse.disconnect();
					m_clientResponse = HTTPClientResponse.init;
				}
				if (m_serverResponse) {
					m_serverResponse.finalize();
					m_serverResponse = HTTPServerResponse.init;
				}
			});

			m_reader.join();

			() @trusted { destroy(m_conn); } ();
			m_conn = ConnectionStream.init;
		}
	}

	/**
		Receives a new message and returns its contents as a newly allocated data array.

		Params:
			strict = If set, ensures the exact frame type (text/binary) is received and throws an exception otherwise.
		Throws: WebSocketException if the connection is closed or
			if $(D strict == true) and the frame received is not the right type
	*/
	ubyte[] receiveBinary(bool strict = true)
	{
		ubyte[] ret;
		receive((scope message){
			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary,
				"Expected a binary message, got "~message.frameOpcode.to!string());
			ret = message.readAll();
		});
		return ret;
	}
	/// ditto
	string receiveText(bool strict = true)
	{
		string ret;
		receive((scope message){
			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text,
				"Expected a text message, got "~message.frameOpcode.to!string());
			ret = message.readAllUTF8();
		});
		return ret;
	}

	/**
		Receives a new message using an InputStream.
		Throws: WebSocketException if the connection is closed.
	*/
	void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver)
	{
		m_readMutex.performLocked!({
			while (!m_nextMessage) {
				enforce!WebSocketException(connected, "Connection closed while reading message.");
				m_readCondition.wait();
			}
			receiver(m_nextMessage);
			// hand the slot back to the reader task, which waits for it to become empty
			m_nextMessage = null;
			m_readCondition.notifyAll();
		});
	}

	/**
		Body of the reader task.

		Reads frames until the connection ends or a close frame arrives:
		pings are answered, pongs are matched against the last ping, and
		data frames are handed to `receive` one at a time. On exit the
		connection is closed and all waiters are notified.
	*/
	private void startReader()
	nothrow {
		try m_readMutex.performLocked!({}); //Wait until initialization
		catch (Exception e) {
			logException(e, "WebSocket reader task failed to wait for initialization");
			try m_conn.close();
			catch (Exception e) logException(e, "Failed to close WebSocket connection after initialization failure");
			m_closeCode = WebSocketCloseReason.abnormalClosure;
			try m_readCondition.notifyAll();
			catch (Exception e) assert(false, e.msg);
			return;
		}

		try {
			loop:
			while (!m_conn.empty) {
				assert(!m_nextMessage);
				/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);

				switch (msg.frameOpcode) {
					default: throw new WebSocketException("unknown frame opcode");
					case FrameOpcode.ping:
						// answer with a pong carrying the same payload
						send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
						break;
					case FrameOpcode.pong:
						// test if pong matches previous ping
						if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
							logDebugV("Received PONG that doesn't match previous ping.");
							break;
						}
						logDebugV("Received matching PONG.");
						m_pongReceived = true;
						break;
					case FrameOpcode.close:
						logDebug("Got closing frame (%s)", m_sentCloseFrame);

						// If no close code was passed, we default to 1005
						this.m_closeCode = WebSocketCloseReason.noStatusReceived;

						// If provided in the frame, attempt to parse the close code/reason
						if (msg.peek().length >= short.sizeof) {
							this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

							if (msg.peek().length > short.sizeof) {
								this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
							}
						}

						// answer a remote-initiated close with our own close frame
						if(!m_sentCloseFrame) close();
						logDebug("Terminating connection (%s)", m_sentCloseFrame);
						break loop;
					case FrameOpcode.text:
					case FrameOpcode.binary:
					case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
						m_readMutex.performLocked!({
							m_nextMessage = msg;
							m_readCondition.notifyAll();
							// wait until receive() has consumed the message
							while (m_nextMessage) m_readCondition.wait();
						});
						break;
				}
			}
		} catch (Exception e) {
			logDiagnostic("Error while reading websocket message: %s", e.msg);
			logDiagnostic("Closing connection.");
		}

		// If no close code was passed, e.g. this was an unclean termination
		//  of our websocket connection, set the close code to 1006.
		if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure;

		try m_conn.close();
		catch (Exception e) logException(e, "Failed to close WebSocket connection");
		try m_readCondition.notifyAll();
		catch (Exception e) assert(false, e.msg);
	}

	/**
		Ping timer callback.

		Closes the connection if the previous ping has not been answered,
		otherwise sends a new ping carrying an incremented index.
	*/
	private void sendPing()
	nothrow {
		try {
			if (!m_pongReceived) {
				logDebug("Pong skipped. Closing connection.");
				close();
				m_pingTimer.stop();
				return;
			}
			m_pongReceived = false;
			send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
			logDebugV("Ping sent");
		} catch (Exception e) {
			logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
		}
	}
}

/**
	Represents a single outgoing _WebSocket message as an OutputStream.
*/
final class OutgoingWebSocketMessage : OutputStream {
@safe:
	private {
		// if non-null, frames are written with a masking key taken from this source
		RandomNumberStream m_rng;
		Stream m_conn;
		// opcode of the next frame to send; switches to `continuation`
		// once a non-final frame has been sent
		FrameOpcode m_frameOpcode;
		// space reserved for the frame header, followed by the pending payload
		Appender!(ubyte[]) m_buffer;
		bool m_finalized = false;
	}

	/**
		Creates a message that writes its frames to `conn`.

		Params:
			conn = Stream the frames are written to
			frameOpcode = Opcode of the message (used for its first frame)
			rng = Entropy source used for masking; no masking is applied if null
	*/
	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_frameOpcode = frameOpcode;
		m_rng = rng;
	}

	// The signature of OutputStream.write depends on the stream API version.
	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	} else {
		override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	}

	// make the remaining OutputStream.write overloads visible next to the override
	alias write = OutputStream.write;

	/// Appends `bytes` to the pending payload; nothing is sent until
	/// `flush` or `finalize` is called. Always accepts all bytes.
	private size_t doWrite(scope const(ubyte)[] bytes, IOMode mode)
	{
		assert(!m_finalized);

		// reserve room in front of the payload so that the header can later
		// be written in place, directly before the data
		if (!m_buffer.data.length) {
			ubyte[Frame.maxHeaderSize] header_padding;
			m_buffer.put(header_padding[]);
		}

		m_buffer.put(bytes);
		return bytes.length;
	}

	/// Sends the pending payload, if any, as a non-final frame.
	void flush()
	{
		assert(!m_finalized);
		if (m_buffer.data.length > 0)
			sendFrame(false);
	}

	/// Sends the final frame of the message. Further calls have no effect.
	void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;
		sendFrame(true);
	}

	/// Writes the buffered payload as a single frame and clears the buffer.
	private void sendFrame(bool fin)
	{
		// make sure the header padding exists even for an empty payload
		if (!m_buffer.data.length)
			write(null, IOMode.once);

		assert(m_buffer.data.length >= Frame.maxHeaderSize);

		Frame frame;
		frame.fin = fin;
		frame.opcode = m_frameOpcode;
		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
		// the actual header may be shorter than the reserved space - place
		// it so that it ends right where the payload starts
		auto hsize = frame.getHeaderSize(m_rng !is null);
		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
		frame.writeHeader(msg[0 .. hsize], m_rng);
		m_conn.write(msg);
		m_conn.flush();
		m_buffer.clear();

		// RFC 6455, section 5.4: only the first fragment of a message carries
		// the message opcode, all following fragments must use opcode 0x0.
		if (!fin) m_frameOpcode = FrameOpcode.continuation;
	}
}


/**
	Represents a single incoming _WebSocket message as an InputStream.
*/
final class IncomingWebSocketMessage : InputStream {
@safe:
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		// the frame currently being consumed; its payload shrinks as data is read
		Frame m_currentFrame;
	}

	/// Creates the message and immediately reads its first frame from `conn`.
	private this(Stream conn, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_rng = rng;
		skipFrame(); // reads the first frame
	}

	/// True if the current frame has no unread payload left.
	@property bool empty() const { return m_currentFrame.payload.length == 0; }

	/// Number of unread payload bytes in the current frame.
	@property ulong leastSize() const { return m_currentFrame.payload.length; }

	/// Always reports `true`.
	@property bool dataAvailableForRead() { return true; }

	/// The frame type for this message;
	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }

	/// Returns the unread payload of the current frame without consuming it.
	const(ubyte)[] peek() { return m_currentFrame.payload; }

	/**
	 * Retrieve the next websocket frame of the stream and discard the current
	 * one
	 *
	 * This function is helpful if one wish to process frames by frames,
	 * or minimize memory allocation, as `peek` will only return the current
	 * frame data, and read requires a pre-allocated buffer.
	 *
	 * Returns:
	 * `false` if the current frame is the final one, `true` if a new frame
	 * was read.
	 */
	bool skipFrame()
	{
		if (m_currentFrame.fin)
			return false;

		m_currentFrame = Frame.readFrame(m_conn);
		return true;
	}

	/**
	 * Copies payload data into `dst`, moving on to the next frame whenever
	 * the current one is used up.
	 *
	 * Returns: The number of bytes copied.
	 * Throws: WebSocketException if more data is requested than the
	 *	 current frame still holds and no further data is available.
	 */
	size_t read(scope ubyte[] dst, IOMode mode)
	{
		size_t nread = 0;

		while (dst.length > 0) {
			enforce!WebSocketException(!empty , "cannot read from empty stream");
			enforce!WebSocketException(leastSize > 0, "no data available" );

			import std.algorithm : min;
			// copy as much as the current frame and the destination allow
			auto sz = cast(size_t)min(leastSize, dst.length);
			dst[0 .. sz] = m_currentFrame.payload[0 .. sz];
			dst = dst[sz .. $];
			m_currentFrame.payload = m_currentFrame.payload[sz .. $];
			nread += sz;

			// current frame used up: stop for immediate mode (and for
			// IOMode.once after some data was read) instead of fetching
			// the next frame
			if (leastSize == 0) {
				if (mode == IOMode.immediate || mode == IOMode.once && nread > 0)
					break;
				this.skipFrame();
			}
		}

		return nread;
	}

	// make the remaining InputStream.read overloads visible next to the one above
	alias read = InputStream.read;
}

/// Magic string defined by the RFC for challenging the server during upgrade
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";


/**
 * The Opcode is 4 bits, as defined in Section 5.2
 *
 * Values are defined in section 11.8
 * Currently only 6 values are defined, however the opcode is defined as
 * taking 4 bits.
 */
public enum FrameOpcode : ubyte {
	continuation = 0x0,
	text = 0x1,
	binary = 0x2,
	close = 0x8,
	ping = 0x9,
	pong = 0xA
}
static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");


private struct Frame {
@safe:
	enum maxHeaderSize = 14;

	bool fin;
	FrameOpcode opcode;
	ubyte[] payload;

	/**
	 * Return the header length encoded with the expected amount of bits
	 *
	 * The WebSocket RFC define a variable-length payload length.
	 * In short, it means that:
	 * - If the length is <= 125, it is stored as the 7 least significant
	 *   bits of the second header byte.  The first bit is reserved for MASK.
	 * - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of
	 *   126 is stored in the aforementioned 7 bits, and the actual length
	 *   is stored in the next two bytes, resulting in a 4 bytes header
	 *   ( + masking key, if any).
	 * - If the length is > 65_536, a magic value of 127 will be used for
	 *   the 7-bit field, and the next 8 bytes are expected to be the length,
	 *   resulting in a 10 bytes header ( + masking key, if any).
	 *
	 * Those functions encapsulate all this logic and allow to just get the
	 * length with the desired size.
1101 * 1102 * Return: 1103 * - For `ubyte`, the value to store in the 7 bits field, either the 1104 * length or a magic value (126 or 127). 1105 * - For `ushort`, a value in the range [126; 65_536]. 1106 * If payload.length is not in this bound, an assertion will be triggered. 1107 * - For `ulong`, a value in the range [65_537; size_t.max]. 1108 * If payload.length is not in this bound, an assertion will be triggered. 1109 */ 1110 size_t getHeaderSize(bool mask) 1111 { 1112 size_t ret = 1; 1113 if (payload.length < 126) ret += 1; 1114 else if (payload.length < 65536) ret += 3; 1115 else ret += 9; 1116 if (mask) ret += 4; 1117 return ret; 1118 } 1119 1120 void writeHeader(ubyte[] dst, RandomNumberStream sys_rng) 1121 { 1122 ubyte[4] buff; 1123 ubyte firstByte = cast(ubyte)opcode; 1124 if (fin) firstByte |= 0x80; 1125 dst[0] = firstByte; 1126 dst = dst[1 .. $]; 1127 1128 auto b1 = sys_rng ? 0x80 : 0x00; 1129 1130 if (payload.length < 126) { 1131 dst[0] = cast(ubyte)(b1 | payload.length); 1132 dst = dst[1 .. $]; 1133 } else if (payload.length < 65536) { 1134 dst[0] = cast(ubyte) (b1 | 126); 1135 dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length); 1136 dst = dst[3 .. $]; 1137 } else { 1138 dst[0] = cast(ubyte) (b1 | 127); 1139 dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length); 1140 dst = dst[9 .. $]; 1141 } 1142 1143 if (sys_rng) { 1144 sys_rng.read(dst[0 .. 4]); 1145 for (size_t i = 0; i < payload.length; i++) 1146 payload[i] ^= dst[i % 4]; 1147 } 1148 } 1149 1150 static Frame readFrame(InputStream stream) 1151 { 1152 Frame frame; 1153 ubyte[8] data; 1154 1155 stream.read(data[0 .. 2]); 1156 frame.fin = (data[0] & 0x80) != 0; 1157 frame.opcode = cast(FrameOpcode)(data[0] & 0x0F); 1158 1159 bool masked = !!(data[1] & 0b1000_0000); 1160 1161 //parsing length 1162 ulong length = data[1] & 0b0111_1111; 1163 if (length == 126) { 1164 stream.read(data[0 .. 2]); 1165 length = bigEndianToNative!ushort(data[0 .. 
2]); 1166 } else if (length == 127) { 1167 stream.read(data); 1168 length = bigEndianToNative!ulong(data); 1169 1170 // RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes 1171 // interpreted as a 64-bit unsigned integer (the most significant 1172 // bit MUST be 0) 1173 enforce!WebSocketException(!(length >> 63), 1174 "Received length has a non-zero most significant bit"); 1175 1176 } 1177 logDebug("Read frame: %s %s %s length=%d", 1178 frame.opcode, 1179 frame.fin ? "final frame" : "continuation", 1180 masked ? "masked" : "not masked", 1181 length); 1182 1183 // Masking key is 32 bits / uint 1184 if (masked) 1185 stream.read(data[0 .. 4]); 1186 1187 // Read payload 1188 // TODO: Provide a way to limit the size read, easy 1189 // DOS for server code here (rejectedsoftware/vibe.d#1496). 1190 enforce!WebSocketException(length <= size_t.max); 1191 frame.payload = new ubyte[](cast(size_t)length); 1192 stream.read(frame.payload); 1193 1194 //de-masking 1195 if (masked) 1196 foreach (size_t i; 0 .. cast(size_t)length) 1197 frame.payload[i] = frame.payload[i] ^ data[i % 4]; 1198 1199 return frame; 1200 } 1201 } 1202 1203 unittest { 1204 import std.algorithm.searching : all; 1205 1206 final class DummyRNG : RandomNumberStream { 1207 @safe: 1208 @property bool empty() { return false; } 1209 @property ulong leastSize() { return ulong.max; } 1210 @property bool dataAvailableForRead() { return true; } 1211 const(ubyte)[] peek() { return null; } 1212 size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; } 1213 alias read = RandomNumberStream.read; 1214 } 1215 1216 ubyte[14] hdrbuf; 1217 auto rng = new DummyRNG; 1218 1219 Frame f; 1220 f.payload = new ubyte[125]; 1221 1222 assert(f.getHeaderSize(false) == 2); 1223 hdrbuf[] = 0; 1224 f.writeHeader(hdrbuf[0 .. 2], null); 1225 assert(hdrbuf[0 .. 2] == [0, 125]); 1226 1227 assert(f.getHeaderSize(true) == 6); 1228 hdrbuf[] = 0; 1229 f.writeHeader(hdrbuf[0 .. 
6], rng); 1230 assert(hdrbuf[0 .. 2] == [0, 128|125]); 1231 assert(hdrbuf[2 .. 6].all!(b => b == 13)); 1232 1233 f.payload = new ubyte[126]; 1234 assert(f.getHeaderSize(false) == 4); 1235 hdrbuf[] = 0; 1236 f.writeHeader(hdrbuf[0 .. 4], null); 1237 assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]); 1238 1239 assert(f.getHeaderSize(true) == 8); 1240 hdrbuf[] = 0; 1241 f.writeHeader(hdrbuf[0 .. 8], rng); 1242 assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]); 1243 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1244 1245 f.payload = new ubyte[65535]; 1246 assert(f.getHeaderSize(false) == 4); 1247 hdrbuf[] = 0; 1248 f.writeHeader(hdrbuf[0 .. 4], null); 1249 assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]); 1250 1251 assert(f.getHeaderSize(true) == 8); 1252 hdrbuf[] = 0; 1253 f.writeHeader(hdrbuf[0 .. 8], rng); 1254 assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]); 1255 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1256 1257 f.payload = new ubyte[65536]; 1258 assert(f.getHeaderSize(false) == 10); 1259 hdrbuf[] = 0; 1260 f.writeHeader(hdrbuf[0 .. 10], null); 1261 assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]); 1262 1263 assert(f.getHeaderSize(true) == 14); 1264 hdrbuf[] = 0; 1265 f.writeHeader(hdrbuf[0 .. 14], rng); 1266 assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]); 1267 assert(hdrbuf[10 .. 14].all!(b => b == 13)); 1268 } 1269 1270 /** 1271 * Generate a challenge key for the protocol upgrade phase. 1272 */ 1273 private string generateChallengeKey(RandomNumberStream rng) 1274 { 1275 ubyte[16] buffer; 1276 rng.read(buffer); 1277 return Base64.encode(buffer); 1278 } 1279 1280 private string computeAcceptKey(string challengekey) 1281 { 1282 immutable(ubyte)[] b = challengekey.representation; 1283 immutable(ubyte)[] a = s_webSocketGuid.representation; 1284 SHA1 hash; 1285 hash.start(); 1286 hash.put(b); 1287 hash.put(a); 1288 auto result = Base64.encode(hash.finish()); 1289 return to!(string)(result); 1290 }