1 module vibenotes.broadcast;
2 
3 import vibe.core.concurrency : send, receiveOnly;
4 import vibe.core.core: Task, rawYield, runTask;
5 import vibe.core.log: logInfo;
6 import vibe.core.sync: TaskCondition, TaskMutex;
7 import vibe.http.websockets;
8 
9 
/**
	Relays text messages between WebSocket connections that share a channel.

	Every connection handled by a delegate from `getChannelHandler` is
	registered in `m_sockets` together with a dedicated sender task. Text
	received on one connection is forwarded to the sender tasks of all other
	registered connections whose channel name is equal.
*/
class WebSocketBroadcastService {
	/// Bookkeeping entry for one registered connection.
	private struct SocketInfo {
		Task tid;        /// Sender task that writes relayed messages to the socket.
		string channel;  /// Channel name the connection was registered under.
	}

	/// Marker message that tells a sender task to leave its receive loop.
	private struct ShutdownSignal {}

	private {
		// NOTE(review): created in the constructor but not used by any method
		// visible in this class - confirm whether it is still needed.
		TaskCondition m_signal;
		// Registered connections, keyed by the address of the WebSocket object.
		SocketInfo[void*] m_sockets;
	}

	/// Creates an empty broadcast service.
	this() {
		m_signal = new TaskCondition(new TaskMutex);
	}

	/**
		Builds a WebSocket connection handler bound to `channel`.

		Params:
			channel = Name of the channel the handled connections belong to.

		Returns:
			A delegate taking a `scope WebSocket`. It registers the connection,
			forwards every received text message to all other connections of
			the same channel, and unregisters the connection when it returns.
	*/
	auto getChannelHandler(string channel)
	{
		void callback (scope WebSocket socket) {
			// Needed for the multi-handler receive below; the file-level import
			// only brings in `send` and `receiveOnly`.
			import std.concurrency : receive;

			auto sockid = cast(void*)socket;

			// Sender task: writes every string it is sent to this socket until
			// the socket disconnects or a ShutdownSignal arrives.
			auto sendtask = runTask({
				bool active = true;
				while (active && socket.connected) {
					receive(
						(string message) { socket.send(message); },
						(ShutdownSignal sig) { active = false; }
					);
				}
			});

			m_sockets[sockid] = SocketInfo(sendtask, channel);
			scope (exit) {
				// Unregister first so no further broadcasts are queued for
				// this connection.
				m_sockets.remove(sockid);
				// The sender task would otherwise stay blocked in receive()
				// forever once nobody sends to it anymore. Wake it up and wait
				// for it, so it does not outlive the scoped socket.
				if (sendtask.running) {
					sendtask.send(ShutdownSignal());
					sendtask.join();
				}
			}

			while (socket.waitForData()) {
				string data = socket.receiveText();
				// Iterate over a snapshot so that connections registering or
				// unregistering in the meantime cannot disturb the loop.
				auto qs = m_sockets.dup;
				foreach (sid, si; qs) {
					// Skip dead sender tasks and never echo to the originator.
					if (!si.tid.running || sid == sockid) continue;
					if (si.channel == channel)
						si.tid.send(data);
				}
			}
		}

		return &callback;
	}

	/**
		Lists the distinct channel names of all registered connections.

		Returns:
			An array with each channel name once, in unspecified order.
	*/
	@property string[] channels()
	const {
		// Associative array used as a set to drop duplicate channel names.
		bool[string] seen;
		foreach (si; m_sockets)
			seen[si.channel] = true;
		return seen.keys;
	}
}