1 module appbase.listener; 2 3 import core.thread; 4 import core.sync.mutex; 5 6 import std.bitmanip; 7 import std.socket; 8 import std.file; 9 import std.path; 10 import std.exception; 11 12 import async; 13 import async.container; 14 15 import appbase.utils.log; 16 17 alias RequestCallback = void function(TcpClient, const scope ubyte[]); 18 19 private __gshared ushort _protocolMagic; 20 private __gshared RequestCallback _request; 21 private __gshared OnSendCompleted _onSendCompleted; 22 23 private __gshared ByteBuffer[int] _queue; 24 private __gshared Mutex _lock; 25 26 private __gshared ThreadPool _businessPool; 27 28 void startServer(const ushort port, const int workThreads, const ushort protocolMagic, 29 RequestCallback onRequest, OnSendCompleted onSendCompleted) 30 { 31 _lock = new Mutex(); 32 _businessPool = new ThreadPool(workThreads); 33 34 _protocolMagic = protocolMagic; 35 _request = onRequest; 36 _onSendCompleted = onSendCompleted; 37 38 TcpListener listener = new TcpListener(); 39 listener.bind(new InternetAddress("0.0.0.0", port)); 40 listener.listen(1024); 41 42 EventLoop loop = new EventLoop(listener, &onConnected, &onDisConnected, &onReceive, _onSendCompleted, &onSocketError); 43 loop.run(); 44 } 45 46 private: 47 48 void onConnected(TcpClient client) nothrow @trusted 49 { 50 collectException({ 51 synchronized(_lock) _queue[client.fd] = ByteBuffer(); 52 //writeln("New connection: ", client.remoteAddress().toString()); 53 }()); 54 } 55 56 void onDisConnected(const int fd, string remoteAddress) nothrow @trusted 57 { 58 collectException({ 59 synchronized(_lock) _queue.remove(fd); 60 }()); 61 } 62 63 void onReceive(TcpClient client, const scope ubyte[] data) nothrow @trusted 64 { 65 collectException({ 66 ubyte[] buffer; 67 68 synchronized(_lock) 69 { 70 if (client.fd !in _queue) 71 { 72 logger.write(baseName(thisExePath) ~ " Socket Error: " ~ client.remoteAddress.toString() ~ ", queue key not exists!"); 73 return; 74 } 75 76 _queue[client.fd] ~= data; 77 78 size_t len = findCompleteMessage(client, _queue[client.fd]); 79 if (len == 0) 80 { 81 return; 82 } 83 84 buffer = _queue[client.fd][0 .. len]; 85 _queue[client.fd].popFront(len); 86 } 87 88 _businessPool.run!_request(client, buffer); 89 }()); 90 } 91 92 void onSocketError(const int fd, string remoteAddress, string msg) nothrow @trusted 93 { 94 // collectException({ 95 // logger.write(baseName(thisExePath) ~ " Socket Error: " ~ remoteAddress ~ ", " ~ msg); 96 // }()); 97 } 98 99 size_t findCompleteMessage(TcpClient client, ref ByteBuffer data) 100 { 101 if (data.length < (ushort.sizeof + int.sizeof)) 102 { 103 return 0; 104 } 105 106 ubyte[] head = data[0 .. ushort.sizeof + int.sizeof]; 107 108 if (head.peek!ushort(0) != _protocolMagic) 109 { 110 string remoteAddress = client.remoteAddress().toString(); 111 client.forceClose(); 112 //logger.write(baseName(thisExePath) ~ " Socket Error: " ~ remoteAddress ~ ", An unusual message data!"); 113 114 return 0; 115 } 116 117 size_t len = head.peek!int(ushort.sizeof); 118 119 if (data.length < (len + (ushort.sizeof + int.sizeof))) 120 { 121 return 0; 122 } 123 124 return len + (ushort.sizeof + int.sizeof); 125 }