-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathwhen-pytorch-datapipes-meets-gcs.html
More file actions
215 lines (170 loc) · 16.2 KB
/
when-pytorch-datapipes-meets-gcs.html
File metadata and controls
215 lines (170 loc) · 16.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta http-equiv="X-UA-Compatible" content="IE=edge" />
<meta name="HandheldFriendly" content="True" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<meta name="robots" content="" />
<link href="https://fonts.googleapis.com/css?family=Source+Code+Pro|Source+Sans+Pro:300,400,400i,700" rel="stylesheet">
<link rel="stylesheet" type="text/css" href="./theme/stylesheet/style.min.css">
<link rel="stylesheet" type="text/css" href="./theme/pygments/github.min.css">
<link rel="stylesheet" type="text/css" href="./theme/font-awesome/css/font-awesome.min.css">
<link href="https://sephib.github.io/feeds/all.atom.xml" type="application/atom+xml" rel="alternate" title="Geo Berry Atom">
<link rel="shortcut icon" href="/images/favicon.ico" type="image/x-icon">
<link rel="icon" href="/images/favicon.ico" type="image/x-icon">
<meta name="author" content="Sephi Berry" />
<meta name="description" content="The reality when using pipeline infrastructure" />
<meta name="keywords" content="python pytorch GCP">
<meta property="og:site_name" content="Geo Berry"/>
<meta property="og:title" content="When Pytorch Datapipes Meets GCS"/>
<meta property="og:description" content="The reality when using pipeline infrastructure"/>
<meta property="og:locale" content="en_US"/>
<meta property="og:url" content="./when-pytorch-datapipes-meets-gcs.html"/>
<meta property="og:type" content="article"/>
<meta property="article:published_time" content="2022-07-06 00:00:00+03:00"/>
<meta property="article:modified_time" content=""/>
<meta property="article:author" content="./author/sephi-berry.html">
<meta property="article:section" content="posts pipeline"/>
<meta property="article:tag" content="python pytorch GCP"/>
<meta property="og:image" content="/images/avatar_osnx.png">
<title>Geo Berry – When Pytorch Datapipes Meets GCS</title>
</head>
<body>
<aside>
<div>
<a href=".">
<img src="/images/avatar_osnx.png" alt="Sephi's Blog" title="Sephi's Blog">
</a>
<h1><a href=".">Sephi's Blog</a></h1>
<p>ML / Data Engineer | Project Manager | Geo-Spatial Specialist</p>
<nav>
<ul class="list">
<li><a href="./pages/about.html#about">About</a></li>
</ul>
</nav>
<ul class="social">
<li><a class="sc-linkedin" href="https://www.linkedin.com/in/berrygis" target="_blank"><i class="fa fa-linkedin"></i></a></li>
<li><a class="sc-github" href="https://github.com/sephib" target="_blank"><i class="fa fa-github"></i></a></li>
<li><a class="sc-twitter" href="https://twitter.com/geosephi" target="_blank"><i class="fa fa-twitter"></i></a></li>
</ul>
</div>
</aside>
<main>
<article class="single">
<header>
<h1 id="when-pytorch-datapipes-meets-gcs">When Pytorch Datapipes Meets GCS</h1>
<p>
Posted on Wed 06 July 2022 in <a href="./category/posts-pipeline.html">posts pipeline</a>
</p>
</header>
<div>
<!-- status: draft -->
<p><a rel="pytorch_gcs_logo"><img src="images/Pytorch_GCS_logo.png" width=700 height=300 /></a> </p>
<h1>Background</h1>
<p>Working in the ML arena requires optimal usage of data, in addition to maximum flexibility while manipulating of datasets. A common way to do so is to use <code>pipeline</code>s that allow for a structural framework to manage these processes. </p>
<p>Lately at <a rel="Artlist logo" href="https://artlist.io"><img src="images/Artlist Logo 64px.png" height=20 /></a> we wanted to run some image manipulations using the <a href="https://pytorch.org">pytorch framework</a>. Since our data are stored in Google Cloud Storage (GCS), we thought that we would be able to use <a href="https://pytorch.org/data/main/index.html">pytorch datapipes</a> as our pipeline framework. Of-the-bat it seem simple since the <a href="https://pytorch.org/data/main/torchdata.datapipes.iter.html#io-datapipes">IO datapipes</a> seems to be comprehensive, however, as usual and to be expected - once the implementation started we were challenged with some technical issues. </p>
<h1>Our Use Case</h1>
<p>We had some images that we needed to convert into embeddings and save them into a bucket. With the inspiration and stimulus of <a href="https://pytorch.org/data/main/examples.html">the documented examples</a> we built our <code>pytorch datapipe</code> as follows: </p>
<div class="highlight"><pre><span></span><code><span class="k">def</span> <span class="nf">image_datapipe</span><span class="p">(</span><span class="n">root_dir</span><span class="p">):</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">FSSpecFileLister</span><span class="p">(</span><span class="n">root</span><span class="o">=</span><span class="n">root_dir</span><span class="p">)</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">datapipe</span><span class="o">.</span><span class="n">open_file_by_fsspec</span><span class="p">(</span><span class="n">mode</span><span class="o">=</span><span class="s2">"rb"</span><span class="p">)</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">datapipe</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">PIL_open</span><span class="p">)</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">datapipe</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">row_emb_processor</span><span class="p">)</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">datapipe</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">post_process</span><span class="p">)</span>
<span class="n">datapipe</span> <span class="o">=</span> <span class="n">datapipe</span><span class="o">.</span><span class="n">save_by_fsspec</span><span class="p">(</span><span class="n">filepath_fn</span><span class="o">=</span><span class="n">filepath_fn</span><span class="p">,</span> <span class="n">mode</span><span class="o">=</span><span class="s2">"wb"</span><span class="p">)</span>
<span class="k">return</span> <span class="n">datapipe</span>
</code></pre></div>
<p>Lets go over the various steps in the pipeline.</p>
<h2>Who care's about security</h2>
<p><code>FSSpecFileLister</code> is an object responsible for accessing the files in a filesystem.</p>
<p>In order to access the <code>GCS</code> filesystem, Pytorch's <code>open_file_by_fsspec</code> function uses the <a href="https://filesystem-spec.readthedocs.io/en/latest/">fsspec</a> library with <a href="https://github.com/pytorch/data/blob/cd38927904836f6f67ce33bfaee094fff4078402/torchdata/datapipes/iter/load/fsspec.py#L128">following code</a> </p>
<div class="highlight"><pre><span></span><code><span class="n">fs</span><span class="p">,</span> <span class="n">path</span> <span class="o">=</span> <span class="n">fsspec</span><span class="o">.</span><span class="n">core</span><span class="o">.</span><span class="n">url_to_fs</span><span class="p">(</span><span class="n">root</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
</code></pre></div>
<ul>
<li>The problem with this function is that it does not take into account the option to access secured buckets, i.e. how to pass credential for authentication in order to access private buckets - <a href="https://discuss.pytorch.org/t/using-a-google-cloud-storage-bucket-for-dataset/146253">see thread on pytorch discuss forum</a>.</li>
</ul>
<p>In order to solve this issue we wrapped the <code>fsspec.core.url_to_fs</code> function into an internal function, while introducing the option to supply the <code>credential token</code>.</p>
<div class="highlight"><pre><span></span><code><span class="k">def</span> <span class="nf">_url_to_fs</span><span class="p">(</span><span class="n">root</span><span class="p">,</span> <span class="n">token</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">):</span>
<span class="n">fs</span><span class="p">,</span> <span class="n">path</span> <span class="o">=</span> <span class="n">fsspec</span><span class="o">.</span><span class="n">core</span><span class="o">.</span><span class="n">url_to_fs</span><span class="p">(</span><span class="n">root</span><span class="p">,</span> <span class="n">token</span><span class="o">=</span><span class="n">token_path</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kwargs</span><span class="p">)</span>
<span class="c1"># hotfix - since the GCS fsspec implementation can return ('gcs', 'gs') as protocol</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">fs</span><span class="o">.</span><span class="n">protocol</span><span class="p">,</span> <span class="n">Tuple</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">len</span><span class="p">(</span><span class="n">fs</span><span class="o">.</span><span class="n">protocol</span><span class="p">)</span> <span class="o">></span> <span class="mi">1</span><span class="p">:</span>
<span class="n">fs</span><span class="o">.</span><span class="n">protocol</span> <span class="o">=</span> <span class="n">fs</span><span class="o">.</span><span class="n">protocol</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span>
<span class="k">return</span> <span class="n">fs</span><span class="p">,</span> <span class="n">path</span>
</code></pre></div>
<ul>
<li>BTW for <code>AWS S3</code> there is a dedicated <a href="https://pytorch.org/data/main/generated/torchdata.datapipes.iter.S3FileLister.html#torchdata.datapipes.iter.S3FileLister">S3FileListener handler</a></li>
</ul>
<h2>Now for the rest of the pipeline</h2>
<ul>
<li>Now we can add to our pipeline additional functionality.<br>
First we will open our image as a stream using <code>PIL.Image</code>: </li>
</ul>
<div class="highlight"><pre><span></span><code><span class="k">def</span> <span class="nf">PIL_open</span><span class="p">(</span><span class="n">data</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">path_name</span><span class="o">=</span><span class="n">data</span><span class="p">[</span><span class="mi">0</span><span class="p">],</span>
<span class="n">file_stream</span><span class="o">=</span><span class="n">Image</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">gfs</span><span class="o">.</span><span class="n">open</span><span class="p">(</span><span class="n">data</span><span class="p">[</span><span class="mi">0</span><span class="p">]))</span>
<span class="p">)</span>
</code></pre></div>
<p>The <code>gfs</code> is using the implementation of <code>fsspec</code> for GCS - <a href="https://gcsfs.readthedocs.io/en/latest/">gcsfs</a></p>
<ul>
<li>In the next step we will create embeddings from our image stream. We used a wrapped <a href="https://github.com/openai/CLIP">CLIP model</a> (<code>image_processor</code>). Note: it is always good practice to wrap modular functionality in order to allow for future replacement with a new model version. </li>
</ul>
<div class="highlight"><pre><span></span><code><span class="k">def</span> <span class="nf">row_emb_processor</span><span class="p">(</span><span class="n">data</span><span class="p">):</span>
<span class="n">data</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="o">=</span> <span class="n">image_processor</span><span class="o">.</span><span class="n">embed</span><span class="p">(</span><span class="n">data</span><span class="p">[</span><span class="mi">1</span><span class="p">])</span>
<span class="n">data</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span> <span class="o">=</span> <span class="n">data</span><span class="p">[</span><span class="mi">1</span><span class="p">]</span><span class="o">.</span><span class="n">squeeze</span><span class="p">()</span><span class="o">.</span><span class="n">numpy</span><span class="p">()</span><span class="o">.</span><span class="n">tolist</span><span class="p">()</span>
<span class="k">return</span> <span class="n">data</span>
</code></pre></div>
<ul>
<li>Since we want to save the embeddings in a <code>parquet</code> format, we will use the <code>post_process</code> step for this purpose, while allowing to consume the embeddings as <code>pandas dataframe</code>s. </li>
</ul>
<div class="highlight"><pre><span></span><code><span class="k">def</span> <span class="nf">post_process</span><span class="p">(</span><span class="n">data</span><span class="p">):</span>
<span class="n">df</span> <span class="o">=</span> <span class="n">pd</span><span class="o">.</span><span class="n">DataFrame</span><span class="p">([[</span><span class="n">data</span><span class="p">[</span><span class="mi">1</span><span class="p">]]],</span>
<span class="n">index</span><span class="o">=</span><span class="p">[</span><span class="n">data</span><span class="p">[</span><span class="mi">0</span><span class="p">]],</span>
<span class="n">columns</span><span class="o">=</span><span class="p">[</span><span class="s2">"emb_CLIP"</span><span class="p">],</span>
<span class="p">)</span>
<span class="k">return</span> <span class="p">(</span><span class="n">data</span><span class="p">,</span> <span class="n">df</span><span class="o">.</span><span class="n">to_parquet</span><span class="p">())</span>
</code></pre></div>
<ul>
<li>Lastly, in our final step we will use the <code>save_by_fsspec</code> method to save the embeddings back into a GCS bucket. Since we already fixed the <code>url_to_fs</code> accessing the bucket is straight forward. All we need is to supply the target name of the file. </li>
</ul>
<div class="highlight"><pre><span></span><code><span class="k">def</span> <span class="nf">filepath_fn</span><span class="p">(</span><span class="n">data</span><span class="p">):</span>
<span class="k">return</span> <span class="sa">f</span><span class="s2">"gs://bucket/folder/asset_</span><span class="si">{</span><span class="n">data</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="si">}</span><span class="s2">.parq"</span>
</code></pre></div>
<h1>Additional Thoughts</h1>
<ul>
<li>
<p>Moving the data between the various pipe steps can be made easier when defining a <code>dataclass</code> object - by simply referencing the required property of the image in the various stages.<br>
e.g. instead of calling <code>data[1]</code> for the embedding - way not use <em><code>data.file_stream</code></em>. Hopefully this will be elaborated in a different post.</p>
</li>
<li>
<p>The issue of accessing the bucket securely has been addressed in <a href="https://github.com/pytorch/data/issues/497">this issue</a>.</p>
</li>
</ul>
<h1>Summary</h1>
<p>As a strong advocate for embracing <em>pipelines</em> whenever possible, the implementation of the various pipeline stages can be challenging. </p>
<p>There is no place to accumulate any technical debt in the security realm - thus solving the secure access between <code>pytorch datapipe</code>s and <code>GCS</code> will allow for code reuse and agility in future projects.</p>
</div>
<div class="tag-cloud">
<p>
<a href="./tag/python-pytorch-gcp.html">python pytorch GCP</a>
</p>
</div>
</article>
<footer>
<p>© </p>
<p> Powered by <a href="http://getpelican.com" target="_blank">Pelican</a> - <a href="https://github.com/alexandrevicenzi/flex" target="_blank">Flex</a> theme by <a href="http://alexandrevicenzi.com" target="_blank">Alexandre Vicenzi</a>
</p> </footer>
</main>
<script type="application/ld+json">
{
"@context" : "http://schema.org",
"@type" : "Blog",
"name": " Geo Berry ",
"url" : ".",
"image": "/images/avatar_osnx.png",
"description": "Sephi's Thoughts and Writings"
}
</script>
</body>
</html>