@@ -2,10 +2,12 @@ package buf
22
33import (
44 "io"
5+ "sync"
56 "time"
67
78 "github.com/xtls/xray-core/common/errors"
89 "github.com/xtls/xray-core/common/signal"
10+ "github.com/xtls/xray-core/features/policy"
911 "github.com/xtls/xray-core/features/stats"
1012)
1113
@@ -113,7 +115,12 @@ func Copy(reader Reader, writer Writer, options ...CopyOption) error {
113115 for _ , option := range options {
114116 option (& handler )
115117 }
116- err := copyInternal (reader , writer , & handler )
118+ var err error
119+ if sReader , ok := reader .(* SingleReader ); ok {
120+ err = copyV (sReader , writer , & handler )
121+ } else {
122+ err = copyInternal (reader , writer , & handler )
123+ }
117124 if err != nil && errors .Cause (err ) != io .EOF {
118125 return err
119126 }
@@ -133,3 +140,92 @@ func CopyOnceTimeout(reader Reader, writer Writer, timeout time.Duration) error
133140 }
134141 return writer .WriteMultiBuffer (mb )
135142}
143+
144+ func copyV (r * SingleReader , w Writer , handler * copyHandler ) error {
145+ // channel buffer size is maxBuffer/maxPerPacketLen (ignore the case of many small packets)
146+ // default buffer size:
147+ // 0 in ARM MIPS MIPSLE
148+ // 4kb in ARM64 MIPS64 MIPS64LE
149+ // 512kb in others
150+ channelBuffer := (policy .SessionDefault ().Buffer .PerConnection ) / Size
151+ if channelBuffer <= 0 {
152+ channelBuffer = 4
153+ }
154+ cache := make (chan * Buffer , channelBuffer )
155+ stopRead := make (chan struct {})
156+ var rErr error
157+ var wErr error
158+ wg := sync.WaitGroup {}
159+ wg .Add (2 )
160+ // downlink
161+ go func () {
162+ defer wg .Done ()
163+ defer close (cache )
164+ for {
165+ b , err := r .readBuffer ()
166+ if err == nil {
167+ select {
168+ case cache <- b :
169+ // must be write error
170+ case <- stopRead :
171+ b .Release ()
172+ return
173+ }
174+ } else {
175+ rErr = err
176+ select {
177+ case cache <- b :
178+ case <- stopRead :
179+ b .Release ()
180+ }
181+ return
182+ }
183+ }
184+ }()
185+ // uplink
186+ go func () {
187+ defer wg .Done ()
188+ for {
189+ b , ok := <- cache
190+ if ! ok {
191+ return
192+ }
193+ var buffers = []* Buffer {b }
194+ for stop := false ; ! stop ; {
195+ select {
196+ case b , ok := <- cache :
197+ if ! ok {
198+ stop = true
199+ continue
200+ }
201+ buffers = append (buffers , b )
202+ default :
203+ stop = true
204+ }
205+ }
206+ mb := MultiBuffer (buffers )
207+ err := w .WriteMultiBuffer (mb )
208+ for _ , handler := range handler .onData {
209+ handler (mb )
210+ }
211+ ReleaseMulti (mb )
212+ if err != nil {
213+ wErr = err
214+ close (stopRead )
215+ return
216+ }
217+ }
218+ }()
219+ wg .Wait ()
220+ // drain cache
221+ for b := range cache {
222+ b .Release ()
223+ }
224+ if wErr != nil {
225+ return writeError {wErr }
226+ }
227+ if rErr != nil {
228+ return readError {rErr }
229+ }
230+ return nil
231+ }
0 commit comments