Skip to content

Commit eeea943

Browse files
authored
ATK-4335: add delay detector (#3900)
* add delay detector * fix some delay detect logic * fix master and slave delay detect period * fix dbGroup manager cmd * fix that create delay detect table when dbGroup isn't useless
1 parent b9e3d88 commit eeea943

24 files changed

Lines changed: 866 additions & 219 deletions

src/main/java/com/actiontech/dble/DbleServer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ public void startup() throws Exception {
208208
this.config.testConnection();
209209
LOGGER.info("==========================================Test connection finish==================================");
210210

211+
this.config.createDelayDetectTable();
212+
LOGGER.info("==========================================Create delay detect table finish==================================");
213+
211214
// sync global status
212215
this.config.getAndSyncKeyVariables();
213216
LOGGER.info("=====================================Get And Sync KeyVariables finish=============================");

src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java

Lines changed: 86 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.io.IOException;
3131
import java.lang.reflect.Type;
3232
import java.util.*;
33+
import java.util.concurrent.atomic.AtomicLong;
3334
import java.util.concurrent.locks.ReentrantReadWriteLock;
3435

3536
public class PhysicalDbGroup {
@@ -42,11 +43,15 @@ public class PhysicalDbGroup {
4243
public static final int RW_SPLIT_ALL = 2;
4344
// weight
4445
public static final int WEIGHT = 0;
45-
private final List<PhysicalDbInstance> writeInstanceList;
46+
47+
enum USAGE {
48+
NONE, RW, SHARDING;
49+
}
4650

4751
private final String groupName;
4852
private final DbGroupConfig dbGroupConfig;
4953
private volatile PhysicalDbInstance writeDbInstance;
54+
private final List<PhysicalDbInstance> writeInstanceList;
5055
private Map<String, PhysicalDbInstance> allSourceMap = new HashMap<>();
5156

5257
private final int rwSplitMode;
@@ -55,8 +60,10 @@ public class PhysicalDbGroup {
5560
private final LocalReadLoadBalancer localReadLoadBalancer = new LocalReadLoadBalancer();
5661
private final ReentrantReadWriteLock adjustLock = new ReentrantReadWriteLock();
5762

58-
private boolean shardingUseless = true;
59-
private boolean rwSplitUseless = true;
63+
//delayDetection
64+
private AtomicLong logicTimestamp = new AtomicLong();
65+
66+
private USAGE usedFor = USAGE.NONE;
6067

6168
public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance writeDbInstances, PhysicalDbInstance[] readDbInstances, int rwSplitMode) {
6269
this.groupName = name;
@@ -66,8 +73,8 @@ public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance wri
6673
writeDbInstances.setDbGroup(this);
6774
this.writeDbInstance = writeDbInstances;
6875
this.writeInstanceList = Collections.singletonList(writeDbInstance);
69-
allSourceMap.put(writeDbInstances.getName(), writeDbInstances);
7076

77+
allSourceMap.put(writeDbInstances.getName(), writeDbInstances);
7178
for (PhysicalDbInstance readDbInstance : readDbInstances) {
7279
readDbInstance.setDbGroup(this);
7380
allSourceMap.put(readDbInstance.getName(), readDbInstance);
@@ -89,6 +96,54 @@ public PhysicalDbGroup(PhysicalDbGroup org) {
8996
writeInstanceList = Collections.singletonList(writeDbInstance);
9097
}
9198

99+
public void init(String reason) {
100+
for (Map.Entry<String, PhysicalDbInstance> entry : allSourceMap.entrySet()) {
101+
entry.getValue().init(reason);
102+
}
103+
}
104+
105+
// only fresh backend connection pool
106+
public void init(List<String> sourceNames, String reason) {
107+
for (String sourceName : sourceNames) {
108+
if (allSourceMap.containsKey(sourceName)) {
109+
allSourceMap.get(sourceName).init(reason, false);
110+
}
111+
}
112+
}
113+
114+
public void stop(String reason) {
115+
stop(reason, false);
116+
}
117+
118+
public void stop(String reason, boolean closeFront) {
119+
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
120+
dbInstance.stop(reason, closeFront);
121+
}
122+
}
123+
124+
// only fresh backend connection pool
125+
public void stop(List<String> sourceNames, String reason, boolean closeFront) {
126+
for (String sourceName : sourceNames) {
127+
if (allSourceMap.containsKey(sourceName)) {
128+
allSourceMap.get(sourceName).stop(reason, closeFront, false);
129+
}
130+
}
131+
132+
if (closeFront) {
133+
Iterator<PooledConnection> iterator = IOProcessor.BACKENDS_OLD.iterator();
134+
while (iterator.hasNext()) {
135+
PooledConnection con = iterator.next();
136+
if (con instanceof BackendConnection) {
137+
BackendConnection backendCon = (BackendConnection) con;
138+
if (backendCon.getPoolDestroyedTime() != 0 && sourceNames.contains(backendCon.getInstance().getConfig().getInstanceName())) {
139+
backendCon.closeWithFront("old active backend conn will be forced closed by closing front conn");
140+
iterator.remove();
141+
}
142+
}
143+
}
144+
}
145+
}
146+
92147
public String getGroupName() {
93148
return groupName;
94149
}
@@ -125,88 +180,46 @@ PhysicalDbInstance findDbInstance(BackendConnection exitsCon) {
125180
}
126181

127182
boolean isSlave(PhysicalDbInstance ds) {
128-
return !(writeDbInstance == ds);
183+
return writeDbInstance != ds;
129184
}
130185

131186
public int getRwSplitMode() {
132187
return rwSplitMode;
133188
}
134189

135190
public boolean isUseless() {
136-
return shardingUseless && rwSplitUseless;
191+
return usedFor == USAGE.NONE;
192+
}
193+
194+
public boolean usedForSharding() {
195+
return usedFor == USAGE.SHARDING;
137196
}
138197

139-
public boolean isShardingUseless() {
140-
return shardingUseless;
198+
public boolean usedForRW() {
199+
return usedFor == USAGE.RW;
141200
}
142201

143-
public boolean isRwSplitUseless() {
144-
return rwSplitUseless;
202+
public void setUsedForSharding() {
203+
usedFor = USAGE.SHARDING;
145204
}
146205

147-
public void setShardingUseless(boolean shardingUseless) {
148-
this.shardingUseless = shardingUseless;
206+
public void setUsedForRW() {
207+
usedFor = USAGE.RW;
149208
}
150209

151-
public void setRwSplitUseless(boolean rwSplitUseless) {
152-
this.rwSplitUseless = rwSplitUseless;
210+
public USAGE getUsedFor() {
211+
return usedFor;
153212
}
154213

155214
private boolean checkSlaveSynStatus() {
156-
return (dbGroupConfig.getDelayThreshold() != -1) &&
157-
(dbGroupConfig.isShowSlaveSql());
215+
return ((dbGroupConfig.getDelayThreshold() != -1) && dbGroupConfig.isShowSlaveSql()) ||
216+
dbGroupConfig.isDelayDetection();
158217
}
159218

160219
public PhysicalDbInstance getWriteDbInstance() {
161220
return writeDbInstance;
162221
}
163222

164-
public void init(String reason) {
165-
for (Map.Entry<String, PhysicalDbInstance> entry : allSourceMap.entrySet()) {
166-
entry.getValue().init(reason);
167-
}
168-
}
169-
170-
public void init(List<String> sourceNames, String reason) {
171-
for (String sourceName : sourceNames) {
172-
if (allSourceMap.containsKey(sourceName)) {
173-
allSourceMap.get(sourceName).init(reason, false);
174-
}
175-
}
176-
}
177-
178-
public void stop(String reason) {
179-
stop(reason, false);
180-
}
181-
182-
public void stop(String reason, boolean closeFront) {
183-
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
184-
dbInstance.stop(reason, closeFront);
185-
}
186-
}
187-
188-
public void stop(List<String> sourceNames, String reason, boolean closeFront) {
189-
for (String sourceName : sourceNames) {
190-
if (allSourceMap.containsKey(sourceName)) {
191-
allSourceMap.get(sourceName).stop(reason, closeFront, false);
192-
}
193-
}
194-
195-
if (closeFront) {
196-
Iterator<PooledConnection> iterator = IOProcessor.BACKENDS_OLD.iterator();
197-
while (iterator.hasNext()) {
198-
PooledConnection con = iterator.next();
199-
if (con instanceof BackendConnection) {
200-
BackendConnection backendCon = (BackendConnection) con;
201-
if (backendCon.getPoolDestroyedTime() != 0 && sourceNames.contains(backendCon.getInstance().getConfig().getInstanceName())) {
202-
backendCon.closeWithFront("old active backend conn will be forced closed by closing front conn");
203-
iterator.remove();
204-
}
205-
}
206-
}
207-
}
208-
}
209-
210223
public Collection<PhysicalDbInstance> getDbInstances(boolean isAll) {
211224
if (!isAll && rwSplitMode == RW_SPLIT_OFF) {
212225
return writeInstanceList;
@@ -230,18 +243,6 @@ public PhysicalDbInstance[] getReadDbInstances() {
230243
return readSources;
231244
}
232245

233-
/**
234-
* rwsplit user
235-
*
236-
* @param master
237-
* @param writeStatistical
238-
* @return
239-
* @throws IOException
240-
*/
241-
public PhysicalDbInstance rwSelect(Boolean master, Boolean writeStatistical) throws IOException {
242-
return rwSelect(master, writeStatistical, false);
243-
}
244-
245246
/**
246247
* rwsplit user
247248
*
@@ -546,6 +547,14 @@ public boolean checkInstanceExist(String instanceName) {
546547
return true;
547548
}
548549

550+
public AtomicLong getLogicTimestamp() {
551+
return logicTimestamp;
552+
}
553+
554+
public void setLogicTimestamp(AtomicLong logicTimestamp) {
555+
this.logicTimestamp = logicTimestamp;
556+
}
557+
549558
private void reportHeartbeatError(PhysicalDbInstance ins) throws IOException {
550559
final DbInstanceConfig config = ins.getConfig();
551560
String heartbeatError = "the dbInstance[" + config.getUrl() + "] can't reach. Please check the dbInstance status";
@@ -565,7 +574,9 @@ public boolean equalsBaseInfo(PhysicalDbGroup pool) {
565574
pool.getDbGroupConfig().getErrorRetryCount() == this.dbGroupConfig.getErrorRetryCount() &&
566575
pool.getDbGroupConfig().getRwSplitMode() == this.dbGroupConfig.getRwSplitMode() &&
567576
pool.getDbGroupConfig().getDelayThreshold() == this.dbGroupConfig.getDelayThreshold() &&
577+
pool.getDbGroupConfig().getDelayPeriodMillis() == this.dbGroupConfig.getDelayPeriodMillis() &&
578+
pool.getDbGroupConfig().getDelayDatabase().equals(this.dbGroupConfig.getDelayDatabase()) &&
568579
pool.getDbGroupConfig().isDisableHA() == this.dbGroupConfig.isDisableHA() &&
569-
pool.getGroupName().equals(this.groupName) && pool.isShardingUseless() == this.isShardingUseless() && pool.isRwSplitUseless() == this.isRwSplitUseless();
580+
pool.getGroupName().equals(this.groupName) && pool.getUsedFor() == this.getUsedFor();
570581
}
571582
}

src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,12 @@
1818
import com.actiontech.dble.net.factory.MySQLConnectionFactory;
1919
import com.actiontech.dble.net.service.AbstractService;
2020
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
21-
import com.actiontech.dble.singleton.Scheduler;
2221
import com.actiontech.dble.singleton.TraceManager;
2322
import com.actiontech.dble.util.StringUtil;
24-
import com.actiontech.dble.util.TimeUtil;
2523
import org.slf4j.Logger;
2624
import org.slf4j.LoggerFactory;
2725

2826
import java.io.IOException;
29-
import java.util.concurrent.TimeUnit;
3027
import java.util.concurrent.atomic.AtomicBoolean;
3128
import java.util.concurrent.atomic.LongAdder;
3229

@@ -54,7 +51,6 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
5451
private final LongAdder writeCount = new LongAdder();
5552

5653
private final AtomicBoolean isInitial = new AtomicBoolean(false);
57-
private AtomicBoolean initHeartbeat = new AtomicBoolean(false);
5854

5955
// connection pool
6056
private ConnectionPool connectionPool;
@@ -96,24 +92,30 @@ public void init(String reason, boolean isInitHeartbeat) {
9692
return;
9793
}
9894

95+
if (dbGroup.usedForSharding()) {
96+
checkPoolSize();
97+
}
98+
99+
LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name);
100+
start(reason, isInitHeartbeat);
101+
}
102+
103+
private void checkPoolSize() {
99104
int size = config.getMinCon();
100105
String[] physicalSchemas = dbGroup.getSchemas();
101106
int initSize = physicalSchemas.length;
102107
if (size < initSize) {
103-
LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes), so dble will create at least 1 conn for every schema, " +
108+
LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes/apNodes), so dble will create at least 1 conn for every schema, " +
104109
"minCon size before:{}, now:{}", this.dbGroup.getGroupName() + "." + name, size, initSize);
105110
config.setMinCon(initSize);
106111
}
107112

108113
initSize = Math.max(initSize, config.getMinCon());
109114
size = config.getMaxCon();
110115
if (size < initSize) {
111-
LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize);
116+
LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes/apNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize);
112117
config.setMaxCon(initSize);
113118
}
114-
115-
LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name);
116-
start(reason, isInitHeartbeat);
117119
}
118120

119121
public void createConnectionSkipPool(String schema, ResponseHandler handler) {
@@ -363,36 +365,33 @@ private void startHeartbeat() {
363365
LOGGER.info("the instance[{}] is disabled or fake node, skip to start heartbeat.", this.dbGroup.getGroupName() + "." + name);
364366
return;
365367
}
368+
heartbeat.start(heartbeatRecoveryTime);
369+
}
366370

367-
heartbeat.start();
368-
if (initHeartbeat.compareAndSet(false, true)) {
369-
370-
heartbeat.setScheduledFuture(Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> {
371-
if (DbleServer.getInstance().getConfig().isFullyConfigured()) {
372-
if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) {
373-
return;
374-
}
375-
376-
heartbeat.heartbeat();
377-
}
378-
}, 0L, config.getPoolConfig().getHeartbeatPeriodMillis(), TimeUnit.MILLISECONDS));
379-
} else {
380-
LOGGER.warn("init dbInstance[{}] heartbeat, but it has been initialized, skip initialization.", heartbeat.getSource().getName());
381-
}
371+
private void stopHeartbeat(String reason) {
372+
heartbeat.stop(reason);
382373
}
383374

384375
public void start(String reason) {
385376
start(reason, true);
386377
}
387378

388379
public void start(String reason, boolean isStartHeartbeat) {
380+
startPool(reason);
381+
if (isStartHeartbeat) {
382+
startHeartbeat();
383+
}
384+
}
385+
386+
private void startPool(String reason) {
387+
if (disabled.get() || fakeNode) {
388+
LOGGER.info("init dbInstance[{}] because {}, but it is disabled or a fakeNode, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason);
389+
return;
390+
}
389391
if ((dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) && !dbGroup.isUseless()) {
390392
LOGGER.info("start connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason);
391393
this.connectionPool.startEvictor();
392394
}
393-
if (isStartHeartbeat) {
394-
startHeartbeat();
395-
}
396395
}
397396

398397
public void stop(String reason, boolean closeFront) {
@@ -401,17 +400,18 @@ public void stop(String reason, boolean closeFront) {
401400

402401
public void stop(String reason, boolean closeFront, boolean isStopHeartbeat) {
403402
if (isStopHeartbeat) {
404-
final boolean stop = heartbeat.isStop();
405-
heartbeat.stop(reason);
406-
if (!stop) {
407-
initHeartbeat.set(false);
408-
}
403+
stopHeartbeat(reason);
409404
}
405+
stopPool(reason, closeFront);
406+
407+
isInitial.set(false);
408+
}
409+
410+
private void stopPool(String reason, boolean closeFront) {
410411
if (dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) {
411412
LOGGER.info("stop connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason);
412413
connectionPool.stop(reason, closeFront);
413414
}
414-
isInitial.set(false);
415415
}
416416

417417
public void closeAllConnection(String reason) {

0 commit comments

Comments
 (0)