@@ -258,15 +258,14 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const
258258 std::string sub_key = InternalKey (ns_key, field, metadata.version , storage_->IsSlotIdEncoded ()).Encode ();
259259 s = storage_->Get (ctx, ctx.GetReadOptions (), sub_key, &value);
260260 if (s.ok ()) {
261- std::string sub_key_expire =
262- InternalKey (ns_key, field, metadata.version + ExpireVersionOffset, storage_->IsSlotIdEncoded ()).Encode ();
263261 uint64_t old_expired_timestamp = NoExpireTime;
264262 s = GetSubKeyExpireTimestampMS (ctx, user_key, field, metadata.version , &old_expired_timestamp);
265263 if (!s.ok () && !s.IsNotFound ()) return s;
266264 if (old_expired_timestamp == NoExpireTime || old_expired_timestamp > util::GetTimeStampMS ()) {
267265 *deleted_cnt += 1 ;
268266 }
269267 if (s.ok ()) {
268+ auto sub_key_expire = GetSubKeyExpireInternalKey (user_key, field, metadata.version );
270269 s = batch->Delete (sub_key_expire);
271270 if (!s.ok ()) return s;
272271 }
@@ -304,33 +303,33 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st
304303 s = batch->PutLogData (log_data.Encode ());
305304 if (!s.ok ()) return s;
306305 std::unordered_set<std::string_view> field_set;
307-
308306 std::vector<rocksdb::Slice> keys;
307+ std::vector<rocksdb::Slice> origin_keys;
309308 std::vector<std::string> keys_encoded;
310309 std::vector<std::string_view> values;
311310 keys.reserve (field_values.size ());
312311 values.reserve (field_values.size ());
312+ origin_keys.reserve (field_values.size ());
313313 for (auto it = field_values.rbegin (); it != field_values.rend (); it++) {
314314 if (!field_set.insert (it->field ).second ) {
315315 continue ;
316316 }
317-
317+ origin_keys. emplace_back (it-> field );
318318 keys_encoded.push_back (InternalKey (ns_key, it->field , metadata.version , storage_->IsSlotIdEncoded ()).Encode ());
319319 keys.emplace_back (keys_encoded.back ());
320320 values.emplace_back (it->value );
321321 }
322322
323323 std::vector<rocksdb::PinnableSlice> values_vector (keys.size ());
324324 std::vector<rocksdb::Status> statuses_vector (keys.size ());
325- if (metadata.size > 0 ) {
326- rocksdb::ReadOptions read_options = ctx.DefaultMultiGetOptions ();
327- storage_->MultiGet (ctx, read_options, storage_->GetDB ()->DefaultColumnFamily (), keys.size (), keys.data (),
328- values_vector.data (), statuses_vector.data ());
329- }
325+
326+ rocksdb::ReadOptions read_options = ctx.DefaultMultiGetOptions ();
327+ storage_->MultiGet (ctx, read_options, storage_->GetDB ()->DefaultColumnFamily (), keys.size (), keys.data (),
328+ values_vector.data (), statuses_vector.data ());
330329
331330 std::vector<uint64_t > expire_ats (field_values.size (), NoExpireTime);
332331 std::vector<rocksdb::Status> expire_statuses (field_values.size (), rocksdb::Status::OK ());
333- MGetSubKeyExpireTimestampMS (ctx, user_key, keys , metadata.version , &expire_ats, &expire_statuses);
332+ MGetSubKeyExpireTimestampMS (ctx, user_key, origin_keys , metadata.version , &expire_ats, &expire_statuses);
334333
335334 for (size_t field_index = 0 ; field_index < keys.size (); field_index++) {
336335 const rocksdb::Slice field_key = keys[field_index];
@@ -362,7 +361,7 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st
362361 }
363362 // this key was once expired but the expire time exists may block the read
364363 if (expire_at != NoExpireTime && expire_at < util::GetTimeStampMS ()) {
365- auto sub_key_expire = GetSubKeyExpireInternalKey (user_key, field_key , metadata.version );
364+ auto sub_key_expire = GetSubKeyExpireInternalKey (user_key, origin_keys[field_index] , metadata.version );
366365 s = batch->Delete (sub_key_expire);
367366 if (!s.ok ()) return s;
368367 }
@@ -555,9 +554,6 @@ rocksdb::Status Hash::ExpireFields(engine::Context &ctx, const Slice &user_key,
555554 continue ;
556555 }
557556 if (!s.ok ()) return s;
558-
559- std::string sub_key_expire =
560- InternalKey (ns_key, field, metadata.version + ExpireVersionOffset, storage_->IsSlotIdEncoded ()).Encode ();
561557 uint64_t old_expired_timestamp = NoExpireTime;
562558 s = GetSubKeyExpireTimestampMS (ctx, user_key, field, metadata.version , &old_expired_timestamp);
563559 if (!s.ok () && !s.IsNotFound ()) return s;
@@ -603,6 +599,7 @@ rocksdb::Status Hash::ExpireFields(engine::Context &ctx, const Slice &user_key,
603599 // set new expired time
604600 std::string expire_value;
605601 PutFixed64 (&expire_value, expireat_ms);
602+ auto sub_key_expire = GetSubKeyExpireInternalKey (user_key, field, metadata.version );
606603 s = batch->Put (sub_key_expire, expire_value);
607604 if (!s.ok ()) return s;
608605 results->push_back (FieldExpireResult::kExpireSet ); // Expiration set successfully
@@ -633,8 +630,6 @@ rocksdb::Status Hash::TTLFields(engine::Context &ctx, const Slice &user_key, con
633630 }
634631 if (!s.ok ()) return s;
635632
636- std::string sub_key_expire =
637- InternalKey (ns_key, field, metadata.version + ExpireVersionOffset, storage_->IsSlotIdEncoded ()).Encode ();
638633 uint64_t expired_time = 0 ;
639634 s = GetSubKeyExpireTimestampMS (ctx, user_key, field, metadata.version , &expired_time);
640635 if (!s.ok () && !s.IsNotFound ()) return s;
@@ -677,12 +672,11 @@ rocksdb::Status Hash::PersistFields(engine::Context &ctx, const Slice &user_key,
677672 continue ;
678673 }
679674 if (!s.ok ()) return s;
680- std::string sub_key_expire =
681- InternalKey (ns_key, field, metadata.version + ExpireVersionOffset, storage_->IsSlotIdEncoded ()).Encode ();
682675 uint64_t old_expired_timestamp = 0 ;
683676 s = GetSubKeyExpireTimestampMS (ctx, user_key, field, metadata.version , &old_expired_timestamp);
684677 if (!s.ok () && !s.IsNotFound ()) return s;
685678 if (s.ok ()) {
679+ auto sub_key_expire = GetSubKeyExpireInternalKey (user_key, field, metadata.version );
686680 s = batch->Delete (sub_key_expire);
687681 if (!s.ok ()) return s;
688682 }
@@ -702,4 +696,142 @@ rocksdb::Status Hash::PersistFields(engine::Context &ctx, const Slice &user_key,
702696 }
703697 return rocksdb::Status::OK ();
704698}
699+
700+ rocksdb::Status Hash::MSetEx (engine::Context &ctx, const Slice &user_key, const std::vector<FieldValue> &field_values,
701+ const HSetExParams ¶ms, uint64_t *added_cnt) {
702+ *added_cnt = 0 ;
703+ std::string ns_key = AppendNamespacePrefix (user_key);
704+
705+ HashMetadata metadata;
706+ rocksdb::Status s = GetMetadata (ctx, ns_key, &metadata);
707+ if (!s.ok () && !s.IsNotFound ()) return s;
708+
709+ uint64_t new_expire_at_ms = NoExpireTime;
710+ auto current_time_ms = util::GetTimeStampMS ();
711+
712+ switch (params.expire_params .option ) {
713+ case SetEXExpireOption::kNoExpire :
714+ new_expire_at_ms = NoExpireTime;
715+ break ;
716+ case SetEXExpireOption::kEX : // seconds
717+ new_expire_at_ms = current_time_ms + (params.expire_params .value * 1000 );
718+ break ;
719+ case SetEXExpireOption::kPX : // milliseconds
720+ new_expire_at_ms = current_time_ms + params.expire_params .value ;
721+ break ;
722+ case SetEXExpireOption::kEXAT : // unix-time-seconds
723+ new_expire_at_ms = params.expire_params .value * 1000 ;
724+ break ;
725+ case SetEXExpireOption::kPXAT : // unix-time-milliseconds
726+ new_expire_at_ms = params.expire_params .value ;
727+ break ;
728+ case SetEXExpireOption::kKEEPTTL :
729+ // Will handle per-field TTL preservation in the loop
730+ break ;
731+ }
732+
733+ int added = 0 ;
734+ auto batch = storage_->GetWriteBatchBase ();
735+ WriteBatchLogData log_data (kRedisHash );
736+ s = batch->PutLogData (log_data.Encode ());
737+ if (!s.ok ()) return s;
738+
739+ std::unordered_set<std::string_view> field_set;
740+ std::vector<rocksdb::Slice> origin_keys;
741+ std::vector<rocksdb::Slice> keys;
742+ std::vector<std::string> keys_encoded;
743+ std::vector<std::string_view> values;
744+ keys.reserve (field_values.size ());
745+ values.reserve (field_values.size ());
746+
747+ for (auto it = field_values.rbegin (); it != field_values.rend (); it++) {
748+ if (!field_set.insert (it->field ).second ) {
749+ continue ;
750+ }
751+ origin_keys.emplace_back (it->field );
752+ keys_encoded.push_back (InternalKey (ns_key, it->field , metadata.version , storage_->IsSlotIdEncoded ()).Encode ());
753+ keys.emplace_back (keys_encoded.back ());
754+ values.emplace_back (it->value );
755+ }
756+
757+ std::vector<rocksdb::PinnableSlice> values_vector (keys.size ());
758+ std::vector<rocksdb::Status> statuses_vector (keys.size ());
759+
760+ rocksdb::ReadOptions read_options = ctx.DefaultMultiGetOptions ();
761+ storage_->MultiGet (ctx, read_options, storage_->GetDB ()->DefaultColumnFamily (), keys.size (), keys.data (),
762+ values_vector.data (), statuses_vector.data ());
763+
764+ std::vector<uint64_t > expire_ats (keys.size (), NoExpireTime);
765+ std::vector<rocksdb::Status> expire_statuses (keys.size (), rocksdb::Status::OK ());
766+ MGetSubKeyExpireTimestampMS (ctx, user_key, origin_keys, metadata.version , &expire_ats, &expire_statuses);
767+
768+ for (size_t field_index = 0 ; field_index < keys.size (); field_index++) {
769+ const rocksdb::Slice field_key = keys[field_index];
770+ bool exists = false ;
771+
772+ rocksdb::Status &expire_status = expire_statuses[field_index];
773+ if (!expire_status.ok () && !expire_status.IsNotFound ()) {
774+ return expire_status;
775+ }
776+
777+ uint64_t expire_at = expire_ats[field_index];
778+ rocksdb::Status &field_status = statuses_vector[field_index];
779+ if (!field_status.ok () && !field_status.IsNotFound ()) {
780+ return field_status;
781+ }
782+ if (field_status.ok ()) {
783+ // Field exists, and not expired
784+ if (expire_at == NoExpireTime || expire_at > current_time_ms) {
785+ exists = true ;
786+ }
787+ }
788+
789+ // FNX means set only when field not exists
790+ // skip if exists
791+ if (params.condition == SetEXFieldCondition::kFNX && exists) {
792+ continue ;
793+ }
794+
795+ // FXX means set only when all field exists
796+ // any non-exists will abort the whole operation
797+ if (params.condition == SetEXFieldCondition::kFXX && !exists) {
798+ *added_cnt = 0 ;
799+ return rocksdb::Status::OK ();
800+ }
801+
802+ if (!exists) {
803+ added++;
804+ }
805+
806+ s = batch->Put (field_key, values[field_index]);
807+ if (!s.ok ()) return s;
808+
809+ if (params.expire_params .option == SetEXExpireOption::kKEEPTTL ) {
810+ // do nothing just keep the ttl
811+ continue ;
812+ }
813+
814+ auto sub_key_expire = GetSubKeyExpireInternalKey (user_key, origin_keys[field_index], metadata.version );
815+ if (new_expire_at_ms != NoExpireTime) {
816+ std::string expire_value;
817+ PutFixed64 (&expire_value, new_expire_at_ms);
818+ s = batch->Put (sub_key_expire, expire_value);
819+ if (!s.ok ()) return s;
820+ } else {
821+ s = batch->Delete (sub_key_expire);
822+ if (!s.ok ()) return s;
823+ }
824+ }
825+
826+ if (added > 0 ) {
827+ *added_cnt = added;
828+ metadata.size += added;
829+ std::string bytes;
830+ metadata.Encode (&bytes);
831+ s = batch->Put (metadata_cf_handle_, ns_key, bytes);
832+ if (!s.ok ()) return s;
833+ }
834+
835+ return storage_->Write (ctx, storage_->DefaultWriteOptions (), batch->GetWriteBatch ());
836+ }
705837} // namespace redis
0 commit comments