Skip to content

Commit 60a1b91

Browse files
author
Colm Dougan
committed
HDDS-14004. EventNotification: Capture data to the completed operation ledger table
1 parent 97c9862 commit 60a1b91

10 files changed

Lines changed: 307 additions & 13 deletions
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.hadoop.ozone.om.ratis;
19+
20+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
21+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo.OperationArgs;
22+
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
/**
27+
* This class when given a request will determine if the request is one of
28+
* a subset of write requests which we want to save to the
29+
* OmCompletedRequestInfo ledger and if so it will return the
30+
* appropriate instance.
31+
*
32+
* NOTE: as per elsewhere - this class may not need to exist if we go
33+
* with the approach of the subset of response handlers which we want to
34+
* populate a OmCompletedRequestInfo ledger row can do so themselves
35+
* (e.g. in OMKeyCommitResponse::addToBatch)
36+
*/
37+
public final class OzoneManagerCompletedRequestInfoProvider {
38+
39+
private static final Logger LOG =
40+
LoggerFactory.getLogger(OzoneManagerCompletedRequestInfoProvider.class);
41+
42+
public OmCompletedRequestInfo get(long trxLogIndex, OzoneManagerProtocolProtos.OMRequest omRequest) {
43+
44+
switch (omRequest.getCmdType()) {
45+
case CreateVolume:
46+
logRequest("CreateVolume", omRequest);
47+
return requestInfoBuilder(trxLogIndex, omRequest.getCmdType(), null)
48+
.setVolumeName(omRequest.getCreateVolumeRequest().getVolumeInfo().getVolume())
49+
.setOpArgs(new OperationArgs.CreateVolumeArgs())
50+
.build();
51+
52+
case DeleteVolume:
53+
logRequest("DeleteVolume", omRequest);
54+
return requestInfoBuilder(trxLogIndex, omRequest.getCmdType(), null)
55+
.setVolumeName(omRequest.getDeleteVolumeRequest().getVolumeName())
56+
.setOpArgs(new OperationArgs.DeleteVolumeArgs())
57+
.build();
58+
59+
case CreateBucket:
60+
logRequest("CreateBucket", omRequest);
61+
return requestInfoBuilder(trxLogIndex, omRequest.getCmdType(), null)
62+
.setVolumeName(omRequest.getCreateBucketRequest().getBucketInfo().getVolumeName())
63+
.setBucketName(omRequest.getCreateBucketRequest().getBucketInfo().getBucketName())
64+
.setOpArgs(new OperationArgs.CreateBucketArgs())
65+
.build();
66+
67+
case DeleteBucket:
68+
logRequest("DeleteBucket", omRequest);
69+
return requestInfoBuilder(trxLogIndex, omRequest.getCmdType(), null)
70+
.setVolumeName(omRequest.getDeleteBucketRequest().getVolumeName())
71+
.setBucketName(omRequest.getDeleteBucketRequest().getBucketName())
72+
.setOpArgs(new OperationArgs.DeleteBucketArgs())
73+
.build();
74+
75+
case CreateKey:
76+
logRequest("CreateKey", omRequest);
77+
return requestInfoBuilder(trxLogIndex, omRequest.getCmdType(), omRequest.getCreateKeyRequest().getKeyArgs())
78+
.setOpArgs(new OperationArgs.CreateKeyArgs())
79+
.build();
80+
81+
case RenameKey:
82+
logRequest("RenameKey", omRequest);
83+
OzoneManagerProtocolProtos.RenameKeyRequest renameReq
84+
= (OzoneManagerProtocolProtos.RenameKeyRequest) omRequest.getRenameKeyRequest();
85+
86+
return requestInfoBuilder(trxLogIndex, omRequest.getCmdType(), omRequest.getRenameKeyRequest().getKeyArgs())
87+
.setOpArgs(new OperationArgs.RenameKeyArgs(renameReq.getToKeyName()))
88+
.build();
89+
90+
case DeleteKey:
91+
logRequest("DeleteKey", omRequest);
92+
return requestInfoBuilder(trxLogIndex, omRequest.getCmdType(), omRequest.getDeleteKeyRequest().getKeyArgs())
93+
.setOpArgs(new OperationArgs.DeleteKeyArgs())
94+
.build();
95+
96+
case CommitKey:
97+
logRequest("CommitKey", omRequest);
98+
return requestInfoBuilder(trxLogIndex, omRequest.getCmdType(), omRequest.getCommitKeyRequest().getKeyArgs())
99+
.setOpArgs(new OperationArgs.CommitKeyArgs())
100+
.build();
101+
102+
case CreateDirectory:
103+
logRequest("CreateDirectory", omRequest);
104+
return requestInfoBuilder(trxLogIndex, omRequest.getCmdType(), omRequest.getCreateDirectoryRequest().getKeyArgs())
105+
.setOpArgs(new OperationArgs.CreateDirectoryArgs())
106+
.build();
107+
108+
case CreateFile:
109+
logRequest("CreateFile", omRequest);
110+
111+
OzoneManagerProtocolProtos.CreateFileRequest createFileReq
112+
= (OzoneManagerProtocolProtos.CreateFileRequest) omRequest.getCreateFileRequest();
113+
114+
return requestInfoBuilder(trxLogIndex, omRequest.getCmdType(), omRequest.getCreateFileRequest().getKeyArgs())
115+
.setOpArgs(new OperationArgs.CreateFileArgs(createFileReq.getIsRecursive(),
116+
createFileReq.getIsOverwrite()))
117+
.build();
118+
119+
default:
120+
LOG.debug("Unhandled cmdType={}", omRequest.getCmdType());
121+
return null;
122+
}
123+
}
124+
125+
private static void logRequest(String label, OzoneManagerProtocolProtos.OMRequest omRequest) {
126+
if (LOG.isDebugEnabled()) {
127+
LOG.debug("---> {} {}", label, omRequest);
128+
}
129+
}
130+
131+
private OmCompletedRequestInfo.Builder requestInfoBuilder(long trxLogIndex,
132+
OzoneManagerProtocolProtos.Type cmdType,
133+
OzoneManagerProtocolProtos.KeyArgs keyArgs) {
134+
135+
OmCompletedRequestInfo.Builder builder = OmCompletedRequestInfo.newBuilder()
136+
.setTrxLogIndex(trxLogIndex)
137+
.setCmdType(cmdType)
138+
.setCreationTime(System.currentTimeMillis());
139+
140+
if (keyArgs != null) {
141+
builder.setVolumeName(keyArgs.getVolumeName());
142+
builder.setBucketName(keyArgs.getBucketName());
143+
builder.setKeyName(keyArgs.getKeyName());
144+
}
145+
146+
return builder;
147+
}
148+
}

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.hadoop.ozone.om.OMMetadataManager;
4545
import org.apache.hadoop.ozone.om.S3SecretManager;
4646
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
47+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
4748
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
4849
import org.apache.hadoop.ozone.om.response.OMClientResponse;
4950
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -106,10 +107,13 @@ public final class OzoneManagerDoubleBuffer {
106107
private static class Entry {
107108
private final TermIndex termIndex;
108109
private final OMClientResponse response;
110+
// strawman approach: see comments in addToBatch()
111+
private final OmCompletedRequestInfo completedRequestInfo;
109112

110-
Entry(TermIndex termIndex, OMClientResponse response) {
113+
Entry(TermIndex termIndex, OMClientResponse response, OmCompletedRequestInfo completedRequestInfo) {
111114
this.termIndex = termIndex;
112115
this.response = response;
116+
this.completedRequestInfo = completedRequestInfo;
113117
}
114118

115119
TermIndex getTermIndex() {
@@ -119,6 +123,10 @@ TermIndex getTermIndex() {
119123
OMClientResponse getResponse() {
120124
return response;
121125
}
126+
127+
OmCompletedRequestInfo getCompletedRequestInfo() {
128+
return completedRequestInfo;
129+
}
122130
}
123131

124132
/**
@@ -404,11 +412,29 @@ private String addToBatch(Queue<Entry> buffer, BatchOperation batchOperation) {
404412
for (Entry entry: buffer) {
405413
OMClientResponse response = entry.getResponse();
406414
OMResponse omResponse = response.getOMResponse();
415+
OmCompletedRequestInfo completedRequestInfo = entry.getCompletedRequestInfo();
407416
lastTraceId = omResponse.getTraceID();
408417

409418
try {
410419
addToBatchWithTrace(omResponse,
411420
() -> response.checkAndUpdateDB(omMetadataManager, batchOperation));
421+
422+
// This is a strawman approach and requires some discussion
423+
// with the community on approach.
424+
//
425+
// TODO: would it be better to have each type of request we want
426+
// to capture populate a row in the ledger in the addToBatch
427+
// callback of the response (triggered above) e.g.
428+
// OMKeyCommitResponse::addToBatch could generate the suitable
429+
// OmCompletedRequestInfo row and populate (Q: if we went with
430+
// that approach then would it have access to all the necessary
431+
// request parameters? e.g. termIndex.getIndex(), cmdType and
432+
// all captured request parameters
433+
if (completedRequestInfo != null) {
434+
omMetadataManager.getCompletedRequestInfoTable().putWithBatch(
435+
batchOperation, completedRequestInfo.getTrxLogIndex(), completedRequestInfo);
436+
}
437+
412438
} catch (IOException ex) {
413439
// During Adding to RocksDB batch entry got an exception.
414440
// We should terminate the OM.
@@ -554,8 +580,9 @@ private void terminate(Throwable t, int status, OMResponse omResponse) {
554580
/**
555581
* Add OmResponseBufferEntry to buffer.
556582
*/
557-
public synchronized void add(OMClientResponse response, TermIndex termIndex) {
558-
currentBuffer.add(new Entry(termIndex, response));
583+
public synchronized void add(OMClientResponse response, TermIndex termIndex,
584+
OmCompletedRequestInfo completedRequestInfo) {
585+
currentBuffer.add(new Entry(termIndex, response, completedRequestInfo));
559586
notify();
560587
}
561588

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,7 @@ private OMResponse createErrorResponse(
629629
}
630630
OMResponse omResponse = omResponseBuilder.build();
631631
OMClientResponse omClientResponse = new DummyOMClientResponse(omResponse);
632-
ozoneManagerDoubleBuffer.add(omClientResponse, termIndex);
632+
ozoneManagerDoubleBuffer.add(omClientResponse, termIndex, null);
633633
return omResponse;
634634
}
635635

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/upgrade/OMPrepareRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
103103
// the snapshot index in the prepared state.
104104
OzoneManagerDoubleBuffer doubleBuffer =
105105
ozoneManager.getOmRatisServer().getOmStateMachine().getOzoneManagerDoubleBuffer();
106-
doubleBuffer.add(response, context.getTermIndex());
106+
doubleBuffer.add(response, context.getTermIndex(), null);
107107

108108
OzoneManagerRatisServer omRatisServer = ozoneManager.getOmRatisServer();
109109
final RaftServer.Division division = omRatisServer.getServerDivision();

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.apache.hadoop.ozone.om.helpers.ListOpenFilesResult;
7676
import org.apache.hadoop.ozone.om.helpers.OMAuditLogger;
7777
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
78+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
7879
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
7980
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
8081
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
@@ -91,6 +92,7 @@
9192
import org.apache.hadoop.ozone.om.helpers.TenantStateList;
9293
import org.apache.hadoop.ozone.om.helpers.TenantUserInfoValue;
9394
import org.apache.hadoop.ozone.om.helpers.TenantUserList;
95+
import org.apache.hadoop.ozone.om.ratis.OzoneManagerCompletedRequestInfoProvider;
9496
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
9597
import org.apache.hadoop.ozone.om.request.OMClientRequest;
9698
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
@@ -166,6 +168,7 @@
166168
import org.apache.hadoop.ozone.upgrade.UpgradeFinalization.StatusAndMessages;
167169
import org.apache.hadoop.ozone.util.PayloadUtils;
168170
import org.apache.hadoop.ozone.util.ProtobufUtils;
171+
import org.apache.ratis.server.protocol.TermIndex;
169172
import org.slf4j.Logger;
170173
import org.slf4j.LoggerFactory;
171174

@@ -177,10 +180,12 @@ public class OzoneManagerRequestHandler implements RequestHandler {
177180
static final Logger LOG =
178181
LoggerFactory.getLogger(OzoneManagerRequestHandler.class);
179182
private final OzoneManager impl;
183+
private final OzoneManagerCompletedRequestInfoProvider completedRequestInfoProvider;
180184
private FaultInjector injector;
181185

182186
public OzoneManagerRequestHandler(OzoneManager om) {
183187
this.impl = om;
188+
this.completedRequestInfoProvider = new OzoneManagerCompletedRequestInfoProvider();
184189
}
185190

186191
//TODO simplify it to make it shorter
@@ -426,6 +431,12 @@ public OMClientResponse handleWriteRequestImpl(OMRequest omRequest, ExecutionCon
426431
}
427432
}
428433

434+
@Override
435+
public OmCompletedRequestInfo handleSuccessfulWriteRequestImpl(TermIndex termIndex, OMRequest omRequest)
436+
throws IOException {
437+
return completedRequestInfoProvider.get(termIndex.getIndex(), omRequest);
438+
}
439+
429440
@VisibleForTesting
430441
public void setInjector(FaultInjector injector) {
431442
this.injector = injector;

hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/RequestHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
import java.io.IOException;
2121
import org.apache.hadoop.ozone.om.exceptions.OMException;
2222
import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
23+
import org.apache.hadoop.ozone.om.helpers.OmCompletedRequestInfo;
2324
import org.apache.hadoop.ozone.om.ratis.OzoneManagerDoubleBuffer;
2425
import org.apache.hadoop.ozone.om.response.OMClientResponse;
2526
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
2627
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
2728
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
29+
import org.apache.ratis.server.protocol.TermIndex;
2830

2931
/**
3032
* Handler to handleRequest the OmRequests.
@@ -61,7 +63,11 @@ default OMClientResponse handleWriteRequest(OMRequest omRequest, ExecutionContex
6163
OzoneManagerDoubleBuffer ozoneManagerDoubleBuffer) throws IOException {
6264
final OMClientResponse response = handleWriteRequestImpl(omRequest, context);
6365
if (omRequest.getCmdType() != Type.Prepare) {
64-
ozoneManagerDoubleBuffer.add(response, context.getTermIndex());
66+
67+
final OmCompletedRequestInfo omCompletedRequestInfo = handleSuccessfulWriteRequestImpl(
68+
context.getTermIndex(), omRequest);
69+
70+
ozoneManagerDoubleBuffer.add(response, context.getTermIndex(), omCompletedRequestInfo);
6571
}
6672
return response;
6773
}
@@ -74,4 +80,6 @@ default OMClientResponse handleWriteRequest(OMRequest omRequest, ExecutionContex
7480
* @return OMClientResponse
7581
*/
7682
OMClientResponse handleWriteRequestImpl(OMRequest omRequest, ExecutionContext context) throws IOException;
83+
84+
OmCompletedRequestInfo handleSuccessfulWriteRequestImpl(TermIndex index, OMRequest omRequest) throws IOException;
7785
}

0 commit comments

Comments
 (0)