Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 85 additions & 23 deletions PCAxis.Serializers/Parquet/ParquetBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
using System.Collections.Generic;
using System.Linq;

using Parquet.Rows;
using Parquet.Data;
using Parquet.Schema;

using PCAxis.Paxiom;
using PCAxis.Paxiom.Operations;

namespace PCAxis.Serializers
namespace PCAxis.Serializers.Parquet
{
public class ParquetBuilder
{
Expand Down Expand Up @@ -50,10 +50,10 @@ private static Dictionary<double, string> BuildDataSymbolMap(PXMeta meta)


/// <summary>
/// Populates the Parquet table based on the PXModel data and metadata.
/// Populates Parquet columns based on the PXModel data and metadata.
/// </summary>
/// <returns>The populated Parquet table.</returns>
public Table PopulateTable()
/// <returns>The populated Parquet data columns.</returns>
public DataColumn[] PopulateColumns()
{
int matrixSize = model.Data.MatrixColumnCount * model.Data.MatrixRowCount;
double[] data = new double[matrixSize];
Expand All @@ -63,50 +63,114 @@ public Table PopulateTable()
.Select(v => v.IsContentVariable && v.Values.Count > 1)
.ToArray();

// Generate one logical row index per output row.
var indices = GenerateDataPointIndices(variableValueCounts, isContentMulti);

for (int m = 0; m < matrixSize; m++)
{
data[m] = model.Data.ReadElement(m);
}

List<DataField> dataFields = CreateDataFields();
ParquetSchema schema = CreateSchema();
DataField[] dataFields = schema.GetDataFields();
Dictionary<string, int> fieldNameToColumnIndex = dataFields.Select((field, idx) => new { field.Name, idx })
.ToDictionary(x => x.Name, x => x.idx);

int rowCount = indices.Count;
// Keep an object buffer per column, then cast to typed arrays in one pass.
var columnBuffers = new object[dataFields.Length][];
for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
{
columnBuffers[columnIndex] = new object[rowCount];
}

// Build each row once and write values directly into their column buffers.
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
var row = PopulateRow(indices[rowIndex], dataFields.Length, variableValueCounts, data, fieldNameToColumnIndex);
for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
{
columnBuffers[columnIndex][rowIndex] = row[columnIndex];
}
}

var columns = new DataColumn[dataFields.Length];

// Convert object buffers to field-typed arrays required by DataColumn.
for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
{
Array typedValues = ConvertToTypedArray(dataFields[columnIndex], columnBuffers[columnIndex]);
columns[columnIndex] = new DataColumn(dataFields[columnIndex], typedValues);
}

return columns;
}

private static Array ConvertToTypedArray(DataField field, object[] values)
{
// DataColumn expects an Array with the exact CLR element type of the field.
Type clrType = field.ClrType;

if (clrType == typeof(double))
{
var result = new double[values.Length];
for (int i = 0; i < values.Length; i++)
{
result[i] = values[i] == null ? default : Convert.ToDouble(values[i]);
}

return result;
}

if (clrType == typeof(string))
{
var result = new string[values.Length];
for (int i = 0; i < values.Length; i++)
{
result[i] = values[i] as string;
}

var table = new Table(dataFields.ToArray());
return result;
}

foreach (var index in indices)
if (clrType == typeof(DateTime))
{
var row = PopulateRow(index, dataFields, variableValueCounts, data);
table.Add(row);
var result = new DateTime[values.Length];
for (int i = 0; i < values.Length; i++)
{
result[i] = values[i] == null ? default : (DateTime)values[i];
}

return result;
}

return table;
throw new NotSupportedException($"Unsupported Parquet field type '{clrType}'.");
}


/// <summary>
/// Creates the Parquet schema fields based on the PXModel metadata.
/// Creates the Parquet schema based on the PXModel metadata.
/// </summary>
/// <returns>The list of Parquet data fields.</returns>
private List<DataField> CreateDataFields()
/// <returns>The Parquet schema.</returns>
private ParquetSchema CreateSchema()
{
List<DataField> dataFields = new List<DataField>();
var dataFields = new List<DataField>();
int variableCount = model.Meta.Variables.Count;

for (int i = 0; i < variableCount; i++)
{
var variable = model.Meta.Variables[i];
if (variable.IsContentVariable && variable.Values.Count > 1)
{
ParquetBuilder.AddContentVariableFields(dataFields, variable);
AddContentVariableFields(dataFields, variable);
}
else
{
ParquetBuilder.AddNonContentVariableFields(dataFields, variable);
AddNonContentVariableFields(dataFields, variable);
}
}

return dataFields;
return new ParquetSchema(dataFields.Cast<Field>());
}


Expand Down Expand Up @@ -161,16 +225,14 @@ private static void AddNonContentVariableFields(List<DataField> dataFields, Vari
/// Populates a single row in the Parquet table based on the specified index and data.
/// </summary>
/// <param name="index">The index representing the position of the row in the PXModel data.</param>
/// <param name="dataFields">The list of Parquet data fields representing the schema.</param>
/// <param name="fieldCount">The number of fields in the row schema.</param>
/// <param name="variableValueCounts">The counts of values for each variable in the model.</param>
/// <param name="data">The array containing the PXModel data.</param>
/// <returns>The populated Parquet row.</returns>
private object[] PopulateRow(int[] index, List<DataField> dataFields, int[] variableValueCounts, double[] data)
private object[] PopulateRow(int[] index, int fieldCount, int[] variableValueCounts, double[] data, Dictionary<string, int> dataFieldIndices)
{
int variableCount = model.Meta.Variables.Count;
var row = new object[dataFields.Count];
Dictionary<string, int> dataFieldIndices = dataFields.Select((field, idx) => new { field.Name, idx })
.ToDictionary(x => x.Name, x => x.idx);
var row = new object[fieldCount];

for (int i = 0; i < variableCount; i++)
{
Expand Down
29 changes: 21 additions & 8 deletions PCAxis.Serializers/ParquetSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
using System.IO;
using System.Linq;
using System.Threading.Tasks;

using Parquet;
using Parquet.Rows;
using Parquet.Data;
using Parquet.Schema;

using PCAxis.Paxiom;
using PCAxis.Serializers.Parquet;

namespace PCAxis.Serializers
{
Expand Down Expand Up @@ -34,19 +37,29 @@ public void Serialize(PXModel model, string path)
public void Serialize(PXModel model, Stream stream)
{
var pb = new ParquetBuilder(model);
var table = pb.PopulateTable();
WriteTableAsync(table, stream).GetAwaiter().GetResult();
var columns = pb.PopulateColumns();
WriteColumnsAsync(columns, stream).GetAwaiter().GetResult();
}

/// <summary>
/// Asynchronously writes the Parquet table to the specified stream.
/// Asynchronously writes Parquet data columns to the specified stream.
/// </summary>
/// <param name="table">The Parquet table to be written.</param>
/// <param name="stream">The stream to write the Parquet table.</param>
/// <param name="columns">The Parquet data columns to be written.</param>
/// <param name="stream">The stream to write the Parquet data.</param>
/// <returns>A Task representing the asynchronous operation.</returns>
private static async Task WriteTableAsync(Table table, Stream stream)
private static async Task WriteColumnsAsync(DataColumn[] columns, Stream stream)
{
await table.WriteAsync(stream);
var schema = new ParquetSchema(columns.Select(column => column.Field).ToList());
using (var writer = await ParquetWriter.CreateAsync(schema, stream))
{
using (var rowGroupWriter = writer.CreateRowGroup())
{
foreach (var column in columns)
{
await rowGroupWriter.WriteColumnAsync(column);
}
}
}
}
}
}
96 changes: 65 additions & 31 deletions UnitTests/Parquet/ParquetSerializationIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

using Microsoft.VisualStudio.TestTools.UnitTesting;

using Parquet.Rows;
using Parquet;
using Parquet.Data;
using Parquet.Schema;

using PCAxis.Paxiom;
using PCAxis.Serializers;
Expand All @@ -32,7 +34,7 @@ public void TestInitialize()

[TestMethod, Description("Tests the serialization of PXModel to Parquet format and its correctness.")]
[DynamicData(nameof(GetPxFilePaths))]
public void ShouldSerializePxModel(string pxFile)
public async Task ShouldSerializePxModel(string pxFile)
{
var model = GetPxModelFromFile(pxFile);

Expand All @@ -41,47 +43,46 @@ public void ShouldSerializePxModel(string pxFile)

SerializePxModelToParquet(model, outputFile);

// Sync wrapper around async call
Table table = ReadBackParquetFileSync(outputFile);
var columns = await ReadBackParquetFile(outputFile);

// Assertion: Ensure that the table's row count equals the number of observations
// for a single ContentsCode. If the model has multiple contents, the serializer
// emits additional content columns rather than duplicating rows.
int contentCount = model.Meta.ContentVariable != null ? model.Meta.ContentVariable.Values.Count : 1;
int expectedRows = model.Data.MatrixSize / contentCount;
Assert.AreEqual(expectedRows, table.Count, $"Mismatch in matrix size for file {fileNameWithoutExtension}.parquet.");
Assert.HasCount(expectedRows, columns[0], $"Mismatch in matrix size for file {fileNameWithoutExtension}.parquet.");

// Assertion: Calculate the amount of columns we should have, based on the metadata
// Number of columns in meta, number of columns in table.

var numberOfColsInPx = CalculateNumberOfColumnsFromPxFile(model);
var numberOfColsInParq = table.Schema.DataFields.Length;

Assert.AreEqual(numberOfColsInParq, numberOfColsInPx, $"Mismatch in column number for {fileNameWithoutExtension}.parquet.");
int numberOfColsInParq = columns.Count;
Assert.AreEqual(numberOfColsInPx, numberOfColsInParq, $"Mismatch in column number for {fileNameWithoutExtension}.parquet.");
}

[TestMethod, Description("Tests correct ordering of time variable (pxfile: 16216.px)")]
[DeploymentItem("TestFiles/14216.px")]
public void TestTimeVariableOrdering()
public async Task TestTimeVariableOrdering()
{
var pxFile = "14216.px";
var model = GetPxModelFromFile(pxFile);
string fileNameWithoutExtension = Path.GetFileNameWithoutExtension(pxFile);
string outputFile = Path.Combine(OutputDirectoryPath, $"{fileNameWithoutExtension}.parquet");
SerializePxModelToParquet(model, outputFile);
Table table = ReadBackParquetFileSync(outputFile);
var columns = await ReadBackParquetFile(outputFile);

Assert.AreEqual(2, table.Count, "Test number of rows");
int rowCount = columns.Count == 0 ? 0 : columns[0].Length;
Assert.AreEqual(2, rowCount, "Test number of rows");

Assert.AreEqual("0801", table[0].Values[0], "Test tettsted");
Assert.AreEqual("2025", table[0].Values[1], "Test year");
Assert.AreEqual(275.87, table[0].Values[3], "Test ContentsCode_Areal");
Assert.AreEqual(double.Parse("1110887"), table[0].Values[5], "Test ContentsCode_Bosatte");
Assert.AreEqual("0801", columns[0][0], "Test tettsted");
Assert.AreEqual("2025", columns[1][0], "Test year");
Assert.AreEqual(275.87, columns[3][0], "Test ContentsCode_Areal");
Assert.AreEqual(double.Parse("1110887"), columns[5][0], "Test ContentsCode_Bosatte");

Assert.AreEqual("0801", table[1].Values[0], "Test tettsted");
Assert.AreEqual("2024", table[1].Values[1], "Test year");
Assert.AreEqual(276.30, table[1].Values[3], "Test ContentsCode_Areal");
Assert.AreEqual(double.Parse("1098061"), table[1].Values[5], "Test ContentsCode_Bosatte");
Assert.AreEqual("0801", columns[0][1], "Test tettsted");
Assert.AreEqual("2024", columns[1][1], "Test year");
Assert.AreEqual(276.30, columns[3][1], "Test ContentsCode_Areal");
Assert.AreEqual(double.Parse("1098061"), columns[5][1], "Test ContentsCode_Bosatte");
}

private static int CalculateNumberOfColumnsFromPxFile(PXModel model)
Expand Down Expand Up @@ -110,15 +111,50 @@ private static int CalculateNumberOfColumnsFromPxFile(PXModel model)
return numberOfCols;
}

private static Task<Table> ReadBackParquetFileAsync(string parquetFile)
private static async Task<List<object[]>> ReadBackParquetFile(string parquetFile)
{
return Table.ReadAsync(parquetFile);
}
using Stream fs = File.OpenRead(parquetFile);
using ParquetReader reader = await ParquetReader.CreateAsync(fs);
ParquetSchema schema = reader.Schema;
DataField[] dataFields = schema.GetDataFields();
int totalRowCount = 0;

// First pass: count rows across all row groups so each column array can be preallocated.
for (int i = 0; i < reader.RowGroupCount; i++)
{
using ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(i);
totalRowCount += (int)rowGroupReader.RowCount;
}

// Synchronous wrapper around the asynchronous method
private static Table ReadBackParquetFileSync(string parquetFile)
{
return Task.Run(() => ReadBackParquetFileAsync(parquetFile)).Result;
var columns = new List<object[]>(dataFields.Length);
// Allocate one dense array per column to avoid per-value list growth/conversion.
for (int fieldIndex = 0; fieldIndex < dataFields.Length; fieldIndex++)
{
columns.Add(new object[totalRowCount]);
}

int rowOffset = 0;

// Second pass: read each row group and copy values into final column arrays.
for (int i = 0; i < reader.RowGroupCount; i++)
{
using ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(i);
int rowCount = (int)rowGroupReader.RowCount;

for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++)
{
DataColumn columnData = await rowGroupReader.ReadColumnAsync(dataFields[columnIndex]);
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
columns[columnIndex][rowOffset + rowIndex] = columnData.Data.GetValue(rowIndex);
}
}

// Move the write window to the next row-group segment.
rowOffset += rowCount;
}

return columns;
}

private static PXModel GetPxModelFromFile(string pxFile)
Expand All @@ -133,11 +169,9 @@ private static PXModel GetPxModelFromFile(string pxFile)

private static void SerializePxModelToParquet(PXModel model, string outputPath)
{
using (FileStream stream = new FileStream(outputPath, FileMode.Create))
{
var parquetSer = new ParquetSerializer();
parquetSer.Serialize(model, stream);
}
using FileStream stream = new(outputPath, FileMode.Create);
var parquetSer = new ParquetSerializer();
parquetSer.Serialize(model, stream);
}

public static IEnumerable<object[]> GetPxFilePaths()
Expand Down
Loading