-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy path10_streaming_write.cpp
More file actions
159 lines (142 loc) · 5.32 KB
/
10_streaming_write.cpp
File metadata and controls
159 lines (142 loc) · 5.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#include <openPMD/openPMD.hpp>
#include <algorithm>
#include <iostream>
#include <memory>
#include <numeric> // std::iota
#if openPMD_HAVE_MPI
#include <mpi.h>
#endif
using std::cout;
using namespace openPMD;
int main()
{
#if openPMD_HAVE_ADIOS2
using position_t = double;
auto backends = openPMD::getFileExtensions();
if (std::find(backends.begin(), backends.end(), "sst") == backends.end())
{
std::cout << "SST engine not available in ADIOS2." << std::endl;
return 0;
}
int mpi_rank{0}, mpi_size{1};
#if openPMD_HAVE_MPI
MPI_Init(nullptr, nullptr);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
#endif
// open file for writing
// use QueueFullPolicy = Discard in order to create a situation where from
// the reader's perspective steps are skipped. This tests the bug reported
// in https://github.com/openPMD/openPMD-api/issues/1747.
// Create the Series with linear write access, i.e. one Iteration after
// the other. The alternative would be random-access where multiple
// Iterations can be accessed independently from one another. This more
// restricted mode enables performance optimizations in the backends, and
// more importantly is compatible with streaming I/O.
Series series = Series(
"electrons.sst",
Access::CREATE_LINEAR,
#if openPMD_HAVE_MPI
MPI_COMM_WORLD,
#endif
R"(
{
"adios2": {
"engine": {
"parameters": {
"DataTransport": "WAN",
"QueueFullPolicy": "Discard"
}
}
}
})"
);
Datatype datatype = determineDatatype<position_t>();
constexpr unsigned long length = 10ul;
Extent global_extent = {mpi_size * length};
Dataset dataset = Dataset(datatype, global_extent);
std::shared_ptr<position_t> local_data(
new position_t[length], [](position_t const *ptr) { delete[] ptr; });
auto iterations = series.snapshots();
for (size_t i = 0; i < 100; ++i)
{
Iteration iteration = iterations[i];
Record electronPositions = iteration.particles["e"]["position"];
std::iota(
local_data.get(),
local_data.get() + length,
i * length * mpi_size + mpi_rank * length);
for (auto const &dim : {"x", "y", "z"})
{
RecordComponent pos = electronPositions[dim];
pos.resetDataset(dataset);
pos.storeChunk(local_data, Offset{length * mpi_rank}, {length});
}
// Use the `local_value` ADIOS2 dataset shape to send a dataset not via
// the data plane, but the control plane of ADIOS2 SST. This is
// advisable for datasets where each rank contributes only a single item
// since the control plane performs data aggregation, thus avoiding
// fully interconnected communication meshes for data that needs to be
// read by each reader. A local value dataset can only contain a single
// item per MPI rank, forming an array of length equal to the MPI size.
// https://adios2.readthedocs.io/en/v2.9.2/components/components.html#shapes
auto e_patches = iteration.particles["e"].particlePatches;
auto numParticles = e_patches["numParticles"];
auto numParticlesOffset = e_patches["numParticlesOffset"];
for (auto rc : {&numParticles, &numParticlesOffset})
{
rc->resetDataset(
{Datatype::ULONG,
{Extent::value_type(mpi_size)},
R"(adios2.dataset.shape = "local_value")"});
}
numParticles.storeChunk(
std::make_unique<unsigned long>(10), {size_t(mpi_rank)}, {1});
numParticlesOffset.storeChunk(
std::make_unique<unsigned long>(10 * ((unsigned long)mpi_rank)),
{size_t(mpi_rank)},
{1});
auto offset = e_patches["offset"];
for (auto const &dim : {"x", "y", "z"})
{
auto rc = offset[dim];
rc.resetDataset(
{Datatype::ULONG,
{Extent::value_type(mpi_size)},
R"(adios2.dataset.shape = "local_value")"});
rc.storeChunk(
std::make_unique<unsigned long>((unsigned long)mpi_rank),
{size_t(mpi_rank)},
{1});
}
auto extent = e_patches["extent"];
for (auto const &dim : {"x", "y", "z"})
{
auto rc = extent[dim];
rc.resetDataset(
{Datatype::ULONG,
{Extent::value_type(mpi_size)},
R"(adios2.dataset.shape = "local_value")"});
rc.storeChunk(
std::make_unique<unsigned long>(1), {size_t(mpi_rank)}, {1});
}
iteration.close();
}
/* The files in 'series' are still open until the object is destroyed, on
* which it cleanly flushes and closes all open file handles.
* When running out of scope on return, the 'Series' destructor is called.
* Alternatively, one can call `series.close()` to the same effect as
* calling the destructor, including the release of file handles.
*/
series.close();
#if openPMD_HAVE_MPI
MPI_Finalize();
#endif
return 0;
#else
std::cout << "The streaming example requires that openPMD has been built "
"with ADIOS2."
<< std::endl;
return 0;
#endif
}