diff --git a/cmping.py b/cmping.py index 852284f..69848a7 100644 --- a/cmping.py +++ b/cmping.py @@ -116,47 +116,89 @@ def main(): raise SystemExit(0 if pinger.received == expected_total else 1) -class AccountMaker: +class ProfileMaker: def __init__(self, dc): self.dc = dc self.online = [] + self.lock = threading.Lock() def wait_all_online(self): + # Read list length once at start (atomic operation in Python) + total = len(self.online) + online_count = 0 + + def print_progress(): + print(f"\r# Waiting for {total} profile(s) to come online: {online_count}/{total}", end="", flush=True) + + print_progress() + + # Make a copy to avoid issues with concurrent modifications remaining = list(self.online) while remaining: - ac = remaining.pop() - ac.wait_for_event(EventType.IMAP_INBOX_IDLE) - - def _add_online(self, account): - account.start_io() - self.online.append(account) - - def get_relay_account(self, domain): - # Try to find an existing account for this domain/IP - for account in self.dc.get_all_accounts(): - addr = account.get_config("configured_addr") - if addr is not None: - # Extract the domain/IP from the configured address - addr_domain = addr.split("@")[1] if "@" in addr else None - if addr_domain == domain: - if account not in self.online: - break - else: - account = self.dc.add_account() - qr_url = create_qr_url(domain) - try: - account.set_config_from_qr(qr_url) - except Exception as e: - print(f"✗ Failed to configure account on {domain}: {e}") - raise + profile = remaining.pop() + # Wait for events until profile comes online + # Timeout is handled by caller via try/except around wait_all_online() + while True: + event = profile.wait_for_event() + if event.kind == EventType.IMAP_INBOX_IDLE: + online_count += 1 + print_progress() + break + elif event.kind == EventType.ERROR: + # Log error but continue - profile setup may still succeed + print(f"\n✗ ERROR during profile setup: {event.msg}") + print_progress() + elif event.kind == EventType.WARNING: + print(f"\n⚠ WARNING during profile setup: {event.msg}") + print_progress() + + print() # Final newline + + def _add_online(self, profile): + # Call start_io() to bring profile online + # This is the time-consuming operation that benefits from parallelization + profile.start_io() + # Python list.append() is atomic, no lock needed + self.online.append(profile) + + def get_relay_profile(self, domain): + # Lock needed to prevent multiple threads from selecting the same cached profile + # The check-then-act pattern (check if profile in online, then use it) requires atomicity + # All RPC calls (get_all_accounts, add_account, set_config_from_qr) are thread-safe on server side + with self.lock: + # Try to find an existing (cached) profile for this domain/IP + # that is not already online (to allow multiple profiles per domain) + is_cached = False + for profile in self.dc.get_all_accounts(): + addr = profile.get_config("configured_addr") + if addr is not None: + # Extract the domain/IP from the configured address + addr_domain = addr.split("@")[1] if "@" in addr else None + if addr_domain == domain: + if profile not in self.online: + # Found a cached profile that we can reuse + is_cached = True + break + # Profile already online, continue looking for another one + else: + # No cached profile found, create a fresh one + profile = self.dc.add_account() + qr_url = create_qr_url(domain) + try: + profile.set_config_from_qr(qr_url) + except Exception as e: + print(f"✗ Failed to configure profile on {domain}: {e}") + raise + # Bring profile online outside the lock to allow parallelization + # start_io() is the time-consuming operation we want to parallelize try: - self._add_online(account) + self._add_online(profile) except Exception as e: - print(f"✗ Failed to bring account online for {domain}: {e}") + print(f"✗ Failed to bring profile online for {domain}: {e}") raise - return account + return profile, is_cached def perform_ping(args): @@ -164,66 +206,98 @@ def perform_ping(args): print(f"# using accounts_dir at: {accounts_dir}") with Rpc(accounts_dir=accounts_dir) as rpc: dc = DeltaChat(rpc) - maker = AccountMaker(dc) + maker = ProfileMaker(dc) - # Calculate total accounts needed - total_accounts = 1 + args.numrecipients - accounts_created = 0 + # Calculate total profiles needed + total_profiles = 1 + args.numrecipients + profiles_setup = 0 + profiles_cached = 0 + profiles_created = 0 - # Create sender account with progress + # Create sender profile with progress print( - f"# Setting up accounts: {accounts_created}/{total_accounts}", + f"# Setting up profiles: {profiles_setup}/{total_profiles} (cached: {profiles_cached}, creating: {profiles_created})", end="", flush=True, ) try: - sender = maker.get_relay_account(args.relay1) - accounts_created += 1 + sender, is_cached = maker.get_relay_profile(args.relay1) + profiles_setup += 1 + if is_cached: + profiles_cached += 1 + else: + profiles_created += 1 print( - f"\r# Setting up accounts: {accounts_created}/{total_accounts}", + f"\r# Setting up profiles: {profiles_setup}/{total_profiles} (cached: {profiles_cached}, creating: {profiles_created})", end="", flush=True, ) except Exception as e: - print(f"\r✗ Failed to setup sender account on {args.relay1}: {e}") + print(f"\r✗ Failed to setup sender profile on {args.relay1}: {e}") sys.exit(1) - # Create receiver accounts with progress + # Create receiver profiles with progress - parallelize fresh profile creation receivers = [] - for i in range(args.numrecipients): + receiver_errors = [] + receiver_lock = threading.Lock() + + def setup_receiver_profile(i): + """Setup a single receiver profile""" + nonlocal profiles_setup, profiles_cached, profiles_created try: - receiver = maker.get_relay_account(args.relay2) - receivers.append(receiver) - accounts_created += 1 - print( - f"\r# Setting up accounts: {accounts_created}/{total_accounts}", - end="", - flush=True, - ) + # Each thread gets a profile for the domain (either cached or fresh) + # If multiple cached profiles exist, different threads may get different ones + # If no cached profiles exist, new ones are created + receiver, is_cached = maker.get_relay_profile(args.relay2) + with receiver_lock: + receivers.append(receiver) + profiles_setup += 1 + if is_cached: + profiles_cached += 1 + else: + profiles_created += 1 + print( + f"\r# Setting up profiles: {profiles_setup}/{total_profiles} (cached: {profiles_cached}, creating: {profiles_created})", + end="", + flush=True, + ) except Exception as e: - print( - f"\r✗ Failed to setup receiver account {i+1} on {args.relay2}: {e}" - ) - sys.exit(1) + with receiver_lock: + receiver_errors.append((i, e)) + + # Create threads for parallel profile setup + threads = [] + for i in range(args.numrecipients): + t = threading.Thread(target=setup_receiver_profile, args=(i,)) + t.start() + threads.append(t) + + # Wait for all threads to complete + for t in threads: + t.join() + + # Check for errors + if receiver_errors: + for i, e in receiver_errors: + print(f"\r✗ Failed to setup receiver profile {i+1} on {args.relay2}: {e}") + sys.exit(1) - # Account setup complete + # Profile setup complete print( - f"\r# Setting up accounts: {accounts_created}/{total_accounts} - Complete!" + f"\r# Setting up profiles: {profiles_setup}/{total_profiles} (cached: {profiles_cached}, creating: {profiles_created}) - Complete!" ) - # Wait for all accounts to be online with timeout feedback - print("# Waiting for all accounts to be online...", end="", flush=True) + # Wait for all profiles to come online with progress try: maker.wait_all_online() - print(" Done!") except Exception as e: - print(f"\n✗ Timeout or error waiting for accounts to be online: {e}") + print(f"\n✗ Timeout or error waiting for profiles to be online: {e}") sys.exit(1) # Create a group chat from sender and add all receivers group = sender.create_group("cmping") for receiver in receivers: - # Create a contact for the receiver account and add to group + # Create a contact for the receiver profile and add to group contact = sender.create_contact(receiver) group.add_contact(contact)