Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,31 +130,27 @@ Added functionality to send initial status messages when gathering peers and sta
## [3.0.1] - 2026-01-11

### Fixed:

* lastBroadcastData

---

## [3.0.2] - 2026-01-11

### Added:

* setDataDir & getDataDir

---

## [3.0.3] - 2026-01-11

### Fixed:

* Fix getDataDir() to handle uninitialized $dataDir

---

## [3.0.4] - 2026-01-13

### Added & Fixed:

* Extracted peer filtering from broadcast execution
* Reduced unnecessary processing during broadcasts

Expand All @@ -163,7 +159,6 @@ Added functionality to send initial status messages when gathering peers and sta
## [3.0.5] - 2026-01-18

### Added & Fixed:

* Handle additional RPCErrorException cases

---
Expand All @@ -186,7 +181,6 @@ Added functionality to send initial status messages when gathering peers and sta
## [3.2.0] - 2026-06-13

### Added

- Edit last broadcast message with `editLastBroadcastForAll()`.
- Optional `broadcastId` targeting for editing or deleting the last message of a specific broadcast.
- Metadata peer loading for targeted edit/delete calls when `allUsers` is empty and `broadcastId` is provided.
Expand All @@ -198,7 +192,6 @@ Added functionality to send initial status messages when gathering peers and sta
- Internal error logging to `data/broadcast-errors.log`.

### Changed

- Safer state handling using shared state references by broadcast id.
- Safer cancel behavior: `cancel()` now marks cancellation without clearing in-flight requests.
- `progress()` now includes edit, scheduled, self-destruct, total, elapsed, and TPS fields.
Expand All @@ -207,7 +200,6 @@ Added functionality to send initial status messages when gathering peers and sta
- `deleteAllBroadcastsForAll()` now uses one progress loop instead of concurrent progress edits from workers.

### Fixed

- Pause/resume/cancel state reference issue.
- Workers not stopping after `done`.
- Unsafe watchdog behavior that could duplicate sends.
Expand All @@ -218,16 +210,42 @@ Added functionality to send initial status messages when gathering peers and sta
## [3.2.1] - 2026-06-13

### Added

- Added support for editing last broadcast messages with media loaded from `data/{adminId}/media.txt`.
- Added compatibility for passing saved media values / `botApiFileId` into `editLastBroadcastForAll()`.

### Changed

- Relaxed the `$media` parameter in `BroadcastManager::editLastBroadcastForAll()` so it is no longer limited to `?array`.
- Edit-last-broadcast flow can now reuse the same saved media format used by regular broadcast sending.

### Notes

- Passing `null` as media keeps the existing media unchanged.
- Passing a saved media value attempts to update the edited message media/caption.

---

## [3.2.2] - 2026-06-15

### Changed
* Reduced default broadcast concurrency from `20` to `10`.
* Reduced the maximum allowed concurrency limit from `50` to `30` to reduce pressure on the MadelineProto event loop during large broadcasts.
* Progress status messages are now edited every `5` seconds instead of every second.
* Progress status updates now also perform a final update when the operation reaches completion.
* Slowed down broadcast workers with a small delay between processed jobs.
* Added a delay after each media album chunk sent with `sendMultiMedia`.
* Added a delay between sequential messages sent to the same peer.
* Broadcast control buttons are now displayed in English:
* `Pause`
* `Resume`
* `Cancel`

### Fixed
* Reduced the chance of `Timeout while waiting for updates.getChannelDifference` after heavy broadcasts.
* Reduced unnecessary progress-message edit calls during active broadcasts.
* Prevented noisy logs for harmless `MESSAGE_NOT_MODIFIED` errors during progress updates.
* Improved progress update stability by ignoring unchanged status edits instead of logging them as failures.

### Notes
* This release keeps the custom BroadcastManager flow, including saved message IDs, edit-last-broadcast, delete-last-broadcast, scheduled broadcasts, and self-destruct broadcasts.
* This is a stability and load-reduction update; it does not migrate to MadelineProto's official Broadcast API.

---
22 changes: 12 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# BroadcastManager

**High-Performance Telegram Broadcast Manager** for [MadelineProto](https://docs.madelineproto.xyz/).
Manage Telegram broadcasts efficiently: send text, media, albums, inline buttons, pin/unpin messages, delete broadcasts, edit broadcast, schedule broadcasts, run self-destruct deletion jobs, and track live progress.
Manage Telegram broadcasts efficiently: send text, media, albums, inline buttons, pin/unpin messages, delete previous broadcasts, edit the last broadcast, schedule broadcasts, run self-destruct deletion jobs, and track live progress.

[![AGPL License](https://img.shields.io/badge/license-AGPL--3.0-blue.svg)](LICENSE)
[![Made with PHP](https://img.shields.io/badge/Made%20with-PHP-blue)](https://github.com/WizardLoop/BroadcastManager)
Expand Down Expand Up @@ -94,10 +94,12 @@ $progress = $manager->progress($broadcastId);
Concurrency is clamped internally:

* Minimum: `1`
* Maximum: `50`
* Default: `20`
* Maximum: `30`
* Default: `10`
* Recommended examples: `10`

Live status messages are updated at most once every `5` seconds and once again when the operation finishes.

---

## Message Payloads
Expand Down Expand Up @@ -578,7 +580,7 @@ public function broadcastWithProgress(
array $messages,
$chatId = null,
bool $pin = false,
int $concurrency = 20,
int $concurrency = 10,
?int $selfDestructHours = null
): string;

Expand All @@ -587,8 +589,8 @@ public function editLastBroadcastForAll(
string $newText,
$chatId = null,
?array $buttons = null,
?array $media = null,
int $concurrency = 20,
$media = null,
int $concurrency = 10,
string $parseMode = 'HTML',
?string $broadcastId = null
): string;
Expand All @@ -599,7 +601,7 @@ public function scheduleBroadcastForAll(
int $scheduledAt,
$chatId = null,
bool $pin = false,
int $concurrency = 20,
int $concurrency = 10,
?int $selfDestructHours = null
): string;

Expand All @@ -610,11 +612,11 @@ public function listScheduledBroadcasts(): array;
public function deleteLastBroadcastForAll(
array $allUsers,
$chatId = null,
int $concurrency = 20,
int $concurrency = 10,
?string $broadcastId = null
): string;
public function deleteAllBroadcastsForAll(array $allUsers, $chatId = null, int $concurrency = 20): string;
public function unpinAllMessagesForAll(array $allUsers, $chatId = null, int $concurrency = 20): string;
public function deleteAllBroadcastsForAll(array $allUsers, $chatId = null, int $concurrency = 10): string;
public function unpinAllMessagesForAll(array $allUsers, $chatId = null, int $concurrency = 10): string;

public function runDueSelfDestructJobs(): array;
public function cancelSelfDestructJob(string $jobId): bool;
Expand Down
54 changes: 44 additions & 10 deletions src/BroadcastManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,15 @@
class BroadcastManager
{
private const MAX_ATTEMPTS = 3;
private const DEFAULT_CONCURRENCY = 20;
private const MAX_CONCURRENCY = 50;
private const DEFAULT_CONCURRENCY = 10;
private const MAX_CONCURRENCY = 30;
private const PROGRESS_UPDATE_INTERVAL = 5.0;
private const WORKER_IDLE_SLEEP = 1.0;
private const WORKER_RETRY_SLEEP = 1.0;
private const WORKER_PAUSED_SLEEP = 1.5;
private const WORKER_AFTER_JOB_SLEEP = 0.75;
private const SEND_MESSAGE_SLEEP = 0.35;
private const SEND_CHUNK_SLEEP = 0.75;

private const SEND_HARD_FAIL_RPCS = [
'INPUT_USER_DEACTIVATED',
Expand Down Expand Up @@ -919,25 +926,25 @@ private function startQueueWorkers(
\Amp\async(function () use (&$state, $handler, $hardFailRpcs, $rpcHandler, $retryThrowable): void {
while (!$state['cancel'] && !$state['done']) {
if ($state['queue']->isEmpty()) {
$this->api->sleep(0.5);
$this->api->sleep(self::WORKER_IDLE_SLEEP);
continue;
}

if ($state['paused']) {
$this->api->sleep(1);
$this->api->sleep(self::WORKER_PAUSED_SLEEP);
continue;
}

$job = $state['queue']->dequeue();

if (($job['availableAt'] ?? 0) > microtime(true)) {
$state['queue']->enqueue($job);
$this->api->sleep(0.5);
$this->api->sleep(self::WORKER_RETRY_SLEEP);
continue;
}

while ($state['paused'] && !$state['cancel'] && !$state['done']) {
$this->api->sleep(1);
$this->api->sleep(self::WORKER_PAUSED_SLEEP);
}

if ($state['cancel'] || $state['done']) {
Expand Down Expand Up @@ -981,7 +988,7 @@ private function startQueueWorkers(
}
}

$this->api->sleep(0.25);
$this->api->sleep(self::WORKER_AFTER_JOB_SLEEP);
}
});
}
Expand Down Expand Up @@ -1093,6 +1100,8 @@ private function sendMessagesToPeer(string $peer, array $messages): array
$messageIds[] = $messageId;
}
}

$this->api->sleep(self::SEND_CHUNK_SLEEP);
}

return $messageIds;
Expand All @@ -1115,6 +1124,8 @@ private function sendMessagesToPeer(string $peer, array $messages): array
if ($messageId > 0) {
$messageIds[] = $messageId;
}

$this->api->sleep(self::SEND_MESSAGE_SLEEP);
}

return $messageIds;
Expand Down Expand Up @@ -1642,13 +1653,13 @@ private function buildStatusControls(array $state): ?array

$id = (string) $state['id'];
$toggleAction = !empty($state['paused']) ? 'resume' : 'pause';
$toggleText = !empty($state['paused']) ? '▶️ המשך' : '⏸ השהייה';
$toggleText = !empty($state['paused']) ? 'Resume' : 'Pause';

return [
'inline_keyboard' => [
[
['text' => $toggleText, 'callback_data' => 'bm:' . $toggleAction . ':' . $id],
['text' => '🛑 ביטול', 'callback_data' => 'bm:cancel:' . $id],
['text' => 'Cancel', 'callback_data' => 'bm:cancel:' . $id],
],
],
];
Expand Down Expand Up @@ -1710,6 +1721,20 @@ private function startProgressLoop($chatId, ?int $statusId, array &$state, strin

$this->api->messages->editMessage($payload);
$last = $fingerprint;
} catch (RPCErrorException $e) {
if (
($e->rpc ?? '') === 'MESSAGE_NOT_MODIFIED'
|| str_contains($e->getMessage(), 'MESSAGE_NOT_MODIFIED')
) {
$last = $fingerprint;
$this->api->sleep(self::PROGRESS_UPDATE_INTERVAL);
continue;
}

if ($loggedFailures < 3) {
$loggedFailures++;
$this->logError('Failed to update status message.', $e);
}
} catch (Throwable $e) {
if ($loggedFailures < 3) {
$loggedFailures++;
Expand All @@ -1718,7 +1743,7 @@ private function startProgressLoop($chatId, ?int $statusId, array &$state, strin
}
}

$this->api->sleep(1);
$this->api->sleep(self::PROGRESS_UPDATE_INTERVAL);
}
});
}
Expand All @@ -1742,6 +1767,15 @@ private function editStatusMessage($chatId, ?int $statusId, string $text, ?array
}

$this->api->messages->editMessage($payload);
} catch (RPCErrorException $e) {
if (
($e->rpc ?? '') === 'MESSAGE_NOT_MODIFIED'
|| str_contains($e->getMessage(), 'MESSAGE_NOT_MODIFIED')
) {
return;
}

$this->logError('Failed to edit final status message.', $e);
} catch (Throwable $e) {
$this->logError('Failed to edit final status message.', $e);
}
Expand Down
Loading