diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index 8ff89f4e9b5..958689acc83 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; @@ -88,7 +89,7 @@ public void doVerb(Message message) } catch (RejectException e) { - if (!command.isTrackingWarnings()) + if (!command.isTrackingWarnings() || e instanceof TombstoneOverwhelmingException) throw e; // make sure to log as the exception is swallowed diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java index c0050110513..351af987e1d 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java @@ -33,6 +33,8 @@ import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.filter.LocalReadSizeTooLargeException; +import org.apache.cassandra.db.filter.TombstoneOverwhelmingException; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.locator.InetAddressAndPort; @@ -146,6 +148,39 @@ public void rejectsRequestWithNonMatchingTransientness() .build()); } + @Test(expected = TombstoneOverwhelmingException.class) + public void tombstoneExceptionPropagatesWithoutWarningTracking() + { + ReadCommand command = new TombstoneThrowingReadCommand(metadata); + handler.doVerb(Message.builder(READ_REQ, command) + .from(peer()) + .withId(messageId()) + .build()); + } + + @Test(expected = TombstoneOverwhelmingException.class) + public void tombstoneExceptionPropagatesWithWarningTracking() + { + ReadCommand command = new TombstoneThrowingReadCommand(metadata); + handler.doVerb(Message.builder(READ_REQ, command) + .from(peer()) + .withFlag(MessageFlag.TRACK_WARNINGS) + .withId(messageId()) + .build()); + } + + @Test + public void rejectExceptionSwallowedWithWarningTracking() + { + ReadCommand command = new RejectThrowingReadCommand(metadata); + handler.doVerb(Message.builder(READ_REQ, command) + .from(peer()) + .withFlag(MessageFlag.TRACK_WARNINGS) + .withId(messageId()) + .build()); + // If we reach here without an exception, the RejectException was correctly swallowed + } + private static int messageId() { return random.nextInt(); @@ -199,6 +234,62 @@ public boolean isTrackingRepairedData() } } + private static class TombstoneThrowingReadCommand extends SinglePartitionReadCommand + { + TombstoneThrowingReadCommand(TableMetadata metadata) + { + super(metadata.epoch, + false, + 0, + false, + PotentialTxnConflicts.DISALLOW, + metadata, + FBUtilities.nowInSeconds(), + ColumnFilter.all(metadata), + RowFilter.none(), + DataLimits.NONE, + KEY, + new ClusteringIndexSliceFilter(Slices.ALL, false), + null, + false, + null); + } + + @Override + public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) + { + throw new TombstoneOverwhelmingException(1000, "test_query", metadata(), KEY, Clustering.EMPTY); + } + } + + private static class RejectThrowingReadCommand extends SinglePartitionReadCommand + { + RejectThrowingReadCommand(TableMetadata metadata) + { + super(metadata.epoch, + false, + 0, + false, + PotentialTxnConflicts.DISALLOW, + metadata, + FBUtilities.nowInSeconds(), + ColumnFilter.all(metadata), + RowFilter.none(), + DataLimits.NONE, + KEY, + new ClusteringIndexSliceFilter(Slices.ALL, false), + null, + false, + null); + } + + @Override + public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) + { + throw new LocalReadSizeTooLargeException("test read size too large"); + } + } + private static DecoratedKey key(TableMetadata metadata, int key) { return metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key));