Production-ready gRPC server with TLS, observability, and streaming support.
- ✅ Full gRPC Support: Unary, client streaming, server streaming, bidirectional streaming
- ✅ TLS/mTLS: Secure communication with client authentication
- ✅ Health Checks: Built-in gRPC health checking protocol
- ✅ Reflection: Service discovery support
- ✅ Observability: Automatic metrics, logging, and tracing
- ✅ Interceptors: Custom middleware support
- ✅ Configuration: Keepalive, message sizes, concurrency limits
- ✅ Server Statistics: Track RPCs, streams, and performance
package main
import (
"context"
"github.com/xraph/forge"
"github.com/xraph/forge/extensions/grpc"
pb "your/proto/package"
)
func main() {
app := forge.NewApp(forge.AppConfig{
Name: "grpc-service",
})
// Register gRPC extension
app.RegisterExtension(grpc.NewExtension(
grpc.WithAddress(":50051"),
grpc.WithReflection(true),
grpc.WithHealthCheck(true),
))
// Start app
app.Start(context.Background())
// Get gRPC server and register service
grpcServer, _ := forge.Inject[grpc.GRPC](app.Container())
pb.RegisterYourServiceServer(grpcServer.GetServer(), &yourServiceImpl{})
app.Run(context.Background(), ":8080")
}extensions:
grpc:
address: ":50051"
max_recv_msg_size: 4194304 # 4MB
max_send_msg_size: 4194304 # 4MB
max_concurrent_streams: 100
connection_timeout: 120s
# TLS/mTLS
enable_tls: true
tls_cert_file: "server.crt"
tls_key_file: "server.key"
tls_ca_file: "ca.crt"
client_auth: true # Require client certificates
# Features
enable_health_check: true
enable_reflection: true
enable_metrics: true
enable_tracing: true
enable_logging: true
# Keepalive
keepalive:
time: 2h
timeout: 20s
enforcement_policy: true
min_time: 5m
permit_without_stream: falsegrpc.NewExtension(
grpc.WithAddress(":50051"),
grpc.WithMaxMessageSize(8 * 1024 * 1024), // 8MB
grpc.WithMaxConcurrentStreams(100),
grpc.WithTLS("server.crt", "server.key", "ca.crt"),
grpc.WithClientAuth(true),
grpc.WithHealthCheck(true),
grpc.WithReflection(true),
grpc.WithMetrics(true),
)# Generate CA
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 365 -key ca.key -out ca.crt
# Generate server certificate
openssl genrsa -out server.key 4096
openssl req -new -key server.key -out server.csr
openssl x509 -req -days 365 -in server.csr -CA ca.crt -CAkey ca.key -set_serial 01 -out server.crt
# Generate client certificate (for mTLS)
openssl genrsa -out client.key 4096
openssl req -new -key client.key -out client.csr
openssl x509 -req -days 365 -in client.csr -CA ca.crt -CAkey ca.key -set_serial 02 -out client.crtgrpc.NewExtension(
grpc.WithTLS("server.crt", "server.key", ""),
)grpc.NewExtension(
grpc.WithTLS("server.crt", "server.key", "ca.crt"),
grpc.WithClientAuth(true), // Verify client certificates
)// Create custom interceptor
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// Check authentication
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "missing metadata")
}
token := md.Get("authorization")
if len(token) == 0 {
return nil, status.Error(codes.Unauthenticated, "missing token")
}
// Validate token...
return handler(ctx, req)
}
// Register interceptor before starting the server
grpcServer, _ := forge.Inject[grpc.GRPC](app.Container())
grpcServer.AddUnaryInterceptor(authInterceptor)The extension automatically registers the gRPC health checking protocol.
grpcurl -plaintext localhost:50051 grpc.health.v1.Health/Checktype myHealthChecker struct{}
func (h *myHealthChecker) Check(ctx context.Context) error {
// Check dependencies (database, cache, etc.)
return nil
}
grpcServer.RegisterHealthChecker("my-service", &myHealthChecker{})Automatic metrics collection when enable_metrics: true:
grpc_unary_started_total- Total unary RPCs startedgrpc_unary_succeeded_total- Successful unary RPCsgrpc_unary_failed_total- Failed unary RPCs (with code label)grpc_unary_duration_seconds- Unary RPC duration histogramgrpc_stream_active- Current active streamsgrpc_stream_duration_seconds- Stream duration histogramgrpc_stream_succeeded_total- Successful streamsgrpc_stream_failed_total- Failed streams (with code label)
Get runtime statistics:
stats := grpcServer.GetStats()
fmt.Printf("Start Time: %d\n", stats.StartTime)
fmt.Printf("RPCs Started: %d\n", stats.RPCsStarted)
fmt.Printf("RPCs Succeeded: %d\n", stats.RPCsSucceeded)
fmt.Printf("RPCs Failed: %d\n", stats.RPCsFailed)
fmt.Printf("Active Streams: %d\n", stats.ActiveStreams)List registered services:
services := grpcServer.GetServices()
for _, service := range services {
fmt.Printf("Service: %s\n", service.Name)
for _, method := range service.Methods {
fmt.Printf(" Method: %s (client=%v, server=%v)\n",
method.Name, method.IsClientStream, method.IsServerStream)
}
}func TestGRPCService(t *testing.T) {
app := forge.NewApp(forge.AppConfig{Name: "test"})
app.RegisterExtension(grpc.NewExtension(
grpc.WithAddress("127.0.0.1:0"), // Random port
))
app.Start(context.Background())
defer app.Stop(context.Background())
// Get server and register service
grpcServer, _ := forge.Inject[grpc.GRPC](app.Container())
pb.RegisterTestServiceServer(grpcServer.GetServer(), &testImpl{})
// Create client and test...
}- Always enable TLS in production - Use
WithTLS()to secure your gRPC traffic - Use mTLS for service-to-service communication - Enable client authentication with
WithClientAuth(true) - Set appropriate message size limits - Use
WithMaxRecvMsgSize()andWithMaxSendMsgSize()to prevent memory exhaustion - Configure keepalive to detect dead connections - Adjust keepalive settings based on your network environment
- Enable health checks for load balancers - Use
WithHealthCheck(true)and register custom health checkers - Use reflection in development only - Disable in production for security
- Monitor metrics and set up alerts - Use the automatic metrics to track RPC performance
- Implement custom health checkers for dependencies - Check database, cache, and other service health
- Add custom interceptors for cross-cutting concerns - Authentication, authorization, rate limiting, etc.
- Use context for cancellation and deadlines - Always pass context through your RPC calls
The gRPC extension follows Forge's extension pattern:
- Configuration: Loads config from
ConfigManagerwith dual-key support - DI Registration: Registers the gRPC server as
"grpc"in the container - Lifecycle: Implements
Register(),Start(),Stop(),Health()methods - Observability: Integrates with Forge's logging and metrics systems
- Interceptors: Chains observability and custom interceptors
- TLS: Loads certificates and configures mTLS if enabled
- Health Checking: Implements standard gRPC health protocol
The gRPC extension is designed for high-performance production use:
- Zero-copy streaming where possible
- Connection pooling with configurable limits
- Concurrent stream handling with
MaxConcurrentStreams - Message size limits to prevent memory exhaustion
- Keepalive configuration to detect and close dead connections
- Efficient observability with minimal overhead
Ensure the server is started before registering services:
app.Start(ctx) // Start first
grpcServer, _ := forge.Inject[grpc.GRPC](app.Container())
pb.RegisterYourServiceServer(grpcServer.GetServer(), &impl{}) // Register afterVerify certificate paths and permissions:
ls -la server.crt server.key ca.crt
openssl verify -CAfile ca.crt server.crtCheck if health check is enabled and custom health checkers are registered:
grpc.WithHealthCheck(true)
grpcServer.RegisterHealthChecker("my-service", &checker{})Reduce message size limits and concurrent streams:
grpc.WithMaxRecvMsgSize(1 * 1024 * 1024) // 1MB
grpc.WithMaxConcurrentStreams(50)See the v2/examples/ directory for complete examples:
grpc-basic/- Simple gRPC servicegrpc-advanced/- TLS, interceptors, health checks
MIT