1 module appbase.listener; 2 3 import core.thread; 4 5 import std.socket; 6 import std.file; 7 import std.path; 8 import std.exception; 9 import std.parallelism : totalCPUs; 10 11 import async; 12 13 import appbase.utils.log; 14 15 alias RequestCallback = void function(TcpClient, const scope ubyte[]); 16 17 private __gshared ushort _protocolMagic; 18 private __gshared RequestCallback _request; 19 private __gshared OnSendCompleted _onSendCompleted; 20 21 private __gshared ThreadPool _businessPool; 22 23 deprecated("Will be removed in the next release.") 24 void startServer(const ushort port, const int businessThreads, const ushort protocolMagic, 25 RequestCallback onRequest, OnSendCompleted onSendCompleted) 26 { 27 startServer(port, protocolMagic, onRequest, onSendCompleted, businessThreads, 0); 28 } 29 30 void startServer(const ushort port, const ushort protocolMagic, 31 RequestCallback onRequest, OnSendCompleted onSendCompleted, const int businessThreads = 0, const int workerThreads = 0) 32 { 33 startServer("0.0.0.0", port, protocolMagic, onRequest, onSendCompleted, businessThreads, workerThreads); 34 } 35 36 void startServer(const string host, const ushort port, const ushort protocolMagic, 37 RequestCallback onRequest, OnSendCompleted onSendCompleted, const int businessThreads = 0, const int workerThreads = 0) 38 { 39 _businessPool = new ThreadPool((businessThreads < 1) ? (totalCPUs * 2 + 2) : businessThreads); 40 41 _protocolMagic = protocolMagic; 42 _request = onRequest; 43 _onSendCompleted = onSendCompleted; 44 45 TcpListener listener = new TcpListener(); 46 listener.bind(new InternetAddress(host, port)); 47 listener.listen(1024); 48 49 Codec codec = new Codec(CodecType.SizeGuide, protocolMagic); 50 EventLoop loop = new EventLoop(listener, &onConnected, &onDisConnected, &onReceive, _onSendCompleted, &onSocketError, codec, workerThreads); 51 loop.run(); 52 } 53 54 private: 55 56 void onConnected(TcpClient client) nothrow @trusted 57 { 58 // collectException({ 59 // writeln("New connection: ", client.remoteAddress().toString()); 60 // }()); 61 } 62 63 void onDisConnected(const int fd, string remoteAddress) nothrow @trusted 64 { 65 // collectException({ 66 // }()); 67 } 68 69 void onReceive(TcpClient client, const scope ubyte[] data) nothrow @trusted 70 { 71 collectException({ 72 _businessPool.run!_request(client, data); 73 }()); 74 } 75 76 void onSocketError(const int fd, string remoteAddress, string msg) nothrow @trusted 77 { 78 // collectException({ 79 // logger.write(baseName(thisExePath) ~ " Socket Error: " ~ remoteAddress ~ ", " ~ msg); 80 // }()); 81 }