Skip to content

Commit e255f32

Browse files
ferguseanclaude
andcommitted
Add 'mix' flag for subscribe request to produce single mixed audio stream
When subscribing to a call with multiple from-tags, adding the 'mix' flag produces a single destination media with audio_player activated to mix all audio sources into one stream, instead of creating separate destination medias per source. Handles bidirectional transitions between mix and non-mix modes on the same to-tag: retiring stale destination medias, deactivating audio_player, and compacting the media array to prevent index holes and unbounded growth across re-subscribe cycles. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b916bdc commit e255f32

5 files changed

Lines changed: 1170 additions & 41 deletions

File tree

daemon/call.c

Lines changed: 228 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4404,6 +4404,13 @@ static int monologue_subscribe_request1(struct call_media *src_media, struct cal
44044404
__auto_type ms = t_hash_table_lookup(ht, src_media);
44054405
if (ms)
44064406
dst_media = ms->media;
4407+
// Don't reuse a dest that was already claimed by another source in this
4408+
// request. This can happen when transitioning from mix mode (one shared
4409+
// dest) to non-mix mode (one dest per source): all ht entries point to
4410+
// the same old dest, but we need separate dests. Check head instead of
4411+
// length because i_queue_delete doesn't update length.
4412+
if (dst_media && dst_media->media_subscriptions.head)
4413+
dst_media = NULL;
44074414
if (!dst_media) {
44084415
// new media needed
44094416
dst_media = call_get_media(dst_ml, &src_media->type, src_media->type_id,
@@ -4486,14 +4493,205 @@ static int monologue_subscribe_request1(struct call_media *src_media, struct cal
44864493
return 0;
44874494
}
44884495

4496+
static void inject_reconfigure_destination_media(struct call_media *, bool, const sdp_ng_flags *);
4497+
4498+
/* called with call->master_lock held in W */
4499+
__attribute__((nonnull(1, 2, 3)))
4500+
static int monologue_subscribe_request_mix(const subscription_q *srms, struct call_monologue *dst_ml,
4501+
sdp_ng_flags *flags)
4502+
{
4503+
g_auto(str_ht) mid_tracker_dst = str_ht_new();
4504+
g_auto(subscription_store_ht) ht = subscription_store_ht_new();
4505+
4506+
__unsubscribe_medias_from_all(dst_ml, ht);
4507+
__call_monologue_init_from_flags(dst_ml, NULL, flags);
4508+
4509+
// For audio sources, create ONE destination media and subscribe all sources to it.
4510+
// For non-audio, fall through to normal per-source handling.
4511+
4512+
struct call_media *dst_audio_media = NULL;
4513+
bool dst_audio_initialized = false;
4514+
4515+
IQUEUE_FOREACH(srms, ms) {
4516+
struct call_media *src_media = ms->media;
4517+
if (!src_media)
4518+
continue;
4519+
4520+
if (src_media->type_id != MT_AUDIO) {
4521+
// non-audio: create separate destination media per source (normal behavior)
4522+
int ret = monologue_subscribe_request1(src_media, dst_ml, flags, ht);
4523+
if (ret)
4524+
return -1;
4525+
continue;
4526+
}
4527+
4528+
// Audio source: all go to one shared destination media
4529+
if (!dst_audio_media) {
4530+
// check all audio sources in ht for an existing dest media
4531+
IQUEUE_FOREACH(srms, check_ms) {
4532+
if (!check_ms->media || check_ms->media->type_id != MT_AUDIO)
4533+
continue;
4534+
__auto_type old_ms = t_hash_table_lookup(ht, check_ms->media);
4535+
if (old_ms) {
4536+
dst_audio_media = old_ms->media;
4537+
break;
4538+
}
4539+
}
4540+
if (!dst_audio_media)
4541+
dst_audio_media = call_get_media(dst_ml, &src_media->type,
4542+
src_media->type_id, NULL, false,
4543+
dst_ml->medias->len + 1, mid_tracker_dst);
4544+
}
4545+
4546+
__add_media_subscription(dst_audio_media, src_media,
4547+
&(struct sink_attrs) { .egress = !!flags->egress });
4548+
4549+
if (flags->rtcp_mirror)
4550+
__add_media_subscription(src_media, dst_audio_media,
4551+
&(struct sink_attrs) { .egress = !!flags->egress, .rtcp_only = true });
4552+
4553+
// Initialize destination media properties from first audio source
4554+
if (!dst_audio_initialized) {
4555+
struct stream_params *sp = &src_media->sp;
4556+
4557+
media_init_from_flags(src_media, flags);
4558+
media_init_from_flags(dst_audio_media, flags);
4559+
media_set_echo(src_media, flags);
4560+
media_set_echo_reverse(dst_audio_media, flags);
4561+
media_set_siprec_label(dst_audio_media, flags, src_media->unique_id);
4562+
media_update_label(dst_audio_media, flags, &src_media->label);
4563+
media_update_type(dst_audio_media, sp);
4564+
media_set_protocol(dst_audio_media, src_media, sp, flags);
4565+
media_gen_media_id(dst_audio_media, flags);
4566+
media_update_flags(dst_audio_media, sp);
4567+
media_update_crypto(dst_audio_media, sp, flags);
4568+
media_copy_format(dst_audio_media, src_media);
4569+
media_set_address_family(dst_audio_media, src_media, flags);
4570+
media_set_ptime(src_media, sp, flags->rev_ptime, flags->ptime);
4571+
media_set_ptime(dst_audio_media, sp, flags->ptime, flags->rev_ptime);
4572+
media_set_extmap(dst_audio_media, &src_media->extmap, media_extmap_strip, flags);
4573+
4574+
codec_store_populate(&dst_audio_media->codecs, &src_media->codecs,
4575+
.allow_asymmetric = !!flags->allow_asymmetric_codecs);
4576+
codec_store_strip(&dst_audio_media->codecs, &flags->codec_strip, flags->codec_except);
4577+
codec_store_strip(&dst_audio_media->codecs, &flags->codec_consume, flags->codec_except);
4578+
codec_store_strip(&dst_audio_media->codecs, &flags->codec_mask, flags->codec_except);
4579+
codec_store_offer(&dst_audio_media->codecs, &flags->codec_offer, &sp->codecs);
4580+
codec_store_transcode(&dst_audio_media->codecs, &flags->codec_transcode, &sp->codecs);
4581+
codec_store_synthesise(&dst_audio_media->codecs, &src_media->codecs);
4582+
4583+
if (!flags->inactive)
4584+
bf_copy(&dst_audio_media->media_flags, MEDIA_FLAG_SEND,
4585+
&src_media->media_flags, SP_FLAG_RECV);
4586+
else
4587+
MEDIA_CLEAR(dst_audio_media, SEND);
4588+
MEDIA_CLEAR(dst_audio_media, RECV);
4589+
4590+
__rtcp_mux_set(flags, dst_audio_media);
4591+
__generate_crypto(flags, dst_audio_media, src_media);
4592+
4593+
unsigned int num_ports = proto_num_ports(sp->num_ports, dst_audio_media, flags, false);
4594+
4595+
__init_interface(dst_audio_media, &flags->interface, num_ports);
4596+
if (dst_audio_media->logical_intf == NULL)
4597+
return -1;
4598+
4599+
__ice_offer(flags, dst_audio_media, src_media,
4600+
ice_is_restart(src_media->ice_agent, sp));
4601+
4602+
struct endpoint_map *em = __get_endpoint_map(dst_audio_media, num_ports,
4603+
NULL, flags, true);
4604+
if (!em)
4605+
return -1;
4606+
4607+
__num_media_streams(dst_audio_media, num_ports);
4608+
4609+
if (!__init_streams(dst_audio_media, NULL, flags))
4610+
return -1;
4611+
4612+
dst_audio_initialized = true;
4613+
} else {
4614+
// Additional audio sources: init the source side
4615+
media_init_from_flags(src_media, flags);
4616+
media_set_echo(src_media, flags);
4617+
media_set_ptime(src_media, &src_media->sp, flags->rev_ptime, flags->ptime);
4618+
}
4619+
4620+
update_init_subscribers(src_media, NULL, NULL, flags->opmode);
4621+
}
4622+
4623+
// Activate audio_player for the mixed destination media
4624+
if (dst_audio_media) {
4625+
MEDIA_SET(dst_audio_media, MIX);
4626+
inject_reconfigure_destination_media(dst_audio_media, true, flags);
4627+
}
4628+
4629+
// Retire stale audio destination medias left over from a previous
4630+
// non-mix topology (e.g. non-mix had 2 audio dests, mix reuses 1).
4631+
// Without this, sdp_create() would emit extra m=audio lines.
4632+
for (unsigned int i = 0; i < dst_ml->medias->len; i++) {
4633+
struct call_media *media = dst_ml->medias->pdata[i];
4634+
if (!media)
4635+
continue;
4636+
if (media == dst_audio_media)
4637+
continue;
4638+
if (media->type_id != MT_AUDIO)
4639+
continue;
4640+
if (media->media_subscriptions.head)
4641+
continue;
4642+
__disable_streams(media, 0);
4643+
dst_ml->medias->pdata[i] = NULL;
4644+
}
4645+
4646+
// Compact: collapse NULL holes so the mixed destination always
4647+
// occupies the lowest index. Without this, monologue_subscribe_answer
4648+
// resolves streams by SDP index (1-based) via call_get_media, so a
4649+
// leading NULL hole causes it to allocate a new empty media instead
4650+
// of finding the active mixed destination.
4651+
unsigned int dst_idx = 0;
4652+
for (unsigned int src_idx = 0; src_idx < dst_ml->medias->len; src_idx++) {
4653+
struct call_media *media = dst_ml->medias->pdata[src_idx];
4654+
if (!media)
4655+
continue;
4656+
if (dst_idx != src_idx) {
4657+
dst_ml->medias->pdata[dst_idx] = media;
4658+
dst_ml->medias->pdata[src_idx] = NULL;
4659+
media->index = dst_idx + 1;
4660+
}
4661+
dst_idx++;
4662+
}
4663+
if (dst_idx < dst_ml->medias->len)
4664+
t_ptr_array_set_size(dst_ml->medias, dst_idx);
4665+
4666+
monologue_open_ports(dst_ml);
4667+
monologue_media_start(dst_ml);
4668+
4669+
return 0;
4670+
}
4671+
44894672
/* called with call->master_lock held in W */
44904673
__attribute__((nonnull(1, 2, 3)))
44914674
int monologue_subscribe_request(const subscription_q *srms, struct call_monologue *dst_ml, sdp_ng_flags *flags) {
4675+
if (flags->mix)
4676+
return monologue_subscribe_request_mix(srms, dst_ml, flags);
4677+
44924678
g_auto(subscription_store_ht) ht = subscription_store_ht_new();
44934679

44944680
__unsubscribe_medias_from_all(dst_ml, ht);
44954681
__call_monologue_init_from_flags(dst_ml, NULL, flags);
44964682

4683+
// Deactivate audio_player on previously mixed medias before reuse
4684+
for (unsigned int i = 0; i < dst_ml->medias->len; i++) {
4685+
struct call_media *media = dst_ml->medias->pdata[i];
4686+
if (!media)
4687+
continue;
4688+
if (MEDIA_ISSET(media, MIX)) {
4689+
MEDIA_CLEAR(media, AUDIO_PLAYER);
4690+
audio_player_stop(media);
4691+
MEDIA_CLEAR(media, MIX);
4692+
}
4693+
}
4694+
44974695
IQUEUE_FOREACH(srms, ms) {
44984696
struct call_media *src_media = ms->media;
44994697
if (!src_media)
@@ -4526,10 +4724,12 @@ int monologue_subscribe_answer(struct call_monologue *dst_ml, sdp_ng_flags *flag
45264724
/* set src_media based on subscription (assuming it is one-to-one)
45274725
* TODO: this should probably be reworked to support one-to-multi subscriptions.
45284726
*/
4529-
__auto_type ms = dst_media->media_subscriptions.head;
4530-
if (!ms)
4727+
__auto_type head_ms = dst_media->media_subscriptions.head;
4728+
if (!head_ms)
45314729
continue;
4532-
struct call_media *src_media = ms->media;
4730+
struct call_media *src_media = head_ms->media;
4731+
4732+
bool is_mix = MEDIA_ISSET(dst_media, MIX);
45334733

45344734
rev_ms = call_get_media_subscription(src_media->media_subscribers_ht, dst_media);
45354735
if (rev_ms)
@@ -4545,7 +4745,7 @@ int monologue_subscribe_answer(struct call_monologue *dst_ml, sdp_ng_flags *flag
45454745
media_set_ptime(dst_media, sp, flags->ptime, 0);
45464746
media_update_extmap(dst_media, sp, NULL, flags);
45474747

4548-
if (flags->allow_transcoding) {
4748+
if (flags->allow_transcoding || is_mix) {
45494749
codec_store_populate(&dst_media->codecs, &sp->codecs,
45504750
.codec_set = flags->codec_set,
45514751
.answer_cs = &src_media->codecs,
@@ -4560,11 +4760,16 @@ int monologue_subscribe_answer(struct call_monologue *dst_ml, sdp_ng_flags *flag
45604760
return -1;
45614761
}
45624762

4563-
codec_handlers_update(src_media, dst_media, .flags = flags,
4564-
.allow_asymmetric = !!flags->allow_asymmetric_codecs);
4565-
codec_handlers_update(dst_media, src_media, .flags = flags, .sp = sp,
4566-
.allow_asymmetric = !!flags->allow_asymmetric_codecs,
4567-
.reset_transcoding = true);
4763+
if (is_mix) {
4764+
/* Mix mode: reconfigure audio_player with answer codecs for all subscriptions */
4765+
inject_reconfigure_destination_media(dst_media, true, flags);
4766+
} else {
4767+
codec_handlers_update(src_media, dst_media, .flags = flags,
4768+
.allow_asymmetric = !!flags->allow_asymmetric_codecs);
4769+
codec_handlers_update(dst_media, src_media, .flags = flags, .sp = sp,
4770+
.allow_asymmetric = !!flags->allow_asymmetric_codecs,
4771+
.reset_transcoding = true);
4772+
}
45684773

45694774
__dtls_logic(flags, dst_media, sp);
45704775

@@ -4578,15 +4783,26 @@ int monologue_subscribe_answer(struct call_monologue *dst_ml, sdp_ng_flags *flag
45784783
MEDIA_SET(dst_media, INITIALIZED);
45794784

45804785
update_init_subscribers(dst_media, sp, flags, flags->opmode);
4581-
update_init_subscribers(src_media, NULL, NULL, flags->opmode);
4786+
4787+
/* For mix mode, update all source subscriptions */
4788+
if (is_mix) {
4789+
IQUEUE_FOREACH(&dst_media->media_subscriptions, sub_ms) {
4790+
struct call_media *sub_src = sub_ms->media;
4791+
update_init_subscribers(sub_src, NULL, NULL, flags->opmode);
4792+
__media_unconfirm(sub_src, "subscribe answer event");
4793+
media_update_transcoding_flag(sub_src);
4794+
}
4795+
} else {
4796+
update_init_subscribers(src_media, NULL, NULL, flags->opmode);
4797+
__media_unconfirm(src_media, "subscribe answer event");
4798+
media_update_transcoding_flag(src_media);
4799+
}
45824800

45834801
__media_unconfirm(dst_media, "subscribe answer event");
4584-
__media_unconfirm(src_media, "subscribe answer event");
45854802

45864803
sdp_sp_move(&dst_media->sp, sp);
45874804

45884805
media_update_transcoding_flag(dst_media);
4589-
media_update_transcoding_flag(src_media);
45904806
}
45914807

45924808
monologue_media_start(dst_ml);

daemon/call_interfaces.c

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1208,6 +1208,9 @@ void call_ng_flags_flags(str *s, unsigned int idx, helper_arg arg) {
12081208
case CSH_LOOKUP("media handover"):
12091209
out->media_handover = true;
12101210
break;
1211+
case CSH_LOOKUP("mix"):
1212+
out->mix = true;
1213+
break;
12111214
case CSH_LOOKUP("mirror-RTCP"):
12121215
case CSH_LOOKUP("mirror-rtcp"):
12131216
case CSH_LOOKUP("RTCP-mirror"):
@@ -4252,41 +4255,42 @@ const char *call_subscribe_request_ng(ng_command_ctx_t *ctx) {
42524255
if (!dest_media)
42534256
continue;
42544257

4255-
// each media should be subscribed to just one other media
42564258
if (!dest_media->media_subscriptions.length)
42574259
continue;
42584260

4259-
struct call_media *source_media = dest_media->media_subscriptions.head->media;
4260-
struct call_monologue *source_ml = source_media->monologue;
4261+
/* iterate all subscriptions per dest media (supports mix mode
4262+
* where one dest media has multiple source subscriptions) */
4263+
IQUEUE_FOREACH(&dest_media->media_subscriptions, ms) {
4264+
struct call_media *source_media = ms->media;
4265+
struct call_monologue *source_ml = source_media->monologue;
4266+
4267+
parser->list_add_str_dup(from_list, &source_ml->tag);
4268+
4269+
if (media_labels.gen && dest_media->label.len) {
4270+
parser_arg label =
4271+
parser->dict_add_dict(media_labels, dest_media->label.s);
4272+
parser->dict_add_str(label, "tag", &source_ml->tag);
4273+
parser->dict_add_int(label, "index", source_media->index);
4274+
parser->dict_add_str(label, "type", &dest_media->type);
4275+
if (source_ml->label.len)
4276+
parser->dict_add_str(label, "label", &source_ml->label);
4277+
parser->dict_add_string(label, "mode", sdp_get_sendrecv(source_media));
4278+
}
42614279

4262-
parser->list_add_str_dup(from_list, &source_ml->tag);
4280+
if (tag_medias.gen) {
4281+
parser_arg tag_label = parser->list_add_dict(tag_medias);
4282+
parser->dict_add_str(tag_label, "tag", &source_ml->tag);
4283+
if (source_ml->label.len)
4284+
parser->dict_add_str(tag_label, "label", &source_ml->label);
42634285

4264-
if (media_labels.gen && dest_media->label.len) {
4265-
parser_arg label =
4266-
parser->dict_add_dict(media_labels, dest_media->label.s);
4267-
parser->dict_add_str(label, "tag", &source_ml->tag);
4268-
parser->dict_add_int(label, "index", source_media->index);
4269-
parser->dict_add_str(label, "type", &dest_media->type);
4270-
if (source_ml->label.len)
4271-
parser->dict_add_str(label, "label", &source_ml->label);
4272-
parser->dict_add_string(label, "mode", sdp_get_sendrecv(source_media));
4273-
}
4286+
parser_arg medias = parser->dict_add_list(tag_label, "medias");
42744287

4275-
if (tag_medias.gen) {
4276-
parser_arg tag_label = parser->list_add_dict(tag_medias);
4277-
parser->dict_add_str(tag_label, "tag", &source_ml->tag);
4278-
if (source_ml->label.len)
4279-
parser->dict_add_str(tag_label, "label", &source_ml->label);
4280-
4281-
parser_arg medias = parser->dict_add_list(tag_label, "medias");
4282-
4283-
// this is a bit strange because in this mode, each list can only
4284-
// ever get one entry...
4285-
parser_arg med_ent = parser->list_add_dict(medias);
4286-
parser->dict_add_int(med_ent, "index", source_media->index);
4287-
parser->dict_add_str(med_ent, "type", &dest_media->type);
4288-
parser->dict_add_str(med_ent, "label", &dest_media->label);
4289-
parser->dict_add_string(med_ent, "mode", sdp_get_sendrecv(source_media));
4288+
parser_arg med_ent = parser->list_add_dict(medias);
4289+
parser->dict_add_int(med_ent, "index", source_media->index);
4290+
parser->dict_add_str(med_ent, "type", &dest_media->type);
4291+
parser->dict_add_str(med_ent, "label", &dest_media->label);
4292+
parser->dict_add_string(med_ent, "mode", sdp_get_sendrecv(source_media));
4293+
}
42904294
}
42914295
}
42924296

include/call.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ enum {
219219
#define MEDIA_FLAG_SELECT_PT (1LL << 36)
220220
#define MEDIA_FLAG_RECRYPT (1LL << 37)
221221
#define MEDIA_FLAG_PUBLIC (1LL << 38)
222+
#define MEDIA_FLAG_MIX (1LL << 41)
222223
#define MEDIA_FLAG_EXTMAP_SHORT SHARED_FLAG_EXTMAP_SHORT
223224
#define MEDIA_FLAG_BUNDLE_ONLY (1LL << 40)
224225

include/call_interfaces.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ RTPE_NG_FLAGS_STR_CASE_HT_PARAMS
240240
reset:1,
241241
egress:1,
242242
siprec:1,
243+
mix:1,
243244
fragment:1,
244245
record_call:1,
245246
discard_recording:1,

0 commit comments

Comments
 (0)