@@ -40,31 +40,45 @@ type natConn struct {
4040 queueMutex sync.Mutex
4141 onDataReady func ()
4242
43+ deadlineMutex sync.Mutex
44+ deadlineTimer * time.Timer
45+ deadlineChan chan struct {}
46+ dataSignal chan struct {}
47+
4348 closeOnce sync.Once
4449 doneChan chan struct {}
4550}
4651
47- func (c * natConn ) ReadPacket (buffer * buf.Buffer ) (addr M.Socksaddr , err error ) {
48- select {
49- case <- c .doneChan :
50- return M.Socksaddr {}, io .ErrClosedPipe
51- default :
52- }
52+ func (c * natConn ) ReadPacket (buffer * buf.Buffer ) (destination M.Socksaddr , err error ) {
53+ for {
54+ select {
55+ case <- c .doneChan :
56+ return M.Socksaddr {}, io .ErrClosedPipe
57+ default :
58+ }
5359
54- c .queueMutex .Lock ()
55- if len (c .dataQueue ) == 0 {
60+ c .queueMutex .Lock ()
61+ if len (c .dataQueue ) > 0 {
62+ packet := c .dataQueue [0 ]
63+ c .dataQueue = c .dataQueue [1 :]
64+ c .queueMutex .Unlock ()
65+ _ , err = buffer .ReadOnceFrom (packet .Buffer )
66+ destination = packet .Destination
67+ packet .Buffer .Release ()
68+ N .PutPacketBuffer (packet )
69+ return
70+ }
5671 c .queueMutex .Unlock ()
57- return M.Socksaddr {}, os .ErrDeadlineExceeded
58- }
59- packet := c .dataQueue [0 ]
60- c .dataQueue = c .dataQueue [1 :]
61- c .queueMutex .Unlock ()
6272
63- _ , err = buffer .ReadOnceFrom (packet .Buffer )
64- destination := packet .Destination
65- packet .Buffer .Release ()
66- N .PutPacketBuffer (packet )
67- return destination , err
73+ select {
74+ case <- c .doneChan :
75+ return M.Socksaddr {}, io .ErrClosedPipe
76+ case <- c .waitDeadline ():
77+ return M.Socksaddr {}, os .ErrDeadlineExceeded
78+ case <- c .dataSignal :
79+ continue
80+ }
81+ }
6882}
6983
7084func (c * natConn ) WritePacket (buffer * buf.Buffer , destination M.Socksaddr ) error {
@@ -79,25 +93,34 @@ func (c *natConn) InitializeReadWaiter(options N.ReadWaitOptions) (needCopy bool
7993}
8094
8195func (c * natConn ) WaitReadPacket () (buffer * buf.Buffer , destination M.Socksaddr , err error ) {
82- select {
83- case <- c .doneChan :
84- return nil , M.Socksaddr {}, io .ErrClosedPipe
85- default :
86- }
96+ for {
97+ select {
98+ case <- c .doneChan :
99+ return nil , M.Socksaddr {}, io .ErrClosedPipe
100+ default :
101+ }
87102
88- c .queueMutex .Lock ()
89- if len (c .dataQueue ) == 0 {
103+ c .queueMutex .Lock ()
104+ if len (c .dataQueue ) > 0 {
105+ packet := c .dataQueue [0 ]
106+ c .dataQueue = c .dataQueue [1 :]
107+ c .queueMutex .Unlock ()
108+ buffer = c .readWaitOptions .Copy (packet .Buffer )
109+ destination = packet .Destination
110+ N .PutPacketBuffer (packet )
111+ return
112+ }
90113 c .queueMutex .Unlock ()
91- return nil , M.Socksaddr {}, os .ErrDeadlineExceeded
92- }
93- packet := c .dataQueue [0 ]
94- c .dataQueue = c .dataQueue [1 :]
95- c .queueMutex .Unlock ()
96114
97- buffer = c .readWaitOptions .Copy (packet .Buffer )
98- destination = packet .Destination
99- N .PutPacketBuffer (packet )
100- return
115+ select {
116+ case <- c .doneChan :
117+ return nil , M.Socksaddr {}, io .ErrClosedPipe
118+ case <- c .waitDeadline ():
119+ return nil , M.Socksaddr {}, os .ErrDeadlineExceeded
120+ case <- c .dataSignal :
121+ continue
122+ }
123+ }
101124}
102125
103126func (c * natConn ) SetHandler (handler N.UDPHandlerEx ) {
@@ -141,6 +164,11 @@ func (c *natConn) PushPacket(packet *N.PacketBuffer) {
141164 callback := c .onDataReady
142165 c .queueMutex .Unlock ()
143166
167+ select {
168+ case c .dataSignal <- struct {}{}:
169+ default :
170+ }
171+
144172 if callback != nil {
145173 callback ()
146174 }
@@ -187,9 +215,52 @@ func (c *natConn) SetDeadline(t time.Time) error {
187215}
188216
189217func (c * natConn ) SetReadDeadline (t time.Time ) error {
218+ c .deadlineMutex .Lock ()
219+ defer c .deadlineMutex .Unlock ()
220+
221+ if c .deadlineTimer != nil && ! c .deadlineTimer .Stop () {
222+ <- c .deadlineChan
223+ }
224+ c .deadlineTimer = nil
225+
226+ if t .IsZero () {
227+ if isClosedChan (c .deadlineChan ) {
228+ c .deadlineChan = make (chan struct {})
229+ }
230+ return nil
231+ }
232+
233+ if duration := time .Until (t ); duration > 0 {
234+ if isClosedChan (c .deadlineChan ) {
235+ c .deadlineChan = make (chan struct {})
236+ }
237+ c .deadlineTimer = time .AfterFunc (duration , func () {
238+ close (c .deadlineChan )
239+ })
240+ return nil
241+ }
242+
243+ if ! isClosedChan (c .deadlineChan ) {
244+ close (c .deadlineChan )
245+ }
190246 return nil
191247}
192248
249+ func (c * natConn ) waitDeadline () chan struct {} {
250+ c .deadlineMutex .Lock ()
251+ defer c .deadlineMutex .Unlock ()
252+ return c .deadlineChan
253+ }
254+
255+ func isClosedChan (channel <- chan struct {}) bool {
256+ select {
257+ case <- channel :
258+ return true
259+ default :
260+ return false
261+ }
262+ }
263+
193264func (c * natConn ) SetWriteDeadline (t time.Time ) error {
194265 return os .ErrInvalid
195266}
0 commit comments