Skip to content

Commit e1681f1

Browse files
authored
Merge pull request #4 from gtsystem/linux_fix
Fix issue on linux where if a stage takes to long to start and previous stage exit the processing fails
2 parents 0370636 + c2f2445 commit e1681f1

4 files changed

Lines changed: 43 additions & 12 deletions

File tree

.github/workflows/python-package.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,14 @@ on:
1212
jobs:
1313
build:
1414

15-
runs-on: ubuntu-20.04
15+
runs-on: ubuntu-24.04
1616
strategy:
1717
matrix:
18-
python-version: ["3.7", "3.9", "3.11"]
18+
python-version: ["3.8", "3.9", "3.11"]
1919
steps:
2020
- uses: actions/checkout@v2
2121
- name: Set up Python ${{ matrix.python-version }}
22-
uses: actions/setup-python@v2
22+
uses: actions/setup-python@v5
2323
with:
2424
python-version: ${{ matrix.python-version }}
2525
- name: Install dependencies
@@ -34,8 +34,8 @@ jobs:
3434
run: |
3535
python setup.py bdist_wheel
3636
- name: Archive artifact
37-
uses: actions/upload-artifact@v2
37+
uses: actions/upload-artifact@v4
3838
with:
39-
name: dist
39+
name: dist-${{ matrix.python-version }}
4040
path: |
4141
dist

parallelpipe.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55
\ map /
66
"""
77
from multiprocessing import Process, Queue
8-
from time import sleep
9-
import inspect
10-
import collections
118
from collections.abc import Iterable
9+
import time
1210

1311
import dill
1412

@@ -87,6 +85,11 @@ def run(self):
8785
for i in range(self._num_followers):
8886
put_item(EXIT)
8987
self._que_err.put(EXIT)
88+
while not self._que_out.empty():
89+
time.sleep(0.1)
90+
while not self._que_err.empty():
91+
time.sleep(0.1)
92+
9093

9194
class Stage(object):
9295
"""Represent a pool of parallel tasks that perform the same type of action on the input."""

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
setup(
1414
name='parallelpipe',
15-
version='0.3.0',
15+
version='0.3.1',
1616
author='Giuseppe Tribulato',
1717
author_email='gtsystem@gmail.com',
1818
py_modules=['parallelpipe'],

tests/test_parallelpipe.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import unittest
22
from parallelpipe import Stage, TaskException, stage, map_stage
3+
from time import sleep
34

45
def t1(x, fail_at=None):
56
"""Produce values from the given input iterator.
@@ -124,15 +125,15 @@ def test_exception_propagation(self):
124125
reducer = Stage(t3, sum).setup(workers=2, qsize=3)
125126
pipe = producer | mapper | reducer
126127

127-
with self.assertRaisesRegexp(TaskException, "failed at 200"):
128+
with self.assertRaisesRegex(TaskException, "failed at 200"):
128129
for res in pipe.results():
129130
pass
130131

131132

132133
producer = Stage(t1, range(1000), 10).setup(workers=2, qsize=10)
133134
pipe = producer | mapper | reducer
134135

135-
with self.assertRaisesRegexp(TaskException, "failed at 10"):
136+
with self.assertRaisesRegex(TaskException, "failed at 10"):
136137
for res in pipe.results():
137138
pass
138139

@@ -178,10 +179,37 @@ def consume(n):
178179
res = (range(1000) | fail2(5) | consume).execute()
179180
self.assertEqual(res, 10)
180181

181-
with self.assertRaisesRegexp(TaskException, "failure"):
182+
with self.assertRaisesRegex(TaskException, "failure"):
182183
(range(1000) | fail1(5) | consume).execute()
184+
185+
def test_slow_second_stage(self):
186+
187+
@stage(workers=2)
188+
def mapit(it):
189+
for item in it:
190+
yield item + 1
183191

192+
@stage(workers=1)
193+
def reduce(it):
194+
sleep(3) # simulate a long startup time
195+
tot = 0
196+
for item in it:
197+
tot += item
198+
sleep(2)
199+
yield 5
200+
yield tot
201+
202+
@stage(workers=2)
203+
def write(it):
204+
for item in it:
205+
yield item
206+
207+
res = list(([1] | mapit | reduce | write).results())
208+
self.assertEqual(res, [5, 2])
209+
184210

185211
if __name__ == '__main__':
212+
import multiprocessing
213+
multiprocessing.set_start_method('fork', force=False)
186214
unittest.main()
187215

0 commit comments

Comments
 (0)