Skip to content
194 changes: 134 additions & 60 deletions cmping.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,114 +116,188 @@ def main():
raise SystemExit(0 if pinger.received == expected_total else 1)


class AccountMaker:
class ProfileMaker:
def __init__(self, dc):
self.dc = dc
self.online = []
self.lock = threading.Lock()

def wait_all_online(self):
# Read list length once at start (atomic operation in Python)
total = len(self.online)
online_count = 0

def print_progress():
print(f"\r# Waiting for {total} profile(s) to come online: {online_count}/{total}", end="", flush=True)

print_progress()

# Make a copy to avoid issues with concurrent modifications
remaining = list(self.online)
while remaining:
ac = remaining.pop()
ac.wait_for_event(EventType.IMAP_INBOX_IDLE)

def _add_online(self, account):
account.start_io()
self.online.append(account)

def get_relay_account(self, domain):
# Try to find an existing account for this domain/IP
for account in self.dc.get_all_accounts():
addr = account.get_config("configured_addr")
if addr is not None:
# Extract the domain/IP from the configured address
addr_domain = addr.split("@")[1] if "@" in addr else None
if addr_domain == domain:
if account not in self.online:
break
else:
account = self.dc.add_account()
qr_url = create_qr_url(domain)
try:
account.set_config_from_qr(qr_url)
except Exception as e:
print(f"✗ Failed to configure account on {domain}: {e}")
raise
profile = remaining.pop()
# Wait for events until profile comes online
# Timeout is handled by caller via try/except around wait_all_online()
while True:
event = profile.wait_for_event()
if event.kind == EventType.IMAP_INBOX_IDLE:
online_count += 1
print_progress()
break
elif event.kind == EventType.ERROR:
# Log error but continue - profile setup may still succeed
print(f"\n✗ ERROR during profile setup: {event.msg}")
print_progress()
elif event.kind == EventType.WARNING:
print(f"\n⚠ WARNING during profile setup: {event.msg}")
print_progress()

print() # Final newline

def _add_online(self, profile):
# Call start_io() to bring profile online
# This is the time-consuming operation that benefits from parallelization
profile.start_io()
# Python list.append() is atomic, no lock needed
self.online.append(profile)

def get_relay_profile(self, domain):
# Lock needed to prevent multiple threads from selecting the same cached profile
# The check-then-act pattern (check if profile in online, then use it) requires atomicity
# All RPC calls (get_all_accounts, add_account, set_config_from_qr) are thread-safe on server side
with self.lock:
# Try to find an existing (cached) profile for this domain/IP
# that is not already online (to allow multiple profiles per domain)
is_cached = False
for profile in self.dc.get_all_accounts():
addr = profile.get_config("configured_addr")
if addr is not None:
# Extract the domain/IP from the configured address
addr_domain = addr.split("@")[1] if "@" in addr else None
if addr_domain == domain:
if profile not in self.online:
# Found a cached profile that we can reuse
is_cached = True
break
# Profile already online, continue looking for another one
else:
# No cached profile found, create a fresh one
profile = self.dc.add_account()
qr_url = create_qr_url(domain)
try:
profile.set_config_from_qr(qr_url)
except Exception as e:
print(f"✗ Failed to configure profile on {domain}: {e}")
raise

# Bring profile online outside the lock to allow parallelization
# start_io() is the time-consuming operation we want to parallelize
try:
self._add_online(account)
self._add_online(profile)
except Exception as e:
print(f"✗ Failed to bring account online for {domain}: {e}")
print(f"✗ Failed to bring profile online for {domain}: {e}")
raise

return account
return profile, is_cached


def perform_ping(args):
accounts_dir = xdg_cache_home().joinpath("cmping")
print(f"# using accounts_dir at: {accounts_dir}")
with Rpc(accounts_dir=accounts_dir) as rpc:
dc = DeltaChat(rpc)
maker = AccountMaker(dc)
maker = ProfileMaker(dc)

# Calculate total accounts needed
total_accounts = 1 + args.numrecipients
accounts_created = 0
# Calculate total profiles needed
total_profiles = 1 + args.numrecipients
profiles_setup = 0
profiles_cached = 0
profiles_created = 0

# Create sender account with progress
# Create sender profile with progress
print(
f"# Setting up accounts: {accounts_created}/{total_accounts}",
f"# Setting up profiles: {profiles_setup}/{total_profiles} (cached: {profiles_cached}, creating: {profiles_created})",
end="",
flush=True,
)
try:
sender = maker.get_relay_account(args.relay1)
accounts_created += 1
sender, is_cached = maker.get_relay_profile(args.relay1)
profiles_setup += 1
if is_cached:
profiles_cached += 1
else:
profiles_created += 1
print(
f"\r# Setting up accounts: {accounts_created}/{total_accounts}",
f"\r# Setting up profiles: {profiles_setup}/{total_profiles} (cached: {profiles_cached}, creating: {profiles_created})",
end="",
flush=True,
)
except Exception as e:
print(f"\r✗ Failed to setup sender account on {args.relay1}: {e}")
print(f"\r✗ Failed to setup sender profile on {args.relay1}: {e}")
sys.exit(1)

# Create receiver accounts with progress
# Create receiver profiles with progress - parallelize fresh profile creation
receivers = []
for i in range(args.numrecipients):
receiver_errors = []
receiver_lock = threading.Lock()

def setup_receiver_profile(i):
"""Setup a single receiver profile"""
nonlocal profiles_setup, profiles_cached, profiles_created
try:
receiver = maker.get_relay_account(args.relay2)
receivers.append(receiver)
accounts_created += 1
print(
f"\r# Setting up accounts: {accounts_created}/{total_accounts}",
end="",
flush=True,
)
# Each thread gets a profile for the domain (either cached or fresh)
# If multiple cached profiles exist, different threads may get different ones
# If no cached profiles exist, new ones are created
receiver, is_cached = maker.get_relay_profile(args.relay2)
with receiver_lock:
receivers.append(receiver)
profiles_setup += 1
if is_cached:
profiles_cached += 1
else:
profiles_created += 1
print(
f"\r# Setting up profiles: {profiles_setup}/{total_profiles} (cached: {profiles_cached}, creating: {profiles_created})",
end="",
flush=True,
)
except Exception as e:
print(
f"\r✗ Failed to setup receiver account {i+1} on {args.relay2}: {e}"
)
sys.exit(1)
with receiver_lock:
receiver_errors.append((i, e))

# Create threads for parallel profile setup
threads = []
for i in range(args.numrecipients):
t = threading.Thread(target=setup_receiver_profile, args=(i,))
t.start()
threads.append(t)

# Wait for all threads to complete
for t in threads:
t.join()

# Check for errors
if receiver_errors:
for i, e in receiver_errors:
print(f"\r✗ Failed to setup receiver profile {i+1} on {args.relay2}: {e}")
sys.exit(1)

# Account setup complete
# Profile setup complete
print(
f"\r# Setting up accounts: {accounts_created}/{total_accounts} - Complete!"
f"\r# Setting up profiles: {profiles_setup}/{total_profiles} (cached: {profiles_cached}, creating: {profiles_created}) - Complete!"
)

# Wait for all accounts to be online with timeout feedback
print("# Waiting for all accounts to be online...", end="", flush=True)
# Wait for all profiles to come online with progress
try:
maker.wait_all_online()
print(" Done!")
except Exception as e:
print(f"\n✗ Timeout or error waiting for accounts to be online: {e}")
print(f"\n✗ Timeout or error waiting for profiles to be online: {e}")
sys.exit(1)

# Create a group chat from sender and add all receivers
group = sender.create_group("cmping")
for receiver in receivers:
# Create a contact for the receiver account and add to group
# Create a contact for the receiver profile and add to group
contact = sender.create_contact(receiver)
group.add_contact(contact)

Expand Down