1 /*
2  * Archttp - A highly performant web framework written in D.
3  *
4  * Copyright (C) 2021-2022 Kerisy.com
5  *
6  * Website: https://www.kerisy.com
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 
12 module geario.codec.Framed;
13 
14 import nbuff;
15 
16 import geario.codec.Codec;
17 
18 import geario.net.TcpStream;
19 import geario.net.channel.Types;
20 
21 import geario.logging;
22 
23 alias FrameHandle(DT) = void delegate(DT bufer);
24 
25 /**
26  * DT: Decode Template
27  * ET: Encode Template
28  */
29 class Framed(DT, ET)
30 {
31     private
32     {
33         TcpStream _connection;
34         Codec!(DT, ET) _codec;
35         FrameHandle!DT _handle;
36 
37         Nbuff _receivedBuffer;
38         Nbuff _sendBuffer;
39     }
40 
41     this(TcpStream connection, Codec!(DT, ET) codec)
42     {
43         _codec = codec;
44         _connection = connection;
45 
46         connection.Received(&Received);
47         connection.Writed(&Sended);
48     }
49 
50     private void Received(NbuffChunk bytes)
51     {
52         // log.trace(cast(string) bytes.data());
53         _receivedBuffer.append(bytes);
54 
55         while (true)
56         {
57             DT message;
58             long result = _codec.decoder().Decode(_receivedBuffer, message);
59             if (result == -1)
60             {
61                 log.error("decode error, close the connection.");
62                 _connection.Close();
63                 break;
64             }
65 
66             // Multiple messages, continue decode
67             if (result > 0)
68             {
69                 Handle(message);
70                 if (_receivedBuffer.length() > 0)
71                     continue;
72                 else
73                     break;
74             }
75 
76             if (result == 0)
77             {
78                 // log.warn("waiting data ..");
79                 break;
80             }
81         }
82     }
83 
84     private void Sended(ulong n)
85     {
86         // if (_sendBuffer.length() >= n)
87         // {
88         //     version(GEAR_IO_DEBUG) log.trace("Pop bytes: %d", n);
89 
90         //     _sendBuffer.pop(n);
91         // }
92     }
93 
94     void Handle(DT message)
95     {
96         if (_handle !is null)
97         {
98             _handle(message);
99         }
100     }
101 
102     void OnFrame(FrameHandle!DT handle)
103     {
104         _handle = handle;
105     }
106 
107     void Send(ET message)
108     {
109         NbuffChunk bytes = _codec.encoder().Encode(message);
110 
111         version(GEAR_IO_DEBUG) log.trace("Sending bytes: %d", bytes.length());
112 
113         _connection.Write(bytes);
114     }
115 }