1 module appbase.listener;
2 
3 import core.thread;
4 import core.sync.mutex;
5 
6 import std.bitmanip;
7 import std.socket;
8 import std.file;
9 import std.path;
10 import std.exception;
11 
12 import async;
13 import async.container;
14 
15 import appbase.utils.log;
16 
/// Handler invoked for every complete message: receives the client the
/// message came from and the message bytes (header included).
alias RequestCallback = void function(TcpClient, const scope ubyte[]);

// Module-wide listener state. Everything here is __gshared, i.e. one
// instance shared by all threads instead of the default thread-local copy.
private __gshared
{
    ushort          _protocolMagic;    // value every message header must start with
    RequestCallback _request;          // handler complete messages are dispatched to
    OnSendCompleted _onSendCompleted;  // forwarded to the EventLoop by startServer

    ByteBuffer[int] _queue;            // per-connection receive buffers, keyed by client.fd
    Mutex           _lock;             // taken around every access to _queue

    ThreadPool      _businessPool;     // pool that runs _request
}
27 
/**
 * Sets up the module-level listener state, binds a TCP listener to
 * "0.0.0.0" on `port`, and runs the event loop on it.
 *
 * Params:
 *   port            = TCP port to listen on.
 *   workThreads     = value passed to the ThreadPool constructor.
 *   protocolMagic   = value every incoming message header must start with.
 *   onRequest       = handler run (through the thread pool) for each complete message.
 *   onSendCompleted = callback handed unchanged to the EventLoop.
 */
void startServer(const ushort port, const int workThreads, const ushort protocolMagic,
    RequestCallback onRequest, OnSendCompleted onSendCompleted)
{
    // Shared infrastructure first: the queue lock and the worker pool.
    _lock         = new Mutex();
    _businessPool = new ThreadPool(workThreads);

    // Remember the protocol settings and callbacks for the event handlers.
    _protocolMagic   = protocolMagic;
    _request         = onRequest;
    _onSendCompleted = onSendCompleted;

    // Listening socket with a backlog of 1024.
    auto listener    = new TcpListener();
    auto bindAddress = new InternetAddress("0.0.0.0", port);
    listener.bind(bindAddress);
    listener.listen(1024);

    auto loop = new EventLoop(listener, &onConnected, &onDisConnected, &onReceive,
        _onSendCompleted, &onSocketError);
    loop.run();
}
45 
46 private:
47 
/// Connection callback: stores a fresh, empty receive buffer for the new
/// client under its fd. Any Exception raised while doing so is swallowed
/// by collectException.
void onConnected(TcpClient client) nothrow @trusted
{
    void registerClient()
    {
        synchronized (_lock)
        {
            _queue[client.fd] = ByteBuffer();
        }
        //writeln("New connection: ", client.remoteAddress().toString());
    }

    collectException(registerClient());
}
55 
/// Disconnection callback: drops the receive buffer stored for `fd`.
/// `remoteAddress` is not used. Any Exception raised is swallowed by
/// collectException.
void onDisConnected(const int fd, string remoteAddress) nothrow @trusted
{
    void unregisterClient()
    {
        synchronized (_lock)
        {
            _queue.remove(fd);
        }
    }

    collectException(unregisterClient());
}
62 
/**
 * Receive callback: appends `data` to the client's pending buffer, cuts
 * out every complete message now available, and hands each one to the
 * business thread pool.
 *
 * All complete messages are drained in one call. Extracting only the
 * first one would leave any further messages that arrived in the same
 * chunk sitting in the buffer until the peer sent more bytes.
 *
 * Params:
 *   client = connection the bytes were read from.
 *   data   = bytes just received.
 *
 * Any Exception raised inside is swallowed by collectException.
 */
void onReceive(TcpClient client, const scope ubyte[] data) nothrow @trusted
{
    collectException({
        ubyte[][] messages;

        synchronized(_lock)
        {
            // Single lookup; the pointer stays usable while we hold the lock.
            auto pending = client.fd in _queue;
            if (pending is null)
            {
                logger.write(baseName(thisExePath) ~ " Socket Error: " ~ client.remoteAddress.toString() ~ ", queue key not exists!");
                return;
            }

            *pending ~= data;

            // Pull out every complete message. findCompleteMessage returns 0
            // when the buffer holds no complete message (or it closed the
            // client), which ends the loop; any non-zero result consumes
            // bytes, so the loop always terminates.
            for (;;)
            {
                immutable size_t len = findCompleteMessage(client, *pending);
                if (len == 0)
                {
                    break;
                }

                messages ~= (*pending)[0 .. len];
                pending.popFront(len);
            }
        }

        // Dispatch outside the lock so handlers never run while _queue is held.
        foreach (message; messages)
        {
            _businessPool.run!_request(client, message);
        }
    }());
}
91 
/// Socket-error callback passed to the EventLoop. It currently does
/// nothing: none of its parameters are used and the logging code is
/// left commented out.
void onSocketError(const int fd, string remoteAddress, string msg) nothrow @trusted
{
    // Disabled logging, kept for reference:
    // collectException({
    //     logger.write(baseName(thisExePath) ~ " Socket Error: " ~ remoteAddress ~ ", " ~ msg);
    // }());
}
98 
/**
 * Checks whether `data` starts with one complete message and reports its
 * total size.
 *
 * A message is a header (a ushort magic followed by an int body length,
 * both read with std.bitmanip.peek's default byte order) followed by
 * that many body bytes.
 *
 * Params:
 *   client = connection the buffer belongs to; force-closed when the
 *            header is invalid.
 *   data   = bytes buffered so far for that connection.
 *
 * Returns: header size plus body length when a whole message is
 * buffered; 0 when more bytes are needed or when the header was invalid
 * (wrong magic or negative length) and the client was force-closed.
 */
size_t findCompleteMessage(TcpClient client, ref ByteBuffer data)
{
    enum size_t headerSize = ushort.sizeof + int.sizeof;

    if (data.length < headerSize)
    {
        return 0;
    }

    ubyte[] head = data[0 .. headerSize];

    if (head.peek!ushort(0) != _protocolMagic)
    {
        client.forceClose();
        //logger.write(baseName(thisExePath) ~ " Socket Error: " ~ remoteAddress ~ ", An unusual message data!");

        return 0;
    }

    // The length comes from the peer and is signed. A negative value would
    // wrap when converted to size_t and yield a bogus (or zero) message
    // size, so treat it as a protocol violation like a bad magic.
    // NOTE(review): there is no upper bound on the declared length; a peer
    // can announce up to int.max bytes and keep the buffer growing.
    immutable int bodyLength = head.peek!int(ushort.sizeof);
    if (bodyLength < 0)
    {
        client.forceClose();

        return 0;
    }

    immutable size_t total = headerSize + cast(size_t) bodyLength;

    if (data.length < total)
    {
        return 0;
    }

    return total;
}