1 /**
2 	Implements WebSocket support and fallbacks for older browsers.
3 
4 	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
5 	Copyright: © 2012-2014 Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Jan Krüger
8 */
9 module vibe.http.websockets;
10 
11 ///
12 @safe unittest {
13 	void handleConn(scope WebSocket sock)
14 	{
15 		// simple echo server
16 		while (sock.connected) {
17 			auto msg = sock.receiveText();
18 			sock.send(msg);
19 		}
20 	}
21 
22 	void startServer()
23 	{
24 		import vibe.http.router;
25 		auto router = new URLRouter;
26 		router.get("/ws", handleWebSockets(&handleConn));
27 
28 		// Start HTTP server using listenHTTP()...
29 	}
30 }
31 
32 import vibe.core.core;
33 import vibe.core.log;
34 import vibe.core.net;
35 import vibe.core.sync;
36 import vibe.stream.operations;
37 import vibe.http.server;
38 import vibe.http.client;
39 import vibe.core.connectionpool;
40 import vibe.utils.array;
41 static import vibe.internal.exception;
42 
43 import core.time;
44 import std.algorithm: equal, splitter;
45 import std.array;
46 import std.base64;
47 import std.conv;
48 import std.exception;
49 import std.bitmanip;
50 import std.digest.sha;
51 import std..string;
52 import std.functional;
53 import std.uuid;
54 import std.base64;
55 import std.digest.sha;
56 import std.uni: asLowerCase;
57 import vibe.crypto.cryptorand;
58 
59 @safe:
60 
61 
62 alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;
63 
64 
65 /// Exception thrown by $(D vibe.http.websockets).
66 class WebSocketException: Exception
67 {
68 	@safe pure nothrow:
69 
70 	///
71 	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
72 	{
73 		super(msg, file, line, next);
74 	}
75 
76 	///
77 	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
78 	{
79 		super(msg, next, file, line);
80 	}
81 }
82 
83 /** Establishes a WebSocket connection at the specified endpoint.
84 */
85 WebSocket connectWebSocketEx(URL url,
86 	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
87 	const(HTTPClientSettings) settings = defaultSettings)
88 @safe {
89 	const use_tls = (url.schema == "wss" || url.schema == "https") ? true : false;
90 	url.schema = use_tls ? "https" : "http";
91 
92 	auto rng = secureRNG();
93 	auto challengeKey = generateChallengeKey(rng);
94 	auto answerKey = computeAcceptKey(challengeKey);
95 	auto res = requestHTTP(url, (scope req){
96 		req.method = HTTPMethod.GET;
97 		req.headers["Upgrade"] = "websocket";
98 		req.headers["Connection"] = "Upgrade";
99 		req.headers["Sec-WebSocket-Version"] = "13";
100 		req.headers["Sec-WebSocket-Key"] = challengeKey;
101 		request_modifier(req);
102 	}, settings);
103 
104 	enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
105 
106 	auto key = "sec-websocket-accept" in res.headers;
107 	enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
108 	enforce(*key == answerKey, "Response has wrong accept key");
109 	auto conn = res.switchProtocol("websocket");
110 	return new WebSocket(conn, rng, res);
111 }
112 
113 /// ditto
114 void connectWebSocketEx(URL url,
115 	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
116 	scope WebSocketHandshakeDelegate del,
117 	const(HTTPClientSettings) settings = defaultSettings)
118 @safe {
119 	const use_tls = (url.schema == "wss" || url.schema == "https") ? true : false;
120 	url.schema = use_tls ? "https" : "http";
121 
122 	/*scope*/auto rng = secureRNG();
123 	auto challengeKey = generateChallengeKey(rng);
124 	auto answerKey = computeAcceptKey(challengeKey);
125 
126 	requestHTTP(url,
127 		(scope req) {
128 			req.method = HTTPMethod.GET;
129 			req.headers["Upgrade"] = "websocket";
130 			req.headers["Connection"] = "Upgrade";
131 			req.headers["Sec-WebSocket-Version"] = "13";
132 			req.headers["Sec-WebSocket-Key"] = challengeKey;
133 			request_modifier(req);
134 		},
135 		(scope res) {
136 			enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
137 			auto key = "sec-websocket-accept" in res.headers;
138 			enforce(key !is null, "Response is missing the Sec-WebSocket-Accept header.");
139 			enforce(*key == answerKey, "Response has wrong accept key");
140 			res.switchProtocol("websocket", (scope conn) @trusted {
141 				scope ws = new WebSocket(conn, rng, res);
142 				del(ws);
143 				if (ws.connected) ws.close();
144 			});
145 		},
146 		settings
147 	);
148 }
149 
150 /// ditto
151 WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
152 @safe {
153 	return connectWebSocketEx(url, (scope req) {}, settings);
154 }
155 /// ditto
156 void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
157 @safe {
158 	connectWebSocketEx(url, (scope req) {}, del, settings);
159 }
160 /// ditto
161 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
162 @system {
163 	connectWebSocket(url, (scope ws) nothrow {
164 		try del(ws);
165 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
166 	}, settings);
167 }
168 /// Scheduled for deprecation - use a `@safe` callback instead.
169 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
170 @system {
171 	connectWebSocket(url, (scope ws) @trusted => del(ws), settings);
172 }
173 /// Scheduled for deprecation - use a `nothrow` callback instead.
174 void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
175 @safe {
176 	connectWebSocket(url, (scope ws) nothrow {
177 		try del(ws);
178 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
179 	}, settings);
180 }
181 
182 
183 /**
184 	Establishes a web socket conection and passes it to the $(D on_handshake) delegate.
185 */
186 void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
187 @safe {
188 	auto pUpgrade = "Upgrade" in req.headers;
189 	auto pConnection = "Connection" in req.headers;
190 	auto pKey = "Sec-WebSocket-Key" in req.headers;
191 	//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
192 	auto pVersion = "Sec-WebSocket-Version" in req.headers;
193 
194 	auto isUpgrade = false;
195 
196 	if( pConnection ) {
197 		auto connectionTypes = splitter(*pConnection, ",");
198 		foreach( t ; connectionTypes ) {
199 			if( t.strip().asLowerCase().equal("upgrade") ) {
200 				isUpgrade = true;
201 				break;
202 			}
203 		}
204 	}
205 
206 	string req_error;
207 	if (!isUpgrade) req_error = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
208 	else if (!pUpgrade || icmp(*pUpgrade, "websocket") != 0) req_error = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
209 	else if (!pVersion || *pVersion != "13") req_error = "Only version 13 of the WebSocket protocol is supported.";
210 	else if (!pKey) req_error = "Missing \"Sec-WebSocket-Key\" header.";
211 
212 	if (req_error.length) {
213 		logDebug("Browser sent invalid WebSocket request: %s", req_error);
214 		res.statusCode = HTTPStatus.badRequest;
215 		res.writeBody(req_error);
216 		return;
217 	}
218 
219 	auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
220 	res.headers["Sec-WebSocket-Accept"] = accept;
221 	res.headers["Connection"] = "Upgrade";
222 	ConnectionStream conn = res.switchProtocol("websocket");
223 
224 	WebSocket socket = new WebSocket(conn, req, res);
225 	try {
226 		on_handshake(socket);
227 	} catch (Exception e) {
228 		logDiagnostic("WebSocket handler failed: %s", e.msg);
229 	}
230 	socket.close();
231 }
232 /// Scheduled for deprecation - use a `@safe` callback instead.
233 void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
234 @system {
235 	handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res);
236 }
237 /// Scheduled for deprecation - use a `nothrow` callback instead.
238 void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
239 {
240 	handleWebSocket((scope ws) nothrow {
241 		try on_handshake(ws);
242 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
243 	}, req, res);
244 }
245 /// ditto
246 void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
247 @system {
248 	handleWebSocket((scope ws) nothrow {
249 		try on_handshake(ws);
250 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
251 	}, req, res);
252 }
253 
254 
255 /**
256 	Returns a HTTP request handler that establishes web socket conections.
257 */
258 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
259 @safe {
260 	return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ());
261 }
262 /// ditto
263 HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
264 @safe {
265 	void callback(scope HTTPServerRequest req, scope HTTPServerResponse res)
266 	@safe {
267 		auto pUpgrade = "Upgrade" in req.headers;
268 		auto pConnection = "Connection" in req.headers;
269 		auto pKey = "Sec-WebSocket-Key" in req.headers;
270 		//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
271 		auto pVersion = "Sec-WebSocket-Version" in req.headers;
272 
273 		auto isUpgrade = false;
274 
275 		if( pConnection ) {
276 			auto connectionTypes = splitter(*pConnection, ",");
277 			foreach( t ; connectionTypes ) {
278 				if( t.strip().asLowerCase().equal("upgrade") ) {
279 					isUpgrade = true;
280 					break;
281 				}
282 			}
283 		}
284 		if( !(isUpgrade &&
285 			  pUpgrade && icmp(*pUpgrade, "websocket") == 0 &&
286 			  pKey &&
287 			  pVersion && *pVersion == "13") )
288 		{
289 			logDebug("Browser sent invalid WebSocket request.");
290 			res.statusCode = HTTPStatus.badRequest;
291 			res.writeVoidBody();
292 			return;
293 		}
294 
295 		auto accept = () @trusted { return cast(string)Base64.encode(sha1Of(*pKey ~ s_webSocketGuid)); } ();
296 		res.headers["Sec-WebSocket-Accept"] = accept;
297 		res.headers["Connection"] = "Upgrade";
298 		res.switchProtocol("websocket", (scope conn) {
299 			// TODO: put back 'scope' once it is actually enforced by DMD
300 			/*scope*/ auto socket = new WebSocket(conn, req, res);
301 			try on_handshake(socket);
302 			catch (Exception e) {
303 				logDiagnostic("WebSocket handler failed: %s", e.msg);
304 			}
305 			socket.close();
306 		});
307 	}
308 	return &callback;
309 }
310 /// Scheduled for deprecation - use a `@safe` callback instead.
311 HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
312 @system {
313 	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
314 }
315 /// Scheduled for deprecation - use a `@safe` callback instead.
316 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
317 @system {
318 	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
319 }
320 /// Scheduled for deprecation - use a `nothrow` callback instead.
321 HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
322 {
323 	return handleWebSockets(delegate (scope ws) nothrow {
324 		try on_handshake(ws);
325 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
326 	});
327 }
328 /// ditto
329 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
330 {
331 	return handleWebSockets(delegate (scope ws) nothrow {
332 		try on_handshake(ws);
333 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
334 	});
335 }
336 /// ditto
337 HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
338 @system {
339 	return handleWebSockets(delegate (scope ws) nothrow {
340 		try on_handshake(ws);
341 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
342 	});
343 }
344 /// ditto
345 HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
346 @system {
347 	return handleWebSockets(delegate (scope ws) nothrow {
348 		try on_handshake(ws);
349 		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
350 	});
351 }
352 
353 /**
354  * Provides the reason that a websocket connection has closed.
355  *
356  * Further documentation for the WebSocket and it's codes can be found from:
357  * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
358  *
359  * ---
360  *
361  * void echoSocket(scope WebSocket sock)
362  * {
363  *   import std.datetime : seconds;
364  *
365  *   while(sock.waitForData(3.seconds))
366  *   {
367  *     string msg = sock.receiveText;
368  *     logInfo("Got a message: %s", msg);
369  *     sock.send(msg);
370  *   }
371  *
372  *   if(sock.connected)
373  *     sock.close(WebSocketCloseReason.policyViolation, "timeout");
374  * }
375  * ---
376  */
377 enum WebSocketCloseReason : short
378 {
379 	none = 0,
380 	normalClosure = 1000,
381 	goingAway = 1001,
382 	protocolError = 1002,
383 	unsupportedData = 1003,
384 	noStatusReceived = 1005,
385 	abnormalClosure = 1006,
386 	invalidFramePayloadData = 1007,
387 	policyViolation = 1008,
388 	messageTooBig = 1009,
389 	internalError = 1011,
390 	serviceRestart = 1012,
391 	tryAgainLater = 1013,
392 	badGateway = 1014,
393 	tlsHandshake = 1015
394 }
395 
396 string closeReasonString(WebSocketCloseReason reason) @nogc @safe
397 {
398 	import std.math : floor;
399 
400 	//round down to the nearest thousand to get category
401 	switch(cast(short)(cast(float)reason / 1000f).floor)
402 	{
403 		case 0:
404 			return "Reserved and Unused";
405 		case 1:
406 			switch(reason)
407 			{
408 				case 1000:
409 					return "Normal Closure";
410 				case 1001:
411 					return "Going Away";
412 				case 1002:
413 					return "Protocol Error";
414 				case 1003:
415 					return "Unsupported Data";
416 				case 1004:
417 					return "RESERVED";
418 				case 1005:
419 					return "No Status Recvd";
420 				case 1006:
421 					return "Abnormal Closure";
422 				case 1007:
423 					return "Invalid Frame Payload Data";
424 				case 1008:
425 					return "Policy Violation";
426 				case 1009:
427 					return "Message Too Big";
428 				case 1010:
429 					return "Missing Extension";
430 				case 1011:
431 					return "Internal Error";
432 				case 1012:
433 					return "Service Restart";
434 				case 1013:
435 					return "Try Again Later";
436 				case 1014:
437 					return "Bad Gateway";
438 				case 1015:
439 					return "TLS Handshake";
440 				default:
441 					return "RESERVED";
442 			}
443 		case 2:
444 			return "Reserved for extensions";
445 		case 3:
446 			return "Available for frameworks and libraries";
447 		case 4:
448 			return "Available for applications";
449 		default:
450 			return "UNDEFINED - Nasal Demons";
451 	}
452 }
453 
454 unittest
455 {
456 	assert((cast(WebSocketCloseReason)   0).closeReasonString == "Reserved and Unused");
457 	assert((cast(WebSocketCloseReason)   1).closeReasonString == "Reserved and Unused");
458 	assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure");
459 	assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure");
460 	assert((cast(WebSocketCloseReason)1020).closeReasonString == "RESERVED");
461 	assert((cast(WebSocketCloseReason)2000).closeReasonString == "Reserved for extensions");
462 	assert((cast(WebSocketCloseReason)3000).closeReasonString == "Available for frameworks and libraries");
463 	assert((cast(WebSocketCloseReason)4000).closeReasonString == "Available for applications");
464 	assert((cast(WebSocketCloseReason)5000).closeReasonString == "UNDEFINED - Nasal Demons");
465 	assert((cast(WebSocketCloseReason)  -1).closeReasonString == "UNDEFINED - Nasal Demons");
466 
467 	//check the other spec cases
468 	for(short i = 1000; i < 1017; i++)
469 	{
470 		if(i == 1004 || i > 1015)
471 		{
472 			assert(
473 				(cast(WebSocketCloseReason)i).closeReasonString == "RESERVED",
474 				"(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
475 			);
476 		}
477 		else
478 			assert(
479 				(cast(WebSocketCloseReason)i).closeReasonString != "RESERVED",
480 				"(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
481 			);
482 	}
483 }
484 
485 
486 /**
487  * Represents a single _WebSocket connection.
488  *
489  * ---
490  * shared static this ()
491  * {
492  *   runTask(() => connectToWS());
493  * }
494  *
495  * void connectToWS ()
496  * {
497  *   auto ws_url = URL("wss://websockets.example.com/websocket/auth_token");
498  *   auto ws = connectWebSocket(ws_url);
499  *   logInfo("WebSocket connected");
500  *
501  *   while (ws.waitForData())
502  *   {
503  *     auto txt = ws.receiveText;
504  *     logInfo("Received: %s", txt);
505  *   }
506  *   logFatal("Connection lost!");
507  * }
508  * ---
509  */
510 final class WebSocket {
511 @safe:
512 
513 	private {
514 		ConnectionStream m_conn;
515 		bool m_sentCloseFrame = false;
516 		IncomingWebSocketMessage m_nextMessage = null;
517 		const HTTPServerRequest m_request;
518 		HTTPServerResponse m_serverResponse;
519 		HTTPClientResponse m_clientResponse;
520 		Task m_reader;
521 		Task m_ownerTask;
522 		InterruptibleTaskMutex m_readMutex, m_writeMutex;
523 		InterruptibleTaskCondition m_readCondition;
524 		Timer m_pingTimer;
525 		uint m_lastPingIndex;
526 		bool m_pongReceived;
527 		short m_closeCode;
528 		const(char)[] m_closeReason;
529 		/// The entropy generator to use
530 		/// If not null, it means this is a server socket.
531 		RandomNumberStream m_rng;
532 	}
533 
534 	/**
535 	 * Private constructor, called from `connectWebSocket`.
536 	 *
537 	 * Params:
538 	 *	 conn = Underlying connection string
539 	 *	 request = HTTP request used to establish the connection
540 	 *	 rng = Source of entropy to use.  If null, assume we're a server socket
541 	 *   client_res = For client sockets, the response object (keeps the http client locked until the socket is done)
542 	 */
543 	private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res)
544 	{
545 		m_ownerTask = Task.getThis();
546 		m_conn = conn;
547 		m_request = request;
548 		m_clientResponse = client_res;
549 		m_serverResponse = server_res;
550 		assert(m_conn);
551 		m_rng = rng;
552 		m_writeMutex = new InterruptibleTaskMutex;
553 		m_readMutex = new InterruptibleTaskMutex;
554 		m_readCondition = new InterruptibleTaskCondition(m_readMutex);
555 		m_readMutex.performLocked!({
556 			m_reader = runTask(&startReader);
557 			if (request && request.serverSettings.webSocketPingInterval != Duration.zero) {
558 				m_pongReceived = true;
559 				m_pingTimer = setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true);
560 			}
561 		});
562 	}
563 
564 	private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res)
565 	{
566 		this(conn, HTTPServerRequest.init, HTTPServerResponse.init, rng, client_res);
567 	}
568 
569 	private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res)
570 	{
571 		this(conn, request, res, RandomNumberStream.init, HTTPClientResponse.init);
572 	}
573 
574 	/**
575 		Determines if the WebSocket connection is still alive and ready for sending.
576 
577 		Note that for determining the ready state for $(EM reading), you need
578 		to use $(D waitForData) instead, because both methods can return
579 		different values while a disconnect is in proress.
580 
581 		See_also: $(D waitForData)
582 	*/
583 	@property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; }
584 
585 	/**
586 		Returns the close code sent by the remote end.
587 
588 		Note if the connection was never opened, is still alive, or was closed
589 		locally this value will be 0. If no close code was given by the remote
590 		end in the close frame, the value will be 1005. If the connection was
591 		not closed cleanly by the remote end, this value will be 1006.
592 	*/
593 	@property short closeCode() { return m_closeCode; }
594 
595 	/**
596 		Returns the close reason sent by the remote end.
597 
598 		Note if the connection was never opened, is still alive, or was closed
599 		locally this value will be an empty string.
600 	*/
601 	@property const(char)[] closeReason() { return m_closeReason; }
602 
603 	/**
604 		The HTTP request that established the web socket connection.
605 	*/
606 	@property const(HTTPServerRequest) request() const { return m_request; }
607 
608 	/**
609 		Checks if data is readily available for read.
610 	*/
611 	@property bool dataAvailableForRead() { return m_conn.dataAvailableForRead || m_nextMessage !is null; }
612 
613 	/** Waits until either a message arrives or until the connection is closed.
614 
615 		This function can be used in a read loop to cleanly determine when to stop reading.
616 	*/
617 	bool waitForData()
618 	{
619 		if (m_nextMessage) return true;
620 
621 		m_readMutex.performLocked!({
622 			while (connected && m_nextMessage is null)
623 				m_readCondition.wait();
624 		});
625 		return m_nextMessage !is null;
626 	}
627 
628 	/// ditto
629 	bool waitForData(Duration timeout)
630 	{
631 		import std.datetime;
632 
633 		if (m_nextMessage) return true;
634 
635 		immutable limit_time = Clock.currTime(UTC()) + timeout;
636 
637 		m_readMutex.performLocked!({
638 			while (connected && m_nextMessage is null && timeout > 0.seconds) {
639 				m_readCondition.wait(timeout);
640 				timeout = limit_time - Clock.currTime(UTC());
641 			}
642 		});
643 		return m_nextMessage !is null;
644 	}
645 
646 	/**
647 		Sends a text message.
648 
649 		On the JavaScript side, the text will be available as message.data (type string).
650 
651 		Throws:
652 			A `WebSocketException` is thrown if the connection gets closed
653 			before or during the transfer of the message.
654 	*/
655 	void send(scope const(char)[] data)
656 	{
657 		send(
658 			(scope message) { message.write(cast(const ubyte[])data); },
659 			FrameOpcode.text);
660 	}
661 
662 	/**
663 		Sends a binary message.
664 
665 		On the JavaScript side, the text will be available as message.data (type Blob).
666 
667 		Throws:
668 			A `WebSocketException` is thrown if the connection gets closed
669 			before or during the transfer of the message.
670 	*/
671 	void send(in ubyte[] data)
672 	{
673 		send((scope message){ message.write(data); }, FrameOpcode.binary);
674 	}
675 
676 	/**
677 		Sends a message using an output stream.
678 
679 		Throws:
680 			A `WebSocketException` is thrown if the connection gets closed
681 			before or during the transfer of the message.
682 	*/
683 	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
684 	{
685 		m_writeMutex.performLocked!({
686 			vibe.internal.exception.enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
687 			/*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng);
688 			scope(exit) message.finalize();
689 			sender(message);
690 		});
691 	}
692 
693 	/// Compatibility overload - will be removed soon.
694 	deprecated("Call the overload which requires an explicit FrameOpcode.")
695 	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender)
696 	{
697 		send(sender, FrameOpcode.text);
698 	}
699 
700 	/**
701 		Actively closes the connection.
702 
703 		Params:
704 			code = Numeric code indicating a termination reason.
705 			reason = Message describing why the connection was terminated.
706 	*/
707 	void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "")
708 	{
709 		import std.algorithm.comparison : min;
710 		if(reason !is null && reason.length == 0)
711 			reason = (cast(WebSocketCloseReason)code).closeReasonString;
712 
713 		//control frame payloads are limited to 125 bytes
714 		version(assert)
715 			assert(reason.length <= 123);
716 		else
717 			reason = reason[0 .. min($, 123)];
718 
719 		if (connected) {
720 			try {
721 				send((scope msg) {
722 					m_sentCloseFrame = true;
723 					if (code != 0) {
724 						msg.write(std.bitmanip.nativeToBigEndian(code));
725 						msg.write(cast(const ubyte[])reason);
726 					}
727 				}, FrameOpcode.close);
728 			} catch (Exception e) {
729 				logDiagnostic("Failed to send active web socket close frame: %s", e.msg);
730 			}
731 		}
732 		if (m_pingTimer) m_pingTimer.stop();
733 
734 
735 		if (Task.getThis() == m_ownerTask) {
736 			m_writeMutex.performLocked!({
737 				if (m_clientResponse) {
738 					m_clientResponse.disconnect();
739 					m_clientResponse = HTTPClientResponse.init;
740 				}
741 				if (m_serverResponse) {
742 					m_serverResponse.finalize();
743 					m_serverResponse = HTTPServerResponse.init;
744 				}
745 			});
746 
747 			m_reader.join();
748 
749 			() @trusted { destroy(m_conn); } ();
750 			m_conn = ConnectionStream.init;
751 		}
752 	}
753 
754 	/**
755 		Receives a new message and returns its contents as a newly allocated data array.
756 
757 		Params:
758 			strict = If set, ensures the exact frame type (text/binary) is received and throws an execption otherwise.
759 		Throws: WebSocketException if the connection is closed or
760 			if $(D strict == true) and the frame received is not the right type
761 	*/
762 	ubyte[] receiveBinary(bool strict = true)
763 	{
764 		ubyte[] ret;
765 		receive((scope message){
766 			vibe.internal.exception.enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary,
767 				"Expected a binary message, got "~message.frameOpcode.to!string());
768 			ret = message.readAll();
769 		});
770 		return ret;
771 	}
772 	/// ditto
773 	string receiveText(bool strict = true)
774 	{
775 		string ret;
776 		receive((scope message){
777 			vibe.internal.exception.enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text,
778 				"Expected a text message, got "~message.frameOpcode.to!string());
779 			ret = message.readAllUTF8();
780 		});
781 		return ret;
782 	}
783 
784 	/**
785 		Receives a new message using an InputStream.
786 		Throws: WebSocketException if the connection is closed.
787 	*/
788 	void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver)
789 	{
790 		m_readMutex.performLocked!({
791 			while (!m_nextMessage) {
792 				vibe.internal.exception.enforce!WebSocketException(connected, "Connection closed while reading message.");
793 				m_readCondition.wait();
794 			}
795 			receiver(m_nextMessage);
796 			m_nextMessage = null;
797 			m_readCondition.notifyAll();
798 		});
799 	}
800 
801 	private void startReader()
802 	{
803 		m_readMutex.performLocked!({}); //Wait until initialization
804 		scope (exit) {
805 			m_conn.close();
806 			m_readCondition.notifyAll();
807 		}
808 		try {
809 			while (!m_conn.empty) {
810 				assert(!m_nextMessage);
811 				/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);
812 
813 				switch (msg.frameOpcode) {
814 					default: throw new WebSocketException("unknown frame opcode");
815 					case FrameOpcode.ping:
816 						send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
817 						break;
818 					case FrameOpcode.pong:
819 						// test if pong matches previous ping
820 						if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
821 							logDebugV("Received PONG that doesn't match previous ping.");
822 							break;
823 						}
824 						logDebugV("Received matching PONG.");
825 						m_pongReceived = true;
826 						break;
827 					case FrameOpcode.close:
828 						logDebug("Got closing frame (%s)", m_sentCloseFrame);
829 
830 						// If no close code was passed, we default to 1005
831 						this.m_closeCode = WebSocketCloseReason.noStatusReceived;
832 
833 						// If provided in the frame, attempt to parse the close code/reason
834 						if (msg.peek().length >= short.sizeof) {
835 							this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);
836 
837 							if (msg.peek().length > short.sizeof) {
838 								this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
839 							}
840 						}
841 
842 						if(!m_sentCloseFrame) close();
843 						logDebug("Terminating connection (%s)", m_sentCloseFrame);
844 						return;
845 					case FrameOpcode.text:
846 					case FrameOpcode.binary:
847 					case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
848 						m_readMutex.performLocked!({
849 							m_nextMessage = msg;
850 							m_readCondition.notifyAll();
851 							while (m_nextMessage) m_readCondition.wait();
852 						});
853 						break;
854 				}
855 			}
856 		} catch (Exception e) {
857 			logDiagnostic("Error while reading websocket message: %s", e.msg);
858 			logDiagnostic("Closing connection.");
859 		}
860 
861 		// If no close code was passed, e.g. this was an unclean termination
862 		//  of our websocket connection, set the close code to 1006.
863 		if (this.m_closeCode == 0) this.m_closeCode = WebSocketCloseReason.abnormalClosure;
864 	}
865 
866 	private void sendPing()
867 	nothrow {
868 		try {
869 			if (!m_pongReceived) {
870 				logDebug("Pong skipped. Closing connection.");
871 				close();
872 				m_pingTimer.stop();
873 				return;
874 			}
875 			m_pongReceived = false;
876 			send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
877 			logDebugV("Ping sent");
878 		} catch (Exception e) {
879 			logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
880 		}
881 	}
882 }
883 
884 /**
885 	Represents a single outgoing _WebSocket message as an OutputStream.
886 */
887 final class OutgoingWebSocketMessage : OutputStream {
888 @safe:
889 	private {
890 		RandomNumberStream m_rng;
891 		Stream m_conn;
892 		FrameOpcode m_frameOpcode;
893 		Appender!(ubyte[]) m_buffer;
894 		bool m_finalized = false;
895 	}
896 
897 	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
898 	{
899 		assert(conn !is null);
900 		m_conn = conn;
901 		m_frameOpcode = frameOpcode;
902 		m_rng = rng;
903 	}
904 
905 	size_t write(in ubyte[] bytes, IOMode mode)
906 	{
907 		assert(!m_finalized);
908 
909 		if (!m_buffer.data.length) {
910 			ubyte[Frame.maxHeaderSize] header_padding;
911 			m_buffer.put(header_padding[]);
912 		}
913 
914 		m_buffer.put(bytes);
915 		return bytes.length;
916 	}
917 
918 	void flush()
919 	{
920 		assert(!m_finalized);
921 		if (m_buffer.data.length > 0)
922 			sendFrame(false);
923 	}
924 
925 	void finalize()
926 	{
927 		if (m_finalized) return;
928 		m_finalized = true;
929 		sendFrame(true);
930 	}
931 
932 	private void sendFrame(bool fin)
933 	{
934 		if (!m_buffer.data.length)
935 			write(null, IOMode.once);
936 
937 		assert(m_buffer.data.length >= Frame.maxHeaderSize);
938 
939 		Frame frame;
940 		frame.fin = fin;
941 		frame.opcode = m_frameOpcode;
942 		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
943 		auto hsize = frame.getHeaderSize(m_rng !is null);
944 		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
945 		frame.writeHeader(msg[0 .. hsize], m_rng);
946 		m_conn.write(msg);
947 		m_conn.flush();
948 		m_buffer.clear();
949 	}
950 
951 	alias write = OutputStream.write;
952 }
953 
954 
955 /**
956 	Represents a single incoming _WebSocket message as an InputStream.
957 */
958 final class IncomingWebSocketMessage : InputStream {
959 @safe:
960 	private {
961 		RandomNumberStream m_rng;
962 		Stream m_conn;
963 		Frame m_currentFrame;
964 	}
965 
966 	private this(Stream conn, RandomNumberStream rng)
967 	{
968 		assert(conn !is null);
969 		m_conn = conn;
970 		m_rng = rng;
971 		skipFrame(); // reads the first frame
972 	}
973 
974 	@property bool empty() const { return m_currentFrame.payload.length == 0; }
975 
976 	@property ulong leastSize() const { return m_currentFrame.payload.length; }
977 
978 	@property bool dataAvailableForRead() { return true; }
979 
980 	/// The frame type for this nessage;
981 	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }
982 
983 	const(ubyte)[] peek() { return m_currentFrame.payload; }
984 
985 	/**
986 	 * Retrieve the next websocket frame of the stream and discard the current
987 	 * one
988 	 *
989 	 * This function is helpful if one wish to process frames by frames,
990 	 * or minimize memory allocation, as `peek` will only return the current
991 	 * frame data, and read requires a pre-allocated buffer.
992 	 *
993 	 * Returns:
994 	 * `false` if the current frame is the final one, `true` if a new frame
995 	 * was read.
996 	 */
997 	bool skipFrame()
998 	{
999 		if (m_currentFrame.fin)
1000 			return false;
1001 
1002 		m_currentFrame = Frame.readFrame(m_conn);
1003 		return true;
1004 	}
1005 
1006 	size_t read(scope ubyte[] dst, IOMode mode)
1007 	{
1008 		size_t nread = 0;
1009 
1010 		while (dst.length > 0) {
1011 			vibe.internal.exception.enforce!WebSocketException(!empty , "cannot read from empty stream");
1012 			vibe.internal.exception.enforce!WebSocketException(leastSize > 0, "no data available" );
1013 
1014 			import std.algorithm : min;
1015 			auto sz = cast(size_t)min(leastSize, dst.length);
1016 			dst[0 .. sz] = m_currentFrame.payload[0 .. sz];
1017 			dst = dst[sz .. $];
1018 			m_currentFrame.payload = m_currentFrame.payload[sz .. $];
1019 			nread += sz;
1020 
1021 			if (leastSize == 0) {
1022 				if (mode == IOMode.immediate || mode == IOMode.once && nread > 0)
1023 					break;
1024 				this.skipFrame();
1025 			}
1026 		}
1027 
1028 		return nread;
1029 	}
1030 
1031 	alias read = InputStream.read;
1032 }
1033 
1034 /// Magic string defined by the RFC for challenging the server during upgrade
1035 private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1036 
1037 
1038 /**
1039  * The Opcode is 4 bits, as defined in Section 5.2
1040  *
1041  * Values are defined in section 11.8
1042  * Currently only 6 values are defined, however the opcode is defined as
1043  * taking 4 bits.
1044  */
1045 private enum FrameOpcode : ubyte {
1046 	continuation = 0x0,
1047 	text = 0x1,
1048 	binary = 0x2,
1049 	close = 0x8,
1050 	ping = 0x9,
1051 	pong = 0xA
1052 }
1053 static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");
1054 
1055 
1056 private struct Frame {
1057 @safe:
1058 	enum maxHeaderSize = 14;
1059 
1060 	bool fin;
1061 	FrameOpcode opcode;
1062 	ubyte[] payload;
1063 
1064     /**
1065      * Return the header length encoded with the expected amount of bits
1066      *
1067      * The WebSocket RFC define a variable-length payload length.
1068      * In short, it means that:
1069      * - If the length is <= 125, it is stored as the 7 least significant
1070      *   bits of the second header byte.  The first bit is reserved for MASK.
1071      * - If the length is <= 65_536 (so it fits in 2 bytes), a magic value of
1072      *   126 is stored in the aforementioned 7 bits, and the actual length
1073      *   is stored in the next two bytes, resulting in a 4 bytes header
1074      *   ( + masking key, if any).
1075      * - If the length is > 65_536, a magic value of 127 will be used for
1076      *   the 7-bit field, and the next 8 bytes are expected to be the length,
1077      *   resulting in a 10 bytes header ( + masking key, if any).
1078      *
1079      * Those functions encapsulate all this logic and allow to just get the
1080      * length with the desired size.
1081      *
1082      * Return:
1083      * - For `ubyte`, the value to store in the 7 bits field, either the
1084      *   length or a magic value (126 or 127).
1085      * - For `ushort`, a value in the range [126; 65_536].
1086      *   If payload.length is not in this bound, an assertion will be triggered.
1087      * - For `ulong`, a value in the range [65_537; size_t.max].
1088      *   If payload.length is not in this bound, an assertion will be triggered.
1089      */
1090 	size_t getHeaderSize(bool mask)
1091 	{
1092 		size_t ret = 1;
1093 		if (payload.length < 126) ret += 1;
1094 		else if (payload.length < 65536) ret += 3;
1095 		else ret += 9;
1096 		if (mask) ret += 4;
1097 		return ret;
1098 	}
1099 
1100 	void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
1101 	{
1102 		ubyte[4] buff;
1103 		ubyte firstByte = cast(ubyte)opcode;
1104 		if (fin) firstByte |= 0x80;
1105 		dst[0] = firstByte;
1106 		dst = dst[1 .. $];
1107 
1108 		auto b1 = sys_rng ? 0x80 : 0x00;
1109 
1110 		if (payload.length < 126) {
1111 			dst[0] = cast(ubyte)(b1 | payload.length);
1112 			dst = dst[1 .. $];
1113 		} else if (payload.length < 65536) {
1114 			dst[0] = cast(ubyte) (b1 | 126);
1115 			dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length);
1116 			dst = dst[3 .. $];
1117 		} else {
1118 			dst[0] = cast(ubyte) (b1 | 127);
1119 			dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length);
1120 			dst = dst[9 .. $];
1121 		}
1122 
1123 		if (sys_rng) {
1124             sys_rng.read(dst[0 .. 4]);
1125 			for (size_t i = 0; i < payload.length; i++)
1126 				payload[i] ^= dst[i % 4];
1127 		}
1128 	}
1129 
1130 	static Frame readFrame(InputStream stream)
1131 	{
1132 		Frame frame;
1133 		ubyte[8] data;
1134 
1135 		stream.read(data[0 .. 2]);
1136 		frame.fin = (data[0] & 0x80) != 0;
1137 		frame.opcode = cast(FrameOpcode)(data[0] & 0x0F);
1138 
1139 		bool masked = !!(data[1] & 0b1000_0000);
1140 
1141 		//parsing length
1142 		ulong length = data[1] & 0b0111_1111;
1143 		if (length == 126) {
1144 			stream.read(data[0 .. 2]);
1145 			length = bigEndianToNative!ushort(data[0 .. 2]);
1146 		} else if (length == 127) {
1147 			stream.read(data);
1148 			length = bigEndianToNative!ulong(data);
1149 
1150 			// RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes
1151 			// interpreted as a 64-bit unsigned integer (the most significant
1152 			// bit MUST be 0)
1153 			vibe.internal.exception.enforce!WebSocketException(!(length >> 63),
1154 				"Received length has a non-zero most significant bit");
1155 
1156 		}
1157 		logDebug("Read frame: %s %s %s length=%d",
1158 				 frame.opcode,
1159 				 frame.fin ? "final frame" : "continuation",
1160 				 masked ? "masked" : "not masked",
1161 				 length);
1162 
1163 		// Masking key is 32 bits / uint
1164 		if (masked)
1165 			stream.read(data[0 .. 4]);
1166 
1167 		// Read payload
1168 		// TODO: Provide a way to limit the size read, easy
1169 		// DOS for server code here (rejectedsoftware/vibe.d#1496).
1170 		vibe.internal.exception.enforce!WebSocketException(length <= size_t.max);
1171 		frame.payload = new ubyte[](cast(size_t)length);
1172 		stream.read(frame.payload);
1173 
1174 		//de-masking
1175 		if (masked)
1176 			foreach (size_t i; 0 .. cast(size_t)length)
1177 				frame.payload[i] = frame.payload[i] ^ data[i % 4];
1178 
1179 		return frame;
1180 	}
1181 }
1182 
1183 unittest {
1184 	import std.algorithm.searching : all;
1185 
1186 	final class DummyRNG : RandomNumberStream {
1187 	@safe:
1188 		@property bool empty() { return false; }
1189 		@property ulong leastSize() { return ulong.max; }
1190 		@property bool dataAvailableForRead() { return true; }
1191 		const(ubyte)[] peek() { return null; }
1192 		size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; }
1193 		alias read = RandomNumberStream.read;
1194 	}
1195 
1196 	ubyte[14] hdrbuf;
1197 	auto rng = new DummyRNG;
1198 
1199 	Frame f;
1200 	f.payload = new ubyte[125];
1201 
1202 	assert(f.getHeaderSize(false) == 2);
1203 	hdrbuf[] = 0;
1204 	f.writeHeader(hdrbuf[0 .. 2], null);
1205 	assert(hdrbuf[0 .. 2] == [0, 125]);
1206 
1207 	assert(f.getHeaderSize(true) == 6);
1208 	hdrbuf[] = 0;
1209 	f.writeHeader(hdrbuf[0 .. 6], rng);
1210 	assert(hdrbuf[0 .. 2] == [0, 128|125]);
1211 	assert(hdrbuf[2 .. 6].all!(b => b == 13));
1212 
1213 	f.payload = new ubyte[126];
1214 	assert(f.getHeaderSize(false) == 4);
1215 	hdrbuf[] = 0;
1216 	f.writeHeader(hdrbuf[0 .. 4], null);
1217 	assert(hdrbuf[0 .. 4] == [0, 126, 0, 126]);
1218 
1219 	assert(f.getHeaderSize(true) == 8);
1220 	hdrbuf[] = 0;
1221 	f.writeHeader(hdrbuf[0 .. 8], rng);
1222 	assert(hdrbuf[0 .. 4] == [0, 128|126, 0, 126]);
1223 	assert(hdrbuf[4 .. 8].all!(b => b == 13));
1224 
1225 	f.payload = new ubyte[65535];
1226 	assert(f.getHeaderSize(false) == 4);
1227 	hdrbuf[] = 0;
1228 	f.writeHeader(hdrbuf[0 .. 4], null);
1229 	assert(hdrbuf[0 .. 4] == [0, 126, 255, 255]);
1230 
1231 	assert(f.getHeaderSize(true) == 8);
1232 	hdrbuf[] = 0;
1233 	f.writeHeader(hdrbuf[0 .. 8], rng);
1234 	assert(hdrbuf[0 .. 4] == [0, 128|126, 255, 255]);
1235 	assert(hdrbuf[4 .. 8].all!(b => b == 13));
1236 
1237 	f.payload = new ubyte[65536];
1238 	assert(f.getHeaderSize(false) == 10);
1239 	hdrbuf[] = 0;
1240 	f.writeHeader(hdrbuf[0 .. 10], null);
1241 	assert(hdrbuf[0 .. 10] == [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]);
1242 
1243 	assert(f.getHeaderSize(true) == 14);
1244 	hdrbuf[] = 0;
1245 	f.writeHeader(hdrbuf[0 .. 14], rng);
1246 	assert(hdrbuf[0 .. 10] == [0, 128|127, 0, 0, 0, 0, 0, 1, 0, 0]);
1247 	assert(hdrbuf[10 .. 14].all!(b => b == 13));
1248 }
1249 
1250 /**
1251  * Generate a challenge key for the protocol upgrade phase.
1252  */
1253 private string generateChallengeKey(scope RandomNumberStream rng)
1254 {
1255 	ubyte[16] buffer;
1256 	rng.read(buffer);
1257 	return Base64.encode(buffer);
1258 }
1259 
1260 private string computeAcceptKey(string challengekey)
1261 {
1262 	immutable(ubyte)[] b = challengekey.representation;
1263 	immutable(ubyte)[] a = s_webSocketGuid.representation;
1264 	SHA1 hash;
1265 	hash.start();
1266 	hash.put(b);
1267 	hash.put(a);
1268 	auto result = Base64.encode(hash.finish());
1269 	return to!(string)(result);
1270 }