Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 20 additions & 33 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,6 @@ impl Ingester {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ShardClosed as i32,
};
persist_failures.push(persist_failure);
Expand All @@ -487,35 +486,25 @@ impl Ingester {
let mut total_requested_capacity = ByteSize::b(0);

for subrequest in persist_request.subrequests {
let queue_id = subrequest.queue_id();

let Some(shard) = state_guard.shards.get_mut(&queue_id) else {
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ShardNotFound as i32,
};
persist_failures.push(persist_failure);
continue;
let queue_id = match state_guard.find_open_queue(subrequest.index_uid.clone().unwrap(), subrequest.source_id.clone()) {
Err(err) => {
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
reason: err as i32,
};
persist_failures.push(persist_failure);
continue;
}
Ok(shard) => shard
};
let shard = state_guard.shards.get_mut(&queue_id).unwrap();

// A router can only know about a newly opened shard if it has been informed by the
// control plane, which confirms that the shard was correctly opened in the
// metastore.
shard.is_advertisable = true;

if shard.is_closed() {
let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ShardClosed as i32,
};
persist_failures.push(persist_failure);
continue;
}
let doc_mapper = shard.doc_mapper_opt.clone().expect("shard should be open");
let validate_docs = shard.validate_docs;
let follower_id_opt = shard.follower_id_opt().cloned();
Expand Down Expand Up @@ -545,7 +534,6 @@ impl Ingester {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::WalFull as i32,
};
persist_failures.push(persist_failure);
Expand All @@ -556,14 +544,15 @@ impl Ingester {
.get_mut(&queue_id)
.expect("rate limiter should be initialized");

// Because we return the shard with the most available capacity, if this hits, it
// means that no shard can receive this request, and it should be retried.
if !rate_limiter.acquire_bytes(requested_capacity) {
debug!("failed to persist records to shard `{queue_id}`: rate limited");

let persist_failure = PersistFailure {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: PersistFailureReason::ShardRateLimited as i32,
};
persist_failures.push(persist_failure);
Expand All @@ -590,7 +579,7 @@ impl Ingester {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
shard_id: None,
replication_position_inclusive: Some(from_position_exclusive),
num_persisted_docs: 0,
parse_failures,
Expand Down Expand Up @@ -626,7 +615,7 @@ impl Ingester {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid.clone(),
source_id: subrequest.source_id.clone(),
shard_id: subrequest.shard_id.clone(),
shard_id: None,
from_position_exclusive: Some(from_position_exclusive),
doc_batch: Some(valid_doc_batch.clone()),
};
Expand All @@ -636,11 +625,11 @@ impl Ingester {
.push(replicate_subrequest);
}
let pending_persist_subrequest = PendingPersistSubrequest {
queue_id,
queue_id: queue_id.clone(),
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
shard_id: None,
doc_batch: valid_doc_batch,
parse_failures,
expected_position_inclusive: None,
Expand Down Expand Up @@ -707,7 +696,6 @@ impl Ingester {
subrequest_id: replicate_failure.subrequest_id,
index_uid: replicate_failure.index_uid,
source_id: replicate_failure.source_id,
shard_id: replicate_failure.shard_id,
reason: persist_failure_reason as i32,
};
persist_failures.push(persist_failure);
Expand Down Expand Up @@ -757,7 +745,6 @@ impl Ingester {
subrequest_id: subrequest.subrequest_id,
index_uid: subrequest.index_uid,
source_id: subrequest.source_id,
shard_id: subrequest.shard_id,
reason: reason as i32,
};
persist_failures.push(persist_failure);
Expand Down
34 changes: 31 additions & 3 deletions quickwit/quickwit-ingest/src/ingest_v2/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@
// limitations under the License.

use std::collections::HashMap;
use std::fmt;
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use std::{collections, fmt};

use mrecordlog::error::{DeleteQueueError, TruncateError};
use quickwit_common::pretty::PrettyDisplay;
use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
use quickwit_doc_mapper::DocMapper;
use quickwit_proto::control_plane::AdviseResetShardsResponse;
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::ingest::ingester::{IngesterStatus, PersistFailureReason};
use quickwit_proto::ingest::{IngestV2Error, IngestV2Result, ShardState};
use quickwit_proto::types::{DocMappingUid, Position, QueueId};
use quickwit_proto::types::{DocMappingUid, IndexUid, Position, QueueId, split_queue_id};
use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockMappedWriteGuard, RwLockWriteGuard, watch};
use tracing::{error, info};

Expand Down Expand Up @@ -72,6 +72,34 @@ impl InnerIngesterState {
self.status = status;
self.status_tx.send(status).expect("channel should be open");
}

/// Returns the queue id of the open shard with the most available
/// rate-limiter permits for the given index/source pair.
///
/// # Errors
///
/// Returns `PersistFailureReason::ShardNotFound` when no open shard exists
/// for this index/source pair.
///
/// TODO: an auxiliary lookup structure keyed by (index, source) would avoid
/// the linear scan over all shards.
pub fn find_open_queue(
    &self,
    index_id: IndexUid,
    source_id: String,
) -> Result<QueueId, PersistFailureReason> {
    // Single O(n) pass over the shards: keep the open shard with the most
    // available permits. The tuple `(available_permits, queue_id)` is unique
    // per shard (queue ids are map keys), so `max` is deterministic and
    // matches what popping a max-heap of the same tuples returned.
    self.shards
        .iter()
        .filter(|(_, shard)| shard.is_open())
        .filter_map(|(queue_id, _)| {
            let (shard_index_id, shard_source_id, _) =
                split_queue_id(queue_id).expect("queue id should be well-formed");
            if shard_index_id == index_id && shard_source_id == source_id {
                // Rate trackers are created together with their shard, so the
                // entry must exist for any shard present in `self.shards`.
                let (rate_limiter, _) = self
                    .rate_trackers
                    .get(queue_id)
                    .expect("rate tracker should exist for shard");
                Some((rate_limiter.available_permits(), queue_id))
            } else {
                None
            }
        })
        .max()
        .map(|(_, queue_id)| queue_id.clone())
        .ok_or(PersistFailureReason::ShardNotFound)
}
}

impl IngesterState {
Expand Down
2 changes: 0 additions & 2 deletions quickwit/quickwit-proto/protos/quickwit/ingester.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ message PersistSubrequest {
uint32 subrequest_id = 1;
quickwit.common.IndexUid index_uid = 2;
string source_id = 3;
quickwit.ingest.ShardId shard_id = 4;
quickwit.ingest.DocBatchV2 doc_batch = 5;
}

Expand Down Expand Up @@ -107,7 +106,6 @@ message PersistFailure {
uint32 subrequest_id = 1;
quickwit.common.IndexUid index_uid = 2;
string source_id = 3;
quickwit.ingest.ShardId shard_id = 4;
PersistFailureReason reason = 5;
}

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions quickwit/quickwit-proto/src/getters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ generate_getters! {
InitShardFailure,
OpenFetchStreamRequest,
OpenShardSubrequest,
PersistFailure,
PersistSubrequest,
PersistSuccess,
ReplicateFailure,
ReplicateSubrequest,
Expand Down
6 changes: 0 additions & 6 deletions quickwit/quickwit-proto/src/ingest/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,6 @@ impl OpenFetchStreamRequest {
}
}

impl PersistSubrequest {
pub fn queue_id(&self) -> QueueId {
queue_id(self.index_uid(), &self.source_id, self.shard_id())
}
}

impl PersistSuccess {
pub fn queue_id(&self) -> QueueId {
queue_id(self.index_uid(), &self.source_id, self.shard_id())
Expand Down
Loading