Skip to content

Commit d80f1ff

Browse files
authored
Merge pull request apache#25916 Use built-in csv and json readers.
2 parents 5be2697 + c637363 commit d80f1ff

2 files changed

Lines changed: 43 additions & 16 deletions

File tree

sdks/python/apache_beam/yaml/yaml_provider.py

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -314,22 +314,6 @@ def expand(self, pcolls):
314314
for key in dir(apache_beam.io)
315315
if key.startswith('ReadFrom') or key.startswith('WriteTo')
316316
}
317-
ios['ReadFromCsv'] = lambda **kwargs: apache_beam.dataframe.io.ReadViaPandas(
318-
'csv', **kwargs)
319-
ios['WriteToCsv'] = lambda **kwargs: apache_beam.dataframe.io.WriteViaPandas(
320-
'csv', **kwargs)
321-
ios['ReadFromJson'] = (
322-
lambda *,
323-
orient='records',
324-
lines=True,
325-
**kwargs: apache_beam.dataframe.io.ReadViaPandas(
326-
'json', orient=orient, lines=lines, **kwargs))
327-
ios['WriteToJson'] = (
328-
lambda *,
329-
orient='records',
330-
lines=True,
331-
**kwargs: apache_beam.dataframe.io.WriteViaPandas(
332-
'json', orient=orient, lines=lines, **kwargs))
333317

334318
return InlineProvider(
335319
dict({

sdks/python/apache_beam/yaml/yaml_transform_test.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
# limitations under the License.
1616
#
1717

18+
import glob
1819
import logging
20+
import os
21+
import tempfile
1922
import unittest
2023

2124
import apache_beam as beam
@@ -102,6 +105,46 @@ def test_chain_with_root(self):
102105
''')
103106
assert_that(result, equal_to([41, 43, 47, 53, 61, 71, 83, 97, 113, 131]))
104107

108+
def test_csv_to_json(self):
109+
try:
110+
import pandas as pd
111+
except ImportError:
112+
raise unittest.SkipTest('Pandas not available.')
113+
114+
with tempfile.TemporaryDirectory() as tmpdir:
115+
data = pd.DataFrame([
116+
{
117+
'label': '11a', 'rank': 0
118+
},
119+
{
120+
'label': '37a', 'rank': 1
121+
},
122+
{
123+
'label': '389a', 'rank': 2
124+
},
125+
])
126+
input = os.path.join(tmpdir, 'input.csv')
127+
output = os.path.join(tmpdir, 'output.json')
128+
data.to_csv(input, index=False)
129+
130+
with beam.Pipeline() as p:
131+
result = p | YamlTransform(
132+
'''
133+
type: chain
134+
transforms:
135+
- type: ReadFromCsv
136+
path: %s
137+
- type: WriteToJson
138+
path: %s
139+
num_shards: 1
140+
''' % (repr(input), repr(output)))
141+
142+
output_shard = list(glob.glob(output + "*"))[0]
143+
result = pd.read_json(
144+
output_shard, orient='records',
145+
lines=True).sort_values('rank').reindex()
146+
pd.testing.assert_frame_equal(data, result)
147+
105148

106149
if __name__ == '__main__':
107150
logging.getLogger().setLevel(logging.INFO)

0 commit comments

Comments
 (0)