Skip to content

Commit 773db9a

Browse files
committed
clear orphaned
1 parent 967aaa0 commit 773db9a

7 files changed

Lines changed: 234 additions & 33 deletions

File tree

src/java/org/apache/cassandra/db/compression/CompressionDictionary.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,5 +451,18 @@ public LightweightCompressionDictionary(String keyspaceName,
451451
this.checksum = checksum;
452452
this.size = size;
453453
}
454+
455+
@Override
456+
public String toString()
457+
{
458+
return "LightweightCompressionDictionary{" +
459+
"keyspaceName='" + keyspaceName + '\'' +
460+
", tableName='" + tableName + '\'' +
461+
", tableId='" + tableId + '\'' +
462+
", dictId=" + dictId +
463+
", checksum=" + checksum +
464+
", size=" + size +
465+
'}';
466+
}
454467
}
455468
}

src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@
3939

4040
public class CompressionDictionaryDetailsTabularData
4141
{
42+
/**
43+
* Position inside index names of tabular type of tabular data returned upon
44+
* listing dictionaries where table id is expected to be located.
45+
* We do not need to process this entry at all time, e.g. when not listing
46+
* orphaned compression dictionaries.
47+
*/
48+
public static final int TABULAR_DATA_TYPE_TABLE_ID_INDEX = 2;
49+
4250
/**
4351
* Position inside index names of tabular type of tabular data returned upon
4452
* listing dictionaries where raw dictionary is expected to be located.

src/java/org/apache/cassandra/schema/SystemDistributedKeyspace.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.cassandra.locator.InetAddressAndPort;
5959
import org.apache.cassandra.repair.CommonRange;
6060
import org.apache.cassandra.repair.messages.RepairOption;
61+
import org.apache.cassandra.tcm.ClusterMetadata;
6162
import org.apache.cassandra.utils.FBUtilities;
6263
import org.apache.cassandra.utils.TimeUUID;
6364

@@ -523,16 +524,85 @@ public static CompressionDictionary retrieveCompressionDictionary(String keyspac
523524
* @param tableName the table name to retrieve the dictionary for
524525
* @param tableId the table id to retrieve the dictionary for
525526
* @return the compression dictionaries identified by the specified keyspace and table,
526-
* or null if no dictionary exists or if an error occurs during retrieval
527+
* empty list if no dictionary exists or null if an error occurs during retrieval
527528
*/
528529
@Nullable
529530
public static List<LightweightCompressionDictionary> retrieveLightweightCompressionDictionaries(String keyspaceName, String tableName, String tableId)
530531
{
531532
String query = "SELECT keyspace_name, table_name, table_id, kind, dict_id, dict_length, dict_checksum FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND table_id='%s'";
532533
String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES, keyspaceName, tableName, tableId);
534+
return retrieveLightweightCompressionDictionariesInternal(fmtQuery);
535+
}
536+
537+
/**
538+
* Retrieves all compression dictionaries in a lightweight form.
539+
*
540+
* @return all compression dictionaries, lightweight form, empty list if no dictionary exists
541+
* or null if an error occurs during retrieval
542+
*/
543+
@Nullable
544+
public static List<LightweightCompressionDictionary> retrieveLightweightCompressionDictionaries()
545+
{
546+
String query = "SELECT keyspace_name, table_name, table_id, kind, dict_id, dict_length, dict_checksum FROM %s.%s";
547+
String fmtQuery = format(query, SchemaConstants.DISTRIBUTED_KEYSPACE_NAME, COMPRESSION_DICTIONARIES);
548+
return retrieveLightweightCompressionDictionariesInternal(fmtQuery);
549+
}
550+
551+
/**
552+
* Retrieves all orphaned compression dictionaries in a lightweight form.
553+
*
554+
* @return all orphaned compression dictionaries, lightweight form, empty list if no dictionary exists
555+
* or null if an error occurs during retrieval
556+
*/
557+
public static List<LightweightCompressionDictionary> retrieveOrphanedLightweightCompressionDictionaries()
558+
{
559+
List<LightweightCompressionDictionary> dicts = SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries();
560+
if (dicts == null || dicts.isEmpty())
561+
return List.of();
562+
563+
List<LightweightCompressionDictionary> orphaned = new ArrayList<>();
564+
for (LightweightCompressionDictionary dict : dicts)
565+
{
566+
TableMetadata tableMetadata = ClusterMetadata.current().schema.getTableMetadata(dict.keyspaceName, dict.tableName);
567+
if (tableMetadata == null || !tableMetadata.id.toLongString().equals(dict.tableId))
568+
orphaned.add(dict);
569+
}
570+
571+
return orphaned;
572+
}
573+
574+
/**
575+
* Removes all orphaned compression dictionaries.
576+
*/
577+
public static void clearOrphanedCompressionDictionaries()
578+
{
579+
for (LightweightCompressionDictionary orphanedDict : SystemDistributedKeyspace.retrieveOrphanedLightweightCompressionDictionaries())
580+
{
581+
try
582+
{
583+
QueryProcessor.execute(String.format("DELETE FROM %s.%s WHERE keyspace_name='%s' AND table_name='%s' AND table_id='%s' AND dict_id=%s",
584+
SchemaConstants.DISTRIBUTED_KEYSPACE_NAME,
585+
SystemDistributedKeyspace.COMPRESSION_DICTIONARIES,
586+
orphanedDict.keyspaceName,
587+
orphanedDict.tableName,
588+
orphanedDict.tableId,
589+
orphanedDict.dictId.id),
590+
ConsistencyLevel.ONE);
591+
}
592+
catch (Exception e)
593+
{
594+
logger.error("Unable to delete orphaned compression dictionary: {}, Reason: {}",
595+
orphanedDict.toString(),
596+
e.getMessage());
597+
}
598+
}
599+
}
600+
601+
private static List<LightweightCompressionDictionary> retrieveLightweightCompressionDictionariesInternal(String query)
602+
{
533603
try
534604
{
535-
UntypedResultSet result = QueryProcessor.execute(fmtQuery, ConsistencyLevel.ONE);
605+
UntypedResultSet result = QueryProcessor.execute(query, ConsistencyLevel.ONE);
536606
if (result.isEmpty())
537607
return Collections.emptyList();
538608
List<LightweightCompressionDictionary> dictionaries = new ArrayList<>();

src/java/org/apache/cassandra/service/StorageService.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import javax.management.openmbean.CompositeData;
6161
import javax.management.openmbean.OpenDataException;
6262
import javax.management.openmbean.TabularData;
63+
import javax.management.openmbean.TabularDataSupport;
6364

6465
import com.codahale.metrics.Meter;
6566
import com.google.common.annotations.VisibleForTesting;
@@ -105,6 +106,8 @@
105106
import org.apache.cassandra.db.commitlog.CommitLog;
106107
import org.apache.cassandra.db.compaction.CompactionManager;
107108
import org.apache.cassandra.db.compaction.OperationType;
109+
import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
110+
import org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData;
108111
import org.apache.cassandra.db.guardrails.Guardrails;
109112
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
110113
import org.apache.cassandra.dht.BootStrapper;
@@ -5782,4 +5785,25 @@ public List<String> mutateSSTableRepairedState(boolean repaired, boolean preview
57825785
}
57835786
return sstablesTouched;
57845787
}
5788+
5789+
@Override
5790+
public TabularData getOrphanedCompressionDictionaries()
5791+
{
5792+
List<LightweightCompressionDictionary> dicts = SystemDistributedKeyspace.retrieveOrphanedLightweightCompressionDictionaries();
5793+
TabularDataSupport tabularData = new TabularDataSupport(CompressionDictionaryDetailsTabularData.TABULAR_TYPE);
5794+
5795+
if (dicts.isEmpty())
5796+
return tabularData;
5797+
5798+
for (LightweightCompressionDictionary dict : dicts)
5799+
tabularData.put(CompressionDictionaryDetailsTabularData.fromLightweightCompressionDictionary(dict));
5800+
5801+
return tabularData;
5802+
}
5803+
5804+
@Override
5805+
public void clearOrphanedCompressionDictionaries()
5806+
{
5807+
SystemDistributedKeyspace.clearOrphanedCompressionDictionaries();
5808+
}
57855809
}

src/java/org/apache/cassandra/service/StorageServiceMBean.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,4 +1400,8 @@ public void enableAuditLog(String loggerName, String includedKeyspaces, String e
14001400

14011401
/** Mutates the repaired state of all SSTables for the given SSTables */
14021402
public List<String> mutateSSTableRepairedState(boolean repaired, boolean preview, String keyspace, List<String> tables);
1403+
1404+
TabularData getOrphanedCompressionDictionaries();
1405+
1406+
void clearOrphanedCompressionDictionaries();
14031407
}

src/java/org/apache/cassandra/tools/NodeProbe.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2819,6 +2819,20 @@ public TrainingState getCompressionDictionaryTrainingState(String keyspace, Stri
28192819
return TrainingState.fromCompositeData(compositeData);
28202820
}
28212821

2822+
/**
2823+
* Gets all compression dictionaries which are orphaned, that is a table they were trained for was dropped.
2824+
* @return tabular data of orphaned dictionaries
2825+
*/
2826+
public TabularData getOrphanedCompressionDictionaries()
2827+
{
2828+
return ssProxy.getOrphanedCompressionDictionaries();
2829+
}
2830+
2831+
public void clearOrphanedCompressionDictionaries()
2832+
{
2833+
ssProxy.clearOrphanedCompressionDictionaries();
2834+
}
2835+
28222836
private CompressionDictionaryManagerMBean getDictionaryManagerProxy(String keyspace, String table) throws IOException
28232837
{
28242838
// Construct table-specific MBean name

src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java

Lines changed: 99 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.concurrent.TimeUnit;
26+
import java.util.function.Supplier;
2627

2728
import javax.management.openmbean.CompositeData;
2829
import javax.management.openmbean.TabularData;
@@ -37,6 +38,7 @@
3738
import org.apache.cassandra.db.compression.TrainingState;
3839
import org.apache.cassandra.io.util.File;
3940
import org.apache.cassandra.io.util.FileUtils;
41+
import org.apache.cassandra.schema.SystemDistributedKeyspace;
4042
import org.apache.cassandra.tools.NodeProbe;
4143
import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
4244
import org.apache.cassandra.utils.Clock;
@@ -61,7 +63,8 @@
6163
subcommands = { CompressionDictionaryCommandGroup.TrainDictionary.class,
6264
CompressionDictionaryCommandGroup.ListDictionaries.class,
6365
CompressionDictionaryCommandGroup.ExportDictionary.class,
64-
CompressionDictionaryCommandGroup.ImportDictionary.class })
66+
CompressionDictionaryCommandGroup.ImportDictionary.class,
67+
CompressionDictionaryCommandGroup.CleanupDictionaries.class})
6568
public class CompressionDictionaryCommandGroup
6669
{
6770
@Command(name = "train",
@@ -216,40 +219,43 @@ public static class ListDictionaries extends AbstractCommand
216219
@Override
217220
protected void execute(NodeProbe probe)
218221
{
219-
try
220-
{
221-
TableBuilder tableBuilder = new TableBuilder();
222-
TabularData tabularData = probe.listCompressionDictionaries(keyspace, table);
223-
List<String> indexNames = tabularData.getTabularType().getIndexNames();
224-
225-
List<String> columns = new ArrayList<>(indexNames);
226-
// ignore raw dict
227-
columns.remove(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX);
228-
tableBuilder.add(columns);
229-
230-
for (Object eachDict : tabularData.keySet())
231-
{
232-
final List<?> dictRow = (List<?>) eachDict;
233-
234-
List<String> rowValues = new ArrayList<>();
235-
236-
for (int i = 0; i < dictRow.size(); i++)
237-
{
238-
// ignore raw dict
239-
if (i == CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX)
240-
continue;
222+
ListingHelper.list(probe,
223+
() ->
224+
{
225+
try
226+
{
227+
return probe.listCompressionDictionaries(keyspace, table);
228+
}
229+
catch (Throwable t)
230+
{
231+
throw new RuntimeException(t);
232+
}
233+
},
234+
List.of(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_TABLE_ID_INDEX,
235+
CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX));
236+
}
237+
}
241238

242-
rowValues.add(dictRow.get(i).toString());
243-
}
244-
tableBuilder.add(rowValues);
245-
}
239+
@Command(name = "cleanup", description = "Clean up orphaned dictionaries by deleting them from " + SystemDistributedKeyspace.NAME
240+
+ '.' + SystemDistributedKeyspace.COMPRESSION_DICTIONARIES +
241+
" table, these are ones for which a table they were trained for was dropped.")
242+
public static class CleanupDictionaries extends AbstractCommand
243+
{
244+
@Option(names = {"-d", "--dry"}, description = "Only display orphaned dictionaries, do not remove them.")
245+
private boolean dry;
246246

247-
tableBuilder.printTo(probe.output().out);
247+
@Override
248+
protected void execute(NodeProbe probe)
249+
{
250+
if (dry)
251+
{
252+
ListingHelper.list(probe,
253+
probe::getOrphanedCompressionDictionaries,
254+
List.of(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX));
248255
}
249-
catch (Exception e)
256+
else
250257
{
251-
probe.output().err.printf("Failed to list dictionaries: %s%n", e.getMessage());
252-
System.exit(1);
258+
probe.clearOrphanedCompressionDictionaries();
253259
}
254260
}
255261
}
@@ -368,4 +374,66 @@ private void validateFile(File dictionaryFile, NodeProbe probe)
368374
}
369375
}
370376
}
377+
378+
private static class ListingHelper
379+
{
380+
/**
381+
* Lists dictionaries, the concrete mean of querying them is delegated to a specific subcommand.
382+
*
383+
* @param probe probe to query Cassandra with
384+
* @param tabularDataSupplier supplier of tabular data containing dictionaries to display
385+
* @param removedColumnsIndices supplier of an array with indexes of columns in tabular data to be ignored
386+
* when displaying them
387+
*/
388+
public static void list(NodeProbe probe,
389+
Supplier<TabularData> tabularDataSupplier,
390+
List<Integer> removedColumnsIndices)
391+
{
392+
try
393+
{
394+
TableBuilder tableBuilder = new TableBuilder();
395+
TabularData tabularData = tabularDataSupplier.get();
396+
List<String> indexNames = tabularData.getTabularType().getIndexNames();
397+
398+
// ignore columns not meant to be displayed to a user
399+
List<String> columns = new ArrayList<>();
400+
for (int i = 0; i < indexNames.size(); i++)
401+
{
402+
if (!removedColumnsIndices.contains(i))
403+
{
404+
columns.add(indexNames.get(i));
405+
}
406+
}
407+
408+
tableBuilder.add(columns);
409+
410+
boolean hasOuput = false;
411+
for (Object eachDict : tabularData.keySet())
412+
{
413+
final List<?> dictRow = (List<?>) eachDict;
414+
415+
List<String> rowValues = new ArrayList<>();
416+
417+
for (int i = 0; i < dictRow.size(); i++)
418+
{
419+
// ignore columns not meant to be displayed to a user
420+
if (removedColumnsIndices.contains(i))
421+
continue;
422+
423+
rowValues.add(dictRow.get(i).toString());
424+
hasOuput = true;
425+
}
426+
tableBuilder.add(rowValues);
427+
}
428+
429+
if (hasOuput)
430+
tableBuilder.printTo(probe.output().out);
431+
}
432+
catch (Exception e)
433+
{
434+
probe.output().err.printf("Failed to list dictionaries: %s%n", e.getMessage());
435+
System.exit(1);
436+
}
437+
}
438+
}
371439
}

0 commit comments

Comments
 (0)