This document provides practical examples for API gateways and service meshes to discover and consume FARP-enabled services across different discovery backends.
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"github.com/xraph/forge/extensions/discovery/backends"
"github.com/xraph/farp"
)
// GatewayDiscovery handles service discovery for the gateway
type GatewayDiscovery struct {
backend *backends.MDNSBackend
routes map[string]*farp.SchemaManifest
}
func NewGateway() (*GatewayDiscovery, error) {
// Configure mDNS backend to discover multiple service types
backend, err := backends.NewMDNSBackend(backends.MDNSConfig{
Domain: "local.",
ServiceTypes: []string{
"_octopus._tcp", // Custom application services
"_farp._tcp", // Generic FARP services
"_http._tcp", // Standard HTTP APIs
},
BrowseTimeout: 3 * time.Second,
WatchInterval: 30 * time.Second,
IPv6: true,
})
if err != nil {
return nil, err
}
if err := backend.Initialize(context.Background()); err != nil {
return nil, err
}
return &GatewayDiscovery{
backend: backend,
routes: make(map[string]*farp.SchemaManifest),
}, nil
}
// DiscoverServices finds all FARP-enabled services
func (g *GatewayDiscovery) DiscoverServices(ctx context.Context) error {
// Discover all configured service types
services, err := g.backend.DiscoverAllTypes(ctx)
if err != nil {
return fmt.Errorf("failed to discover services: %w", err)
}
fmt.Printf("Found %d services\n", len(services))
for _, svc := range services {
// Check if FARP is enabled
if svc.Metadata["farp.enabled"] != "true" {
continue
}
// Get manifest URL
manifestURL := svc.Metadata["farp.manifest"]
if manifestURL == "" {
fmt.Printf("Service %s has FARP enabled but no manifest URL\n", svc.Name)
continue
}
// Fetch and parse manifest
manifest, err := g.fetchManifest(ctx, manifestURL)
if err != nil {
fmt.Printf("Failed to fetch manifest for %s: %v\n", svc.Name, err)
continue
}
// Store routes
g.routes[svc.ID] = manifest
fmt.Printf("✓ Registered routes for %s (service type: %s)\n",
svc.Name,
svc.Metadata["mdns.service_type"])
}
return nil
}
// fetchManifest retrieves the FARP schema manifest
func (g *GatewayDiscovery) fetchManifest(ctx context.Context, url string) (*farp.SchemaManifest, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("unexpected status: %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var manifest farp.SchemaManifest
if err := json.Unmarshal(body, &manifest); err != nil {
return nil, err
}
return &manifest, nil
}
// WatchForChanges continuously watches for service changes
func (g *GatewayDiscovery) WatchForChanges(ctx context.Context) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := g.DiscoverServices(ctx); err != nil {
fmt.Printf("Error discovering services: %v\n", err)
}
}
}
}
func main() {
gateway, err := NewGateway()
if err != nil {
panic(err)
}
ctx := context.Background()
// Initial discovery
if err := gateway.DiscoverServices(ctx); err != nil {
panic(err)
}
// Watch for changes
go gateway.WatchForChanges(ctx)
// Start gateway server...
select {}
}# config/gateway.yaml
backends:
# mDNS/Bonjour discovery for local development
- type: mdns
enabled: true
config:
# Service types to discover
service_types:
- "_octopus._tcp" # Custom application services
- "_farp._tcp" # FARP-enabled services
- "_http._tcp" # Standard HTTP APIs
domain: "local." # mDNS domain
watch_interval: 30s # How often to poll
query_timeout: 5s # Query timeout
enable_ipv6: true # IPv6 support
# Consul for production
- type: consul
enabled: true
config:
address: "consul.service.consul:8500"
datacenter: "dc1"use std::time::Duration;
use mdns::{Record, RecordKind};
use tokio::time;
pub struct GatewayDiscovery {
service_types: Vec<String>,
domain: String,
watch_interval: Duration,
}
impl GatewayDiscovery {
pub fn new(config: MdnsConfig) -> Self {
Self {
service_types: config.service_types,
domain: config.domain,
watch_interval: Duration::from_secs(config.watch_interval),
}
}
/// Discover FARP-enabled services across all configured types
pub async fn discover_services(&self) -> Result<Vec<ServiceInstance>> {
let mut all_services = Vec::new();
for service_type in &self.service_types {
let services = self.discover_by_type(service_type).await?;
all_services.extend(services);
}
// Filter for FARP-enabled services
let farp_services: Vec<_> = all_services
.into_iter()
.filter(|svc| svc.has_farp_metadata())
.collect();
println!("Discovered {} FARP-enabled services", farp_services.len());
Ok(farp_services)
}
/// Discover services by specific mDNS service type
async fn discover_by_type(&self, service_type: &str) -> Result<Vec<ServiceInstance>> {
let query = format!("{}.{}", service_type, self.domain);
// Browse for services
let services = mdns::discover::all(&query, Duration::from_secs(3))?
.map(|response| {
let service = ServiceInstance::from_mdns(response);
service
})
.collect::<Vec<_>>()
.await;
Ok(services)
}
/// Watch for service changes continuously
pub async fn watch_for_changes(
&self,
callback: impl Fn(Vec<ServiceInstance>),
) {
let mut interval = time::interval(self.watch_interval);
loop {
interval.tick().await;
match self.discover_services().await {
Ok(services) => {
callback(services);
}
Err(e) => {
eprintln!("Discovery error: {}", e);
}
}
}
}
}
#[derive(Debug, Clone)]
pub struct ServiceInstance {
pub id: String,
pub name: String,
pub address: String,
pub port: u16,
pub metadata: HashMap<String, String>,
}
impl ServiceInstance {
fn from_mdns(response: mdns::Response) -> Self {
let mut metadata = HashMap::new();
// Parse TXT records
for record in response.records() {
if let RecordKind::TXT(txt) = record.kind {
for entry in txt {
if let Some((key, value)) = entry.split_once('=') {
metadata.insert(key.to_string(), value.to_string());
}
}
}
}
Self {
id: response.instance_name().to_string(),
name: response.service_name().to_string(),
address: response.address().to_string(),
port: response.port(),
metadata,
}
}
fn has_farp_metadata(&self) -> bool {
self.metadata.get("farp.enabled") == Some(&"true".to_string())
}
pub fn manifest_url(&self) -> Option<&str> {
self.metadata.get("farp.manifest").map(|s| s.as_str())
}
}
// Usage
#[tokio::main]
async fn main() {
let config = MdnsConfig {
service_types: vec![
"_octopus._tcp".to_string(),
"_farp._tcp".to_string(),
],
domain: "local.".to_string(),
watch_interval: 30,
};
let discovery = GatewayDiscovery::new(config);
// Initial discovery
let services = discovery.discover_services().await.unwrap();
for svc in &services {
println!("Service: {} at {}:{}", svc.name, svc.address, svc.port);
if let Some(manifest_url) = svc.manifest_url() {
// Fetch and process schema manifest
let manifest = fetch_manifest(manifest_url).await.unwrap();
configure_routes(&manifest).await;
}
}
// Watch for changes
discovery.watch_for_changes(|services| {
println!("Services updated: {} services", services.len());
// Update routes...
}).await;
}package main
import (
"context"
"fmt"
consul "github.com/hashicorp/consul/api"
"github.com/xraph/farp"
)
type ConsulGateway struct {
client *consul.Client
}
func NewConsulGateway(address string) (*ConsulGateway, error) {
config := consul.DefaultConfig()
config.Address = address
client, err := consul.NewClient(config)
if err != nil {
return nil, err
}
return &ConsulGateway{client: client}, nil
}
// DiscoverFARPServices discovers all FARP-enabled services in Consul
func (g *ConsulGateway) DiscoverFARPServices(ctx context.Context) ([]*ServiceInstance, error) {
// Get all services
services, _, err := g.client.Catalog().Services(nil)
if err != nil {
return nil, err
}
var farpServices []*ServiceInstance
for serviceName := range services {
// Get service instances
instances, _, err := g.client.Health().Service(serviceName, "", true, nil)
if err != nil {
continue
}
for _, instance := range instances {
// Check for FARP metadata
if instance.Service.Meta["farp.enabled"] == "true" {
svc := &ServiceInstance{
ID: instance.Service.ID,
Name: instance.Service.Service,
Address: instance.Service.Address,
Port: instance.Service.Port,
Metadata: instance.Service.Meta,
}
farpServices = append(farpServices, svc)
// Fetch manifest
if manifestURL := svc.Metadata["farp.manifest"]; manifestURL != "" {
manifest, _ := fetchManifest(ctx, manifestURL)
// Configure routes from manifest...
_ = manifest
}
}
}
}
return farpServices, nil
}
// WatchServices watches for service changes
func (g *ConsulGateway) WatchServices(ctx context.Context) error {
// Use Consul's blocking queries for real-time updates
var waitIndex uint64
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Blocking query with wait index
services, meta, err := g.client.Catalog().Services(&consul.QueryOptions{
WaitIndex: waitIndex,
WaitTime: 5 * time.Minute,
})
if err != nil {
return err
}
waitIndex = meta.LastIndex
// Process service changes
for serviceName := range services {
// Check for FARP metadata and update routes...
}
}
}package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
type K8sGateway struct {
clientset *kubernetes.Clientset
namespace string
}
func NewK8sGateway(namespace string) (*K8sGateway, error) {
// In-cluster config
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return &K8sGateway{
clientset: clientset,
namespace: namespace,
}, nil
}
// DiscoverFARPServices discovers FARP-enabled services via annotations
func (g *K8sGateway) DiscoverFARPServices(ctx context.Context) error {
// List services with FARP annotation
services, err := g.clientset.CoreV1().Services(g.namespace).List(ctx, metav1.ListOptions{
LabelSelector: "farp.enabled=true",
})
if err != nil {
return err
}
for _, svc := range services.Items {
// Check for FARP annotations
annotations := svc.Annotations
if annotations["farp.enabled"] != "true" {
continue
}
manifestURL := annotations["farp.manifest"]
if manifestURL == "" {
continue
}
// Fetch manifest
manifest, err := fetchManifest(ctx, manifestURL)
if err != nil {
fmt.Printf("Failed to fetch manifest for %s: %v\n", svc.Name, err)
continue
}
// Configure routes from manifest
fmt.Printf("✓ Configured routes for %s\n", svc.Name)
_ = manifest
}
return nil
}
// WatchServices watches for service changes using Kubernetes Watch API
func (g *K8sGateway) WatchServices(ctx context.Context) error {
watcher, err := g.clientset.CoreV1().Services(g.namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: "farp.enabled=true",
})
if err != nil {
return err
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
svc := event.Object.(*corev1.Service)
switch event.Type {
case "ADDED", "MODIFIED":
// Service added or updated
if manifestURL := svc.Annotations["farp.manifest"]; manifestURL != "" {
manifest, _ := fetchManifest(ctx, manifestURL)
// Update routes...
_ = manifest
}
case "DELETED":
// Service deleted - remove routes
fmt.Printf("Service %s deleted\n", svc.Name)
}
}
return nil
}package main
import (
"context"
"fmt"
"github.com/xraph/forge/extensions/discovery/backends"
)
type HybridGateway struct {
mdnsBackend *backends.MDNSBackend
consulBackend *backends.ConsulBackend
}
func NewHybridGateway() (*HybridGateway, error) {
// mDNS for local development
mdnsBackend, err := backends.NewMDNSBackend(backends.MDNSConfig{
Domain: "local.",
ServiceTypes: []string{"_octopus._tcp", "_farp._tcp"},
})
if err != nil {
return nil, err
}
// Consul for production
consulBackend, err := backends.NewConsulBackend(backends.ConsulConfig{
Address: "consul.service.consul:8500",
})
if err != nil {
return nil, err
}
return &HybridGateway{
mdnsBackend: mdnsBackend,
consulBackend: consulBackend,
}, nil
}
func (g *HybridGateway) DiscoverAllServices(ctx context.Context) ([]*ServiceInstance, error) {
var allServices []*ServiceInstance
// Discover from mDNS
mdnsServices, err := g.mdnsBackend.DiscoverAllTypes(ctx)
if err == nil {
allServices = append(allServices, mdnsServices...)
fmt.Printf("Found %d services via mDNS\n", len(mdnsServices))
}
// Discover from Consul
consulServices, err := g.consulBackend.ListServices(ctx)
if err == nil {
allServices = append(allServices, consulServices...)
fmt.Printf("Found %d services via Consul\n", len(consulServices))
}
// Deduplicate by service ID
uniqueServices := deduplicateServices(allServices)
return uniqueServices, nil
}- mDNS Discovery: Use
ServiceTypesfor multi-type discovery - FARP Filtering: Check
farp.enabledmetadata - Manifest Fetching: Use
farp.manifestURL to retrieve schemas - Watch for Changes: Poll (mDNS) or use blocking queries (Consul) or Watch API (K8s)
- Multi-Backend: Combine multiple backends for hybrid deployments
| Environment | Backend | Discovery Method | Watch Strategy |
|---|---|---|---|
| Local Dev | mDNS | Multi-type browse | Poll (30s) |
| Staging | Consul | Catalog API | Blocking queries |
| Production | Kubernetes | Service annotations | Watch API |
| Hybrid | mDNS + Consul | Both | Both |
farp.enabled- Check if FARP is availablefarp.manifest- Fetch full schema manifestmdns.service_type- Filter by service type (mDNS only)farp.capabilities- Determine service capabilities
For more information: