@@ -25,19 +25,19 @@ type (
2525 ReadWriter struct {
2626 client * storage.Client
2727 FileReader io.Reader
28- srcPrefixedBucket * prefixedBucket
29- dstPrefixedBucket * prefixedBucket
28+ srcPrefixedBucket * PrefixedBucket
29+ dstPrefixedBucket * PrefixedBucket
3030 ReadWriters []* ReadWriteCloser
3131 }
3232
3333 Completer struct {
3434 client * storage.Client
35- dstPrefixedBucket * prefixedBucket
35+ dstPrefixedBucket * PrefixedBucket
3636 }
3737
38- prefixedBucket struct {
39- bucket string
40- prefix string
38+ PrefixedBucket struct {
39+ Bucket string
40+ Prefix string
4141 }
4242
4343 // ReadWriteCloser contains the name of the object, its reader and a writer.
@@ -56,6 +56,19 @@ type (
5656 Option func (* bucketOptions )
5757)
5858
59+ // GCSClientOptions is used to set insucure HTTP client for integration tests.
60+ var GCSClientOptions = []option.ClientOption {}
61+
62+ func gcsClientOptions (token string ) []option.ClientOption {
63+ return append (GCSClientOptions , option .WithTokenSource (
64+ oauth2 .StaticTokenSource (
65+ & oauth2.Token {
66+ AccessToken : token ,
67+ },
68+ ),
69+ ))
70+ }
71+
5972// WithReader allows to specify a reader to be used for the bucket.
6073func WithReader (reader io.Reader ) Option {
6174 return func (o * bucketOptions ) {
@@ -77,16 +90,7 @@ func NewBucketCompleter(ctx context.Context, downscopedToken string, dstURL stri
7790 return nil , ErrTokenRequired
7891 }
7992
80- client , err := storage .NewClient (
81- ctx ,
82- option .WithTokenSource (
83- oauth2 .StaticTokenSource (
84- & oauth2.Token {
85- AccessToken : downscopedToken ,
86- },
87- ),
88- ),
89- )
93+ client , err := storage .NewClient (ctx , gcsClientOptions (downscopedToken )... )
9094 if err != nil {
9195 return nil , fmt .Errorf ("failed to create storage client: %w" , err )
9296 }
@@ -105,8 +109,8 @@ func NewBucketCompleter(ctx context.Context, downscopedToken string, dstURL stri
105109// Complete writes a .Completed file to the destination bucket to signal that the transfer is complete.
106110// It then closes the client.
107111func (b * Completer ) Complete (ctx context.Context ) error {
108- dstBucket := b .client .Bucket (b .dstPrefixedBucket .bucket )
109- completedWriter := dstBucket .Object (fmt .Sprintf ("%s/%s" , b .dstPrefixedBucket .prefix , CompletedFile )).NewWriter (ctx )
112+ dstBucket := b .client .Bucket (b .dstPrefixedBucket .Bucket )
113+ completedWriter := dstBucket .Object (fmt .Sprintf ("%s/%s" , b .dstPrefixedBucket .Prefix , CompletedFile )).NewWriter (ctx )
110114 if _ , err := completedWriter .Write ([]byte {}); err != nil {
111115 return fmt .Errorf ("failed to write completed file: %w" , err )
112116 }
@@ -120,8 +124,8 @@ func (b *Completer) Complete(ctx context.Context) error {
120124
121125// Checks if the .Completed file exists in the destination bucket.
122126func (b * Completer ) HasCompleted (ctx context.Context ) (bool , error ) {
123- dstBucket := b .client .Bucket (b .dstPrefixedBucket .bucket )
124- _ , err := dstBucket .Object (fmt .Sprintf ("%s/%s" , b .dstPrefixedBucket .prefix , CompletedFile )).Attrs (ctx )
127+ dstBucket := b .client .Bucket (b .dstPrefixedBucket .Bucket )
128+ _ , err := dstBucket .Object (fmt .Sprintf ("%s/%s" , b .dstPrefixedBucket .Prefix , CompletedFile )).Attrs (ctx )
125129 if errors .Is (err , storage .ErrObjectNotExist ) {
126130 return false , nil
127131 }
@@ -140,16 +144,7 @@ func NewBucketReadWriter(ctx context.Context, downscopedToken string, dstURL str
140144 opt (bucketOption )
141145 }
142146
143- client , err := storage .NewClient (
144- ctx ,
145- option .WithTokenSource (
146- oauth2 .StaticTokenSource (
147- & oauth2.Token {
148- AccessToken : downscopedToken ,
149- },
150- ),
151- ),
152- )
147+ client , err := storage .NewClient (ctx , gcsClientOptions (downscopedToken )... )
153148 if err != nil {
154149 return nil , fmt .Errorf ("failed to create storage client: %w" , err )
155150 }
@@ -193,7 +188,7 @@ func NewBucketReadWriter(ctx context.Context, downscopedToken string, dstURL str
193188}
194189
195190// bucketFromObjectURL parses a valid objectURL and returns a Bucket.
196- func bucketFromObjectURL (objectURL string ) (* prefixedBucket , error ) {
191+ func bucketFromObjectURL (objectURL string ) (* PrefixedBucket , error ) {
197192 url , err := url .Parse (objectURL )
198193 if err != nil {
199194 return nil , err
@@ -203,9 +198,9 @@ func bucketFromObjectURL(objectURL string) (*prefixedBucket, error) {
203198 return nil , fmt .Errorf ("invalid object URL: %s" , objectURL )
204199 }
205200
206- return & prefixedBucket {
207- bucket : url .Host ,
208- prefix : strings .TrimLeft (url .Path , "/" ),
201+ return & PrefixedBucket {
202+ Bucket : url .Host ,
203+ Prefix : strings .TrimLeft (url .Path , "/" ),
209204 }, nil
210205}
211206
@@ -214,10 +209,10 @@ func bucketFromObjectURL(objectURL string) (*prefixedBucket, error) {
214209// It then opens a writer for each object under the same name specified by the destinationPrefix.
215210func (b * ReadWriter ) newObjectReadWriteCloser (ctx context.Context ) error {
216211 logger := zerolog .Ctx (ctx )
217- query := & storage.Query {Prefix : b .srcPrefixedBucket .prefix + "/" }
212+ query := & storage.Query {Prefix : b .srcPrefixedBucket .Prefix + "/" }
218213
219- srcBucket := b .client .Bucket (b .srcPrefixedBucket .bucket )
220- dstBucket := b .client .Bucket (b .dstPrefixedBucket .bucket )
214+ srcBucket := b .client .Bucket (b .srcPrefixedBucket .Bucket )
215+ dstBucket := b .client .Bucket (b .dstPrefixedBucket .Bucket )
221216
222217 it := srcBucket .Objects (ctx , query )
223218 var rwc []* ReadWriteCloser
@@ -227,7 +222,7 @@ func (b *ReadWriter) newObjectReadWriteCloser(ctx context.Context) error {
227222 if errors .Is (err , iterator .Done ) {
228223 break
229224 } else if err != nil {
230- logger .Debug ().Err (err ).Msgf ("failed to list objects from source bucket %s" , b .srcPrefixedBucket .prefix )
225+ logger .Debug ().Err (err ).Msgf ("failed to list objects from source bucket %s" , b .srcPrefixedBucket .Prefix )
231226 return err
232227 }
233228
@@ -243,7 +238,7 @@ func (b *ReadWriter) newObjectReadWriteCloser(ctx context.Context) error {
243238 rwc = append (rwc , & ReadWriteCloser {
244239 name : blobFromObjectName (obj .Name ),
245240 Reader : reader ,
246- Writer : dstBucket .Object (objectPathWithPrefix (obj .Name , b .dstPrefixedBucket .prefix )).NewWriter (ctx ),
241+ Writer : dstBucket .Object (objectPathWithPrefix (obj .Name , b .dstPrefixedBucket .Prefix )).NewWriter (ctx ),
247242 })
248243 }
249244
@@ -254,8 +249,8 @@ func (b *ReadWriter) newObjectReadWriteCloser(ctx context.Context) error {
254249
255250// newObjectWriteCloser creates a new writer for the destination bucket.
256251func (b * ReadWriter ) newObjectWriteCloser (ctx context.Context ) * ReadWriteCloser {
257- dstBucket := b .client .Bucket (b .dstPrefixedBucket .bucket )
258- writer := dstBucket .Object (fmt .Sprintf ("%s/data_%s.csv" , b .dstPrefixedBucket .prefix , shortHex ())).NewWriter (ctx )
252+ dstBucket := b .client .Bucket (b .dstPrefixedBucket .Bucket )
253+ writer := dstBucket .Object (fmt .Sprintf ("%s/data_%s.csv" , b .dstPrefixedBucket .Prefix , shortHex ())).NewWriter (ctx )
259254 return & ReadWriteCloser {
260255 name : CompletedFile ,
261256 Writer : writer ,
0 commit comments