@@ -3,8 +3,10 @@ package diskqueue
33import (
44 "bufio"
55 "bytes"
6+ "crypto/rand"
67 "fmt"
78 "io/ioutil"
9+ "log"
810 "os"
911 "path"
1012 "path/filepath"
@@ -696,3 +698,80 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
696698 <- dq .ReadChan ()
697699 }
698700}
701+
702+ func TestDiskQueue_ReadChan (t * testing.T ) {
703+ dataDir := "./testdata/dat"
704+ err := os .MkdirAll (dataDir , 0o755 )
705+ if err != nil {
706+ t .Fatal (err )
707+ }
708+ defer os .RemoveAll (dataDir )
709+
710+ var (
711+ megabyte int64 = 1 << 20
712+ datCount = 1112
713+ )
714+
715+ dq := New ("nsqio_diskqueue" , dataDir , 128 * megabyte , 0 , int32 (16 * megabyte ),
716+ 32 * megabyte , time .Second * 5 , func (lvl LogLevel , f string , args ... interface {}) {
717+ if lvl >= WARN {
718+ t .Errorf (f , args )
719+ return
720+ }
721+ log .Println (lvl , fmt .Sprintf (f , args ... ))
722+ })
723+
724+ buf := make ([]byte , 3231197 )
725+ n , err := rand .Read (buf )
726+ if err != nil {
727+ t .Fatal (err )
728+ }
729+ if n != len (buf ) {
730+ t .Fatal ("buf is not full" )
731+ }
732+
733+ pushExit := make (chan struct {})
734+ go func () {
735+ for i := 0 ; i < datCount ; i ++ {
736+ if err := dq .Put (buf ); err != nil {
737+ t .Error (err )
738+ return
739+ }
740+ }
741+ close (pushExit )
742+ }()
743+
744+ var wg sync.WaitGroup
745+ wg .Add (5 )
746+
747+ var counter atomic.Int64
748+
749+ for i := 0 ; i < 5 ; i ++ {
750+ go func () {
751+ defer wg .Done ()
752+ for {
753+ select {
754+ case data := <- dq .ReadChan ():
755+ if bytes .Compare (buf , data ) != 0 {
756+ t .Error ("get corrupt msg" )
757+ return
758+ }
759+ counter .Add (1 )
760+ case <- pushExit :
761+ if dq .Depth () == 0 {
762+ return
763+ }
764+ }
765+ }
766+ }()
767+ }
768+
769+ wg .Wait ()
770+
771+ if counter .Load () != int64 (datCount ) {
772+ t .Fatal ("push message count not equals get message count" )
773+ }
774+ if err := dq .Close (); err != nil {
775+ t .Fatal (err )
776+ }
777+ }
0 commit comments