|
33 | 33 | import java.io.File; |
34 | 34 | import java.io.IOException; |
35 | 35 | import java.io.InputStreamReader; |
| 36 | +import java.io.UncheckedIOException; |
36 | 37 | import java.net.InetAddress; |
37 | 38 | import java.net.URI; |
38 | 39 | import java.nio.ByteBuffer; |
|
54 | 55 |
|
55 | 56 | import org.apache.accumulo.core.client.Accumulo; |
56 | 57 | import org.apache.accumulo.core.client.AccumuloClient; |
| 58 | +import org.apache.accumulo.core.client.admin.compaction.CompressionConfigurer; |
57 | 59 | import org.apache.accumulo.core.client.admin.compaction.TooManyDeletesSelector; |
58 | 60 | import org.apache.accumulo.core.client.summary.summarizers.DeletesSummarizer; |
59 | 61 | import org.apache.accumulo.core.clientImpl.Namespace; |
60 | 62 | import org.apache.accumulo.core.conf.DefaultConfiguration; |
61 | 63 | import org.apache.accumulo.core.conf.Property; |
| 64 | +import org.apache.accumulo.core.data.TableId; |
62 | 65 | import org.apache.accumulo.core.data.Value; |
63 | 66 | import org.apache.accumulo.core.file.FileOperations; |
64 | 67 | import org.apache.accumulo.core.file.FileSKVWriter; |
|
116 | 119 | import org.apache.accumulo.proxy.thrift.UnknownScanner; |
117 | 120 | import org.apache.accumulo.proxy.thrift.UnknownWriter; |
118 | 121 | import org.apache.accumulo.proxy.thrift.WriterOptions; |
| 122 | +import org.apache.accumulo.server.ServerContext; |
119 | 123 | import org.apache.accumulo.server.util.PortUtils; |
120 | 124 | import org.apache.accumulo.test.constraints.MaxMutationSize; |
121 | 125 | import org.apache.accumulo.test.constraints.NumericValueConstraint; |
@@ -1830,6 +1834,78 @@ public void testCompactionSelector() throws Exception { |
1830 | 1834 | assertEquals(1, countFiles(tableNames[1]), messagePrefix + tableNames[1]); |
1831 | 1835 | } |
1832 | 1836 |
|
| 1837 | + /** |
| 1838 | + * Returns the total size, in bytes, of all files in a table, summed across the table's tablets. |
| 1839 | + */ |
| 1840 | + private long getFileSizes(ServerContext ctx, String tableName) { |
| 1841 | + TableId tableId = TableId.of(ctx.tableOperations().tableIdMap().get(tableName)); |
| 1842 | + try (var tabletsMetadata = ctx.getAmple().readTablets().forTable(tableId).build()) { |
| 1843 | + return tabletsMetadata.stream().flatMap(tm -> tm.getFiles().stream()).mapToLong(stf -> { |
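| | + // Streams cannot throw checked exceptions, so the IOException from the |
| | + // file system lookup is rethrown as an UncheckedIOException |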
| 1844 | + try { |
| 1845 | + return FileSystem.getLocal(new Configuration()).getFileStatus(stf.getPath()).getLen(); |
| 1846 | + } catch (IOException e) { |
| 1847 | + throw new UncheckedIOException(e); |
| 1848 | + } |
| 1849 | + }).sum(); |
| 1850 | + } |
| 1851 | + } |
| 1852 | + |
| 1853 | + /** |
| 1854 | + * Tests CompactionConfigurer functionality through one of its implementations, the |
| 1855 | + * CompressionConfigurer. |
| 1856 | + */ |
| 1857 | + @Test |
| 1858 | + public void testCompactionConfigurer() throws Exception { |
| 1859 | + // Delete the table to start fresh |
| 1860 | + client.deleteTable(sharedSecret, tableName); |
| 1861 | + // Create two tables |
| 1862 | + final String[] tableNames = getUniqueNameArray(2); |
| 1863 | + for (String tableName : tableNames) { |
| 1864 | + client.createTable(sharedSecret, tableName, true, TimeType.MILLIS); |
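| | + // Disable compression so the pre-compaction file sizes reflect the raw data |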
| 1865 | + client.setTableProperty(sharedSecret, tableName, "table.file.compress.type", "none"); |
| 1866 | + } |
| 1867 | + |
| 1868 | + // Create data to add to the tables |
| 1869 | + Map<ByteBuffer,List<ColumnUpdate>> mutation = new HashMap<>(); |
| 1870 | + byte[] data = new byte[100000]; |
| 1871 | + Arrays.fill(data, (byte) 65); // fill with 'A' so the data is highly compressible |
| 1872 | + for (int i = 0; i < 10; i++) { |
| 1873 | + String row = String.format("%09d", i); |
| 1874 | + ColumnUpdate columnUpdate = new ColumnUpdate(s2bb("big"), s2bb("files")); |
| 1875 | + columnUpdate.setDeleteCell(false); |
| 1876 | + columnUpdate.setValue(data); |
| 1877 | + mutation.put(s2bb(row), List.of(columnUpdate)); |
| 1878 | + } |
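| | + // Write the mutation to each table and flush so the data is persisted to files |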
| 1879 | + for (String tableName : tableNames) { |
| 1880 | + client.updateAndFlush(sharedSecret, tableName, mutation); |
| 1881 | + client.flushTable(sharedSecret, tableName, null, null, true); |
| 1882 | + } |
| 1883 | + |
| 1884 | + // Check the file sizes before compaction: 10 rows of 100 KB each, uncompressed, plus up to ~10% file overhead |
| 1885 | + for (String tableName : tableNames) { |
| 1886 | + long sizes = getFileSizes(getCluster().getServerContext(), tableName); |
| 1887 | + assertTrue(sizes > data.length * 10.0 && sizes < data.length * 11.0); |
| 1888 | + } |
| 1889 | + |
| 1890 | + // Create a PluginConfig for the CompressionConfigurer: output files whose input size exceeds the threshold are written with gz compression |
| 1891 | + PluginConfig configurerCompact = new PluginConfig(CompressionConfigurer.class.getName(), |
| 1892 | + Map.of(CompressionConfigurer.LARGE_FILE_COMPRESSION_THRESHOLD, Integer.toString(data.length), |
| 1893 | + CompressionConfigurer.LARGE_FILE_COMPRESSION_TYPE, "gz")); |
| 1894 | + |
| 1895 | + // Compact both tables, one with the configurer and one without |
| 1896 | + client.compactTable(sharedSecret, tableNames[0], null, null, null, true, true, null, |
| 1897 | + configurerCompact); |
| 1898 | + client.compactTable(sharedSecret, tableNames[1], null, null, null, true, true, null, null); |
| 1899 | + |
| 1900 | + // Check that the post-compaction sizes are as expected. The highly repetitive data |
| 1901 | + // compresses well, so the table compacted with the configurer should be much smaller |
| 1902 | + long sizes1 = getFileSizes(getCluster().getServerContext(), tableNames[0]); |
| 1903 | + long sizes2 = getFileSizes(getCluster().getServerContext(), tableNames[1]); |
| 1904 | + assertTrue(sizes1 < data.length); |
| 1905 | + assertTrue(sizes1 < sizes2, "Size1 is " + sizes1 + ", size2 is " + sizes2); |
| 1906 | + assertTrue(sizes2 > data.length * 10.0 && sizes2 < data.length * 11.0); |
| 1907 | + } |
| 1908 | + |
1833 | 1909 | @Test |
1834 | 1910 | public void namespaceOperations() throws Exception { |
1835 | 1911 | // default namespace and accumulo namespace |
|