From 181e135b3c56deaa9b9ea6b6f7bff135f0b78d58 Mon Sep 17 00:00:00 2001 From: frederic ferrandis Date: Thu, 11 Dec 2025 10:14:34 +0100 Subject: [PATCH 1/9] HD-4220: Implement last block size optimisation - separate C implementations in dedicated files - improve encoding and decoding functionality - add new tests - improvements to the encoding and decoding processes --- backend.c | 410 ++++++++++++++++++++++++++++++++++++++++++++ backend.go | 447 ++---------------------------------------------- backend.h | 70 ++++++++ backend_test.go | 360 ++++++++++++++++++++++++++++++++++++-- buffer.go | 176 +++++++++++++++---- buffer_test.go | 178 ++++++++++++++++++- memalign.go | 1 + 7 files changed, 1155 insertions(+), 487 deletions(-) create mode 100644 backend.c create mode 100644 backend.h diff --git a/backend.c b/backend.c new file mode 100644 index 0000000..b4e859d --- /dev/null +++ b/backend.c @@ -0,0 +1,410 @@ +#include +#include +#include +#include +#include +#include "backend.h" + +// shims to make working with frag arrays easier +char **makeStrArray(int n) { return calloc(n, sizeof(char *)); } + +void freeStrArray(char **arr) { free(arr); } + +uint64_t getOrigDataSize(struct fragment_header_s *header) { + return header->meta.orig_data_size; +} +uint32_t getBackendVersion(struct fragment_header_s *header) { + return header->meta.backend_version; +} +ec_backend_id_t getBackendID(struct fragment_header_s *header) { + return header->meta.backend_id; +} +uint32_t getECVersion(struct fragment_header_s *header) { + return header->libec_version; +} +int getHeaderSize() { return sizeof(struct fragment_header_s); } + +// shims because the fragment headers use misaligned fields + +// linearize is used when we have all data fragment. Instead of doing a true +// decoding, we just reassemble all the fragment linearized in a buffer. This is +// mainly a copy of liberasurecode fragment_to_string function, except that we +// won't do any addionnal allocation +// +// /!\ This function does not perform any header checksum validation. +// If fragments must be validated checks 'check_matrix_fragment' +// +// 'k' the number of data fragment used for the encoding. +// 'in' is an array of all data frags, in their index order. +// 'inlen' is the array size +// 'dest' is an already allocated buffer where data will be linearized +// 'destlen' is the buffer size, and hence, the maximum number of bytes +// linearized 'outlen' is a pointer containing the number of bytes really +// linearized in dest (always lower or equal to destlen) it returns dest if +// nothing went wrong, else null +char *linearize(int k, char **in, int inlen, char *dest, uint64_t destlen, + uint64_t *outlen) { + int i; + int curr_idx = 0; + int orig_data_size = -1; + + if (dest == NULL || outlen == NULL) { + return NULL; + } + + // The following perform small correctness checks before linearizing + // the buffer + int previous_idx = -1; + for (i = 0; i < inlen; i++) { + int index = get_fragment_idx(in[i]); + + if (orig_data_size < 0) { + orig_data_size = get_orig_data_size(in[i]); + } else if (get_orig_data_size(in[i]) != orig_data_size) { + return NULL; + } + + if (index >= k) { + return NULL; + } + + // Checks that the fragments are sorted. + if (previous_idx > index) { + return NULL; + } + previous_idx = index; + } + + // compute the number of bytes needed for the output + int tocopy = orig_data_size; + int string_off = 0; + *outlen = orig_data_size; + if (destlen < orig_data_size) { + *outlen = destlen; + tocopy = destlen; + } + + // copy in an ordered way all bytes of fragments in the buffer + for (i = 0; i < inlen && tocopy > 0; i++) { + char *f = get_data_ptr_from_fragment(in[i]); + int fsize = get_fragment_payload_size(in[i]); + int psize = tocopy > fsize ? fsize : tocopy; + memcpy(dest + string_off, f, psize); + tocopy -= psize; + string_off += psize; + } + return dest; +} + +bool check_matrix_fragment(char *frag, int frag_len, int piecesize) { + size_t offset = 0; + + bool aligned = (frag_len % (piecesize + getHeaderSize())) == 0; + if (!aligned) { + return false; + } + + while (offset < frag_len) { + if (is_invalid_fragment_header((fragment_header_t *)&frag[offset])) { + return false; + } + offset += piecesize + getHeaderSize(); + } + + return true; +} + +static inline void *alloc_data(size_t len) { + void *buf; + if (posix_memalign(&buf, 16, len) != 0) { + return NULL; + } + memset(buf, 0, len); + return buf; +} + +static inline void dealloc_data(void *pt, size_t len) { free(pt); } + +// instead of encoding K blocks of data, we divide and subencode blocks of +// 'piecesize' bytes. +// 'desc' : liberasurecode handle +// 'data' : the whole data to encode +// 'datalen' : the datalen +// 'piecesize' : the size of little blocks used for encoding +// 'ctx' : contains informations such as the ECN schema (see below) +// +void encode_chunk_prepare(int desc, char *data, int datalen, int piecesize, + struct encode_chunk_context *ctx) { + ctx->instance = liberasurecode_backend_instance_get_by_desc(desc); + int i; + const int k = ctx->instance->args.uargs.k; + const int m = ctx->instance->args.uargs.m; + + // here we compute the number of (k) subgroup of 'piecesize' bytes we can + // create + int block_size = piecesize * k; + ctx->number_of_subgroup = datalen / block_size; + if (ctx->number_of_subgroup * block_size != datalen) { + ctx->number_of_subgroup++; + } + + // Note: last chunk may be smaller than piecesize + ctx->chunk_size = piecesize; + + ctx->k = k; + ctx->m = m; + + ctx->datas = calloc(ctx->k, sizeof(char *)); + ctx->codings = calloc(ctx->m, sizeof(char *)); + ctx->frags_len = + (sizeof(fragment_header_t) + piecesize) * ctx->number_of_subgroup; + + for (i = 0; i < ctx->k; ++i) { + ctx->datas[i] = alloc_data(ctx->frags_len); + } + + for (i = 0; i < ctx->m; ++i) { + ctx->codings[i] = alloc_data(ctx->frags_len); + } +} + +// return real size of fragment header size +size_t get_fragment_header_size() { return sizeof(fragment_header_t); } + +int encode_chunk(int desc, char *data, int datalen, + struct encode_chunk_context *ctx, int nth); + +int encode_chunk_all(int desc, char *data, int datalen, + struct encode_chunk_context *ctx, int max) { + int i; + for (i = 0; i < max; i++) { + int err = encode_chunk(desc, data, datalen, ctx, i); + if (err != 0) { + return err; + } + } + return 0; +} + +// encode_chunk will encode a subset of the fragments data. +// It has to be considered that all the datas will not be divided in K blocks, +// but instead, they will be divided in N sub-blocks of K*chunksize fragments +// [-------------------data---------------------] +// {s1a | s1b | s1c | s1d}{s2a | s2b | s2c |d2d } +// fragment1 => [header1a|s1a|header2a|s2a] +// fragment2 => [header1b]s1b|header2b|s2b] +// fragment3 => [header1c]s1c|header2c|s2c] +// fragment4 => [header1d]s1d|header2d|s2d] +// this mapping will let be more efficient against get range pattern (when we +// are only interesting in having a small subset of data) especially when a +// whole fragment will be missing +int encode_chunk(int desc, char *data, int datalen, + struct encode_chunk_context *ctx, int nth) { + ec_backend_t ec = ctx->instance; + char *k_ref[ctx->k]; + char *m_ref[ctx->m]; + + int one_cell_size = sizeof(fragment_header_t) + ctx->chunk_size; + int i, ret; + char const *const dataend = data + datalen; + char *dataoffset = data + (ctx->k * nth) * ctx->chunk_size; + if (nth >= ctx->number_of_subgroup) { + return -1; + } + + // Do the mapping as described above + int tot_len_sum = 0; + for (i = 0; i < ctx->k; i++) { + char *ptr = &ctx->datas[i][nth * one_cell_size]; + fragment_header_t *hdr = (fragment_header_t *)ptr; + hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; + ptr = (char *)(hdr + 1); + if (dataoffset < dataend) { + int len_to_copy = ctx->chunk_size; + if (len_to_copy > dataend - dataoffset) { + len_to_copy = dataend - dataoffset; + } + tot_len_sum += len_to_copy; + memcpy(ptr, dataoffset, len_to_copy); + } + dataoffset += ctx->chunk_size; + k_ref[i] = ptr; + } + + for (i = 0; i < ctx->m; i++) { + char *ptr = &ctx->codings[i][nth * one_cell_size]; + fragment_header_t *hdr = (fragment_header_t *)ptr; + hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; + ptr = (char *)(hdr + 1); + m_ref[i] = ptr; + } + + // do the true encoding according the backend used (isa-l, cauchy ....) + ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref, + ctx->chunk_size); + if (ret < 0) { + return -1; + } + + // fill the headers with true len, fragment len .... + ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, ctx->chunk_size, + tot_len_sum, k_ref, m_ref); + if (ret < 0) { + return -1; + } + return 0; +} + +int my_liberasurecode_encode_cleanup(int desc, size_t len, char **encoded_data, + char **encoded_parity) { + int i, k, m; + + ec_backend_t instance = liberasurecode_backend_instance_get_by_desc(desc); + if (NULL == instance) { + return -EBACKENDNOTAVAIL; + } + + k = instance->args.uargs.k; + m = instance->args.uargs.m; + + if (encoded_data) { + for (i = 0; i < k; i++) { + dealloc_data(encoded_data[i], len); + } + + free(encoded_data); + } + + if (encoded_parity) { + for (i = 0; i < m; i++) { + dealloc_data(encoded_parity[i], len); + } + free(encoded_parity); + } + + return 0; +} + +// Prepare memory, allocating stuff. +// Suitable for "buffermatrix": no data fragments allocated. +// Will also init chunk_size and number_of_subgroup +void encode_chunk_buffermatrix_prepare(int desc, char *data, int datalen, + int piecesize, int frags_len, + int number_of_subgroup, + struct encode_chunk_context *ctx) { + ctx->instance = liberasurecode_backend_instance_get_by_desc(desc); + int i; + const int k = ctx->instance->args.uargs.k; + const int m = ctx->instance->args.uargs.m; + + ctx->number_of_subgroup = number_of_subgroup; + + // Note: last subgroup may be smaller than the others + ctx->chunk_size = piecesize; + + ctx->k = k; + ctx->m = m; + + ctx->codings = calloc(ctx->m, sizeof(char *)); + ctx->frags_len = frags_len; + + for (i = 0; i < ctx->m; ++i) { + ctx->codings[i] = alloc_data(ctx->frags_len); + } +} + +// Encode a chunk using a buffer matrix as an input +// Same as above with the twist that data is not copied and can be directly +int encode_chunk_buffermatrix(int desc, + char *data, + int datalen, + int nbFrags, + struct encode_chunk_context *ctx, + int nth, + size_t fraglen) { + ec_backend_t ec = ctx->instance; + char *k_ref[ctx->k]; + char *m_ref[ctx->m]; + int one_cell_size = sizeof(fragment_header_t) + ctx->chunk_size; + int i, ret; + int tot_len_sum = 0; + + if (nth >= ctx->number_of_subgroup) { + return -1; + } + + // Create the array of "data" fragments + // No copy, just prepare the header + for (i = 0; i < ctx->k; i++) { + k_ref[i] = data + (nth + nbFrags * i) * one_cell_size; + fragment_header_t *hdr = (fragment_header_t *)k_ref[i]; + hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; + char *ptr = (char *)(hdr + 1); + k_ref[i] = ptr; + + // Computes actual data in the fragment + // If we are at the end of the data, we may have a smaller + // fragment than the others. fraglen will take of that. + int size = datalen - ((nth * ctx->k * ctx->chunk_size) + (i * fraglen)); + tot_len_sum += size > 0 ? (size > fraglen ? fraglen : size) : 0; + } + + // "coding" fragments. Those ones are allocated above + for (i = 0; i < ctx->m; i++) { + char *ptr = &ctx->codings[i][nth * one_cell_size]; + fragment_header_t *hdr = (fragment_header_t *)ptr; + hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; + ptr = (char *)(hdr + 1); + m_ref[i] = ptr; + } + + // do the true encoding according the backend used (isa-l, cauchy ....) + ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref, + fraglen); + if (ret < 0) { + return -1; + } + + ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, fraglen, + tot_len_sum, k_ref, m_ref); + if (ret < 0) { + return -1; + } + return 0; +} + +// Helper function to compute everything in one go +int encode_chunk_buffermatrix_all(int desc, char *data, int datalen, + int nbfrags, struct encode_chunk_context *ctx, + int max) { + int i; + + for (i = 0; i < max; i++) { + int err = encode_chunk_buffermatrix(desc, data, datalen, nbfrags, ctx, i, ctx->chunk_size); + if (err != 0) { + return err; + } + } + return 0; +} + +int my_liberasurecode_encode_buffermatrix_cleanup(int desc, size_t len, + char **encoded_data, + char **encoded_parity) { + ec_backend_t instance = liberasurecode_backend_instance_get_by_desc(desc); + if (NULL == instance) { + return -EBACKENDNOTAVAIL; + } + + const int m = instance->args.uargs.m; + + if (encoded_parity) { + int i; + for (i = 0; i < m; i++) { + dealloc_data(encoded_parity[i], len); + } + } + free(encoded_parity); + + return 0; +} diff --git a/backend.go b/backend.go index 6e6cdee..b556486 100644 --- a/backend.go +++ b/backend.go @@ -3,413 +3,7 @@ package erasurecode /* #cgo pkg-config: erasurecode-1 #include -#include -#include -#include - -// shims to make working with frag arrays easier -char ** makeStrArray(int n) { return calloc(n, sizeof (char *)); } -void freeStrArray(char ** arr) { free(arr); } - -// shims because the fragment headers use misaligned fields -uint64_t getOrigDataSize(struct fragment_header_s *header) { return header->meta.orig_data_size; } -uint32_t getBackendVersion(struct fragment_header_s *header) { return header->meta.backend_version; } -ec_backend_id_t getBackendID(struct fragment_header_s *header) { return header->meta.backend_id; } -uint32_t getECVersion(struct fragment_header_s *header) { return header->libec_version; } -int getHeaderSize() { return sizeof(struct fragment_header_s); } - -// linearize is used when we have all data fragment. Instead of doing a true decoding, we just -// reassemble all the fragment linearized in a buffer. This is mainly a copy of liberasurecode -// fragment_to_string function, except that we won't do any addionnal allocation -// -// /!\ This function does not perform any header checksum validation. -// If fragments must be validated checks 'check_matrix_fragment' -// -// 'k' the number of data fragment used for the encoding. -// 'in' is an array of all data frags, in their index order. -// 'inlen' is the array size -// 'dest' is an already allocated buffer where data will be linearized -// 'destlen' is the buffer size, and hence, the maximum number of bytes linearized -// 'outlen' is a pointer containing the number of bytes really linearized in dest (always lower or equal to destlen) -// it returns dest if nothing went wrong, else null -char* linearize(int k, char **in, int inlen, char *dest, uint64_t destlen, uint64_t *outlen) { - int i; - int curr_idx = 0; - int orig_data_size = -1; - - if (dest == NULL || outlen == NULL) { - return NULL; - } - - // The following perform small correctness checks before linearizing - // the buffer - int previous_idx = -1; - for (i = 0; i < inlen; i++) { - int index = get_fragment_idx(in[i]); - - if (orig_data_size < 0) { - orig_data_size = get_orig_data_size(in[i]); - } else if(get_orig_data_size(in[i]) != orig_data_size) { - return NULL; - } - - if (index >= k) { - return NULL; - } - - // Checks that the fragments are sorted. - if (previous_idx > index) { - return NULL; - } - previous_idx = index; - } - - // compute the number of bytes needed for the output - int tocopy = orig_data_size; - int string_off = 0; - *outlen = orig_data_size; - if(destlen < orig_data_size) { - *outlen = destlen; - tocopy = destlen; - } - - // copy in an ordered way all bytes of fragments in the buffer - for (i = 0; i < inlen && tocopy > 0; i++) { - char *f = get_data_ptr_from_fragment(in[i]); - int fsize = get_fragment_payload_size(in[i]); - int psize = tocopy > fsize ? fsize : tocopy; - memcpy(dest + string_off, f, psize); - tocopy -= psize; - string_off += psize; - } - return dest; -} - -bool check_matrix_fragment(char *frag, int frag_len, int piecesize) { - size_t offset = 0; - - bool aligned = (frag_len % (piecesize + getHeaderSize())) == 0; - if (!aligned) { - return false; - } - - while (offset < frag_len) { - if (is_invalid_fragment_header((fragment_header_t*)&frag[offset])) { - return false; - } - offset += piecesize + getHeaderSize(); - } - - return true; -} - - -struct encode_chunk_context { - ec_backend_t instance; // backend instance - char **datas; // the K datas - char **codings; // the M codings - unsigned int number_of_subgroup; // number of subchunk in each K part - unsigned int chunk_size; // datasize of each subchunk - unsigned int frags_len; // allocating size of each K+M objects - int blocksize; // k-bounds of data - int k; - int m; -}; - -static inline void *alloc_data(size_t len) { - void *buf; - if (posix_memalign(&buf, 16, len) != 0) { - return NULL; - } - memset(buf, 0, len); - return buf; -} - -static inline void dealloc_data(void *pt, size_t len) { - free(pt); -} - -// instead of encoding K blocks of data, we divide and subencode blocks of -// 'piecesize' bytes. -// 'desc' : liberasurecode handle -// 'data' : the whole data to encode -// 'datalen' : the datalen -// 'piecesize' : the size of little blocks used for encoding -// 'ctx' : contains informations such as the ECN schema (see below) -// -void encode_chunk_prepare(int desc, - char *data, - int datalen, - int piecesize, - struct encode_chunk_context *ctx) -{ - ctx->instance = liberasurecode_backend_instance_get_by_desc(desc); - int i; - const int k = ctx->instance->args.uargs.k; - const int m = ctx->instance->args.uargs.m; - - // here we compute the number of (k) subgroup of 'piecesize' bytes we can create - int block_size = piecesize * k; - ctx->number_of_subgroup = datalen / block_size; - if(ctx->number_of_subgroup * block_size != datalen) { - ctx->number_of_subgroup++; - } - - ctx->chunk_size = piecesize; - - ctx->k = k; - ctx->m = m; - - ctx->datas = calloc(ctx->k, sizeof(char*)); - ctx->codings = calloc(ctx->m, sizeof(char*)); - ctx->frags_len = (sizeof(fragment_header_t) + piecesize) * ctx->number_of_subgroup; - - for (i = 0; i < ctx->k; ++i) { - ctx->datas[i] = alloc_data(ctx->frags_len); - } - - for (i = 0; i < ctx->m; ++i) { - ctx->codings[i] = alloc_data(ctx->frags_len); - } -} - -// return real size of fragment header size -size_t get_fragment_header_size() { - return sizeof(fragment_header_t); -} - -int encode_chunk(int desc, char *data, int datalen, struct encode_chunk_context *ctx, int nth); - -int encode_chunk_all(int desc, char *data, int datalen, struct encode_chunk_context *ctx, int max) { - int i; - for (i = 0; i < max ; i++) { - int err = encode_chunk(desc, data, datalen, ctx, i); - if (err != 0) { - return err; - } - } - return 0; -} - -// encode_chunk will encode a subset of the fragments data. -// It has to be considered that all the datas will not be divided in K blocks, but instead, -// they will be divided in N sub-blocks of K*chunksize fragments -// [-------------------data---------------------] -// {s1a | s1b | s1c | s1d}{s2a | s2b | s2c |d2d } -// fragment1 => [header1a|s1a|header2a|s2a] -// fragment2 => [header1b]s1b|header2b|s2b] -// fragment3 => [header1c]s1c|header2c|s2c] -// fragment4 => [header1d]s1d|header2d|s2d] -// this mapping will let be more efficient against get range pattern (when we are only interesting in -// having a small subset of data) especially when a whole fragment will be missing -int encode_chunk(int desc, char *data, int datalen, struct encode_chunk_context *ctx, int nth) -{ - ec_backend_t ec = ctx->instance; - char *k_ref[ctx->k]; - char *m_ref[ctx->m]; - - int one_cell_size = sizeof(fragment_header_t) + ctx->chunk_size; - int i, ret; - char const *const dataend = data + datalen; - char *dataoffset = data + (ctx->k * nth) * ctx->chunk_size; - if (nth >= ctx->number_of_subgroup) { - return -1; - } - - // Do the mapping as described above - int tot_len_sum = 0; - for (i = 0; i < ctx->k; i++) { - char *ptr = &ctx->datas[i][nth * one_cell_size]; - fragment_header_t *hdr = (fragment_header_t*)ptr; - hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; - ptr = (char*) (hdr + 1); - if(dataoffset < dataend) { - int len_to_copy = ctx->chunk_size; - if (len_to_copy > dataend - dataoffset) { - len_to_copy = dataend - dataoffset; - } - tot_len_sum += len_to_copy; - memcpy(ptr, dataoffset, len_to_copy); - } - dataoffset += ctx->chunk_size; - k_ref[i] = ptr; - } - - for (i = 0; i < ctx->m; i++) { - char *ptr = &ctx->codings[i][nth * one_cell_size]; - fragment_header_t *hdr = (fragment_header_t*)ptr; - hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; - ptr = (char*) (hdr + 1); - m_ref[i] = ptr; - } - - // do the true encoding according the backend used (isa-l, cauchy ....) - ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref, ctx->chunk_size); - if (ret < 0) { - fprintf(stderr, "error encode ret = %d\n", ret); - return -1; - } - - // fill the headers with true len, fragment len .... - ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, ctx->chunk_size, tot_len_sum, k_ref, m_ref); - if (ret < 0) { - fprintf(stderr, "error encode ret = %d\n", ret); - return -1; - } - return 0; -} - -int my_liberasurecode_encode_cleanup(int desc, - size_t len, - char **encoded_data, - char **encoded_parity) -{ - int i, k, m; - - ec_backend_t instance = liberasurecode_backend_instance_get_by_desc(desc); - if (NULL == instance) { - return -EBACKENDNOTAVAIL; - } - - k = instance->args.uargs.k; - m = instance->args.uargs.m; - - if (encoded_data) { - for (i = 0; i < k; i++) { - dealloc_data(encoded_data[i], len); - } - - free(encoded_data); - } - - if (encoded_parity) { - for (i = 0; i < m; i++) { - dealloc_data(encoded_parity[i], len); - } - free(encoded_parity); - } - - return 0; -} - -// Prepare memory, allocating stuff. Suitable for "buffermatrix": no data fragments allocated. -void encode_chunk_buffermatrix_prepare(int desc, - char *data, - int datalen, - int piecesize, - int frags_len, - int number_of_subgroup, - struct encode_chunk_context *ctx) -{ - ctx->instance = liberasurecode_backend_instance_get_by_desc(desc); - int i; - const int k = ctx->instance->args.uargs.k; - const int m = ctx->instance->args.uargs.m; - - ctx->number_of_subgroup = number_of_subgroup; - - ctx->chunk_size = piecesize; - - ctx->k = k; - ctx->m = m; - - ctx->codings = calloc(ctx->m, sizeof(char*)); - ctx->frags_len = frags_len; - - for (i = 0; i < ctx->m; ++i) { - ctx->codings[i] = alloc_data(ctx->frags_len); - } -} - -// Encode a chunk using a buffer matrix as an input -// Same as above with the twist that data is not copied and can be directly -static int encode_chunk_buffermatrix(int desc, char *data, int datalen, int nbFrags, struct encode_chunk_context *ctx, int nth) -{ - ec_backend_t ec = ctx->instance; - char *k_ref[ctx->k]; - char *m_ref[ctx->m]; - int one_cell_size = sizeof(fragment_header_t) + ctx->chunk_size; - int i, ret; - int tot_len_sum = 0; - - if (nth >= ctx->number_of_subgroup) { - return -1; - } - - // Create the array of "data" fragments - // No copy, just prepare the header - for (i = 0 ; i < ctx->k ; i ++) { - k_ref[i] = data + (nth + nbFrags * i) * one_cell_size; - fragment_header_t *hdr = (fragment_header_t*)k_ref[i]; - hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; - char *ptr = (char*) (hdr + 1); - k_ref[i] = ptr; - - // Computes actual data in the fragment - int size = datalen - (nth * ctx->k + i) * ctx->chunk_size; - tot_len_sum += size > 0 ? (size > ctx->chunk_size ? ctx->chunk_size: size) : 0; - } - - // "coding" fragments. Those ones are allocated above - for (i = 0; i < ctx->m; i++) { - char *ptr = &ctx->codings[i][nth * one_cell_size]; - fragment_header_t *hdr = (fragment_header_t*)ptr; - hdr->magic = LIBERASURECODE_FRAG_HEADER_MAGIC; - ptr = (char*) (hdr + 1); - m_ref[i] = ptr; - } - - // do the true encoding according the backend used (isa-l, cauchy ....) - ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref, ctx->chunk_size); - if (ret < 0) { - fprintf(stderr, "error encode ret = %d\n", ret); - return -1; - } - - ret = finalize_fragments_after_encode(ec, ctx->k, ctx->m, ctx->chunk_size, tot_len_sum, k_ref, m_ref); - if (ret < 0) { - fprintf(stderr, "error encode ret = %d\n", ret); - return -1; - } - return 0; -} - -// Helper function to compute everything in one go -int encode_chunk_buffermatrix_all(int desc, char *data, int datalen, int nbfrags, struct encode_chunk_context *ctx, int max) { - int i; - - for (i = 0; i < max ; i++) { - int err = encode_chunk_buffermatrix(desc, data, datalen, nbfrags, ctx, i); - if (err != 0) { - return err; - } - } - return 0; -} - -int my_liberasurecode_encode_buffermatrix_cleanup(int desc, - size_t len, - char **encoded_data, - char **encoded_parity) -{ - ec_backend_t instance = liberasurecode_backend_instance_get_by_desc(desc); - if (NULL == instance) { - return -EBACKENDNOTAVAIL; - } - - const int m = instance->args.uargs.m; - - if (encoded_parity) { - int i; - for (i = 0; i < m; i++) { - dealloc_data(encoded_parity[i], len); - } - } - free(encoded_parity); - - return 0; -} - +#include "backend.h" */ import "C" import ( @@ -528,7 +122,7 @@ type pool struct { // Assuming a splitSize around 2MiB or less const maxBuffer int = 2 * 1024 * 1024 -func (p *pool) New(size int) (interface{}, []byte) { +func (p *pool) New(size int) (any, []byte) { if size <= p.max { b := p.p.Get().(*bytes.Buffer) return b, b.Bytes() @@ -537,7 +131,7 @@ func (p *pool) New(size int) (interface{}, []byte) { return nil, make([]byte, size) } -func (p *pool) Release(b interface{}) { +func (p *pool) Release(b any) { if b != nil { p.p.Put(b) } @@ -545,7 +139,7 @@ func (p *pool) Release(b interface{}) { var globalPool = &pool{ p: sync.Pool{ - New: func() interface{} { + New: func() any { return bytes.NewBuffer(make([]byte, maxBuffer)) }}, max: maxBuffer, @@ -591,25 +185,13 @@ func InitBackend(params Params) (Backend, error) { } else { backend.pool = &pool{ p: sync.Pool{ - New: func() interface{} { + New: func() any { return bytes.NewBuffer(make([]byte, params.MaxBlockSize)) }}, max: params.MaxBlockSize, } } - // Workaround on init bug of Jerasure - // Apparently, jerasure will crash if the - // first encode is done concurrently with other encode. - res, err := backend.Encode(bytes.Repeat([]byte("1"), 1000)) - - if err != nil { - backend.Close() - return Backend{}, err - } - - res.Free() - return backend, nil } @@ -675,6 +257,8 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize nbFrags := C.int(bm.SubGroups()) + // This prepares the context for encoding + // It allocates the coding fragments (not the data fragments) C.encode_chunk_buffermatrix_prepare(backend.libecDesc, pData, pDataLen, cChunkSize, C.int(bm.FragLen()), nbFrags, &ctx) @@ -684,7 +268,14 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize for i := 0; i < int(ctx.number_of_subgroup); i++ { go func(nth int) { - r := C.encode_chunk_buffermatrix(backend.libecDesc, pData, pDataLen, nbFrags, &ctx, C.int(nth)) + fragLen := C.size_t(chunkSize) + // last subgroup has a different size + if i == int(ctx.number_of_subgroup)-1 { + fragLen = C.size_t(bm.FragLenLastSubGroup()) + } + r := C.encode_chunk_buffermatrix(backend.libecDesc, pData, pDataLen, + nbFrags, &ctx, C.int(nth), fragLen) + if r < 0 { atomic.AddUint32(&errCounter, 1) } @@ -851,7 +442,7 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, pieceSize int /* Fragments are sorted beforehand with the index of the first chunk. All chunks of a fragments share the same index. */ fragsIndex := make([]int, len(frags)) - for i := 0; i < len(frags); i += 1 { + for i := 0; i < len(frags); i++ { fragsIndex[i] = i } @@ -892,7 +483,7 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, pieceSize int return nil, errors.New("gaps in the provided fragments") } - lastDataFragIdxExcl += 1 + lastDataFragIdxExcl++ previousFragIdx = idx } fragsIndex = fragsIndex[:lastDataFragIdxExcl] @@ -955,7 +546,7 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, pieceSize int // DecodeMatrix tries to reconstruct the data fragments and returns the linearized data. func (backend *Backend) DecodeMatrix(frags []ValidatedFragment, pieceSize int) (*DecodeData, error) { if len(frags) == 0 { - return nil, errors.New("Decoding requires at least one fragment") + return nil, errors.New("decoding requires at least one fragment") } fragRangeLen := len(frags[0]) @@ -1039,7 +630,7 @@ type RangeMatrix struct { * the relevant requested range. The decoded buffer * will look like [p1 p2 p3 p4(*) p1(*) p2(*) p3 p4]. * Chunks not marked with a '*' are discarded. Note how - * the heading and trailing is unecessary and could be + * the heading and trailing is unnecessary and could be * discarded in the ideal case. * * Perfect cases occur when the request span a single group (column): diff --git a/backend.h b/backend.h new file mode 100644 index 0000000..fa6eb7d --- /dev/null +++ b/backend.h @@ -0,0 +1,70 @@ +#ifndef BACKEND_H +#define BACKEND_H + +#include +#include +#include + + +struct encode_chunk_context { + ec_backend_t instance; // backend instance + char **datas; // the K datas + char **codings; // the M codings + unsigned int number_of_subgroup; // number of subchunk in each K part + unsigned int chunk_size; // datasize of each subchunk + unsigned int frags_len; // allocating size of each K+M objects + int blocksize; // k-bounds of data + int k; + int m; + }; + +char **makeStrArray(int n); + +void freeStrArray(char **arr); + +uint64_t getOrigDataSize(struct fragment_header_s *header); + +uint32_t getBackendVersion(struct fragment_header_s *header); + +ec_backend_id_t getBackendID(struct fragment_header_s *header); + +uint32_t getECVersion(struct fragment_header_s *header); + +int getHeaderSize(void); + +char *linearize(int k, char **in, int inlen, char *dest, uint64_t destlen, + uint64_t *outlen); + +bool check_matrix_fragment(char *frag, int frag_len, int piecesize); + +void encode_chunk_prepare(int desc, char *data, int datalen, int piecesize, + struct encode_chunk_context *ctx); + +size_t get_fragment_header_size(void); + +int encode_chunk(int desc, char *data, int datalen, + struct encode_chunk_context *ctx, int nth); + +int encode_chunk_all(int desc, char *data, int datalen, + struct encode_chunk_context *ctx, int max); + +int my_liberasurecode_encode_cleanup(int desc, size_t len, char **encoded_data, + char **encoded_parity); + +void encode_chunk_buffermatrix_prepare(int desc, char *data, int datalen, + int piecesize, int frags_len, + int number_of_subgroup, + struct encode_chunk_context *ctx); + +int encode_chunk_buffermatrix_all(int desc, char *data, int datalen, + int nbfrags, struct encode_chunk_context *ctx, + int max); + +int encode_chunk_buffermatrix(int desc, char *data, int datalen, int nbFrags, + struct encode_chunk_context *ctx, int nth, size_t frag_len); + +int my_liberasurecode_encode_buffermatrix_cleanup(int desc, size_t len, + char **encoded_data, + char **encoded_parity); + +#endif \ No newline at end of file diff --git a/backend_test.go b/backend_test.go index 5645367..8ec9769 100644 --- a/backend_test.go +++ b/backend_test.go @@ -3,7 +3,9 @@ package erasurecode import ( "bytes" cryptorand "crypto/rand" + "encoding/binary" "fmt" + "io" "math/rand" "reflect" "strings" @@ -12,6 +14,7 @@ import ( "testing/quick" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var validParams = []Params{ @@ -510,7 +513,7 @@ func TestGC(t *testing.T) { wg.Wait() }) } - backend.Close() + _ = backend.Close() } func TestAvailableBackends(t *testing.T) { @@ -537,7 +540,7 @@ func BenchmarkEncode(b *testing.B) { } encoded.Free() } - backend.Close() + _ = backend.Close() } const DefaultChunkSize = 32768 @@ -569,7 +572,9 @@ func BenchmarkLinearizeM(b *testing.B) { if err != nil { b.Fatal("cannot create backend", err) } - defer backend.Close() + defer func() { + _ = backend.Close() + }() buf := bytes.Repeat([]byte("A"), test.size) encoded, err := backend.EncodeMatrix(buf, DefaultChunkSize) @@ -604,7 +609,9 @@ func BenchmarkDecodeM(b *testing.B) { if err != nil { b.Fatal("cannot create backend", err) } - defer backend.Close() + defer func() { + _ = backend.Close() + }() buf := bytes.Repeat([]byte("A"), test.size) encoded, err := backend.EncodeMatrix(buf, DefaultChunkSize) @@ -640,7 +647,9 @@ func BenchmarkReconstruct(b *testing.B) { if err != nil { b.Fatal("cannot create backend", err) } - defer backend.Close() + defer func() { + _ = backend.Close() + }() buf := bytes.Repeat([]byte("A"), test.size) encoded, err := backend.Encode(buf) @@ -668,7 +677,9 @@ func BenchmarkReconstructM(b *testing.B) { if err != nil { b.Fatal("cannot create backend", err) } - defer backend.Close() + defer func() { + _ = backend.Close() + }() buf := bytes.Repeat([]byte("A"), test.size) encoded, err := backend.EncodeMatrix(buf, DefaultChunkSize) @@ -738,7 +749,7 @@ func BenchmarkMatrix(b *testing.B) { decoded.Free() } }) - backend.Close() + _ = backend.Close() }) } }) @@ -761,7 +772,7 @@ func BenchmarkDecode(b *testing.B) { } decoded.Free() } - backend.Close() + _ = backend.Close() } func TestEncodeM(t *testing.T) { @@ -772,7 +783,7 @@ func TestEncodeM(t *testing.T) { } buf := make([]byte, 1024*1024) - cryptorand.Read(buf) + _, _ = cryptorand.Read(buf) testParams := []struct { chunkUnit int @@ -819,7 +830,7 @@ func TestEncodeM(t *testing.T) { result.Free() }) } - backend.Close() + _ = backend.Close() } func TestLinearizeMatrix(t *testing.T) { @@ -856,7 +867,7 @@ func TestLinearizeMatrix(t *testing.T) { t.Logf("TestLinearizeMatrix check %d-%d-%d", startIncl, endIncl, dataSize) data := make([]byte, dataSize) - cryptorand.Read(data) + _, _ = cryptorand.Read(data) encoded, err := backend.EncodeMatrix(data, DefaultChunkSize) if err != nil { @@ -871,7 +882,7 @@ func TestLinearizeMatrix(t *testing.T) { /* Decode the matrix as if it was requested and checks that the result matches the payload on the requested range. */ frags := make([][]byte, 0) - for i := 0; i < rangeM.FragCount; i += 1 { + for i := 0; i < rangeM.FragCount; i++ { fragIdx := (rangeM.FragFirstIncl + i) % k buffer := encoded.Data[fragIdx][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl] frags = append(frags, buffer) @@ -932,7 +943,7 @@ func TestDecodeMatrix(t *testing.T) { t.Logf("TestDecodeMatrix check %d-%d-%d-%d", startIncl, endIncl, dataSize, failedFragIdx) data := make([]byte, dataSize) - cryptorand.Read(data) + _, _ = cryptorand.Read(data) encoded, err := backend.EncodeMatrix(data, DefaultChunkSize) if err != nil { @@ -947,7 +958,7 @@ func TestDecodeMatrix(t *testing.T) { /* Decode the matrix as if it was requested and checks that the result matches the payload on the requested range. */ frags := make([][]byte, 0) - for i := 0; i < (k + m); i += 1 { + for i := 0; i < (k + m); i++ { fragIdx := i if fragIdx == failedFragIdx { continue @@ -991,7 +1002,7 @@ func TestValidateFragmentMatrix(t *testing.T) { dataSize := 7 * 1024 * 1024 data := make([]byte, dataSize) - cryptorand.Read(data) + _, _ = cryptorand.Read(data) encoded, err := backend.EncodeMatrix(data, DefaultChunkSize) if err != nil { @@ -1000,7 +1011,7 @@ func TestValidateFragmentMatrix(t *testing.T) { defer encoded.Free() fragSize := len(encoded.Data[0]) - for i := 0; i < len(encoded.Data); i += 1 { + for i := 0; i < len(encoded.Data); i++ { rangeMatrix := backend.GetRangeMatrix(0, dataSize-1, pieceSize, fragSize) assert.NotNil(rangeMatrix) @@ -1011,7 +1022,7 @@ func TestValidateFragmentMatrix(t *testing.T) { chunkSize := pieceSize + backend.headerSize offset := 0 for offset < len(frag) { - for altered := 0; altered < backend.headerSize; altered += 1 { + for altered := 0; altered < backend.headerSize; altered++ { t.Logf("frag %d altered offset %d altered %d", i, offset, altered) previous := frag[offset+altered] frag[offset+altered] = previous + 1 @@ -1094,7 +1105,7 @@ func TestReconstructM(t *testing.T) { } }) } - backend.Close() + _ = backend.Close() } func TestEncodeDecodeMatrix(t *testing.T) { @@ -1205,7 +1216,7 @@ func TestRangeMatrix(t *testing.T) { startIncl := 0 endIncl := 0 rangeM = backend.GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize) - rangeM = backend.GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize) + assert.Equal(t, rangeM.FragCount, 1) assert.Equal(t, rangeM.FragFirstIncl, 0) assert.Equal(t, rangeM.ReqStartIncl, startIncl) @@ -1242,3 +1253,314 @@ func TestRangeMatrix(t *testing.T) { totalRead = rangeM.FragCount * (rangeM.InFragRangeEndExcl - rangeM.InFragRangeStartIncl) assert.Equal(t, expectedTotalRead, totalRead) } + +func TestGetRangeMatrix(t *testing.T) { + type testCase struct { + name string + start int + end int + chunksize int + fragSize int + payloadSize int + expectedFragStart int + expectedFragEnd int + expectedDecStart int + expectedDecEnd int + } + + backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: 2, M: 1}) + defer func() { + _ = backend.Close() + }() + + testCases := []testCase{ + { + name: "First 128 bytes, 100k payload", + start: 0, + end: 128, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 100000, + expectedFragStart: 0, + expectedFragEnd: 32768 + backend.headerSize, + expectedDecStart: 0, + expectedDecEnd: 128, + }, + { + name: "First 128 bytes, 1MB payload", + start: 0, + end: 128, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 1000000, + expectedFragStart: 0, + expectedFragEnd: 32768 + backend.headerSize, + expectedDecStart: 0, + expectedDecEnd: 128, + }, + { + name: "64k Block in the middle, 100k payload", + start: 32768, + end: 32768 + 65536, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 100000, + expectedFragStart: 0, + expectedFragEnd: 65536 + 2*backend.headerSize, + expectedDecStart: 32768, + expectedDecEnd: 65536 + 32768, + }, + { + name: "64k Block in the middle, 1MB payload", + start: 500000, + end: 500000 + 65536, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 1000000, + expectedFragStart: 7 * (32768 + 80), + expectedFragEnd: 7*(32768+80) + 65536 + backend.headerSize*2, + expectedDecStart: 500000 - 458752, + expectedDecEnd: 500000 - 458752 + 65536, + }, + { + name: "Last 80 bytes, 100k payload", + start: 100000 - 80, + end: 100000, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 100000, + expectedFragStart: 32768 + 80, + expectedFragEnd: 32768 + 80 + 80 + 32768, + expectedDecStart: 100000 - 65536 - 80, + expectedDecEnd: 100000 - 65536, + }, + { + name: "Last 80 bytes, 1MB payload", + start: 1000000 - 80, + end: 1000000, + chunksize: 32768, + fragSize: 1048576, + payloadSize: 1000000, + expectedFragStart: (1000000 / 32768 / 2) * (32768 + 80), + expectedFragEnd: (1000000/32768/2)*(32768+80) + 32768 + backend.headerSize, + expectedDecStart: 1000000 - ((1000000 / 32768) * 32768) - 80, + expectedDecEnd: 1000000 - ((1000000 / 32768) * 32768), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + length := tc.end - tc.start + rm := backend.GetRangeMatrix(tc.start, tc.end, tc.chunksize, tc.fragSize) + require.NotNil(t, rm, "GetRangeMatrix returned nil") + assert.Equal(t, tc.expectedFragStart, rm.InFragRangeStartIncl, "FragRangeStart mismatch") + assert.Equal(t, tc.expectedFragEnd, rm.InFragRangeEndExcl, "FragRangeEnd mismatch") + assert.Equal(t, tc.expectedDecStart, rm.DecodedRangeStartIncl, "DecodedRangeStart mismatch") + assert.Equal(t, tc.expectedDecEnd, rm.DecodedRangeStartIncl+length, "DecodedRangeEnd mismatch") + }) + } +} + +func TestRange(t *testing.T) { + testCases := []struct { + name string + useNewFormat bool + }{ + {"OldFormat", false}, + {"NewFormat", true}, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + testRangeHelper(t, testCase.useNewFormat) + }) + } +} + +func testRangeHelper(t *testing.T, useNewFormat bool) { + backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: 2, M: 1}) + defer func() { + _ = backend.Close() + }() + + chunkSize := 32768 + size := 1020300 + + rm := backend.GetRangeMatrix(10, 20, chunkSize, size) + require.NotNil(t, rm, "GetRangeMatrix returned nil") + + buf := make([]byte, size) + for i := range buf { + buf[i] = byte('A' + i%26) + } + + bm := NewBufferMatrix(chunkSize, len(buf), backend.K) + if useNewFormat { + bm.UseNewFormat() + } + // bm.UseNewFormat() + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(t, err) + bm.Finish() + encodedData, err := backend.EncodeMatrixWithBufferMatrix(bm, chunkSize) + require.NoError(t, err) + defer encodedData.Free() + stripes := make([][]byte, backend.K+backend.M) + for i := range backend.K + backend.M { + stripes[i] = encodedData.Data[i] + stripes[i] = stripes[i][rm.InFragRangeStartIncl:rm.InFragRangeEndExcl] + } + + // Test decode + decodedData, err := backend.DecodeMatrix(stripes, chunkSize) + require.NoError(t, err) + defer decodedData.Free() + assert.Equal(t, buf[10:20], decodedData.Data[rm.DecodedRangeStartIncl:rm.DecodedRangeStartIncl+20-10], "Decoded data mismatch") + + // Test repair + decodedData2, err := backend.DecodeMatrix(stripes[1:], chunkSize) + require.NoError(t, err) + defer decodedData2.Free() + assert.Equal(t, buf[10:20], decodedData2.Data[rm.DecodedRangeStartIncl:rm.DecodedRangeStartIncl+20-10], "Decoded data mismatch") + + // Test last 80 bytes + rm = backend.GetRangeMatrix(size-80, size, chunkSize, size) + require.NotNil(t, rm, "GetRangeMatrix returned nil") + + for i := range backend.K + backend.M { + stripes[i] = encodedData.Data[i] + stripes[i] = stripes[i][rm.InFragRangeStartIncl:rm.InFragRangeEndExcl] + } + + // Test decode + decodedData3, err := backend.DecodeMatrix(stripes, chunkSize) + require.NoError(t, err) + defer decodedData3.Free() + assert.Equal(t, buf[size-80:size], decodedData3.Data[rm.DecodedRangeStartIncl:rm.DecodedRangeStartIncl+80], "Decoded data mismatch") + // Test repair + decodedData4, err := backend.DecodeMatrix(stripes[1:], chunkSize) + require.NoError(t, err) + defer decodedData4.Free() + assert.Equal(t, buf[size-80:size], decodedData4.Data[rm.DecodedRangeStartIncl:rm.DecodedRangeStartIncl+80], "Decoded data mismatch") +} + +// TestFormatOldNew tests the compatibility of the new format with the old one +// It uses the buffer matrix to encode the data in the old/new format and +// then decodes it using the backend. It checks that the data is the same +// and that the format is correct. +func TestFormatOldNew(t *testing.T) { + testCases := []struct { + useNewFormat bool + k, n int + }{ + {true, 2, 1}, + {true, 5, 1}, + {false, 2, 1}, + {false, 5, 1}, + } + for _, testCase := range testCases { + t.Run(fmt.Sprintf("%v-%d-%d", testCase.useNewFormat, testCase.k, testCase.n), func(t *testing.T) { + // use buffermatrix to storage format in new format and see if we can decode it + backend, err := InitBackend(Params{Name: "isa_l_rs_vand", K: testCase.k, M: testCase.n}) + require.NoError(t, err) + defer backend.Close() + buf := bytes.Repeat([]byte("A"), 1024*1024+rand.Intn(1024*1024)) //nolint:gosec + + bm := NewBufferMatrix(32768, len(buf), backend.K) + if testCase.useNewFormat { + bm.UseNewFormat() + } + _, err = io.Copy(bm, bytes.NewReader(buf)) + require.NoError(t, err) + bm.Finish() + + require.Equal(t, len(buf), bm.Length()) + + e, err := backend.EncodeMatrixWithBufferMatrix(bm, 32768) + require.NoError(t, err) + defer e.Free() + + // check the format / first 80 is the header, lets check it + for i := range len(e.Data) { + hdr := e.Data[i][0:80] + + var f fragheader + err = f.UnmarshalBinary(hdr) + require.NoError(t, err) + require.Equal(t, 32768, int(f.meta.size)) + require.Equal(t, 32768*testCase.k, int(f.meta.origDataSize)) //nolint:gosec + } + // case 1; fast decode + ddata, err := backend.DecodeMatrix(e.Data, 32768) + require.NoError(t, err) + require.Equal(t, buf, ddata.Data) + defer ddata.Free() + // case 2: missing data + rdata, err := backend.ReconstructMatrix(e.Data[1:], 0, 32768) + require.NoError(t, err) + require.Equal(t, e.Data[0], rdata.Data) + defer rdata.Free() + // case 3: slow decode + ddata2, err := backend.DecodeMatrix(e.Data[1:], 32768) + require.NoError(t, err) + require.Equal(t, buf, ddata2.Data) + defer ddata2.Free() + // case 4: rebuild missing coding + require.Equal(t, testCase.k, len(e.Data[:testCase.k])) + rdata2, err := backend.ReconstructMatrix(e.Data[:testCase.k], testCase.k, 32768) + require.NoError(t, err) + require.Equal(t, e.Data[testCase.k], rdata2.Data) + }) + } +} + +// duplicate fragment_header_t from libec +type fragheader struct { + meta fragmeta + magic uint32 + libecVersion uint32 + metadataChksum uint32 + padding [9]byte +} + +func (f *fragheader) UnmarshalBinary(data []byte) error { + if len(data) != 80 { + return fmt.Errorf("invalid size for fragment header: %d", len(data)) + } + if err := f.meta.UnmarshalBinary(data[0:63]); err != nil { + return err + } + f.magic = binary.BigEndian.Uint32(data[63:67]) + f.libecVersion = binary.BigEndian.Uint32(data[67:71]) + f.metadataChksum = binary.BigEndian.Uint32(data[71:75]) + copy(f.padding[:], data[75:80]) + return nil +} + +func (f *fragmeta) UnmarshalBinary(data []byte) error { + if len(data) != 63 { + return fmt.Errorf("invalid size for fragment metadata: %d", len(data)) + } + f.idx = binary.BigEndian.Uint32(data[0:4]) + f.size = binary.LittleEndian.Uint32(data[4:8]) + f.fragBackendMetadataSize = binary.LittleEndian.Uint32(data[8:12]) + f.origDataSize = binary.LittleEndian.Uint64(data[12:20]) + f.checksumType = data[20] + copy(f.checksum[:], data[21:53]) + f.checksumMismatch = data[53] + f.backendID = data[54] + f.backendVersion = binary.BigEndian.Uint32(data[55:59]) + return nil +} + +type fragmeta struct { + idx uint32 + size uint32 + fragBackendMetadataSize uint32 + origDataSize uint64 + checksumType uint8 + checksum [32]byte + checksumMismatch uint8 + backendID uint8 + backendVersion uint32 +} diff --git a/buffer.go b/buffer.go index e737e02..bded614 100644 --- a/buffer.go +++ b/buffer.go @@ -6,14 +6,17 @@ import ( ) type BufferMatrix struct { - b []byte - zero []byte - hdrSize, bufSize int - len int // len of input - k int - curBlock int - leftInBlock int - finished bool + b []byte + zero []byte + hdrSize, bufSize int + len int // len of input + k int + curBlock int + leftInBlock int + finished bool + sizeOfLastSubGroup int + // getOffset func() (int, int) + newStyle bool } // FragLen returns the size of a "fragment" aligned to a block size (data + header) @@ -35,9 +38,9 @@ func (b BufferMatrix) maxLen() int { // NewBufferMatrix returns a new buffer suitable for data and organized // such as it can be injected into EncodeMatrixWithBuffer without allocation/copying // the data into shards -func NewBufferMatrix(bufSize int, l int, k int) *BufferMatrix { +func NewBufferMatrix(bufSize int, length int, k int) *BufferMatrix { var b BufferMatrix - b.Reset(bufSize, l, k) + b.Reset(bufSize, length, k) return &b } @@ -53,6 +56,8 @@ func (b *BufferMatrix) Reset(bufSize int, length int, k int) { b.curBlock = 0 b.finished = false + b.sizeOfLastSubGroup = b.FragLenLastSubGroup() + maxLen := b.maxLen() if cap(b.b) < maxLen { @@ -66,28 +71,30 @@ func (b *BufferMatrix) Reset(bufSize int, length int, k int) { if len(b.zero) < bufSize { b.zero = make([]byte, bufSize) } + b.newStyle = false } -var emptyErasureHeader = bytes.Repeat([]byte{0}, fragmentHeaderSize()) +// UseNewFormat sets the buffer to use the new format. +// The new format is more efficient for the last stripe/subgroup. +// Note: will panic if called after any Write() or ReadFrom() +func (b *BufferMatrix) UseNewFormat() { + if b.curBlock != 0 || b.leftInBlock != -1 || b.finished { + panic("UseNewOffset must be called before any Write") + } + b.newStyle = true +} -// getOffset returns current offset in buffer and size left in the current block -// So that it is safe to copy bytes at . -// If we are at a boundary, it will init the header and skip it. +// getOffset is a wrapper around getOffsetOld and getOffsetNew. +// It will call the right one depending on the newStyle flag. func (b *BufferMatrix) getOffset() (int, int) { - realCurBlock := b.getRealBlock(b.curBlock) - blockSize := b.hdrSize + b.bufSize - blockOffset := realCurBlock * blockSize - if b.leftInBlock == -1 { - // Start of a block - copy(b.b[blockOffset:], emptyErasureHeader) - b.leftInBlock = b.bufSize + if b.newStyle { + return b.getOffsetNew() } - - curOffset := blockOffset + (b.bufSize - b.leftInBlock) + b.hdrSize - - return curOffset, b.leftInBlock + return b.getOffsetOld() } +var emptyErasureHeader = bytes.Repeat([]byte{0}, fragmentHeaderSize()) + // Finish *must* be called after the final Write() *before* using the buffer // in EncodeMatrix // It is safe to call it multiple times. @@ -111,9 +118,84 @@ func (b *BufferMatrix) Finish() { b.finished = true } +// In b.b buffer, the data is organized as follow: +// - for each block, we have a header of size hdrSize +// - then the data of size bufSize +// - then the next block +// - etc. +// The data is organized in stripes of k blocks. +// It is meant to be split later and stored as shards. +// Shard 0 will contain block 0, k, 2k, 3k, etc. +// Shard 1 will contain block 1, k+1, 2k+1, 3k+1, etc. +// etc. +// So b.b is organized as follow: +// [hdr][block 0][hdr][block k][hdr][block 2k][hdr][block 3k] ... [hdr][block 1][hdr][block k+1] etc... +// When writing to buffer, we will write in the current block until it is full. +// Then we will skip the header and write in the next block. +// For example when block 0 is full, we will skip the header and write in block 1. When block 1 is full, we will skip the header and write in block 2, etc. +// Them, when all blocks from 0 to k-1 are full, we will write in block k, k+1, etc. + +// getRealBlock returns the real block index in the buffer. +// For example, if we have k=2 and 5 blocks in total, the buffer will be organized as follow: +// [hdr][block 0] [hdr][block 2] [hdr][block 4] [hdr][block 1] [hdr][block 3] +// So getRealBlock(0) will return 0, getRealBlock(1) will return 3, getRealBlock(2) will return 1, getRealBlock(3) will return 4, getRealBlock(4) will return 2. + +// getRealBlock returns the real block index in the buffer. +// blockidx is the block index in the incoming data (0-indexed) +// the return value is the block index in the buffer (0-indexed) func (b BufferMatrix) getRealBlock(blockidx int) int { - subgroup := b.SubGroups() - return (blockidx%b.k)*subgroup + (blockidx / b.k) + nbStripes := b.SubGroups() + return (blockidx%b.k)*nbStripes + (blockidx / b.k) +} + +// getOffSetNew returns current offset in buffer and size left in the current block +// Same a getOffset when blocks are not in the last stripe/subgroup +// When blocks are in the last stripe/subgroup, it will split the size left in k parts +// and return the offset and size left for the current block. +// For example, if we have 5*blocksize bytes and k=2, the buffer will be organized as follow: +// [hdr][block 0] [hdr][block 2] [hdr][block 4] / [hdr][block 1] [hdr][block 3] [hdr][block 5] +// where size(block4) + size(block5) == len - (4 * blocksize) == size of last subgroup +// when/if the size of last subgroup is not divisible by k, the block 4 may be one byte longer than block 5 +func (b *BufferMatrix) getOffsetNew() (int, int) { + realCurBlock := b.getRealBlock(b.curBlock) + blockSize := b.hdrSize + b.bufSize + blockOffset := realCurBlock * blockSize + if b.leftInBlock == -1 { + // Start of a block + copy(b.b[blockOffset:], emptyErasureHeader) + if b.IsBlockInLastSubGroup(b.curBlock) { + b.leftInBlock = b.FragLenLastSubGroup() + } else { + b.leftInBlock = b.bufSize + } + } + + bufSize := b.bufSize + if b.IsBlockInLastSubGroup(b.curBlock) { + bufSize = b.FragLenLastSubGroup() + } + + curOffset := blockOffset + (bufSize - b.leftInBlock) + b.hdrSize + + return curOffset, b.leftInBlock +} + +// getOffset returns current offset in buffer and size left in the current block +// So that it is safe to copy bytes at . +// If we are at a boundary, it will init the header and skip it. +func (b *BufferMatrix) getOffsetOld() (int, int) { + realCurBlock := b.getRealBlock(b.curBlock) + blockSize := b.hdrSize + b.bufSize + blockOffset := realCurBlock * blockSize + if b.leftInBlock == -1 { + // Start of a block + copy(b.b[blockOffset:], emptyErasureHeader) + b.leftInBlock = b.bufSize + } + + curOffset := blockOffset + (b.bufSize - b.leftInBlock) + b.hdrSize + + return curOffset, b.leftInBlock } func (b *BufferMatrix) Write(p []byte) (int, error) { @@ -122,13 +204,7 @@ func (b *BufferMatrix) Write(p []byte) (int, error) { for len(p) > 0 { curOffset, leftToCopy := b.getOffset() - var m int - - if len(p) > leftToCopy { - m = leftToCopy - } else { - m = len(p) - } + m := min(len(p), leftToCopy) n := copy(b.b[curOffset:], p[:m]) @@ -176,7 +252,12 @@ func (b BufferMatrix) RealData() []byte { for block := 0; len(res) < b.len; block++ { blockSize := b.hdrSize + b.bufSize curOffset := b.getRealBlock(block)*blockSize + b.hdrSize - res = append(res, b.b[curOffset:curOffset+b.bufSize]...) + if b.newStyle && b.IsBlockInLastSubGroup(block) { + amountToCopy := min(b.FragLenLastSubGroup(), b.len-len(res)) + res = append(res, b.b[curOffset:curOffset+amountToCopy]...) + } else { + res = append(res, b.b[curOffset:curOffset+b.bufSize]...) + } } return res[:b.len] @@ -189,3 +270,28 @@ func (b BufferMatrix) Bytes() []byte { func (b BufferMatrix) Length() int { return b.len } + +func (b *BufferMatrix) IsBlockInLastSubGroup(block int) bool { + cur := block / b.k + return cur == b.SubGroups()-1 +} + +func (b *BufferMatrix) ComputeSizeOfLastSubGroup() int { + // total of size already in previous subgroups + lastSubGroup := b.SubGroups() - 1 + totalSizeInPreviousSubGroups := lastSubGroup * b.k * (b.bufSize) + leftSize := b.len - totalSizeInPreviousSubGroups + return leftSize +} + +func (b *BufferMatrix) FragLenLastSubGroup() int { + if !b.newStyle { + return b.bufSize + } + + r := b.ComputeSizeOfLastSubGroup() / b.k + if b.ComputeSizeOfLastSubGroup()%b.k != 0 { + r++ + } + return r +} diff --git a/buffer_test.go b/buffer_test.go index 9cbded0..d9acace 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -92,7 +92,9 @@ func TestComparisonEncode(t *testing.T) { defer runtime.KeepAlive(b) backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: k, M: m, W: 8, HD: 5}) - defer backend.Close() + defer func() { + _ = backend.Close() + }() encoded2, err := backend.EncodeMatrixWithBufferMatrix(b, blockSize) assert.NoError(t, err) @@ -131,7 +133,9 @@ func TestEncodeBufferMatrix(t *testing.T) { defer runtime.KeepAlive(b) backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: k, M: m, W: 8, HD: 5}) - defer backend.Close() + defer func() { + _ = backend.Close() + }() encoded, err := backend.EncodeMatrixWithBufferMatrix(b, blockSize) assert.NoError(t, err) @@ -171,7 +175,9 @@ func BenchmarkEncodeMatrix(b *testing.B) { m := test.m blockSize := test.blockSize backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: k, M: m, W: 8, HD: 5}) - defer backend.Close() + defer func() { + _ = backend.Close() + }() b.Run("original", func(b *testing.B) { buf := bytes.Repeat([]byte("A"), size) b.ResetTimer() @@ -217,7 +223,7 @@ func BenchmarkBufferCopy(b *testing.B) { b.Run("original", func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - reader.Seek(0, 0) + _, _ = reader.Seek(0, 0) buf := bytes.NewBuffer(make([]byte, 0, test.size)) if _, err := io.Copy(buf, reader); err != nil { b.Fatal("cannot read buffer") @@ -228,7 +234,7 @@ func BenchmarkBufferCopy(b *testing.B) { b.ResetTimer() buf := NewBufferMatrix(test.blockSize, test.size, test.k) for i := 0; i < b.N; i++ { - reader.Seek(0, 0) + _, _ = reader.Seek(0, 0) buf.Reset(test.blockSize, test.size, test.k) if _, err := io.Copy(buf, reader); err != nil { b.Fatal("cannot read buffer") @@ -239,3 +245,165 @@ func BenchmarkBufferCopy(b *testing.B) { }) } } + +func TestBufferNew(t *testing.T) { + for _, test := range bufferTests { + t.Run(test.name, func(t *testing.T) { + size := test.size + k := test.k + blockSize := test.blockSize + b := NewBufferMatrix(blockSize, size, k) + b.UseNewFormat() + data := make([]byte, size) + for i := range size { + data[i] = byte(i) + } + n, err := io.Copy(b, bytes.NewReader(data)) + + assert.NoError(t, err) + assert.Equal(t, int64(size), n) + + newData := b.RealData() + assert.Equal(t, len(data), len(newData)) + assert.Equal(t, data, newData) + + b2 := NewBufferMatrix(blockSize, size, k) + b2.UseNewFormat() + n, err = b2.ReadFrom(bytes.NewReader(data)) + + assert.NoError(t, err) + assert.Equal(t, int64(size), n) + + newData = b2.RealData() + assert.Equal(t, len(data), len(newData)) + assert.Equal(t, data, newData) + }) + } +} + +func TestGetRealBlock(t *testing.T) { + b := NewBufferMatrix(32, 32*5, 2) + assert.Equal(t, 0, b.getRealBlock(0)) + assert.Equal(t, 3, b.getRealBlock(1)) + assert.Equal(t, 1, b.getRealBlock(2)) + assert.Equal(t, 4, b.getRealBlock(3)) + assert.Equal(t, 2, b.getRealBlock(4)) + assert.Equal(t, 5, b.getRealBlock(5)) +} + +func TestGetOffset(t *testing.T) { + b := NewBufferMatrix(32, 32*5, 2) + b.curBlock = 0 + off, size := b.getOffset() + assert.Equal(t, b.hdrSize, off) + assert.Equal(t, 32, size) + + // 2nd block is located after the first part. + // 1st part is 3 blocks long included the header (3*(hdrSize+32)) + // and the block is itself starts at offset hdrsize and is 32 bytes long + b.curBlock = 1 + off, size = b.getOffset() + assert.Equal(t, 3*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 32, size) +} + +// TestGetOffsetNew tests the new offset calculation +// for the case where the block size is not a multiple of the buffer size. +// It ensures that the offsets are calculated correctly +// for the first and last blocks in the buffer. +func TestGetOffsetNew(t *testing.T) { + b := NewBufferMatrix(32, 32*5, 2) + b.UseNewFormat() + b.curBlock = 0 + b.leftInBlock = -1 + off, size := b.getOffset() + assert.Equal(t, b.hdrSize, off) + assert.Equal(t, 32, size) + + // 2nd block is located after the first part. + // 1st part is 3 blocks long included the header (3*(hdrSize+32)) + // and the block is itself starts at offset hdrsize and is 32 bytes long + b.curBlock = 1 + b.leftInBlock = -1 + off, size = b.getOffset() + assert.Equal(t, 3*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 32, size) + + // 5th block is located at the end of the first part + b.curBlock = 4 + b.leftInBlock = -1 + off, size = b.getOffset() + assert.Equal(t, 2*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 16, size) + + // 6th block is located at the end of the second part + b.curBlock = 5 + b.leftInBlock = -1 + off, size = b.getOffset() + assert.Equal(t, 5*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 16, size) +} + +// TestGetOffsetNew200 tests the new offset calculation for the case +// of size=200 and block size=32. +func TestGetOffsetNew200(t *testing.T) { + b := NewBufferMatrix(32, 200, 2) + b.UseNewFormat() + b.curBlock = 0 + b.leftInBlock = -1 + off, size := b.getOffsetNew() + assert.Equal(t, b.hdrSize, off) + assert.Equal(t, 32, size) + + // 2nd block is located after the first part. + // 1st part is 3 blocks long included the header (3*(hdrSize+32)) + // and the block is itself starts at offset hdrsize and is 32 bytes long + b.curBlock = 1 + b.leftInBlock = -1 + off, size = b.getOffsetNew() + assert.Equal(t, 4*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 32, size) + + b.curBlock = 4 + b.leftInBlock = -1 + off, size = b.getOffsetNew() + assert.Equal(t, 2*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 32, size) + + b.curBlock = 6 + b.leftInBlock = -1 + off, size = b.getOffsetNew() + assert.Equal(t, 3*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 4, size) + + // 6th block is located at the end of the second part + b.curBlock = 7 + b.leftInBlock = -1 + off, size = b.getOffsetNew() + assert.Equal(t, 7*(b.hdrSize+32)+b.hdrSize, off) + assert.Equal(t, 4, size) +} + +// TestIsBlockInLastSubGroup tests the IsBlockInLastSubGroup method +func TestIsBlockInLastSubGroup(t *testing.T) { + b := NewBufferMatrix(32, 32*5, 2) + nbSubGroups := b.SubGroups() + assert.Equal(t, 3, nbSubGroups) + assert.True(t, b.IsBlockInLastSubGroup(4)) + assert.True(t, b.IsBlockInLastSubGroup(5)) + + b = NewBufferMatrix(32, 32*4+2, 4) + nbSubGroups = b.SubGroups() + assert.Equal(t, 2, nbSubGroups) + assert.False(t, b.IsBlockInLastSubGroup(3)) + assert.True(t, b.IsBlockInLastSubGroup(4)) +} + +func TestComputeSizeOfLastSubGroup(t *testing.T) { + b := NewBufferMatrix(32, 200, 2) + b.UseNewFormat() + size := b.FragLenLastSubGroup() + assert.Equal(t, 4, size) + s := b.ComputeSizeOfLastSubGroup() + assert.Equal(t, 8, s) +} diff --git a/memalign.go b/memalign.go index db8eb96..63cc62a 100644 --- a/memalign.go +++ b/memalign.go @@ -6,6 +6,7 @@ import ( ) func getAlignDifference(b []byte, align int) int { + // nolint:gosec return int(uintptr(unsafe.Pointer(&b[0])) & uintptr(align-1)) } From 98a5347f641d78253cd618b6cacce6a2248e3714 Mon Sep 17 00:00:00 2001 From: frederic ferrandis Date: Thu, 18 Dec 2025 19:17:42 +0100 Subject: [PATCH 2/9] HD-4220: Enable new style by default - enable new buffer style - rework all tests to only use *Matrix --- backend.c | 5 +- backend.go | 50 ---- backend_test.go | 735 ++++++++++++++++++++++-------------------------- buffer.go | 88 ++++-- buffer_test.go | 127 --------- 5 files changed, 411 insertions(+), 594 deletions(-) diff --git a/backend.c b/backend.c index b4e859d..af7cee3 100644 --- a/backend.c +++ b/backend.c @@ -329,10 +329,10 @@ int encode_chunk_buffermatrix(int desc, int i, ret; int tot_len_sum = 0; + if (nth >= ctx->number_of_subgroup) { return -1; } - // Create the array of "data" fragments // No copy, just prepare the header for (i = 0; i < ctx->k; i++) { @@ -349,6 +349,7 @@ int encode_chunk_buffermatrix(int desc, tot_len_sum += size > 0 ? (size > fraglen ? fraglen : size) : 0; } + // "coding" fragments. Those ones are allocated above for (i = 0; i < ctx->m; i++) { char *ptr = &ctx->codings[i][nth * one_cell_size]; @@ -358,9 +359,11 @@ int encode_chunk_buffermatrix(int desc, m_ref[i] = ptr; } + // do the true encoding according the backend used (isa-l, cauchy ....) ret = ec->common.ops->encode(ec->desc.backend_desc, k_ref, m_ref, fraglen); + if (ret < 0) { return -1; } diff --git a/backend.go b/backend.go index b556486..adf31cc 100644 --- a/backend.go +++ b/backend.go @@ -310,56 +310,6 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize }}, nil } -// EncodeMatrix encodes data in small subpart of chunkSize bytes -func (backend *Backend) EncodeMatrix(data []byte, chunkSize int) (*EncodeData, error) { - var wg sync.WaitGroup - var ctx C.struct_encode_chunk_context - pData := (*C.char)(unsafe.Pointer(&data[0])) - pDataLen := C.int(len(data)) - cChunkSize := C.int(chunkSize) - - C.encode_chunk_prepare(backend.libecDesc, pData, pDataLen, cChunkSize, &ctx) - - var errCounter uint32 - - wg.Add(int(ctx.number_of_subgroup)) - - for i := 0; i < int(ctx.number_of_subgroup); i++ { - go func(nth int) { - r := C.encode_chunk(backend.libecDesc, pData, pDataLen, &ctx, C.int(nth)) - if r < 0 { - atomic.AddUint32(&errCounter, 1) - } - wg.Done() - }(i) - } - wg.Wait() - - if errCounter != 0 { - return &EncodeData{nil, func() { - C.my_liberasurecode_encode_cleanup( - backend.libecDesc, C.size_t(ctx.frags_len), ctx.datas, ctx.codings) - }}, - fmt.Errorf("error encoding chunk (%+v encoding failed)", errCounter) - } - result := make([][]byte, backend.K+backend.M) - fragLen := ctx.frags_len - for i := 0; i < backend.K; i++ { - str := cGetArrayItem(ctx.datas, i) - result[i] = (*[1 << 30]byte)(str)[:int(C.int(fragLen)):int(C.int(fragLen))] - - } - for i := 0; i < backend.M; i++ { - str := cGetArrayItem(ctx.codings, i) - result[i+backend.K] = (*[1 << 30]byte)(str)[:int(C.int(fragLen)):int(C.int(fragLen))] - } - - return &EncodeData{result, func() { - C.my_liberasurecode_encode_cleanup( - backend.libecDesc, C.size_t(ctx.frags_len), ctx.datas, ctx.codings) - }}, nil -} - // DecodeData is the structure returned by all Decode* function // It contains a linearized data buffer and a Free closure (that can be null) // that clean some C dynamically allocated objects diff --git a/backend_test.go b/backend_test.go index 8ec9769..01cbffa 100644 --- a/backend_test.go +++ b/backend_test.go @@ -18,11 +18,6 @@ import ( ) var validParams = []Params{ - {Name: "liberasurecode_rs_vand", K: 2, M: 1}, - {Name: "liberasurecode_rs_vand", K: 10, M: 4}, - {Name: "liberasurecode_rs_vand", K: 4, M: 3}, - {Name: "liberasurecode_rs_vand", K: 8, M: 4}, - {Name: "liberasurecode_rs_vand", K: 15, M: 4}, {Name: "isa_l_rs_vand", K: 2, M: 1}, {Name: "isa_l_rs_vand", K: 2, M: 1, MaxBlockSize: 1}, {Name: "isa_l_rs_vand", K: 2, M: 1, MaxBlockSize: maxBuffer * 2}, @@ -151,76 +146,78 @@ func TestEncodeDecode(t *testing.T) { continue } backend, err := InitBackend(params) + if err != nil { t.Errorf("Error creating backend %v: %q", params, err) continue } + defer backend.Close() for patternIndex, pattern := range testPatterns { - data, err := backend.Encode(pattern) + bm := NewBufferMatrix(DefaultChunkSize, len(pattern), backend.K) + + _, err = io.Copy(bm, bytes.NewReader(pattern)) if err != nil { - t.Errorf("Error encoding %v: %q", params, err) + t.Errorf("Error copying pattern to buffer matrix: %q", err) break } + bm.Finish() + + data, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) + defer data.Free() expectedVersion := GetVersion() frags := data.Data for index, frag := range frags { - info := GetFragmentInfo(frag) - if info.Index != index { - t.Errorf("Expected frag %v to have index %v; got %v", index, index, info.Index) - } - if info.Size != len(frag)-80 { // 80 == sizeof (struct fragment_header_s) - t.Errorf("Expected frag %v to have size %v; got %v", index, len(frag)-80, info.Size) - } - if info.OrigDataSize != uint64(len(pattern)) { - t.Errorf("Expected frag %v to have orig_data_size %v; got %v", index, len(pattern), info.OrigDataSize) - } - if info.BackendName != params.Name { - t.Errorf("Expected frag %v to have backend %v; got %v", index, params.Name, info.BackendName) - } - if info.ErasureCodeVersion != expectedVersion { - t.Errorf("Expected frag %v to have EC version %v; got %v", index, expectedVersion, info.ErasureCodeVersion) - } - if !info.IsValid { - t.Errorf("Expected frag %v to be valid", index) + + for i := range bm.SubGroups() { + + start := i * (DefaultChunkSize + 80) + end := start + DefaultChunkSize + 80 + if i == bm.SubGroups()-1 { + end = start + bm.FragLenLastSubGroup() + 80 + } + + piece := frag[start:end] + require.True(t, backend.ValidateFragmentMatrix(piece, end-start-80)) + + info := GetFragmentInfo(piece) + if info.Index != index { + t.Errorf("Expected frag %v to have index %v; got %v", index, index, info.Index) + } + if info.Size != len(piece)-80 { // 80 == sizeof (struct fragment_header_s) + t.Errorf("Expected frag %v to have size %v; got %v", index, len(piece)-80, info.Size) + } + if info.BackendName != params.Name { + t.Errorf("Expected frag %v to have backend %v; got %v", index, params.Name, info.BackendName) + } + if info.ErasureCodeVersion != expectedVersion { + t.Errorf("Expected frag %v to have EC version %v; got %v", index, expectedVersion, info.ErasureCodeVersion) + } + if !info.IsValid { + t.Errorf("Expected frag %v to be valid", index) + } } } decode := func(frags [][]byte, description string) bool { - decoded, err := backend.Decode(frags) - if err != nil { - t.Errorf("%v: %v: %q for pattern %d", description, backend, err, patternIndex) - return false - } else if !bytes.Equal(decoded.Data, pattern) { - t.Errorf("%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, decoded.Data) - return false - } - decoded.Free() + decoded, err := backend.DecodeMatrix(frags, DefaultChunkSize) + require.NoError(t, err) + defer decoded.Free() + require.True(t, bytes.Equal(decoded.Data, pattern), + "%v:%d(%v) pattern: %v, got: %q", + description, patternIndex, backend, pattern, decoded.Data) return true } - var good bool - good = decode(frags, "all frags") - good = good && decode(shuf(frags), "all frags, shuffled") - good = good && decode(frags[:params.K], "data frags") - good = good && decode(shuf(frags[:params.K]), "shuffled data frags") - good = good && decode(frags[params.M:], "with parity frags") - good = good && decode(shuf(frags[params.M:]), "shuffled parity frags") + decode(frags, "all frags") + decode(shuf(frags), "all frags, shuffled") + decode(frags[:params.K], "data frags") + decode(shuf(frags[:params.K]), "shuffled data frags") + decode(frags[params.M:], "with parity frags") + decode(shuf(frags[params.M:]), "shuffled parity frags") - if !good { - break - } - data.Free() - } - - if _, err := backend.Decode([][]byte{}); err == nil { - t.Errorf("Expected error when decoding from empty fragment array") - } - - err = backend.Close() - if err != nil { - t.Errorf("Error closing backend %v: %q", backend, err) } } } @@ -236,41 +233,36 @@ func TestReconstruct(t *testing.T) { _ = backend.Close() continue } - for patternIndex, pattern := range testPatterns { - data, err := backend.Encode(pattern) - frags := data.Data - if err != nil { - t.Errorf("Error encoding %v: %q", params, err) - } - - reconstruct := func(recon_frags [][]byte, frag_index int, description string) bool { - data, err := backend.Reconstruct(recon_frags, frag_index) - if err != nil { - t.Errorf("%v: %v: %q for pattern %d", description, backend, err, patternIndex) - return false - } else if !bytes.Equal(data, frags[frag_index]) { - t.Errorf("%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, data) - return false - } - return true - } + defer func() { + _ = backend.Close() + }() - var good bool - good = reconstruct(shuf(frags[:params.K]), params.K+params.M-1, "last frag from data frags") - good = good && reconstruct(shuf(frags[params.M:]), 0, "first frag with parity frags") - if !good { - break - } - data.Free() - } + for patternIndex, pattern := range testPatterns { - if _, err := backend.Reconstruct([][]byte{}, 0); err == nil { - t.Errorf("Expected error when reconstructing from empty fragment array") - } + t.Run(fmt.Sprintf("%s_%d_%d-%d-%d", + params.Name, params.K, params.M, + patternIndex, + len(pattern)), + func(t *testing.T) { + bm := NewBufferMatrix(DefaultChunkSize, len(pattern), backend.K) + _, err = io.Copy(bm, bytes.NewReader(pattern)) + require.NoError(t, err) + bm.Finish() + data, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) + defer data.Free() + frags := data.Data - err = backend.Close() - if err != nil { - t.Errorf("Error closing backend %v: %q", backend, err) + reconstruct := func(recon_frags [][]byte, frag_index int, description string) bool { + data, err := backend.ReconstructMatrix(recon_frags, frag_index, DefaultChunkSize) + require.NoError(t, err, "%v: %v: %q for pattern %d", description, backend, err, patternIndex) + defer data.Free() + require.True(t, bytes.Equal(data.Data, frags[frag_index]), "%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, data.Data) + return true + } + reconstruct(shuf(frags[:params.K]), params.K+params.M-1, "last frag from data frags") + reconstruct(shuf(frags[params.M:]), 0, "first frag with parity frags") + }) } } } @@ -286,112 +278,124 @@ func TestIsInvalidFragment(t *testing.T) { _ = backend.Close() continue } + defer func() { + _ = backend.Close() + }() for patternIndex, pattern := range testPatterns { - data, err := backend.Encode(pattern) - if err != nil { - t.Errorf("Error encoding %v: %q", params, err) - continue - } - frags := data.Data - for index, frag := range frags { - if backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly invalid for pattern %d", backend, index, patternIndex) - } - fragCopy := make([]byte, len(frag)) - copy(fragCopy, frag) - - // corrupt the frag - corruptedByte := rand.Intn(len(frag)) //nolint:gosec - for 71 <= corruptedByte && corruptedByte < 80 { - // in the alignment padding -- try again - corruptedByte = rand.Intn(len(frag)) //nolint:gosec - } - frag[corruptedByte] ^= 0xff - if !backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly valid after inverting byte %d for pattern %d", backend, index, corruptedByte, patternIndex) - } - if corruptedByte < 4 || 8 <= corruptedByte && corruptedByte <= 59 { - /** corruption is in metadata; claim we were created by a version of - * libec that predates metadata checksums. Note that - * Note that a corrupted fragment size (bytes 4-7) will lead to a - * segfault when we try to verify the fragment -- there's a reason - * we added metadata checksums! - */ - copy(frag[63:67], []byte{9, 1, 1, 0}) - if 20 <= corruptedByte && corruptedByte <= 53 { - /** Corrupted data checksum type or data checksum - * We may or may not detect this type of error; in particular, - * - if data checksum type is not in ec_checksum_type_t, - * it is ignored - * - if data checksum is mangled, we may still be valid - * under the "alternative" CRC32; this seems more likely - * with the byte inversion when the data is short - * Either way, though, clearing the checksum type should make - * it pass. - */ - frag[20] = 0 - if backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly invalid after clearing metadata crc and disabling data crc", backend, index) - } - } else if corruptedByte >= 54 || 0 <= corruptedByte && corruptedByte < 4 { - /** Some corruptions of some bytes are still detectable. Since we're - * inverting the byte, we can detect: - * - frag index -- bytes 0-3 - * - data checksum type -- byte 20 - * - data checksum mismatch -- byte 54 - * - backend id -- byte 55 - * - backend version -- bytes 56-59 + bm := NewBufferMatrix(DefaultChunkSize, len(pattern), backend.K) + _, err = io.Copy(bm, bytes.NewReader(pattern)) + require.NoError(t, err) + bm.Finish() + data, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) + defer data.Free() + + parts := data.Data + for index, part := range parts { + for i := range bm.SubGroups() { + + start := i * (DefaultChunkSize + 80) + end := start + DefaultChunkSize + 80 + if i == bm.SubGroups()-1 { + end = start + bm.FragLenLastSubGroup() + 80 + } + + frag := part[start:end] + + if backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly invalid for pattern %d", backend, index, patternIndex) + } + fragCopy := make([]byte, len(frag)) + copy(fragCopy, frag) + + // corrupt the frag + corruptedByte := rand.Intn(len(frag)) //nolint:gosec + for 71 <= corruptedByte && corruptedByte < 80 { + // in the alignment padding -- try again + corruptedByte = rand.Intn(len(frag)) //nolint:gosec + } + frag[corruptedByte] ^= 0xff + if !backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly valid after inverting byte %d for pattern %d", backend, index, corruptedByte, patternIndex) + } + if corruptedByte < 4 || 8 <= corruptedByte && corruptedByte <= 59 { + /** corruption is in metadata; claim we were created by a version of + * libec that predates metadata checksums. Note that + * Note that a corrupted fragment size (bytes 4-7) will lead to a + * segfault when we try to verify the fragment -- there's a reason + * we added metadata checksums! */ - if !backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly still valid after clearing metadata crc", backend, index) + copy(frag[63:67], []byte{9, 1, 1, 0}) + if 20 <= corruptedByte && corruptedByte <= 53 { + /** Corrupted data checksum type or data checksum + * We may or may not detect this type of error; in particular, + * - if data checksum type is not in ec_checksum_type_t, + * it is ignored + * - if data checksum is mangled, we may still be valid + * under the "alternative" CRC32; this seems more likely + * with the byte inversion when the data is short + * Either way, though, clearing the checksum type should make + * it pass. + */ + frag[20] = 0 + if backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly invalid after clearing metadata crc and disabling data crc", backend, index) + } + } else if corruptedByte >= 54 || 0 <= corruptedByte && corruptedByte < 4 { + /** Some corruptions of some bytes are still detectable. Since we're + * inverting the byte, we can detect: + * - frag index -- bytes 0-3 + * - data checksum type -- byte 20 + * - data checksum mismatch -- byte 54 + * - backend id -- byte 55 + * - backend version -- bytes 56-59 + */ + if !backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly still valid after clearing metadata crc", backend, index) + } + } else { + if backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly invalid after clearing metadata crc", backend, index) + } } - } else { + } else if corruptedByte >= 67 { + copy(frag[20:25], []byte{1, 0, 0, 0, 0}) + // And since we've changed the metadata, roll back version as above... + copy(frag[63:67], []byte{9, 1, 1, 0}) if backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly invalid after clearing metadata crc", backend, index) + t.Errorf("%v: frag %v unexpectedly invalid after clearing data crc", backend, index) + t.FailNow() } } - } else if corruptedByte >= 67 { - copy(frag[20:25], []byte{1, 0, 0, 0, 0}) - // And since we've changed the metadata, roll back version as above... - copy(frag[63:67], []byte{9, 1, 1, 0}) - if backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly invalid after clearing data crc", backend, index) - t.FailNow() - } - } - frag[corruptedByte] ^= 0xff - copy(frag[63:67], fragCopy[63:67]) - copy(frag[20:25], fragCopy[20:25]) - - if !bytes.Equal(frag, fragCopy) { - for i, orig := range fragCopy { - if frag[i] != orig { - t.Logf("%v != %v at index %v", frag[i], orig, i) + frag[corruptedByte] ^= 0xff + copy(frag[63:67], fragCopy[63:67]) + copy(frag[20:25], fragCopy[20:25]) + + if !bytes.Equal(frag, fragCopy) { + for i, orig := range fragCopy { + if frag[i] != orig { + t.Logf("%v != %v at index %v", frag[i], orig, i) + } } + t.Fatal(corruptedByte, frag, fragCopy) } - t.Fatal(corruptedByte, frag, fragCopy) - } - frag[corruptedByte]++ - if !backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly valid after incrementing byte %d for pattern %d", backend, index, corruptedByte, patternIndex) - } - frag[corruptedByte] -= 2 - if corruptedByte >= 63 && corruptedByte < 67 && frag[corruptedByte] != 0xff { - if backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly invalid after decrementing version byte %d for pattern %d", backend, index, corruptedByte, patternIndex) - } - } else { + frag[corruptedByte]++ if !backend.IsInvalidFragment(frag) { - t.Errorf("%v: frag %v unexpectedly valid after decrementing byte %d for pattern %d", backend, index, corruptedByte, patternIndex) + t.Errorf("%v: frag %v unexpectedly valid after incrementing byte %d for pattern %d", backend, index, corruptedByte, patternIndex) + } + frag[corruptedByte] -= 2 + if corruptedByte >= 63 && corruptedByte < 67 && frag[corruptedByte] != 0xff { + if backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly invalid after decrementing version byte %d for pattern %d", backend, index, corruptedByte, patternIndex) + } + } else { + if !backend.IsInvalidFragment(frag) { + t.Errorf("%v: frag %v unexpectedly valid after decrementing byte %d for pattern %d", backend, index, corruptedByte, patternIndex) + } } } } - data.Free() - } - err = backend.Close() - if err != nil { - t.Errorf("Error closing backend %v: %q", backend, err) } } } @@ -431,7 +435,7 @@ func TestGC(t *testing.T) { input := bytes.Repeat([]byte("X"), 1000000) backend, err := InitBackend( Params{ - Name: "liberasurecode_rs_vand", + Name: "isa_l_rs_vand", K: 2, M: 1, }) @@ -447,52 +451,41 @@ func TestGC(t *testing.T) { }{{ "Reconstruct", func() { - encoded, err := backend.Encode(input) + bm := NewBufferMatrix(DefaultChunkSize, len(input), backend.K) + _, err = io.Copy(bm, bytes.NewReader(input)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) + defer encoded.Free() - if err != nil { - t.Fatal("cannot encode data") - return - } vect := encoded.Data - defer encoded.Free() oldData := vect[0][:] // force a copy - data, err := backend.Reconstruct(vect[1:3], 0) - if err != nil { - t.Fatalf("cannot reconstruct data, %s", err) - return - } - - if len(data) != len(oldData) { - t.Fatal("reconstructing failed") - return - } + data, err := backend.ReconstructMatrix(vect[1:3], 0, DefaultChunkSize) + require.NoError(t, err) + defer data.Free() + require.True(t, bytes.Equal(data.Data, oldData)) }, }, { "Decode", func() { - encoded, err := backend.Encode(input) - - if err != nil { - t.Fatal("cannot encode data") - return - } + bm := NewBufferMatrix(DefaultChunkSize, len(input), backend.K) + _, err = io.Copy(bm, bytes.NewReader(input)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) defer encoded.Free() + vect := encoded.Data - decoded, err := backend.Decode(vect[0:2]) - if err != nil { - t.Fatalf("cannot decode data: %v", err) - return - } + decoded, err := backend.DecodeMatrix(vect[0:2], DefaultChunkSize) + require.NoError(t, err) defer decoded.Free() - data := decoded.Data - if len(data) != len(input) { - t.Fatal("decoding failed") - return - } + require.True(t, bytes.Equal(decoded.Data, input)) }, }, } @@ -533,12 +526,13 @@ func BenchmarkEncode(b *testing.B) { buf := bytes.Repeat([]byte("A"), 1024*1024) b.ResetTimer() for i := 0; i < b.N; i++ { - encoded, err := backend.Encode(buf) - - if err != nil { - b.Fatal(err) - } - encoded.Free() + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) + defer encoded.Free() } _ = backend.Close() } @@ -577,26 +571,20 @@ func BenchmarkLinearizeM(b *testing.B) { }() buf := bytes.Repeat([]byte("A"), test.size) - encoded, err := backend.EncodeMatrix(buf, DefaultChunkSize) - - if err != nil { - b.Fatal(err) - } + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err = io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) defer encoded.Free() b.ResetTimer() for i := 0; i < b.N; i++ { decoded, err := backend.LinearizeMatrix(encoded.Data, DefaultChunkSize) - if err != nil { - b.Fatal(err) - } - if decoded != nil { - if decoded.Free != nil { - decoded.Free() - } - } else { - b.Fatal("decoded is nil") - } + require.NoError(b, err) + defer decoded.Free() + require.True(b, bytes.Equal(decoded.Data, buf)) } }) } @@ -614,27 +602,22 @@ func BenchmarkDecodeM(b *testing.B) { }() buf := bytes.Repeat([]byte("A"), test.size) - encoded, err := backend.EncodeMatrix(buf, DefaultChunkSize) + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err = io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) - if err != nil { - b.Fatal(err) - } defer encoded.Free() data := encoded.Data[1:] b.ResetTimer() for i := 0; i < b.N; i++ { decoded, err := backend.DecodeMatrix(data, DefaultChunkSize) - if err != nil { - b.Fatal(err) - } - if decoded != nil { - if decoded.Free != nil { - decoded.Free() - } - } else { - b.Fatal("decoded is nil") - } + require.NoError(b, err) + defer decoded.Free() + require.True(b, bytes.Equal(decoded.Data, buf)) } }) } @@ -652,50 +635,20 @@ func BenchmarkReconstruct(b *testing.B) { }() buf := bytes.Repeat([]byte("A"), test.size) - encoded, err := backend.Encode(buf) - - if err != nil { - b.Fatal(err) - } - defer encoded.Free() - flags := encoded.Data[1:] - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, err := backend.Reconstruct(flags, 0) - if err != nil { - b.Fatal(err) - } - } - }) - } -} - -func BenchmarkReconstructM(b *testing.B) { - for _, test := range decodeTests { - b.Run(test.String(), func(b *testing.B) { - backend, err := InitBackend(test.p) - if err != nil { - b.Fatal("cannot create backend", err) - } - defer func() { - _ = backend.Close() - }() - - buf := bytes.Repeat([]byte("A"), test.size) - encoded, err := backend.EncodeMatrix(buf, DefaultChunkSize) - - if err != nil { - b.Fatal(err) - } + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err = io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) defer encoded.Free() flags := encoded.Data[1:] b.ResetTimer() for i := 0; i < b.N; i++ { - ddata, err := backend.ReconstructMatrix(flags, 0, DefaultChunkSize) - if err != nil { - b.Fatal(err) - } - ddata.Free() + data, err := backend.ReconstructMatrix(flags, 0, DefaultChunkSize) + require.NoError(b, err, "cannot reconstruct matrix: %v", err) + defer data.Free() + require.True(b, bytes.Equal(data.Data, encoded.Data[0])) } }) } @@ -714,18 +667,28 @@ func BenchmarkMatrix(b *testing.B) { func(b *testing.B) { dtest.p.Checksum = crc backend, _ := InitBackend(dtest.p) + defer func() { + _ = backend.Close() + }() b.Run("Encode", func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - encoded, err := backend.EncodeMatrix(buf, blockSize) - if err != nil { - b.Fatal(err) - } - encoded.Free() + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) + defer encoded.Free() } }) b.Run("Decode", func(b *testing.B) { - encoded, _ := backend.EncodeMatrix(buf, blockSize) + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) defer encoded.Free() b.ResetTimer() @@ -738,18 +701,21 @@ func BenchmarkMatrix(b *testing.B) { } }) b.Run("Reconstruct", func(b *testing.B) { - encoded, _ := backend.EncodeMatrix(buf, blockSize) + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) defer encoded.Free() b.ResetTimer() for i := 0; i < b.N; i++ { + decoded, err := backend.ReconstructMatrix(encoded.Data[1:], 0, blockSize) - if err != nil { - b.Fatal(err) - } - decoded.Free() + require.NoError(b, err) + defer decoded.Free() } }) - _ = backend.Close() }) } }) @@ -758,21 +724,25 @@ func BenchmarkMatrix(b *testing.B) { func BenchmarkDecode(b *testing.B) { backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: 4, M: 2, W: 8, HD: 5}) - + defer func() { + _ = backend.Close() + }() buf := bytes.Repeat([]byte("A"), 1024*1024) - res, _ := backend.Encode(buf) + bm := NewBufferMatrix(DefaultChunkSize, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(b, err) + bm.Finish() + res, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(b, err) defer res.Free() for i := 0; i < b.N; i++ { - decoded, err := backend.Decode(res.Data) + decoded, _ := backend.DecodeMatrix(res.Data, DefaultChunkSize) + defer decoded.Free() - if err != nil { - b.Fatal(err) - } - decoded.Free() } - _ = backend.Close() + } func TestEncodeM(t *testing.T) { @@ -800,20 +770,21 @@ func TestEncodeM(t *testing.T) { testName := fmt.Sprintf("TestEncodeB-%d-%d", p.chunkUnit, p.lenToDecode) t.Run(testName, func(t *testing.T) { // Do the matrix encoding - result, err := backend.EncodeMatrix(buf, p.chunkUnit) - - if err != nil { - t.Errorf("failed to encode %+v", err) - } + bm := NewBufferMatrix(p.chunkUnit, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(t, err) + bm.Finish() + result, err := backend.EncodeMatrixWithBufferMatrix(bm, p.chunkUnit) + require.NoError(t, err) + defer result.Free() // Check that our linearized buffer // contains expected data when there is all data fragment. ddata, err := backend.LinearizeMatrix(result.Data, p.chunkUnit) - assert.NoError(t, err) - assert.Equal(t, len(buf), len(ddata.Data), "data mismatch") - assert.Equalf(t, buf, ddata.Data, "data mismatch") - - ddata.Free() + require.NoError(t, err) + defer ddata.Free() + require.Equal(t, len(buf), len(ddata.Data), "data mismatch") + require.True(t, bytes.Equal(buf, ddata.Data), "data mismatch") /* now do the same but with the slow path*/ /* we will run a matrix decoding but withtout some data part, to enforce repairing*/ @@ -824,10 +795,8 @@ func TestEncodeM(t *testing.T) { vect = append(vect, result.Data[5]) ddata2, _ := backend.DecodeMatrix(vect, p.chunkUnit) - assert.Equal(t, buf, ddata2.Data, "data mismatch") - ddata2.Free() - - result.Free() + require.True(t, bytes.Equal(buf, ddata2.Data), "data mismatch") + defer ddata2.Free() }) } _ = backend.Close() @@ -869,10 +838,12 @@ func TestLinearizeMatrix(t *testing.T) { data := make([]byte, dataSize) _, _ = cryptorand.Read(data) - encoded, err := backend.EncodeMatrix(data, DefaultChunkSize) - if err != nil { - t.Fatalf("failed to encode buffer: (%v)", err) - } + bm := NewBufferMatrix(DefaultChunkSize, len(data), backend.K) + _, err := io.Copy(bm, bytes.NewReader(data)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) defer encoded.Free() fragSize := len(encoded.Data[0]) @@ -945,10 +916,12 @@ func TestDecodeMatrix(t *testing.T) { data := make([]byte, dataSize) _, _ = cryptorand.Read(data) - encoded, err := backend.EncodeMatrix(data, DefaultChunkSize) - if err != nil { - t.Fatalf("failed to encode buffer: (%v)", err) - } + bm := NewBufferMatrix(DefaultChunkSize, len(data), backend.K) + _, err := io.Copy(bm, bytes.NewReader(data)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) defer encoded.Free() fragSize := len(encoded.Data[0]) @@ -1004,10 +977,12 @@ func TestValidateFragmentMatrix(t *testing.T) { data := make([]byte, dataSize) _, _ = cryptorand.Read(data) - encoded, err := backend.EncodeMatrix(data, DefaultChunkSize) - if err != nil { - t.Fatalf("failed to encode buffer: (%v)", err) - } + bm := NewBufferMatrix(DefaultChunkSize, len(data), backend.K) + _, err = io.Copy(bm, bytes.NewReader(data)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) defer encoded.Free() fragSize := len(encoded.Data[0]) @@ -1074,14 +1049,15 @@ func TestReconstructM(t *testing.T) { testName := fmt.Sprintf("TestReconstruct-%d-%d", p.chunkUnit, p.fragNumber) t.Run(testName, func(t *testing.T) { // Do the matrix encoding - result, err := backend.EncodeMatrix(buf, p.chunkUnit) + bm := NewBufferMatrix(p.chunkUnit, len(buf), backend.K) + _, err := io.Copy(bm, bytes.NewReader(buf)) + require.NoError(t, err) + bm.Finish() + result, err := backend.EncodeMatrixWithBufferMatrix(bm, p.chunkUnit) + require.NoError(t, err) defer result.Free() - if err != nil { - t.Errorf("failed to encode %+v", err) - } - var vect [][]byte for i := 0; i < backend.K+backend.M; i++ { if i != p.fragNumber { @@ -1090,19 +1066,12 @@ func TestReconstructM(t *testing.T) { } ddata, err := backend.ReconstructMatrix(vect, p.fragNumber, p.chunkUnit) - if err != nil { - t.Errorf("cannot reconstruct fragment %d cause=%v", p.fragNumber, err) - return - } - if ddata == nil { - t.Fatal("unexpected error / fragment rebuilt is nil") - } + require.NoError(t, err) + require.NotNil(t, ddata) res := bytes.Compare(ddata.Data, result.Data[p.fragNumber]) - ddata.Free() - if res != 0 { - t.Errorf("Error, fragment rebuilt is different from the original one") - } + require.Equal(t, 0, res) + defer ddata.Free() }) } _ = backend.Close() @@ -1122,74 +1091,52 @@ func TestEncodeDecodeMatrix(t *testing.T) { t.Errorf("Error creating backend %v: %q", params, err) continue } - + defer func() { + _ = backend.Close() + }() for patternIndex, pattern := range testPatterns { t.Run(fmt.Sprintf("%s_%d_%d-%d-%d", params.Name, params.K, params.M, patternIndex, len(pattern)), func(t *testing.T) { - data, err := backend.EncodeMatrix(pattern, 32768) - if err != nil { - t.Errorf("Error encoding %v: %q", params, err) - return - } + bm := NewBufferMatrix(DefaultChunkSize, len(pattern), backend.K) + _, err := io.Copy(bm, bytes.NewReader(pattern)) + require.NoError(t, err) + bm.Finish() + data, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) defer data.Free() frags := data.Data decode := func(frags [][]byte, description string) bool { - decoded, err := backend.DecodeMatrix(frags, 32768) - if err != nil { - t.Errorf("%v: %v: %q for pattern %d", description, backend, err, patternIndex) - return false - } else if !bytes.Equal(decoded.Data, pattern) { - t.Errorf("%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, decoded.Data) - return false - } - decoded.Free() + decoded, err := backend.DecodeMatrix(frags, DefaultChunkSize) + require.NoError(t, err) + require.True(t, bytes.Equal(decoded.Data, pattern), "%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, decoded.Data) + defer decoded.Free() return true } - var good bool - good = decode(frags, "all frags") - good = good && decode(shuf(frags), "all frags, shuffled") - good = good && decode(frags[:params.K], "data frags") - good = good && decode(shuf(frags[:params.K]), "shuffled data frags") - good = good && decode(frags[params.M:], "with parity frags") - good = good && decode(shuf(frags[params.M:]), "shuffled parity frags") - - if !good { - return - } - - // remove := func(s [][]byte, i int) [][]byte { - // s[i] = s[len(s)-1] - // return s[:len(s)-1] - // } + decode(frags, "all frags") + decode(shuf(frags), "all frags, shuffled") + decode(frags[:params.K], "data frags") + decode(shuf(frags[:params.K]), "shuffled data frags") + decode(frags[params.M:], "with parity frags") + decode(shuf(frags[params.M:]), "shuffled parity frags") for fIdx := 0; fIdx < params.K; fIdx++ { newFrags := frags[fIdx+1:] if fIdx >= 1 { newFrags = append(newFrags, frags[0:fIdx]...) } - ddata, err := backend.ReconstructMatrix(newFrags, fIdx, 32768) - if err != nil { - t.Fatal("cannot reconstruct ", err) - } - if !bytes.Equal(ddata.Data, frags[fIdx]) { - ddata.Free() - t.Fatalf("part %d reconstructed not equal to original len: %q != %q", fIdx, ddata.Data, frags[fIdx]) - } - ddata.Free() + ddata, err := backend.ReconstructMatrix(newFrags, fIdx, DefaultChunkSize) + require.NoError(t, err) + require.True(t, bytes.Equal(ddata.Data, frags[fIdx]), "part %d reconstructed not equal to original len: %q != %q", fIdx, ddata.Data, frags[fIdx]) + defer ddata.Free() } }) } - - err = backend.Close() - if err != nil { - t.Errorf("Error closing backend %v: %q", backend, err) - } } } diff --git a/buffer.go b/buffer.go index bded614..42b0979 100644 --- a/buffer.go +++ b/buffer.go @@ -5,36 +5,83 @@ import ( "io" ) -type BufferMatrix struct { - b []byte - zero []byte +type BufferInfo struct { hdrSize, bufSize int - len int // len of input + len int k int curBlock int leftInBlock int - finished bool sizeOfLastSubGroup int - // getOffset func() (int, int) - newStyle bool + newStyle bool } // FragLen returns the size of a "fragment" aligned to a block size (data + header) -func (b BufferMatrix) FragLen() int { +func (b BufferInfo) FragLen() int { return b.SubGroups() * (b.bufSize + b.hdrSize) } // SubGroups returns the number of blocks inside a single fragment -func (b BufferMatrix) SubGroups() int { +func (b BufferInfo) SubGroups() int { nbBlocks := (b.len + b.bufSize - 1) / b.bufSize nbStripes := (nbBlocks + b.k - 1) / b.k return nbStripes } -func (b BufferMatrix) maxLen() int { +func (b BufferInfo) maxLen() int { return (b.SubGroups() * b.k) * (b.bufSize + b.hdrSize) } +func (b BufferInfo) IsBlockInLastSubGroup(block int) bool { + cur := block / b.k + return cur == b.SubGroups()-1 +} + +func (b BufferInfo) ComputeSizeOfLastSubGroup() int { + // total of size already in previous subgroups + lastSubGroup := b.SubGroups() - 1 + totalSizeInPreviousSubGroups := lastSubGroup * b.k * (b.bufSize) + leftSize := b.len - totalSizeInPreviousSubGroups + return leftSize +} + +func (b BufferInfo) FragLenLastSubGroup() int { + if !b.newStyle { + return b.bufSize + } + r := b.ComputeSizeOfLastSubGroup() / b.k + if b.ComputeSizeOfLastSubGroup()%b.k != 0 { + r++ + } + return r +} + +func (b *BufferInfo) init(bufSize int, length int, k int) { + b.newStyle = true + + hdrSize := fragmentHeaderSize() + b.hdrSize = hdrSize + b.bufSize = bufSize + b.len = length + b.k = k + b.leftInBlock = -1 + b.curBlock = 0 + + b.sizeOfLastSubGroup = b.FragLenLastSubGroup() +} + +func NewBufferInfo(bufSize int, length int, k int) *BufferInfo { + var b BufferInfo + b.init(bufSize, length, k) + return &b +} + +type BufferMatrix struct { + b []byte + zero []byte + finished bool + BufferInfo +} + // NewBufferMatrix returns a new buffer suitable for data and organized // such as it can be injected into EncodeMatrixWithBuffer without allocation/copying // the data into shards @@ -47,16 +94,7 @@ func NewBufferMatrix(bufSize int, length int, k int) *BufferMatrix { // Reset serves the same purpose as NewBufferMatrix but use the existing buffer and // tries to avoid allocation of the underlying buffer. func (b *BufferMatrix) Reset(bufSize int, length int, k int) { - hdrSize := fragmentHeaderSize() - b.hdrSize = hdrSize - b.bufSize = bufSize - b.len = length - b.k = k - b.leftInBlock = -1 - b.curBlock = 0 - b.finished = false - - b.sizeOfLastSubGroup = b.FragLenLastSubGroup() + b.init(bufSize, length, k) maxLen := b.maxLen() @@ -71,7 +109,7 @@ func (b *BufferMatrix) Reset(bufSize int, length int, k int) { if len(b.zero) < bufSize { b.zero = make([]byte, bufSize) } - b.newStyle = false + b.finished = false } // UseNewFormat sets the buffer to use the new format. @@ -84,6 +122,13 @@ func (b *BufferMatrix) UseNewFormat() { b.newStyle = true } +func (b *BufferMatrix) UseOldormat() { + if b.curBlock != 0 || b.leftInBlock != -1 || b.finished { + panic("UseNewOffset must be called before any Write") + } + b.newStyle = false +} + // getOffset is a wrapper around getOffsetOld and getOffsetNew. // It will call the right one depending on the newStyle flag. func (b *BufferMatrix) getOffset() (int, int) { @@ -96,7 +141,6 @@ func (b *BufferMatrix) getOffset() (int, int) { var emptyErasureHeader = bytes.Repeat([]byte{0}, fragmentHeaderSize()) // Finish *must* be called after the final Write() *before* using the buffer -// in EncodeMatrix // It is safe to call it multiple times. func (b *BufferMatrix) Finish() { if b.finished { diff --git a/buffer_test.go b/buffer_test.go index d9acace..f8ea204 100644 --- a/buffer_test.go +++ b/buffer_test.go @@ -71,51 +71,6 @@ func TestBuffer(t *testing.T) { } } -func TestComparisonEncode(t *testing.T) { - for _, test := range bufferTests { - t.Run(test.name, func(t *testing.T) { - size := test.size - k := test.k - m := test.m - blockSize := test.blockSize - b := NewBufferMatrix(blockSize, size, k) - - data := make([]byte, size) - for i := 0; i < size; i++ { - data[i] = byte(i) - } - n, err := io.Copy(b, bytes.NewReader(data)) - assert.Equal(t, size, int(n)) - b.Finish() - assert.NoError(t, err) - - defer runtime.KeepAlive(b) - - backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: k, M: m, W: 8, HD: 5}) - defer func() { - _ = backend.Close() - }() - - encoded2, err := backend.EncodeMatrixWithBufferMatrix(b, blockSize) - assert.NoError(t, err) - defer encoded2.Free() - - encoded, err := backend.EncodeMatrix(data, blockSize) - assert.NoError(t, err) - defer encoded.Free() - - for j := 0; j < len(encoded2.Data); j++ { - for i := 0; i < len(encoded2.Data[0]); i++ { - assert.Equal(t, encoded2.Data[j][i], encoded.Data[j][i]) - } - } - for i := 0; i < k+m; i++ { - assert.Equal(t, (encoded2.Data[i]), (encoded.Data[i])) - } - }) - } -} - func TestEncodeBufferMatrix(t *testing.T) { for _, test := range bufferTests { t.Run(test.name, func(t *testing.T) { @@ -164,88 +119,6 @@ func TestEncodeBufferMatrix(t *testing.T) { } } -// BenchmarkEncodeMatrix compares speeds of both style of encoding -// using a generic buffer (and then requiring some allocations in the C shim) -// using a specific buffer (less allocations) -func BenchmarkEncodeMatrix(b *testing.B) { - for _, test := range bufferTests { - b.Run(test.name, func(b *testing.B) { - size := test.size - k := test.k - m := test.m - blockSize := test.blockSize - backend, _ := InitBackend(Params{Name: "isa_l_rs_vand", K: k, M: m, W: 8, HD: 5}) - defer func() { - _ = backend.Close() - }() - b.Run("original", func(b *testing.B) { - buf := bytes.Repeat([]byte("A"), size) - b.ResetTimer() - for i := 0; i < b.N; i++ { - encoded, err := backend.EncodeMatrix(buf, blockSize) - - if err != nil { - b.Fatal(err) - } - encoded.Free() - } - }) - - b.Run("no copy", func(b *testing.B) { - buf := NewBufferMatrix(blockSize, size, k) - data := bytes.Repeat([]byte("A"), size) - _, err := io.Copy(buf, bytes.NewReader(data)) - assert.NoError(b, err) - buf.Finish() - - b.ResetTimer() - for i := 0; i < b.N; i++ { - encoded, err := backend.EncodeMatrixWithBufferMatrix(buf, blockSize) - - if err != nil { - b.Fatal(err) - } - encoded.Free() - } - }) - }) - } -} - -// BenchmarkBufferCopy compares basic buffer vs matrix implementation -// w.r.t. filling speeds. Please note the matrix implementation is expected -// to be slower. Speed gains will occur during encoding phase -func BenchmarkBufferCopy(b *testing.B) { - for _, test := range bufferTests { - b.Run(test.name, func(b *testing.B) { - originalData := bytes.Repeat([]byte("A"), test.size) - reader := bytes.NewReader(originalData) - b.Run("original", func(b *testing.B) { - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, _ = reader.Seek(0, 0) - buf := bytes.NewBuffer(make([]byte, 0, test.size)) - if _, err := io.Copy(buf, reader); err != nil { - b.Fatal("cannot read buffer") - } - } - }) - b.Run("buffermatrix", func(b *testing.B) { - b.ResetTimer() - buf := NewBufferMatrix(test.blockSize, test.size, test.k) - for i := 0; i < b.N; i++ { - _, _ = reader.Seek(0, 0) - buf.Reset(test.blockSize, test.size, test.k) - if _, err := io.Copy(buf, reader); err != nil { - b.Fatal("cannot read buffer") - } - buf.Finish() - } - }) - }) - } -} - func TestBufferNew(t *testing.T) { for _, test := range bufferTests { t.Run(test.name, func(t *testing.T) { From 12234f85968a70108f8b1162f226a24d7c9bcbb3 Mon Sep 17 00:00:00 2001 From: frederic ferrandis Date: Fri, 26 Dec 2025 13:20:48 +0100 Subject: [PATCH 3/9] HD-4220: adapt get range --- backend.go | 73 ++++++++++++++++++++---------- backend_test.go | 118 +++++++++++++++++++++++++++++++++++++++++------- 2 files changed, 150 insertions(+), 41 deletions(-) diff --git a/backend.go b/backend.go index adf31cc..9770ede 100644 --- a/backend.go +++ b/backend.go @@ -372,7 +372,6 @@ func (backend *Backend) ChunkInfo(fragRangeLen int, pieceSize int) ChunkInfo { if nrChunks*chunkSize != fragRangeLen { nrChunks++ } - return ChunkInfo{ ChunkSize: chunkSize, NrChunk: nrChunks, @@ -392,7 +391,7 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, pieceSize int /* Fragments are sorted beforehand with the index of the first chunk. All chunks of a fragments share the same index. */ fragsIndex := make([]int, len(frags)) - for i := 0; i < len(frags); i++ { + for i := range frags { fragsIndex[i] = i } @@ -591,41 +590,65 @@ type RangeMatrix struct { * p4 [-[*]- -] * */ -func (backend *Backend) GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize int) *RangeMatrix { - chunkSize := pieceSize + backend.headerSize - groupSize := pieceSize * backend.K +func (backend *Backend) GetRangeMatrix(startIncl, endIncl, cellDataSize, fragSize int) *RangeMatrix { + nrColumns := backend.K + cellSize := cellDataSize + backend.headerSize + lineSize := cellDataSize * nrColumns /* At this point we don't know what is the true payload size, but we can at least check that it doesn't exceed the maximum payload that this configuration can handle. */ - nrChunkByFrag := fragSize / chunkSize - dataLenPerFrag := fragSize - nrChunkByFrag*backend.headerSize - maxDataLen := dataLenPerFrag * backend.K - if startIncl >= maxDataLen || endIncl >= maxDataLen || startIncl > endIncl { + nrLines := fragSize / cellSize + + /* Inside a fragment (ie, inside a column), what is the + stored amount of data? */ + fragDataSize := fragSize - nrLines*backend.headerSize + + maxDataSize := fragDataSize * nrColumns + + if startIncl >= maxDataSize || endIncl >= maxDataSize || startIncl > endIncl { return nil } - pieceStartIncl := startIncl / pieceSize - pieceEndIncl := endIncl / pieceSize + /* convert cells indices to (x,y) indices */ + + /* Considering our (x,y) matrix as a single row, what + are the indices of the start and end of the range we are interested in? */ + idxStart := startIncl / cellDataSize + idxEnd := endIncl / cellDataSize + + /* as we have the indices of the first cell and the last cell, + * we can derive the indices of the first line and the last line + * Based on this first line and last line, we can deduce + * the amount of data to read in each fragment. + */ + lineStart := idxStart / nrColumns + lineEnd := idxEnd / nrColumns + + /* + * as we have the indices of the first cell and the last cell, + * we can compute the first column (e.g the first fragment) + * where to start the read + */ + columnStart := idxStart % nrColumns - groupStartIncl := pieceStartIncl / backend.K - groupEndIncl := pieceEndIncl / backend.K + nrCellsToRead := (idxEnd + 1 - idxStart) + dataOffset := idxStart * cellDataSize - fragFirstIncl := pieceStartIncl % backend.K - fragCount := (pieceEndIncl + 1 - pieceStartIncl) - dataOffset := pieceStartIncl * pieceSize + totalLines := (maxDataSize + lineSize - 1) / lineSize + isLastStripe := (lineEnd == totalLines-1) /* When wrapping around, we read the full groups. */ - if fragFirstIncl+fragCount > backend.K { - fragFirstIncl = 0 - fragCount = backend.K - dataOffset = groupStartIncl * groupSize + if columnStart+nrCellsToRead > nrColumns || isLastStripe { + columnStart = 0 + nrCellsToRead = nrColumns + dataOffset = lineStart * lineSize } /* For each fragment, this is the minimum range to read -- including the header -- to decode or repair the data. */ - inFragRangeStartIncl := groupStartIncl * chunkSize - inFragRangeEndExcl := (groupEndIncl + 1) * chunkSize + inFragRangeStartIncl := lineStart * cellSize + inFragRangeEndExcl := (lineEnd + 1) * cellSize /* The output buffer only contains the data necessary to read the range, and the requested range must be adjusted to be relative @@ -637,13 +660,13 @@ func (backend *Backend) GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize i linearizedRangeStartIncl := startIncl - dataOffset /* Decoding always works on a group boundary. */ - decodedRangeStartIncl := startIncl - groupStartIncl*groupSize + decodedRangeStartIncl := startIncl - lineStart*lineSize return &RangeMatrix{ ReqStartIncl: startIncl, ReqEndIncl: endIncl, - FragFirstIncl: fragFirstIncl, - FragCount: fragCount, + FragFirstIncl: columnStart, + FragCount: nrCellsToRead, InFragRangeStartIncl: inFragRangeStartIncl, InFragRangeEndExcl: inFragRangeEndExcl, DecodedRangeStartIncl: decodedRangeStartIncl, diff --git a/backend_test.go b/backend_test.go index 01cbffa..668e649 100644 --- a/backend_test.go +++ b/backend_test.go @@ -201,14 +201,13 @@ func TestEncodeDecode(t *testing.T) { } } - decode := func(frags [][]byte, description string) bool { + decode := func(frags [][]byte, description string) { decoded, err := backend.DecodeMatrix(frags, DefaultChunkSize) require.NoError(t, err) defer decoded.Free() require.True(t, bytes.Equal(decoded.Data, pattern), "%v:%d(%v) pattern: %v, got: %q", description, patternIndex, backend, pattern, decoded.Data) - return true } decode(frags, "all frags") @@ -253,12 +252,11 @@ func TestReconstruct(t *testing.T) { defer data.Free() frags := data.Data - reconstruct := func(recon_frags [][]byte, frag_index int, description string) bool { + reconstruct := func(recon_frags [][]byte, frag_index int, description string) { data, err := backend.ReconstructMatrix(recon_frags, frag_index, DefaultChunkSize) require.NoError(t, err, "%v: %v: %q for pattern %d", description, backend, err, patternIndex) defer data.Free() require.True(t, bytes.Equal(data.Data, frags[frag_index]), "%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, data.Data) - return true } reconstruct(shuf(frags[:params.K]), params.K+params.M-1, "last frag from data frags") reconstruct(shuf(frags[params.M:]), 0, "first frag with parity frags") @@ -802,17 +800,107 @@ func TestEncodeM(t *testing.T) { _ = backend.Close() } -func TestLinearizeMatrix(t *testing.T) { - assert := assert.New(t) +func TestTwoLinearizeMatrix(t *testing.T) { + backend, err := InitBackend(Params{Name: "isa_l_rs_vand", K: 2, M: 1, W: 8, HD: 5}) + require.NoError(t, err) + defer func() { + _ = backend.Close() + }() + currentChunkSize := 512 + dataSize := currentChunkSize*2 + 10 + startIncl := dataSize - 3 + endIncl := dataSize - 1 + + data := make([]byte, dataSize) + for i := range dataSize { + data[i] = byte('A' + i%26) + } + bm := NewBufferMatrix(currentChunkSize, len(data), backend.K) + _, err = io.Copy(bm, bytes.NewReader(data)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, currentChunkSize) + require.NoError(t, err) + defer encoded.Free() + rangeM := backend.GetRangeMatrix(startIncl, endIncl, currentChunkSize, len(encoded.Data[0])) + require.NotNil(t, rangeM) + + /* Decode the matrix as if it was requested and + checks that the result matches the payload on the requested range. */ + frags := make([][]byte, 0) + for i := 0; i < rangeM.FragCount; i++ { + fragIdx := (rangeM.FragFirstIncl + i) % backend.K + buffer := encoded.Data[fragIdx][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl] + frags = append(frags, buffer) + } + + decoded, err := backend.LinearizeMatrix(frags, currentChunkSize) + require.NoError(t, err) + defer decoded.Free() + + expected := data[startIncl:endIncl] + + linearizedRangeEndExcl := rangeM.LinearizedRangeStartIncl + (endIncl - startIncl) + found := decoded.Data[rangeM.LinearizedRangeStartIncl:linearizedRangeEndExcl] + + require.True(t, bytes.Equal(expected, found)) +} + +func TestOneLinearizeMatrix(t *testing.T) { + backend, err := InitBackend(Params{Name: "isa_l_rs_vand", K: 4, M: 2, W: 8, HD: 5}) + require.NoError(t, err) + defer func() { + _ = backend.Close() + }() + dataSize := 105623 + startIncl := 59441 + endIncl := 64149 + data := make([]byte, dataSize) + for i := range dataSize { + data[i] = byte('A' + i%26) + } + bm := NewBufferMatrix(DefaultChunkSize, len(data), backend.K) + _, err = io.Copy(bm, bytes.NewReader(data)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) + require.NoError(t, err) + defer encoded.Free() + + rangeM := backend.GetRangeMatrix(startIncl, endIncl, DefaultChunkSize, len(encoded.Data[0])) + require.NotNil(t, rangeM) + + /* Decode the matrix as if it was requested and + checks that the result matches the payload on the requested range. */ + frags := make([][]byte, 0) + for i := 0; i < rangeM.FragCount; i++ { + fragIdx := (rangeM.FragFirstIncl + i) % backend.K + buffer := encoded.Data[fragIdx][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl] + frags = append(frags, buffer) + } + + decoded, err := backend.LinearizeMatrix(frags, DefaultChunkSize) + require.NoError(t, err) + defer decoded.Free() + + expected := data[startIncl : endIncl+1] + + linearizedRangeEndExcl := rangeM.LinearizedRangeStartIncl + (endIncl - startIncl) + 1 + found := decoded.Data[rangeM.LinearizedRangeStartIncl:linearizedRangeEndExcl] + require.True(t, bytes.Equal(expected, found)) +} + +func TestLinearizeMatrix(t *testing.T) { pieceSize := DefaultChunkSize k := 4 m := 1 backend, err := InitBackend(Params{Name: "isa_l_rs_vand", K: k, M: m, W: 8, HD: m}) - if err != nil { - t.Fatalf("cannot init backend: (%v)", err) - } + require.NoError(t, err) + defer func() { + _ = backend.Close() + }() rangeValues := func(values []reflect.Value, rng *rand.Rand) { dataSize := 1 + rng.Intn(7*1024*1024) @@ -848,7 +936,7 @@ func TestLinearizeMatrix(t *testing.T) { fragSize := len(encoded.Data[0]) rangeM := backend.GetRangeMatrix(startIncl, endIncl, pieceSize, fragSize) - assert.NotNil(rangeM) + require.NotNil(t, rangeM) /* Decode the matrix as if it was requested and checks that the result matches the payload on the requested range. */ @@ -860,7 +948,7 @@ func TestLinearizeMatrix(t *testing.T) { } decoded, err := backend.LinearizeMatrix(frags, pieceSize) - assert.Nil(err) + require.NoError(t, err) defer decoded.Free() expected := data[startIncl : endIncl+1] @@ -874,9 +962,8 @@ func TestLinearizeMatrix(t *testing.T) { Values: rangeValues, } - if err := quick.Check(checkRange, &config); err != nil { - t.Error(err) - } + require.NoError(t, quick.Check(checkRange, &config)) + } func TestDecodeMatrix(t *testing.T) { @@ -1109,12 +1196,11 @@ func TestEncodeDecodeMatrix(t *testing.T) { defer data.Free() frags := data.Data - decode := func(frags [][]byte, description string) bool { + decode := func(frags [][]byte, description string) { decoded, err := backend.DecodeMatrix(frags, DefaultChunkSize) require.NoError(t, err) require.True(t, bytes.Equal(decoded.Data, pattern), "%v: Expected %v to roundtrip pattern %d, got %q", description, backend, patternIndex, decoded.Data) defer decoded.Free() - return true } decode(frags, "all frags") From 81ee4dc8af6d2d9df1f43452ac6628e69abcbca9 Mon Sep 17 00:00:00 2001 From: frederic ferrandis Date: Fri, 26 Dec 2025 20:32:25 +0100 Subject: [PATCH 4/9] HD-4220: linearize and reconstruct testing --- backend_test.go | 158 ++++++++++++++++++++++++++---------------------- buffer.go | 2 +- 2 files changed, 88 insertions(+), 72 deletions(-) diff --git a/backend_test.go b/backend_test.go index 668e649..29b8330 100644 --- a/backend_test.go +++ b/backend_test.go @@ -800,95 +800,111 @@ func TestEncodeM(t *testing.T) { _ = backend.Close() } -func TestTwoLinearizeMatrix(t *testing.T) { +func TestLinearizeMatrixAndReconstruct(t *testing.T) { backend, err := InitBackend(Params{Name: "isa_l_rs_vand", K: 2, M: 1, W: 8, HD: 5}) require.NoError(t, err) defer func() { _ = backend.Close() }() - currentChunkSize := 512 - dataSize := currentChunkSize*2 + 10 - startIncl := dataSize - 3 - endIncl := dataSize - 1 - data := make([]byte, dataSize) - for i := range dataSize { - data[i] = byte('A' + i%26) + testParams := []struct { + chunkSize int + dataSize int + startIncl int + endIncl int + useOldFormat bool + }{ + { + chunkSize: 512, + dataSize: 512*2 + 10, + startIncl: 512*2 - 3, + endIncl: 512*2 - 1, + }, + { + chunkSize: DefaultChunkSize, + dataSize: 105623, + startIncl: 59441, + endIncl: 64149, + }, + { + chunkSize: 512, + dataSize: 512*2 + 10, + startIncl: 512*2 - 3, + endIncl: 512*2 - 1, + useOldFormat: true, + }, + { + chunkSize: DefaultChunkSize, + dataSize: 105623, + startIncl: 59441, + endIncl: 64149, + useOldFormat: true, + }, } - bm := NewBufferMatrix(currentChunkSize, len(data), backend.K) - _, err = io.Copy(bm, bytes.NewReader(data)) - require.NoError(t, err) - bm.Finish() - encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, currentChunkSize) - require.NoError(t, err) - defer encoded.Free() - - rangeM := backend.GetRangeMatrix(startIncl, endIncl, currentChunkSize, len(encoded.Data[0])) - require.NotNil(t, rangeM) - /* Decode the matrix as if it was requested and - checks that the result matches the payload on the requested range. */ - frags := make([][]byte, 0) - for i := 0; i < rangeM.FragCount; i++ { - fragIdx := (rangeM.FragFirstIncl + i) % backend.K - buffer := encoded.Data[fragIdx][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl] - frags = append(frags, buffer) - } + for _, param := range testParams { + p := param + testName := fmt.Sprintf("TestLinearizeMatrixAndReconstruct(oldformat=%v)-%d-%d-%d-%d", + p.useOldFormat, p.chunkSize, p.dataSize, p.startIncl, p.endIncl, + ) + t.Run(testName, func(t *testing.T) { + currentChunkSize := p.chunkSize + dataSize := p.dataSize + startIncl := p.startIncl + endIncl := p.endIncl + + data := make([]byte, dataSize) + for i := range dataSize { + data[i] = byte('A' + i%26) + } + bm := NewBufferMatrix(currentChunkSize, len(data), backend.K) + if p.useOldFormat { + bm.UseOldFormat() + } + _, err = io.Copy(bm, bytes.NewReader(data)) + require.NoError(t, err) + bm.Finish() + encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, currentChunkSize) + require.NoError(t, err) + defer encoded.Free() - decoded, err := backend.LinearizeMatrix(frags, currentChunkSize) - require.NoError(t, err) - defer decoded.Free() + rangeM := backend.GetRangeMatrix(startIncl, endIncl, currentChunkSize, len(encoded.Data[0])) + require.NotNil(t, rangeM) - expected := data[startIncl:endIncl] + /* Decode the matrix as if it was requested and + checks that the result matches the payload on the requested range. */ + frags := make([][]byte, 0) + for i := 0; i < rangeM.FragCount; i++ { + fragIdx := (rangeM.FragFirstIncl + i) % backend.K + buffer := encoded.Data[fragIdx][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl] + frags = append(frags, buffer) + } - linearizedRangeEndExcl := rangeM.LinearizedRangeStartIncl + (endIncl - startIncl) - found := decoded.Data[rangeM.LinearizedRangeStartIncl:linearizedRangeEndExcl] + decoded, err := backend.LinearizeMatrix(frags, currentChunkSize) + require.NoError(t, err) + defer decoded.Free() - require.True(t, bytes.Equal(expected, found)) -} + expected := data[startIncl:endIncl] -func TestOneLinearizeMatrix(t *testing.T) { - backend, err := InitBackend(Params{Name: "isa_l_rs_vand", K: 4, M: 2, W: 8, HD: 5}) - require.NoError(t, err) - defer func() { - _ = backend.Close() - }() - dataSize := 105623 - startIncl := 59441 - endIncl := 64149 - data := make([]byte, dataSize) - for i := range dataSize { - data[i] = byte('A' + i%26) - } - bm := NewBufferMatrix(DefaultChunkSize, len(data), backend.K) - _, err = io.Copy(bm, bytes.NewReader(data)) - require.NoError(t, err) - bm.Finish() - encoded, err := backend.EncodeMatrixWithBufferMatrix(bm, DefaultChunkSize) - require.NoError(t, err) - defer encoded.Free() + linearizedRangeEndExcl := rangeM.LinearizedRangeStartIncl + (endIncl - startIncl) + found := decoded.Data[rangeM.LinearizedRangeStartIncl:linearizedRangeEndExcl] - rangeM := backend.GetRangeMatrix(startIncl, endIncl, DefaultChunkSize, len(encoded.Data[0])) - require.NotNil(t, rangeM) + require.True(t, bytes.Equal(expected, found)) - /* Decode the matrix as if it was requested and - checks that the result matches the payload on the requested range. */ - frags := make([][]byte, 0) - for i := 0; i < rangeM.FragCount; i++ { - fragIdx := (rangeM.FragFirstIncl + i) % backend.K - buffer := encoded.Data[fragIdx][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl] - frags = append(frags, buffer) - } + frags2 := make([][]byte, 0) + for i := 0; i < backend.K+backend.M; i++ { + frags2 = append(frags2, encoded.Data[i][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl]) + } - decoded, err := backend.LinearizeMatrix(frags, DefaultChunkSize) - require.NoError(t, err) - defer decoded.Free() + // now do the same test, but this time, insted of linearizing, we are going to reconstruct the stripes + reconstructed, err := backend.ReconstructMatrix(frags2[1:], 0, currentChunkSize) - expected := data[startIncl : endIncl+1] + require.NoError(t, err) + defer reconstructed.Free() - linearizedRangeEndExcl := rangeM.LinearizedRangeStartIncl + (endIncl - startIncl) + 1 - found := decoded.Data[rangeM.LinearizedRangeStartIncl:linearizedRangeEndExcl] - require.True(t, bytes.Equal(expected, found)) + require.True(t, bytes.Equal(frags2[0], reconstructed.Data)) + }) + } } func TestLinearizeMatrix(t *testing.T) { diff --git a/buffer.go b/buffer.go index 42b0979..e28dc53 100644 --- a/buffer.go +++ b/buffer.go @@ -122,7 +122,7 @@ func (b *BufferMatrix) UseNewFormat() { b.newStyle = true } -func (b *BufferMatrix) UseOldormat() { +func (b *BufferMatrix) UseOldFormat() { if b.curBlock != 0 || b.leftInBlock != -1 || b.finished { panic("UseNewOffset must be called before any Write") } From 5decc2c6dcb6751f5fc9ae687eb0be8c48639d3f Mon Sep 17 00:00:00 2001 From: frederic ferrandis Date: Wed, 7 Jan 2026 13:54:28 +0100 Subject: [PATCH 5/9] HD-4220: Last block size Store in EncodeData structure the realsize With old format, it contains the FragLen itself With new format, this is sticked to the real data. In DecodeMatrix, we also manage the last stripe size to ensure we read the header.ChunkSize value instead of reading piecesize bytes --- backend.c | 5 ----- backend.go | 24 ++++++++++++++++++------ backend_test.go | 37 +++++++++++++++++++++++++++---------- buffer.go | 4 ++-- 4 files changed, 47 insertions(+), 23 deletions(-) diff --git a/backend.c b/backend.c index af7cee3..4846940 100644 --- a/backend.c +++ b/backend.c @@ -99,11 +99,6 @@ char *linearize(int k, char **in, int inlen, char *dest, uint64_t destlen, bool check_matrix_fragment(char *frag, int frag_len, int piecesize) { size_t offset = 0; - bool aligned = (frag_len % (piecesize + getHeaderSize())) == 0; - if (!aligned) { - return false; - } - while (offset < frag_len) { if (is_invalid_fragment_header((fragment_header_t *)&frag[offset])) { return false; diff --git a/backend.go b/backend.go index 9770ede..dbbb073 100644 --- a/backend.go +++ b/backend.go @@ -209,8 +209,13 @@ func (backend *Backend) Close() error { // EncodeData is returned by all encode* functions type EncodeData struct { - Data [][]byte // Slice of []bytes ==> our K+N encoded fragments - Free func() // cleanup closure (to free C allocated data once it becomes useless) + Data [][]byte // Slice of []bytes ==> our K+N encoded fragments + Free func() // cleanup closure (to free C allocated data once it becomes useless) + RealDataSize int64 // the real size of the data, without considering the padding +} + +func (e EncodeData) DataLen() int64 { + return e.RealDataSize } // Encode is the general purpose encoding function. It encodes data according @@ -240,13 +245,14 @@ func (backend *Backend) Encode(data []byte) (*EncodeData, error) { return &EncodeData{result, func() { C.my_liberasurecode_encode_cleanup( backend.libecDesc, C.size_t(fragLength), dataFrags, parityFrags) - }}, nil + }, int64(fragLength)}, nil } // EncodeMatrixWithBufferMatrix encodes data in small subpart of chunkSize bytes func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize int) (*EncodeData, error) { var wg sync.WaitGroup var ctx C.struct_encode_chunk_context + var totLen int64 data := bm.Bytes() dataLen := bm.Length() @@ -273,6 +279,7 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize if i == int(ctx.number_of_subgroup)-1 { fragLen = C.size_t(bm.FragLenLastSubGroup()) } + atomic.AddInt64(&totLen, int64(fragLen)+int64(backend.headerSize)) r := C.encode_chunk_buffermatrix(backend.libecDesc, pData, pDataLen, nbFrags, &ctx, C.int(nth), fragLen) @@ -288,7 +295,7 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize return &EncodeData{nil, func() { C.my_liberasurecode_encode_buffermatrix_cleanup( backend.libecDesc, C.size_t(ctx.frags_len), ctx.datas, ctx.codings) - }}, + }, totLen}, fmt.Errorf("error encoding chunk (%+v encoding failed)", errCounter) } result := make([][]byte, backend.K+backend.M) @@ -307,7 +314,7 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize runtime.KeepAlive(bm) C.my_liberasurecode_encode_buffermatrix_cleanup( backend.libecDesc, C.size_t(ctx.frags_len), ctx.datas, ctx.codings) - }}, nil + }, totLen}, nil } // DecodeData is the structure returned by all Decode* function @@ -512,8 +519,9 @@ func (backend *Backend) DecodeMatrix(frags []ValidatedFragment, pieceSize int) ( var totLen int64 for i := 0; i < chunkInfo.NrChunk; i++ { vect := make([][]byte, len(frags)) + nextBound := min((i+1)*chunkInfo.ChunkSize, fragRangeLen) for j := range frags { - vect[j] = frags[j][i*chunkInfo.ChunkSize : (i+1)*chunkInfo.ChunkSize] + vect[j] = frags[j][i*chunkInfo.ChunkSize : nextBound] } subdata, err := backend.Decode(vect) @@ -599,6 +607,10 @@ func (backend *Backend) GetRangeMatrix(startIncl, endIncl, cellDataSize, fragSiz can at least check that it doesn't exceed the maximum payload that this configuration can handle. */ nrLines := fragSize / cellSize + // + if nrLines*cellSize < fragSize { + nrLines++ + } /* Inside a fragment (ie, inside a column), what is the stored amount of data? */ diff --git a/backend_test.go b/backend_test.go index 29b8330..2deac43 100644 --- a/backend_test.go +++ b/backend_test.go @@ -815,16 +815,11 @@ func TestLinearizeMatrixAndReconstruct(t *testing.T) { useOldFormat bool }{ { - chunkSize: 512, - dataSize: 512*2 + 10, - startIncl: 512*2 - 3, - endIncl: 512*2 - 1, - }, - { - chunkSize: DefaultChunkSize, - dataSize: 105623, - startIncl: 59441, - endIncl: 64149, + chunkSize: 512, + dataSize: 512*2 + 10, + startIncl: 512*2 - 3, + endIncl: 512*2 - 1, + useOldFormat: false, }, { chunkSize: 512, @@ -833,6 +828,14 @@ func TestLinearizeMatrixAndReconstruct(t *testing.T) { endIncl: 512*2 - 1, useOldFormat: true, }, + { + chunkSize: DefaultChunkSize, + dataSize: 105623, + startIncl: 59441, + endIncl: 64149, + useOldFormat: false, + }, + { chunkSize: DefaultChunkSize, dataSize: 105623, @@ -840,6 +843,20 @@ func TestLinearizeMatrixAndReconstruct(t *testing.T) { endIncl: 64149, useOldFormat: true, }, + { + chunkSize: DefaultChunkSize, + dataSize: 105623, + startIncl: 105610, + endIncl: 105622, + useOldFormat: true, + }, + { + chunkSize: DefaultChunkSize, + dataSize: 105623, + startIncl: 105610, + endIncl: 105622, + useOldFormat: false, + }, } for _, param := range testParams { diff --git a/buffer.go b/buffer.go index e28dc53..433b52d 100644 --- a/buffer.go +++ b/buffer.go @@ -117,14 +117,14 @@ func (b *BufferMatrix) Reset(bufSize int, length int, k int) { // Note: will panic if called after any Write() or ReadFrom() func (b *BufferMatrix) UseNewFormat() { if b.curBlock != 0 || b.leftInBlock != -1 || b.finished { - panic("UseNewOffset must be called before any Write") + panic("UseNewFormat must be called before any Write") } b.newStyle = true } func (b *BufferMatrix) UseOldFormat() { if b.curBlock != 0 || b.leftInBlock != -1 || b.finished { - panic("UseNewOffset must be called before any Write") + panic("UseOldFormat must be called before any Write") } b.newStyle = false } From 0f060e968bb7e305b2f237e76ab2ed28e8d9d8a0 Mon Sep 17 00:00:00 2001 From: frederic ferrandis Date: Thu, 8 Jan 2026 09:09:54 +0100 Subject: [PATCH 6/9] HD-4220: add accessor to encodedata struct --- backend.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/backend.go b/backend.go index dbbb073..cb40856 100644 --- a/backend.go +++ b/backend.go @@ -217,6 +217,12 @@ type EncodeData struct { func (e EncodeData) DataLen() int64 { return e.RealDataSize } +func (e *EncodeData) GetFragment(index int) []byte { + if index < 0 || index >= len(e.Data) { + return nil + } + return e.Data[index][:e.RealDataSize] +} // Encode is the general purpose encoding function. It encodes data according // backend params and returns an EncodeData structure containing the Fragments From d25c645dd88ea3ac1e594f3472fb4ebe7e6720fc Mon Sep 17 00:00:00 2001 From: frederic ferrandis Date: Thu, 8 Jan 2026 14:22:17 +0100 Subject: [PATCH 7/9] HD-4220: ReconstructMatrix should size the generated buffer --- backend.go | 50 ++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/backend.go b/backend.go index cb40856..979045d 100644 --- a/backend.go +++ b/backend.go @@ -328,8 +328,17 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize // that clean some C dynamically allocated objects // If Free is not null, the closure should be used only when the Data is not needed anymore type DecodeData struct { - Data []byte - Free func() + Data []byte + Free func() + RealDataSize int64 +} + +func (d DecodeData) DataLen() int64 { + return d.RealDataSize +} + +func (d *DecodeData) GetFragment() []byte { + return d.Data[:d.RealDataSize] } // // bufPool is a pool of bytes.Buffer of max size maxBuffer. @@ -499,8 +508,9 @@ func (backend *Backend) LinearizeMatrix(frags []ValidatedFragment, pieceSize int } return &DecodeData{ - data[:totLen:totLen], - func() { + Data: data[:totLen:totLen], + RealDataSize: int64(totLen), + Free: func() { backend.pool.Release(dataB) }}, nil } @@ -539,9 +549,12 @@ func (backend *Backend) DecodeMatrix(frags []ValidatedFragment, pieceSize int) ( subdata.Free() } - return &DecodeData{data[:totLen:totLen], func() { - backend.pool.Release(dataB) - }}, nil + return &DecodeData{ + Data: data[:totLen:totLen], + RealDataSize: int64(totLen), + Free: func() { + backend.pool.Release(dataB) + }}, nil } // RangeMatrix describes information needed to decode a range of encoded frags @@ -718,8 +731,10 @@ func (backend *Backend) Decode(frags [][]byte) (*DecodeData, error) { runtime.KeepAlive(frags) // prevent frags from being GC-ed during decode C.freeStrArray(cFrags) - return &DecodeData{(*[1 << 30]byte)(unsafe.Pointer(data))[:int(dataLength):int(dataLength)], - func() { + return &DecodeData{ + Data: (*[1 << 30]byte)(unsafe.Pointer(data))[:int(dataLength):int(dataLength)], + RealDataSize: int64(dataLength), + Free: func() { C.liberasurecode_decode_cleanup(backend.libecDesc, data) }}, nil @@ -781,14 +796,17 @@ func (backend *Backend) ReconstructMatrix(frags [][]byte, fragIndex int, pieceSi dataB, data := backend.pool.New(dlen) var errCounter uint32 + var totLen int64 // TODO use goroutines here to leverage multicore computation wg.Add(chunkNr) for i := 0; i < chunkNr; i++ { go func(chunkIdx int) { vect := make([][]byte, len(frags)) - for j := 0; j < len(frags); j++ { - vect[j] = frags[j][chunkIdx*chunkSize : (chunkIdx+1)*chunkSize] + for j := range frags { + length := min(len(frags[j]), (chunkIdx+1)*chunkSize) + vect[j] = frags[j][chunkIdx*chunkSize : length] } + atomic.AddInt64(&totLen, int64(len(vect[0]))) if err := backend.reconstruct(vect, fragIndex, data[chunkIdx*chunkSize:]); err != nil { atomic.AddUint32(&errCounter, 1) } @@ -799,9 +817,13 @@ func (backend *Backend) ReconstructMatrix(frags [][]byte, fragIndex int, pieceSi if errCounter != 0 { return nil, errors.New("sub reconstruction failed") } - return &DecodeData{data[:dlen:dlen], func() { - backend.pool.Release(dataB) - }}, nil + + return &DecodeData{ + Data: data[:totLen:totLen], + RealDataSize: int64(totLen), + Free: func() { + backend.pool.Release(dataB) + }}, nil } // IsInvalidFragment is a wrapper on C implementation From 95770dcc3b8e142a841d75bdf848419050a69e10 Mon Sep 17 00:00:00 2001 From: frederic ferrandis Date: Fri, 9 Jan 2026 16:14:03 +0100 Subject: [PATCH 8/9] HD-4220: test GetRangeHelper with old format --- backend_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/backend_test.go b/backend_test.go index 2deac43..724a136 100644 --- a/backend_test.go +++ b/backend_test.go @@ -1461,10 +1461,9 @@ func testRangeHelper(t *testing.T, useNewFormat bool) { } bm := NewBufferMatrix(chunkSize, len(buf), backend.K) - if useNewFormat { - bm.UseNewFormat() + if !useNewFormat { + bm.UseOldFormat() } - // bm.UseNewFormat() _, err := io.Copy(bm, bytes.NewReader(buf)) require.NoError(t, err) bm.Finish() From aad156b33b4c66fe6ecb285910c0f1978c9e56a4 Mon Sep 17 00:00:00 2001 From: frederic ferrandis Date: Tue, 13 Jan 2026 10:53:43 +0100 Subject: [PATCH 9/9] HD-4220: fix concurrency potential bug --- backend.c | 6 +++++- backend.go | 4 ++-- backend_test.go | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/backend.c b/backend.c index 4846940..de6cb08 100644 --- a/backend.c +++ b/backend.c @@ -100,10 +100,14 @@ bool check_matrix_fragment(char *frag, int frag_len, int piecesize) { size_t offset = 0; while (offset < frag_len) { + if (offset + sizeof(fragment_header_t) > frag_len) { + // there is some remaining bytes, but not enough to read at least the header + return false; + } if (is_invalid_fragment_header((fragment_header_t *)&frag[offset])) { return false; } - offset += piecesize + getHeaderSize(); + offset += ((fragment_header_t *)&frag[offset])->meta.size + getHeaderSize(); } return true; diff --git a/backend.go b/backend.go index 979045d..5ad9557 100644 --- a/backend.go +++ b/backend.go @@ -282,7 +282,7 @@ func (backend *Backend) EncodeMatrixWithBufferMatrix(bm *BufferMatrix, chunkSize go func(nth int) { fragLen := C.size_t(chunkSize) // last subgroup has a different size - if i == int(ctx.number_of_subgroup)-1 { + if nth == int(ctx.number_of_subgroup)-1 { fragLen = C.size_t(bm.FragLenLastSubGroup()) } atomic.AddInt64(&totLen, int64(fragLen)+int64(backend.headerSize)) @@ -690,7 +690,7 @@ func (backend *Backend) GetRangeMatrix(startIncl, endIncl, cellDataSize, fragSiz all fragments (see (2) in the function's comment above). */ linearizedRangeStartIncl := startIncl - dataOffset - /* Decoding always works on a group boundary. */ + /* Decoding always works on a column boundary. */ decodedRangeStartIncl := startIncl - lineStart*lineSize return &RangeMatrix{ diff --git a/backend_test.go b/backend_test.go index 724a136..b4d1518 100644 --- a/backend_test.go +++ b/backend_test.go @@ -913,7 +913,7 @@ func TestLinearizeMatrixAndReconstruct(t *testing.T) { frags2 = append(frags2, encoded.Data[i][rangeM.InFragRangeStartIncl:rangeM.InFragRangeEndExcl]) } - // now do the same test, but this time, insted of linearizing, we are going to reconstruct the stripes + // now do the same test, but this time, instead of linearizing, we are going to reconstruct the stripes reconstructed, err := backend.ReconstructMatrix(frags2[1:], 0, currentChunkSize) require.NoError(t, err)