Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions csrc/config/model_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ infinicore::DataType ModelConfig::get_dtype() const {
return parse_dtype(dtype_str);
}

bool ModelConfig::contains_non_null(const std::string &key) const {
return config_json.contains(key) && !config_json.at(key).is_null();
}

size_t ModelConfig::get_rotary_dim() const {
size_t head_dim = get_head_dim();
double partial_rotary_factor = get_or<double>("partial_rotary_factor", 1.0);
Expand Down
14 changes: 14 additions & 0 deletions csrc/config/model_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ class ModelConfig {
return default_value;
}
}

bool contains_non_null(const std::string &key) const;

template <typename T>
T get_or_alias(const std::string &key, const std::string &alias, const T &default_value) const {
if (!key.empty() && config_json.contains(key) && !config_json.at(key).is_null()) {
return config_json.at(key).get<T>();
}
if (!alias.empty() && config_json.contains(alias) && !config_json.at(alias).is_null()) {
return config_json.at(alias).get<T>();
}
return default_value;
}

size_t get_kv_dim() const {
return get<size_t>("hidden_size") * get<size_t>("num_key_value_heads") / get<size_t>("num_attention_heads");
}
Expand Down
11 changes: 9 additions & 2 deletions csrc/engine/compiler/paged_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,28 @@ void PagedCompiler::compile() {
auto input = make_decode_input(b);

barrier_->wait();
(void)model_->forward(input);
infinicore::context::syncStream();
// Capture must not start with stale Marlin locks from previous
// warmup/capture attempts. This reset is intentionally outside
// graph capture; the current implementation still pays a memset
// before every graph replay in get_compiled().
model_->reset_runtime_state();
infinicore::context::syncStream();
barrier_->wait();

barrier_->wait();
infinicore::context::startGraphRecording();
auto output = model_->forward(input);
auto graph = infinicore::context::stopGraphRecording();
barrier_->wait();

auto shared_output = std::shared_ptr<InfinilmModel::Output>(
new InfinilmModel::Output{infinicore::graph::GraphTensor(output.logits)});
auto replay_output = std::shared_ptr<InfinilmModel::Output>(
new InfinilmModel::Output{shared_output->logits->resume_from_blob_()});

compiled_map_decode_[b] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output)};
compiled_map_decode_[b] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output), replay_output};
}
}
}
Expand Down Expand Up @@ -141,7 +148,7 @@ PagedCompiler::Compiled PagedCompiler::get_compiled(const InfinilmModel::Input &
model_->reset_runtime_state();

auto graph = std::get<0>(result->second.compiled);
auto shared_output = std::shared_ptr<InfinilmModel::Output>(new InfinilmModel::Output{std::get<1>(result->second.compiled)->logits->resume_from_blob_()});
auto shared_output = result->second.replay_output;

return std::make_tuple(graph, shared_output);
}
Expand Down
3 changes: 3 additions & 0 deletions csrc/engine/compiler/paged_compiler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class PagedCompiler : public GraphCompiler {
struct CompiledResult {
InfinilmModel::Input input;
Compiled compiled;
// Graph capture stores a GraphTensor in compiled. Replay returns a
// normal output handle restored from the same graph output blob.
std::shared_ptr<InfinilmModel::Output> replay_output;
};

std::unordered_map<
Expand Down
10 changes: 8 additions & 2 deletions csrc/engine/compiler/static_batching_compiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,21 @@ void StaticBatchingCompiler::compile() {
input.slot_mapping,
};

barrier_->wait();
(void)model_->forward(input);
infinicore::context::syncStream();
barrier_->wait();

barrier_->wait();
infinicore::context::startGraphRecording();
auto output = model_->forward(input);
auto graph = infinicore::context::stopGraphRecording();
barrier_->wait();

auto shared_output = std::shared_ptr<InfinilmModel::Output>(new InfinilmModel::Output{infinicore::graph::GraphTensor(output.logits)});
auto replay_output = std::shared_ptr<InfinilmModel::Output>(new InfinilmModel::Output{shared_output->logits->resume_from_blob_()});

compiled_map_[std::make_tuple(b, 1)] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output)};
compiled_map_[std::make_tuple(b, 1)] = CompiledResult{std::move(input), std::make_tuple(graph, shared_output), replay_output};
}
}

Expand All @@ -56,7 +62,7 @@ StaticBatchingCompiler::Compiled StaticBatchingCompiler::get_compiled(
graph_input.total_sequence_lengths.value()->copy_from(input.total_sequence_lengths.value());

auto graph = std::get<0>(result->second.compiled);
auto shared_output = std::shared_ptr<InfinilmModel::Output>(new InfinilmModel::Output{std::get<1>(result->second.compiled)->logits->resume_from_blob_()});
auto shared_output = result->second.replay_output;
return std::make_tuple(graph, shared_output);
}
} else {
Expand Down
3 changes: 3 additions & 0 deletions csrc/engine/compiler/static_batching_compiler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ class StaticBatchingCompiler : public GraphCompiler {
struct CompiledResult {
InfinilmModel::Input input;
Compiled compiled;
// Graph capture stores a GraphTensor in compiled. Replay returns a
// normal output handle restored from the same graph output blob.
std::shared_ptr<InfinilmModel::Output> replay_output;

@pengcheng888 pengcheng888 Jun 26, 2026

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个新增的replay_output变量,以及graph编译时新增和修改的代码。可以注释或解释一下么,不知道啥意思

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

已补充注释。这里的 replay_output 是 graph capture 时为输出保留的普通 Output handle;compiled 里保存的是 GraphTensor/graph 对象,replay 后需要通过这个 handle 拿回模型输出。这样 get_compiled 时可以直接返回可复用的 graph replay 结果。

这个不影响static 推理,已测试

};

std::unordered_map<
Expand Down
32 changes: 25 additions & 7 deletions csrc/engine/infer_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "spdlog/spdlog.h"
#include <future>
#include <stdexcept>
#include <unordered_set>

namespace infinilm::engine {

Expand All @@ -18,7 +19,9 @@ InferEngine::InferEngine(
backends::AttentionBackend attention_backend,
std::optional<infinicore::DataType> kv_cache_dtype,
bool use_mla,
const std::string &weight_load_mode)
const std::string &weight_load_mode,
const std::string &moe_ep_backend,
size_t moe_ep_size)
: communication_group_(distributed_config, device_type),
attention_backend_(attention_backend),
weight_load_mode_(weight_load_mode),
Expand All @@ -32,7 +35,12 @@ InferEngine::InferEngine(

// Load model config if model_path is provided, model_path must be valid, and config.json exists
this->model_config_ = infinilm::config::ConfigFactory::createConfig(config_str);
auto infinilm_config = std::make_shared<infinilm::global_state::InfinilmConfig>(attention_backend, this->model_config_, use_mla);
auto infinilm_config = std::make_shared<infinilm::global_state::InfinilmConfig>(
attention_backend,
this->model_config_,
use_mla,
moe_ep_backend,
moe_ep_size);

// Only support offline int8 kv cache quantization in this version
if (kv_cache_dtype.has_value()) {
Expand Down Expand Up @@ -67,19 +75,19 @@ void InferEngine::load_param(const std::string &name, const infinicore::Tensor &
}
}

void InferEngine::load_params(const std::unordered_map<std::string, infinicore::Tensor> &params) {
void InferEngine::load_params(const std::unordered_map<std::string, infinicore::Tensor> &params, bool strict) {
if (workers_.size() <= 1 || weight_load_mode_ == "sync") {
for (auto &worker : workers_) {
worker->load_params(params);
worker->load_params(params, strict);
}
return;
}

std::vector<std::future<void>> futures;
futures.reserve(workers_.size());
for (auto &worker : workers_) {
futures.emplace_back(std::async(std::launch::async, [&worker, &params] {
worker->load_params(params);
futures.emplace_back(std::async(std::launch::async, [&worker, &params, strict] {
worker->load_params(params, strict);
}));
}
for (auto &future : futures) {
Expand Down Expand Up @@ -118,7 +126,17 @@ std::vector<std::string> InferEngine::state_dict_keys() {
if (0 == workers_.size()) {
throw std::runtime_error(" Model object not found. ");
}
return workers_.front()->state_dict_keys();
std::vector<std::string> ordered_keys;
std::unordered_set<std::string> seen_keys;
for (auto &worker : workers_) {
for (const auto &key : worker->state_dict_keys()) {
// Preserve first-seen worker order while removing duplicate TP keys.
if (seen_keys.emplace(key).second) {
ordered_keys.push_back(key);
}
}
}
return ordered_keys;
}

//------------------------------------------------------
Expand Down
7 changes: 5 additions & 2 deletions csrc/engine/infer_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "rank_barrier.hpp"
#include "rank_worker.hpp"

#include <cstddef>
#include <optional>
#include <string>
#include <unordered_map>
Expand All @@ -31,13 +32,15 @@ class InferEngine {
backends::AttentionBackend attention_backend = backends::AttentionBackend::Default,
std::optional<infinicore::DataType> kv_cache_dtype = std::nullopt,
bool use_mla = false,
const std::string &weight_load_mode = "async");
const std::string &weight_load_mode = "async",
const std::string &moe_ep_backend = "disabled",
size_t moe_ep_size = 1);

// Load a parameter to all workers (each can extract its shard inside RankWorker)
void load_param(const std::string &name, const infinicore::Tensor &param);

// Load a batch of parameters to all workers, syncing each worker once after the batch.
void load_params(const std::unordered_map<std::string, infinicore::Tensor> &params);
void load_params(const std::unordered_map<std::string, infinicore::Tensor> &params, bool strict = true);

// process the weights after loading on all workers (e.g., for quantization)
void process_weights_after_loading();
Expand Down
10 changes: 8 additions & 2 deletions csrc/engine/rank_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,15 @@ void RankWorker::load_param(const std::string &name,
//------------------------------------------------------
// load_params -- synchronous batch load
//------------------------------------------------------
void RankWorker::load_params(const std::unordered_map<std::string, infinicore::Tensor> &params) {
void RankWorker::load_params(const std::unordered_map<std::string, infinicore::Tensor> &params, bool strict) {
{
std::lock_guard<std::mutex> lock(mutex_);
if (should_exit_) {
throw std::runtime_error("RankWorker is closing; cannot load_params");
}

pending_params_ = params;
pending_params_strict_ = strict;
job_cmd_ = Command::LOAD_BATCH;
has_job_ = true;
job_done_ = false;
Expand Down Expand Up @@ -295,6 +296,7 @@ void RankWorker::thread_loop() {
std::string local_param_name;
infinicore::Tensor local_param;
std::unordered_map<std::string, infinicore::Tensor> local_params;
bool local_params_strict = true;
Input local_args;
std::unique_ptr<cache::CacheConfig> local_cache_config;

Expand All @@ -314,6 +316,10 @@ void RankWorker::thread_loop() {
local_param = pending_param_;
} else if (local_cmd == Command::LOAD_BATCH) {
local_params = std::move(pending_params_);
// strict is copied with the batch because loading runs on
// the worker thread after the caller releases the mutex.
local_params_strict = pending_params_strict_;
pending_params_strict_ = true;
pending_params_.clear();
} else if (local_cmd == Command::PREPROCESS) {

Expand Down Expand Up @@ -353,7 +359,7 @@ void RankWorker::thread_loop() {

} else if (local_cmd == Command::LOAD_BATCH) {
try {
model_->load_parameters_no_sync(local_params);
model_->load_parameters_no_sync(local_params, local_params_strict);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

等价于这个写法么 model_->load_parameters_no_sync(local_params, strict);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,现在等价于直接调用 model_->load_parameters_no_sync(local_params, local_params_strict)。这里需要把 strict 继续传下去,否则 Python 侧传入的 non-strict load 对 MoE packed weight 不生效。

infinicore::context::syncStream();
} catch (const std::exception &e) {
{
Expand Down
3 changes: 2 additions & 1 deletion csrc/engine/rank_worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class RankWorker {
void load_param(const std::string &name,
const infinicore::Tensor &param);

void load_params(const std::unordered_map<std::string, infinicore::Tensor> &params);
void load_params(const std::unordered_map<std::string, infinicore::Tensor> &params, bool strict = true);

void process_weights_after_loading();

Expand Down Expand Up @@ -148,6 +148,7 @@ class RankWorker {
std::string pending_param_name_;
infinicore::Tensor pending_param_;
std::unordered_map<std::string, infinicore::Tensor> pending_params_;
bool pending_params_strict_ = true;
Input pending_args_;
std::unique_ptr<cache::CacheConfig> pending_cache_config_;

Expand Down
11 changes: 10 additions & 1 deletion csrc/global_state/infinilm_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

#include "../backends/attention_backends.hpp"
#include "../config/model_config.hpp"
#include <cstddef>
#include <memory>
#include <string>
#include <utility>

namespace infinilm::global_state {

Expand All @@ -15,14 +18,20 @@ struct InfinilmConfig {
InfinilmConfig() = default;
InfinilmConfig(const infinilm::backends::AttentionBackend &backend,
const std::shared_ptr<infinilm::config::ModelConfig> &model_config,
bool use_mla = false)
bool use_mla = false,
std::string moe_ep_backend = "disabled",
size_t moe_ep_size = 1)
: attention_backend(backend),
use_mla(use_mla),
moe_ep_backend(std::move(moe_ep_backend)),
moe_ep_size(moe_ep_size),
model_config(model_config) {}

public:
infinilm::backends::AttentionBackend attention_backend;
bool use_mla{false};
std::string moe_ep_backend{"disabled"};
size_t moe_ep_size{1};
std::shared_ptr<infinilm::config::ModelConfig> model_config;
};

Expand Down
17 changes: 16 additions & 1 deletion csrc/layers/causal_lm_templates/text_decoder_layer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "infinicore/ops.hpp"
#include "infinicore/tensor.hpp"
#include <memory>
#include <type_traits>
#include <tuple>
namespace infinilm::layers::causal_lm_templates {

Expand All @@ -32,7 +33,7 @@ class TextDecoderLayer : public infinicore::nn::Module {
post_attention_layernorm_ = this->register_module<infinicore::nn::RMSNorm>("post_attention_layernorm", hidden_size, rms_norm_eps, dtype, device);

self_attn_ = this->register_module<Attention>("self_attn", model_config, layer_idx, device);
mlp_ = this->register_module<MLP>("mlp", model_config, device);
mlp_ = register_mlp(model_config, layer_idx, device);
}

std::tuple<infinicore::Tensor, infinicore::Tensor> forward(const infinicore::Tensor &positions,
Expand Down Expand Up @@ -68,6 +69,20 @@ class TextDecoderLayer : public infinicore::nn::Module {
INFINICORE_NN_MODULE(MLP, mlp);

size_t layer_idx_;

private:
std::shared_ptr<MLP> register_mlp(std::shared_ptr<infinilm::config::ModelConfig> model_config,
size_t layer_idx,
const infinicore::Device &device) {
if constexpr (std::is_constructible_v<MLP,
std::shared_ptr<infinilm::config::ModelConfig>,
size_t,
const infinicore::Device &>) {
return this->register_module<MLP>("mlp", model_config, layer_idx, device);
} else {
return this->register_module<MLP>("mlp", model_config, device);
}
}
};

} // namespace infinilm::layers::causal_lm_templates
2 changes: 0 additions & 2 deletions csrc/layers/common_modules.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#pragma once
#include "mlp/mlp.hpp"
#include "mlp/moe_mlp.hpp"

#include "attention/attention.hpp"
#include "causal_lm_templates/text_causal_lm.hpp"
Expand All @@ -12,7 +11,6 @@
namespace infinilm::layers {

using MLP = infinilm::layers::mlp::MLP;
using MoeMLP = infinilm::layers::moe_mlp::MoeMLP;

namespace attention {

Expand Down
Loading