Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions core/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,10 @@ fn main() -> Result<(), ServerError> {
// TODO: Persist the shards table and load it from the disk, so it does not have to be
// THIRTEENTH DISCRETE LOADING STEP.
// Shared resources bootstrap.
let shards_table = Box::new(DashMap::with_capacity(SHARDS_TABLE_CAPACITY));
let shards_table = Box::new(papaya::HashMap::with_capacity(SHARDS_TABLE_CAPACITY));
let shards_table = Box::leak(shards_table);
let shards_table: EternalPtr<DashMap<IggyNamespace, PartitionLocation>> = shards_table.into();
let shards_table: EternalPtr<papaya::HashMap<IggyNamespace, PartitionLocation>> =
shards_table.into();

let client_manager = Box::new(DashMap::new());
let client_manager = Box::leak(client_manager);
Expand All @@ -331,6 +332,7 @@ fn main() -> Result<(), ServerError> {

// Populate shards_table from SharedMetadata partitions (hierarchical traversal)
metadata.with_metadata(|metadata| {
let guard = shards_table.pin();
for (stream_id, stream_meta) in metadata.streams.iter() {
for (topic_id, topic_meta) in stream_meta.topics.iter() {
for (partition_id, _partition_meta) in topic_meta.partitions.iter().enumerate() {
Expand All @@ -341,7 +343,7 @@ fn main() -> Result<(), ServerError> {
));
// TODO(hubcio): LocalIdx is 0 until IggyPartitions is integrated
let location = PartitionLocation::new(shard_id, LocalIdx::new(0));
shards_table.insert(ns, location);
guard.insert(ns, location);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/server/src/shard/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ use crate::{
},
};
use ahash::AHashSet;
use dashmap::DashMap;
use iggy_common::EncryptorKind;
use iggy_common::SemanticVersion;
use iggy_common::sharding::{IggyNamespace, PartitionLocation};
use papaya::HashMap as PapayaMap;
use std::{
cell::{Cell, RefCell},
rc::Rc,
Expand All @@ -45,7 +45,7 @@ use std::{
#[derive(Default)]
pub struct IggyShardBuilder {
id: Option<u16>,
shards_table: Option<EternalPtr<DashMap<IggyNamespace, PartitionLocation>>>,
shards_table: Option<EternalPtr<PapayaMap<IggyNamespace, PartitionLocation>>>,
state: Option<FileState>,
client_manager: Option<ClientManager>,
connections: Option<Vec<ShardConnector<ShardFrame>>>,
Expand Down Expand Up @@ -76,7 +76,7 @@ impl IggyShardBuilder {

pub fn shards_table(
mut self,
shards_table: EternalPtr<DashMap<IggyNamespace, PartitionLocation>>,
shards_table: EternalPtr<PapayaMap<IggyNamespace, PartitionLocation>>,
) -> Self {
self.shards_table = Some(shards_table);
self
Expand Down
15 changes: 8 additions & 7 deletions core/server/src/shard/communication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ impl IggyShard {
}

pub fn find_shard(&self, namespace: &IggyNamespace) -> Option<&ShardConnector<ShardFrame>> {
self.shards_table.get(namespace).map(|location| {
let guard = self.shards_table.pin();
guard.get(namespace).map(|location| {
self.shards
.iter()
.find(|shard| shard.id == *location.shard_id)
Expand All @@ -163,21 +164,21 @@ impl IggyShard {
}

pub fn remove_shard_table_record(&self, namespace: &IggyNamespace) -> PartitionLocation {
self.shards_table
let guard = self.shards_table.pin();
*guard
.remove(namespace)
.map(|(_, location)| location)
.expect("remove_shard_table_record: namespace not found")
}

pub fn insert_shard_table_record(&self, ns: IggyNamespace, location: PartitionLocation) {
self.shards_table.insert(ns, location);
self.shards_table.pin().insert(ns, location);
}

pub fn get_current_shard_namespaces(&self) -> Vec<IggyNamespace> {
self.shards_table
let guard = self.shards_table.pin();
guard
.iter()
.filter_map(|entry| {
let (ns, location) = entry.pair();
.filter_map(|(ns, location)| {
if *location.shard_id == self.id {
Some(*ns)
} else {
Expand Down
9 changes: 4 additions & 5 deletions core/server/src/shard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ use crate::{
};
use ahash::AHashSet;
use builder::IggyShardBuilder;
use dashmap::DashMap;
use iggy_common::SemanticVersion;
use iggy_common::sharding::{IggyNamespace, PartitionLocation};
use iggy_common::{EncryptorKind, IggyError};
use papaya::HashMap as PapayaMap;
use std::{
cell::{Cell, RefCell},
net::SocketAddr,
Expand Down Expand Up @@ -77,7 +77,7 @@ pub struct IggyShard {
pub(crate) local_partitions: RefCell<LocalPartitions>,
pub(crate) pending_partition_inits: RefCell<AHashSet<IggyNamespace>>,

pub(crate) shards_table: EternalPtr<DashMap<IggyNamespace, PartitionLocation>>,
pub(crate) shards_table: EternalPtr<PapayaMap<IggyNamespace, PartitionLocation>>,
pub(crate) state: FileState,

pub(crate) encryptor: Option<EncryptorKind>,
Expand Down Expand Up @@ -203,9 +203,8 @@ impl IggyShard {
}

async fn load_segments(&self) -> Result<(), IggyError> {
for shard_entry in self.shards_table.iter() {
let (namespace, location) = shard_entry.pair();

let guard = self.shards_table.pin();
for (namespace, location) in guard.iter() {
if *location.shard_id == self.id {
let stream_id = namespace.stream_id();
let topic_id: usize = namespace.topic_id();
Expand Down
4 changes: 2 additions & 2 deletions core/server/src/shard/system/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ impl IggyShard {

let namespaces_to_remove: Vec<_> = self
.shards_table
.pin()
.iter()
.filter_map(|entry| {
let (ns, _) = entry.pair();
.filter_map(|(ns, _)| {
if ns.stream_id() == stream_id {
Some(*ns)
} else {
Expand Down
4 changes: 2 additions & 2 deletions core/server/src/shard/system/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ impl IggyShard {

let namespaces_to_remove: Vec<_> = self
.shards_table
.pin()
.iter()
.filter_map(|entry| {
let (ns, _) = entry.pair();
.filter_map(|(ns, _)| {
if ns.stream_id() == stream && ns.topic_id() == topic_id {
Some(*ns)
} else {
Expand Down
Loading