Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1877,6 +1877,7 @@ void CompositionalMultiphaseBase::applySourceFluxBC( real64 const time,
localIndex const rankOffset = dofManager.rankOffset();

RAJA::ReduceSum< parallelDeviceReduce, real64 > massProd( 0.0 );
RAJA::ReduceSum< parallelDeviceReduce, localIndex > elementCount( 0 );

// note that the dofArray will not be used after this step (simpler to use dofNumber instead)
FieldSpecificationImpl::computeRhsContribution< FieldSpecificationAdd,
Expand Down Expand Up @@ -1913,7 +1914,8 @@ void CompositionalMultiphaseBase::applySourceFluxBC( real64 const time,
dofNumber,
rhsContributionArrayView,
localRhs,
massProd] GEOS_HOST_DEVICE ( localIndex const a )
massProd,
elementCount] GEOS_HOST_DEVICE ( localIndex const a )
{
// we need to filter out ghosts here, because targetSet may contain them
localIndex const ei = targetSet[a];
Expand All @@ -1924,6 +1926,7 @@ void CompositionalMultiphaseBase::applySourceFluxBC( real64 const time,

real64 const rhsValue = rhsContributionArrayView[a] / sizeScalingFactor; // scale the contribution by the sizeScalingFactor here!
massProd += rhsValue;
elementCount += 1;
if( useTotalMassEquation > 0 )
{
// for all "fluid components", we add the value to the total mass balance equation
Expand All @@ -1948,7 +1951,7 @@ void CompositionalMultiphaseBase::applySourceFluxBC( real64 const time,
// set the new sub-region statistics for this timestep
array1d< real64 > massProdArr{ m_numComponents };
massProdArr[fluidComponentId] = massProd.get();
wrapper.gatherTimeStepStats( time, dt, massProdArr.toViewConst(), targetSet.size() );
wrapper.gatherTimeStepStats( time, dt, massProdArr.toViewConst(), elementCount.get() );
} );
} );
} );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ void SinglePhaseBase::applySourceFluxBC( real64 const time_n,
localIndex const rankOffset = dofManager.rankOffset();

RAJA::ReduceSum< parallelDeviceReduce, real64 > massProd( 0.0 );
RAJA::ReduceSum< parallelDeviceReduce, localIndex > elementCount( 0 );

// note that the dofArray will not be used after this step (simpler to use dofNumber instead)
FieldSpecificationImpl::computeRhsContribution< FieldSpecificationAdd,
Expand Down Expand Up @@ -1096,7 +1097,8 @@ void SinglePhaseBase::applySourceFluxBC( real64 const time_n,
rhsContributionArrayView,
localRhs,
localMatrix,
massProd] GEOS_HOST_DEVICE ( localIndex const a )
massProd,
elementCount] GEOS_HOST_DEVICE ( localIndex const a )
{
// we need to filter out ghosts here, because targetSet may contain them
localIndex const ei = targetSet[a];
Expand All @@ -1111,6 +1113,7 @@ void SinglePhaseBase::applySourceFluxBC( real64 const time_n,
real64 const rhsValue = rhsContributionArrayView[a] / sizeScalingFactor; // scale the contribution by the sizeScalingFactor here!
localRhs[massRowIndex] += rhsValue;
massProd += rhsValue;
elementCount += 1;
//add the value to the energy balance equation if the flux is positive (i.e., it's a producer)
if( rhsContributionArrayView[a] > 0.0 )
{
Expand Down Expand Up @@ -1138,7 +1141,8 @@ void SinglePhaseBase::applySourceFluxBC( real64 const time_n,
dofNumber,
rhsContributionArrayView,
localRhs,
massProd] GEOS_HOST_DEVICE ( localIndex const a )
massProd,
elementCount] GEOS_HOST_DEVICE ( localIndex const a )
{
// we need to filter out ghosts here, because targetSet may contain them
localIndex const ei = targetSet[a];
Expand All @@ -1152,6 +1156,7 @@ void SinglePhaseBase::applySourceFluxBC( real64 const time_n,
real64 const rhsValue = rhsContributionArrayView[a] / sizeScalingFactor;
localRhs[rowIndex] += rhsValue;
massProd += rhsValue;
elementCount += 1;
} );
}

Expand All @@ -1161,7 +1166,7 @@ void SinglePhaseBase::applySourceFluxBC( real64 const time_n,
// set the new sub-region statistics for this timestep
array1d< real64 > massProdArr{ 1 };
massProdArr[0] = massProd.get();
wrapper.gatherTimeStepStats( time_n, dt, massProdArr.toViewConst(), targetSet.size() );
wrapper.gatherTimeStepStats( time_n, dt, massProdArr.toViewConst(), elementCount.get() );
} );
} );
} );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ void SourceFluxStatsAggregator::registerDataOnMesh( Group & meshBodies )
} );
}
} );

if( m_writeCSV > 0 && MpiWrapper::commRank() == 0 )
{
std::ofstream outputFile( m_csvFilename );
TableCSVFormatter const tableStatFormatter( m_csvLayout );
outputFile << tableStatFormatter.headerToString();
outputFile.close();
}
}

void SourceFluxStatsAggregator::gatherStatsForLog( bool logLevelActive,
Expand Down Expand Up @@ -215,9 +223,9 @@ void SourceFluxStatsAggregator::outputStatsToCSV( TableData & csvData )
{
if( m_writeCSV > 0 && MpiWrapper::commRank() == 0 )
{
std::ofstream outputFile( m_csvFilename );
std::ofstream outputFile( m_csvFilename, std::ios::app );
TableCSVFormatter const tableStatFormatter( m_csvLayout );
outputFile << tableStatFormatter.toString( csvData );
outputFile << tableStatFormatter.dataToString( csvData );
outputFile.close();
csvData.clear();
}
Expand All @@ -241,30 +249,31 @@ bool SourceFluxStatsAggregator::execute( real64 const GEOS_UNUSED_PARAM( time_n
{
TableData logData;
TableData csvData;
meshLevelStats.stats() = StatData();
meshLevelStats.stats().reset();
forAllFluxStatsWrappers( meshLevel,
[&] ( MeshLevel &, WrappedStats & fluxStats )
{
fluxStats.stats() = StatData();
fluxStats.stats().reset();
forAllRegionStatsWrappers( meshLevel, fluxStats.getFluxName(),
[&] ( ElementRegionBase & region, WrappedStats & regionStats )
{
regionStats.stats() = StatData();
regionStats.stats().reset();

forAllSubRegionStatsWrappers( region, regionStats.getFluxName(),
[&] ( ElementSubRegionBase &, WrappedStats & subRegionStats )
{
subRegionStats.stats().reset();
subRegionStats.finalizePeriod();
regionStats.stats().combine( subRegionStats.stats() );
regionStats.combine( subRegionStats );
} );
fluxStats.stats().combine( regionStats.stats() );
fluxStats.combine( regionStats );

gatherStatsForLog( regionsStatsOn,
fluxStats.getFluxName(), region.getName(), logData, regionStats );
gatherStatsForCSV( fluxStats.getFluxName(), region.getName(), csvData, regionStats );
} );

meshLevelStats.stats().combine( fluxStats.stats() );
meshLevelStats.combine( fluxStats );

gatherStatsForLog( fluxesStatsOn,
fluxStats.getFluxName(), allRegionsStr, logData, fluxStats );
Expand Down Expand Up @@ -370,7 +379,9 @@ void SourceFluxStatsAggregator::WrappedStats::finalizePeriod()

// produce the period stats of this rank
m_stats.m_elementCount = m_periodStats.m_elementCount;
m_statsPeriodStart = m_periodStats.m_periodStart;
// MPI ranks without cells in the flux region may bypass gatherTimeStepStats() entirely,
// leaving their m_periodStart behind. Take the max so all ranks agree on the most advanced time.
m_statsPeriodStart = MpiWrapper::max( m_periodStats.m_periodStart );
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How can they be different over ranks?
Isn't SourceFluxStatistics::gatherTimeStepStats() called synchonously?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for mpi ranks without cells in the region, there is a bypass at CompositionalMultiphaseBase.cpp:1813-1816 - haven't checked single phase, but probably the same thing. This means some of the processes don't get to call SourceFluxStatistics::gatherTimeStepStats() and lag behind. Hence the need for a max.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!
Same thing, I would add a short comment to explain why this max() call.

m_statsPeriodDT = m_periodStats.m_timeStepDeltaTime + m_periodStats.m_periodPendingDeltaTime;

real64 const timeDivisor = m_statsPeriodDT > 0.0 ? 1.0 / m_statsPeriodDT : 0.0;
Expand All @@ -387,6 +398,13 @@ void SourceFluxStatsAggregator::WrappedStats::finalizePeriod()
// start a new timestep
m_periodStats.reset();
}
void SourceFluxStatsAggregator::WrappedStats::combine( WrappedStats const & other )
{
stats().combine( other.stats() );
// Some fluxes may lag behind (their ranks may have skipped gatherTimeStepStats()),
// so take the most advanced period start time across all combined stats.
m_statsPeriodStart = LvArray::math::max( m_statsPeriodStart, other.m_statsPeriodStart );
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't it be LvArray::math::min?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some of the processes lag behind because the time doesn't get advanced. So we need to pick the most advanced time.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I just would add a short comment to explain this choice.

}
void SourceFluxStatsAggregator::WrappedStats::PeriodStats::allocate( integer phaseCount )
{
if( m_timeStepMass.size() < phaseCount )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ class SourceFluxStatsAggregator final : public FieldStatisticsBase< FlowSolverBa
*/
void finalizePeriod();

/**
* @brief Aggregate the statistics of the instance with those of another one.
* @details Combines the statistics from the other object into this one and also advances the period
* start if the other object has a later time recorded.
* @param other the other WrappedStats object.
*/
void combine( WrappedStats const & other );

/**
* @return the reference to the wrapped stats data collected over the last period (one timestep or more), computed by finalizePeriod()
*/
Expand Down Expand Up @@ -151,7 +159,7 @@ class SourceFluxStatsAggregator final : public FieldStatisticsBase< FlowSolverBa
/// stats data collected over the last period (one timestep or more), computed by finalizePeriod()
StatData m_stats;
/// the start time of the wrapped stats period (in s)
real64 m_statsPeriodStart;
real64 m_statsPeriodStart{-LvArray::NumericLimits< real64 >::max};
/// the duration of the wrapped stats period (in s)
real64 m_statsPeriodDT;

Expand All @@ -169,7 +177,7 @@ class SourceFluxStatsAggregator final : public FieldStatisticsBase< FlowSolverBa
/// time that the current timestep is simulating.
real64 m_timeStepDeltaTime = 0.0;
/// start time of the current period.
real64 m_periodStart = 0.0;
real64 m_periodStart = -LvArray::NumericLimits< real64 >::max;
/// delta time from all previous time-step of the current period.
real64 m_periodPendingDeltaTime = 0.0;
/// number of cell elements targeted by this instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,11 @@ foreach( test_source ${fluidFlow_gtest_tests} )
FOLDER UnitTests )
geos_add_test( NAME ${test_name}
COMMAND ${test_name} )

if( ENABLE_MPI AND NOT ENABLE_CUDA AND NOT ENABLE_HIP )
set( nranks 2 )
geos_add_test( NAME ${test_name}_mpi
COMMAND ${test_name} -x ${nranks}
NUM_MPI_TASKS ${nranks} )
endif()
endforeach()
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "mainInterface/GeosxState.hpp"
#include "physicsSolvers/fluidFlow/SourceFluxStatistics.hpp"
#include "physicsSolvers/fluidFlow/SinglePhaseStatistics.hpp"
#include "common/MpiWrapper.hpp"

#include <gtest/gtest.h>

Expand Down Expand Up @@ -154,15 +155,19 @@ class FlowStatisticsTest : public ::testing::Test

void writeTableFiles( stdMap< string, string > const & files )
{
for( auto const & [fileName, content] : files )
if( MpiWrapper::commRank() == 0 )
{
std::ofstream os( fileName );
ASSERT_TRUE( os.is_open() );
os << content;
os.close();
for( auto const & [fileName, content] : files )
{
std::ofstream os( fileName );
ASSERT_TRUE( os.is_open() );
os << content;
os.close();

m_tableFileNames.push_back( fileName );
m_tableFileNames.push_back( fileName );
}
}
MpiWrapper::barrier();
}

void TearDown() override
Expand Down