Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ interface ClientMetrics {
long getWriteRequestsCount();

long getFilteredReadRequestsCount();

String getHostAddress();

String getUserName();

String getClientVersion();

String getServiceName();
}

/** Returns the user name */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public static UserMetrics toUserMetrics(ClusterStatusProtos.UserLoad userLoad) {
userLoad.getClientMetricsList().stream()
.map(clientMetrics -> new ClientMetricsImpl(clientMetrics.getHostName(),
clientMetrics.getReadRequestsCount(), clientMetrics.getWriteRequestsCount(),
clientMetrics.getFilteredRequestsCount()))
clientMetrics.getFilteredRequestsCount(), clientMetrics.getHostAddress(),
clientMetrics.getUserName(), clientMetrics.getServiceName(),
clientMetrics.getClientVersion()))
.forEach(builder::addClientMetris);
return builder.build();
}
Expand All @@ -49,7 +51,10 @@ public static ClusterStatusProtos.UserLoad toUserMetrics(UserMetrics userMetrics
.setHostName(clientMetrics.getHostName())
.setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
.setReadRequestsCount(clientMetrics.getReadRequestsCount())
.setFilteredRequestsCount(clientMetrics.getFilteredReadRequestsCount()).build())
.setFilteredRequestsCount(clientMetrics.getFilteredReadRequestsCount())
.setHostAddress(clientMetrics.getHostAddress()).setUserName(clientMetrics.getUserName())
.setServiceName(clientMetrics.getServiceName())
.setClientVersion(clientMetrics.getClientVersion()).build())
.forEach(builder::addClientMetrics);
return builder.build();
}
Expand Down Expand Up @@ -79,13 +84,27 @@ public static class ClientMetricsImpl implements UserMetrics.ClientMetrics {
private final String hostName;
private final long readRequestCount;
private final long writeRequestCount;
private final String hostAddress;
private final String userName;
private final String serviceName;
private final String clientVersion;

public ClientMetricsImpl(String hostName, long readRequest, long writeRequest,
long filteredReadRequestsCount) {
this(hostName, readRequest, writeRequest, filteredReadRequestsCount, null, null, null, null);
}

public ClientMetricsImpl(String hostName, long readRequest, long writeRequest,
long filteredReadRequestsCount, String hostAddress, String userName, String serviceName,
String clientVersion) {
this.hostName = hostName;
this.readRequestCount = readRequest;
this.writeRequestCount = writeRequest;
this.filteredReadRequestsCount = filteredReadRequestsCount;
this.hostAddress = hostAddress != null ? hostAddress : "Unknown";
this.userName = userName != null ? userName : "Unknown";
this.serviceName = serviceName != null ? serviceName : "Unknown";
this.clientVersion = clientVersion != null ? clientVersion : "Unknown";
}

@Override
Expand All @@ -107,6 +126,26 @@ public long getWriteRequestsCount() {
public long getFilteredReadRequestsCount() {
return filteredReadRequestsCount;
}

@Override
public String getHostAddress() {
return hostAddress;
}

@Override
public String getUserName() {
return userName;
}

@Override
public String getServiceName() {
return serviceName;
}

@Override
public String getClientVersion() {
return clientVersion;
}
}

private static class UserMetricsImpl implements UserMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ interface ClientMetrics {
void incrementFilteredReadRequests();

long getFilteredReadRequests();

String getHostAddress();

String getUserName();

String getClientVersion();

String getServiceName();
}

String getUser();
Expand Down Expand Up @@ -77,5 +85,6 @@ interface ClientMetrics {
* @param hostName hostname of the client
* @return Instance of ClientMetrics
*/
ClientMetrics getOrCreateMetricsClient(String hostName);
ClientMetrics getOrCreateMetricsClient(String hostName, String hostAddress, String userName,
String clientVersion, String serviceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,18 @@ static class ClientMetricsImpl implements ClientMetrics {
final LongAdder readRequestsCount = new LongAdder();
final LongAdder writeRequestsCount = new LongAdder();
final LongAdder filteredRequestsCount = new LongAdder();
private final String hostAddress;
private final String userName;
private final String clientVersion;
private final String serviceName;

public ClientMetricsImpl(String hostName) {
public ClientMetricsImpl(String hostName, String hostAddress, String userName,
String clientVersion, String serviceName) {
this.hostName = hostName;
this.hostAddress = hostAddress != null ? hostAddress : "Unknown";
this.userName = userName != null ? userName : "Unknown";
this.clientVersion = clientVersion != null ? clientVersion : "Unknown";
this.serviceName = serviceName != null ? serviceName : "Unknown";
}

@Override
Expand Down Expand Up @@ -109,6 +118,26 @@ public void incrementFilteredReadRequests() {
public long getFilteredReadRequests() {
return filteredRequestsCount.sum();
}

@Override
public String getHostAddress() {
return hostAddress;
}

@Override
public String getUserName() {
return userName;
}

@Override
public String getClientVersion() {
return clientVersion;
}

@Override
public String getServiceName() {
return serviceName;
}
}

public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
Expand Down Expand Up @@ -278,12 +307,13 @@ public Map<String, ClientMetrics> getClientMetrics() {
}

@Override
public ClientMetrics getOrCreateMetricsClient(String client) {
public ClientMetrics getOrCreateMetricsClient(String client, String hostAddress, String userName,
String clientVersion, String serviceName) {
ClientMetrics source = clientMetricsMap.get(client);
if (source != null) {
return source;
}
source = new ClientMetricsImpl(client);
source = new ClientMetricsImpl(client, hostAddress, userName, clientVersion, serviceName);
ClientMetrics prev = clientMetricsMap.putIfAbsent(client, source);
if (prev != null) {
return prev;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ public enum Field {
MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE),
CLIENT_COUNT("#CLIENT", "Client Count", false, false, FieldValueType.INTEGER),
USER_COUNT("#USER", "User Count", false, false, FieldValueType.INTEGER),
CLIENT("CLIENT", "Client Hostname", true, true, FieldValueType.STRING);
CLIENT("CLIENT", "Client Hostname", true, true, FieldValueType.STRING),
HOST_ADDRESS("HADDR", "Client Host Address", true, true, FieldValueType.STRING),
CLIENT_VERSION("CVER", "Client Version", true, true, FieldValueType.STRING),
USER_NAME("USER", "User Name", true, true, FieldValueType.STRING),
SERVICE_NAME("SVC", "Service Name", true, true, FieldValueType.STRING);

private final String header;
private final String description;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ public final class ClientModeStrategy implements ModeStrategy {
new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true),
new FieldInfo(Field.HOST_ADDRESS, 0, true), new FieldInfo(Field.USER_NAME, 0, true),
new FieldInfo(Field.CLIENT_VERSION, 0, true), new FieldInfo(Field.SERVICE_NAME, 0, true));
private final Map<String, RequestCountPerSecond> requestCountPerSecondMap = new HashMap<>();

ClientModeStrategy() {
Expand Down Expand Up @@ -145,6 +147,10 @@ Record createRecord(String user, UserMetrics.ClientMetrics clientMetrics,
requestCountPerSecond.getWriteRequestCountPerSecond());
builder.put(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND,
requestCountPerSecond.getFilteredReadRequestCountPerSecond());
builder.put(Field.HOST_ADDRESS, clientMetrics.getHostAddress());
builder.put(Field.USER_NAME, clientMetrics.getUserName());
builder.put(Field.CLIENT_VERSION, clientMetrics.getClientVersion());
builder.put(Field.SERVICE_NAME, clientMetrics.getServiceName());
builder.put(Field.USER, user);
return builder.build();
}
Expand Down
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's already a ClientMetrics object defined in ClusterStatus.proto, which is used by the hbtop tool to provide some client related information. I think we should add these there and reuse that object on this UI, while also update hbtop to report the additional information.

Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ message ClientMetrics {

/** the current total filtered requests made from a client */
optional uint64 filtered_requests_count = 4;

optional string host_address = 5;
optional string user_name = 6;
optional string client_version = 7;
optional string service_name = 8;
}

/* Server-level protobufs */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ class NettyServerRpcConnection extends ServerRpcConnection {
} else {
this.hostAddress = inetSocketAddress.getAddress().getHostAddress();
}

InetSocketAddress localSocketAddress = ((InetSocketAddress) channel.localAddress());
this.localHostAddress = (localSocketAddress != null && localSocketAddress.getAddress() != null)
? localSocketAddress.getAddress().getHostAddress()
: "*Unknown*";
this.remotePort = inetSocketAddress.getPort();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,24 @@ public static Optional<InetAddress> getRemoteAddress() {
return getCurrentCall().map(RpcCall::getRemoteAddress);
}

/**
* Returns the ServerRpcConnection for the current RPC request or not present if no connection is
* available. This allows access to connection-level information such as the client's IP address,
* authentication details, codec configuration, etc.
* <p>
* This method should only be called from within an RPC handler thread context.
* @return the current ServerRpcConnection, or Optional.empty() if called outside of an RPC
* context
*/
public static Optional<ServerRpcConnection> getCurrentServerRpcConnection() {
Optional<RpcCall> call = getCurrentCall();
if (call.isPresent() && call.get() instanceof ServerCall) {
ServerCall<?> serverCall = (ServerCall<?>) call.get();
return Optional.ofNullable(serverCall.getConnection());
}
return Optional.empty();
}

/**
* @param serviceName Some arbitrary string that represents a 'service'.
* @param services Available service instances
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,14 @@ public int getPriority() {
return this.header.getPriority();
}

/**
* Get the ServerRpcConnection associated with this call.
* @return the connection object
*/
public T getConnection() {
return this.connection;
}

/*
* Short string representation without param info because param itself could be huge depends on
* the payload of a command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
justification = "False positive according to http://sourceforge.net/p/findbugs/bugs/1032/")
@InterfaceAudience.Private
abstract class ServerRpcConnection implements Closeable {
public abstract class ServerRpcConnection implements Closeable {

private static final TextMapGetter<RPCTInfo> getter = new RPCTInfoGetter();

Expand All @@ -114,6 +114,9 @@ abstract class ServerRpcConnection implements Closeable {
protected ConnectionHeader connectionHeader;
protected Map<String, byte[]> connectionAttributes;

// The local address of this server-side connection (the RegionServer's own IP).
protected String localHostAddress;

/**
* Codec the client asked use.
*/
Expand Down Expand Up @@ -160,6 +163,10 @@ public int getRemotePort() {
return remotePort;
}

public String getLocalHostAddress() {
return localHostAddress;
}

public VersionInfo getVersionInfo() {
if (connectionHeader != null && connectionHeader.hasVersionInfo()) {
return connectionHeader.getVersionInfo();
Expand Down Expand Up @@ -487,6 +494,10 @@ private void processConnectionHeader(ByteBuff buf) throws IOException {
this.hostAddress, this.remotePort, version, this.useSasl, this.ugi, serviceName);
}

public ConnectionHeader getConnectionHeader() {
return connectionHeader;
}

/**
* Send the response for connection header
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public synchronized void sendResponseIfReady() throws IOException {
this.responder.doRespond(getConnection(), this);
}

SimpleServerRpcConnection getConnection() {
@Override
public SimpleServerRpcConnection getConnection() {
return this.connection;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,10 @@ private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
.setHostName(clientMetrics.getHostName())
.setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
.setFilteredRequestsCount(clientMetrics.getFilteredReadRequests())
.setReadRequestsCount(clientMetrics.getReadRequestsCount()).build())
.setReadRequestsCount(clientMetrics.getReadRequestsCount())
.setHostAddress(clientMetrics.getHostAddress()).setUserName(clientMetrics.getUserName())
.setServiceName(clientMetrics.getServiceName())
.setClientVersion(clientMetrics.getClientVersion()).build())
.forEach(userLoadBldr::addClientMetrics);
return userLoadBldr.build();
}
Expand Down
Loading
Loading