From a3d846f3cfee3c265ce7cc3b4a79f6aa44708ea7 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Wed, 10 Jun 2026 21:04:12 +0300 Subject: [PATCH 1/3] IGNITE-28727 Create internal API method to lock cache entry with specific version --- .../processors/cache/GridCacheAdapter.java | 194 +++++++- .../processors/cache/GridCacheProxyImpl.java | 52 ++ .../processors/cache/IgniteInternalCache.java | 54 ++ .../GridDistributedCacheAdapter.java | 8 +- .../distributed/dht/GridDhtLockFuture.java | 73 ++- .../dht/GridDhtTransactionalCacheAdapter.java | 72 ++- .../dht/GridDhtTxLocalAdapter.java | 56 ++- .../dht/atomic/GridDhtAtomicCache.java | 1 + .../dht/colocated/GridDhtColocatedCache.java | 27 +- .../colocated/GridDhtColocatedLockFuture.java | 69 ++- .../distributed/near/GridNearAtomicCache.java | 1 + .../distributed/near/GridNearLockFuture.java | 89 +++- .../distributed/near/GridNearLockRequest.java | 16 +- .../near/GridNearLockResponse.java | 18 + .../GridNearPessimisticTxPrepareFuture.java | 3 +- .../near/GridNearTransactionalCache.java | 2 + .../distributed/near/GridNearTxLocal.java | 64 ++- .../transactions/IgniteTxLocalAdapter.java | 56 ++- .../cache/transactions/IgniteTxManager.java | 5 +- ...heVersionedEntryTransactionalLockTest.java | 463 ++++++++++++++++++ .../cache/LockTxEntryOneNodeTest.java | 339 +++++++++++++ .../testsuites/IgniteCacheTestSuite4.java | 4 + 22 files changed, 1580 insertions(+), 86 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryTransactionalLockTest.java create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/processors/cache/LockTxEntryOneNodeTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index d494e5df0dfb1..cb9965817efc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -98,6 +98,7 @@ import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; @@ -159,6 +160,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT; import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; @@ -616,7 +618,8 @@ public void active(boolean active) { /** * @param keys Keys to lock. - * @param timeout Lock timeout. + * @param timeout Transaction timeout. + * @param waitTimeout Lock wait timeout. * @param tx Transaction. * @param isRead {@code True} for read operations. * @param retval Flag to return value. @@ -629,6 +632,7 @@ public void active(boolean active) { public abstract IgniteInternalFuture txLockAsync( Collection keys, long timeout, + long waitTimeout, IgniteTxLocalEx tx, boolean isRead, boolean retval, @@ -3106,6 +3110,194 @@ public CacheMetricsImpl metrics0() { } } + /** {@inheritDoc} */ + @Override public boolean lockTxEntry(CacheEntry entry, long waitTimeout) throws IgniteCheckedException { + A.notNull(entry, "entry"); + + return lockTxEntryAsync(entry, waitTimeout).get(); + } + + /** {@inheritDoc} */ + @Override public boolean lockTxEntries(Collection> entries, long waitTimeout) + throws IgniteCheckedException { + A.notNull(entries, "entries"); + + return lockTxEntriesAsync(entries, waitTimeout).get(); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture lockTxEntryAsync(CacheEntry entry, long waitTimeout) { + A.notNull(entry, "entry"); + + return lockTxEntriesAsync(Collections.singleton(entry), waitTimeout); + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture lockTxEntriesAsync( + Collection> entries, + long waitTimeout + ) { + A.notNull(entries, "entries"); + + GridNearTxLocal tx = tx(); + + if (tx == null) + return new GridFinishedFuture<>( + new IgniteCheckedException("Failed to acquire transactional lock without transaction.")); + + if (!tx.pessimistic()) + return new GridFinishedFuture<>( + new IgniteCheckedException("Failed to acquire transactional lock in optimistic transaction.")); + + // Wait for previous per-transaction async operations to finish. + tx.txState().awaitLastFuture(); + + if (!tx.init()) + return new GridFinishedFuture<>(new IgniteTxRollbackCheckedException( + "Failed to acquire transactional lock because transaction has been completed: " + tx)); + + if (entries.isEmpty()) + return new GridFinishedFuture<>(true); + + try { + tx.addActiveCache(ctx, false); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + + Collection keys = new ArrayList<>(entries.size()); + List txEntries = new ArrayList<>(entries.size()); + List expVers = new ArrayList<>(entries.size()); + Set txKeys = new HashSet<>(entries.size()); + + for (CacheEntry entry : entries) { + A.notNull(entry, "entry"); + + KeyCacheObject key = ctx.toCacheKeyObject(entry.getKey()); + IgniteTxKey txKey = ctx.txKey(key); + + if (!txKeys.add(txKey)) + continue; + + IgniteTxEntry lockedTxEntry = tx.entry(txKey); + + if (lockedTxEntry != null && lockedTxEntry.locked()) + continue; + + if (!(entry.version() instanceof GridCacheVersion)) { + tx.removeAndUnlockTxEntries(txEntries); + + return new GridFinishedFuture<>(new IgniteCheckedException("Failed to acquire transactional lock for entry with " + + "unsupported version type [entry=" + entry + ", version=" + entry.version() + ']')); + } + + CacheObject val = ctx.toCacheObject(entry.getValue()); + GridCacheEntryEx entryEx = ctx.isColocated() ? ctx.colocated().entryExx(key, tx.topologyVersion(), true) : entryEx(key); + + IgniteTxEntry txEntry = tx.addEntry( + READ, + val, + null, + null, + entryEx, + null, + null, + true, + -1L, + -1L, + null, + false, + false, + false, + false, + CU.isNearEnabled(ctx) + ); + + keys.add(key); + txEntries.add(txEntry); + expVers.add((GridCacheVersion)entry.version()); + } + + if (keys.isEmpty()) + return new GridFinishedFuture<>(true); + + // Acquire transactional lock future from concrete cache implementation. Use txLockAsync which + // delegates to cache-specific lockAllAsync implementations for distributed caches. + long timeout = tx.remainingTime(); + + IgniteInternalFuture lockFut = txLockAsync(keys, + timeout, + waitTimeout == 0 ? timeout : waitTimeout, + tx, + /*isRead*/true, + /*retval*/false, + tx.isolation(), + /*invalidate*/false, + /*createTtl*/0L, + /*accessTtl*/0L); + + IgniteInternalFuture res = new GridEmbeddedFuture<>( + lockFut, + (locked, ex) -> { + if (ex != null) + return new GridFinishedFuture<>(ex); + + if (!locked) { + tx.removeAndUnlockTxEntries(txEntries); + + return new GridFinishedFuture<>(false); + } + + try { + for (int i = 0; i < txEntries.size(); i++) { + GridCacheEntryEx cached = txEntries.get(i).cached(); + EntryGetResult getRes = cached.innerGetVersioned( + null, + tx, + /*update-metrics*/false, + /*event*/false, + null, + tx.resolveTaskName(), + null, + false, + null); + + if (getRes == null || !expVers.get(i).equals(getRes.version())) { + tx.removeAndUnlockTxEntries(txEntries); + + return new GridFinishedFuture<>(false); + } + } + + return new GridFinishedFuture<>(true); + } + catch (IgniteCheckedException | GridCacheEntryRemovedException e) { + tx.removeAndUnlockTxEntries(txEntries); + + return new GridFinishedFuture<>(e); + } + } + ); + + // Register this future in transaction's async-holder so that subsequent operations + // that call tx.txState().awaitLastFuture() will wait for it. + GridCacheAdapter.FutureHolder holder = tx.txState().lastAsyncFuture(); + + if (holder != null) { + holder.lock(); + + try { + holder.saveFuture(res); + } + finally { + holder.unlock(); + } + } + + return res; + } + /** {@inheritDoc} */ @Override public boolean isLockedByThread(K key) { A.notNull(key, "key"); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 82ac5cf606e9c..c2fa3ee509d0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -1348,6 +1348,58 @@ public IgniteInternalCache delegate() { } } + /** {@inheritDoc} */ + @Override public boolean lockTxEntry(CacheEntry entry, long waitTimeout) throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.lockTxEntry(entry, waitTimeout); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture lockTxEntryAsync(CacheEntry entry, long waitTimeout) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.lockTxEntryAsync(entry, waitTimeout); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public boolean lockTxEntries(Collection> entries, long waitTimeout) + throws IgniteCheckedException { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.lockTxEntries(entries, waitTimeout); + } + finally { + gate.leave(prev); + } + } + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture lockTxEntriesAsync( + Collection> entries, + long waitTimeout + ) { + CacheOperationContext prev = gate.enter(opCtx); + + try { + return delegate.lockTxEntriesAsync(entries, waitTimeout); + } + finally { + gate.leave(prev); + } + } + /** {@inheritDoc} */ @Override public boolean isLockedByThread(K key) { CacheOperationContext prev = gate.enter(opCtx); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 678a8227f41d5..3f3da559712e0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1346,6 +1346,60 @@ public boolean lock(K key, long timeout) */ public boolean isLocked(K key); + /** + * Acquires transactional lock for a cached object represented by the given entry only if a current entry version + * is equal to the entry version. This method works only in {@link TransactionConcurrency#PESSIMISTIC} transaction. + * + * @param entry Entry whose key, value and version should be used. + * @param waitTimeout Timeout in milliseconds to wait for lock to be acquired + * ({@code '0'} for no expiration), {@code -1} for immediate failure if + * lock cannot be acquired immediately). + * @return {@code True} if lock was acquired with the same entry version. + * @throws IgniteCheckedException If lock acquisition resulted in an error. + */ + public boolean lockTxEntry(CacheEntry entry, long waitTimeout) throws IgniteCheckedException; + + /** + * Acquires transactional locks for cached objects represented by the given entries only if current entry versions + * are equal to entry versions. This method works only in {@link TransactionConcurrency#PESSIMISTIC} transaction. + * + * @param entries Entries whose keys, values and versions should be used. + * @param waitTimeout Timeout in milliseconds to wait for locks to be acquired + * ({@code '0'} for no expiration), {@code -1} for immediate failure if + * locks cannot be acquired immediately). + * @return {@code True} if all locks were acquired with the same entry versions. + * @throws IgniteCheckedException If lock acquisition resulted in an error. + */ + public boolean lockTxEntries(Collection> entries, long waitTimeout) throws IgniteCheckedException; + + /** + * Asynchronously transactional lock for a cached object represented by the given entry only if a current entry + * version is equal to the entry version. This method works only in + * {@link TransactionConcurrency#PESSIMISTIC} transaction. + * + * @param entry Entry whose key, value and version should be used. + * @param waitTimeout Timeout in milliseconds to wait for lock to be acquired + * ({@code '0'} for no expiration), {@code -1} for immediate failure if + * lock cannot be acquired immediately). + * @return {@code True} if lock was acquired with the same entry version. + * @throws IgniteCheckedException If lock acquisition resulted in an error. + */ + public IgniteInternalFuture lockTxEntryAsync(CacheEntry entry, long waitTimeout); + + /** + * Asynchronously acquires transactional locks for cached objects represented by the given entries only if current + * entry versions are equal to entry versions. This method works only in + * {@link TransactionConcurrency#PESSIMISTIC} transaction. + * + * @param entries Entries whose keys, values and versions should be used. + * @param waitTimeout Timeout in milliseconds to wait for locks to be acquired + * ({@code '0'} for no expiration), {@code -1} for immediate failure if + * locks cannot be acquired immediately). + * @return {@code True} if all locks were acquired with the same entry versions. + * @throws IgniteCheckedException If lock acquisition resulted in an error. + */ + public IgniteInternalFuture lockTxEntriesAsync(Collection> entries, long waitTimeout); + /** * Checks if current thread owns a lock on this key. *

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java index 8d330906dd4f2..7586312d1bd06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java @@ -102,6 +102,7 @@ protected GridDistributedCacheAdapter(GridCacheContext ctx, GridCacheConcu @Override public IgniteInternalFuture txLockAsync( Collection keys, long timeout, + long waitTimeout, IgniteTxLocalEx tx, boolean isRead, boolean retval, @@ -112,7 +113,7 @@ protected GridDistributedCacheAdapter(GridCacheContext ctx, GridCacheConcu ) { assert tx != null; - return lockAllAsync(keys, timeout, tx, isInvalidate, isRead, retval, isolation, createTtl, accessTtl); + return lockAllAsync(keys, timeout, waitTimeout, tx, isInvalidate, isRead, retval, isolation, createTtl, accessTtl); } /** {@inheritDoc} */ @@ -121,6 +122,7 @@ protected GridDistributedCacheAdapter(GridCacheContext ctx, GridCacheConcu // Return value flag is true because we choose to bring values for explicit locks. return lockAllAsync(ctx.cacheKeysView(keys), + timeout, timeout, tx, false, @@ -133,7 +135,8 @@ protected GridDistributedCacheAdapter(GridCacheContext ctx, GridCacheConcu /** * @param keys Keys to lock. - * @param timeout Timeout. + * @param timeout Transaction timeout. + * @param waitTimeout Lock wait timeout. * @param tx Transaction * @param isInvalidate Invalidation flag. * @param isRead Indicates whether value is read or written. @@ -145,6 +148,7 @@ protected GridDistributedCacheAdapter(GridCacheContext ctx, GridCacheConcu */ protected abstract IgniteInternalFuture lockAllAsync(Collection keys, long timeout, + long waitTimeout, @Nullable IgniteTxLocalEx tx, boolean isInvalidate, boolean isRead, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index fef43785bef3a..bf11ce5854421 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -154,9 +154,12 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture obtainLockAsync( ); } + /** + * @param waitTimeout Lock wait timeout. + * @param timeout Transaction timeout. + * @return {@code True} if separate lock wait timeout expires before transaction timeout. + */ + private static boolean waitTimeoutExpiresFirst(long waitTimeout, long timeout) { + return waitTimeout < 0 || (waitTimeout > 0 && (timeout <= 0 || waitTimeout < timeout)); + } + /** {@inheritDoc} */ @Override public boolean localFinish(boolean commit, boolean clearThreadMap) throws IgniteCheckedException { if (log.isDebugEnabled()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 0bc9007092581..02880fa96b865 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -815,6 +815,7 @@ private IgniteInternalFuture asyncOp(final CO> op /** {@inheritDoc} */ @Override protected IgniteInternalFuture lockAllAsync(Collection keys, long timeout, + long waitTimeout, @Nullable IgniteTxLocalEx tx, boolean isInvalidate, boolean isRead, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index af7f997899e40..208b804312578 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -639,6 +639,7 @@ else if (!skipVals && ctx.statisticsEnabled()) @Override public IgniteInternalFuture lockAllAsync( Collection keys, long timeout, + long waitTimeout, @Nullable IgniteTxLocalEx tx, boolean isInvalidate, boolean isRead, @@ -659,6 +660,7 @@ else if (!skipVals && ctx.statisticsEnabled()) isRead, retval, timeout, + waitTimeout, createTtl, accessTtl, opCtx != null && opCtx.skipStore(), @@ -902,6 +904,7 @@ public void removeLocks(long threadId, GridCacheVersion ver, Collection lockAllAsync( final boolean txRead, final boolean retval, final long timeout, + final long waitTimeout, final long createTtl, final long accessTtl, final boolean skipStore, @@ -945,6 +949,7 @@ IgniteInternalFuture lockAllAsync( txRead, retval, timeout, + waitTimeout, createTtl, accessTtl, skipStore, @@ -968,6 +973,7 @@ IgniteInternalFuture lockAllAsync( txRead, retval, timeout, + waitTimeout, createTtl, accessTtl, skipStore, @@ -990,6 +996,7 @@ IgniteInternalFuture lockAllAsync( * @param txRead Tx read. * @param retval Return value flag. * @param timeout Lock timeout. + * @param waitTimeout Lock wait timeout. * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. @@ -1007,6 +1014,7 @@ private IgniteInternalFuture lockAllAsync0( final boolean txRead, boolean retval, final long timeout, + long waitTimeout, final long createTtl, final long accessTtl, boolean skipStore, @@ -1024,6 +1032,7 @@ private IgniteInternalFuture lockAllAsync0( txRead, retval, timeout, + waitTimeout == 0 ? timeout : waitTimeout, tx, threadId, createTtl, @@ -1077,7 +1086,7 @@ private IgniteInternalFuture lockAllAsync0( @Override public Exception apply(Boolean b, Exception e) { if (e != null) e = U.unwrap(e); - else if (!b) + else if (!b && !waitTimeoutExpiresFirst(waitTimeout, timeout)) e = new GridCacheLockTimeoutException(ver); return e; @@ -1101,7 +1110,8 @@ else if (!b) skipStore, skipReadThrough, keepBinaryInInterceptor, - keepBinary); + keepBinary, + waitTimeout); return new GridDhtEmbeddedFuture<>( new C2() { @@ -1109,8 +1119,8 @@ else if (!b) Exception e) { if (e != null) e = U.unwrap(e); - - assert !tx.empty(); + else if (ret != null && !ret.success()) + e = new GridCacheLockTimeoutException(ver); return e; } @@ -1119,6 +1129,15 @@ else if (!b) } } + /** + * @param waitTimeout Lock wait timeout. + * @param timeout Transaction timeout. + * @return {@code True} if separate lock wait timeout expires before transaction timeout. + */ + private static boolean waitTimeoutExpiresFirst(long waitTimeout, long timeout) { + return waitTimeout < 0 || (waitTimeout > 0 && (timeout <= 0 || waitTimeout < timeout)); + } + /** * @param nodeId Node ID. * @param res Response. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 1e322d8cb56d6..bb58c7b3a392c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -142,9 +142,12 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF @GridToStringExclude private volatile LockTimeoutObject timeoutObj; - /** Lock timeout. */ + /** Transaction timeout. */ private final long timeout; + /** Lock wait timeout. */ + private final long waitTimeout; + /** Transaction. */ @GridToStringExclude private final GridNearTxLocal tx; @@ -197,7 +200,8 @@ public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityF * @param tx Transaction. * @param read Read flag. * @param retval Flag to return value or not. - * @param timeout Lock acquisition timeout. + * @param timeout Transaction timeout. + * @param waitTimeout Lock wait timeout. * @param createTtl TTL for create operation. * @param accessTtl TTL for read operation. * @param skipStore Skip store flag. @@ -211,6 +215,7 @@ public GridDhtColocatedLockFuture( boolean read, boolean retval, long timeout, + long waitTimeout, long createTtl, long accessTtl, boolean skipStore, @@ -229,6 +234,7 @@ public GridDhtColocatedLockFuture( this.read = read; this.retval = retval; this.timeout = timeout; + this.waitTimeout = waitTimeout; this.createTtl = createTtl; this.accessTtl = accessTtl; this.skipStore = skipStore; @@ -626,6 +632,9 @@ private synchronized void onError(Throwable t) { if (err != null) success = false; + if (!success && err == null && waitTimeoutExpiresFirst()) + return onComplete(false, true, false); + return onComplete(success, true); } } @@ -638,6 +647,18 @@ private synchronized void onError(Throwable t) { * @return {@code True} if complete by this operation. */ private boolean onComplete(boolean success, boolean distribute) { + return onComplete(success, distribute, !success); + } + + /** + * Completeness callback. + * + * @param success {@code True} if lock was acquired. + * @param distribute {@code True} if need to distribute lock removal in case of failure. + * @param rollback {@code True} if should rollback tx on failure. + * @return {@code True} if complete by this operation. + */ + private boolean onComplete(boolean success, boolean distribute, boolean rollback) { if (log.isDebugEnabled()) { log.debug("Received onComplete(..) callback [success=" + success + ", distribute=" + distribute + ", fut=" + this + ']'); @@ -646,13 +667,13 @@ private boolean onComplete(boolean success, boolean distribute) { if (!DONE_UPD.compareAndSet(this, 0, 1)) return false; - if (!success) + if (!success && rollback) undoLocks(distribute, true); if (tx != null) { cctx.tm().txContext(tx); - if (success) + if (!rollback) tx.clearLockFuture(this); } @@ -768,7 +789,7 @@ void map() { if (isDone()) // Possible due to async rollback. return; - if (timeout > 0) { + if (lockTimeout() > 0) { timeoutObj = new LockTimeoutObject(); cctx.time().addTimeoutObject(timeoutObj); @@ -1083,6 +1104,7 @@ private synchronized void map0( isolation(), isInvalidate(), timeout, + waitTimeout, mappedKeys.size(), inTx() ? tx.size() : mappedKeys.size(), inTx() && tx.syncMode() == FULL_SYNC, @@ -1258,6 +1280,7 @@ private void lockLocally( read, retval, timeout, + waitTimeout, createTtl, accessTtl, skipStore, @@ -1477,6 +1500,20 @@ private boolean errorOrTimeoutOnTopologyVersion(IgniteCheckedException e, boolea return false; } + /** + * @return Timeout value for this lock future. + */ + private long lockTimeout() { + return waitTimeoutExpiresFirst() ? waitTimeout : timeout; + } + + /** + * @return {@code True} if separate lock wait timeout expires before transaction timeout. + */ + private boolean waitTimeoutExpiresFirst() { + return waitTimeout < 0 || (waitTimeout > 0 && (timeout <= 0 || waitTimeout < timeout)); + } + /** * Lock request timeout object. */ @@ -1485,7 +1522,7 @@ private class LockTimeoutObject extends GridTimeoutObjectAdapter { * Default constructor. */ LockTimeoutObject() { - super(timeout); + super(lockTimeout()); } /** Requested keys. */ @@ -1496,6 +1533,20 @@ private class LockTimeoutObject extends GridTimeoutObjectAdapter { if (log.isDebugEnabled()) log.debug("Timed out waiting for lock response: " + this); + if (waitTimeoutExpiresFirst()) { + synchronized (GridDhtColocatedLockFuture.this) { + requestedKeys = requestedKeys0(); + + clear(); // Stop response processing. + } + + synchronized (this) { + onComplete(false, true, false); + } + + return; + } + if (inTx()) { if (cctx.tm().deadlockDetectionEnabled()) { synchronized (GridDhtColocatedLockFuture.this) { @@ -1660,6 +1711,12 @@ void onResult(GridNearLockResponse res) { return; } + if (!res.lockAcquired()) { + onDone(false); + + return; + } + if (res.clientRemapVersion() != null) { assert cctx.kernalContext().clientNode(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 0a9b5a7c658b6..366f937e3801f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -594,6 +594,7 @@ private void processNearAtomicUpdateResponse( /** {@inheritDoc} */ @Override protected IgniteInternalFuture lockAllAsync(Collection keys, long timeout, + long waitTimeout, @Nullable IgniteTxLocalEx tx, boolean isInvalidate, boolean isRead, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index a5db4ea3e61de..bd57c530edd65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -124,13 +124,19 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture fut = cacheCtx.cache().txLockAsync(enlisted, + timeout, timeout, this, /*read*/entryProc != null, // Needed to force load from store. @@ -826,6 +827,7 @@ private IgniteInternalFuture putAllAsync0( log.debug("Before acquiring transaction lock for put on keys: " + enlisted); IgniteInternalFuture fut = cacheCtx.cache().txLockAsync(enlisted, + timeout, timeout, this, /*read*/invokeVals != null, // Needed to force load from store. @@ -1736,6 +1738,7 @@ private IgniteInternalFuture removeAllAsync0( log.debug("Before acquiring transaction lock for remove on keys: " + enlisted); IgniteInternalFuture fut = cacheCtx.cache().txLockAsync(enlisted, + timeout, timeout, this, false, @@ -1930,6 +1933,7 @@ public IgniteInternalFuture> getAllAsync( return new GridFinishedFuture<>(timeoutException()); IgniteInternalFuture fut = cacheCtx.cache().txLockAsync(lockKeys, + timeout, timeout, this, true, @@ -3233,6 +3237,23 @@ private void removeEntryMappings(IgniteTxEntry entry) { removeEntryFromMappings(entry, nearMap); } + /** + * Removes transaction entries and releases their acquired transactional locks. + * + * @param entries Entries to remove and unlock. + */ + public void removeAndUnlockTxEntries(Collection entries) { + if (F.isEmpty(entries)) + return; + + for (IgniteTxEntry entry : entries) { + txState().removeEntry(entry.txKey()); + removeEntryMappings(entry); + } + + unlockTxEntries(entries); + } + /** * @param entry Entry. */ @@ -3294,7 +3315,16 @@ private void unlockTxEntries(Collection entries) { else if (cacheCtx.cache().isColocated()) { UUID nodeId = entry.nodeId(); - if (nodeId == null || cctx.localNodeId().equals(nodeId)) + if (nodeId == null) { + ClusterNode primary = cacheCtx.affinity().primaryByKey(entry.key(), topologyVersion()); + + if (primary == null) + continue; + + nodeId = primary.id(); + } + + if (cctx.localNodeId().equals(nodeId)) colocatedLocKeys.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(entry.key()); else { colocatedRmtKeys @@ -4223,6 +4253,7 @@ public IgniteInternalFuture rollbackAsyncLocal() { * @param skipReadThrough Skip read-through cache store flag. * @param keepBinaryInInterceptor Handle binary in interceptor operation flag. * @param keepBinary Keep binary flag. + * @param waitTimeout waitTimeout Lock wait timeout. * @return Future with respond. */ public IgniteInternalFuture lockAllAsync(GridCacheContext cacheCtx, @@ -4234,7 +4265,8 @@ public IgniteInternalFuture lockAllAsync(GridCacheContext c boolean skipStore, boolean skipReadThrough, boolean keepBinaryInInterceptor, - boolean keepBinary) { + boolean keepBinary, + long waitTimeout) { assert pessimistic(); try { @@ -4261,6 +4293,7 @@ public IgniteInternalFuture lockAllAsync(GridCacheContext c IgniteInternalFuture fut = cacheCtx.colocated().lockAllAsyncInternal(keys, timeout, + waitTimeout, this, isInvalidate(), read, @@ -4275,10 +4308,20 @@ public IgniteInternalFuture lockAllAsync(GridCacheContext c return new GridEmbeddedFuture<>( fut, - new PLC1(ret, false) { - @Override protected GridCacheReturn postLock(GridCacheReturn ret) { - if (log.isDebugEnabled()) - log.debug("Acquired transaction lock on keys: " + keys); + new PLC1(ret, false, !waitTimeoutExpiresFirst(waitTimeout, timeout)) { + @Override protected GridCacheReturn postLock(GridCacheReturn ret) throws IgniteCheckedException { + assert fut.error() == null; + + boolean success = Boolean.TRUE.equals(fut.get()); + + ret.success(success); + + if (log.isDebugEnabled()) { + if (ret.success()) + log.debug("Successfully acquired transaction lock on keys: " + keys); + else + log.debug("Failed to acquire transaction lock on keys: " + keys); + } return ret; } @@ -4286,6 +4329,15 @@ public IgniteInternalFuture lockAllAsync(GridCacheContext c ); } + /** + * @param waitTimeout Lock wait timeout. + * @param timeout Transaction timeout. + * @return {@code True} if separate lock wait timeout expires before transaction timeout. + */ + private static boolean waitTimeoutExpiresFirst(long waitTimeout, long timeout) { + return waitTimeout < 0 || (waitTimeout > 0 && (timeout <= 0 || waitTimeout < timeout)); + } + /** {@inheritDoc} */ @Override protected GridCacheEntryEx entryEx( GridCacheContext cacheCtx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index cbb73652e033c..4ea1406a53bb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1089,6 +1089,36 @@ protected final void postLockWrite( long accessTtl, CacheEntryPredicate[] filter, boolean computeInvoke + ) throws IgniteCheckedException { + postLockWrite(cacheCtx, keys, ret, rmv, retval, read, accessTtl, filter, computeInvoke, false); + } + + /** + * Post lock processing for put or remove. + * + * @param cacheCtx Context. + * @param keys Keys. + * @param ret Return value. + * @param rmv {@code True} if remove. + * @param retval Flag to return value or not. + * @param read {@code True} if read. + * @param accessTtl TTL for read operation. + * @param filter Filter to check entries. + * @param computeInvoke If {@code true} computes return value for invoke operation. + * @param skipIfLockLost Return unsuccessful result if a separate lock wait timeout has removed the lock. + * @throws IgniteCheckedException If error. + */ + protected final void postLockWrite( + GridCacheContext cacheCtx, + Iterable keys, + GridCacheReturn ret, + boolean rmv, + boolean retval, + boolean read, + long accessTtl, + CacheEntryPredicate[] filter, + boolean computeInvoke, + boolean skipIfLockLost ) throws IgniteCheckedException { for (KeyCacheObject k : keys) { IgniteTxEntry txEntry = entry(cacheCtx.txKey(k)); @@ -1101,7 +1131,15 @@ protected final void postLockWrite( GridCacheEntryEx cached = txEntry.cached(); try { - assert cached.detached() || cached.lockedLocally(xidVersion()) || isRollbackOnly() : + boolean ownsLock = cached.detached() || cached.lockedLocally(xidVersion()); + + if (!ownsLock && skipIfLockLost) { + ret.success(false); + + return; + } + + assert ownsLock || isRollbackOnly() : "Transaction lock is not acquired [entry=" + cached + ", tx=" + this + ", nodeId=" + cctx.localNodeId() + ", threadId=" + threadId + ']'; @@ -1602,9 +1640,10 @@ protected PLC1(T arg) { /** * @param arg Argument. * @param commit Commit flag. + * @param rollback Rollback flag. */ - protected PLC1(T arg, boolean commit) { - super(arg, commit); + protected PLC1(T arg, boolean commit, boolean rollback) { + super(arg, commit, rollback); } } @@ -1635,13 +1674,16 @@ protected abstract class PostLockClosure1 implements IgniteBiClosure entries, GridCacheVe * @return {@code True} if transaction read entries should be unlocked. */ private boolean unlockReadEntries(IgniteInternalTx tx) { - if (tx.pessimistic()) - return !tx.readCommitted(); - else - return tx.serializable(); + return tx.pessimistic() || tx.serializable(); } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryTransactionalLockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryTransactionalLockTest.java new file mode 100644 index 0000000000000..f8adf5788a3c8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheVersionedEntryTransactionalLockTest.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionIsolation; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Tests transactional locks acquired only for unchanged cache entry versions. + */ +@RunWith(Parameterized.class) +public class CacheVersionedEntryTransactionalLockTest extends GridCommonAbstractTest { + @ClassRule + public static Timeout globalTimeout = new Timeout(120, TimeUnit.SECONDS); + + /** */ + private static Ignite ignite0; + + /** */ + private static Ignite ignite1; + + /** */ + private static Ignite ignite2; + + /** */ + private static Ignite ignite3; + + /** */ + private static Ignite client; + + /** */ + @Parameterized.Parameter(0) + public boolean useNearCache; + + /** */ + @Parameterized.Parameter(1) + public int backups; + + /** */ + @Parameterized.Parameter(2) + public boolean replicated; + + /** */ + @Parameterized.Parameter(3) + public boolean batch; + + /** + * Returns data for test. + * @return Test parameters. + */ + @Parameterized.Parameters(name = "useNearCache={0}, backups={1}, replicated={2}, batch={3}") + public static Collection testData() { + return List.of(new Object[][] { + {false, 0, false, false}, + {false, 0, false, true}, + {false, 0, true, false}, + {false, 0, true, true}, + {false, 1, false, false}, + {false, 1, false, true}, + {false, 1, true, false}, + {false, 1, true, true}, + {false, 2, false, false}, + {false, 2, false, true}, + {false, 2, true, false}, + {false, 2, true, true}, + {true, 0, false, false}, + {true, 0, false, true}, + {true, 0, true, false}, + {true, 0, true, true}, + {true, 1, false, false}, + {true, 1, false, true}, + {true, 1, true, false}, + {true, 1, true, true}, + {true, 2, false, false}, + {true, 2, false, true}, + {true, 2, true, false}, + {true, 2, true, true} + }); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + ignite0 = startGrid(0); + ignite1 = startGrid(1); + ignite2 = startGrid(2); + ignite3 = startGrid(3); + client = startClientGrid(); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + ignite0.destroyCache(DEFAULT_CACHE_NAME); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setConsistentId(igniteInstanceName); + } + + /** + * Creates transactional cache. + * + * @param ignite Node. + * @return Transactional cache. + */ + private IgniteCache transactionalCache(Ignite ignite) { + CacheConfiguration ccfg = + new CacheConfiguration<>(DEFAULT_CACHE_NAME) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setNearConfiguration(useNearCache ? new NearCacheConfiguration<>() : null) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setCacheMode(replicated ? CacheMode.REPLICATED : CacheMode.PARTITIONED) + .setBackups(backups); + + return (IgniteCache)ignite.createCache(ccfg); + } + + @Test + public void testLockBeforePutFromClientTest() throws Exception { + IgniteCache cache = transactionalCache(client); + + int key = 42; + + checkLockBeforePut(cache, key, READ_COMMITTED); + } + + @Test + public void testLockBeforePutLocalKeyTest() throws Exception { + IgniteCache cache = transactionalCache(ignite0); + + int key = primaryKey(cache); + + checkLockBeforePut(cache, key, READ_COMMITTED); + } + + @Test + public void testLockBeforePutRemoteKeyTest() throws Exception { + IgniteCache cache = transactionalCache(ignite0); + + int key = primaryKey(ignite1.cache(DEFAULT_CACHE_NAME)); + + checkLockBeforePut(cache, key, REPEATABLE_READ); + } + + @Test + public void testLockBeforePutLocalKeyRepeatableReadTest() throws Exception { + IgniteCache cache = transactionalCache(ignite0); + + int key = primaryKey(cache); + + checkLockBeforePut(cache, key, REPEATABLE_READ); + } + + @Test + public void testLockBeforePutRemoteKeyRepeatableReadTest() throws Exception { + IgniteCache cache = transactionalCache(ignite0); + + int key = primaryKey(ignite1.cache(DEFAULT_CACHE_NAME)); + + checkLockBeforePut(cache, key, REPEATABLE_READ); + } + + @Test + public void testEntryVersionDoesNotChangeWhenEntryIsNotUpdated() throws Exception { + IgniteCache cache = transactionalCache(ignite0); + + int localKey = primaryKey(cache); + int remoteKey = primaryKey(ignite1.cache(DEFAULT_CACHE_NAME)); + + cache.put(localKey, 0); + cache.put(remoteKey, 0); + + CacheEntry localEntry = cache.getEntry(localKey); + CacheEntry remoteEntry = cache.getEntry(remoteKey); + + try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + if (batch) { + assertTrue(acquireLockForEntries(cache, List.of(localEntry, remoteEntry), 0)); + } else { + assertTrue(acquireLockForEntry(cache, localEntry, 0)); + assertTrue(acquireLockForEntry(cache, remoteEntry, 0)); + } + + tx.commit(); + } + + assertEquals(localEntry.version(), cache.getEntry(localKey).version()); + assertEquals(remoteEntry.version(), cache.getEntry(remoteKey).version()); + + try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + if (batch) { + assertTrue(acquireLockForEntries(cache, List.of(localEntry, remoteEntry), 0)); + } else { + assertTrue(acquireLockForEntry(cache, localEntry, 0)); + assertTrue(acquireLockForEntry(cache, remoteEntry, 0)); + } + + tx.rollback(); + } + + assertEquals(localEntry.version(), cache.getEntry(localKey).version()); + assertEquals(remoteEntry.version(), cache.getEntry(remoteKey).version()); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testVersionedEntryLockReturnsFalseWhenEntryIsLockedByAnotherTransactionNoWait() throws Exception { + checkReturningWhenCanNotWaitForLock(-1, false, false); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testVersionedEntryLockReturnsFalseWhenEntryIsLockedByAnotherTransaction() throws Exception { + checkReturningWhenCanNotWaitForLock(200, false, false); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testVersionedEntryLockReturnsFalseWhenLocalEntryIsLockedByAnotherTransaction() throws Exception { + checkReturningWhenCanNotWaitForLock(200, false, true); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testVersionedEntryLockReturnsFalseWhenEntryIsLockedByAnotherTransactionAndCommit() throws Exception { + checkReturningWhenCanNotWaitForLock(200, true, false); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testVersionedEntryLockReturnsFalseWhenLocalEntryIsLockedByAnotherTransactionAndCommit() throws Exception { + checkReturningWhenCanNotWaitForLock(200, true, true); + } + + /** + * Checks that lock acquisition can be retried in the same transaction after the competing transaction finishes. + * + * @throws Exception If failed. + */ + @Test + public void testVersionedEntryLockCanBeRetriedAfterWaitTimeout() throws Exception { + Ignite holder = ignite0; + Ignite initiator = ignite1; + + IgniteCache holderCache = transactionalCache(holder); + IgniteCache cache = initiator.cache(DEFAULT_CACHE_NAME); + + int key = primaryKey(holderCache); + + holderCache.put(key, 0); + + CacheEntry entry = cache.getEntry(key); + + try (Transaction holderTx = holder.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + holderCache.put(key, 42); + + try (Transaction tx = initiator.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + if (batch) + assertFalse(acquireLockForEntries(cache, List.of(entry), 200)); + else + assertFalse(acquireLockForEntry(cache, entry, 200)); + + holderTx.rollback(); + + if (batch) + assertTrue(acquireLockForEntries(cache, List.of(entry), 5_000)); + else + assertTrue(acquireLockForEntry(cache, entry, 5_000)); + + cache.put(key, 1); + + tx.commit(); + } + } + + assertEquals(1, cache.get(key).intValue()); + } + + /** + * Checks that the lock entry method returns {@code false} when can not wait for lock. + * + * @param timeout Timeout. + * @throws IgniteCheckedException If failed. + */ + private void checkReturningWhenCanNotWaitForLock(long timeout, boolean commit, boolean locForLocal) throws IgniteCheckedException { + Ignite holder = ignite0; + Ignite initiator = ignite1; + + IgniteCache holderCache = transactionalCache(holder); + IgniteCache cache = initiator.cache(DEFAULT_CACHE_NAME); + + List firstKeySet = locForLocal ? primaryKeys(cache, 2) : primaryKeys(holderCache, 2); + Integer concurentLockedKey = firstKeySet.get(0); + Integer txKey1 = firstKeySet.get(1); + Integer txKey2 = locForLocal ? primaryKey(holderCache) : primaryKey(cache); + + holderCache.put(concurentLockedKey, 0); + holderCache.put(txKey1, 0); + holderCache.put(txKey2, 0); + + CacheEntry entry = cache.getEntry(concurentLockedKey); + + try (Transaction holderTx = holder.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + holderCache.put(concurentLockedKey, 42); + + try (Transaction tx = initiator.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + long startWaiting = System.currentTimeMillis(); + + if (batch) { + assertFalse(acquireLockForEntries(cache, List.of( + cache.getEntry(txKey1), + entry, + cache.getEntry(txKey2) + ), timeout)); + + assertTrue(acquireLockForEntries(cache, List.of( + cache.getEntry(txKey1), + cache.getEntry(txKey2) + ), timeout)); + } + else { + assertTrue(acquireLockForEntry(cache, cache.getEntry(txKey1), timeout)); + assertTrue(acquireLockForEntry(cache, cache.getEntry(txKey2), timeout)); + + assertFalse(acquireLockForEntry(cache, entry, timeout)); + } + + long waitTime = System.currentTimeMillis() - startWaiting; + + assertTrue("Waited for lock for " + waitTime + " ms but timeout is " + timeout + " ms.", + waitTime > timeout - 50 && waitTime < timeout + 300); + + cache.put(txKey1, 42); + cache.put(txKey2, 42); + + if (commit) + tx.commit(); + } + + holderTx.rollback(); + } + + assertEquals(0, holderCache.get(concurentLockedKey).intValue()); + assertEquals(0, cache.get(concurentLockedKey).intValue()); + + if (commit) { + assertEquals(42, holderCache.get(txKey1).intValue()); + assertEquals(42, cache.get(txKey2).intValue()); + } else { + assertEquals(0, holderCache.get(txKey1).intValue()); + assertEquals(0, cache.get(txKey2).intValue()); + } + } + + private void checkLockBeforePut(IgniteCache cache, int key, TransactionIsolation txIsolation) throws IgniteCheckedException { + cache.put(key, 0); + + CacheEntry entry = cache.getEntry(key); + + assertNotNull(entry); + assertNotNull(entry.version()); + + Ignite ign = cache.unwrap(Ignite.class); + + try (Transaction tx = ign.transactions().txStart(PESSIMISTIC, txIsolation)) { + if (batch) + assertTrue(acquireLockForEntries(cache, List.of(entry), 0)); + else + assertTrue(acquireLockForEntry(cache, entry, 0)); + + assertEquals(0, cache.get(key).intValue()); + + cache.put(key, 1); + + assertEquals(1, cache.get(key).intValue()); + + tx.commit(); + } + + assertEquals(1, cache.get(key).intValue()); + assertTrue(cache.getEntry(key).version().compareTo(entry.version()) > 0); + } + + @SuppressWarnings("unchecked") + private static boolean acquireLockForEntry( + IgniteCache cache, + CacheEntry entry, + long timeout + ) throws IgniteCheckedException { + return cache.unwrap(IgniteCacheProxy.class).internalProxy().lockTxEntry(entry, timeout); + } + + @SuppressWarnings("unchecked") + private static boolean acquireLockForEntries( + IgniteCache cache, + List> entries, + long timeout + ) throws IgniteCheckedException { + return cache.unwrap(IgniteCacheProxy.class).internalProxy().lockTxEntries(entries, timeout); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/LockTxEntryOneNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/LockTxEntryOneNodeTest.java new file mode 100644 index 0000000000000..0e12cea317ff6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/LockTxEntryOneNodeTest.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionTimeoutException; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * Tests transactional entry locking through internal cache API. + */ +@RunWith(Parameterized.class) +public class LockTxEntryOneNodeTest extends GridCommonAbstractTest { + @ClassRule + public static Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS); + + /** */ + private static final int KEY = 1; + + /** */ + private static final int INIT_VAL = 1; + + /** */ + private static Ignite ignite; + + /** */ + @Parameterized.Parameter(0) + public boolean commit; + + /** */ + @Parameterized.Parameter(1) + public boolean useNearCache; + + /** */ + @Parameterized.Parameter(2) + public CacheMode cacheMode; + + /** Cache. */ + private IgniteCache cache; + + /** + * Returns data for test. + * @return Test parameters. + */ + @Parameterized.Parameters(name = "commit={0}, useNearCache={1}, cacheMode={2}") + public static Collection testData() { + return List.of(new Object[][] { + {false, false, PARTITIONED}, + {false, false, REPLICATED}, + {true, false, PARTITIONED}, + {true, false, REPLICATED}, + {false, true, PARTITIONED}, + {false, true, REPLICATED}, + {false, false, PARTITIONED}, + {false, false, REPLICATED}, + }); + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + ignite = startGrid(0); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cache = ignite.createCache(new CacheConfiguration(DEFAULT_CACHE_NAME) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setNearConfiguration(useNearCache ? new NearCacheConfiguration<>() : null) + .setCacheMode(cacheMode)); + + cache.put(KEY, INIT_VAL); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + ignite.destroyCache(DEFAULT_CACHE_NAME); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testLockTxEntryInPessimisticTransaction() throws Exception { + CacheEntry entry = cache.getEntry(KEY); + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + assertTrue(acquireLockForEntry(entry, 0)); + + assertEquals(entry.getValue(), cache.get(KEY)); + + checkInaccessInOtherTx(cache); + + if (commit) + tx.commit(); + } + + assertEquals(INIT_VAL, cache.get(KEY).intValue()); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testLockAlreadyLockedTxEntry() throws Exception { + CacheEntry entry = cache.getEntry(KEY); + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + cache.put(KEY, 2); + + assertTrue(acquireLockForEntry(entry, 0)); + + assertEquals(2, cache.get(KEY).intValue()); + + checkInaccessInOtherTx(cache); + + if (commit) + tx.commit(); + } + + assertEquals(commit ? 2 : INIT_VAL, cache.get(KEY).intValue()); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testFailToLockTxEntryInPessimisticTransaction() throws Exception { + CacheEntry entry = cache.getEntry(KEY); + + cache.put(KEY, 2); + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + assertFalse(acquireLockForEntry(entry, 0)); + + assertEquals(2, cache.get(KEY).intValue()); + + checkAccessInOtherTx(cache); + + if (commit) + tx.commit(); + } + + assertEquals(2, cache.get(KEY).intValue()); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testLockTxEntryReturnsFalseOnTimeoutWhenLockedInOtherTransaction() throws Exception { + CacheEntry entry = cache.getEntry(KEY); + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + assertTrue(acquireLockForEntry(entry, 0)); + + IgniteInternalFuture lockFut = GridTestUtils.runAsync(new Callable() { + @Override public Boolean call() throws Exception { + boolean locked = true; + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + locked = acquireLockForEntry(entry, 100); + + assertFalse(locked); + + cache.put(2, 2); + + if (commit) + tx.commit(); + } + + return locked; + } + }); + + assertFalse(lockFut.get(10_000)); + + if (commit) + tx.commit(); + } + + assertEquals(INIT_VAL, cache.get(KEY).intValue()); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testUpdateAfterLock() throws Exception { + CacheEntry entry = cache.getEntry(KEY); + + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { + assertTrue(acquireLockForEntry(entry, 0)); + + checkInaccessInOtherTx(cache); + + assertEquals(entry.getValue(), cache.get(KEY)); + + cache.put(KEY, 2); + + assertEquals(2, cache.get(KEY).intValue()); + + if (commit) + tx.commit(); + } + + assertEquals(commit ? 2 : INIT_VAL, cache.get(KEY).intValue()); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testLockTxEntryFailsWithoutTransaction() throws Exception { + CacheEntry entry = cache.getEntry(KEY); + + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + acquireLockForEntry(entry, 0); + + return null; + } + }, IgniteCheckedException.class, "without transaction"); + } + + /** + * @throws Exception If failed. + */ + @Test + public void testLockTxEntryAsyncFailsInOptimisticTransaction() throws Exception { + CacheEntry entry = cache.getEntry(KEY); + + try (Transaction tx = ignite.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { + GridTestUtils.assertThrows(log, new Callable() { + @Override public Object call() throws Exception { + return acquireLockForEntry(entry, 0); + } + }, IgniteCheckedException.class, "optimistic transaction"); + } + } + + private void checkAccessInOtherTx(IgniteCache cache) throws IgniteCheckedException { + IgniteInternalFuture accessFut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 100, 1)) { + cache.get(KEY); + + tx.commit(); + } + + return null; + } + }); + + accessFut.get(10_000); + } + + private void checkInaccessInOtherTx(IgniteCache cache) throws IgniteCheckedException { + IgniteInternalFuture accessFut = GridTestUtils.runAsync(new Callable() { + @Override public Void call() { + try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 100, 1)) { + cache.get(KEY); + + tx.commit(); + } + + return null; + } + }); + + GridTestUtils.assertThrowsWithCause(new Callable() { + @Override public Object call() throws Exception { + return accessFut.get(); + } + }, TransactionTimeoutException.class); + } + + @SuppressWarnings("unchecked") + private boolean acquireLockForEntry( + CacheEntry entry, + long timeout + ) throws IgniteCheckedException { + return cache.unwrap(IgniteCacheProxy.class).internalProxy().lockTxEntry(entry, timeout); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index bce6d557f1bd6..eabfb19dc8e6f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartAtomicTest; import org.apache.ignite.internal.processors.cache.CacheStoreUsageMultinodeStaticStartTxTest; import org.apache.ignite.internal.processors.cache.CacheTxNotAllowReadFromBackupTest; +import org.apache.ignite.internal.processors.cache.CacheVersionedEntryTransactionalLockTest; import org.apache.ignite.internal.processors.cache.CrossCacheLockTest; import org.apache.ignite.internal.processors.cache.GridCacheMarshallingNodeJoinSelfTest; import org.apache.ignite.internal.processors.cache.GridCacheMultinodeUpdateAtomicNearEnabledSelfTest; @@ -95,6 +96,7 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCacheTypesTest; import org.apache.ignite.internal.processors.cache.IgniteStartCacheInTransactionSelfTest; import org.apache.ignite.internal.processors.cache.IgniteSystemCacheOnClientTest; +import org.apache.ignite.internal.processors.cache.LockTxEntryOneNodeTest; import org.apache.ignite.internal.processors.cache.MarshallerCacheJobRunNodeRestartTest; import org.apache.ignite.internal.processors.cache.distributed.CacheAffinityEarlyTest; import org.apache.ignite.internal.processors.cache.distributed.CacheDirectoryNameTest; @@ -304,6 +306,8 @@ public static List> suite(Collection ignoredTests) { GridTestUtils.addTestIfNeeded(suite, CacheVersionedEntryPartitionedTransactionalSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheVersionedEntryReplicatedAtomicSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, CacheVersionedEntryReplicatedTransactionalSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, CacheVersionedEntryTransactionalLockTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, LockTxEntryOneNodeTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheDhtTxPreloadSelfTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, GridCacheNearTxPreloadSelfTest.class, ignoredTests); From ec0e8717e70cafedc8f164bfcf4bff2539f6ad49 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Mon, 22 Jun 2026 19:34:32 +0300 Subject: [PATCH 2/3] Code style fix. --- .../processors/cache/IgniteInternalCache.java | 33 ++++---- .../distributed/dht/GridDhtLockFuture.java | 5 +- .../distributed/near/GridNearTxLocal.java | 2 +- .../transactions/IgniteTxLocalAdapter.java | 2 +- ...heVersionedEntryTransactionalLockTest.java | 81 ++++++++++++++++--- .../cache/LockTxEntryOneNodeTest.java | 27 ++++++- 6 files changed, 114 insertions(+), 36 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java index 3f3da559712e0..6e67c677496b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteInternalCache.java @@ -1347,12 +1347,12 @@ public boolean lock(K key, long timeout) public boolean isLocked(K key); /** - * Acquires transactional lock for a cached object represented by the given entry only if a current entry version - * is equal to the entry version. This method works only in {@link TransactionConcurrency#PESSIMISTIC} transaction. + * Acquires a transactional lock for the cached object represented by the given entry if the current cached version + * matches the entry version. This method works only in a {@link TransactionConcurrency#PESSIMISTIC} transaction. * * @param entry Entry whose key, value and version should be used. * @param waitTimeout Timeout in milliseconds to wait for lock to be acquired - * ({@code '0'} for no expiration), {@code -1} for immediate failure if + * ({@code 0} to use the transaction timeout, {@code -1} for immediate failure if * lock cannot be acquired immediately). * @return {@code True} if lock was acquired with the same entry version. * @throws IgniteCheckedException If lock acquisition resulted in an error. @@ -1360,12 +1360,13 @@ public boolean lock(K key, long timeout) public boolean lockTxEntry(CacheEntry entry, long waitTimeout) throws IgniteCheckedException; /** - * Acquires transactional locks for cached objects represented by the given entries only if current entry versions - * are equal to entry versions. This method works only in {@link TransactionConcurrency#PESSIMISTIC} transaction. + * Acquires transactional locks for the cached objects represented by the given entries if all current cached + * versions match the corresponding entry versions. This method works only in a + * {@link TransactionConcurrency#PESSIMISTIC} transaction. * * @param entries Entries whose keys, values and versions should be used. * @param waitTimeout Timeout in milliseconds to wait for locks to be acquired - * ({@code '0'} for no expiration), {@code -1} for immediate failure if + * ({@code 0} to use the transaction timeout, {@code -1} for immediate failure if * locks cannot be acquired immediately). * @return {@code True} if all locks were acquired with the same entry versions. * @throws IgniteCheckedException If lock acquisition resulted in an error. @@ -1373,30 +1374,30 @@ public boolean lock(K key, long timeout) public boolean lockTxEntries(Collection> entries, long waitTimeout) throws IgniteCheckedException; /** - * Asynchronously transactional lock for a cached object represented by the given entry only if a current entry - * version is equal to the entry version. This method works only in + * Asynchronously acquires a transactional lock for the cached object represented by the given entry if the current + * cached version matches the entry version. This method works only in a * {@link TransactionConcurrency#PESSIMISTIC} transaction. * * @param entry Entry whose key, value and version should be used. * @param waitTimeout Timeout in milliseconds to wait for lock to be acquired - * ({@code '0'} for no expiration), {@code -1} for immediate failure if + * ({@code 0} to use the transaction timeout, {@code -1} for immediate failure if * lock cannot be acquired immediately). - * @return {@code True} if lock was acquired with the same entry version. - * @throws IgniteCheckedException If lock acquisition resulted in an error. + * @return Future that resolves to {@code true} if the lock was acquired and the versions matched, or to + * {@code false} otherwise. */ public IgniteInternalFuture lockTxEntryAsync(CacheEntry entry, long waitTimeout); /** - * Asynchronously acquires transactional locks for cached objects represented by the given entries only if current - * entry versions are equal to entry versions. This method works only in + * Asynchronously acquires transactional locks for the cached objects represented by the given entries if all + * current cached versions match the corresponding entry versions. This method works only in a * {@link TransactionConcurrency#PESSIMISTIC} transaction. * * @param entries Entries whose keys, values and versions should be used. * @param waitTimeout Timeout in milliseconds to wait for locks to be acquired - * ({@code '0'} for no expiration), {@code -1} for immediate failure if + * ({@code 0} to use the transaction timeout, {@code -1} for immediate failure if * locks cannot be acquired immediately). - * @return {@code True} if all locks were acquired with the same entry versions. - * @throws IgniteCheckedException If lock acquisition resulted in an error. + * @return Future that resolves to {@code true} if all locks were acquired and all versions matched, or to + * {@code false} otherwise. */ public IgniteInternalFuture lockTxEntriesAsync(Collection> entries, long waitTimeout); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index bf11ce5854421..177c3b24414db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -452,11 +452,10 @@ private boolean isInvalidate() { if (log.isDebugEnabled()) log.debug("Failed to acquire lock with negative timeout: " + entry); - if (waitTimeoutExpiresFirst()) { + if (waitTimeoutExpiresFirst()) onComplete(false, false, false, false); - } else { + else onFailed(); - } return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 063cbf267f3fb..5f3e1d827b950 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -4253,7 +4253,7 @@ public IgniteInternalFuture rollbackAsyncLocal() { * @param skipReadThrough Skip read-through cache store flag. * @param keepBinaryInInterceptor Handle binary in interceptor operation flag. * @param keepBinary Keep binary flag. - * @param waitTimeout waitTimeout Lock wait timeout. + * @param waitTimeout Lock wait timeout. * @return Future with respond. */ public IgniteInternalFuture lockAllAsync(GridCacheContext cacheCtx, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 4ea1406a53bb3..bbc6e282fd5f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1674,7 +1674,7 @@ protected abstract class PostLockClosure1 implements IgniteBiClosure transactionalCache(Ignite ignite) { return (IgniteCache)ignite.createCache(ccfg); } + /** + * @throws Exception If failed. + */ @Test public void testLockBeforePutFromClientTest() throws Exception { IgniteCache cache = transactionalCache(client); @@ -176,6 +181,9 @@ public void testLockBeforePutFromClientTest() throws Exception { checkLockBeforePut(cache, key, READ_COMMITTED); } + /** + * @throws Exception If failed. + */ @Test public void testLockBeforePutLocalKeyTest() throws Exception { IgniteCache cache = transactionalCache(ignite0); @@ -185,6 +193,9 @@ public void testLockBeforePutLocalKeyTest() throws Exception { checkLockBeforePut(cache, key, READ_COMMITTED); } + /** + * @throws Exception If failed. + */ @Test public void testLockBeforePutRemoteKeyTest() throws Exception { IgniteCache cache = transactionalCache(ignite0); @@ -194,6 +205,9 @@ public void testLockBeforePutRemoteKeyTest() throws Exception { checkLockBeforePut(cache, key, REPEATABLE_READ); } + /** + * @throws Exception If failed. + */ @Test public void testLockBeforePutLocalKeyRepeatableReadTest() throws Exception { IgniteCache cache = transactionalCache(ignite0); @@ -203,6 +217,9 @@ public void testLockBeforePutLocalKeyRepeatableReadTest() throws Exception { checkLockBeforePut(cache, key, REPEATABLE_READ); } + /** + * @throws Exception If failed. + */ @Test public void testLockBeforePutRemoteKeyRepeatableReadTest() throws Exception { IgniteCache cache = transactionalCache(ignite0); @@ -212,45 +229,50 @@ public void testLockBeforePutRemoteKeyRepeatableReadTest() throws Exception { checkLockBeforePut(cache, key, REPEATABLE_READ); } + /** + * @throws Exception If failed. + */ @Test public void testEntryVersionDoesNotChangeWhenEntryIsNotUpdated() throws Exception { IgniteCache cache = transactionalCache(ignite0); - int localKey = primaryKey(cache); + int locKey = primaryKey(cache); int remoteKey = primaryKey(ignite1.cache(DEFAULT_CACHE_NAME)); - cache.put(localKey, 0); + cache.put(locKey, 0); cache.put(remoteKey, 0); - CacheEntry localEntry = cache.getEntry(localKey); + CacheEntry locEntry = cache.getEntry(locKey); CacheEntry remoteEntry = cache.getEntry(remoteKey); try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { if (batch) { - assertTrue(acquireLockForEntries(cache, List.of(localEntry, remoteEntry), 0)); - } else { - assertTrue(acquireLockForEntry(cache, localEntry, 0)); + assertTrue(acquireLockForEntries(cache, List.of(locEntry, remoteEntry), 0)); + } + else { + assertTrue(acquireLockForEntry(cache, locEntry, 0)); assertTrue(acquireLockForEntry(cache, remoteEntry, 0)); } tx.commit(); } - assertEquals(localEntry.version(), cache.getEntry(localKey).version()); + assertEquals(locEntry.version(), cache.getEntry(locKey).version()); assertEquals(remoteEntry.version(), cache.getEntry(remoteKey).version()); try (Transaction tx = ignite0.transactions().txStart(PESSIMISTIC, READ_COMMITTED)) { if (batch) { - assertTrue(acquireLockForEntries(cache, List.of(localEntry, remoteEntry), 0)); - } else { - assertTrue(acquireLockForEntry(cache, localEntry, 0)); + assertTrue(acquireLockForEntries(cache, List.of(locEntry, remoteEntry), 0)); + } + else { + assertTrue(acquireLockForEntry(cache, locEntry, 0)); assertTrue(acquireLockForEntry(cache, remoteEntry, 0)); } tx.rollback(); } - assertEquals(localEntry.version(), cache.getEntry(localKey).version()); + assertEquals(locEntry.version(), cache.getEntry(locKey).version()); assertEquals(remoteEntry.version(), cache.getEntry(remoteKey).version()); } @@ -342,6 +364,8 @@ public void testVersionedEntryLockCanBeRetriedAfterWaitTimeout() throws Exceptio * Checks that the lock entry method returns {@code false} when can not wait for lock. * * @param timeout Timeout. + * @param commit Whether to commit the transaction. + * @param locForLocal Whether the contended key should be local to the lock initiator. * @throws IgniteCheckedException If failed. */ private void checkReturningWhenCanNotWaitForLock(long timeout, boolean commit, boolean locForLocal) throws IgniteCheckedException { @@ -408,13 +432,26 @@ private void checkReturningWhenCanNotWaitForLock(long timeout, boolean commit, b if (commit) { assertEquals(42, holderCache.get(txKey1).intValue()); assertEquals(42, cache.get(txKey2).intValue()); - } else { + } + else { assertEquals(0, holderCache.get(txKey1).intValue()); assertEquals(0, cache.get(txKey2).intValue()); } } - private void checkLockBeforePut(IgniteCache cache, int key, TransactionIsolation txIsolation) throws IgniteCheckedException { + /** + * Checks locking an entry before updating it. + * + * @param cache Cache. + * @param key Key. + * @param txIsolation Transaction isolation. + * @throws IgniteCheckedException If failed. + */ + private void checkLockBeforePut( + IgniteCache cache, + int key, + TransactionIsolation txIsolation + ) throws IgniteCheckedException { cache.put(key, 0); CacheEntry entry = cache.getEntry(key); @@ -443,6 +480,15 @@ private void checkLockBeforePut(IgniteCache cache, int key, Tr assertTrue(cache.getEntry(key).version().compareTo(entry.version()) > 0); } + /** + * Acquires a transactional lock for an entry. + * + * @param cache Cache. + * @param entry Entry. + * @param timeout Lock wait timeout. + * @return {@code true} if the lock was acquired. + * @throws IgniteCheckedException If failed. + */ @SuppressWarnings("unchecked") private static boolean acquireLockForEntry( IgniteCache cache, @@ -452,6 +498,15 @@ private static boolean acquireLockForEntry( return cache.unwrap(IgniteCacheProxy.class).internalProxy().lockTxEntry(entry, timeout); } + /** + * Acquires transactional locks for entries. + * + * @param cache Cache. + * @param entries Entries. + * @param timeout Lock wait timeout. + * @return {@code true} if all locks were acquired. + * @throws IgniteCheckedException If failed. + */ @SuppressWarnings("unchecked") private static boolean acquireLockForEntries( IgniteCache cache, diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/LockTxEntryOneNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/LockTxEntryOneNodeTest.java index 0e12cea317ff6..f60649f34162f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/LockTxEntryOneNodeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/LockTxEntryOneNodeTest.java @@ -52,6 +52,7 @@ */ @RunWith(Parameterized.class) public class LockTxEntryOneNodeTest extends GridCommonAbstractTest { + /** Global test timeout. */ @ClassRule public static Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS); @@ -81,6 +82,7 @@ public class LockTxEntryOneNodeTest extends GridCommonAbstractTest { /** * Returns data for test. + * * @return Test parameters. */ @Parameterized.Parameters(name = "commit={0}, useNearCache={1}, cacheMode={2}") @@ -113,6 +115,7 @@ public static Collection testData() { super.afterTestsStopped(); } + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -293,6 +296,12 @@ public void testLockTxEntryAsyncFailsInOptimisticTransaction() throws Exception } } + /** + * Checks that another transaction can access the cache. + * + * @param cache Cache. + * @throws IgniteCheckedException If failed. + */ private void checkAccessInOtherTx(IgniteCache cache) throws IgniteCheckedException { IgniteInternalFuture accessFut = GridTestUtils.runAsync(new Callable() { @Override public Void call() { @@ -309,6 +318,12 @@ private void checkAccessInOtherTx(IgniteCache cache) throws Ig accessFut.get(10_000); } + /** + * Checks that another transaction cannot access the cache. + * + * @param cache Cache. + * @throws IgniteCheckedException If failed. + */ private void checkInaccessInOtherTx(IgniteCache cache) throws IgniteCheckedException { IgniteInternalFuture accessFut = GridTestUtils.runAsync(new Callable() { @Override public Void call() { @@ -329,11 +344,19 @@ private void checkInaccessInOtherTx(IgniteCache cache) throws }, TransactionTimeoutException.class); } + /** + * Acquires a transactional lock for an entry. + * + * @param entry Entry. + * @param timeout Lock wait timeout. + * @return {@code true} if the lock was acquired. + * @throws IgniteCheckedException If failed. + */ @SuppressWarnings("unchecked") private boolean acquireLockForEntry( - CacheEntry entry, + CacheEntry entry, long timeout ) throws IgniteCheckedException { return cache.unwrap(IgniteCacheProxy.class).internalProxy().lockTxEntry(entry, timeout); - } + } } From dc126f0c3f11cefe7437ee58ee98810822024778 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Tue, 23 Jun 2026 13:53:39 +0300 Subject: [PATCH 3/3] WIP --- .../cache/distributed/dht/GridDhtLockFuture.java | 2 +- .../dht/GridDhtTransactionalCacheAdapter.java | 12 ++++++------ .../cache/distributed/dht/GridDhtTxLocalAdapter.java | 2 +- .../dht/colocated/GridDhtColocatedCache.java | 2 +- .../dht/colocated/GridDhtColocatedLockFuture.java | 2 +- .../cache/distributed/near/GridNearLockFuture.java | 2 +- .../cache/distributed/near/GridNearTxLocal.java | 2 +- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 177c3b24414db..a11c6bcb8379e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -1204,7 +1204,7 @@ private long lockTimeout() { * @return {@code True} if separate lock wait timeout expires before transaction timeout. */ private boolean waitTimeoutExpiresFirst() { - return waitTimeout < 0 || (waitTimeout > 0 && (timeout <= 0 || waitTimeout < timeout)); + return timeout >= 0 && (timeout == 0 ? waitTimeout != 0 : timeout > waitTimeout); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 2f8fc05451a0a..6a7989664a0bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -1112,7 +1112,7 @@ public IgniteInternalFuture lockAllAsync( @Override public GridNearLockResponse apply(Boolean b, Exception e) { if (e != null) e = U.unwrap(e); - else if (!b && !waitTimeoutExpiresFirst(req)) + else if (!b && !waitTimeoutExpiresFirst(req.waitTimeout(), req.timeout())) e = new GridCacheLockTimeoutException(req.version()); boolean lockAcquired = e != null || b; @@ -1203,12 +1203,12 @@ private GridNearLockResponse sendClientLockRemapResponse(ClusterNode nearNode, } /** - * @param req Near lock request. + * @param waitTimeout Lock wait timeout. + * @param timeout Transaction timeout. * @return {@code True} if separate lock wait timeout expires before transaction timeout. */ - private static boolean waitTimeoutExpiresFirst(GridNearLockRequest req) { - return req.waitTimeout() < 0 || - (req.waitTimeout() > 0 && (req.timeout() <= 0 || req.waitTimeout() < req.timeout())); + private static boolean waitTimeoutExpiresFirst(long waitTimeout, long timeout) { + return timeout >= 0 && (timeout == 0 ? waitTimeout != 0 : timeout > waitTimeout); } /** @@ -1282,7 +1282,7 @@ private GridNearLockResponse createLockReply( boolean ownsLock = e.lockedBy(mappedVer) || ctx.mvcc().isRemoved(e.context(), mappedVer); - if (!ownsLock && waitTimeoutExpiresFirst(req)) { + if (!ownsLock && waitTimeoutExpiresFirst(req.waitTimeout(), req.timeout())) { res.lockAcquired(false); return res; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 45d00b985ef5f..bd85a2f9e5aca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -811,7 +811,7 @@ private IgniteInternalFuture obtainLockAsync( * @return {@code True} if separate lock wait timeout expires before transaction timeout. */ private static boolean waitTimeoutExpiresFirst(long waitTimeout, long timeout) { - return waitTimeout < 0 || (waitTimeout > 0 && (timeout <= 0 || waitTimeout < timeout)); + return timeout >= 0 && (timeout == 0 ? waitTimeout != 0 : timeout > waitTimeout); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 208b804312578..cd2be97f97189 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -1135,7 +1135,7 @@ else if (ret != null && !ret.success()) * @return {@code True} if separate lock wait timeout expires before transaction timeout. */ private static boolean waitTimeoutExpiresFirst(long waitTimeout, long timeout) { - return waitTimeout < 0 || (waitTimeout > 0 && (timeout <= 0 || waitTimeout < timeout)); + return timeout >= 0 && (timeout == 0 ? waitTimeout != 0 : timeout > waitTimeout); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index bb58c7b3a392c..d2c68d9b7bd09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -1511,7 +1511,7 @@ private long lockTimeout() { * @return {@code True} if separate lock wait timeout expires before transaction timeout. */ private boolean waitTimeoutExpiresFirst() { - return waitTimeout < 0 || (waitTimeout > 0 && (timeout <= 0 || waitTimeout < timeout)); + return timeout >= 0 && (timeout == 0 ? waitTimeout != 0 : timeout > waitTimeout); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index bd57c530edd65..62ea1975dd50e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -1441,7 +1441,7 @@ private long lockTimeout() { * @return {@code True} if separate lock wait timeout expires before transaction timeout. */ private boolean waitTimeoutExpiresFirst() { - return waitTimeout < 0 || (waitTimeout > 0 && (timeout <= 0 || waitTimeout < timeout)); + return timeout >= 0 && (timeout == 0 ? waitTimeout != 0 : timeout > waitTimeout); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 5f3e1d827b950..5725233d4da66 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -4335,7 +4335,7 @@ public IgniteInternalFuture lockAllAsync(GridCacheContext c * @return {@code True} if separate lock wait timeout expires before transaction timeout. */ private static boolean waitTimeoutExpiresFirst(long waitTimeout, long timeout) { - return waitTimeout < 0 || (waitTimeout > 0 && (timeout <= 0 || waitTimeout < timeout)); + return timeout >= 0 && (timeout == 0 ? waitTimeout != 0 : timeout > waitTimeout); } /** {@inheritDoc} */