@@ -38,7 +38,7 @@ type WsService struct {
3838// ConnConf default URL is spot websocket
3939type ConnConf struct {
4040 App string
41- subscribeMsg * sync.Map
41+ subscribeMsg sync.Map
4242 URL string
4343 Key string
4444 Secret string
@@ -116,15 +116,14 @@ func NewWsService(ctx context.Context, logger *log.Logger, conf *ConnConf) (*WsS
116116 clientMu : new (sync.Mutex ),
117117 }
118118
119- go ws .activePing ()
119+ ws .keepAlive ()
120120
121121 return ws , nil
122122}
123123
124124func getInitConnConf () * ConnConf {
125125 return & ConnConf {
126126 App : "spot" ,
127- subscribeMsg : new (sync.Map ),
128127 MaxRetryConn : MaxRetryConn ,
129128 Key : "" ,
130129 Secret : "" ,
@@ -165,7 +164,6 @@ func NewConnConfFromOption(op *ConfOptions) *ConnConf {
165164 }
166165 return & ConnConf {
167166 App : op .App ,
168- subscribeMsg : new (sync.Map ),
169167 MaxRetryConn : op .MaxRetryConn ,
170168 Key : op .Key ,
171169 Secret : op .Secret ,
@@ -216,6 +214,11 @@ func (ws *WsService) reconnect() error {
216214
217215 ws .status = connected
218216
217+ // should login when reconnect
218+ if err := ws .login (); err != nil {
219+ ws .Logger .Println ("reconnect login err:%s" , err .Error ())
220+ }
221+
219222 // resubscribe after reconnect
220223 ws .conf .subscribeMsg .Range (func (key , value interface {}) bool {
221224 // key is channel, value is []requestHistory
@@ -313,45 +316,31 @@ func (ws *WsService) GetConnection() *websocket.Conn {
313316 return ws .Client
314317}
315318
316- func (ws * WsService ) activePing () {
317- du , err := time .ParseDuration (ws .conf .PingInterval )
318- if err != nil {
319- ws .Logger .Printf ("failed to parse ping interval: %s, use default ping interval 10s instead" , ws .conf .PingInterval )
320- du , err = time .ParseDuration (DefaultPingInterval )
321- if err != nil {
322- du = time .Second * 10
323- }
324- }
319+ func (ws * WsService ) keepAlive () {
320+ var timeout = 10 * time .Second
321+ ticker := time .NewTicker (timeout )
325322
326- ticker := time .NewTicker (du )
327- defer ticker .Stop ()
328-
329- for {
330- select {
331- case <- ws .Ctx .Done ():
332- return
333- case <- ticker .C :
334- subscribeMap := map [string ]int {}
335- ws .conf .subscribeMsg .Range (func (key , value interface {}) bool {
336- splits := strings .Split (key .(string ), "." )
337- if len (splits ) == 2 {
338- subscribeMap [splits [0 ]] = 1
339- }
340- return true
341- })
323+ lastResponse := time .Now ()
324+ ws .Client .SetPongHandler (func (msg string ) error {
325+ lastResponse = time .Now ()
326+ return nil
327+ })
342328
343- if ws .status != connected {
344- continue
329+ go func () {
330+ defer ticker .Stop ()
331+ for {
332+ ws .mu .Lock ()
333+ err := ws .Client .WriteControl (websocket .PingMessage , []byte {}, time .Now ().Add (10 * time .Second ))
334+ ws .mu .Unlock ()
335+ if err != nil {
336+ ws .Logger .Printf ("send ping err:%s" , err .Error ())
345337 }
346-
347- for app := range subscribeMap {
348- channel := app + ".ping"
349- if err := ws .Subscribe (channel , nil ); err != nil {
350- ws .Logger .Printf ("subscribe channel[%s] failed: %v" , channel , err )
351- }
338+ <- ticker .C
339+ if time .Since (lastResponse ) > 30 * time .Second {
340+ ws .Logger .Printf ("ping timeout, should reconnect" )
352341 }
353342 }
354- }
343+ }()
355344}
356345
357346var statusString = map [status ]string {
0 commit comments