module appbase.listener;

import core.thread;

import std.socket;
import std.file;
import std.path;
import std.exception;
import std.parallelism : totalCPUs;

import async;

import appbase.utils.log;

/// Signature of the handler invoked (via the business thread pool) for every
/// chunk of data delivered to `onReceive`.
alias RequestCallback = void function(TcpClient, const scope ubyte[]);

// Module-wide server state, written by `startServer`.
private __gshared EventLoop       _loop;
private __gshared ushort          _protocolMagic;
private __gshared RequestCallback _request;
private __gshared OnSendCompleted _onSendCompleted;

// Pool on which `_request` is executed for each received message.
private __gshared ThreadPool _businessPool;

/**
 * Legacy overload kept for backward compatibility.
 *
 * Forwards to the `(port, protocolMagic, ...)` overload with
 * `workerThreads` fixed to 0.
 */
deprecated("Will be removed in the next release.")
void startServer(
    const ushort port,
    const int businessThreads,
    const ushort protocolMagic,
    RequestCallback onRequest,
    OnSendCompleted onSendCompleted)
{
    startServer(port, protocolMagic, onRequest, onSendCompleted, businessThreads, 0);
}

/**
 * Starts the server on the address "0.0.0.0" and the given port.
 *
 * Forwards every argument unchanged to the host-taking overload.
 */
void startServer(
    const ushort port,
    const ushort protocolMagic,
    RequestCallback onRequest,
    OnSendCompleted onSendCompleted,
    const int businessThreads = 0,
    const int workerThreads = 0)
{
    startServer("0.0.0.0", port, protocolMagic, onRequest, onSendCompleted,
        businessThreads, workerThreads);
}

/**
 * Creates the business thread pool, binds a listener to `host:port`,
 * builds the event loop and runs it.
 *
 * Params:
 *   host            = address the listener is bound to.
 *   port            = port the listener is bound to.
 *   protocolMagic   = value stored in module state and passed to the `Codec`.
 *   onRequest       = handler run on the business pool for received data.
 *   onSendCompleted = callback handed to the event loop.
 *   businessThreads = size of the business pool; any value below 1 selects
 *                     `totalCPUs * 2 + 2`.
 *   workerThreads   = forwarded as-is to the `EventLoop` constructor.
 *
 * NOTE(review): `_loop.run()` presumably blocks until `stopServer` is
 * called - confirm against the `async` library.
 */
void startServer(
    const string host,
    const ushort port,
    const ushort protocolMagic,
    RequestCallback onRequest,
    OnSendCompleted onSendCompleted,
    const int businessThreads = 0,
    const int workerThreads = 0)
{
    // Fall back to a CPU-derived size when the caller gives no usable count.
    uint poolSize;
    if (businessThreads >= 1)
    {
        poolSize = businessThreads;
    }
    else
    {
        poolSize = totalCPUs * 2 + 2;
    }
    _businessPool = new ThreadPool(poolSize);

    _protocolMagic   = protocolMagic;
    _request         = onRequest;
    _onSendCompleted = onSendCompleted;

    auto bindAddress = new InternetAddress(host, port);
    auto listener    = new TcpListener();
    listener.bind(bindAddress);
    listener.listen(1024);

    auto codec = new Codec(CodecType.SizeGuide, protocolMagic);
    _loop = new EventLoop(
        listener,
        &onConnected,
        &onDisConnected,
        &onReceive,
        _onSendCompleted,
        &onSocketError,
        codec,
        workerThreads);
    _loop.run();
}

/// Stops the event loop if one has been created; otherwise does nothing.
void stopServer()
{
    if (_loop is null)
    {
        return;
    }

    _loop.stop();
}

private:

/// Connection callback registered with the event loop. Currently a no-op.
void onConnected(TcpClient client) nothrow @trusted
{
    // collectException({
    //     writeln("New connection: ", client.remoteAddress().toString());
    // }());
}

/// Disconnection callback registered with the event loop. Currently a no-op.
void onDisConnected(const int fd, string remoteAddress) nothrow @trusted
{
    // collectException({
    // }());
}

/**
 * Receive callback registered with the event loop: schedules `_request`
 * on the business pool with the client and the received bytes.
 *
 * Any `Exception` raised while scheduling is deliberately discarded so the
 * callback stays `nothrow`.
 *
 * NOTE(review): `data` is declared `scope` but is handed to another thread's
 * task - confirm the `async` library keeps the buffer alive long enough.
 */
void onReceive(TcpClient client, const scope ubyte[] data) nothrow @trusted
{
    try
    {
        _businessPool.run!_request(client, data);
    }
    catch (Exception)
    {
        // Intentionally ignored (best effort).
    }
}

/// Socket-error callback registered with the event loop. Currently a no-op.
void onSocketError(const int fd, string remoteAddress, string msg) nothrow @trusted
{
    // collectException({
    //     logger.write(baseName(thisExePath) ~ " Socket Error: " ~ remoteAddress ~ ", " ~ msg);
    // }());
}