Skip to content

Commit 847a73f

Browse files
committed
Try fix leak
1 parent 512176c commit 847a73f

2 files changed

Lines changed: 17 additions & 37 deletions

File tree

datafusion/physical-plan/src/joins/grace_hash_join/exec.rs

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1163,11 +1163,9 @@ fn process_partition_chunk(
11631163
reservation.try_grow(actual_reserved - reserved_bytes)?;
11641164
} else if reserved_bytes > actual_reserved {
11651165
let delta = reserved_bytes - actual_reserved;
1166-
let available = reservation.size();
1167-
if delta > available {
1168-
reservation.shrink(available);
1169-
} else {
1170-
reservation.shrink(delta);
1166+
if reservation.try_shrink(delta).is_err() {
1167+
// If accounting drifted, free whatever is left to avoid leaking
1168+
let _ = reservation.free();
11711169
}
11721170
}
11731171
reserved_bytes = actual_reserved;
@@ -1190,11 +1188,8 @@ fn process_partition_chunk(
11901188
partitions[i].spill_batch_auto(&part_batch, &request_msg, reservation)?;
11911189
}
11921190

1193-
let available = reservation.size();
1194-
if reserved_bytes > available {
1195-
reservation.shrink(available);
1196-
} else {
1197-
reservation.shrink(reserved_bytes);
1191+
if reservation.try_shrink(reserved_bytes).is_err() {
1192+
let _ = reservation.free();
11981193
}
11991194
Ok(())
12001195
}
@@ -1272,11 +1267,8 @@ impl PartitionWriter {
12721267
let total_accounted = self.buffered_bytes;
12731268
let coalesced = concat_batches(&self.schema, &self.buffer)?;
12741269
self.buffer.clear();
1275-
let available = reservation.size();
1276-
if total_accounted > available {
1277-
reservation.shrink(available);
1278-
} else {
1279-
reservation.shrink(total_accounted);
1270+
if reservation.try_shrink(total_accounted).is_err() {
1271+
let _ = reservation.free();
12801272
}
12811273
self.buffered_bytes = 0;
12821274

datafusion/physical-plan/src/joins/grace_hash_join/stream.rs

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -990,20 +990,13 @@ impl GraceHashJoinStream {
990990
};
991991
if bytes_to_free > 0 {
992992
let mut res = reservation.lock();
993-
let available = res.size();
994-
if bytes_to_free > available {
995-
// Don't panic on accounting drift; free what we can and log.
996-
let shrink = available;
997-
if shrink > 0 {
998-
res.shrink(shrink);
999-
}
993+
if res.try_shrink(bytes_to_free).is_err() {
994+
let freed = res.free();
1000995
debug!(
1001-
"Grace hash join reservation underflow: attempted to free {}, but reservation size is {}",
1002-
human_readable_size(bytes_to_free),
1003-
human_readable_size(available)
996+
"Grace hash join reservation underflow: freed {} after shrink failure (requested {})",
997+
human_readable_size(freed),
998+
human_readable_size(bytes_to_free)
1004999
);
1005-
} else {
1006-
res.shrink(bytes_to_free);
10071000
}
10081001
}
10091002
*left_fut = None;
@@ -1223,18 +1216,13 @@ impl GraceHashJoinStream {
12231216
};
12241217
if bytes_to_free > 0 {
12251218
let mut res = self.reservation.lock();
1226-
let available = res.size();
1227-
if bytes_to_free > available {
1228-
if available > 0 {
1229-
res.shrink(available);
1230-
}
1219+
if res.try_shrink(bytes_to_free).is_err() {
1220+
let freed = res.free();
12311221
debug!(
1232-
"Grace hash join reservation underflow on stream completion: attempted to free {}, but reservation size is {}",
1233-
human_readable_size(bytes_to_free),
1234-
human_readable_size(available)
1222+
"Grace hash join stream completion freed {} after shrink failure (requested {})",
1223+
human_readable_size(freed),
1224+
human_readable_size(bytes_to_free)
12351225
);
1236-
} else {
1237-
res.shrink(bytes_to_free);
12381226
}
12391227
}
12401228
if let Some(start) = current_join_start.take() {

0 commit comments

Comments
 (0)