Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ protected GridDhtLocalPartition localPartition() {
boolean deferred = false;
GridCacheVersion ver0 = null;

cctx.shared().database().checkpointReadLock();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that all calls to unswap are covered with checkpoint read lock?
Maybe it's finally worth to add assert? Something like:

assert !checkExpire || cctx.shared().database().checkpointLockIsHeldByThread();

assert !checkExpire || cctx.shared().database().checkpointLockIsHeldByThread();

lockEntry();

Expand Down Expand Up @@ -494,8 +494,6 @@ protected GridDhtLocalPartition localPartition() {
}
finally {
unlockEntry();

cctx.shared().database().checkpointReadUnlock();
}

if (obsolete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,14 @@ private IgniteInternalFuture<Collection<GridCacheEntryInfo>> getAsync(
boolean addReader = !e.deleted();

if (addReader) {
e.unswap(false);
cctx.shared().database().checkpointReadLock();

try {
e.unswap(false);
}
finally {
cctx.shared().database().checkpointReadUnlock();
}

// Entry will be removed on touch() if no data in cache,
// but they could be loaded from store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,8 +933,14 @@ private void map(Iterable<GridDhtCacheEntry> entries) {
boolean needVal;

try {
// Must unswap entry so that isNewLocked returns correct value.
e.unswap(false);
cctx.shared().database().checkpointReadLock();
try {
// Must unswap entry so that isNewLocked returns correct value.
e.unswap(false);
}
finally {
cctx.shared().database().checkpointReadUnlock();
}

needVal = e.isNewLocked();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,19 +620,26 @@ IgniteInternalFuture<GridCacheReturn> lockAllAsync(
if (txEntry == null) {
GridDhtCacheEntry cached;

while (true) {
try {
cached = dhtCache.entryExx(key, topVer);
cctx.database().checkpointReadLock();

cached.unswap(read);
try {
while (true) {
try {
cached = dhtCache.entryExx(key, topVer);

break;
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Get removed entry: " + key);
cached.unswap(read);

break;
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Get removed entry: " + key);
}
}
}
finally {
cctx.database().checkpointReadUnlock();
}

addActiveCache(dhtCache.context(), false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1212,11 +1212,18 @@ private IgniteInternalFuture<Object> forceRebalanceKeys(Map<Integer, Collection<
for (IgniteTxEntry entry : entries) {
GridCacheVersion serReadVer = entry.entryReadVersion();

if (serReadVer != null) {
entry.cached().unswap();
cctx.database().checkpointReadLock();

if (!entry.cached().checkSerializableReadVersion(serReadVer))
return versionCheckError(entry);
try {
if (serReadVer != null) {
entry.cached().unswap();

if (!entry.cached().checkSerializableReadVersion(serReadVer))
return versionCheckError(entry);
}
}
finally {
cctx.database().checkpointReadUnlock();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1259,29 +1259,29 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx,
GridCacheEntryEx entry = entryEx(cacheCtx, txKey, entryTopVer != null ? entryTopVer : topologyVersion());

try {
entry.unswap(false);

// Check if lock is being explicitly acquired by the same thread.
if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
entry.lockedByThread(threadId, xidVer)) {
throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
"externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
", entry=" + entry +
", xidVer=" + xidVer +
", threadId=" + threadId +
", locNodeId=" + cctx.localNodeId() + ']');
}

CacheObject old = null;
GridCacheVersion readVer = null;

if (optimistic() && !implicit()) {
try {
if (needReadVer) {
if (primaryLocal(entry)) {
cctx.database().checkpointReadLock();
cctx.database().checkpointReadLock();

try {
try {
entry.unswap(false);

// Check if lock is being explicitly acquired by the same thread.
if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() &&
entry.lockedByThread(threadId, xidVer)) {
throw new IgniteCheckedException("Cannot access key within transaction if lock is " +
"externally held [key=" + CU.value(cacheKey, cacheCtx, false) +
", entry=" + entry +
", xidVer=" + xidVer +
", threadId=" + threadId +
", locNodeId=" + cctx.localNodeId() + ']');
}

if (optimistic() && !implicit()) {
try {
if (needReadVer) {
if (primaryLocal(entry)) {
EntryGetResult res = entry.innerGetVersioned(
null,
this,
Expand All @@ -1298,15 +1298,8 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx,
readVer = res.version();
}
}
finally {
cctx.database().checkpointReadUnlock();
}
}
}
else {
cctx.database().checkpointReadLock();

try {
else {
old = entry.innerGet(
null,
this,
Expand All @@ -1318,19 +1311,19 @@ private boolean enlistWriteEntry(GridCacheContext cacheCtx,
null,
keepBinary);
}
finally {
cctx.database().checkpointReadUnlock();
}
}
}
catch (ClusterTopologyCheckedException e) {
entry.touch();
catch (ClusterTopologyCheckedException e) {
entry.touch();

throw e;
throw e;
}
}
else
old = entry.rawGet();
}
finally {
cctx.database().checkpointReadUnlock();
}
else
old = entry.rawGet();

final GridCacheOperation op = rmv ? DELETE :
entryProc != null ? TRANSFORM : old != null ? UPDATE : CREATE;
Expand Down Expand Up @@ -2399,21 +2392,28 @@ private <K, V> Collection<KeyCacheObject> enlistRead(
optimistic() ? accessPolicy(cacheCtx, txKey, expiryPlc) : null;

if (needReadVer) {
getRes = primaryLocal(entry) ?
entry.innerGetVersioned(
null,
this,
/*metrics*/true,
/*event*/true,
null,
resolveTaskName(),
accessPlc,
!deserializeBinary,
null) : null;
cctx.database().checkpointReadLock();

if (getRes != null) {
val = getRes.value();
readVer = getRes.version();
try {
getRes = primaryLocal(entry) ?
entry.innerGetVersioned(
null,
this,
/*metrics*/true,
/*event*/true,
null,
resolveTaskName(),
accessPlc,
!deserializeBinary,
null) : null;

if (getRes != null) {
val = getRes.value();
readVer = getRes.version();
}
}
finally {
cctx.database().checkpointReadUnlock();
}
}
else {
Expand Down
Loading
Loading