1 /**
2 	Implements WebSocket support and fallbacks for older browsers.
3 
4 	Standards: $(LINK2 https://tools.ietf.org/html/rfc6455, RFC6455)
5 	Copyright: © 2012-2014 Sönke Ludwig
6 	License: Subject to the terms of the MIT license, as written in the included LICENSE.txt file.
7 	Authors: Jan Krüger
8 */
9 module vibe.http.websockets;
10 
11 ///
@safe unittest {
	// Connection handler: invoked with the WebSocket once the upgrade succeeded.
	void handleConn(scope WebSocket sock)
	{
		// simple echo server
		// Loop for as long as the socket reports itself as connected and
		// send every received text message straight back.
		while (sock.connected) {
			auto msg = sock.receiveText();
			sock.send(msg);
		}
	}

	void startServer()
	{
		import vibe.http.router;
		auto router = new URLRouter;
		// handleWebSockets() wraps the handler into a regular HTTP request handler.
		router.get("/ws", handleWebSockets(&handleConn));

		// Start HTTP server using listenHTTP()...
	}
}
31 
32 import vibe.core.core;
33 import vibe.core.log;
34 import vibe.core.net;
35 import vibe.core.sync;
36 import vibe.stream.operations;
37 import vibe.http.server;
38 import vibe.http.client;
39 import vibe.core.connectionpool;
40 
41 import core.time;
42 import std.algorithm: equal, splitter;
43 import std.array;
44 import std.base64;
45 import std.conv;
46 import std.exception;
47 import std.bitmanip;
48 import std.digest.sha;
49 import std.string;
50 import std.functional;
51 import std.uuid;
52 import std.base64;
53 import std.digest.sha;
54 import std.uni: asLowerCase;
55 import vibe.crypto.cryptorand;
56 
57 @safe:
58 
59 
/// Callback type that receives the established `WebSocket`; it must be `nothrow`.
alias WebSocketHandshakeDelegate = void delegate(scope WebSocket) nothrow;
61 
62 
63 /// Exception thrown by $(D vibe.http.websockets).
class WebSocketException: Exception
{
	/// Creates the exception from a message, with an optional chained throwable as the last argument.
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
	@safe pure nothrow {
		super(msg, file, line, next);
	}

	/// Creates the exception from a message and a chained throwable given as the second argument.
	this(string msg, Throwable next, string file = __FILE__, size_t line = __LINE__)
	@safe pure nothrow {
		super(msg, next, file, line);
	}
}
80 
/** Establishes a WebSocket connection at the specified endpoint.

	The `wss` and `https` schemas are requested as `https`, every other schema
	as `http`. A GET request carrying the WebSocket upgrade headers is sent and
	the response is checked for the "switching protocols" status and for the
	accept key matching the generated challenge key.

	Params:
		url = Endpoint to connect to
		request_modifier = Called with the upgrade request after the WebSocket
			headers have been set, so that further changes can be applied
		settings = HTTP client settings forwarded to `requestHTTP`

	Returns: The `WebSocket` wrapping the upgraded connection.

	Throws: An `Exception` (via `enforce`) if the upgrade is not accepted, the
		accept header is missing, or the accept key does not match.
*/
WebSocket connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// The handshake itself is a plain HTTP(S) request.
	const secure = url.schema == "wss" || url.schema == "https";
	url.schema = secure ? "https" : "http";

	auto entropy = secureRNG();
	auto clientKey = generateChallengeKey(entropy);
	auto expectedAccept = computeAcceptKey(clientKey);

	auto response = requestHTTP(url, (scope req){
		req.method = HTTPMethod.GET;
		req.headers["Upgrade"] = "websocket";
		req.headers["Connection"] = "Upgrade";
		req.headers["Sec-WebSocket-Version"] = "13";
		req.headers["Sec-WebSocket-Key"] = clientKey;
		// The caller gets the last word on the request.
		request_modifier(req);
	}, settings);

	enforce(response.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");

	auto acceptHeader = "sec-websocket-accept" in response.headers;
	enforce(acceptHeader !is null, "Response is missing the Sec-WebSocket-Accept header.");
	enforce(*acceptHeader == expectedAccept, "Response has wrong accept key");

	auto upgraded = response.switchProtocol("websocket");
	return new WebSocket(upgraded, entropy, response);
}
110 
111 /// ditto
void connectWebSocketEx(URL url,
	scope void delegate(scope HTTPClientRequest) @safe request_modifier,
	scope WebSocketHandshakeDelegate del,
	const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Callback flavour: the socket only lives for the duration of `del` and
	// is closed afterwards if the callback left it connected.
	const secure = url.schema == "wss" || url.schema == "https";
	url.schema = secure ? "https" : "http";

	/*scope*/auto entropy = secureRNG();
	auto clientKey = generateChallengeKey(entropy);
	auto expectedAccept = computeAcceptKey(clientKey);

	requestHTTP(url,
		// request side: plain GET plus the WebSocket upgrade headers
		(scope req) {
			req.method = HTTPMethod.GET;
			req.headers["Upgrade"] = "websocket";
			req.headers["Connection"] = "Upgrade";
			req.headers["Sec-WebSocket-Version"] = "13";
			req.headers["Sec-WebSocket-Key"] = clientKey;
			request_modifier(req);
		},
		// response side: validate the handshake, then hand the socket to `del`
		(scope res) {
			enforce(res.statusCode == HTTPStatus.switchingProtocols, "Server didn't accept the protocol upgrade request.");
			auto acceptHeader = "sec-websocket-accept" in res.headers;
			enforce(acceptHeader !is null, "Response is missing the Sec-WebSocket-Accept header.");
			enforce(*acceptHeader == expectedAccept, "Response has wrong accept key");
			res.switchProtocol("websocket", (scope conn) @trusted {
				scope ws = new WebSocket(conn, entropy, res);
				del(ws);
				if (ws.connected) ws.close();
			});
		},
		settings
	);
}
147 
148 /// ditto
WebSocket connectWebSocket(URL url, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Same as connectWebSocketEx, but with a no-op request modifier.
	return connectWebSocketEx(url, (scope req) {}, settings);
}
/// ditto
void connectWebSocket(URL url, scope WebSocketHandshakeDelegate del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Callback variant with a no-op request modifier.
	connectWebSocketEx(url, (scope req) {}, del, settings);
}
/// ditto
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// Wrap the throwing callback in a nothrow one: exceptions are logged
	// as a warning instead of being propagated.
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}
/// Scheduled for deprecation - use a `@safe` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @system nothrow del, const(HTTPClientSettings) settings = defaultSettings)
@system {
	// The @system callback is invoked through a @trusted lambda.
	connectWebSocket(url, (scope ws) @trusted => del(ws), settings);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void connectWebSocket(URL url, scope void delegate(scope WebSocket) @safe del, const(HTTPClientSettings) settings = defaultSettings)
@safe {
	// Wrap the throwing callback in a nothrow one: exceptions are logged
	// as a warning instead of being propagated.
	connectWebSocket(url, (scope ws) nothrow {
		try del(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, settings);
}
179 
180 
/**
	Establishes a web socket connection and passes it to the $(D on_handshake) delegate.

	The request headers are validated first. An invalid request is answered
	with `HTTPStatus.badRequest` and a body describing the problem, and the
	delegate is not called. Otherwise the accept key is written to the
	response, the protocol is switched and $(D on_handshake) is invoked with
	the new socket, which is closed once the delegate returns.

	Params:
		on_handshake = Callback receiving the established socket
		req = The HTTP request asking for the upgrade
		res = The HTTP response used to answer the upgrade request
*/
void handleWebSocket(scope WebSocketHandshakeDelegate on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@safe {
	auto upgradeHdr = "Upgrade" in req.headers;
	auto connectionHdr = "Connection" in req.headers;
	auto keyHdr = "Sec-WebSocket-Key" in req.headers;
	//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
	auto versionHdr = "Sec-WebSocket-Version" in req.headers;

	// The "Connection" header is treated as a comma separated token list;
	// one of the tokens has to be "upgrade" (compared case-insensitively).
	bool wantsUpgrade = false;
	if (connectionHdr !is null) {
		foreach (token; splitter(*connectionHdr, ",")) {
			if (!token.strip().asLowerCase().equal("upgrade")) continue;
			wantsUpgrade = true;
			break;
		}
	}

	// Only the first failing check is reported.
	string problem;
	if (!wantsUpgrade)
		problem = "WebSocket endpoint only accepts \"Connection: upgrade\" requests.";
	else if (upgradeHdr is null || icmp(*upgradeHdr, "websocket") != 0)
		problem = "WebSocket endpoint requires \"Upgrade: websocket\" header.";
	else if (versionHdr is null || *versionHdr != "13")
		problem = "Only version 13 of the WebSocket protocol is supported.";
	else if (keyHdr is null)
		problem = "Missing \"Sec-WebSocket-Key\" header.";

	if (problem.length > 0) {
		logDebug("Browser sent invalid WebSocket request: %s", problem);
		res.statusCode = HTTPStatus.badRequest;
		res.writeBody(problem);
		return;
	}

	// Accept key: Base64 of the SHA-1 digest of the client key with the GUID appended.
	auto acceptKey = () @trusted { return cast(string)Base64.encode(sha1Of(*keyHdr ~ s_webSocketGuid)); } ();
	res.headers["Sec-WebSocket-Accept"] = acceptKey;
	res.headers["Connection"] = "Upgrade";
	ConnectionStream conn = res.switchProtocol("websocket");

	// NOTE: silencing scope warning here - WebSocket references the scoped
	//       req/res objects throughout its lifetime, which has a narrower scope
	scope socket = () @trusted { return new WebSocket(conn, req, res); } ();
	try {
		on_handshake(socket);
	} catch (Exception e) {
		logDiagnostic("WebSocket handler failed: %s", e.msg);
	}
	socket.close();
}
232 /// Scheduled for deprecation - use a `@safe` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @system nothrow on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// The @system callback is invoked through a @trusted lambda.
	handleWebSocket((scope ws) @trusted => on_handshake(ws), req, res);
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
void handleWebSocket(scope void delegate(scope WebSocket) @safe on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
{
	// Wrap the throwing callback in a nothrow one: exceptions are logged
	// as a warning instead of being propagated.
	handleWebSocket((scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, req, res);
}
/// ditto
void handleWebSocket(scope void delegate(scope WebSocket) @system on_handshake, scope HTTPServerRequest req, scope HTTPServerResponse res)
@system {
	// Wrap the throwing callback in a nothrow one: exceptions are logged
	// as a warning instead of being propagated.
	handleWebSocket((scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	}, req, res);
}
253 
254 
/**
	Returns a HTTP request handler that establishes web socket connections.

	Params:
		on_handshake = Callback invoked with each established socket
*/
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe nothrow on_handshake)
@safe {
	// Convert the function pointer to a delegate and forward to the delegate overload.
	return handleWebSockets(() @trusted { return toDelegate(on_handshake); } ());
}
262 /// ditto
HTTPServerRequestDelegateS handleWebSockets(WebSocketHandshakeDelegate on_handshake)
@safe {
	// Per-request handler: validates the upgrade request, answers the
	// handshake and runs `on_handshake` on the upgraded connection.
	void handleRequest(scope HTTPServerRequest req, scope HTTPServerResponse res)
	@safe {
		auto upgradeHdr = "Upgrade" in req.headers;
		auto connectionHdr = "Connection" in req.headers;
		auto keyHdr = "Sec-WebSocket-Key" in req.headers;
		//auto pProtocol = "Sec-WebSocket-Protocol" in req.headers;
		auto versionHdr = "Sec-WebSocket-Version" in req.headers;

		// "Connection" is treated as a comma separated token list; look
		// for an "upgrade" token (compared case-insensitively).
		bool wantsUpgrade = false;
		if (connectionHdr !is null) {
			foreach (token; splitter(*connectionHdr, ",")) {
				if (!token.strip().asLowerCase().equal("upgrade")) continue;
				wantsUpgrade = true;
				break;
			}
		}

		// Any failing check rejects the request with an empty body.
		const invalid = !wantsUpgrade
			|| upgradeHdr is null || icmp(*upgradeHdr, "websocket") != 0
			|| keyHdr is null
			|| versionHdr is null || *versionHdr != "13";
		if (invalid) {
			logDebug("Browser sent invalid WebSocket request.");
			res.statusCode = HTTPStatus.badRequest;
			res.writeVoidBody();
			return;
		}

		// Accept key: Base64 of the SHA-1 digest of the client key with the GUID appended.
		auto acceptKey = () @trusted { return cast(string)Base64.encode(sha1Of(*keyHdr ~ s_webSocketGuid)); } ();
		res.headers["Sec-WebSocket-Accept"] = acceptKey;
		res.headers["Connection"] = "Upgrade";
		res.switchProtocol("websocket", (scope conn) {
			// TODO: put back 'scope' once it is actually enforced by DMD
			/*scope*/ auto socket = new WebSocket(conn, req, res);
			try {
				on_handshake(socket);
			} catch (Exception e) {
				logDiagnostic("WebSocket handler failed: %s", e.msg);
			}
			socket.close();
		});
	}
	return &handleRequest;
}
310 /// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system nothrow on_handshake)
@system {
	// The @system delegate is invoked through a @trusted delegate literal.
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `@safe` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system nothrow on_handshake)
@system {
	// The @system function is invoked through a @trusted delegate literal.
	return handleWebSockets(delegate (scope ws) @trusted => on_handshake(ws));
}
/// Scheduled for deprecation - use a `nothrow` callback instead.
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @safe on_handshake)
{
	// Wrap the throwing callback in a nothrow one: exceptions are logged
	// as a warning instead of being propagated.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @safe on_handshake)
{
	// Same wrapping as above, for a plain function pointer.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void delegate(scope WebSocket) @system on_handshake)
@system {
	// Same wrapping as above, for a @system delegate.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
/// ditto
HTTPServerRequestDelegateS handleWebSockets(void function(scope WebSocket) @system on_handshake)
@system {
	// Same wrapping as above, for a @system function pointer.
	return handleWebSockets(delegate (scope ws) nothrow {
		try on_handshake(ws);
		catch (Exception e) logWarn("WebSocket handler failed: %s", e.msg);
	});
}
352 
/**
 * Provides the reason that a websocket connection has closed.
 *
 * Further documentation for the WebSocket and its codes can be found from:
 * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
 *
 * ---
 *
 * void echoSocket(scope WebSocket sock)
 * {
 *   import std.datetime : seconds;
 *
 *   while(sock.waitForData(3.seconds))
 *   {
 *     string msg = sock.receiveText;
 *     logInfo("Got a message: %s", msg);
 *     sock.send(msg);
 *   }
 *
 *   if(sock.connected)
 *     sock.close(WebSocketCloseReason.policyViolation, "timeout");
 * }
 * ---
 */
enum WebSocketCloseReason : short
{
	none = 0,
	normalClosure = 1000,
	goingAway = 1001,
	protocolError = 1002,
	unsupportedData = 1003,
	noStatusReceived = 1005,
	abnormalClosure = 1006,
	invalidFramePayloadData = 1007,
	policyViolation = 1008,
	messageTooBig = 1009,
	internalError = 1011,
	serviceRestart = 1012,
	tryAgainLater = 1013,
	badGateway = 1014,
	tlsHandshake = 1015
}

/**
 * Maps a close code to a short human readable description.
 *
 * Codes in the range 1000-1999 are described individually (unassigned ones
 * yield "RESERVED"); every other code is described by the thousand-block it
 * falls into. Negative codes and codes of 5000 and above are reported as
 * undefined.
 *
 * Params:
 *   reason = The close code; values that are not members of the enum are accepted
 *
 * Returns: A statically allocated description string.
 */
string closeReasonString(WebSocketCloseReason reason) @nogc @safe
{
	const short code = reason;

	// Negative codes belong to no category at all.
	if (code < 0)
		return "UNDEFINED - Nasal Demons";

	// The thousands digit selects the category.
	const category = code / 1000;

	if (category == 0) return "Reserved and Unused";
	if (category == 2) return "Reserved for extensions";
	if (category == 3) return "Available for frameworks and libraries";
	if (category == 4) return "Available for applications";
	if (category != 1) return "UNDEFINED - Nasal Demons";

	// 1000 .. 1999: codes with an individual meaning
	switch (code)
	{
		case 1000: return "Normal Closure";
		case 1001: return "Going Away";
		case 1002: return "Protocol Error";
		case 1003: return "Unsupported Data";
		case 1004: return "RESERVED";
		case 1005: return "No Status Recvd";
		case 1006: return "Abnormal Closure";
		case 1007: return "Invalid Frame Payload Data";
		case 1008: return "Policy Violation";
		case 1009: return "Message Too Big";
		case 1010: return "Missing Extension";
		case 1011: return "Internal Error";
		case 1012: return "Service Restart";
		case 1013: return "Try Again Later";
		case 1014: return "Bad Gateway";
		case 1015: return "TLS Handshake";
		default:   return "RESERVED";
	}
}
453 
// Checks closeReasonString for the category boundaries and for every
// individually described code in the 1000-1016 range.
unittest
{
	// category boundaries, including out-of-range and negative values
	assert((cast(WebSocketCloseReason)   0).closeReasonString == "Reserved and Unused");
	assert((cast(WebSocketCloseReason)   1).closeReasonString == "Reserved and Unused");
	assert(WebSocketCloseReason.normalClosure.closeReasonString == "Normal Closure");
	assert(WebSocketCloseReason.abnormalClosure.closeReasonString == "Abnormal Closure");
	assert((cast(WebSocketCloseReason)1020).closeReasonString == "RESERVED");
	assert((cast(WebSocketCloseReason)2000).closeReasonString == "Reserved for extensions");
	assert((cast(WebSocketCloseReason)3000).closeReasonString == "Available for frameworks and libraries");
	assert((cast(WebSocketCloseReason)4000).closeReasonString == "Available for applications");
	assert((cast(WebSocketCloseReason)5000).closeReasonString == "UNDEFINED - Nasal Demons");
	assert((cast(WebSocketCloseReason)  -1).closeReasonString == "UNDEFINED - Nasal Demons");

	//check the other spec cases
	// 1004 and everything above 1015 must read "RESERVED"; all other codes
	// in the range must have a description of their own.
	for(short i = 1000; i < 1017; i++)
	{
		if(i == 1004 || i > 1015)
		{
			assert(
				(cast(WebSocketCloseReason)i).closeReasonString == "RESERVED",
				"(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
			);
		}
		else
			assert(
				(cast(WebSocketCloseReason)i).closeReasonString != "RESERVED",
				"(incorrect) code %d = %s".format(i, closeReasonString(cast(WebSocketCloseReason)i))
			);
	}
}
484 
485 
486 /**
487  * Represents a single _WebSocket connection.
488  *
489  * ---
490  * int main (string[] args)
491  * {
492  *   auto taskHandle = runTask(() => connectToWS());
493  *   return runApplication(&args);
494  * }
495  *
496  * void connectToWS ()
497  * {
498  *   auto ws_url = URL("wss://websockets.example.com/websocket/auth_token");
499  *   auto ws = connectWebSocket(ws_url);
500  *   logInfo("WebSocket connected");
501  *
502  *   while (ws.waitForData())
503  *   {
504  *     auto txt = ws.receiveText;
505  *     logInfo("Received: %s", txt);
506  *   }
507  *   logFatal("Connection lost!");
508  * }
509  * ---
510  */
final class WebSocket {
@safe:

	private {
		ConnectionStream m_conn;
		bool m_sentCloseFrame = false;
		IncomingWebSocketMessage m_nextMessage = null;
		const HTTPServerRequest m_request;
		HTTPServerResponse m_serverResponse;
		HTTPClientResponse m_clientResponse;
		Task m_reader;
		Task m_ownerTask;
		InterruptibleTaskMutex m_readMutex, m_writeMutex;
		InterruptibleTaskCondition m_readCondition;
		Timer m_pingTimer;
		uint m_lastPingIndex;
		bool m_pongReceived;
		short m_closeCode;
		const(char)[] m_closeReason;
		/// The entropy generator to use.
		/// It is non-null for client sockets (created with an RNG by
		/// `connectWebSocketEx`) and null for server sockets.
		RandomNumberStream m_rng;
	}

scope:

	/**
	 * Private constructor shared by the client and server side constructors.
	 *
	 * Starts the reader task and, for server sockets whose server settings
	 * configure a non-zero `webSocketPingInterval`, the periodic ping timer.
	 * The task calling the constructor is recorded as the owner task.
	 *
	 * Params:
	 *	 conn = Underlying connection stream
	 *	 request = HTTP request used to establish the connection (null for client sockets)
	 *	 server_res = For server sockets, the response object that is finalized on close
	 *	 rng = Source of entropy to use.  If null, assume we're a server socket
	 *   client_res = For client sockets, the response object (keeps the http client locked until the socket is done)
	 */
	private this(ConnectionStream conn, const HTTPServerRequest request, HTTPServerResponse server_res, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		m_ownerTask = Task.getThis();
		m_conn = conn;
		m_request = request;
		m_clientResponse = client_res;
		m_serverResponse = server_res;
		assert(m_conn);
		m_rng = rng;
		m_writeMutex = new InterruptibleTaskMutex;
		m_readMutex = new InterruptibleTaskMutex;
		m_readCondition = new InterruptibleTaskCondition(m_readMutex);
		// The reader task first acquires m_readMutex (see startReader), so
		// holding it here keeps the reader from running before setup is done.
		m_readMutex.performLocked!({
			// NOTE: Silencing scope warning here - m_reader MUST be stopped
			//       before the end of the lifetime of the WebSocket object,
			//       which is done in the mandatory call to close().
			//       The same goes for m_pingTimer below.
			m_reader = () @trusted { return runTask(&startReader); } ();
			if (request !is null && request.serverSettings.webSocketPingInterval != Duration.zero) {
				m_pongReceived = true;
				m_pingTimer = () @trusted { return setTimer(request.serverSettings.webSocketPingInterval, &sendPing, true); } ();
			}
		});
	}

	/// Client side constructor: no server request/response, but an RNG and the client response.
	private this(ConnectionStream conn, RandomNumberStream rng, HTTPClientResponse client_res)
	{
		this(conn, null, null, rng, client_res);
	}

	/// Server side constructor: request and response of the upgrade, no RNG.
	private this(ConnectionStream conn, in HTTPServerRequest request, HTTPServerResponse res)
	{
		this(conn, request, res, null, null);
	}

	/**
		Determines if the WebSocket connection is still alive and ready for sending.

		Note that for determining the ready state for $(EM reading), you need
		to use $(D waitForData) instead, because both methods can return
		different values while a disconnect is in progress.

		See_also: $(D waitForData)
	*/
	@property bool connected() { return m_conn && m_conn.connected && !m_sentCloseFrame; }

	/**
		Returns the close code sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be 0. If no close code was given by the remote
		end in the close frame, the value will be 1005. If the connection was
		not closed cleanly by the remote end, this value will be 1006.
	*/
	@property short closeCode() { return m_closeCode; }

	/**
		Returns the close reason sent by the remote end.

		Note if the connection was never opened, is still alive, or was closed
		locally this value will be an empty string.
	*/
	@property const(char)[] closeReason() { return m_closeReason; }

	/**
		The HTTP request that established the web socket connection.
	*/
	@property const(HTTPServerRequest) request() const { return m_request; }

	/**
		Checks if data is readily available for read.

		After `close()` has released the underlying connection, only an
		already received message can make this return `true`.
	*/
	@property bool dataAvailableForRead()
	{
		// m_conn is reset by close(), so it must not be dereferenced blindly.
		return m_nextMessage !is null || (m_conn && m_conn.dataAvailableForRead);
	}

	/** Waits until either a message arrives or until the connection is closed.

		This function can be used in a read loop to cleanly determine when to stop reading.

		Returns: `true` if a message is ready to be received, `false` otherwise.
	*/
	bool waitForData()
	{
		if (m_nextMessage) return true;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null)
				m_readCondition.wait();
		});
		return m_nextMessage !is null;
	}

	/// ditto
	bool waitForData(Duration timeout)
	{
		import std.datetime;

		if (m_nextMessage) return true;

		immutable limit_time = Clock.currTime(UTC()) + timeout;

		m_readMutex.performLocked!({
			while (connected && m_nextMessage is null && timeout > 0.seconds) {
				m_readCondition.wait(timeout);
				// Recompute the remaining time, the wait may return early.
				timeout = limit_time - Clock.currTime(UTC());
			}
		});
		return m_nextMessage !is null;
	}

	/**
		Sends a text message.

		On the JavaScript side, the text will be available as message.data (type string).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope const(char)[] data)
	{
		send(
			(scope message) { message.write(cast(const ubyte[])data); },
			FrameOpcode.text);
	}

	/**
		Sends a binary message.

		On the JavaScript side, the text will be available as message.data (type Blob).

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(in ubyte[] data)
	{
		send((scope message){ message.write(data); }, FrameOpcode.binary);
	}

	/**
		Sends a message using an output stream.

		The write mutex is held while `sender` runs and the message is
		finalized when `sender` returns or throws.

		Params:
			sender = Callback that writes the payload to the message stream
			frameOpcode = Opcode to send the message with

		Throws:
			A `WebSocketException` is thrown if the connection gets closed
			before or during the transfer of the message.
	*/
	void send(scope void delegate(scope OutgoingWebSocketMessage) @safe sender, FrameOpcode frameOpcode)
	{
		m_writeMutex.performLocked!({
			enforce!WebSocketException(!m_sentCloseFrame, "WebSocket connection already actively closed.");
			/*scope*/auto message = new OutgoingWebSocketMessage(m_conn, frameOpcode, m_rng);
			scope(exit) message.finalize();
			sender(message);
		});
	}

	/**
		Actively closes the connection.

		A close frame is sent if the socket is still connected and the ping
		timer is stopped. When called from the task that created the socket,
		the HTTP response objects are released, the reader task is joined and
		the underlying connection is destroyed.

		Params:
			code = Numeric code indicating a termination reason.
			reason = Message describing why the connection was terminated.
				An empty, non-null string is replaced by the description
				returned by `closeReasonString` for `code`.
	*/
	void close(short code = WebSocketCloseReason.normalClosure, scope const(char)[] reason = "")
	{
		import std.algorithm.comparison : min;
		if(reason !is null && reason.length == 0)
			reason = (cast(WebSocketCloseReason)code).closeReasonString;

		//control frame payloads are limited to 125 bytes
		// (two of them are taken by the close code)
		version(assert)
			assert(reason.length <= 123);
		else
			reason = reason[0 .. min($, 123)];

		if (connected) {
			try {
				send((scope msg) {
					// Set inside the sender so that send()'s own check has already passed.
					m_sentCloseFrame = true;
					if (code != 0) {
						msg.write(std.bitmanip.nativeToBigEndian(code));
						msg.write(cast(const ubyte[])reason);
					}
				}, FrameOpcode.close);
			} catch (Exception e) {
				logDiagnostic("Failed to send active web socket close frame: %s", e.msg);
			}
		}
		if (m_pingTimer) m_pingTimer.stop();


		// Final cleanup is left to the task that created the socket;
		// close() is also called from the reader task and the ping timer.
		if (Task.getThis() == m_ownerTask) {
			m_writeMutex.performLocked!({
				if (m_clientResponse) {
					m_clientResponse.disconnect();
					m_clientResponse = HTTPClientResponse.init;
				}
				if (m_serverResponse) {
					m_serverResponse.finalize();
					m_serverResponse = HTTPServerResponse.init;
				}
			});

			m_reader.join();

			() @trusted { destroy(m_conn); } ();
			m_conn = ConnectionStream.init;
		}
	}

	/**
		Receives a new message and returns its contents as a newly allocated data array.

		Params:
			strict = If set, ensures the exact frame type (text/binary) is received and throws an exception otherwise.
		Throws: WebSocketException if the connection is closed or
			if $(D strict == true) and the frame received is not the right type
	*/
	ubyte[] receiveBinary(bool strict = true)
	{
		ubyte[] ret;
		receive((scope message){
			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.binary,
				"Expected a binary message, got "~message.frameOpcode.to!string());
			ret = message.readAll();
		});
		return ret;
	}
	/// ditto
	string receiveText(bool strict = true)
	{
		string ret;
		receive((scope message){
			enforce!WebSocketException(!strict || message.frameOpcode == FrameOpcode.text,
				"Expected a text message, got "~message.frameOpcode.to!string());
			ret = message.readAllUTF8();
		});
		return ret;
	}

	/**
		Receives a new message using an InputStream.

		Blocks until the reader task has published a message, passes it to
		`receiver` and then wakes the reader task up again.

		Throws: WebSocketException if the connection is closed.
	*/
	void receive(scope void delegate(scope IncomingWebSocketMessage) @safe receiver)
	{
		m_readMutex.performLocked!({
			while (!m_nextMessage) {
				enforce!WebSocketException(connected, "Connection closed while reading message.");
				m_readCondition.wait();
			}
			receiver(m_nextMessage);
			m_nextMessage = null;
			m_readCondition.notifyAll();
		});
	}

	/*
		Body of the reader task: reads frames until the connection is empty,
		a close frame arrives or an error occurs. Pings are answered, pongs
		are matched against the last ping, and data messages are handed over
		to receive()/waitForData() through m_nextMessage.
	*/
	private void startReader()
	nothrow {
		try m_readMutex.performLocked!({}); //Wait until initialization
		catch (Exception e) {
			logException(e, "WebSocket reader task failed to wait for initialization");
			try m_conn.close();
			catch (Exception e) logException(e, "Failed to close WebSocket connection after initialization failure");
			m_closeCode = WebSocketCloseReason.abnormalClosure;
			try m_readCondition.notifyAll();
			catch (Exception e) assert(false, e.msg);
			return;
		}

		try {
			loop:
			while (!m_conn.empty) {
				assert(!m_nextMessage);
				/*scope*/auto msg = new IncomingWebSocketMessage(m_conn, m_rng);

				switch (msg.frameOpcode) {
					default: throw new WebSocketException("unknown frame opcode");
					case FrameOpcode.ping:
						// answer with a pong carrying the same payload
						send((scope pong_msg) { pong_msg.write(msg.peek()); }, FrameOpcode.pong);
						break;
					case FrameOpcode.pong:
						// test if pong matches previous ping
						if (msg.peek.length != uint.sizeof || m_lastPingIndex != littleEndianToNative!uint(msg.peek()[0..uint.sizeof])) {
							logDebugV("Received PONG that doesn't match previous ping.");
							break;
						}
						logDebugV("Received matching PONG.");
						m_pongReceived = true;
						break;
					case FrameOpcode.close:
						logDebug("Got closing frame (%s)", m_sentCloseFrame);

						// If no close code was passed, we default to 1005
						this.m_closeCode = WebSocketCloseReason.noStatusReceived;

						// If provided in the frame, attempt to parse the close code/reason
						if (msg.peek().length >= short.sizeof) {
							this.m_closeCode = bigEndianToNative!short(msg.peek()[0..short.sizeof]);

							if (msg.peek().length > short.sizeof) {
								this.m_closeReason = cast(const(char) [])msg.peek()[short.sizeof..$];
							}
						}

						// Answer with our own close frame unless we initiated the close.
						if(!m_sentCloseFrame) close();
						logDebug("Terminating connection (%s)", m_sentCloseFrame);
						break loop;
					case FrameOpcode.text:
					case FrameOpcode.binary:
					case FrameOpcode.continuation: // FIXME: add proper support for continuation frames!
						// Publish the message and wait until receive() has consumed it.
						m_readMutex.performLocked!({
							m_nextMessage = msg;
							m_readCondition.notifyAll();
							while (m_nextMessage) m_readCondition.wait();
						});
						break;
				}
			}
		} catch (Exception e) {
			logDiagnostic("Error while reading websocket message: %s", e.msg);
			logDiagnostic("Closing connection.");
		}

		// If no close code was passed, e.g. this was an unclean termination
		//  of our websocket connection, set the close code to 1006.
		if (m_closeCode == 0) m_closeCode = WebSocketCloseReason.abnormalClosure;

		try m_conn.close();
		catch (Exception e) logException(e, "Failed to close WebSocket connection");
		// Wake up anybody still blocked in receive()/waitForData().
		try m_readCondition.notifyAll();
		catch (Exception e) assert(false, e.msg);
	}

	/*
		Ping timer callback: closes the connection if the previous ping was
		not answered, otherwise sends a new ping carrying an incremented index.
	*/
	private void sendPing()
	nothrow {
		try {
			if (!m_pongReceived) {
				logDebug("Pong skipped. Closing connection.");
				close();
				m_pingTimer.stop();
				return;
			}
			m_pongReceived = false;
			send((scope msg) { msg.write(nativeToLittleEndian(++m_lastPingIndex)); }, FrameOpcode.ping);
			logDebugV("Ping sent");
		} catch (Exception e) {
			logError("Failed to acquire write mutex for sending a WebSocket ping frame: %s", e.msg);
		}
	}
}
895 
896 /**
897 	Represents a single outgoing _WebSocket message as an OutputStream.
898 */
final class OutgoingWebSocketMessage : OutputStream {
@safe:
	private {
		RandomNumberStream m_rng;
		Stream m_conn;
		// Opcode of the next frame to send; switches to `continuation`
		// once the first frame of the message has been sent.
		FrameOpcode m_frameOpcode;
		// Pending payload, preceded by Frame.maxHeaderSize bytes reserved for the header.
		Appender!(ubyte[]) m_buffer;
		bool m_finalized = false;
	}

	/*
		Params:
			conn = Stream the frames are written to; must not be null
			frameOpcode = Opcode of the message
			rng = Random number source passed on to the frame header writer; may be null
	*/
	private this(Stream conn, FrameOpcode frameOpcode, RandomNumberStream rng)
	{
		assert(conn !is null);
		m_conn = conn;
		m_frameOpcode = frameOpcode;
		m_rng = rng;
	}

	static if (is(typeof(.OutputStream.outputStreamVersion)) && .OutputStream.outputStreamVersion > 1) {
		override size_t write(scope const(ubyte)[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	} else {
		override size_t write(in ubyte[] bytes_, IOMode mode) { return doWrite(bytes_, mode); }
	}

	alias write = OutputStream.write;

	/// Buffers `bytes` for the next frame; nothing is sent before `flush` or `finalize`.
	private size_t doWrite(scope const(ubyte)[] bytes, IOMode mode)
	{
		assert(!m_finalized);

		reserveHeaderSpace();
		m_buffer.put(bytes);
		return bytes.length;
	}

	/// Makes sure that the buffer starts with room for the largest possible frame header.
	private void reserveHeaderSpace()
	{
		if (m_buffer.data.length) return;

		ubyte[Frame.maxHeaderSize] header_padding;
		m_buffer.put(header_padding[]);
	}

	/// Sends the data buffered so far as a non-final frame.
	void flush()
	{
		assert(!m_finalized);
		if (m_buffer.data.length > 0)
			sendFrame(false);
	}

	/// Sends the final frame of the message; any further call is a no-op.
	void finalize()
	{
		if (m_finalized) return;
		m_finalized = true;
		sendFrame(true);
	}

	/*
		Writes the buffered payload as a single frame to the connection and
		resets the buffer.

		Params:
			fin = Whether this is the last frame of the message
	*/
	private void sendFrame(bool fin)
	{
		// Reserve the header directly instead of going through write():
		// finalize() has already set m_finalized at this point and doWrite()
		// asserts against that, which used to trip for empty messages and
		// for finalize() after flush().
		reserveHeaderSpace();

		assert(m_buffer.data.length >= Frame.maxHeaderSize);

		Frame frame;
		frame.fin = fin;
		frame.opcode = m_frameOpcode;
		frame.payload = m_buffer.data[Frame.maxHeaderSize .. $];
		// The header is written right in front of the payload, into the
		// tail of the reserved padding.
		auto hsize = frame.getHeaderSize(m_rng !is null);
		auto msg = m_buffer.data[Frame.maxHeaderSize-hsize .. $];
		frame.writeHeader(msg[0 .. hsize], m_rng);
		m_conn.write(msg);
		m_conn.flush();
		m_buffer.clear();

		// RFC 6455, section 5.4: only the first fragment of a message carries
		// the data opcode, all following fragments use the continuation opcode.
		m_frameOpcode = FrameOpcode.continuation;
	}
}
973 
974 
975 /**
976 	Represents a single incoming _WebSocket message as an InputStream.
977 */
978 final class IncomingWebSocketMessage : InputStream {
979 @safe:
980 	private {
981 		RandomNumberStream m_rng;
982 		Stream m_conn;
983 		Frame m_currentFrame;
984 	}
985 
986 	private this(Stream conn, RandomNumberStream rng)
987 	{
988 		assert(conn !is null);
989 		m_conn = conn;
990 		m_rng = rng;
991 		skipFrame(); // reads the first frame
992 	}
993 
994 	@property bool empty() const { return m_currentFrame.payload.length == 0; }
995 
996 	@property ulong leastSize() const { return m_currentFrame.payload.length; }
997 
998 	@property bool dataAvailableForRead() { return true; }
999 
1000 	/// The frame type for this nessage;
1001 	@property FrameOpcode frameOpcode() const { return m_currentFrame.opcode; }
1002 
1003 	const(ubyte)[] peek() { return m_currentFrame.payload; }
1004 
1005 	/**
1006 	 * Retrieve the next websocket frame of the stream and discard the current
1007 	 * one
1008 	 *
1009 	 * This function is helpful if one wish to process frames by frames,
1010 	 * or minimize memory allocation, as `peek` will only return the current
1011 	 * frame data, and read requires a pre-allocated buffer.
1012 	 *
1013 	 * Returns:
1014 	 * `false` if the current frame is the final one, `true` if a new frame
1015 	 * was read.
1016 	 */
1017 	bool skipFrame()
1018 	{
1019 		if (m_currentFrame.fin)
1020 			return false;
1021 
1022 		m_currentFrame = Frame.readFrame(m_conn);
1023 		return true;
1024 	}
1025 
1026 	size_t read(scope ubyte[] dst, IOMode mode)
1027 	{
1028 		size_t nread = 0;
1029 
1030 		while (dst.length > 0) {
1031 			enforce!WebSocketException(!empty , "cannot read from empty stream");
1032 			enforce!WebSocketException(leastSize > 0, "no data available" );
1033 
1034 			import std.algorithm : min;
1035 			auto sz = cast(size_t)min(leastSize, dst.length);
1036 			dst[0 .. sz] = m_currentFrame.payload[0 .. sz];
1037 			dst = dst[sz .. $];
1038 			m_currentFrame.payload = m_currentFrame.payload[sz .. $];
1039 			nread += sz;
1040 
1041 			if (leastSize == 0) {
1042 				if (mode == IOMode.immediate || mode == IOMode.once && nread > 0)
1043 					break;
1044 				this.skipFrame();
1045 			}
1046 		}
1047 
1048 		return nread;
1049 	}
1050 
1051 	alias read = InputStream.read;
1052 }
1053 
/// Magic GUID string defined by the WebSocket RFC for the upgrade handshake.
/// `computeAcceptKey` hashes it together with the challenge key.
private static immutable s_webSocketGuid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
1056 
1057 
1058 /**
1059  * The Opcode is 4 bits, as defined in Section 5.2
1060  *
1061  * Values are defined in section 11.8
1062  * Currently only 6 values are defined, however the opcode is defined as
1063  * taking 4 bits.
1064  */
1065 public enum FrameOpcode : ubyte {
1066 	continuation = 0x0,
1067 	text = 0x1,
1068 	binary = 0x2,
1069 	close = 0x8,
1070 	ping = 0x9,
1071 	pong = 0xA
1072 }
1073 static assert(FrameOpcode.max < 0b1111, "FrameOpcode is only 4 bits");
1074 
1075 
private struct Frame {
@safe:
	/// Largest header size that `getHeaderSize` can return (1 + 9 + 4 bytes).
	enum maxHeaderSize = 14;

	bool fin;
	FrameOpcode opcode;
	ubyte[] payload;

	/**
	 * Computes the size in bytes of the frame header for the current payload.
	 *
	 * The WebSocket RFC defines a variable-length payload length field:
	 * - If the length is <= 125, it is stored in the 7 least significant
	 *   bits of the second header byte (the top bit is reserved for MASK),
	 *   resulting in a 2 bytes header.
	 * - If the length is <= 65_535 (so it fits in 2 bytes), a magic value of
	 *   126 is stored in the aforementioned 7 bits, and the actual length
	 *   is stored in the next two bytes, resulting in a 4 bytes header.
	 * - If the length is > 65_535, a magic value of 127 is used for the
	 *   7-bit field, and the next 8 bytes hold the length, resulting in a
	 *   10 bytes header.
	 *
	 * Params:
	 *   mask = Whether a 4 byte masking key follows the length field
	 *
	 * Returns:
	 *   The header size in bytes, between 2 and `maxHeaderSize`.
	 */
	size_t getHeaderSize(bool mask)
	{
		size_t ret = 1;
		if (payload.length < 126) ret += 1;
		else if (payload.length < 65536) ret += 3;
		else ret += 9;
		if (mask) ret += 4;
		return ret;
	}

	/**
	 * Writes the frame header into `dst` and masks the payload if requested.
	 *
	 * Params:
	 *   dst = Destination for the header; must hold at least
	 *     `getHeaderSize(sys_rng !is null)` bytes
	 *   sys_rng = If non-null, a 4 byte masking key is read from it, stored
	 *     at the end of the header, and `payload` is XOR-masked in place
	 *     with that key. If `null`, the frame is written unmasked.
	 */
	void writeHeader(ubyte[] dst, RandomNumberStream sys_rng)
	{
		// First byte: FIN flag in the top bit, opcode in the low bits.
		ubyte firstByte = cast(ubyte)opcode;
		if (fin) firstByte |= 0x80;
		dst[0] = firstByte;
		dst = dst[1 .. $];

		// MASK flag, combined with the 7-bit length field below.
		auto b1 = sys_rng ? 0x80 : 0x00;

		if (payload.length < 126) {
			dst[0] = cast(ubyte)(b1 | payload.length);
			dst = dst[1 .. $];
		} else if (payload.length < 65536) {
			dst[0] = cast(ubyte) (b1 | 126);
			dst[1 .. 3] = std.bitmanip.nativeToBigEndian(cast(ushort)payload.length);
			dst = dst[3 .. $];
		} else {
			dst[0] = cast(ubyte) (b1 | 127);
			dst[1 .. 9] = std.bitmanip.nativeToBigEndian(cast(ulong)payload.length);
			dst = dst[9 .. $];
		}

		if (sys_rng) {
			// The masking key is stored in the header and applied cyclically.
			sys_rng.read(dst[0 .. 4]);
			for (size_t i = 0; i < payload.length; i++)
				payload[i] ^= dst[i % 4];
		}
	}

	/**
	 * Reads a single frame, including its payload, from `stream`.
	 *
	 * A masked payload is unmasked before the frame is returned. Only the
	 * FIN flag and the opcode are taken from the first header byte; the
	 * opcode value is not validated here.
	 *
	 * Throws: `WebSocketException` if the 64-bit length field has its most
	 *   significant bit set, or if the length does not fit into `size_t`.
	 */
	static Frame readFrame(InputStream stream)
	{
		Frame frame;
		ubyte[8] data;

		stream.read(data[0 .. 2]);
		frame.fin = (data[0] & 0x80) != 0;
		frame.opcode = cast(FrameOpcode)(data[0] & 0x0F);

		bool masked = !!(data[1] & 0b1000_0000);

		//parsing length
		ulong length = data[1] & 0b0111_1111;
		if (length == 126) {
			stream.read(data[0 .. 2]);
			length = bigEndianToNative!ushort(data[0 .. 2]);
		} else if (length == 127) {
			stream.read(data);
			length = bigEndianToNative!ulong(data);

			// RFC 6455, 5.2, 'Payload length': If 127, the following 8 bytes
			// interpreted as a 64-bit unsigned integer (the most significant
			// bit MUST be 0)
			enforce!WebSocketException(!(length >> 63),
				"Received length has a non-zero most significant bit");

		}
		logDebug("Read frame: %s %s %s length=%d",
				 frame.opcode,
				 frame.fin ? "final frame" : "continuation",
				 masked ? "masked" : "not masked",
				 length);

		// Masking key is 32 bits / uint
		if (masked)
			stream.read(data[0 .. 4]);

		// Read payload
		// TODO: Provide a way to limit the size read, easy
		// DOS for server code here (rejectedsoftware/vibe.d#1496).
		enforce!WebSocketException(length <= size_t.max);
		frame.payload = new ubyte[](cast(size_t)length);
		stream.read(frame.payload);

		//de-masking
		if (masked)
			foreach (size_t i; 0 .. cast(size_t)length)
				frame.payload[i] = frame.payload[i] ^ data[i % 4];

		return frame;
	}
}
1202 
// Checks header size computation and header encoding at the boundaries of
// the three payload length encodings, both unmasked and masked.
unittest {
	import std.algorithm.searching : all;

	// Deterministic "random" source: every requested byte is 13.
	final class DummyRNG : RandomNumberStream {
	@safe:
		@property bool empty() { return false; }
		@property ulong leastSize() { return ulong.max; }
		@property bool dataAvailableForRead() { return true; }
		const(ubyte)[] peek() { return null; }
		size_t read(scope ubyte[] buffer, IOMode mode) @trusted { buffer[] = 13; return buffer.length; }
		alias read = RandomNumberStream.read;
	}

	ubyte[14] hdrbuf;
	auto rng = new DummyRNG;

	// Verifies a payload of `payload_length` bytes against the expected
	// unmasked header; the masked variant must only differ by the MASK bit
	// in the second byte and by the four trailing key bytes.
	void check(size_t payload_length, const(ubyte)[] unmasked)
	{
		Frame f;
		f.payload = new ubyte[payload_length];
		const plain = unmasked.length;
		const withkey = plain + 4;

		assert(f.getHeaderSize(false) == plain);
		hdrbuf[] = 0;
		f.writeHeader(hdrbuf[0 .. plain], null);
		assert(hdrbuf[0 .. plain] == unmasked);

		auto masked = unmasked.dup;
		masked[1] |= 0x80;

		assert(f.getHeaderSize(true) == withkey);
		hdrbuf[] = 0;
		f.writeHeader(hdrbuf[0 .. withkey], rng);
		assert(hdrbuf[0 .. plain] == masked);
		assert(hdrbuf[plain .. withkey].all!(b => b == 13));
	}

	check(125, [0, 125]);
	check(126, [0, 126, 0, 126]);
	check(65535, [0, 126, 255, 255]);
	check(65536, [0, 127, 0, 0, 0, 0, 0, 1, 0, 0]);
}
1269 
1270 /**
1271  * Generate a challenge key for the protocol upgrade phase.
1272  */
1273 private string generateChallengeKey(RandomNumberStream rng)
1274 {
1275 	ubyte[16] buffer;
1276 	rng.read(buffer);
1277 	return Base64.encode(buffer);
1278 }
1279 
/**
 * Computes the accept key that corresponds to a given challenge key.
 *
 * Params:
 *   challengekey = The challenge key, used as-is (it is not decoded)
 *
 * Returns: The Base64 encoded SHA-1 digest of `challengekey` followed by
 *   `s_webSocketGuid`.
 */
private string computeAcceptKey(string challengekey)
{
	// Hashing both chunks in sequence equals hashing their concatenation.
	const digest = sha1Of(challengekey.representation, s_webSocketGuid.representation);
	return Base64.encode(digest).idup;
}