1 /** 2 Implements WebSocket support and fallbacks for older browsers. 3 4 Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455) 5 Copyright: © 2012-2014 Sönke Ludwig 6 License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file. 7 Authors: Jan Krüger 8 */ 9 module vibe.http.websockets; 10 11 /// 12 @safe unittest { 13 void handleConn(scope WebSocket sock) 14 { 15 // simple echo server 16 while (sock.connected) { 17 auto msg = sock.receiveText(); 18 sock.send(msg); 19 } 20 } 21 22 void startServer() 23 { 24 import vibe.http.router; 25 auto router = new URLRouter; 26 router.get("/ws", handleWebSockets(&handleConn)); 27 28 // Start HTTP server using listenHTTP()... 29 } 30 } 31 32 import vibe.core.core; 33 import vibe.core.log; 34 import vibe.core.net; 35 import vibe.core.sync; 36 import vibe.stream.operations; 37 import vibe.http.server; 38 import vibe.http.client; 39 import vibe.core.connectionpool; 40 import vibe.utils.array; 41 static import vibe.internal.exception; 42 43 import core.time; 44 import std.algorithm: equal, splitter; 45 import std.array; 46 import std.base64; 47 import std.conv; 48 import std.exception; 49 import std.bitmanip; 50 import std.digest.sha; 51 import std..string; 52 import std.functional; 53 import std.uuid; 54 import std.base64; 55 import std.digest.sha; 56 import std.uni: asLowerCase; 57 import vibe.crypto.cryptorand; 58 59 @safe: 60 61 62 alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow; 63 64 65 /// Exception thrown by $(D vibe.http.websockets). 66 class WebSocketException: Exception 67 { 68 @safe pure nothrow: 69 70 /// 71 this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null) 72 { 73 super(msg, file, line, next); 74 } 75 76 /// 77 this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__) 78 { 79 super(msg, next, file, line); 80 } 81 } 82 83 /** Establishes a WebSocket connection at the specified endpoint. 84 */ 85 WebSocket connectWebSocketEx(URL url, 86 scope void delegate(scope HTTPClientRequest) @safe request_modifier, 87 const(HTTPClientSettings) settings = defaultSettings) 88 @safe { 89 const use_tls = (url.schema == "wss" || url.schema == "https") ? true : false; 90 url.schema = use_tls ? "https" : "http"; 91 92 auto rng = secureRNG(); 93 auto challengeKey = generateChallengeKey(rng); 94 auto answerKey = computeAcceptKey(challengeKey); 95 auto res = requestHTTP(url, (scope req){ 96 req.method = HTTPMethod.GET; 97 req.headers["Upgrade"] = "websocket"; 98 req.headers["Connection"] = "Upgrade"; 99 req.headers["Sec-WebSocket-Version"] = "13"; 100 req.headers["Sec-WebSocket-Key"] = challengeKey; 101 request_modifier(req); 102 }, settings); 103 104 enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request."); 105 106 auto key = "sec-websocket-accept" in res.headers; 107 enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header."); 108 enforce(*key == answerKey, "Response has wrong accept key"); 109 auto conn = res.switchProtocol("websocket"); 110 return new WebSocket(conn, rng, res); 111 } 112 113 /// ditto 114 void connectWebSocketEx(URL url, 115 scope void delegate(scope HTTPClientRequest) @safe request_modifier, 116 scope WebSocketHandshakeDelegate del, 117 const(HTTPClientSettings) settings = defaultSettings) 118 @safe { 119 const use_tls = (url.schema == "wss" || url.schema == "https") ? true : false; 120 url.schema = use_tls ? "https" : "http"; 121 122 /*scope*/auto rng = secureRNG(); 123 auto challengeKey = generateChallengeKey(rng); 124 auto answerKey = computeAcceptKey(challengeKey); 125 126 requestHTTP(url, 127 (scope req) { 128 req.method = HTTPMethod.GET; 129 req.headers["Upgrade"] = "websocket"; 130 req.headers["Connection"] = "Upgrade"; 131 req.headers["Sec-WebSocket-Version"] = "13"; 132 req.headers["Sec-WebSocket-Key"] = challengeKey; 133 request_modifier(req); 134 }, 135 (scope res) { 136 enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request."); 137 auto key = "sec-websocket-accept" in res.headers; 138 enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header."); 139 enforce(*key == answerKey, "Response has wrong accept key"); 140 res.switchProtocol("websocket", (scope conn) @trusted { 141 scope ws = new WebSocket(conn, rng, res); 142 del(ws); 143 if (ws.connected) ws.close(); 144 }); 145 }, 146 settings 147 ); 148 } 149 150 /// ditto 151 WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings) 152 @safe { 153 return connectWebSocketEx(url, (scope req) {}, settings); 154 } 155 /// ditto 156 void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings) 157 @safe { 158 connectWebSocketEx(url, (scope req) {}, del, settings); 159 } 160 /// ditto 161 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings) 162 @system { 163 connectWebSocket(url, (scope ws) nothrow { 164 try del(ws); 165 catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg); 166 }, settings); 167 } 168 /// Scheduled for deprecation - use a `@safe` callback instead. 169 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings) 170 @system { 171 connectWebSocket(url, (scope ws) @trusted => del(ws), settings); 172 } 173 /// Scheduled for deprecation - use a `nothrow` callback instead. 174 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings) 175 @safe { 176 connectWebSocket(url, (scope ws) nothrow { 177 try del(ws); 178 catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg); 179 }, settings); 180 } 181 182 183 /** 184 Establishes a web socket conection and passes it to the $(D on_handshake) delegate. 185 */ 186 void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res) 187 @safe { 188 auto pUpgrade = "Upgrade" in req.headers; 189 auto pConnection = "Connection" in req.headers; 190 auto pKey = "Sec-WebSocket-Key" in req.headers; 191 //auto pProtocol = "Sec-WebSocket-Protocol" in req.headers; 192 auto pVersion = "Sec-WebSocket-Version" in req.headers; 193 194 auto isUpgrade = false; 195 196 if( pConnection ) { 197 auto connectionTypes = splitter(*pConnection, ","); 198 foreach( t ; connectionTypes ) { 199 if( t.strip().asLowerCase().equal("upgrade") ) { 200 isUpgrade = true; 201 break; 202 } 203 } 204 } 205 206 string req_error; 207 if (!isUpgrade) req_error = "WebSocket endpoint only accepts \"Connection: upgrade\" requests."; 208 else if (!pUpgrade || icmp(*pUpgrade, "websocket") != 0) req_error = "WebSocket endpoint requires \"Upgrade: websocket\" header."; 209 else if (!pVersion || *pVersion != "13") req_error = "Only version 13 of the WebSocket protocol is supported."; 210 else if (!pKey) req_error = "Missing \"Sec-WebSocket-Key\" header."; 211 212 if (req_error.length) { 213 logDebug("Browser sent invalid WebSocket request: %s", req_error); 214 res.statusCode = HTTPStatus.badRequest; 215 res.writeBody(req_error); 216 return; 217 } 218 219 auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } (); 220 res.headers["Sec-WebSocket-Accept"] = accept; 221 res.headers["Connection"] = "Upgrade"; 222 ConnectionStream conn = res.switchProtocol("websocket"); 223 224 WebSocket socket = new WebSocket(conn, req, res); 225 try { 226 on_handshake(socket); 227 } catch (Exception e) { 228 logDiagnostic("WebSocket handler failed: %s", e.msg); 229 } 230 socket.close(); 231 } 232 /// Scheduled for deprecation - use a `@safe` callback instead. 233 void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res) 234 @system { 235 handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res); 236 } 237 /// Scheduled for deprecation - use a `nothrow` callback instead. 238 void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res) 239 { 240 handleWebSocket((scope ws) nothrow { 241 try on_handshake(ws); 242 catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg); 243 }, req, res); 244 } 245 /// ditto 246 void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res) 247 @system { 248 handleWebSocket((scope ws) nothrow { 249 try on_handshake(ws); 250 catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg); 251 }, req, res); 252 } 253 254 255 /** 256 Returns a HTTP request handler that establishes web socket conections. 257 */ 258 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake) 259 @safe { 260 return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ()); 261 } 262 /// ditto 263 HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake) 264 @safe { 265 void callback(scope HTTPServerRequest req, scope HTTPServerResponse res) 266 @safe { 267 auto pUpgrade = "Upgrade" in req.headers; 268 auto pConnection = "Connection" in req.headers; 269 auto pKey = "Sec-WebSocket-Key" in req.headers; 270 //auto pProtocol = "Sec-WebSocket-Protocol" in req.headers; 271 auto pVersion = "Sec-WebSocket-Version" in req.headers; 272 273 auto isUpgrade = false; 274 275 if( pConnection ) { 276 auto connectionTypes = splitter(*pConnection, ","); 277 foreach( t ; connectionTypes ) { 278 if( t.strip().asLowerCase().equal("upgrade") ) { 279 isUpgrade = true; 280 break; 281 } 282 } 283 } 284 if( !(isUpgrade && 285 pUpgrade && icmp(*pUpgrade, "websocket") == 0 && 286 pKey && 287 pVersion && *pVersion == "13") ) 288 { 289 logDebug("Browser sent invalid WebSocket request."); 290 res.statusCode = HTTPStatus.badRequest; 291 res.writeVoidBody(); 292 return; 293 } 294 295 auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } (); 296 res.headers["Sec-WebSocket-Accept"] = accept; 297 res.headers["Connection"] = "Upgrade"; 298 res.switchProtocol("websocket", (scope conn) { 299 // TODO: put back 'scope' once it is actually enforced by DMD 300 /*scope*/ auto socket = new WebSocket(conn, req, res); 301 try on_handshake(socket); 302 catch (Exception e) { 303 logDiagnostic("WebSocket handler failed: %s", e.msg); 304 } 305 socket.close(); 306 }); 307 } 308 return &callback; 309 } 310 /// Scheduled for deprecation - use a `@safe` callback instead. 311 HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake) 312 @system { 313 return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws)); 314 } 315 /// Scheduled for deprecation - use a `@safe` callback instead. 316 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake) 317 @system { 318 return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws)); 319 } 320 /// Scheduled for deprecation - use a `nothrow` callback instead. 321 HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake) 322 { 323 return handleWebSockets(delegate (scope ws) nothrow { 324 try on_handshake(ws); 325 catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg); 326 }); 327 } 328 /// ditto 329 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake) 330 { 331 return handleWebSockets(delegate (scope ws) nothrow { 332 try on_handshake(ws); 333 catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg); 334 }); 335 } 336 /// ditto 337 HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake) 338 @system { 339 return handleWebSockets(delegate (scope ws) nothrow { 340 try on_handshake(ws); 341 catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg); 342 }); 343 } 344 /// ditto 345 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake) 346 @system { 347 return handleWebSockets(delegate (scope ws) nothrow { 348 try on_handshake(ws); 349 catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg); 350 }); 351 } 352 353 /** 354 * Provides the reason that a websocket connection has closed. 355 * 356 * Further documentation for the WebSocket and it's codes can be found from: 357 * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent 358 * 359 * --- 360 * 361 * void echoSocket(scope WebSocket sock) 362 * { 363 * import std.datetime : seconds; 364 * 365 * while(sock.waitForData(3.seconds)) 366 * { 367 * string msg = sock.receiveText; 368 * logInfo("Got a message: %s", msg); 369 * sock.send(msg); 370 * } 371 * 372 * if(sock.connected) 373 * sock.close(WebSocketCloseReason.policyViolation, "timeout"); 374 * } 375 * --- 376 */ 377 enum WebSocketCloseReason : short 378 { 379 none = 0, 380 normalClosure = 1000, 381 goingAway = 1001, 382 protocolError = 1002, 383 unsupportedData = 1003, 384 noStatusReceived = 1005, 385 abnormalClosure = 1006, 386 invalidFramePayloadData = 1007, 387 policyViolation = 1008, 388 messageTooBig = 1009, 389 internalError = 1011, 390 serviceRestart = 1012, 391 tryAgainLater = 1013, 392 badGateway = 1014, 393 tlsHandshake = 1015 394 } 395 396 string closeReasonString(WebSocketCloseReason reason) @nogc @safe 397 { 398 import std.math : floor; 399 400 //round down to the nearest thousand to get category 401 switch(cast(short)(cast(float)reason / 1000f).floor) 402 { 403 case 0: 404 return "Reserved and Unused"; 405 case 1: 406 switch(reason) 407 { 408 case 1000: 409 return "Normal Closure"; 410 case 1001: 411 return "Going Away"; 412 case 1002: 413 return "Protocol Error"; 414 case 1003: 415 return "Unsupported Data"; 416 case 1004: 417 return "RESERVED"; 418 case 1005: 419 return "No Status Recvd"; 420 case 1006: 421 return "Abnormal Closure"; 422 case 1007: 423 return "Invalid Frame Payload Data"; 424 case 1008: 425 return "Policy Violation"; 426 case 1009: 427 return "Message Too Big"; 428 case 1010: 429 return "Missing Extension"; 430 case 1011: 431 return "Internal Error"; 432 case 1012: 433 return "Service Restart"; 434 case 1013: 435 return "Try Again Later"; 436 case 1014: 437 return "Bad Gateway"; 438 case 1015: 439 return "TLS Handshake"; 440 default: 441 return "RESERVED"; 442 } 443 case 2: 444 return "Reserved for extensions"; 445 case 3: 446 return "Available for frameworks and libraries"; 447 case 4: 448 return "Available for applications"; 449 default: 450 return "UNDEFINED - Nasal Demons"; 451 } 452 } 453 454 unittest 455 { 456 assert((cast(WebSocketCloseReason) 0).closeReasonString == "Reserved and Unused"); 457 assert((cast(WebSocketCloseReason) 1).closeReasonString == "Reserved and Unused"); 458 assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure"); 459 assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure"); 460 assert((cast(WebSocketCloseReason)1020).closeReasonString == "RESERVED"); 461 assert((cast(WebSocketCloseReason)2000).closeReasonString == "Reserved for extensions"); 462 assert((cast(WebSocketCloseReason)3000).closeReasonString == "Available for frameworks and libraries"); 463 assert((cast(WebSocketCloseReason)4000).closeReasonString == "Available for applications"); 464 assert((cast(WebSocketCloseReason)5000).closeReasonString == "UNDEFINED - Nasal Demons"); 465 assert((cast(WebSocketCloseReason) -1).closeReasonString == "UNDEFINED - Nasal Demons"); 466 467 //check the other spec cases 468 for(short i = 1000; i < 1017; i++) 469 { 470 if(i == 1004 || i > 1015) 471 { 472 assert( 473 (cast(WebSocketCloseReason)i).closeReasonString == "RESERVED", 474 "(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i)) 475 ); 476 } 477 else 478 assert( 479 (cast(WebSocketCloseReason)i).closeReasonString != "RESERVED", 480 "(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i)) 481 ); 482 } 483 } 484 485 486 /** 487 * Represents a single _WebSocket connection. 488 * 489 * --- 490 * shared static this () 491 * { 492 * runTask(() => connectToWS()); 493 * } 494 * 495 * void connectToWS () 496 * { 497 * auto ws_url = URL("wss://websockets.example.com/websocket/auth_token"); 498 * auto ws = connectWebSocket(ws_url); 499 * logInfo("WebSocket connected"); 500 * 501 * while (ws.waitForData()) 502 * { 503 * auto txt = ws.receiveText; 504 * logInfo("Received: %s", txt); 505 * } 506 * logFatal("Connection lost!"); 507 * } 508 * --- 509 */ 510 final class WebSocket { 511 @safe: 512 513 private { 514 ConnectionStream m_conn; 515 bool m_sentCloseFrame = false; 516 IncomingWebSocketMessage m_nextMessage = null; 517 const HTTPServerRequest m_request; 518 HTTPServerResponse m_serverResponse; 519 HTTPClientResponse m_clientResponse; 520 Task m_reader; 521 Task m_ownerTask; 522 InterruptibleTaskMutex m_readMutex, m_writeMutex; 523 InterruptibleTaskCondition m_readCondition; 524 Timer m_pingTimer; 525 uint m_lastPingIndex; 526 bool m_pongReceived; 527 short m_closeCode; 528 const(char)[] m_closeReason; 529 /// The entropy generator to use 530 /// If not null, it means this is a server socket. 531 RandomNumberStream m_rng; 532 } 533 534 /** 535 * Private constructor, called from `connectWebSocket`. 536 * 537 * Params: 538 * conn = Underlying connection string 539 * request = HTTP request used to establish the connection 540 * rng = Source of entropy to use. If null, assume we're a server socket 541 * client_res = For client sockets, the response object (keeps the http client locked until the socket is done) 542 */ 543 private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res) 544 { 545 m_ownerTask = Task.getThis(); 546 m_conn = conn; 547 m_request = request; 548 m_clientResponse = client_res; 549 m_serverResponse = server_res; 550 assert(m_conn); 551 m_rng = rng; 552 m_writeMutex = new InterruptibleTaskMutex; 553 m_readMutex = new InterruptibleTaskMutex; 554 m_readCondition = new InterruptibleTaskCondition(m_readMutex); 555 m_readMutex.performLocked!({ 556 m_reader = runTask(&startReader); 557 if (request && request.serverSettings.webSocketPingInterval != Duration.zero) { 558 m_pongReceived = true; 559 m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true); 560 } 561 }); 562 } 563 564 private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res) 565 { 566 this(conn, HTTPServerRequest.init, HTTPServerResponse.init, rng, client_res); 567 } 568 569 private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res) 570 { 571 this(conn, request, res, RandomNumberStream.init, HTTPClientResponse.init); 572 } 573 574 /** 575 Determines if the WebSocket connection is still alive and ready for sending. 576 577 Note that for determining the ready state for $(EM reading), you need 578 to use $(D waitForData) instead, because both methods can return 579 different values while a disconnect is in proress. 580 581 See_also: $(D waitForData) 582 */ 583 @property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; } 584 585 /** 586 Returns the close code sent by the remote end. 587 588 Note if the connection was never opened, is still alive, or was closed 589 locally this value will be 0. If no close code was given by the remote 590 end in the close frame, the value will be 1005. If the connection was 591 not closed cleanly by the remote end, this value will be 1006. 592 */ 593 @property short closeCode() { return m_closeCode; } 594 595 /** 596 Returns the close reason sent by the remote end. 597 598 Note if the connection was never opened, is still alive, or was closed 599 locally this value will be an empty string. 600 */ 601 @property const(char)[] closeReason() { return m_closeReason; } 602 603 /** 604 The HTTP request that established the web socket connection. 605 */ 606 @property const(HTTPServerRequest) request() const { return m_request; } 607 608 /** 609 Checks if data is readily available for read. 610 */ 611 @property bool dataAvailableForRead() { return m_conn.dataAvailableForRead || m_nextMessage !is null; } 612 613 /** Waits until either a message arrives or until the connection is closed. 614 615 This function can be used in a read loop to cleanly determine when to stop reading. 616 */ 617 bool waitForData() 618 { 619 if (m_nextMessage) return true; 620 621 m_readMutex.performLocked!({ 622 while (connected && m_nextMessage is null) 623 m_readCondition.wait(); 624 }); 625 return m_nextMessage !is null; 626 } 627 628 /// ditto 629 bool waitForData(Duration timeout) 630 { 631 import std.datetime; 632 633 if (m_nextMessage) return true; 634 635 immutable limit_time = Clock.currTime(UTC()) + timeout; 636 637 m_readMutex.performLocked!({ 638 while (connected && m_nextMessage is null && timeout > 0.seconds) { 639 m_readCondition.wait(timeout); 640 timeout = limit_time - Clock.currTime(UTC()); 641 } 642 }); 643 return m_nextMessage !is null; 644 } 645 646 /** 647 Sends a text message. 648 649 On the JavaScript side, the text will be available as message.data (type string). 650 651 Throws: 652 A `WebSocketException` is thrown if the connection gets closed 653 before or during the transfer of the message. 654 */ 655 void send(scope const(char)[] data) 656 { 657 send( 658 (scope message) { message.write(cast(const ubyte[])data); }, 659 FrameOpcode.text); 660 } 661 662 /** 663 Sends a binary message. 664 665 On the JavaScript side, the text will be available as message.data (type Blob). 666 667 Throws: 668 A `WebSocketException` is thrown if the connection gets closed 669 before or during the transfer of the message. 670 */ 671 void send(in ubyte[] data) 672 { 673 send((scope message){ message.write(data); }, FrameOpcode.binary); 674 } 675 676 /** 677 Sends a message using an output stream. 678 679 Throws: 680 A `WebSocketException` is thrown if the connection gets closed 681 before or during the transfer of the message. 682 */ 683 void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode) 684 { 685 m_writeMutex.performLocked!({ 686 vibe.internal.exception.enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed."); 687 /*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng); 688 scope(exit) message.finalize(); 689 sender(message); 690 }); 691 } 692 693 /// Compatibility overload - will be removed soon. 694 deprecated("Call the overload which requires an explicit FrameOpcode.") 695 void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender) 696 { 697 send(sender, FrameOpcode.text); 698 } 699 700 /** 701 Actively closes the connection. 702 703 Params: 704 code = Numeric code indicating a termination reason. 705 reason = Message describing why the connection was terminated. 706 */ 707 void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "") 708 { 709 import std.algorithm.comparison : min; 710 if(reason !is null && reason.length == 0) 711 reason = (cast(WebSocketCloseReason)code).closeReasonString; 712 713 //control frame payloads are limited to 125 bytes 714 version(assert) 715 assert(reason.length <= 123); 716 else 717 reason = reason[0 .. min($, 123)]; 718 719 if (connected) { 720 try { 721 send((scope msg) { 722 m_sentCloseFrame = true; 723 if (code != 0) { 724 msg.write(std.bitmanip.nativeToBigEndian(code)); 725 msg.write(cast(const ubyte[])reason); 726 } 727 }, FrameOpcode.close); 728 } catch (Exception e) { 729 logDiagnostic("Failed to send active web socket close frame: %s", e.msg); 730 } 731 } 732 if (m_pingTimer) m_pingTimer.stop(); 733 734 735 if (Task.getThis() == m_ownerTask) { 736 m_writeMutex.performLocked!({ 737 if (m_clientResponse) { 738 m_clientResponse.disconnect(); 739 m_clientResponse = HTTPClientResponse.init; 740 } 741 if (m_serverResponse) { 742 m_serverResponse.finalize(); 743 m_serverResponse = HTTPServerResponse.init; 744 } 745 }); 746 747 m_reader.join(); 748 749 () @trusted { destroy(m_conn); } (); 750 m_conn = ConnectionStream.init; 751 } 752 } 753 754 /** 755 Receives a new message and returns its contents as a newly allocated data array. 756 757 Params: 758 strict = If set, ensures the exact frame type (text/binary) is received and throws an execption otherwise. 759 Throws: WebSocketException if the connection is closed or 760 if $(D strict == true) and the frame received is not the right type 761 */ 762 ubyte[] receiveBinary(bool strict = true) 763 { 764 ubyte[] ret; 765 receive((scope message){ 766 vibe.internal.exception.enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary, 767 "Expected a binary message, got "~message.frameOpcode.to!string()); 768 ret = message.readAll(); 769 }); 770 return ret; 771 } 772 /// ditto 773 string receiveText(bool strict = true) 774 { 775 string ret; 776 receive((scope message){ 777 vibe.internal.exception.enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text, 778 "Expected a text message, got "~message.frameOpcode.to!string()); 779 ret = message.readAllUTF8(); 780 }); 781 return ret; 782 } 783 784 /** 785 Receives a new message using an InputStream. 786 Throws: WebSocketException if the connection is closed. 787 */ 788 void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver) 789 { 790 m_readMutex.performLocked!({ 791 while (!m_nextMessage) { 792 vibe.internal.exception.enforce!WebSocketException(connected, "Connection closed while reading message."); 793 m_readCondition.wait(); 794 } 795 receiver(m_nextMessage); 796 m_nextMessage = null; 797 m_readCondition.notifyAll(); 798 }); 799 } 800 801 private void startReader() 802 { 803 m_readMutex.performLocked!({}); //Wait until initialization 804 scope (exit) { 805 m_conn.close(); 806 m_readCondition.notifyAll(); 807 } 808 try { 809 while (!m_conn.empty) { 810 assert(!m_nextMessage); 811 /*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng); 812 813 switch (msg.frameOpcode) { 814 default: throw new WebSocketException("unknown frame opcode"); 815 case FrameOpcode.ping: 816 send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong); 817 break; 818 case FrameOpcode.pong: 819 // test if pong matches previous ping 820 if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) { 821 logDebugV("Received PONG that doesn't match previous ping."); 822 break; 823 } 824 logDebugV("Received matching PONG."); 825 m_pongReceived = true; 826 break; 827 case FrameOpcode.close: 828 logDebug("Got closing frame (%s)", m_sentCloseFrame); 829 830 // If no close code was passed, we default to 1005 831 this.m_closeCode = WebSocketCloseReason.noStatusReceived; 832 833 // If provided in the frame, attempt to parse the close code/reason 834 if (msg.peek().length >= short.sizeof) { 835 this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]); 836 837 if (msg.peek().length > short.sizeof) { 838 this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$]; 839 } 840 } 841 842 if(!m_sentCloseFrame) close(); 843 logDebug("Terminating connection (%s)", m_sentCloseFrame); 844 return; 845 case FrameOpcode.text: 846 case FrameOpcode.binary: 847 case FrameOpcode.continuation: // FIXME: add proper support for continuation frames! 848 m_readMutex.performLocked!({ 849 m_nextMessage = msg; 850 m_readCondition.notifyAll(); 851 while (m_nextMessage) m_readCondition.wait(); 852 }); 853 break; 854 } 855 } 856 } catch (Exception e) { 857 logDiagnostic("Error while reading websocket message: %s", e.msg); 858 logDiagnostic("Closing connection."); 859 } 860 861 // If no close code was passed, e.g. this was an unclean termination 862 // of our websocket connection, set the close code to 1006. 863 if (this.m_closeCode == 0) this.m_closeCode = WebSocketCloseReason.abnormalClosure; 864 } 865 866 private void sendPing() 867 nothrow { 868 try { 869 if (!m_pongReceived) { 870 logDebug("Pong skipped. Closing connection."); 871 close(); 872 m_pingTimer.stop(); 873 return; 874 } 875 m_pongReceived = false; 876 send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping); 877 logDebugV("Ping sent"); 878 } catch (Exception e) { 879 logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg); 880 } 881 } 882 } 883 884 /** 885 Represents a single outgoing _WebSocket message as an OutputStream. 886 */ 887 final class OutgoingWebSocketMessage : OutputStream { 888 @safe: 889 private { 890 RandomNumberStream m_rng; 891 Stream m_conn; 892 FrameOpcode m_frameOpcode; 893 Appender!(ubyte[]) m_buffer; 894 bool m_finalized = false; 895 } 896 897 private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng) 898 { 899 assert(conn !is null); 900 m_conn = conn; 901 m_frameOpcode = frameOpcode; 902 m_rng = rng; 903 } 904 905 size_t write(in ubyte[] bytes, IOMode mode) 906 { 907 assert(!m_finalized); 908 909 if (!m_buffer.data.length) { 910 ubyte[Frame.maxHeaderSize] header_padding; 911 m_buffer.put(header_padding[]); 912 } 913 914 m_buffer.put(bytes); 915 return bytes.length; 916 } 917 918 void flush() 919 { 920 assert(!m_finalized); 921 if (m_buffer.data.length > 0) 922 sendFrame(false); 923 } 924 925 void finalize() 926 { 927 if (m_finalized) return; 928 m_finalized = true; 929 sendFrame(true); 930 } 931 932 private void sendFrame(bool fin) 933 { 934 if (!m_buffer.data.length) 935 write(null, IOMode.once); 936 937 assert(m_buffer.data.length >= Frame.maxHeaderSize); 938 939 Frame frame; 940 frame.fin = fin; 941 frame.opcode = m_frameOpcode; 942 frame.payload = m_buffer.data[Frame.maxHeaderSize .. $]; 943 auto hsize = frame.getHeaderSize(m_rng !is null); 944 auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $]; 945 frame.writeHeader(msg[0 .. hsize], m_rng); 946 m_conn.write(msg); 947 m_conn.flush(); 948 m_buffer.clear(); 949 } 950 951 alias write = OutputStream.write; 952 } 953 954 955 /** 956 Represents a single incoming _WebSocket message as an InputStream. 957 */ 958 final class IncomingWebSocketMessage : InputStream { 959 @safe: 960 private { 961 RandomNumberStream m_rng; 962 Stream m_conn; 963 Frame m_currentFrame; 964 } 965 966 private this(Stream conn, RandomNumberStream rng) 967 { 968 assert(conn !is null); 969 m_conn = conn; 970 m_rng = rng; 971 skipFrame(); // reads the first frame 972 } 973 974 @property bool empty() const { return m_currentFrame.payload.length == 0; } 975 976 @property ulong leastSize() const { return m_currentFrame.payload.length; } 977 978 @property bool dataAvailableForRead() { return true; } 979 980 /// The frame type for this nessage; 981 @property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; } 982 983 const(ubyte)[] peek() { return m_currentFrame.payload; } 984 985 /** 986 * Retrieve the next websocket frame of the stream and discard the current 987 * one 988 * 989 * This function is helpful if one wish to process frames by frames, 990 * or minimize memory allocation, as `peek` will only return the current 991 * frame data, and read requires a pre-allocated buffer. 992 * 993 * Returns: 994 * `false` if the current frame is the final one, `true` if a new frame 995 * was read. 996 */ 997 bool skipFrame() 998 { 999 if (m_currentFrame.fin) 1000 return false; 1001 1002 m_currentFrame = Frame.readFrame(m_conn); 1003 return true; 1004 } 1005 1006 size_t read(scope ubyte[] dst, IOMode mode) 1007 { 1008 size_t nread = 0; 1009 1010 while (dst.length > 0) { 1011 vibe.internal.exception.enforce!WebSocketException(!empty , "cannot read from empty stream"); 1012 vibe.internal.exception.enforce!WebSocketException(leastSize > 0, "no data available" ); 1013 1014 import std.algorithm : min; 1015 auto sz = cast(size_t)min(leastSize, dst.length); 1016 dst[0 .. sz] = m_currentFrame.payload[0 .. sz]; 1017 dst = dst[sz .. $]; 1018 m_currentFrame.payload = m_currentFrame.payload[sz .. $]; 1019 nread += sz; 1020 1021 if (leastSize == 0) { 1022 if (mode == IOMode.immediate || mode == IOMode.once && nread > 0) 1023 break; 1024 this.skipFrame(); 1025 } 1026 } 1027 1028 return nread; 1029 } 1030 1031 alias read = InputStream.read; 1032 } 1033 1034 /// Magic string defined by the RFC for challenging the server during upgrade 1035 private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; 1036 1037 1038 /** 1039 * The Opcode is 4 bits, as defined in Section 5.2 1040 * 1041 * Values are defined in section 11.8 1042 * Currently only 6 values are defined, however the opcode is defined as 1043 * taking 4 bits. 1044 */ 1045 private enum FrameOpcode : ubyte { 1046 continuation = 0x0, 1047 text = 0x1, 1048 binary = 0x2, 1049 close = 0x8, 1050 ping = 0x9, 1051 pong = 0xA 1052 } 1053 static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits"); 1054 1055 1056 private struct Frame { 1057 @safe: 1058 enum maxHeaderSize = 14; 1059 1060 bool fin; 1061 FrameOpcode opcode; 1062 ubyte[] payload; 1063 1064 /** 1065 * Return the header length encoded with the expected amount of bits 1066 * 1067 * The WebSocket RFC define a variable-length payload length. 1068 * In short, it means that: 1069 * - If the length is <= 125, it is stored as the 7 least significant 1070 * bits of the second header byte. The first bit is reserved for MASK. 1071 * - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of 1072 * 126 is stored in the aforementioned 7 bits, and the actual length 1073 * is stored in the next two bytes, resulting in a 4 bytes header 1074 * ( + masking key, if any). 1075 * - If the length is > 65_536, a magic value of 127 will be used for 1076 * the 7-bit field, and the next 8 bytes are expected to be the length, 1077 * resulting in a 10 bytes header ( + masking key, if any). 1078 * 1079 * Those functions encapsulate all this logic and allow to just get the 1080 * length with the desired size. 1081 * 1082 * Return: 1083 * - For `ubyte`, the value to store in the 7 bits field, either the 1084 * length or a magic value (126 or 127). 1085 * - For `ushort`, a value in the range [126; 65_536]. 1086 * If payload.length is not in this bound, an assertion will be triggered. 1087 * - For `ulong`, a value in the range [65_537; size_t.max]. 1088 * If payload.length is not in this bound, an assertion will be triggered. 1089 */ 1090 size_t getHeaderSize(bool mask) 1091 { 1092 size_t ret = 1; 1093 if (payload.length < 126) ret += 1; 1094 else if (payload.length < 65536) ret += 3; 1095 else ret += 9; 1096 if (mask) ret += 4; 1097 return ret; 1098 } 1099 1100 void writeHeader(ubyte[] dst, RandomNumberStream sys_rng) 1101 { 1102 ubyte[4] buff; 1103 ubyte firstByte = cast(ubyte)opcode; 1104 if (fin) firstByte |= 0x80; 1105 dst[0] = firstByte; 1106 dst = dst[1 .. $]; 1107 1108 auto b1 = sys_rng ? 0x80 : 0x00; 1109 1110 if (payload.length < 126) { 1111 dst[0] = cast(ubyte)(b1 | payload.length); 1112 dst = dst[1 .. $]; 1113 } else if (payload.length < 65536) { 1114 dst[0] = cast(ubyte) (b1 | 126); 1115 dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length); 1116 dst = dst[3 .. $]; 1117 } else { 1118 dst[0] = cast(ubyte) (b1 | 127); 1119 dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length); 1120 dst = dst[9 .. $]; 1121 } 1122 1123 if (sys_rng) { 1124 sys_rng.read(dst[0 .. 4]); 1125 for (size_t i = 0; i < payload.length; i++) 1126 payload[i] ^= dst[i % 4]; 1127 } 1128 } 1129 1130 static Frame readFrame(InputStream stream) 1131 { 1132 Frame frame; 1133 ubyte[8] data; 1134 1135 stream.read(data[0 .. 2]); 1136 frame.fin = (data[0] & 0x80) != 0; 1137 frame.opcode = cast(FrameOpcode)(data[0] & 0x0F); 1138 1139 bool masked = !!(data[1] & 0b1000_0000); 1140 1141 //parsing length 1142 ulong length = data[1] & 0b0111_1111; 1143 if (length == 126) { 1144 stream.read(data[0 .. 2]); 1145 length = bigEndianToNative!ushort(data[0 .. 2]); 1146 } else if (length == 127) { 1147 stream.read(data); 1148 length = bigEndianToNative!ulong(data); 1149 1150 // RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes 1151 // interpreted as a 64-bit unsigned integer (the most significant 1152 // bit MUST be 0) 1153 vibe.internal.exception.enforce!WebSocketException(!(length >> 63), 1154 "Received length has a non-zero most significant bit"); 1155 1156 } 1157 logDebug("Read frame: %s %s %s length=%d", 1158 frame.opcode, 1159 frame.fin ? "final frame" : "continuation", 1160 masked ? "masked" : "not masked", 1161 length); 1162 1163 // Masking key is 32 bits / uint 1164 if (masked) 1165 stream.read(data[0 .. 4]); 1166 1167 // Read payload 1168 // TODO: Provide a way to limit the size read, easy 1169 // DOS for server code here (rejectedsoftware/vibe.d#1496). 1170 vibe.internal.exception.enforce!WebSocketException(length <= size_t.max); 1171 frame.payload = new ubyte[](cast(size_t)length); 1172 stream.read(frame.payload); 1173 1174 //de-masking 1175 if (masked) 1176 foreach (size_t i; 0 .. cast(size_t)length) 1177 frame.payload[i] = frame.payload[i] ^ data[i % 4]; 1178 1179 return frame; 1180 } 1181 } 1182 1183 unittest { 1184 import std.algorithm.searching : all; 1185 1186 final class DummyRNG : RandomNumberStream { 1187 @safe: 1188 @property bool empty() { return false; } 1189 @property ulong leastSize() { return ulong.max; } 1190 @property bool dataAvailableForRead() { return true; } 1191 const(ubyte)[] peek() { return null; } 1192 size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; } 1193 alias read = RandomNumberStream.read; 1194 } 1195 1196 ubyte[14] hdrbuf; 1197 auto rng = new DummyRNG; 1198 1199 Frame f; 1200 f.payload = new ubyte[125]; 1201 1202 assert(f.getHeaderSize(false) == 2); 1203 hdrbuf[] = 0; 1204 f.writeHeader(hdrbuf[0 .. 2], null); 1205 assert(hdrbuf[0 .. 2] == [0, 125]); 1206 1207 assert(f.getHeaderSize(true) == 6); 1208 hdrbuf[] = 0; 1209 f.writeHeader(hdrbuf[0 .. 6], rng); 1210 assert(hdrbuf[0 .. 2] == [0, 128|125]); 1211 assert(hdrbuf[2 .. 6].all!(b => b == 13)); 1212 1213 f.payload = new ubyte[126]; 1214 assert(f.getHeaderSize(false) == 4); 1215 hdrbuf[] = 0; 1216 f.writeHeader(hdrbuf[0 .. 4], null); 1217 assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]); 1218 1219 assert(f.getHeaderSize(true) == 8); 1220 hdrbuf[] = 0; 1221 f.writeHeader(hdrbuf[0 .. 8], rng); 1222 assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]); 1223 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1224 1225 f.payload = new ubyte[65535]; 1226 assert(f.getHeaderSize(false) == 4); 1227 hdrbuf[] = 0; 1228 f.writeHeader(hdrbuf[0 .. 4], null); 1229 assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]); 1230 1231 assert(f.getHeaderSize(true) == 8); 1232 hdrbuf[] = 0; 1233 f.writeHeader(hdrbuf[0 .. 8], rng); 1234 assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]); 1235 assert(hdrbuf[4 .. 8].all!(b => b == 13)); 1236 1237 f.payload = new ubyte[65536]; 1238 assert(f.getHeaderSize(false) == 10); 1239 hdrbuf[] = 0; 1240 f.writeHeader(hdrbuf[0 .. 10], null); 1241 assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]); 1242 1243 assert(f.getHeaderSize(true) == 14); 1244 hdrbuf[] = 0; 1245 f.writeHeader(hdrbuf[0 .. 14], rng); 1246 assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]); 1247 assert(hdrbuf[10 .. 14].all!(b => b == 13)); 1248 } 1249 1250 /** 1251 * Generate a challenge key for the protocol upgrade phase. 1252 */ 1253 private string generateChallengeKey(scope RandomNumberStream rng) 1254 { 1255 ubyte[16] buffer; 1256 rng.read(buffer); 1257 return Base64.encode(buffer); 1258 } 1259 1260 private string computeAcceptKey(string challengekey) 1261 { 1262 immutable(ubyte)[] b = challengekey.representation; 1263 immutable(ubyte)[] a = s_webSocketGuid.representation; 1264 SHA1 hash; 1265 hash.start(); 1266 hash.put(b); 1267 hash.put(a); 1268 auto result = Base64.encode(hash.finish()); 1269 return to!(string)(result); 1270 }