120 changes: 87 additions & 33 deletions bigfiles/README.md
# Scala CPS-Dataset-Comparison

This is the Scala implementation of the project. It is used for comparing big files (files that cannot fit into RAM).

- [Project Structure](#project-structure)
- [How to run](#how-to-run)
  - [Requirements](#requirements)
  - [Switch to specific SDK](#switch-to-specific-sdk)
- [How to run tests](#how-to-run-tests)

## Project Structure

The project is split into two SBT submodules:

| Module | Purpose |
|---------|---------|
| `core` | Pure comparison logic and data models: `Comparator`, `DatasetComparisonHelper`, `HashUtils`, all `analysis/*` classes. No CLI or I/O dependencies. |
| `cli` | CLI entry point, I/O, serialization: `DatasetComparison`, `MetricsSerializer`, `IOHandler`, `ArgsParser`, `Arguments`, `DiffComputeType`, `OutputFormatType`. Depends on `core`. |

```
bigfiles/
├── core/
│   ├── src/main/scala/za/co/absa/
│   │   ├── analysis/           # AnalyseStat, AnalysisResult, ColumnsDiff, RowsDiff,
│   │   │                       # ComparisonMetrics, ComparisonMetricsCalculator, RowByRowAnalysis
│   │   ├── hash/HashUtils.scala
│   │   ├── Comparator.scala
│   │   └── DatasetComparisonHelper.scala
│   └── src/test/scala/         # ComparatorTest, ComparisonMetricsCalculatorTest,
│                               # DatasetComparisonHelperTest, HashTableTest,
│                               # RowByRowAnalysesTest, AnalysisResultTest, SparkTestSession
├── cli/
│   ├── src/main/scala/za/co/absa/
│   │   ├── io/IOHandler.scala
│   │   ├── parser/             # ArgsParser, Arguments, DiffComputeType, OutputFormatType
│   │   ├── DatasetComparison.scala
│   │   └── MetricsSerializer.scala
│   ├── src/main/resources/application.conf
│   └── src/test/scala/         # DatasetComparisonTest, ArgsParserTest, IOHandlerTest,
│                               # MetricsSerializerTest, VersionTest, SparkTestSession
├── testdata/                   # Shared test resources (symlinked into core & cli test resources)
│   ├── namesA.parquet
│   ├── namesB.parquet
│   ├── inputA.txt
│   ├── inputB.txt
│   └── out.txt
└── project/
    └── Dependencies.scala      # coreDependencies and cliDependencies groups
```

## How to run

First run assembly from the `cli` module:

```bash
sbt cli/assembly
```

Then run:

```bash
spark-submit cli/target/scala-2.12/dataset-comparison-assembly-1.0.jar -o <output-path> --inputA <A-file-path> --inputB <B-file-path>
```

### Parameters
| Parameter | Description | Required |
|-----------|-------------|----------|

Example:
```bash
spark-submit --class za.co.absa.DatasetComparison \
--conf "spark.driver.extraJavaOptions=-Dconfig.file=/../bigfiles/cli/src/main/resources/application.conf" \
cli/target/scala-2.12/dataset-comparison-assembly-1.0.jar \
-o "/test_files/output_names$(date '+%Y-%m-%d_%H%M%S')" \
--inputA /test_files/namesA.parquet \
--inputB /test_files/namesB.parquet \
-d Row

```

### Run with specific config

```bash
spark-submit --class za.co.absa.DatasetComparison \
--conf "spark.driver.extraJavaOptions=-Dconfig.file=/path/to/application.conf" \
cli/target/scala-2.12/dataset-comparison-assembly-1.0.jar \
-o <output-path> --inputA <A-file-path> --inputB <B-file-path> -d Row
```

`-d Row` is an optional parameter for detailed analysis that specifies which analysis to use; currently only `Row` is supported.
The detailed analysis is computed only if the number of different columns is less than 200; you can change this threshold in `cli/src/main/resources/application.conf`.
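The threshold lives under a config key; a minimal sketch of the relevant fragment (the key path matches what `DatasetComparison` reads via `ConfigFactory`, while the surrounding layout of the file is an assumption):

```hocon
# cli/src/main/resources/application.conf (fragment; surrounding structure assumed)
dataset-comparison {
  analysis {
    # Row-by-row analysis is skipped when the diff exceeds this count
    diff-threshold = 200
  }
}
```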

### Spark configuration

You can set Spark configuration in the `spark-defaults.conf` file stored in the `$SPARK_HOME/conf` directory.
You will find a `spark-defaults.conf.template` there; remove `.template` from the file name and set your configuration in that file.
It could look like this:
```bash
spark.hadoop.fs.defaultFS hdfs://localhost:9999/ # set your hdfs uri
```

### Requirements

## How to run tests

Tests are split between the two modules. Use the following commands:

| sbt command | Module | Test files |
|-------------|--------|------------|
| `sbt core/test` | core | ComparatorTest, ComparisonMetricsCalculatorTest, DatasetComparisonHelperTest, HashTableTest, RowByRowAnalysesTest, AnalysisResultTest |
| `sbt cli/test` | cli | DatasetComparisonTest, ArgsParserTest, IOHandlerTest, MetricsSerializerTest, VersionTest |
| `sbt test` | both | Runs all tests across both modules (root aggregate) |
| `sbt jacoco` | both | Runs all tests with JaCoCo code coverage; requires the environment setup for all of the tests above |

`SparkTestSession` is duplicated into both `core/src/test/scala/` and `cli/src/test/scala/` as it is required by tests in both modules.

Test resources (`namesA.parquet`, `namesB.parquet`, `inputA.txt`, `inputB.txt`, `out.txt`) live in `testdata/` at the root and are symlinked into `core/src/test/resources/` and `cli/src/test/resources/`.
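The symlink pattern can be sketched as follows (the exact layout and per-file linking in the repo are assumptions; the demo just recreates the idea in a throwaway temp directory):

```shell
# Hypothetical sketch: a shared testdata file symlinked into a module's
# test resources. Demonstrated in a temp dir rather than the real repo.
tmp=$(mktemp -d)
mkdir -p "$tmp/testdata" "$tmp/core/src/test/resources"
printf 'alice,bob\n' > "$tmp/testdata/inputA.txt"
ln -s "$tmp/testdata/inputA.txt" "$tmp/core/src/test/resources/inputA.txt"
cat "$tmp/core/src/test/resources/inputA.txt"   # prints: alice,bob
```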

---------

## Installing Hadoop

Tutorial [here](https://dev.to/awwsmm/installing-and-running-hadoop-and-spark-on-ubuntu-18-393h)
1. `sdk install hadoop`
2. ``echo "export PATH=\$PATH:\$HADOOP_HOME/bin:\$HADOOP_HOME/sbin" >> ~/.bashrc``
3. configure the files core-site.xml, hdfs-site.xml, mapred-site.xml, and yarn-site.xml in `/Users/<username>/.sdkman/candidates/hadoop/3.3.5/etc`
<value>false</value>
</property>
```
Add this into **core-site.xml** between the `<configuration>` tags
```xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9999</value>
</property>
```
Add this into **mapred-site.xml** between the `<configuration>` tags
```xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
```
Add this into **yarn-site.xml** between the `<configuration>` tags
```xml
<property>
<name>yarn.nodemanager.aux-services</name>
</property>
```
Add this into **hadoop-env.sh**:
```bash
export JAVA_HOME="/.../.sdkman/candidates/java/11.0.x-amzn"
```

4. create directories according to the configuration, for example:
```
sudo mkdir -p /opt/hadoop_tmp/hdfs/datanode
sudo mkdir -p /opt/hadoop_tmp/hdfs/namenode
6. copy the key to authorized_keys: `cat ~/.ssh/id_ed25519.pub >> ~/.ssh/authorized_keys`
7. copy the key to localhost via ssh: `ssh-copy-id <username>@localhost`
8. test ssh: `ssh <username>@localhost`
9. format the namenode: `hdfs namenode -format -force` (or `hadoop namenode -format`)
10. start hadoop: `start-dfs.sh && start-yarn.sh`
11. add files to hdfs: `hdfs dfs -put /path/to/file /path/to/hdfs`
12. stop hdfs: `stop-dfs.sh && stop-yarn.sh`

If something goes wrong, check the logs in `/Users/<username>/.sdkman/candidates/hadoop/3.3.5/logs`.

The ResourceManager web UI runs at http://localhost:8088/cluster,
HDFS runs on port 9999,
and the NameNode web interface is at http://localhost:9870/.

You also have to enable remote login:
![img.png](images/remote_login.png)

Running with Hadoop:
```bash
sbt cli/assembly
spark-submit cli/target/scala-2.12/dataset-comparison-assembly-1.0.jar -o <output-path> --inputA <A-file-path> --inputB <B-file-path> --fsURI hdfs://localhost:9999/
```
For example:
`spark-submit cli/target/scala-2.12/dataset-comparison-assembly-1.0.jar -o /test_files/output --inputA /test_files/RUN20_edit.parquet --inputB /test_files/RUN20.parquet --fsURI hdfs://localhost:9999/`
Or you can uncomment the code at the start of the validate part in ArgsParserTest, change FS to HDFS_URI, and then run `sbt cli/test`.

### Setting up IntelliJ IDEA

51 changes: 33 additions & 18 deletions bigfiles/build.sbt
import sbtassembly.MergeStrategy

import java.time.LocalDateTime

enablePlugins(GitVersioning, GitBranchPrompt)
enablePlugins(ScalafmtPlugin)

lazy val scala212 = "2.12.20"
lazy val scala211 = "2.11.12"
lazy val supportedScalaVersions = List(scala211, scala212)
ThisBuild / version := "0.1.0"
ThisBuild / scalaVersion := scala212
ThisBuild / organization := "za.co.absa"

// ── core module ───────────────────────────────────────────────────────────────
// Pure comparison logic and data models. No CLI or I/O concerns.
lazy val core = (project in file("core"))
.enablePlugins(JacocoFilterPlugin)
.settings(
name := "dataset-comparison-core",
crossScalaVersions := supportedScalaVersions,
libraryDependencies ++= coreDependencies(scalaVersion.value),
javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint"),
Test / fork := true,
Test / baseDirectory := (ThisBuild / baseDirectory).value / "core"
)

// ── cli module ────────────────────────────────────────────────────────────────
// CLI entry point, argument parsing, I/O and serialization. Depends on core.
lazy val cli = (project in file("cli"))
.enablePlugins(JacocoFilterPlugin)
.dependsOn(core)
.settings(
name := "dataset-comparison",
crossScalaVersions := supportedScalaVersions,
assembly / mainClass := Some("za.co.absa.DatasetComparison"),
libraryDependencies ++= cliDependencies(scalaVersion.value),
javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint"),
Test / fork := true,
Test / baseDirectory := (ThisBuild / baseDirectory).value / "cli",
packageOptions := Seq(
ManifestAttributes(
("Built-By", System.getProperty("user.name")),
)
)

// ── root aggregate ────────────────────────────────────────────────────────────
lazy val root = (project in file("."))
.enablePlugins(JacocoFilterPlugin)
.enablePlugins(GitVersioning, GitBranchPrompt)
.enablePlugins(ScalafmtPlugin)
.aggregate(core, cli)
.settings(
name := "dataset-comparison-root",
// Prevent root from being published or assembled
publish / skip := true
)
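The dependency groups referenced by the modules live in `project/Dependencies.scala`. A hypothetical sketch follows; the `coreDependencies`/`cliDependencies` names and the `*ForScala` version helpers appear in the build, but the specific artifact lists and version numbers here are assumptions:

```scala
// project/Dependencies.scala — hypothetical sketch.
// Group/helper names come from the build; versions and artifacts are assumptions.
import sbt._

object Dependencies {
  // Pick an artifact version per Scala binary version (2.11 vs 2.12); versions assumed
  def sparkVersionForScala(scalaVersion: String): String =
    if (scalaVersion.startsWith("2.11")) "2.4.8" else "3.2.4"

  // Pure comparison logic only needs Spark (Provided: supplied by spark-submit)
  def coreDependencies(scalaVersion: String): Seq[ModuleID] = Seq(
    "org.apache.spark" %% "spark-core" % sparkVersionForScala(scalaVersion) % Provided,
    "org.apache.spark" %% "spark-sql"  % sparkVersionForScala(scalaVersion) % Provided
  )

  // CLI adds serialization and Hadoop I/O on top of the core set
  def cliDependencies(scalaVersion: String): Seq[ModuleID] =
    coreDependencies(scalaVersion) ++ Seq(
      "org.json4s"        %% "json4s-native" % "3.7.0-M11",
      "org.apache.hadoop"  % "hadoop-common" % "3.3.5",
      "com.lihaoyi"       %% "upickle"       % "1.4.0"
    )
}
```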
93 changes: 93 additions & 0 deletions bigfiles/cli/src/main/scala/za/co/absa/DatasetComparison.scala
/** Copyright 2020 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package za.co.absa

import za.co.absa.analysis.{AnalysisResult, ComparisonMetricsCalculator, RowByRowAnalysis}
import za.co.absa.parser.{ArgsParser, DiffComputeType}
import za.co.absa.io.IOHandler
import org.apache.spark.sql.SparkSession
import com.typesafe.config.ConfigFactory
import org.slf4j.{Logger, LoggerFactory}

import java.nio.file.Paths

object DatasetComparison {
  private val logger: Logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.load()
    val threshold = conf.getInt("dataset-comparison.analysis.diff-threshold")

    val arguments = ArgsParser.getArgs(args)

    implicit val spark: SparkSession = SparkSession
      .builder()
      .appName("DatasetComparator")
      .getOrCreate()

    // validate arguments
    ArgsParser.validate(arguments)

    // read data
    val rawDataA = IOHandler.sparkRead(arguments.inputA)
    val rawDataB = IOHandler.sparkRead(arguments.inputB)

    val (diffA, diffB) = Comparator.compare(rawDataA, rawDataB, arguments.exclude)

    // write diff files
    val out = arguments.out
    IOHandler.dfWrite(Paths.get(out, "inputA_differences").toString, diffA, arguments.outFormat)
    IOHandler.dfWrite(Paths.get(out, "inputB_differences").toString, diffB, arguments.outFormat)

    val metrics = ComparisonMetricsCalculator
      .calculate(rawDataA, rawDataB, diffA, diffB, arguments.exclude)

    val metricsJson = MetricsSerializer.serialize(metrics)
    IOHandler.jsonWrite(Paths.get(out, "metrics.json").toString, metricsJson)

    arguments.diff match {
      case DiffComputeType.Row =>
        logger.info("Starting row-by-row analysis")
        RowByRowAnalysis.analyze(diffA, diffB, threshold) match {
          case AnalysisResult.Success(diffAToB, diffBToA) =>
            logger.info("Computing row-by-row differences")
            IOHandler.rowDiffWriteAsJson(Paths.get(out, "A_to_B_changes.json").toString, diffAToB)
            IOHandler.rowDiffWriteAsJson(Paths.get(out, "B_to_A_changes.json").toString, diffBToA)
            logger.info("Row-by-row analysis completed successfully")

          case AnalysisResult.DatasetsIdentical =>
            logger.info("Datasets are identical, no row-by-row analysis needed")

          case AnalysisResult.OneSidedDifference(countA, countB) =>
            logger.info(
              s"""Detailed analysis will not be computed - one-sided difference:
                 |A: ${if (countA == 0) "All rows matched" else s"$countA differences (see inputA_differences)"}
                 |B: ${if (countB == 0) "All rows matched" else s"$countB differences (see inputB_differences)"}
                 |Row-by-row matching requires differences in both datasets
                 |""".stripMargin
            )

          case AnalysisResult.ThresholdExceeded(countA, countB, thresh) =>
            logger.warn(
              s"""Row-by-row analysis skipped - threshold exceeded:
                 |A differences: $countA
                 |B differences: $countB
                 |Threshold: $thresh
                 |Details available in inputA_differences and inputB_differences files
                 |""".stripMargin
            )
        }
      case _ => logger.info("No DiffComputeType selected, skipping detailed analysis")
    }
  }
}
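The `AnalysisResult` cases matched in `main` suggest an ADT along these lines. This is a hypothetical sketch inferred from the pattern match only; the field and payload types (the `DataFrame` payloads in particular) are assumptions, not the actual `core` definition:

```scala
// Hypothetical sketch of za.co.absa.analysis.AnalysisResult, inferred from
// the pattern match in DatasetComparison.main; payload types are assumptions.
package za.co.absa.analysis

import org.apache.spark.sql.DataFrame

sealed trait AnalysisResult

object AnalysisResult {
  // Row-by-row matching succeeded; holds the per-direction change sets
  final case class Success(diffAToB: DataFrame, diffBToA: DataFrame) extends AnalysisResult

  // No differences on either side
  case object DatasetsIdentical extends AnalysisResult

  // Differences exist in only one dataset, so rows cannot be paired
  final case class OneSidedDifference(countA: Long, countB: Long) extends AnalysisResult

  // Too many differing rows for a detailed analysis
  final case class ThresholdExceeded(countA: Long, countB: Long, threshold: Int) extends AnalysisResult
}
```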