|
21 | 21 | import com.fasterxml.jackson.databind.ObjectMapper; |
22 | 22 | import com.fasterxml.jackson.databind.ObjectReader; |
23 | 23 | import org.hamcrest.CoreMatchers; |
| 24 | +import org.junit.jupiter.api.Assertions; |
| 25 | +import org.junit.jupiter.api.Assumptions; |
24 | 26 | import org.junit.jupiter.api.Disabled; |
25 | 27 | import org.junit.jupiter.api.Test; |
26 | 28 | import org.opencb.biodata.models.variant.Variant; |
27 | 29 | import org.opencb.biodata.models.variant.avro.*; |
28 | 30 | import org.opencb.cellbase.core.serializer.CellBaseJsonFileSerializer; |
29 | 31 | import org.opencb.cellbase.core.serializer.CellBaseSerializer; |
30 | 32 | import org.opencb.commons.utils.FileUtils; |
| 33 | +import org.rocksdb.Options; |
| 34 | +import org.rocksdb.RocksDB; |
| 35 | +import org.rocksdb.RocksDBException; |
| 36 | +import org.rocksdb.RocksIterator; |
31 | 37 |
|
32 | 38 | import java.io.BufferedReader; |
33 | 39 | import java.io.IOException; |
34 | 40 | import java.net.URISyntaxException; |
| 41 | +import java.nio.file.Files; |
35 | 42 | import java.nio.file.Path; |
36 | 43 | import java.nio.file.Paths; |
37 | 44 | import java.util.*; |
@@ -586,6 +593,90 @@ public void parse() throws Exception { |
586 | 593 |
|
587 | 594 | } |
588 | 595 |
|
| 596 | + @Test |
| 597 | + public void testGwasIndexer() throws RocksDBException, IOException { |
| 598 | + Path gwasDataDir = Paths.get("/opt/gwas-data/"); |
| 599 | + Assumptions.assumeTrue(Files.exists(gwasDataDir)); |
| 600 | + |
| 601 | + Path gwasFile = gwasDataDir.resolve("gwas_catalog_v1.0.2-associations_e105_r2022-04-07.tsv"); |
| 602 | + Path dbSnpTabixFile = gwasDataDir.resolve("All.vcf.gz"); |
| 603 | + Path genomeSequenceFilePath = gwasDataDir.resolve("Homo_sapiens.GRCh38.fa"); |
| 604 | + String assembly = "grch38"; |
| 605 | + |
| 606 | + Path outputDir = Paths.get("/tmp"); |
| 607 | + Path rocksDbDir = outputDir.resolve("integration.idx"); |
| 608 | + Object[] dbConnection = getDBConnection(rocksDbDir.toAbsolutePath().toString(), true); |
| 609 | + RocksDB rdb = (RocksDB) dbConnection[0]; |
| 610 | + |
| 611 | + GwasIndexer gwasIndexer = new GwasIndexer(gwasFile, dbSnpTabixFile, genomeSequenceFilePath, assembly, rdb); |
| 612 | + gwasIndexer.index(); |
| 613 | + |
| 614 | + CellBaseSerializer serializer = new CellBaseJsonFileSerializer(outputDir, CLINICAL_VARIANT_DATA, true); |
| 615 | + // DO NOT change the name of the rocksIterator variable - for some unexplainable reason Java VM crashes if it's |
| 616 | + // named "iterator" |
| 617 | + RocksIterator rocksIterator = rdb.newIterator(); |
| 618 | + |
| 619 | + ObjectMapper mapper = new ObjectMapper(); |
| 620 | + System.out.println("Reading from RocksDB index and serializing to " + serializer.getOutdir().resolve(serializer.getFileName())); |
| 621 | + int counter = 0; |
| 622 | + for (rocksIterator.seekToFirst(); rocksIterator.isValid(); rocksIterator.next()) { |
| 623 | + Variant variant = parseVariantFromVariantId(new String(rocksIterator.key())); |
| 624 | + if (variant != null) { |
| 625 | + VariantAnnotation variantAnnotation = mapper.readValue(rocksIterator.value(), VariantAnnotation.class); |
| 626 | + variant.setAnnotation(variantAnnotation); |
| 627 | + serializer.serialize(variant); |
| 628 | + counter++; |
| 629 | + if (counter % 10000 == 0) { |
| 630 | + System.out.printf(counter + " written"); |
| 631 | + } |
| 632 | + } |
| 633 | + } |
| 634 | + serializer.close(); |
| 635 | + System.out.println("Done."); |
| 636 | + serializer.close(); |
| 637 | + |
| 638 | + Path clinicalVariantFile = outputDir.resolve(CLINICAL_VARIANT_DATA + JSON_GZ_EXTENSION); |
| 639 | + Assertions.assertTrue(Files.exists(clinicalVariantFile)); |
| 640 | + |
| 641 | + // Read serialized variants and check some of them |
| 642 | + List<Variant> variantList = loadSerializedVariants(clinicalVariantFile.toAbsolutePath().toString()); |
| 643 | + Assertions.assertFalse(variantList.isEmpty()); |
| 644 | + Assertions.assertEquals(93, variantList.size()); |
| 645 | + boolean found = false; |
| 646 | + for (Variant variant : variantList) { |
| 647 | + assertNotNull(variant.getAnnotation().getGwas()); |
| 648 | + Assertions.assertEquals("EBI GWAS catalog", variant.getAnnotation().getGwas().get(0).getSource(), "Source"); |
| 649 | + if (variant.getChromosome().equals("11") && variant.getStart().equals(27658369) && variant.getReference().equals("C") |
| 650 | + && variant.getAlternate().equals("T")) { |
| 651 | + found = true; |
| 652 | + Assertions.assertEquals("rs6265", variant.getAnnotation().getGwas().get(0).getSnpId()); |
| 653 | + Assertions.assertEquals(3.0E-10, variant.getAnnotation().getGwas().get(0).getStudies().get(0).getTraits().get(0).getScores().get(0).getPvalue()); |
| 654 | + Assertions.assertEquals(9.522878745280337, variant.getAnnotation().getGwas().get(0).getStudies().get(0).getTraits().get(0).getScores().get(0).getPvalueMlog()); |
| 655 | + } |
| 656 | + } |
| 657 | + Assertions.assertTrue(found, "Expected GWAS variant not found in serialized variants."); |
| 658 | + |
| 659 | + // Clean and delete directories/files |
| 660 | + rdb.close(); |
| 661 | + org.apache.commons.io.FileUtils.deleteDirectory(rocksDbDir.toFile()); |
| 662 | + Files.deleteIfExists(clinicalVariantFile); |
| 663 | + } |
| 664 | + |
| 665 | + private Variant parseVariantFromVariantId(String variantId) { |
| 666 | + try { |
| 667 | + String[] parts = variantId.split(":", -1); // -1 to include empty fields |
| 668 | + if (parts[1].contains("-")) { |
| 669 | + String[] pos = parts[1].split("-"); |
| 670 | + return new Variant(parts[0].trim(), Integer.parseInt(pos[0].trim()), Integer.parseInt(pos[1].trim()), parts[2], parts[3]); |
| 671 | + } else { |
| 672 | + return new Variant(parts[0].trim(), Integer.parseInt(parts[1].trim()), parts[2], parts[3]); |
| 673 | + } |
| 674 | + } catch (Exception e) { |
| 675 | + System.out.printf("{}. Impossible to create the variant object from the variant ID: {}", e.getMessage(), variantId); |
| 676 | + return null; |
| 677 | + } |
| 678 | + } |
| 679 | + |
589 | 680 | private void cleanUp() throws URISyntaxException, IOException { |
590 | 681 | // Clean up temporary files/directories/indexes |
591 | 682 | org.apache.commons.io.FileUtils.deleteDirectory(Paths.get("/tmp/clinicalVariant1/").toFile()); |
@@ -726,17 +817,17 @@ private List<Variant> loadSerializedVariants(String fileName) { |
726 | 817 | // |
727 | 818 | // EvidenceEntry entry = buildEvidenceEntry(info); |
728 | 819 | // System.out.println(variant.toStringSimple() + " : " + entry.toString()); |
729 | | -//// if (variant != null) { |
730 | | -//// boolean success = updateRocksDB(variant); |
731 | | -//// // updateRocksDB may fail (false) if normalisation process fails |
732 | | -//// if (success) { |
733 | | -//// numberIndexedRecords++; |
734 | | -//// } |
735 | | -//// } |
736 | | -//// totalNumberRecords++; |
737 | | -//// if (totalNumberRecords % 1000 == 0) { |
738 | | -//// logger.info("{} records parsed", totalNumberRecords); |
739 | | -//// } |
| 820 | + //// if (variant != null) { |
| 821 | + //// boolean success = updateRocksDB(variant); |
| 822 | + //// // updateRocksDB may fail (false) if normalisation process fails |
| 823 | + //// if (success) { |
| 824 | + //// numberIndexedRecords++; |
| 825 | + //// } |
| 826 | + //// } |
| 827 | + //// totalNumberRecords++; |
| 828 | + //// if (totalNumberRecords % 1000 == 0) { |
| 829 | + //// logger.info("{} records parsed", totalNumberRecords); |
| 830 | + //// } |
740 | 831 | // } |
741 | 832 | // } |
742 | 833 | // } |
@@ -811,4 +902,38 @@ public void testVariant() { |
811 | 902 | System.out.println(v.toStringSimple()); |
812 | 903 | } |
813 | 904 |
|
| 905 | + private Object[] getDBConnection(String dbLocation, boolean forceCreate) { |
| 906 | + boolean indexingNeeded = forceCreate || !Files.exists(Paths.get(dbLocation)); |
| 907 | + // a static method that loads the RocksDB C++ library. |
| 908 | + RocksDB.loadLibrary(); |
| 909 | + // the Options class contains a set of configurable DB options |
| 910 | + // that determines the behavior of a database. |
| 911 | + Options options = new Options().setCreateIfMissing(true); |
| 912 | + |
| 913 | +// options.setMaxBackgroundCompactions(4); |
| 914 | +// options.setMaxBackgroundFlushes(1); |
| 915 | +// options.setCompressionType(CompressionType.NO_COMPRESSION); |
| 916 | +// options.setMaxOpenFiles(-1); |
| 917 | +// options.setIncreaseParallelism(4); |
| 918 | +// options.setCompactionStyle(CompactionStyle.LEVEL); |
| 919 | +// options.setLevelCompactionDynamicLevelBytes(true); |
| 920 | + |
| 921 | + RocksDB db = null; |
| 922 | + try { |
| 923 | + // a factory method that returns a RocksDB instance |
| 924 | + if (indexingNeeded) { |
| 925 | + db = RocksDB.open(options, dbLocation); |
| 926 | + } else { |
| 927 | + db = RocksDB.openReadOnly(options, dbLocation); |
| 928 | + } |
| 929 | + // do something |
| 930 | + } catch (RocksDBException e) { |
| 931 | + // do some error handling |
| 932 | + e.printStackTrace(); |
| 933 | + System.exit(1); |
| 934 | + } |
| 935 | + |
| 936 | + return new Object[]{db, options, dbLocation, indexingNeeded}; |
| 937 | + |
| 938 | + } |
814 | 939 | } |
0 commit comments