Skip to content

Commit 4625029

Browse files
committed
fix(server): adjust segment size and improve delete_segments logic
1 parent 4cc2f9f commit 4625029

4 files changed

Lines changed: 56 additions & 46 deletions

File tree

core/integration/tests/server/general.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use integration::iggy_harness;
2525
#[iggy_harness(
2626
test_client_transport = [Tcp, Http, Quic, WebSocket],
2727
server(
28+
segment.size = "1MiB",
2829
tcp.socket.override_defaults = true,
2930
tcp.socket.nodelay = true,
3031
quic.max_idle_timeout = "500s",

core/server/src/http/segments.rs

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,17 @@ use crate::http::COMPONENT;
2020
use crate::http::error::CustomError;
2121
use crate::http::jwt::json_web_token::Identity;
2222
use crate::http::shared::AppState;
23-
use crate::state::command::EntryCommand;
23+
use crate::shard::transmission::frame::ShardResponse;
24+
use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload};
2425
use axum::extract::{Path, Query, State};
2526
use axum::http::StatusCode;
2627
use axum::routing::delete;
2728
use axum::{Extension, Router, debug_handler};
2829
use err_trail::ErrContext;
2930
use iggy_common::Identifier;
30-
use iggy_common::IggyError;
3131
use iggy_common::Validatable;
3232
use iggy_common::delete_segments::DeleteSegments;
33+
use iggy_common::sharding::IggyNamespace;
3334
use send_wrapper::SendWrapper;
3435
use std::sync::Arc;
3536
use tracing::instrument;
@@ -57,44 +58,62 @@ async fn delete_segments(
5758
query.validate()?;
5859
let segments_count = query.segments_count;
5960

60-
let (numeric_stream_id, numeric_topic_id) = state
61-
.shard
62-
.shard()
63-
.resolve_topic_id(&query.stream_id, &query.topic_id)?;
64-
state.shard.shard().metadata.perm_delete_segments(
61+
let partition = state.shard.shard().resolve_partition_for_delete_segments(
6562
identity.user_id,
66-
numeric_stream_id,
67-
numeric_topic_id,
63+
&query.stream_id,
64+
&query.topic_id,
65+
partition_id as usize,
6866
)?;
69-
{
70-
let delete_future = SendWrapper::new(state.shard.shard().delete_segments_base(
71-
numeric_stream_id,
72-
numeric_topic_id,
73-
partition_id as usize,
74-
segments_count,
75-
));
7667

77-
delete_future.await.error(|e: &IggyError| {
78-
format!(
79-
"{COMPONENT} (error: {e}) - failed to delete segments for topic with ID: {topic_id} in stream with ID: {stream_id}"
80-
)
81-
})?
82-
};
68+
let namespace = IggyNamespace::new(
69+
partition.stream_id,
70+
partition.topic_id,
71+
partition.partition_id,
72+
);
73+
let request = ShardRequest::data_plane(
74+
namespace,
75+
ShardRequestPayload::DeleteSegments { segments_count },
76+
);
77+
78+
let delete_future = SendWrapper::new(state.shard.shard().send_to_data_plane(request));
79+
match delete_future.await? {
80+
ShardResponse::DeleteSegments {
81+
deleted_segments,
82+
deleted_messages,
83+
} => {
84+
state
85+
.shard
86+
.shard()
87+
.metrics
88+
.decrement_segments(deleted_segments as u32);
89+
state
90+
.shard
91+
.shard()
92+
.metrics
93+
.decrement_messages(deleted_messages);
94+
}
95+
ShardResponse::ErrorResponse(err) => return Err(err.into()),
96+
_ => unreachable!("Expected DeleteSegments"),
97+
}
8398

84-
let command = EntryCommand::DeleteSegments(DeleteSegments {
99+
let command = DeleteSegments {
85100
stream_id: query.stream_id.clone(),
86101
topic_id: query.topic_id.clone(),
87102
partition_id: query.partition_id,
88103
segments_count,
89-
});
90-
let state_future =
91-
SendWrapper::new(state.shard.shard().state.apply(identity.user_id, &command));
92-
93-
state_future.await
94-
.error(|e: &IggyError| {
95-
format!(
96-
"{COMPONENT} (error: {e}) - failed to apply delete segments, stream ID: {stream_id}, topic ID: {topic_id}"
97-
)
98-
})?;
104+
};
105+
let entry_command = crate::state::command::EntryCommand::DeleteSegments(command);
106+
let state_future = SendWrapper::new(
107+
state
108+
.shard
109+
.shard()
110+
.state
111+
.apply(identity.user_id, &entry_command),
112+
);
113+
state_future.await.error(|e: &iggy_common::IggyError| {
114+
format!(
115+
"{COMPONENT} (error: {e}) - failed to apply delete segments, stream ID: {stream_id}, topic ID: {topic_id}"
116+
)
117+
})?;
99118
Ok(StatusCode::NO_CONTENT)
100119
}

core/server/src/shard/system/segments.rs

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -352,8 +352,7 @@ impl IggyShard {
352352
let namespace = IggyNamespace::new(stream_id, topic_id, partition_id);
353353

354354
// Drain segments from local_partitions
355-
let (segments, storages, stats, deleted_segments_count) = {
356-
355+
let (segments, storages, stats) = {
357356
let mut partitions = self.local_partitions.borrow_mut();
358357
let partition = partitions
359358
.get_mut(&namespace)
@@ -375,16 +374,8 @@ impl IggyShard {
375374
.indexes_mut()
376375
.drain(..upperbound)
377376
.collect::<Vec<_>>();
378-
(
379-
segments,
380-
storages,
381-
partition.stats.clone(),
382-
(upperbound - begin) as u32,
383-
)
377+
(segments, storages, partition.stats.clone())
384378
};
385-
if deleted_segments_count == 0 {
386-
return Ok(());
387-
}
388379

389380
for (mut storage, segment) in storages.into_iter().zip(segments.into_iter()) {
390381
let (msg_writer, index_writer) = storage.shutdown();
@@ -424,7 +415,6 @@ impl IggyShard {
424415
}
425416

426417
self.init_log_in_local_partitions(&namespace).await?;
427-
stats.decrement_segments_count(deleted_segments_count);
428418
stats.increment_segments_count(1);
429419
Ok(())
430420
}

web/docs/server-api.http

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJqdGkiOiJmODM0NTI5M
6666
"max_topic_size": "1000000000"
6767
}
6868

69-
69+
7070
### Create partitions
7171
POST {{url}}/streams/3/topics/3/partitions
7272
Content-Type: application/json

0 commit comments

Comments
 (0)