1 module appbase.listener;
2 
3 import core.thread;
4 
5 import std.socket;
6 import std.file;
7 import std.path;
8 import std.exception;
9 import std.parallelism : totalCPUs;
10 
11 import async;
12 
13 import appbase.utils.log;
14 
/**
 * Signature of the application-level request handler supplied to `startServer`.
 *
 * The first argument is the `TcpClient` the message was received on, the
 * second is the received bytes. The bytes are declared `const scope`, so a
 * handler must neither modify them nor keep a reference to them after it
 * returns.
 */
alias RequestCallback = void function(TcpClient client, const scope ubyte[] data);
16 
// Module-wide server state. All of it is `__gshared` (one instance for the
// whole process rather than per thread) and is assigned by `startServer`.
private __gshared
{
    // Protocol magic value given to the most recent `startServer` call.
    ushort          _protocolMagic;

    // Application request handler; `onReceive` runs it on `_businessPool`.
    RequestCallback _request;

    // Send-completion callback; handed to the `EventLoop` constructor.
    OnSendCompleted _onSendCompleted;

    // Pool on which request handlers are executed.
    ThreadPool      _businessPool;
}
22 
/**
 * Legacy overload kept for source compatibility.
 *
 * Takes `businessThreads` as its second parameter and has no worker-thread
 * parameter. It simply re-orders its arguments and forwards them to the
 * `(port, protocolMagic, ...)` overload.
 *
 * Params:
 *   port            = TCP port to listen on
 *   businessThreads = forwarded as the business-pool size
 *   protocolMagic   = forwarded protocol magic value
 *   onRequest       = forwarded request handler
 *   onSendCompleted = forwarded send-completion callback
 */
deprecated("Will be removed in the next release.")
void startServer(const ushort port, const int businessThreads, const ushort protocolMagic,
    RequestCallback onRequest, OnSendCompleted onSendCompleted)
{
    // This signature cannot express a worker-thread count, so 0 is forwarded.
    enum int workerThreads = 0;

    startServer(port, protocolMagic, onRequest, onSendCompleted, businessThreads, workerThreads);
}
29 
/**
 * Starts the server on the given port, bound to the address "0.0.0.0".
 *
 * Convenience overload: every argument is forwarded unchanged to the
 * `(host, port, ...)` overload with "0.0.0.0" supplied as the host.
 *
 * Params:
 *   port            = TCP port to listen on
 *   protocolMagic   = forwarded protocol magic value
 *   onRequest       = forwarded request handler
 *   onSendCompleted = forwarded send-completion callback
 *   businessThreads = forwarded business-pool size (default 0)
 *   workerThreads   = forwarded worker-thread count (default 0)
 */
void startServer(const ushort port, const ushort protocolMagic,
    RequestCallback onRequest, OnSendCompleted onSendCompleted, const int businessThreads = 0, const int workerThreads = 0)
{
    // Host used when the caller does not name one.
    enum string anyAddress = "0.0.0.0";

    startServer(anyAddress, port, protocolMagic, onRequest, onSendCompleted,
        businessThreads, workerThreads);
}
35 
/**
 * Binds a TCP listener to `host:port`, sets up the module state and runs the
 * event loop.
 *
 * Params:
 *   host            = address to bind; passed to `InternetAddress`
 *   port            = TCP port to listen on
 *   protocolMagic   = magic value given to the `CodecType.SizeGuide` codec
 *   onRequest       = handler run on the business pool for every received
 *                     message; must not be null
 *   onSendCompleted = passed to the `EventLoop` constructor unchanged
 *   businessThreads = size of the business pool; any value below 1 selects
 *                     `totalCPUs * 2 + 2`
 *   workerThreads   = passed to the `EventLoop` constructor unchanged
 *
 * Throws:
 *   `Exception` (from `enforce`) when `onRequest` is null, plus whatever
 *   constructing the address, binding or listening throws.
 */
void startServer(const string host, const ushort port, const ushort protocolMagic,
    RequestCallback onRequest, OnSendCompleted onSendCompleted, const int businessThreads = 0, const int workerThreads = 0)
{
    // A null handler would only be noticed when the first message is
    // dispatched to the business pool; reject it up front instead.
    enforce(onRequest !is null, "startServer: onRequest callback must not be null");

    // Bind before touching module state, so a failure here (for example the
    // port being unavailable) throws without a pool having been constructed
    // or the globals having been overwritten.
    TcpListener listener = new TcpListener();
    listener.bind(new InternetAddress(host, port));
    listener.listen(1024);

    _protocolMagic   = protocolMagic;
    _request         = onRequest;
    _onSendCompleted = onSendCompleted;

    _businessPool    = new ThreadPool((businessThreads < 1) ? (totalCPUs * 2 + 2) : businessThreads);

    Codec codec = new Codec(CodecType.SizeGuide, protocolMagic);
    EventLoop loop = new EventLoop(listener, &onConnected, &onDisConnected, &onReceive, _onSendCompleted, &onSocketError, codec, workerThreads);

    // NOTE(review): presumably blocks for the lifetime of the server - confirm
    // against the async package.
    loop.run();
}
53 
54 private:
55 
/**
 * Connection hook registered with the event loop by `startServer`.
 *
 * Intentionally a no-op. A disabled tracing example is kept below.
 */
void onConnected(TcpClient client) nothrow @trusted
{
    // Disabled example - print the peer address of each new connection:
    //
    //     collectException({
    //         writeln("New connection: ", client.remoteAddress().toString());
    //     }());
}
62 
/**
 * Disconnection hook registered with the event loop by `startServer`.
 *
 * Intentionally a no-op: both arguments are ignored.
 */
void onDisConnected(const int fd, string remoteAddress) nothrow @trusted
{
    // Nothing to do. Any code added here has to stay `nothrow`, e.g. by
    // wrapping it in collectException({ ... }()).
}
68 
/**
 * Receive hook registered with the event loop by `startServer`: schedules the
 * application request handler (`_request`) on the business pool.
 *
 * Params:
 *   client = connection the message was received on
 *   data   = received bytes; only valid for the duration of this call
 *
 * Any `Exception` raised while copying or scheduling is discarded by
 * `collectException`, which keeps this function `nothrow`.
 */
void onReceive(TcpClient client, const scope ubyte[] data) nothrow @trusted
{
    collectException({
        // `data` is `scope`, so the caller may reuse or release the buffer as
        // soon as this function returns, yet the handler runs later on the
        // pool. Give the pool its own copy so the slice does not escape.
        // NOTE(review): if the pool or event loop already copies the payload
        // this duplication is redundant - confirm against the async package.
        const(ubyte)[] owned = data.dup;

        _businessPool.run!_request(client, owned);
    }());
}
75 
/**
 * Socket-error hook registered with the event loop by `startServer`.
 *
 * Intentionally a no-op: all three arguments are ignored. A disabled logging
 * example is kept below.
 */
void onSocketError(const int fd, string remoteAddress, string msg) nothrow @trusted
{
    // Disabled example - write the error to the application log:
    //
    //     collectException({
    //         logger.write(baseName(thisExePath) ~ " Socket Error: " ~ remoteAddress ~ ", " ~ msg);
    //     }());
}