1 module vibenotes.broadcast; 2 3 import vibe.core.concurrency : send, receiveOnly; 4 import vibe.core.core: Task, rawYield, runTask; 5 import vibe.core.log: logInfo; 6 import vibe.core.sync: TaskCondition, TaskMutex; 7 import vibe.http.websockets; 8 9 10 class WebSocketBroadcastService { 11 private struct SocketInfo { 12 Task tid; 13 string channel; 14 } 15 16 private { 17 TaskCondition m_signal; 18 SocketInfo[void*] m_sockets; 19 } 20 21 this() { 22 m_signal = new TaskCondition(new TaskMutex); 23 } 24 25 auto getChannelHandler(string channel) 26 { 27 void callback (scope WebSocket socket) { 28 auto sockid = cast(void*)socket; 29 30 auto sendtask = runTask({ 31 while(socket.connected) { 32 auto message = receiveOnly!string(); 33 socket.send(message); 34 } 35 }); 36 37 m_sockets[sockid] = SocketInfo(sendtask, channel); 38 scope (exit) m_sockets.remove(sockid); 39 40 while (socket.waitForData()) { 41 auto data = socket.receiveText(); 42 auto qs = m_sockets.dup; 43 foreach (sid, si; qs) { 44 if (!si.tid.running || sid == sockid) continue; 45 if (si.channel == channel) 46 si.tid.send(cast(string)data); 47 } 48 } 49 } 50 51 return &callback; 52 } 53 54 @property string[] channels() 55 const { 56 int[string] chann; 57 foreach (si; m_sockets) 58 if (si.channel !in chann) 59 chann[si.channel] = 1; 60 return chann.keys; 61 } 62 }