Describe the bug
The processors fail when some data_types are already stored but have to be processed again.
To Reproduce
The following script reproduces the problem, with processor set to either "threaded_mailbox" or "single_thread":
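import shutil

import straxen
from straxen.test_utils import nt_test_run_id

# Processor under test: "threaded_mailbox" or "single_thread"
processor = "threaded_mailbox"

# Start from an empty test storage directory
shutil.rmtree("./strax_test_data", ignore_errors=True)

st = straxen.test_utils.nt_test_context()
st.set_context_config({
    "allow_multiprocess": True,
    "timeout": 120,
})

# First call: make peaklets on its own
st.make(
    nt_test_run_id,
    "peaklets",
    processor=processor,
)

# Second call: request the already stored peaklets together with pulse_counts and veto_regions
st.make(
    nt_test_run_id,
    ("peaklets", "pulse_counts", "veto_regions"),
    processor=processor,
    allow_multiple=True,
)

assert st.is_stored(nt_test_run_id, "veto_regions")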
Both ThreadedMailboxProcessor and SingleThreadProcessor fail, each with a different error, so there are two separate problems.
With processor = "threaded_mailbox", the script produces the following output:
2024-10-13 12:34:07,179 - utilix - DEBUG - Token exists at /home/xudc/.dbtoken
2024-10-13 12:34:07,180 - utilix - DEBUG - Token is valid.
/home/xudc/straxen/straxen/config/preprocessors.py:16: UserWarning: From straxen version 2.1.0 onward, URLConfig parameters will be sorted alphabetically before being passed to the plugins, this will change the lineage hash for non-sorted URLs. To load data processed with non-sorted URLs, you will need to use an older version.
warnings.warn(
/home/xudc/straxen/straxen/plugins/records/records.py:467: RuntimeWarning: invalid value encountered in divide
means = baseline_buffer / count
/home/xudc/straxen/straxen/plugins/records/records.py:470: RuntimeWarning: invalid value encountered in divide
res["baseline_rms_mean"][:] = (baseline_rms_buffer / count)[:]
/home/xudc/strax/strax/processing/general.py:396: UserWarning: endtime of things is not sorted! touching_windows will return the indices of the first and last things which are touching the container.
warnings.warn(
Multiple targets detected! This is only suitable for mass producing dataypes since only ['veto_regions'] will be subscribed in the mailbox system!
/home/xudc/straxen/straxen/plugins/records/records.py:467: RuntimeWarning: invalid value encountered in divide
means = baseline_buffer / count
/home/xudc/straxen/straxen/plugins/records/records.py:470: RuntimeWarning: invalid value encountered in divide
res["baseline_rms_mean"][:] = (baseline_rms_buffer / count)[:]
Exception in thread read_0:PulseProcessing_divide_outputs_mailbox:
Exception in thread discard_records:
Traceback (most recent call last):
Target Mailbox (veto_regions) killed, exception <class 'strax.mailbox.MailboxKilled'>, message (<class 'strax.mailbox.MailBoxAlreadyClosed'>, MailBoxAlreadyClosed("Can't send to closed pulse_counts_mailbox"), <traceback object at 0x7f9941ccd740>)
Exception in thread divide_outputs:veto_regions:
Traceback (most recent call last):
File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 980, in _bootstrap_inner
File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 980, in _bootstrap_inner
Traceback (most recent call last):
File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 980, in _bootstrap_inner
self.run()
self.run()
File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 917, in run
File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 917, in run
self.run()
self._target(*self._args, **self._kwargs)
File "/home/xudc/strax/strax/processors/threaded_mailbox.py", line 220, in discarder
File "/cvmfs/xenon.opensciencegrid.org/releases/nT/el9.2024.09.1/anaconda/envs/XENONnT_el9.2024.09.1/lib/python3.9/threading.py", line 917, in run
self._target(*self._args, **self._kwargs)
File "/home/xudc/strax/strax/mailbox.py", line 519, in divide_outputs
for _ in source:
File "/home/xudc/strax/strax/mailbox.py", line 389, in _read
source.throw(e)
File "/home/xudc/strax/strax/mailbox.py", line 441, in _read
self._target(*self._args, **self._kwargs)
raise MailboxKilled(self.killed_because)
File "/home/xudc/strax/strax/mailbox.py", line 293, in _send_from
strax.mailbox.MailboxKilled: (<class 'strax.mailbox.MailBoxAlreadyClosed'>, MailBoxAlreadyClosed("Can't send to closed pulse_counts_mailbox"), <traceback object at 0x7f9941ccd740>)
self.kill_from_exception(e)
File "/home/xudc/strax/strax/mailbox.py", line 203, in kill_from_exception
self.close()
File "/home/xudc/strax/strax/mailbox.py", line 357, in close
raise e
File "/home/xudc/strax/strax/mailbox.py", line 438, in _read
self.send(StopIteration)
File "/home/xudc/strax/strax/mailbox.py", line 310, in send
yield res
File "/home/xudc/strax/strax/mailbox.py", line 516, in divide_outputs
raise MailboxKilled(self.killed_because)
strax.mailbox.MailboxKilled: (<class 'strax.mailbox.MailBoxAlreadyClosed'>, MailBoxAlreadyClosed("Can't send to closed pulse_counts_mailbox"), <traceback object at 0x7f9941ccd800>)
mailboxes[d].send(x)
File "/home/xudc/strax/strax/mailbox.py", line 307, in send
raise MailBoxAlreadyClosed(f"Can't send to closed {self.name}")
strax.mailbox.MailBoxAlreadyClosed: Can't send to closed pulse_counts_mailbox
You specified _auto_append_rucio_local=True and you are not on dali compute nodes, so we will add the following rucio local path: /project/lgrandi/rucio/
convert_channel:: changed channel
convert_channel_like:: update area_per_channel
convert_channel_like:: update saturated_channel
Traceback (most recent call last):
File "/home/xudc/t.py", line 21, in <module>
st.make(
File "/home/xudc/strax/strax/context.py", line 1755, in make
for _ in self.get_iter(
File "/home/xudc/strax/strax/context.py", line 1646, in get_iter
generator.throw(e)
File "/home/xudc/strax/strax/context.py", line 1613, in get_iter
for n_chunks, result in enumerate(strax.continuity_check(generator), 1):
File "/home/xudc/strax/strax/chunk.py", line 363, in continuity_check
for chunk in chunk_iter:
File "/home/xudc/strax/strax/processors/threaded_mailbox.py", line 304, in iter
raise exc.with_traceback(traceback)
File "/home/xudc/strax/strax/mailbox.py", line 519, in divide_outputs
source.throw(e)
File "/home/xudc/strax/strax/mailbox.py", line 441, in _read
self.kill_from_exception(e)
File "/home/xudc/strax/strax/mailbox.py", line 203, in kill_from_exception
raise e
File "/home/xudc/strax/strax/mailbox.py", line 438, in _read
yield res
File "/home/xudc/strax/strax/mailbox.py", line 516, in divide_outputs
mailboxes[d].send(x)
File "/home/xudc/strax/strax/mailbox.py", line 307, in send
raise MailBoxAlreadyClosed(f"Can't send to closed {self.name}")
strax.mailbox.MailBoxAlreadyClosed: Can't send to closed pulse_counts_mailbox
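The final frames show divide_outputs routing the outputs of a multi-output plugin into per-data_type mailboxes and failing because pulse_counts_mailbox was already closed. The toy model below only illustrates that failure mode; ToyMailbox, divide_outputs and the early close() are hypothetical simplifications, not strax's real mailbox code, and why the mailbox gets closed early in strax is exactly what this issue is about.

```python
class MailBoxAlreadyClosed(Exception):
    """Toy counterpart of the exception seen in the traceback."""


class ToyMailbox:
    """Hypothetical, heavily simplified mailbox (not strax's real Mailbox)."""

    def __init__(self, name):
        self.name = name
        self.closed = False
        self.received = []

    def send(self, item):
        # Sending after close() is an error, mirroring the message in the log
        if self.closed:
            raise MailBoxAlreadyClosed(f"Can't send to closed {self.name}")
        self.received.append(item)

    def close(self):
        self.closed = True


def divide_outputs(source, mailboxes):
    """Route each {data_type: chunk} dict to the mailbox of that data_type."""
    for result in source:
        for data_type, chunk in result.items():
            mailboxes[data_type].send(chunk)


data_types = ("records", "pulse_counts", "veto_regions")
mailboxes = {d: ToyMailbox(f"{d}_mailbox") for d in data_types}

# Assumption for illustration: another part of the graph closed this mailbox early
mailboxes["pulse_counts"].close()

error_message = None
source = iter([{"records": "r0", "pulse_counts": "p0", "veto_regions": "v0"}])
try:
    divide_outputs(source, mailboxes)
except MailBoxAlreadyClosed as e:
    error_message = str(e)
```

In this model the exception is raised while records has already received its chunk and veto_regions never does, which matches the veto_regions mailbox being killed in the log above.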
With processor = "single_thread", the script produces the following output instead:
2024-10-13 12:47:32,900 - utilix - DEBUG - Token exists at /home/xudc/.dbtoken
2024-10-13 12:47:32,901 - utilix - DEBUG - Token is valid.
/home/xudc/straxen/straxen/config/preprocessors.py:16: UserWarning: From straxen version 2.1.0 onward, URLConfig parameters will be sorted alphabetically before being passed to the plugins, this will change the lineage hash for non-sorted URLs. To load data processed with non-sorted URLs, you will need to use an older version.
warnings.warn(
/home/xudc/straxen/straxen/plugins/records/records.py:467: RuntimeWarning: invalid value encountered in divide
means = baseline_buffer / count
/home/xudc/straxen/straxen/plugins/records/records.py:470: RuntimeWarning: invalid value encountered in divide
res["baseline_rms_mean"][:] = (baseline_rms_buffer / count)[:]
/home/xudc/strax/strax/processing/general.py:396: UserWarning: endtime of things is not sorted! touching_windows will return the indices of the first and last things which are touching the container.
warnings.warn(
Multiple targets detected! This is only suitable for mass producing dataypes since only ['pulse_counts'] will be subscribed in the mailbox system!
You specified _auto_append_rucio_local=True and you are not on dali compute nodes, so we will add the following rucio local path: /project/lgrandi/rucio/
convert_channel:: changed channel
convert_channel_like:: update area_per_channel
convert_channel_like:: update saturated_channel
Traceback (most recent call last):
File "/home/xudc/t.py", line 21, in <module>
st.make(
File "/home/xudc/strax/strax/context.py", line 1755, in make
for _ in self.get_iter(
File "/home/xudc/strax/strax/context.py", line 1594, in get_iter
generator = processor(
File "/home/xudc/strax/strax/processors/single_thread.py", line 38, in __init__
self.post_office.register_producer(
File "/home/xudc/strax/strax/processors/post_office.py", line 131, in register_producer
self.register_producer(iterator, sub_topic)
File "/home/xudc/strax/strax/processors/post_office.py", line 135, in register_producer
raise RuntimeError(f"{topic} already has a producer")
RuntimeError: pulse_counts already has a producer
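Here register_producer recurses over the sub-topics of a multi-output producer and then refuses a second producer for pulse_counts. A minimal, hypothetical model of that rule is sketched below; ToyPostOffice and the second registration are illustrative assumptions and not strax's actual PostOffice implementation.

```python
class ToyPostOffice:
    """Hypothetical registry allowing one producer per topic (data_type)."""

    def __init__(self):
        self.producers = {}

    def register_producer(self, iterator, topic):
        # A multi-output producer is registered once per sub-topic
        if isinstance(topic, (tuple, list)):
            for sub_topic in topic:
                self.register_producer(iterator, sub_topic)
            return
        # Each topic may only have a single producer
        if topic in self.producers:
            raise RuntimeError(f"{topic} already has a producer")
        self.producers[topic] = iterator


post_office = ToyPostOffice()
# A multi-output plugin providing three data_types
post_office.register_producer(iter([]), ("records", "pulse_counts", "veto_regions"))

# Assumption for illustration: a second producer also claims pulse_counts
error_message = None
try:
    post_office.register_producer(iter([]), ("pulse_counts", "veto_regions"))
except RuntimeError as e:
    error_message = str(e)
```

Under this reading, the processing graph built for the second st.make call ends up with two producers claiming pulse_counts.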
Expected behavior
No error is raised and veto_regions is stored.
Versions
strax dca3545
straxen 9d2a6b6111b0e43051f53d19fd394c6861465fdb