Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions extensions/common/metadata_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ namespace Istio {
namespace Common {

// Filter state key to store the peer metadata under.
// CelState is stored under these keys for CEL expression support.
constexpr absl::string_view DownstreamPeer = "downstream_peer";
constexpr absl::string_view UpstreamPeer = "upstream_peer";

// Filter state keys for WorkloadMetadataObject (FIELD accessor support).
constexpr absl::string_view DownstreamPeerObj = "downstream_peer_obj";
constexpr absl::string_view UpstreamPeerObj = "upstream_peer_obj";

// Special filter state key to indicate the filter is done looking for peer metadata.
// This is used by network metadata exchange on failure.
constexpr absl::string_view NoPeer = "peer_not_found";
Expand Down
38 changes: 29 additions & 9 deletions source/extensions/filters/http/istio_stats/istio_stats.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,27 +114,37 @@ enum class Reporter {
};

// Detect if peer info read is completed by TCP metadata exchange.
// Checks for WorkloadMetadataObject key (set atomically with CelState by peer_metadata filter).
bool peerInfoRead(Reporter reporter, const StreamInfo::FilterState& filter_state) {
const auto& filter_state_key =
reporter == Reporter::ServerSidecar || reporter == Reporter::ServerGateway
? Istio::Common::DownstreamPeer
: Istio::Common::UpstreamPeer;
? Istio::Common::DownstreamPeerObj
: Istio::Common::UpstreamPeerObj;
return filter_state.hasDataWithName(filter_state_key) ||
filter_state.hasDataWithName(Istio::Common::NoPeer);
}

std::optional<Istio::Common::WorkloadMetadataObject>
peerInfo(Reporter reporter, const StreamInfo::FilterState& filter_state) {
const auto& filter_state_key =
const auto& cel_state_key =
reporter == Reporter::ServerSidecar || reporter == Reporter::ServerGateway
? Istio::Common::DownstreamPeer
: Istio::Common::UpstreamPeer;
// This's a workaround before FilterStateObject support operation like `.labels['role']`.
// The workaround is to use CelState to store the peer metadata.
// Rebuild the WorkloadMetadataObject from the CelState.
const auto& obj_key = reporter == Reporter::ServerSidecar || reporter == Reporter::ServerGateway
? Istio::Common::DownstreamPeerObj
: Istio::Common::UpstreamPeerObj;

// Try reading as WorkloadMetadataObject first (new format, stored under *_obj key)
const auto* peer_info =
filter_state.getDataReadOnly<Istio::Common::WorkloadMetadataObject>(obj_key);
if (peer_info) {
return *peer_info;
}

// Fall back to CelState for backward compatibility with older deployments
const auto* cel_state =
filter_state.getDataReadOnly<Envoy::Extensions::Filters::Common::Expr::CelState>(
filter_state_key);
cel_state_key);
if (!cel_state) {
return {};
}
Expand All @@ -144,7 +154,7 @@ peerInfo(Reporter reporter, const StreamInfo::FilterState& filter_state) {
return {};
}

Istio::Common::WorkloadMetadataObject peer_info(
Istio::Common::WorkloadMetadataObject result(
extractString(obj, Istio::Common::InstanceNameToken),
extractString(obj, Istio::Common::ClusterNameToken),
extractString(obj, Istio::Common::NamespaceNameToken),
Expand All @@ -156,7 +166,17 @@ peerInfo(Reporter reporter, const StreamInfo::FilterState& filter_state) {
Istio::Common::fromSuffix(extractString(obj, Istio::Common::WorkloadTypeToken)),
extractString(obj, Istio::Common::IdentityToken));

return peer_info;
// Extract labels from the "labels" field
const auto& labels_it = obj.fields().find(Istio::Common::LabelsToken);
if (labels_it != obj.fields().end() && labels_it->second.has_struct_value()) {
std::vector<std::pair<std::string, std::string>> labels;
for (const auto& label : labels_it->second.struct_value().fields()) {
labels.push_back({std::string(label.first), std::string(label.second.string_value())});
}
result.setLabels(labels);
}

return result;
}

// Process-wide context shared with all filter instances.
Expand Down
18 changes: 14 additions & 4 deletions source/extensions/filters/http/peer_metadata/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,23 @@ void FilterConfig::setFilterState(StreamInfo::StreamInfo& info, bool downstream,
const PeerInfo& value) const {
const absl::string_view key =
downstream ? Istio::Common::DownstreamPeer : Istio::Common::UpstreamPeer;
const absl::string_view obj_key =
downstream ? Istio::Common::DownstreamPeerObj : Istio::Common::UpstreamPeerObj;
if (!info.filterState()->hasDataWithName(key)) {
// Use CelState to allow operation filter_state.upstream_peer.labels['role']
// Store CelState for CEL expressions like filter_state.downstream_peer.labels['role']
auto pb = value.serializeAsProto();
auto peer_info = std::make_unique<CelState>(FilterConfig::peerInfoPrototype());
peer_info->setValue(absl::string_view(pb->SerializeAsString()));
auto cel_state = std::make_unique<CelState>(FilterConfig::peerInfoPrototype());
cel_state->setValue(absl::string_view(pb->SerializeAsString()));
info.filterState()->setData(
key, std::move(peer_info), StreamInfo::FilterState::StateType::Mutable,
key, std::move(cel_state), StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::FilterChain, sharedWithUpstream());

// Also store WorkloadMetadataObject under a separate key for FIELD accessor support.
// WorkloadMetadataObject implements hasFieldSupport() + getField() for
// formatters using %FILTER_STATE(downstream_peer_obj:FIELD:fieldname)% syntax.
auto workload_metadata = std::make_unique<PeerInfo>(value);
info.filterState()->setData(
obj_key, std::move(workload_metadata), StreamInfo::FilterState::StateType::Mutable,
StreamInfo::FilterState::LifeSpan::FilterChain, sharedWithUpstream());
} else {
ENVOY_LOG(debug, "Duplicate peer metadata, skipping");
Expand Down
86 changes: 79 additions & 7 deletions source/extensions/filters/http/peer_metadata/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,10 @@ class PeerMetadataTest : public testing::Test {
downstream ? Istio::Common::DownstreamPeer : Istio::Common::UpstreamPeer));
}
void checkPeerNamespace(bool downstream, const std::string& expected) {
const auto* cel_state =
stream_info_.filterState()
->getDataReadOnly<Envoy::Extensions::Filters::Common::Expr::CelState>(
downstream ? Istio::Common::DownstreamPeer : Istio::Common::UpstreamPeer);
Protobuf::Struct obj;
ASSERT_TRUE(obj.ParseFromString(cel_state->value().data()));
EXPECT_EQ(expected, extractString(obj, "namespace"));
const auto* peer_info = stream_info_.filterState()->getDataReadOnly<PeerInfo>(
downstream ? Istio::Common::DownstreamPeerObj : Istio::Common::UpstreamPeerObj);
ASSERT_NE(peer_info, nullptr);
EXPECT_EQ(expected, peer_info->namespace_name_);
}

absl::string_view extractString(const Protobuf::Struct& metadata, absl::string_view key) {
Expand Down Expand Up @@ -488,6 +485,81 @@ TEST_F(PeerMetadataTest, UpstreamMXPropagationSkipPassthrough) {
checkNoPeer(false);
}

TEST_F(PeerMetadataTest, FieldAccessorSupport) {
const WorkloadMetadataObject pod("pod-foo-1234", "my-cluster", "default", "foo", "foo-service",
"v1alpha3", "myapp", "v1", Istio::Common::WorkloadType::Pod, "");
EXPECT_CALL(*metadata_provider_, GetMetadata(_))
.WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address)
-> std::optional<WorkloadMetadataObject> {
if (absl::StartsWith(address->asStringView(), "127.0.0.1")) {
return {pod};
}
return {};
}));
initialize(R"EOF(
downstream_discovery:
- workload_discovery: {}
)EOF");

const auto* peer_info =
stream_info_.filterState()->getDataReadOnly<PeerInfo>(Istio::Common::DownstreamPeerObj);
ASSERT_NE(peer_info, nullptr);

// Test hasFieldSupport
EXPECT_TRUE(peer_info->hasFieldSupport());

// Test getField() for all 9 fields
EXPECT_EQ("foo", std::get<absl::string_view>(peer_info->getField("workload")));
EXPECT_EQ("default", std::get<absl::string_view>(peer_info->getField("namespace")));
EXPECT_EQ("my-cluster", std::get<absl::string_view>(peer_info->getField("cluster")));
EXPECT_EQ("foo-service", std::get<absl::string_view>(peer_info->getField("service")));
EXPECT_EQ("v1alpha3", std::get<absl::string_view>(peer_info->getField("revision")));
EXPECT_EQ("myapp", std::get<absl::string_view>(peer_info->getField("app")));
EXPECT_EQ("v1", std::get<absl::string_view>(peer_info->getField("version")));
EXPECT_EQ("pod", std::get<absl::string_view>(peer_info->getField("type")));
EXPECT_EQ("pod-foo-1234", std::get<absl::string_view>(peer_info->getField("name")));
}

TEST_F(PeerMetadataTest, CelExpressionCompatibility) {
const WorkloadMetadataObject pod("pod-bar-5678", "test-cluster", "production", "bar",
"bar-service", "v2", "barapp", "v2",
Istio::Common::WorkloadType::Pod, "");
EXPECT_CALL(*metadata_provider_, GetMetadata(_))
.WillRepeatedly(Invoke([&](const Network::Address::InstanceConstSharedPtr& address)
-> std::optional<WorkloadMetadataObject> {
if (absl::StartsWith(address->asStringView(), "127.0.0.1")) {
return {pod};
}
return {};
}));
initialize(R"EOF(
downstream_discovery:
- workload_discovery: {}
)EOF");

// Verify CelState is stored under downstream_peer for CEL expressions
const auto* cel_state = stream_info_.filterState()
->getDataReadOnly<Envoy::Extensions::Filters::Common::Expr::CelState>(
Istio::Common::DownstreamPeer);
ASSERT_NE(cel_state, nullptr);

// Verify WorkloadMetadataObject is stored under downstream_peer_obj for FIELD accessor
const auto* peer_info =
stream_info_.filterState()->getDataReadOnly<PeerInfo>(Istio::Common::DownstreamPeerObj);
ASSERT_NE(peer_info, nullptr);

// Test that serializeAsProto still works for CEL compatibility
auto proto = peer_info->serializeAsProto();
ASSERT_NE(proto, nullptr);

// Verify the protobuf contains expected data
const auto* struct_proto = dynamic_cast<const google::protobuf::Struct*>(proto.get());
ASSERT_NE(struct_proto, nullptr);
EXPECT_EQ("production", extractString(*struct_proto, "namespace"));
EXPECT_EQ("bar", extractString(*struct_proto, "workload"));
EXPECT_EQ("test-cluster", extractString(*struct_proto, "cluster"));
}

} // namespace
} // namespace PeerMetadata
} // namespace HttpFilters
Expand Down