diff --git a/PCAxis.Serializers/Parquet/ParquetBuilder.cs b/PCAxis.Serializers/Parquet/ParquetBuilder.cs index 605251e..4d1a97d 100644 --- a/PCAxis.Serializers/Parquet/ParquetBuilder.cs +++ b/PCAxis.Serializers/Parquet/ParquetBuilder.cs @@ -2,13 +2,13 @@ using System.Collections.Generic; using System.Linq; -using Parquet.Rows; +using Parquet.Data; using Parquet.Schema; using PCAxis.Paxiom; using PCAxis.Paxiom.Operations; -namespace PCAxis.Serializers +namespace PCAxis.Serializers.Parquet { public class ParquetBuilder { @@ -50,10 +50,10 @@ private static Dictionary BuildDataSymbolMap(PXMeta meta) /// - /// Populates the Parquet table based on the PXModel data and metadata. + /// Populates Parquet columns based on the PXModel data and metadata. /// - /// The populated Parquet table. - public Table PopulateTable() + /// The populated Parquet data columns. + public DataColumn[] PopulateColumns() { int matrixSize = model.Data.MatrixColumnCount * model.Data.MatrixRowCount; double[] data = new double[matrixSize]; @@ -63,6 +63,7 @@ public Table PopulateTable() .Select(v => v.IsContentVariable && v.Values.Count > 1) .ToArray(); + // Generate one logical row index per output row. var indices = GenerateDataPointIndices(variableValueCounts, isContentMulti); for (int m = 0; m < matrixSize; m++) @@ -70,27 +71,90 @@ public Table PopulateTable() data[m] = model.Data.ReadElement(m); } - List dataFields = CreateDataFields(); + ParquetSchema schema = CreateSchema(); + DataField[] dataFields = schema.GetDataFields(); + Dictionary fieldNameToColumnIndex = dataFields.Select((field, idx) => new { field.Name, idx }) + .ToDictionary(x => x.Name, x => x.idx); + + int rowCount = indices.Count; + // Keep an object buffer per column, then cast to typed arrays in one pass. + var columnBuffers = new object[dataFields.Length][]; + for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++) + { + columnBuffers[columnIndex] = new object[rowCount]; + } + + // Build each row once and write values directly into their column buffers. + for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + var row = PopulateRow(indices[rowIndex], dataFields.Length, variableValueCounts, data, fieldNameToColumnIndex); + for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++) + { + columnBuffers[columnIndex][rowIndex] = row[columnIndex]; + } + } + + var columns = new DataColumn[dataFields.Length]; + + // Convert object buffers to field-typed arrays required by DataColumn. + for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++) + { + Array typedValues = ConvertToTypedArray(dataFields[columnIndex], columnBuffers[columnIndex]); + columns[columnIndex] = new DataColumn(dataFields[columnIndex], typedValues); + } + + return columns; + } + + private static Array ConvertToTypedArray(DataField field, object[] values) + { + // DataColumn expects an Array with the exact CLR element type of the field. + Type clrType = field.ClrType; + + if (clrType == typeof(double)) + { + var result = new double[values.Length]; + for (int i = 0; i < values.Length; i++) + { + result[i] = values[i] == null ? default : Convert.ToDouble(values[i]); + } + + return result; + } + + if (clrType == typeof(string)) + { + var result = new string[values.Length]; + for (int i = 0; i < values.Length; i++) + { + result[i] = values[i] as string; + } - var table = new Table(dataFields.ToArray()); + return result; + } - foreach (var index in indices) + if (clrType == typeof(DateTime)) { - var row = PopulateRow(index, dataFields, variableValueCounts, data); - table.Add(row); + var result = new DateTime[values.Length]; + for (int i = 0; i < values.Length; i++) + { + result[i] = values[i] == null ? default : (DateTime)values[i]; + } + + return result; } - return table; + throw new NotSupportedException($"Unsupported Parquet field type '{clrType}'."); } /// - /// Creates the Parquet schema fields based on the PXModel metadata. + /// Creates the Parquet schema based on the PXModel metadata. /// - /// The list of Parquet data fields. - private List CreateDataFields() + /// The Parquet schema. + private ParquetSchema CreateSchema() { - List dataFields = new List(); + var dataFields = new List(); int variableCount = model.Meta.Variables.Count; for (int i = 0; i < variableCount; i++) @@ -98,15 +162,15 @@ private List CreateDataFields() var variable = model.Meta.Variables[i]; if (variable.IsContentVariable && variable.Values.Count > 1) { - ParquetBuilder.AddContentVariableFields(dataFields, variable); + AddContentVariableFields(dataFields, variable); } else { - ParquetBuilder.AddNonContentVariableFields(dataFields, variable); + AddNonContentVariableFields(dataFields, variable); } } - return dataFields; + return new ParquetSchema(dataFields.Cast()); } @@ -161,16 +225,14 @@ private static void AddNonContentVariableFields(List dataFields, Vari /// Populates a single row in the Parquet table based on the specified index and data. /// /// The index representing the position of the row in the PXModel data. - /// The list of Parquet data fields representing the schema. + /// The number of fields in the row schema. /// The counts of values for each variable in the model. /// The array containing the PXModel data. /// The populated Parquet row. - private object[] PopulateRow(int[] index, List dataFields, int[] variableValueCounts, double[] data) + private object[] PopulateRow(int[] index, int fieldCount, int[] variableValueCounts, double[] data, Dictionary dataFieldIndices) { int variableCount = model.Meta.Variables.Count; - var row = new object[dataFields.Count]; - Dictionary dataFieldIndices = dataFields.Select((field, idx) => new { field.Name, idx }) - .ToDictionary(x => x.Name, x => x.idx); + var row = new object[fieldCount]; for (int i = 0; i < variableCount; i++) { diff --git a/PCAxis.Serializers/ParquetSerializer.cs b/PCAxis.Serializers/ParquetSerializer.cs index c58d346..e81f790 100644 --- a/PCAxis.Serializers/ParquetSerializer.cs +++ b/PCAxis.Serializers/ParquetSerializer.cs @@ -1,10 +1,13 @@ using System.IO; +using System.Linq; using System.Threading.Tasks; using Parquet; -using Parquet.Rows; +using Parquet.Data; +using Parquet.Schema; using PCAxis.Paxiom; +using PCAxis.Serializers.Parquet; namespace PCAxis.Serializers { @@ -34,19 +37,29 @@ public void Serialize(PXModel model, string path) public void Serialize(PXModel model, Stream stream) { var pb = new ParquetBuilder(model); - var table = pb.PopulateTable(); - WriteTableAsync(table, stream).GetAwaiter().GetResult(); + var columns = pb.PopulateColumns(); + WriteColumnsAsync(columns, stream).GetAwaiter().GetResult(); } /// - /// Asynchronously writes the Parquet table to the specified stream. + /// Asynchronously writes Parquet data columns to the specified stream. /// - /// The Parquet table to be written. - /// The stream to write the Parquet table. + /// The Parquet data columns to be written. + /// The stream to write the Parquet data. /// A Task representing the asynchronous operation. - private static async Task WriteTableAsync(Table table, Stream stream) + private static async Task WriteColumnsAsync(DataColumn[] columns, Stream stream) { - await table.WriteAsync(stream); + var schema = new ParquetSchema(columns.Select(column => column.Field).ToList()); + using (var writer = await ParquetWriter.CreateAsync(schema, stream)) + { + using (var rowGroupWriter = writer.CreateRowGroup()) + { + foreach (var column in columns) + { + await rowGroupWriter.WriteColumnAsync(column); + } + } + } } } } diff --git a/UnitTests/Parquet/ParquetSerializationIntegrationTests.cs b/UnitTests/Parquet/ParquetSerializationIntegrationTests.cs index 442f3e7..6a0df27 100644 --- a/UnitTests/Parquet/ParquetSerializationIntegrationTests.cs +++ b/UnitTests/Parquet/ParquetSerializationIntegrationTests.cs @@ -5,7 +5,9 @@ using Microsoft.VisualStudio.TestTools.UnitTesting; -using Parquet.Rows; +using Parquet; +using Parquet.Data; +using Parquet.Schema; using PCAxis.Paxiom; using PCAxis.Serializers; @@ -32,7 +34,7 @@ public void TestInitialize() [TestMethod, Description("Tests the serialization of PXModel to Parquet format and its correctness.")] [DynamicData(nameof(GetPxFilePaths))] - public void ShouldSerializePxModel(string pxFile) + public async Task ShouldSerializePxModel(string pxFile) { var model = GetPxModelFromFile(pxFile); @@ -41,47 +43,46 @@ public void ShouldSerializePxModel(string pxFile) SerializePxModelToParquet(model, outputFile); - // Sync wrapper around async call - Table table = ReadBackParquetFileSync(outputFile); + var columns = await ReadBackParquetFile(outputFile); // Assertion: Ensure that the table's row count equals the number of observations // for a single ContentsCode. If the model has multiple contents, the serializer // emits additional content columns rather than duplicating rows. int contentCount = model.Meta.ContentVariable != null ? model.Meta.ContentVariable.Values.Count : 1; int expectedRows = model.Data.MatrixSize / contentCount; - Assert.AreEqual(expectedRows, table.Count, $"Mismatch in matrix size for file {fileNameWithoutExtension}.parquet."); + Assert.HasCount(expectedRows, columns[0], $"Mismatch in matrix size for file {fileNameWithoutExtension}.parquet."); // Assertion: Calculate the amount of columns we should have, based on the metadata // Number of columns in meta, number of columns in table. var numberOfColsInPx = CalculateNumberOfColumnsFromPxFile(model); - var numberOfColsInParq = table.Schema.DataFields.Length; - - Assert.AreEqual(numberOfColsInParq, numberOfColsInPx, $"Mismatch in column number for {fileNameWithoutExtension}.parquet."); + int numberOfColsInParq = columns.Count; + Assert.AreEqual(numberOfColsInPx, numberOfColsInParq, $"Mismatch in column number for {fileNameWithoutExtension}.parquet."); } [TestMethod, Description("Tests correct ordering of time variable (pxfile: 16216.px)")] [DeploymentItem("TestFiles/14216.px")] - public void TestTimeVariableOrdering() + public async Task TestTimeVariableOrdering() { var pxFile = "14216.px"; var model = GetPxModelFromFile(pxFile); string fileNameWithoutExtension = Path.GetFileNameWithoutExtension(pxFile); string outputFile = Path.Combine(OutputDirectoryPath, $"{fileNameWithoutExtension}.parquet"); SerializePxModelToParquet(model, outputFile); - Table table = ReadBackParquetFileSync(outputFile); + var columns = await ReadBackParquetFile(outputFile); - Assert.AreEqual(2, table.Count, "Test number of rows"); + int rowCount = columns.Count == 0 ? 0 : columns[0].Length; + Assert.AreEqual(2, rowCount, "Test number of rows"); - Assert.AreEqual("0801", table[0].Values[0], "Test tettsted"); - Assert.AreEqual("2025", table[0].Values[1], "Test year"); - Assert.AreEqual(275.87, table[0].Values[3], "Test ContentsCode_Areal"); - Assert.AreEqual(double.Parse("1110887"), table[0].Values[5], "Test ContentsCode_Bosatte"); + Assert.AreEqual("0801", columns[0][0], "Test tettsted"); + Assert.AreEqual("2025", columns[1][0], "Test year"); + Assert.AreEqual(275.87, columns[3][0], "Test ContentsCode_Areal"); + Assert.AreEqual(double.Parse("1110887"), columns[5][0], "Test ContentsCode_Bosatte"); - Assert.AreEqual("0801", table[1].Values[0], "Test tettsted"); - Assert.AreEqual("2024", table[1].Values[1], "Test year"); - Assert.AreEqual(276.30, table[1].Values[3], "Test ContentsCode_Areal"); - Assert.AreEqual(double.Parse("1098061"), table[1].Values[5], "Test ContentsCode_Bosatte"); + Assert.AreEqual("0801", columns[0][1], "Test tettsted"); + Assert.AreEqual("2024", columns[1][1], "Test year"); + Assert.AreEqual(276.30, columns[3][1], "Test ContentsCode_Areal"); + Assert.AreEqual(double.Parse("1098061"), columns[5][1], "Test ContentsCode_Bosatte"); } private static int CalculateNumberOfColumnsFromPxFile(PXModel model) @@ -110,15 +111,50 @@ private static int CalculateNumberOfColumnsFromPxFile(PXModel model) return numberOfCols; } - private static Task ReadBackParquetFileAsync(string parquetFile) + private static async Task> ReadBackParquetFile(string parquetFile) { - return Table.ReadAsync(parquetFile); - } + using Stream fs = File.OpenRead(parquetFile); + using ParquetReader reader = await ParquetReader.CreateAsync(fs); + ParquetSchema schema = reader.Schema; + DataField[] dataFields = schema.GetDataFields(); + int totalRowCount = 0; + + // First pass: count rows across all row groups so each column array can be preallocated. + for (int i = 0; i < reader.RowGroupCount; i++) + { + using ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(i); + totalRowCount += (int)rowGroupReader.RowCount; + } - // Synchronous wrapper around the asynchronous method - private static Table ReadBackParquetFileSync(string parquetFile) - { - return Task.Run(() => ReadBackParquetFileAsync(parquetFile)).Result; + var columns = new List(dataFields.Length); + // Allocate one dense array per column to avoid per-value list growth/conversion. + for (int fieldIndex = 0; fieldIndex < dataFields.Length; fieldIndex++) + { + columns.Add(new object[totalRowCount]); + } + + int rowOffset = 0; + + // Second pass: read each row group and copy values into final column arrays. + for (int i = 0; i < reader.RowGroupCount; i++) + { + using ParquetRowGroupReader rowGroupReader = reader.OpenRowGroupReader(i); + int rowCount = (int)rowGroupReader.RowCount; + + for (int columnIndex = 0; columnIndex < dataFields.Length; columnIndex++) + { + DataColumn columnData = await rowGroupReader.ReadColumnAsync(dataFields[columnIndex]); + for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + columns[columnIndex][rowOffset + rowIndex] = columnData.Data.GetValue(rowIndex); + } + } + + // Move the write window to the next row-group segment. + rowOffset += rowCount; + } + + return columns; } private static PXModel GetPxModelFromFile(string pxFile) @@ -133,11 +169,9 @@ private static PXModel GetPxModelFromFile(string pxFile) private static void SerializePxModelToParquet(PXModel model, string outputPath) { - using (FileStream stream = new FileStream(outputPath, FileMode.Create)) - { - var parquetSer = new ParquetSerializer(); - parquetSer.Serialize(model, stream); - } + using FileStream stream = new(outputPath, FileMode.Create); + var parquetSer = new ParquetSerializer(); + parquetSer.Serialize(model, stream); } public static IEnumerable GetPxFilePaths()