pgCompare is a database comparison and reconciliation tool that compares tables between source and target databases, and identifies differences.
┌──────────────────────────────────────────────────────────────────────────┐
│ MAIN ENTRY POINT │
│ pgCompare.main() │
└────────────────────┬─────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────┐
│ COMMAND LINE PARSING │
│ CommandLineParser.parse() │
│ Options: --batch, --project, --report, --table, --help, --version │
└────────────────────┬─────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────┐
│ APPLICATION INITIALIZATION │
│ ApplicationContext.initialize() │
│ - Load Settings │
│ - Connect to Repository DB │
│ - Load Project Config │
│ - Run Validation │
└────────────────────┬─────────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────┐
│ ACTION EXECUTION │
│ ApplicationContext.executeAction() │
│ │
│ ┌──────────────┬──────────────┬──────────────┬──────────────┐ │
│ │ │ │ │ │ │
│ ▼ ▼ ▼ ▼ ▼ │
│ INIT DISCOVER COMPARE CHECK COPY-TABLE │
└──────────────────────────────────────────────────────────────────────────┘
- pgCompare - Main entry point
Configuration and initialization classes
High-level orchestration and business logic
Core processing logic (threading, comparison, database operations)
Data transfer objects and domain models
Reusable services for database operations, SQL generation, etc.
Utility classes for common operations
Purpose: Application entry point
public class pgCompare
+ main(String[] args)
Calls:
- CommandLineParser.parse()
- ApplicationContext (constructor)
- ApplicationContext.initialize()
- ApplicationContext.executeAction()
- LoggingUtils.write()
Purpose: Manages application lifecycle and context
public class ApplicationContext
- cmd: CommandLine
- pid: Integer
- action: String
- batchParameter: Integer
- connRepo: Connection
- connSource: Connection
- connTarget: Connection
+ ApplicationContext(CommandLine cmd)
+ initialize()
+ executeAction()
+ getPid()
+ getBatchParameter()
+ getConnRepo()
+ getConnSource()
+ getConnTarget()
- connectToRepository()
- connectToSourceAndTarget()
- setProjectConfig(Connection conn, Integer pid)
- logStartupInfo()
- logConfigurationParameters()
- handleRepoInitialization()
- performDiscovery()
- performCompare()
- performCopyTable()
Calls:
- Settings.Props
- Settings.setProjectConfig()
- ValidationUtils.all()
- LoggingUtils
- DatabaseConnectionService.getConnection()
- RepositoryInitializationService.createRepository()
- DiscoverController.discoverTables()
- DiscoverController.discoverColumns()
- CompareController.performCompare()
- TableController.performCopyTable()
Purpose: Parse command line arguments
public class CommandLineParser
+ parse(String[] args): CommandLine
+ showHelp()
+ showVersion()
- createOptions(): Options
Calls:
- Settings.VERSION
Purpose: Application settings and configuration
public class Settings
+ VERSION: String
+ Props: Properties
+ setEnvironment(Properties prop): Properties
+ setProjectConfig(Connection conn, Integer pid)
Calls:
- FileSystemUtils.loadPropertiesFromFile()
- SQLExecutionHelper.simpleSelectReturnString()
- JsonProcessingUtils.mergeJsonObjects()
Purpose: SQL constant definitions per database platform
public interface DB2SQLConstants
public interface MariaDBSQLConstants
public interface MSSQLSQLConstants
public interface MYSQLSQLConstants
public interface OracleSQLConstants
public interface PostgresSQLConstants
public interface RepoSQLConstants
public interface SnowflakeSQLConstants
Purpose: Orchestrates comparison and recheck operations
public class CompareController
+ performCompare(ApplicationContext context)
+ reconcileData(Connection connRepo, Connection connSource, Connection connTarget,
Integer rid, Boolean check, DataComparisonTable dct,
DataComparisonTableMap dctmSource, DataComparisonTableMap dctmTarget,
ColumnMetadata ciSource, ColumnMetadata ciTarget, Integer cid): JSONObject
- validatePerformCompareInputs(ApplicationContext context)
Calls:
- TableController.reconcileTables()
- RepoController.getTables()
- ReportController.createSummary()
- ThreadManager.createComparison()
- DataValidationThread.checkRows()
- ResultProcessor.processResults()
Purpose: Manages table operations and processing
public class TableController
+ getTableMap(Connection conn, Integer tid, String tableOrigin): DataComparisonTableMap
+ performCopyTable(ApplicationContext context): int
+ reconcileTables(CachedRowSet tablesResultSet, boolean isCheck,
RepoController repoController, ApplicationContext context): ComparisonResults
+ createDCTableFromResultSet(CachedRowSet resultSet, Integer pid): DataComparisonTable
- reconcileEnabledTable(DataComparisonTable table, boolean isCheck,
RepoController rpc, ApplicationContext context): JSONObject
- createSkippedTableResult(DataComparisonTable table): JSONObject
record ComparisonResults(int tablesProcessed, JSONArray runResults)
Calls:
- SQLExecutionHelper.simpleSelect()
- CompareController.reconcileData()
- ColumnManagementService (various methods)
- ColumnMetadataBuilder.build()
Purpose: Manages column metadata operations
public class ColumnController
+ getColumnInfo(JSONObject columnMap, String targetType, String platform,
String schema, String table, Boolean useDatabaseHash): ColumnMetadata
Calls:
- DatabaseMetadataService (platform methods)
- ColumnMetadataBuilder.build()
Purpose: Orchestrates discovery of tables and columns
public class DiscoverController
+ discoverColumns(Properties props, Integer pid, String table,
Connection connRepo, Connection connSource, Connection connTarget)
+ discoverTables(Properties Props, Integer pid, String table,
Connection connRepo, Connection connSource, Connection connTarget)
- cleanupPreviousDiscovery(Connection connRepo, String sql, ArrayList<Object> binds)
- discoverTables(Properties props, Integer pid, String table,
Connection connRepo, Connection conn, String origin, boolean populateDCTable)
Calls:
- ColumnDiscoveryService.discoverColumns()
- DatabaseMetadataService.getTables()
- SQLExecutionHelper (various methods)
Purpose: Repository database operations
public class RepoController
+ completeTableHistory(Connection conn, Integer tid, Integer batchNbr,
Integer rowCount, String actionResult)
+ createCompareId(Connection connRepo, DataComparisonTableMap dctmTarget, long rid): Integer
+ deleteDataCompare(Connection conn, Integer tid, Integer batchNbr)
+ dcrCreate(Connection conn, int tid, String tableName, long rid): Integer
+ dcrUpdateRowCount(Connection conn, String targetType, Integer cid, Integer rowCount)
+ getTables(Integer pid, Connection conn, Integer batchParameter, String table, Boolean check): CachedRowSet
+ saveTableColumn(Connection conn, DataComparisonTableColumn dctc): DataComparisonTableColumn
+ saveTableColumnMap(Connection conn, DataComparisonTableColumnMap dctcm)
+ startTableHistory(Connection conn, Integer tid, Integer batchNbr): Integer
+ vacuumRepo(Connection conn)
Calls:
- SQLExecutionHelper (various methods)
- TableManagementService (various methods)
Purpose: Generate comparison reports and summaries
public class ReportController
record SummaryStatistics(int totalRows, int outOfSyncRows, long elapsedTime) {
+ getThroughput(): long
}
+ generateHtmlReport(JSONArray report, String filePath, String title)
+ generateCompleteReport(ApplicationContext context, int tablesProcessed,
JSONArray runResult, SummaryStatistics stats, boolean isCheck)
+ createSummary(ApplicationContext context, int tablesProcessed,
JSONArray runResult, boolean isCheck)
- createJobSummaryData(int tablesProcessed, SummaryStatistics stats): JSONObject
- createJobSummaryLayout(): JSONArray
- createRunResultLayout(): JSONArray
- createCheckResultLayout(): JSONArray
- createSection(String title, JSONArray data, JSONArray layout): JSONObject
- addCheckResultsToReport(JSONArray reportArray, JSONArray runResult)
- calculateSummaryStatistics(JSONArray runResult): SummaryStatistics
Calls:
- HTMLWriterUtils (various methods)
- DisplayOperations (various methods)
Purpose: Process and finalize comparison results
public class ResultProcessor
+ processResults(Connection connRepo, long tid, int cid): JSONObject
- configureDatabase(Connection connRepo)
- calculateReconciliationStats(Connection connRepo, long tid): ReconciliationStats
- updateResultWithStats(JSONObject result, ReconciliationStats stats)
- updateDatabaseResults(Connection connRepo, JSONObject result, int cid)
Calls:
- SQLExecutionHelper (various methods)
Purpose: Database-agnostic SQL execution utilities
public class SQLExecutionHelper
record QueryResult<T>(T result, boolean success, String errorMessage, long executionTimeMs)
+ simpleSelect(Connection conn, String sql, ArrayList<Object> binds): CachedRowSet
+ simpleSelectWithResult(Connection conn, String sql, ArrayList<Object> binds): QueryResult<CachedRowSet>
+ simpleSelectReturnInteger(Connection conn, String sql, ArrayList<Object> binds): Integer
+ simpleSelectReturnString(Connection conn, String sql, ArrayList<Object> binds): String
+ simpleUpdate(Connection conn, String sql, ArrayList<Object> binds, Boolean commit): Integer
+ executeBatch(Connection conn, List<String> sqlStatements, boolean commit): int[]
+ simpleUpdateReturning(Connection conn, String sql, ArrayList<Object> binds): CachedRowSet
+ simpleUpdateReturningInteger(Connection conn, String sql, ArrayList<Object> binds): Integer
+ simpleExecute(Connection conn, String sql)
- bindParameters(PreparedStatement stmt, ArrayList<Object> binds)
- validateParameters(Connection conn, String sql, ArrayList<Object> binds)
Calls:
- LoggingUtils.write()
Purpose: Build column metadata for comparisons
public class ColumnMetadataBuilder
+ build(JSONObject columnMap): ColumnMetadata
- buildColumnMetadata(CachedRowSet columns, String platform): ColumnMetadata
- buildColumnExpressionList(JSONArray columns): String
- buildPKExpressionList(JSONArray columns): String
- buildPKJSON(JSONArray columns): String
Calls:
- DatabaseMetadataService (enum methods)
- SQLExecutionHelper.simpleSelect()
Purpose: Snowflake-specific helper operations
public class SnowflakeHelper
+ getMetadataColumns(Connection conn, String schema, String table): JSONArray
+ buildSnowflakeSQL(String sql, Integer tid): String
Calls:
- SQLExecutionHelper.simpleSelect()
Purpose: Thread for loading data from source/target
public class DataComparisonThread extends Thread
- tid, batchNbr, cid: Integer
- parallelDegree, threadNumber: Integer
- modColumn, pkList, stagingTable, targetType: String
- sql: String
- q: BlockingQueue<DataComparisonResult[]>
- ts: ThreadSync
- useDatabaseHash: Boolean
+ DataComparisonThread(Integer threadNumber, DataComparisonTable dct,
DataComparisonTableMap dctm, ColumnMetadata cm, Integer cid,
ThreadSync ts, Boolean useDatabaseHash, String stagingTable,
BlockingQueue<DataComparisonResult[]> q)
+ run()
- initializeRepositoryConnection(String threadName): Connection
- initializeSourceTargetConnection(String threadName): Connection
- handleLoaderThreadBatch(String threadName, DataComparisonResult[] dc, int batchCommitSize)
- handleDirectDatabaseBatch(PreparedStatement stmtLoad, Connection connRepo)
- handleObserverCoordination(String threadName, boolean firstPass, boolean observerThrottle,
RepoController rpc, Connection connRepo, int rowsToReport)
- processRemainingRecords(boolean useLoaderThreads, DataComparisonResult[] dc,
PreparedStatement stmtLoad, RepoController rpc,
Connection connRepo, int rowsToReport)
- waitForQueuesToEmpty(String threadName)
- signalThreadCompletion()
- cleanupResources(String threadName, ResultSet rs, PreparedStatement stmt,
PreparedStatement stmtLoad, Connection connRepo, Connection conn)
Calls:
- DatabaseConnectionService.getConnection()
- HashingUtils.getMd5()
- RepoController.dcrUpdateRowCount()
- LoggingUtils.write()
- ThreadSync.observerWait()
Purpose: Thread for loading data into staging tables
public class DataLoaderThread extends Thread
- q: BlockingQueue<DataComparisonResult[]>
- stagingTable: String
- connRepo: Connection
+ DataLoaderThread(BlockingQueue<DataComparisonResult[]> q, String stagingTable,
Connection connRepo)
+ run()
- processQueue()
- loadBatch(DataComparisonResult[] results)
- cleanup()
Calls:
- SQLExecutionService.simpleUpdate()
- LoggingUtils.write()
Purpose: Validate and check out-of-sync rows
public class DataValidationThread
+ checkRows(Connection repoConn, Connection sourceConn, Connection targetConn,
DataComparisonTable dct, DataComparisonTableMap dctmSource,
DataComparisonTableMap dctmTarget, ColumnMetadata ciSource,
ColumnMetadata ciTarget, Integer cid): JSONObject
+ reCheck(Connection repoConn, Connection sourceConn, Connection targetConn,
DataComparisonTableMap dctmSource, DataComparisonTableMap dctmTarget,
String pkList, ArrayList<Object> binds, DataComparisonResult dcRow,
Integer cid): JSONObject
- extractColumnValue(CachedRowSet rowSet, int columnIndex): String
- removeInSyncRow(Connection repoConn, DataComparisonResult dcRow)
- updateResultCounts(Connection repoConn, JSONObject rowResult,
CachedRowSet sourceRow, CachedRowSet targetRow, Integer cid)
Calls:
- SQLExecutionService.simpleSelect()
- SQLExecutionService.simpleUpdate()
- ColumnMetadataUtils.createColumnFilterClause()
- ColumnMetadataUtils.findColumnAlias()
- DatabaseMetadataService.getQuoteChar()
- DataProcessingUtils.convertClobToString()
- SQLFixGenerationService.generateFixSQL()
Purpose: Monitor and coordinate comparison threads
public class ObserverThread extends Thread
- tid: Integer
- cid: Integer
- stagingTableSource, stagingTableTarget: String
- ts: ThreadSync
- loaderThreads: int
+ ObserverThread(Integer tid, Integer cid, String stagingTableSource,
String stagingTableTarget, ThreadSync ts, int loaderThreads)
+ run()
- executeReconciliationObserver(String threadName, Connection repoConn,
ArrayList<Object> binds, int cntEqual, int deltaCount,
int loaderThreads, DecimalFormat formatter,
int lastRun, RepoController rpc, int sleepTime)
- handleNoMatches(int cntEqual, int deltaCount, ArrayList<Object> binds,
RepoController rpc, Connection repoConn, PreparedStatement stmtSUS)
- isReconciliationComplete(int tmpRowCount, int loaderThreads): boolean
- handleSleepTiming(int tmpRowCount, int cntEqual, int sleepTime)
- performCleanup(String threadName, Connection repoConn, RepoController rpc)
Calls:
- DatabaseConnectionService.getConnection()
- RepoController.dcrUpdateRowCount()
- RepoController.vacuumRepo()
- ThreadSync methods
Purpose: Manage thread lifecycle
public class ThreadManager
+ createComparison(Connection connRepo, Integer cid, DataComparisonTable dct,
DataComparisonTableMap dctmSource, DataComparisonTableMap dctmTarget,
ColumnMetadata ciSource, ColumnMetadata ciTarget)
- createLoaderThreads(BlockingQueue<DataComparisonResult[]> queueSource,
BlockingQueue<DataComparisonResult[]> queueTarget,
String stagingTableSource, String stagingTableTarget,
Connection connRepo): List<DataLoaderThread>
- createComparisonThreads(DataComparisonTable dct, DataComparisonTableMap dctmSource,
DataComparisonTableMap dctmTarget, ColumnMetadata ciSource,
ColumnMetadata ciTarget, Integer cid, ThreadSync ts,
BlockingQueue queueSource, BlockingQueue queueTarget,
String stagingTableSource, String stagingTableTarget): List<Thread>
- startAllThreads(List<Thread> threads)
- waitForAllThreads(List<Thread> threads)
Calls:
- DataComparisonThread (constructor)
- DataLoaderThread (constructor)
- ObserverThread (constructor)
- StagingTableService.createStagingTable()
- StagingTableService.dropStagingTable()
Purpose: Thread synchronization object
public class ThreadSync
+ sourceComplete: boolean
+ targetComplete: boolean
+ sourceWaiting: boolean
+ targetWaiting: boolean
+ stopObserver: boolean
+ observerWait()
+ observerNotify()
Purpose: Column metadata model
@Data
@Builder
public class ColumnMetadata
+ columnList: String
+ nbrColumns: Integer
+ nbrPKColumns: Integer
+ columnExpressionList: String
+ pkExpressionList: String
+ pkList: String
+ pkJSON: String
Purpose: Comparison result model
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataComparisonResult
+ tid: Integer
+ tableName: String
+ pkHash: String
+ columnHash: String
+ pk: String
+ compareResult: String
+ threadNbr: Integer
+ batchNbr: Integer
Purpose: Table configuration model
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataComparisonTable
+ pid: Integer
+ tid: Integer
+ tableAlias: String
+ enabled: boolean
+ batchNbr: Integer
+ parallelDegree: Integer
Purpose: Column configuration model
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataComparisonTableColumn
+ tid: Integer
+ columnId: Integer
+ columnAlias: String
+ enabled: boolean
Purpose: Column mapping model
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataComparisonTableColumnMap
+ columnId: Integer
+ columnOrigin: String
+ columnName: String
+ dataType: String
+ dataClass: String
+ dataLength: Integer
+ numberPrecision: Integer
+ numberScale: Integer
+ columnNullable: boolean
+ columnPrimarykey: boolean
+ mapExpression: String
+ supported: boolean
+ preserveCase: boolean
+ mapType: String
Purpose: Table mapping model
@Data
@AllArgsConstructor
@NoArgsConstructor
public class DataComparisonTableMap
+ tid: Integer
+ destType: String
+ schemaName: String
+ tableName: String
+ modColumn: String
+ tableFilter: String
+ tablePreserveCase: boolean
+ schemaPreserveCase: boolean
+ batchNbr: Integer
+ compareSQL: String
+ tableAlias: String
+ pid: Integer
Purpose: Manage database connections across different platforms
public class DatabaseConnectionService
+ isConnectionValid(Connection conn): boolean
+ getConnection(String platform, String destType): Connection
- buildConnectionUrl(DatabasePlatform platform, String destType): String
- buildConnectionProperties(DatabasePlatform platform, String destType): Properties
- configureConnection(Connection conn, DatabasePlatform platform)
Calls:
- DatabaseMetadataService.DatabasePlatform
- Settings.Props
- LoggingUtils.write()
Purpose: Database platform configuration and metadata operations
public class DatabaseMetadataService
enum DatabasePlatform {
DB2, ORACLE, MARIADB, MYSQL, MSSQL, POSTGRES, SNOWFLAKE
+ getName(): String
+ getUrlTemplate(): String
+ isAutoCommit(): boolean
+ requiresAnsiMode(): boolean
+ getNativeCase(): String
+ getQuoteChar(): String
+ getColumnHashTemplate(): String
+ getConcatOperator(): String
+ getReplacePKSyntax(): String
+ fromString(String platform): DatabasePlatform
}
+ getNativeCase(String platform): String
+ getQuoteChar(String platform): String
+ getConcatOperator(String platform): String
+ getReplacePKSyntax(String platform): String
+ getTables(Connection conn, String schema, String tableFilter, String sql): JSONArray
Calls:
- DataProcessingUtils.ShouldQuoteString()
- LoggingUtils.write()
Purpose: SQL generation and syntax utilities
public class SQLSyntaxService
+ buildGetTablesSQL(Integer batchNbr, String table, Boolean check): String
+ generateCompareSQL(DataComparisonTableMap dctmSource, DataComparisonTableMap dctmTarget,
ColumnMetadata ciSource, ColumnMetadata ciTarget)
+ buildLoadSQL(String columnHashMethod, DataComparisonTableMap tableMap,
ColumnMetadata columnMetadata): String
Calls:
- DatabaseMetadataService.DatabasePlatform
- DataProcessingUtils.ShouldQuoteString()
- Settings.Props
- LoggingUtils.write()
Purpose: Generate SQL statements to fix data differences
public class SQLFixGenerationService
+ generateFixSQL(Connection sourceConn, Connection targetConn,
DataComparisonTableMap dctmSource, DataComparisonTableMap dctmTarget,
ArrayList<Object> binds, DataComparisonResult dcRow,
JSONObject rowResult): String
+ generateFixSQLForMultipleRows(Connection sourceConn, Connection targetConn,
DataComparisonTableMap dctmSource,
DataComparisonTableMap dctmTarget,
JSONArray checkResults): List<String>
- isMissingSource(JSONObject rowResult): boolean
- isMissingTarget(JSONObject rowResult): boolean
- isNotEqual(JSONObject rowResult): boolean
- generateDeleteSQL(DataComparisonTableMap dctmTarget, JSONObject pk): String
- generateInsertSQL(Connection sourceConn, DataComparisonTableMap dctmSource,
DataComparisonTableMap dctmTarget, ArrayList<Object> binds,
JSONObject pk): String
- generateUpdateSQL(Connection sourceConn, Connection targetConn,
DataComparisonTableMap dctmSource, DataComparisonTableMap dctmTarget,
ArrayList<Object> binds, JSONObject pk, JSONObject rowResult): String
- buildWhereClause(JSONObject pk, String quoteChar): String
- formatValue(Object value): String
- buildFilterFromPK(Connection conn, JSONObject pk, String quoteChar): String
Calls:
- SQLExecutionHelper.simpleSelect()
- DatabaseMetadataService.getQuoteChar()
- DataProcessingUtils.ShouldQuoteString()
- LoggingUtils.write()
Purpose: Discover and process columns from databases
public class ColumnDiscoveryService
+ discoverColumns(Properties props, Integer pid, String table,
Connection connRepo, Connection connSource, Connection connTarget)
+ loadColumns(Properties props, Integer tid, String schema, String tableName,
Connection connRepo, Connection connDest, String destRole,
Boolean populateDCTableColumn)
- clearPreviousMappings(Connection connRepo, Integer pid, String table)
- discoverColumnsForRole(Properties props, Integer pid, String table,
Connection connRepo, Connection connDest, String role)
- findExistingColumn(Connection connRepo, Integer tid, String columnName): Integer
- createOrUpdateColumn(Integer tid, String columnName, Integer cid,
Boolean populateDCTableColumn, Connection connRepo): DataComparisonTableColumn
- createColumnMapping(Connection connRepo, Integer tid, Integer columnID,
String destRole, JSONObject columnInfo)
Calls:
- SQLExecutionHelper (various methods)
- ColumnMetadataUtils.getColumns()
- RepoController.saveTableColumn()
- RepoController.saveTableColumnMap()
Purpose: Manage column metadata and mappings
public class ColumnManagementService
+ getColumnMapping(Connection connRepo, Integer tid): String
+ getSourceColumnMetadata(JSONObject columnMap, DataComparisonTableMap dctmSource): ColumnMetadata
+ getTargetColumnMetadata(JSONObject columnMap, DataComparisonTableMap dctmTarget, Boolean check): ColumnMetadata
+ saveTableColumn(Connection conn, DataComparisonTableColumn dctc): DataComparisonTableColumn
- validateTableColumnInputs(Connection conn, DataComparisonTableColumn dctc)
Calls:
- SQLExecutionHelper (various methods)
- ColumnController.getColumnInfo()
Purpose: Manage table configurations and operations
public class TableManagementService
+ saveTable(Connection conn, DataComparisonTable dataComparisonTable): DataComparisonTable
+ saveTableMap(Connection conn, DataComparisonTableMap dataComparisonTableMap)
+ getTables(Integer pid, Connection conn, Integer batchNbr, String table, Boolean check): CachedRowSet
+ completeTableHistory(Connection conn, Integer tid, Integer batchNbr,
Integer rowCount, String actionResult)
+ deleteDataCompare(Connection conn, Integer tid, Integer batchNbr)
- buildGetTablesSQL(Integer batchNbr, String table, Boolean check): String
- buildGetTablesBinds(Integer pid, Integer batchNbr, String table): ArrayList<Object>
Calls:
- SQLExecutionHelper (various methods)
- LoggingUtils.write()
Purpose: Manage staging tables for data loading
public class StagingTableService
+ createStagingTable(Connection conn, String location, Integer tid, Integer threadNbr): String
+ dropStagingTable(Connection conn, String stagingTable)
+ loadFindings(Connection conn, String location, Integer tid, String stagingTable,
Integer batchNbr, Integer threadNbr, String tableAlias)
- validateCreateStagingTableInputs(Connection conn, String location, Integer tid, Integer threadNbr)
- validateDropStagingTableInputs(Connection conn, String stagingTable)
- validateLoadFindingsInputs(Connection conn, String location, Integer tid, String stagingTable,
Integer batchNbr, Integer threadNbr, String tableAlias)
Calls:
- SQLExecutionHelper.simpleExecute()
- SQLExecutionHelper.simpleUpdate()
- Settings.Props
- LoggingUtils.write()
Purpose: Initialize and setup repository database
public class RepositoryInitializationService
enum DDLPhase {
SCHEMA_CREATION, TABLE_CREATION, INDEX_CREATION,
DATA_INSERTION, FUNCTION_CREATION
}
+ createRepository(Properties props, Connection conn)
- createSchema(Properties props, Connection conn)
- createTables(Connection conn)
- createIndexesAndConstraints(Connection conn)
- insertInitialData(Connection conn)
- createFunctions(Connection conn)
- executeDDL(Connection conn, String ddl, DDLPhase phase)
- getPropertyWithDefault(Properties props, String key, String defaultValue): String
Calls:
- DatabaseConnectionService.isConnectionValid()
- SQLExecutionHelper.simpleUpdate()
- RepoSQLConstants (SQL definitions)
- LoggingUtils.write()
Purpose: Logging operations and message formatting
public class LoggingUtils
+ initialize()
+ write(String level, String threadName, String message)
+ writeException(String level, String threadName, Exception e)
- getLogger(): Logger
- formatMessage(String threadName, String message): String
Purpose: Validation operations for connections and configuration
public class ValidationUtils
+ all(String action): boolean
+ validateConnection(Connection conn): boolean
+ validateTable(String tableName): boolean
+ validateSchema(String schemaName): boolean
Calls:
- LoggingUtils.write()
Purpose: Hashing operations for data comparison
public class HashingUtils
+ getMd5(String input): String
+ getSha256(String input): String
Purpose: Column metadata utilities and operations
public class ColumnMetadataUtils
+ getColumns(Properties Props, Connection conn, String schema, String table,
String destRole): JSONArray
+ createColumnFilterClause(Connection conn, Integer tid, String columnAlias,
String origin, String quoteChar): String
+ findColumnAlias(JSONArray columns, String columnName, String origin): String
- buildColumnQuery(String platform, String schema, String table): String
Calls:
- SQLExecutionHelper.simpleSelect()
- Platform-specific SQL constants
Purpose: Data processing and string utilities
public class DataProcessingUtils
+ ShouldQuoteString(boolean preserveCase, String value, String quoteChar): String
+ preserveCase(String value, boolean preserveCase, String nativeCase): String
+ convertClobToString(SerialClob clob): String
+ trimString(String value, int maxLength): String
Purpose: Data type conversion and casting operations
public class DataTypeCastingUtils
+ castToInteger(Object value): Integer
+ castToLong(Object value): Long
+ castToBoolean(Object value): Boolean
+ castToString(Object value): String
Purpose: JSON processing and manipulation utilities
public class JsonProcessingUtils
+ findOne(JSONArray jsonArray, String key, String value): JSONObject
+ mergeJsonObjects(JSONObject base, JSONObject overlay): JSONObject
+ arrayToList(JSONArray array): List<Object>
Purpose: File system operations and I/O
public class FileSystemUtils
+ loadPropertiesFromFile(String fileName): Properties
+ writeFile(String fileName, String content)
+ readFile(String fileName): String
+ fileExists(String fileName): boolean
Purpose: Console display formatting and output utilities
public class DisplayOperations
+ displayTableSummaries(JSONArray runResult)
+ displayJobSummary(int tablesProcessed, SummaryStatistics stats)
+ displayNoTablesMessage(boolean isCheck)
+ displayIndividualTableSummary(JSONObject tableResult, int indent)
+ printSummary(String message, int indent)
Calls:
- LoggingUtils.write()
Purpose: HTML report generation utilities
public class HTMLWriterUtils
+ writeHtmlContent(FileWriter writer, JSONArray report)
+ writeHtmlHeader(FileWriter writer, String title)
+ writeTableHeader(FileWriter writer, JSONArray layout)
+ writeTableRows(FileWriter writer, JSONArray sectionData, JSONArray layout)
+ writeHtmlFooter(FileWriter writer)
pgCompare.main()
├─> CommandLineParser.parse()
│ └─> CommandLineParser.createOptions()
├─> ApplicationContext (constructor)
│ └─> Settings.Props
├─> ApplicationContext.initialize()
│ ├─> LoggingUtils.initialize()
│ ├─> DatabaseConnectionService.getConnection()
│ ├─> Settings.setProjectConfig()
│ │ ├─> SQLExecutionHelper.simpleSelectReturnString()
│ │ └─> JsonProcessingUtils.mergeJsonObjects()
│ └─> ValidationUtils.all()
└─> ApplicationContext.executeAction()
├─> [ACTION: init] RepositoryInitializationService.createRepository()
├─> [ACTION: discover] DiscoverController.discoverTables()
├─> [ACTION: discover] DiscoverController.discoverColumns()
├─> [ACTION: compare/check] CompareController.performCompare()
└─> [ACTION: copy-table] TableController.performCopyTable()
CompareController.performCompare()
├─> RepoController.getTables()
│ └─> SQLExecutionHelper.simpleSelect()
├─> TableController.reconcileTables()
│ └─> For each table:
│ ├─> TableController.getTableMap()
│ │ └─> SQLExecutionHelper.simpleSelect()
│ ├─> ColumnManagementService.getColumnMapping()
│ │ └─> SQLExecutionHelper.simpleSelectReturnString()
│ ├─> ColumnController.getColumnInfo()
│ │ └─> ColumnMetadataBuilder.build()
│ ├─> CompareController.reconcileData()
│ │ ├─> [IF check] DataValidationThread.checkRows()
│ │ │ ├─> SQLExecutionHelper.simpleSelect()
│ │ │ └─> DataValidationThread.reCheck()
│ │ │ ├─> SQLExecutionHelper.simpleSelect()
│ │ │ ├─> DataProcessingUtils.convertClobToString()
│ │ │ └─> SQLFixGenerationService.generateFixSQL()
│ │ │ ├─> SQLFixGenerationService.generateDeleteSQL()
│ │ │ ├─> SQLFixGenerationService.generateInsertSQL()
│ │ │ └─> SQLFixGenerationService.generateUpdateSQL()
│ │ ├─> [IF compare] ThreadManager.createComparison()
│ │ │ ├─> StagingTableService.createStagingTable()
│ │ │ ├─> Create threads:
│ │ │ │ ├─> DataComparisonThread (source)
│ │ │ │ ├─> DataComparisonThread (target)
│ │ │ │ ├─> DataLoaderThread (source)
│ │ │ │ ├─> DataLoaderThread (target)
│ │ │ │ └─> ObserverThread
│ │ │ ├─> Start all threads
│ │ │ ├─> Wait for completion
│ │ │ └─> StagingTableService.dropStagingTable()
│ │ └─> ResultProcessor.processResults()
│ │ ├─> SQLExecutionHelper.simpleUpdate() [mark missing/not equal]
│ │ └─> SQLExecutionHelper.simpleUpdateReturning() [update counts]
│ └─> RepoController.completeTableHistory()
└─> ReportController.createSummary()
├─> ReportController.generateCompleteReport()
└─> HTMLWriterUtils.writeHtmlReport()
DataComparisonThread.run()
├─> initializeRepositoryConnection()
│ └─> DatabaseConnectionService.getConnection()
├─> initializeSourceTargetConnection()
│ └─> DatabaseConnectionService.getConnection()
├─> Execute SQL query
├─> For each row:
│ ├─> HashingUtils.getMd5() [if not using database hash]
│ ├─> [IF using loader threads] handleLoaderThreadBatch()
│ │ └─> BlockingQueue.put()
│ ├─> [IF direct load] handleDirectDatabaseBatch()
│ │ └─> PreparedStatement.executeLargeBatch()
│ └─> [IF observer checkpoint] handleObserverCoordination()
│ ├─> RepoController.dcrUpdateRowCount()
│ └─> ThreadSync.observerWait()
├─> processRemainingRecords()
│ └─> RepoController.dcrUpdateRowCount()
└─> cleanupResources()
DataLoaderThread.run()
└─> Loop until queue empty:
├─> BlockingQueue.take()
└─> SQLExecutionHelper.simpleUpdate() [load to staging table]
ObserverThread.run()
└─> Loop while not complete:
├─> Execute SQL to match rows
├─> RepoController.dcrUpdateRowCount()
├─> ThreadSync.observerNotify()
└─> [IF complete] RepoController.vacuumRepo()
ApplicationContext.executeAction() [ACTION: discover]
├─> DiscoverController.discoverTables()
│ ├─> DatabaseMetadataService.getTables()
│ │ └─> Execute platform-specific SQL
│ ├─> For each table:
│ │ ├─> TableManagementService.saveTable()
│ │ │ └─> SQLExecutionHelper.simpleUpdateReturningInteger()
│ │ └─> TableManagementService.saveTableMap()
│ │ └─> SQLExecutionHelper.simpleUpdate()
│ └─> SQLExecutionHelper.simpleUpdate() [cleanup]
└─> DiscoverController.discoverColumns()
├─> ColumnDiscoveryService.discoverColumns()
│ ├─> ColumnDiscoveryService.clearPreviousMappings()
│ │ └─> SQLExecutionHelper.simpleUpdate()
│ ├─> ColumnDiscoveryService.discoverColumnsForRole() [target]
│ │ ├─> SQLExecutionHelper.simpleSelect()
│ │ └─> ColumnDiscoveryService.loadColumns()
│ │ ├─> ColumnMetadataUtils.getColumns()
│ │ │ └─> SQLExecutionHelper.simpleSelect()
│ │ ├─> ColumnDiscoveryService.findExistingColumn()
│ │ │ └─> SQLExecutionHelper.simpleSelectReturnInteger()
│ │ ├─> RepoController.saveTableColumn()
│ │ │ └─> SQLExecutionHelper.simpleUpdateReturningInteger()
│ │ └─> RepoController.saveTableColumnMap()
│ │ └─> SQLExecutionHelper.simpleUpdate()
│ └─> ColumnDiscoveryService.discoverColumnsForRole() [source]
│ └─> [Same flow as target]
└─> DisplayOperations.displayResults()
pgCompare (main)
│
▼
CommandLineParser ────> Settings
│
▼
ApplicationContext
┌──────────────────┼──────────────────┐
│ │ │
▼ ▼ ▼
RepositoryInit DiscoverController CompareController
TableController ReportController
ColumnController
│ │
┌───────────────┼────────────────┐ │
│ │ │ │
▼ ▼ ▼ ▼
TableManagementService ThreadManager ResultProcessor
ColumnManagementService │ │
ColumnDiscoveryService │ │
DatabaseConnectionService │ │
▼ │
┌─────────────────┴──────────────┴──────────┐
│ │
▼ ▼
DataComparisonThread ObserverThread
DataLoaderThread
│
▼
┌──────────┴──────────┐
│ │
▼ ▼
DatabaseMetadataService SQLExecutionHelper
SQLSyntaxService (core.database)
SQLFixGenerationService
│ │
└──────────┬──────────┘
│
▼
┌───────┴───────┐
│ │
▼ ▼
LoggingUtils HashingUtils
ValidationUtils DataProcessingUtils
FileSystemUtils JsonProcessingUtils
ColumnMetadataUtils
DisplayOperations
HTMLWriterUtils
- dc_project - Project configurations
- dc_result - Comparison results summary
- dc_table - Table configurations
- dc_table_column - Column configurations
- dc_table_column_map - Column mappings (source/target)
- dc_table_map - Table mappings (source/target)
- dc_table_history - Table comparison history
- dc_source - Source data staging
- dc_target - Target data staging
- stagingtable_* - Temporary staging tables
RepoControllerprovides abstraction for repository operations- All repository SQL is centralized in
RepoSQLConstants
- Business logic separated into service classes
- Each service has a single responsibility
ThreadManagermanages thread lifecycle- Uses
BlockingQueuefor thread communication ThreadSyncfor thread coordination
ColumnMetadataBuilderbuilds complex column metadata- Uses method chaining for clarity
DatabaseConnectionServicecreates connectionsDatabaseMetadataService.DatabasePlatformenum factory
- Platform-specific SQL via interface constants
- Platform-specific operations via enum methods
┌──────────────────┐ ┌──────────────────┐
│ │ │ │
│ DataComparison │────────>│ BlockingQueue │
│ Thread (Source) │ │ (Source) │
│ │ │ │
└──────────────────┘ └────────┬─────────┘
│
▼
┌──────────────────┐
│ │
│ DataLoader │
│ Thread (Source) │
│ │
└──────────────────┘
│
▼
┌──────────────────┐
│ │
│ Staging Table │
│ (dc_source) │
│ │
└──────────────────┘
│
┌─────────────────┼─────────────────┐
│ │
▼ ▼
┌──────────────────┐ ┌──────────────────┐
│ │◄────────────>│ │
│ Observer Thread │ ThreadSync │ Staging Table │
│ │ │ (dc_target) │
└──────────────────┘ └──────────────────┘
▲
│
[Matches rows, updates counts]
This architecture provides a scalable, multi-threaded solution for comparing large database tables with support for multiple database platforms and comprehensive reporting capabilities.