1 module appbase.listener;
2 
3 import core.thread;
4 
5 import std.socket;
6 import std.file;
7 import std.path;
8 import std.exception;
9 import std.parallelism : totalCPUs;
10 
11 import async;
12 
13 import appbase.utils.log;
14 
15 alias RequestCallback = void function(TcpClient, const scope ubyte[]);
16 
17 private __gshared EventLoop       _loop;
18 private __gshared ushort          _protocolMagic;
19 private __gshared RequestCallback _request;
20 private __gshared OnSendCompleted _onSendCompleted;
21 
22 private __gshared ThreadPool _businessPool;
23 
24 deprecated("Will be removed in the next release.")
25 void startServer(const ushort port, const int businessThreads, const ushort protocolMagic,
26     RequestCallback onRequest, OnSendCompleted onSendCompleted)
27 {
28     startServer(port, protocolMagic, onRequest, onSendCompleted, businessThreads, 0);
29 }
30 
31 void startServer(const ushort port, const ushort protocolMagic,
32     RequestCallback onRequest, OnSendCompleted onSendCompleted, const int businessThreads = 0, const int workerThreads = 0)
33 {
34     startServer("0.0.0.0", port, protocolMagic, onRequest, onSendCompleted, businessThreads, workerThreads);
35 }
36 
37 void startServer(const string host, const ushort port, const ushort protocolMagic,
38     RequestCallback onRequest, OnSendCompleted onSendCompleted, const int businessThreads = 0, const int workerThreads = 0)
39 {
40     _businessPool    = new ThreadPool((businessThreads < 1) ? (totalCPUs * 2 + 2) : businessThreads);
41 
42     _protocolMagic   = protocolMagic;
43     _request         = onRequest;
44     _onSendCompleted = onSendCompleted;
45 
46     TcpListener listener = new TcpListener();
47     listener.bind(new InternetAddress(host, port));
48     listener.listen(1024);
49 
50     Codec codec = new Codec(CodecType.SizeGuide, protocolMagic);
51     _loop = new EventLoop(listener, &onConnected, &onDisConnected, &onReceive, _onSendCompleted, &onSocketError, codec, workerThreads);
52     _loop.run();
53 }
54 
55 void stopServer()
56 {
57     if (_loop !is null)
58     {
59         _loop.stop();
60     }
61 }
62 
63 private:
64 
65 void onConnected(TcpClient client) nothrow @trusted
66 {
67     // collectException({
68     //     writeln("New connection: ", client.remoteAddress().toString());
69     // }());
70 }
71 
72 void onDisConnected(const int fd, string remoteAddress) nothrow @trusted
73 {
74     // collectException({
75     // }());
76 }
77 
78 void onReceive(TcpClient client, const scope ubyte[] data) nothrow @trusted
79 {
80     collectException({
81         _businessPool.run!_request(client, data);
82     }());
83 }
84 
85 void onSocketError(const int fd, string remoteAddress, string msg) nothrow @trusted
86 {
87     // collectException({
88     //     logger.write(baseName(thisExePath) ~ " Socket Error: " ~ remoteAddress ~ ", " ~ msg);
89     // }());
90 }