Skip to content

Commit 88be89e

Browse files
author
kevin
committed
implement hsetex
Signed-off-by: kevin <kevin@kevin-desktop.lightspeed.mssnks.sbcglobal.net>
1 parent 0d10871 commit 88be89e

6 files changed

Lines changed: 803 additions & 22 deletions

File tree

src/commands/cmd_hash.cc

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -673,6 +673,121 @@ class CommandHTTL : public Commander {
673673
std::vector<Slice> fields_;
674674
};
675675

676+
class CommandHMSetEX : public Commander {
677+
public:
678+
Status Parse(const std::vector<std::string> &args) override {
679+
if (args.size() < 7) {
680+
return {Status::RedisParseErr, errWrongNumOfArguments};
681+
}
682+
683+
size_t pos = 2;
684+
685+
// Lambda to parse expiration value with validation
686+
auto parse_expire_value = [](const std::string &value_str) -> uint64_t {
687+
auto result = ParseInt<uint64_t>(value_str, 10);
688+
if (!result || *result <= 0) {
689+
return 0;
690+
}
691+
return result.GetValue();
692+
};
693+
694+
if (pos < args.size() && util::EqualICase(args[pos], std::string_view("FIELDS"))) {
695+
return {Status::RedisParseErr, "ERR Missing expiration option"};
696+
}
697+
698+
// Parse expiration option - optional
699+
params_.expire_params.option = SetEXExpireOption::kNoExpire;
700+
while (pos < args.size()) {
701+
const auto &opt = args[pos];
702+
if (util::EqualICase(opt, "KEEPTTL")) {
703+
params_.expire_params.option = SetEXExpireOption::kKEEPTTL;
704+
pos++;
705+
} else if (util::EqualICase(opt, "FNX")) {
706+
params_.condition = SetEXFieldCondition::kFNX;
707+
pos++;
708+
} else if (util::EqualICase(opt, "FXX")) {
709+
params_.condition = SetEXFieldCondition::kFXX;
710+
pos++;
711+
} else if (util::EqualICase(opt, "FIELDS")) {
712+
if (params_.expire_params.option == SetEXExpireOption::kNoExpire) {
713+
return {Status::RedisParseErr, "Invalid syntax: at least one expiration option is required before FIELDS"};
714+
} else {
715+
// FIELDS is a special case and should not be treated as an expiration option
716+
break;
717+
}
718+
} else {
719+
// got next must be a integer
720+
auto value = parse_expire_value(args[pos + 1]);
721+
params_.expire_params.value = value;
722+
if (value == 0) {
723+
return {Status::RedisParseErr, "Invalid expire time value"};
724+
}
725+
if (util::EqualICase(opt, "EX")) {
726+
params_.expire_params.option = SetEXExpireOption::kEX;
727+
} else if (util::EqualICase(opt, "PX")) {
728+
params_.expire_params.option = SetEXExpireOption::kPX;
729+
} else if (util::EqualICase(opt, "EXAT")) {
730+
params_.expire_params.option = SetEXExpireOption::kEXAT;
731+
} else if (util::EqualICase(opt, "PXAT")) {
732+
params_.expire_params.option = SetEXExpireOption::kPXAT;
733+
} else {
734+
return {Status::RedisParseErr, "Invalid syntax: expected EX, PX, EXAT, PXAT, KEEPTTL, FNX or FXX"};
735+
}
736+
pos += 2;
737+
}
738+
}
739+
// Parse FIELDS and field-value pairs
740+
if (pos >= args.size() || !util::EqualICase(args[pos], "FIELDS")) {
741+
return {Status::RedisParseErr, "mandatory argument FIELDS is missing"};
742+
}
743+
pos++;
744+
if (pos >= args.size()) {
745+
return {Status::RedisParseErr, "FIELDS requires numfields argument"};
746+
}
747+
748+
auto num_fields_result = ParseInt<uint64_t>(args[pos], 10);
749+
if (!num_fields_result) {
750+
return {Status::RedisParseErr, errValueNotInteger};
751+
}
752+
if (*num_fields_result <= 0) {
753+
return {Status::RedisParseErr, "numfields must be a positive integer"};
754+
}
755+
auto num_fields = *num_fields_result;
756+
pos++;
757+
758+
// Parse field-value pairs
759+
if (args.size() != pos + 2 * num_fields) {
760+
return {Status::RedisParseErr, "number of field-value pairs does not match numfields"};
761+
}
762+
763+
for (size_t i = 0; i < num_fields; i++) {
764+
field_values_.emplace_back(args[pos], args[pos + 1]);
765+
pos += 2;
766+
}
767+
return Commander::Parse(args);
768+
}
769+
770+
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
771+
uint64_t ret = 0;
772+
redis::Hash hash_db(srv->storage, conn->GetNamespace());
773+
774+
auto s = hash_db.MSetEx(ctx, args_[1], field_values_, params_, &ret);
775+
if (!s.ok()) {
776+
return {Status::RedisExecErr, s.ToString()};
777+
}
778+
if (ret == 0) {
779+
*output = redis::Integer(0);
780+
} else {
781+
*output = redis::Integer(1);
782+
}
783+
return Status::OK();
784+
}
785+
786+
private:
787+
HSetExParams params_;
788+
std::vector<FieldValue> field_values_;
789+
};
790+
676791
REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1, 1, 1),
677792
MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1, 1, 1),
678793
MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4, "write", 1, 1, 1),
@@ -699,6 +814,7 @@ REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1
699814
MakeCmdAttr<CommandHTTL>("hpttl", -5, "read-only", 1, 1, 1),
700815
MakeCmdAttr<CommandHTTL>("hexpiretime", -5, "read-only", 1, 1, 1),
701816
MakeCmdAttr<CommandHTTL>("hpexpiretime", -5, "read-only", 1, 1, 1),
702-
MakeCmdAttr<CommandHPersist>("hpersist", -5, "write", 1, 1, 1), )
817+
MakeCmdAttr<CommandHPersist>("hpersist", -5, "write", 1, 1, 1),
818+
MakeCmdAttr<CommandHMSetEX>("hsetex", -7, "write", 1, 1, 1), )
703819

704820
} // namespace redis

src/types/redis_hash.cc

Lines changed: 150 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -258,15 +258,14 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const Slice &user_key, const
258258
std::string sub_key = InternalKey(ns_key, field, metadata.version, storage_->IsSlotIdEncoded()).Encode();
259259
s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value);
260260
if (s.ok()) {
261-
std::string sub_key_expire =
262-
InternalKey(ns_key, field, metadata.version + ExpireVersionOffset, storage_->IsSlotIdEncoded()).Encode();
263261
uint64_t old_expired_timestamp = NoExpireTime;
264262
s = GetSubKeyExpireTimestampMS(ctx, user_key, field, metadata.version, &old_expired_timestamp);
265263
if (!s.ok() && !s.IsNotFound()) return s;
266264
if (old_expired_timestamp == NoExpireTime || old_expired_timestamp > util::GetTimeStampMS()) {
267265
*deleted_cnt += 1;
268266
}
269267
if (s.ok()) {
268+
auto sub_key_expire = GetSubKeyExpireInternalKey(user_key, field, metadata.version);
270269
s = batch->Delete(sub_key_expire);
271270
if (!s.ok()) return s;
272271
}
@@ -304,33 +303,33 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st
304303
s = batch->PutLogData(log_data.Encode());
305304
if (!s.ok()) return s;
306305
std::unordered_set<std::string_view> field_set;
307-
308306
std::vector<rocksdb::Slice> keys;
307+
std::vector<rocksdb::Slice> origin_keys;
309308
std::vector<std::string> keys_encoded;
310309
std::vector<std::string_view> values;
311310
keys.reserve(field_values.size());
312311
values.reserve(field_values.size());
312+
origin_keys.reserve(field_values.size());
313313
for (auto it = field_values.rbegin(); it != field_values.rend(); it++) {
314314
if (!field_set.insert(it->field).second) {
315315
continue;
316316
}
317-
317+
origin_keys.emplace_back(it->field);
318318
keys_encoded.push_back(InternalKey(ns_key, it->field, metadata.version, storage_->IsSlotIdEncoded()).Encode());
319319
keys.emplace_back(keys_encoded.back());
320320
values.emplace_back(it->value);
321321
}
322322

323323
std::vector<rocksdb::PinnableSlice> values_vector(keys.size());
324324
std::vector<rocksdb::Status> statuses_vector(keys.size());
325-
if (metadata.size > 0) {
326-
rocksdb::ReadOptions read_options = ctx.DefaultMultiGetOptions();
327-
storage_->MultiGet(ctx, read_options, storage_->GetDB()->DefaultColumnFamily(), keys.size(), keys.data(),
328-
values_vector.data(), statuses_vector.data());
329-
}
325+
326+
rocksdb::ReadOptions read_options = ctx.DefaultMultiGetOptions();
327+
storage_->MultiGet(ctx, read_options, storage_->GetDB()->DefaultColumnFamily(), keys.size(), keys.data(),
328+
values_vector.data(), statuses_vector.data());
330329

331330
std::vector<uint64_t> expire_ats(field_values.size(), NoExpireTime);
332331
std::vector<rocksdb::Status> expire_statuses(field_values.size(), rocksdb::Status::OK());
333-
MGetSubKeyExpireTimestampMS(ctx, user_key, keys, metadata.version, &expire_ats, &expire_statuses);
332+
MGetSubKeyExpireTimestampMS(ctx, user_key, origin_keys, metadata.version, &expire_ats, &expire_statuses);
334333

335334
for (size_t field_index = 0; field_index < keys.size(); field_index++) {
336335
const rocksdb::Slice field_key = keys[field_index];
@@ -362,7 +361,7 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const st
362361
}
363362
// this key was once expired but the expire time exists may block the read
364363
if (expire_at != NoExpireTime && expire_at < util::GetTimeStampMS()) {
365-
auto sub_key_expire = GetSubKeyExpireInternalKey(user_key, field_key, metadata.version);
364+
auto sub_key_expire = GetSubKeyExpireInternalKey(user_key, origin_keys[field_index], metadata.version);
366365
s = batch->Delete(sub_key_expire);
367366
if (!s.ok()) return s;
368367
}
@@ -555,9 +554,6 @@ rocksdb::Status Hash::ExpireFields(engine::Context &ctx, const Slice &user_key,
555554
continue;
556555
}
557556
if (!s.ok()) return s;
558-
559-
std::string sub_key_expire =
560-
InternalKey(ns_key, field, metadata.version + ExpireVersionOffset, storage_->IsSlotIdEncoded()).Encode();
561557
uint64_t old_expired_timestamp = NoExpireTime;
562558
s = GetSubKeyExpireTimestampMS(ctx, user_key, field, metadata.version, &old_expired_timestamp);
563559
if (!s.ok() && !s.IsNotFound()) return s;
@@ -603,6 +599,7 @@ rocksdb::Status Hash::ExpireFields(engine::Context &ctx, const Slice &user_key,
603599
// set new expired time
604600
std::string expire_value;
605601
PutFixed64(&expire_value, expireat_ms);
602+
auto sub_key_expire = GetSubKeyExpireInternalKey(user_key, field, metadata.version);
606603
s = batch->Put(sub_key_expire, expire_value);
607604
if (!s.ok()) return s;
608605
results->push_back(FieldExpireResult::kExpireSet); // Expiration set successfully
@@ -633,8 +630,6 @@ rocksdb::Status Hash::TTLFields(engine::Context &ctx, const Slice &user_key, con
633630
}
634631
if (!s.ok()) return s;
635632

636-
std::string sub_key_expire =
637-
InternalKey(ns_key, field, metadata.version + ExpireVersionOffset, storage_->IsSlotIdEncoded()).Encode();
638633
uint64_t expired_time = 0;
639634
s = GetSubKeyExpireTimestampMS(ctx, user_key, field, metadata.version, &expired_time);
640635
if (!s.ok() && !s.IsNotFound()) return s;
@@ -677,12 +672,11 @@ rocksdb::Status Hash::PersistFields(engine::Context &ctx, const Slice &user_key,
677672
continue;
678673
}
679674
if (!s.ok()) return s;
680-
std::string sub_key_expire =
681-
InternalKey(ns_key, field, metadata.version + ExpireVersionOffset, storage_->IsSlotIdEncoded()).Encode();
682675
uint64_t old_expired_timestamp = 0;
683676
s = GetSubKeyExpireTimestampMS(ctx, user_key, field, metadata.version, &old_expired_timestamp);
684677
if (!s.ok() && !s.IsNotFound()) return s;
685678
if (s.ok()) {
679+
auto sub_key_expire = GetSubKeyExpireInternalKey(user_key, field, metadata.version);
686680
s = batch->Delete(sub_key_expire);
687681
if (!s.ok()) return s;
688682
}
@@ -702,4 +696,142 @@ rocksdb::Status Hash::PersistFields(engine::Context &ctx, const Slice &user_key,
702696
}
703697
return rocksdb::Status::OK();
704698
}
699+
700+
rocksdb::Status Hash::MSetEx(engine::Context &ctx, const Slice &user_key, const std::vector<FieldValue> &field_values,
701+
const HSetExParams &params, uint64_t *added_cnt) {
702+
*added_cnt = 0;
703+
std::string ns_key = AppendNamespacePrefix(user_key);
704+
705+
HashMetadata metadata;
706+
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
707+
if (!s.ok() && !s.IsNotFound()) return s;
708+
709+
uint64_t new_expire_at_ms = NoExpireTime;
710+
auto current_time_ms = util::GetTimeStampMS();
711+
712+
switch (params.expire_params.option) {
713+
case SetEXExpireOption::kNoExpire:
714+
new_expire_at_ms = NoExpireTime;
715+
break;
716+
case SetEXExpireOption::kEX: // seconds
717+
new_expire_at_ms = current_time_ms + (params.expire_params.value * 1000);
718+
break;
719+
case SetEXExpireOption::kPX: // milliseconds
720+
new_expire_at_ms = current_time_ms + params.expire_params.value;
721+
break;
722+
case SetEXExpireOption::kEXAT: // unix-time-seconds
723+
new_expire_at_ms = params.expire_params.value * 1000;
724+
break;
725+
case SetEXExpireOption::kPXAT: // unix-time-milliseconds
726+
new_expire_at_ms = params.expire_params.value;
727+
break;
728+
case SetEXExpireOption::kKEEPTTL:
729+
// Will handle per-field TTL preservation in the loop
730+
break;
731+
}
732+
733+
int added = 0;
734+
auto batch = storage_->GetWriteBatchBase();
735+
WriteBatchLogData log_data(kRedisHash);
736+
s = batch->PutLogData(log_data.Encode());
737+
if (!s.ok()) return s;
738+
739+
std::unordered_set<std::string_view> field_set;
740+
std::vector<rocksdb::Slice> origin_keys;
741+
std::vector<rocksdb::Slice> keys;
742+
std::vector<std::string> keys_encoded;
743+
std::vector<std::string_view> values;
744+
keys.reserve(field_values.size());
745+
values.reserve(field_values.size());
746+
747+
for (auto it = field_values.rbegin(); it != field_values.rend(); it++) {
748+
if (!field_set.insert(it->field).second) {
749+
continue;
750+
}
751+
origin_keys.emplace_back(it->field);
752+
keys_encoded.push_back(InternalKey(ns_key, it->field, metadata.version, storage_->IsSlotIdEncoded()).Encode());
753+
keys.emplace_back(keys_encoded.back());
754+
values.emplace_back(it->value);
755+
}
756+
757+
std::vector<rocksdb::PinnableSlice> values_vector(keys.size());
758+
std::vector<rocksdb::Status> statuses_vector(keys.size());
759+
760+
rocksdb::ReadOptions read_options = ctx.DefaultMultiGetOptions();
761+
storage_->MultiGet(ctx, read_options, storage_->GetDB()->DefaultColumnFamily(), keys.size(), keys.data(),
762+
values_vector.data(), statuses_vector.data());
763+
764+
std::vector<uint64_t> expire_ats(keys.size(), NoExpireTime);
765+
std::vector<rocksdb::Status> expire_statuses(keys.size(), rocksdb::Status::OK());
766+
MGetSubKeyExpireTimestampMS(ctx, user_key, origin_keys, metadata.version, &expire_ats, &expire_statuses);
767+
768+
for (size_t field_index = 0; field_index < keys.size(); field_index++) {
769+
const rocksdb::Slice field_key = keys[field_index];
770+
bool exists = false;
771+
772+
rocksdb::Status &expire_status = expire_statuses[field_index];
773+
if (!expire_status.ok() && !expire_status.IsNotFound()) {
774+
return expire_status;
775+
}
776+
777+
uint64_t expire_at = expire_ats[field_index];
778+
rocksdb::Status &field_status = statuses_vector[field_index];
779+
if (!field_status.ok() && !field_status.IsNotFound()) {
780+
return field_status;
781+
}
782+
if (field_status.ok()) {
783+
// Field exists, and not expired
784+
if (expire_at == NoExpireTime || expire_at > current_time_ms) {
785+
exists = true;
786+
}
787+
}
788+
789+
// FNX means set only when field not exists
790+
// skip if exists
791+
if (params.condition == SetEXFieldCondition::kFNX && exists) {
792+
continue;
793+
}
794+
795+
// FXX means set only when all field exists
796+
// any non-exists will abort the whole operation
797+
if (params.condition == SetEXFieldCondition::kFXX && !exists) {
798+
*added_cnt = 0;
799+
return rocksdb::Status::OK();
800+
}
801+
802+
if (!exists) {
803+
added++;
804+
}
805+
806+
s = batch->Put(field_key, values[field_index]);
807+
if (!s.ok()) return s;
808+
809+
if (params.expire_params.option == SetEXExpireOption::kKEEPTTL) {
810+
// do nothing just keep the ttl
811+
continue;
812+
}
813+
814+
auto sub_key_expire = GetSubKeyExpireInternalKey(user_key, origin_keys[field_index], metadata.version);
815+
if (new_expire_at_ms != NoExpireTime) {
816+
std::string expire_value;
817+
PutFixed64(&expire_value, new_expire_at_ms);
818+
s = batch->Put(sub_key_expire, expire_value);
819+
if (!s.ok()) return s;
820+
} else {
821+
s = batch->Delete(sub_key_expire);
822+
if (!s.ok()) return s;
823+
}
824+
}
825+
826+
if (added > 0) {
827+
*added_cnt = added;
828+
metadata.size += added;
829+
std::string bytes;
830+
metadata.Encode(&bytes);
831+
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
832+
if (!s.ok()) return s;
833+
}
834+
835+
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
836+
}
705837
} // namespace redis

0 commit comments

Comments
 (0)