diff --git a/src/plugins/intermediate/CMakeLists.txt b/src/plugins/intermediate/CMakeLists.txt index 97993127..5e2f73ba 100644 --- a/src/plugins/intermediate/CMakeLists.txt +++ b/src/plugins/intermediate/CMakeLists.txt @@ -1,3 +1,4 @@ # List of output plugin to build and install add_subdirectory(anonymization) -add_subdirectory(filter) \ No newline at end of file +add_subdirectory(filter) +add_subdirectory(extender) \ No newline at end of file diff --git a/src/plugins/intermediate/extender/CMakeLists.txt b/src/plugins/intermediate/extender/CMakeLists.txt new file mode 100644 index 00000000..f2c465c4 --- /dev/null +++ b/src/plugins/intermediate/extender/CMakeLists.txt @@ -0,0 +1,28 @@ +add_library(extender-intermediate MODULE + msg_builder.h + extender.c + config.c + config.h +) + +install( + TARGETS extender-intermediate + LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/" +) + +if (ENABLE_DOC_MANPAGE) + # Build a manual page + set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-extender-inter.7.rst") + set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-extender-inter.7") + + add_custom_command(TARGET extender-intermediate PRE_BUILD + COMMAND ${RST2MAN_EXECUTABLE} --syntax-highlight=none ${SRC_FILE} ${DST_FILE} + DEPENDS ${SRC_FILE} + VERBATIM + ) + + install( + FILES "${DST_FILE}" + DESTINATION "${INSTALL_DIR_MAN}/man7" + ) +endif() \ No newline at end of file diff --git a/src/plugins/intermediate/extender/README.rst b/src/plugins/intermediate/extender/README.rst new file mode 100644 index 00000000..511cf325 --- /dev/null +++ b/src/plugins/intermediate/extender/README.rst @@ -0,0 +1,65 @@ +============================ + Extender (intermediate plugin) +============================ + +This intermediate plugin extends IPFIX records by adding new items to the existing record set using user-defined expressions. + + +Example configuration +--------------------- + +.. code-block:: xml + + + Extenders + extender + + + iana:VRFname + + iana:dot1qCustomerVlanId == 0 + vrf-0 + + + iana:dot1qCustomerVlanId == 1 + vrf-1 + + + + + +The above sample adds a new string field ``VRFname=vrf-0`` or ``VRFname=vrf-1`` to each IPFIX record depending on the value of ``dot1qCustomerVlanId``. + +Parameters +---------- + +``expr`` + The filter expression to evaluate. + +``id`` + The identifier string to be added to the record when the expression evaluates to true. + + +Supported operations +-------------------- + +Comparisons + - Operators: ``==``, ``<``, ``>``, ``<=``, ``>=``, ``!=`` + - Default: ``==`` (if the operator is omitted) + +Substring Matching + - Operator: ``contains`` + - Example: ``DNSName contains "example"`` + +Arithmetic + - Operators: ``+``, ``-``, ``*``, ``/``, ``%`` + +Bitwise Logic + - Operators: ``~`` (not), ``|`` (or), ``&`` (and), ``^`` (xor) + +List Membership + - Operator: ``in`` + - Example: ``port in [80, 443]`` + +Logical Operators + - Operators: ``and``, ``or``, ``not`` \ No newline at end of file diff --git a/src/plugins/intermediate/extender/config.c b/src/plugins/intermediate/extender/config.c new file mode 100644 index 00000000..54c7f972 --- /dev/null +++ b/src/plugins/intermediate/extender/config.c @@ -0,0 +1,233 @@ +/** + * \file src/plugins/intermediate/filter/config.c + * \author Michal Sedlak + * \brief The filter plugin config + * \date 2020 + */ + +/* Copyright (C) 2020 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#include "config.h" + +#include +#include + +/* + * + * ... + * + */ + +enum params_xml_nodes { + EXTENSION_EXPR = 1, + EXTENSION_ID = 2, + EXTENSION_VALUES = 3, + EXTENSION_VALUE = 4, + EXTENSION_IDS = 5 +}; + +static const struct fds_xml_args values_params[] = { + FDS_OPTS_ELEM(EXTENSION_EXPR, "expr", FDS_OPTS_T_STRING, 0), + FDS_OPTS_ELEM(EXTENSION_VALUE, "value", FDS_OPTS_T_STRING, 0), + FDS_OPTS_END +}; + +static const struct fds_xml_args ids_params[] = { + FDS_OPTS_ELEM(EXTENSION_ID, "id", FDS_OPTS_T_STRING, 0), + FDS_OPTS_NESTED(EXTENSION_VALUES, "values", values_params, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI), + FDS_OPTS_END +}; + +static const struct fds_xml_args args_params[] = { + FDS_OPTS_ROOT("params"), + FDS_OPTS_NESTED(EXTENSION_IDS, "ids", ids_params, FDS_OPTS_P_OPT | FDS_OPTS_P_MULTI), + FDS_OPTS_END +}; + +int config_parse_values(ipx_ctx_t *ctx, const struct fds_xml_cont *content, config_ids_t* id) { + const struct fds_xml_cont *value_content; + while (fds_xml_next(content->ptr_ctx, &value_content) == FDS_OK) { + switch (value_content->id) { + case EXTENSION_VALUE: + assert(value_content->type == FDS_OPTS_T_STRING); + if (strlen(value_content->ptr_string) == 0) { + IPX_CTX_ERROR(ctx, "Extension value is empty!"); + return -1; + } + id->values[id->values_count].value = strdup(value_content->ptr_string); + if (!id->values[id->values_count].value) { + IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__); + return -1; + } + break; + case EXTENSION_EXPR: + assert(value_content->type == FDS_OPTS_T_STRING); + if (strlen(value_content->ptr_string) == 0) { + IPX_CTX_ERROR(ctx, "Filter expression is empty!"); + return -1; + } + id->values[id->values_count].expr = strdup(value_content->ptr_string); + if (!id->values[id->values_count].expr) { + IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__); + return -1; + } + break; + default: + break; + } + } + return 0; +} + +int config_parse_ids(ipx_ctx_t *ctx, const struct fds_xml_cont *content, config_ids_t* id) { + const struct fds_xml_cont *id_content; + while (fds_xml_next(content->ptr_ctx, &id_content) == FDS_OK) { + switch (id_content->id) { + case EXTENSION_ID: + // Ignored in this plugin + assert(id_content->type == FDS_OPTS_T_STRING); + if (strlen(id_content->ptr_string) == 0) { + IPX_CTX_ERROR(ctx, "Extension ID is empty!"); + return -1; + } + id->name = strdup(id_content->ptr_string); + if (!id->name) { + IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__); + return -1; + } + break; + case EXTENSION_VALUES: + if( id->values_count >= CONFIG_VALUSE_MAX ) { + IPX_CTX_ERROR(ctx, "Maximum number of extension values per id exceeded (%d)!", CONFIG_VALUSE_MAX); + return -1; + } + config_parse_values(ctx, id_content, id); + id->values_count++; + break; + default: + break; + } + } + return 0; +} + +struct config * +config_parse(ipx_ctx_t *ctx, const char *params) +{ + struct config *cfg = NULL; + fds_xml_t *parser = NULL; + + cfg = calloc(1, sizeof(struct config)); + if (!cfg) { + IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__); + goto error; + } + + parser = fds_xml_create(); + if (!parser) { + IPX_CTX_ERROR(ctx, "Memory allocation error (%s:%d)", __FILE__, __LINE__); + goto error; + } + + if (fds_xml_set_args(parser, args_params) != FDS_OK) { + IPX_CTX_ERROR(ctx, "Failed to parse the description of an XML document!"); + goto error; + } + + fds_xml_ctx_t *params_ctx = fds_xml_parse_mem(parser, params, true); + if (params_ctx == NULL) { + IPX_CTX_ERROR(ctx, "Failed to parse the configuration: %s", fds_xml_last_err(parser)); + goto error; + } + + const struct fds_xml_cont *content; + while (fds_xml_next(params_ctx, &content) == FDS_OK) { + switch (content->id) { + case EXTENSION_IDS: + { + if( cfg->ids_count >= CONFIG_IDS_MAX ) { + IPX_CTX_ERROR(ctx, "Maximum number of extension uniq ids exceeded (%d)!", CONFIG_IDS_MAX); + goto error; + } + config_ids_t* id = &cfg->ids[cfg->ids_count]; + id->name = NULL; + if( config_parse_ids(ctx, content, id) != 0 ) { + goto error; + } + cfg->ids_count++; + break; + } + default: + break; + } + } + fds_xml_destroy(parser); + return cfg; + +error: + fds_xml_destroy(parser); + config_destroy(cfg); + return NULL; +} + +void +config_destroy(struct config *cfg) +{ + if (cfg == NULL) { + return; + } + for( int i = 0; i < cfg->ids_count; i++) { + printf("Free filter %s\n", cfg->ids[i].name); + for( int v =0; v < cfg->ids[i].values_count; v++) { + if( cfg->ids[i].values[v].expr ) { + free(cfg->ids[i].values[v].expr); + cfg->ids[i].values[v].expr = NULL; + } + if( cfg->ids[i].values[v].value ) { + free(cfg->ids[i].values[v].value); + cfg->ids[i].values[v].value = NULL; + } + if( cfg->ids[i].values[v].filter ) { + fds_ipfix_filter_destroy(cfg->ids[i].values[v].filter); + cfg->ids[i].values[v].filter = NULL; + } + } + if( cfg->ids[i].name ) { + free(cfg->ids[i].name); + cfg->ids[i].name = NULL; + } + } + free(cfg); +} \ No newline at end of file diff --git a/src/plugins/intermediate/extender/config.h b/src/plugins/intermediate/extender/config.h new file mode 100644 index 00000000..a0dee876 --- /dev/null +++ b/src/plugins/intermediate/extender/config.h @@ -0,0 +1,84 @@ +/** + * \file src/plugins/intermediate/filter/config.h + * \author Michal Sedlak + * \brief The filter plugin config header + * \date 2020 + */ + +/* Copyright (C) 2020 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#ifndef CONFIG_H +#define CONFIG_H + +#define CONFIG_IDS_MAX 4 +#define CONFIG_VALUSE_MAX 12 + +#include + +typedef struct config_value { + char *expr; + char* value; + fds_ipfix_filter_t *filter; +} config_value_t; + +typedef struct config_ids { + char* name; + uint16_t id; + enum fds_iemgr_element_type data_type; + int values_count; + config_value_t values[CONFIG_IDS_MAX]; +} config_ids_t; + +struct config { + size_t max_extension_len; + int ids_count; + config_ids_t ids[CONFIG_IDS_MAX]; +}; + +typedef struct tmp_match { + u_int16_t id; + enum fds_iemgr_element_type data_type; + char* value; + bool matched; +} tmp_match_t; + + +struct config * +config_parse(ipx_ctx_t *ctx, const char *params); + +void +config_destroy(struct config *cfg); + +#endif // CONFIG_H \ No newline at end of file diff --git a/src/plugins/intermediate/extender/doc/ipfixcol2-extender-inter.7.rst b/src/plugins/intermediate/extender/doc/ipfixcol2-extender-inter.7.rst new file mode 100644 index 00000000..40b103b6 --- /dev/null +++ b/src/plugins/intermediate/extender/doc/ipfixcol2-extender-inter.7.rst @@ -0,0 +1,20 @@ +======================== + ipfixcol2-extender +======================== + +----------------------------------- +Extender (intermediate plugin) +----------------------------------- + +:Author: Jimmy Björklund (jimmy@lolo.company) +:Date: 2026-01-14 +:Copyright: Copyright © 2026 CESNET, z.s.p.o. +:Version: 1.0 +:Manual section: 7 +:Manual group: IPFIXcol collector + +Description +----------- + +.. include:: ../README.rst + :start-line: 3 diff --git a/src/plugins/intermediate/extender/extender.c b/src/plugins/intermediate/extender/extender.c new file mode 100644 index 00000000..a442d0a8 --- /dev/null +++ b/src/plugins/intermediate/extender/extender.c @@ -0,0 +1,544 @@ +/** + * \file src/plugins/intermediate/extender/extender.c + * \author Jimmy Björklund + * \brief The extender plugin implementation + * \date 2026 + */ + +#include +#include +#include +#include // ntohs +#include // be64toh +#include +#include // timespec +#include +#include + +#include + +#include "config.h" +#include "msg_builder.h" + +// Define VRFName IE ID if not available +#define IE_VRF_NAME 236 + +IPX_API struct ipx_plugin_info ipx_plugin_info = { + .type = IPX_PT_INTERMEDIATE, + .name = "extender", + .dsc = "Data record extender plugin", + .flags = 0, + .version = "0.0.1", + .ipx_min = "2.0.0" +}; + +// Linked list to map old template IDs to new ones +struct template_node { + uint16_t old_id; + uint16_t new_id; + struct fds_template *new_tmplt; + uint8_t *raw_buffer; + struct template_node *next; +}; + +struct plugin_ctx { + struct config *config; + ipx_ctx_t *ipx_ctx; + struct template_node *templates; + uint16_t next_template_id; +}; + + +struct plugin_ctx * +create_plugin_ctx() +{ + struct plugin_ctx *pctx = calloc(1, sizeof(struct plugin_ctx)); + if (!pctx) { + return NULL; + } + // Start allocating new template IDs from 40000 to avoid conflicts with standard IDs + pctx->next_template_id = 40000; + return pctx; +} + +void +destroy_plugin_ctx(struct plugin_ctx *pctx) +{ + if (!pctx) { + return; + } + + // Free template cache + struct template_node *node = pctx->templates; + while (node) { + struct template_node *next = node->next; + if (node->new_tmplt) { + fds_template_destroy(node->new_tmplt); + } + if (node->raw_buffer) { + free(node->raw_buffer); + } + free(node); + node = next; + } + + // Free filters explicitly + printf("Destroying plugin context and freeing filters\n"); + if (pctx->config) { + config_destroy(pctx->config); + } + free(pctx); +} + +static inline bool +record_belongs_to_set(struct fds_ipfix_set_hdr *set, struct fds_drec *record) +{ + uint8_t *set_begin = (uint8_t *) set; + uint8_t *set_end = set_begin + ntohs(set->length); + uint8_t *record_begin = record->data; + + return record_begin >= set_begin && record_begin < set_end; +} + + +static uint16_t size_of_data_type(enum fds_iemgr_element_type data_type) +{ + switch(data_type) { + case FDS_ET_BOOLEAN: + case FDS_ET_UNSIGNED_8: + case FDS_ET_SIGNED_8: + return 1; + case FDS_ET_UNSIGNED_16: + case FDS_ET_SIGNED_16: + return 2; + case FDS_ET_UNSIGNED_32: + case FDS_ET_SIGNED_32: + case FDS_ET_FLOAT_32: + return 4; + case FDS_ET_UNSIGNED_64: + case FDS_ET_SIGNED_64: + case FDS_ET_FLOAT_64: + return 8; + case FDS_ET_IPV4_ADDRESS: + return 4; + case FDS_ET_IPV6_ADDRESS: + return 16; + case FDS_ET_STRING: + case FDS_ET_OCTET_ARRAY: + return UINT16_MAX; // Variable length + default: + return 0; // Unknown size + } +} + +static size_t get_max_len(const config_ids_t* id) +{ + size_t max_len = 0; + for( int i =0; i < id->values_count; i++) { + if (id->values[i].value) { + size_t len = strlen(id->values[i].value); + if( len > max_len ) { + max_len = len; + } + } + } + return max_len; +} + +int ipx_plugin_init(ipx_ctx_t *ipx_ctx, const char *params) +{ + // Create the plugin context + struct plugin_ctx *pctx = create_plugin_ctx(); + if (!pctx) { + return IPX_ERR_DENIED; + } + + pctx->ipx_ctx = ipx_ctx; + + // Parse config + pctx->config = config_parse(ipx_ctx, params); + if (!pctx->config) { + destroy_plugin_ctx(pctx); + return IPX_ERR_DENIED; + } + + // Create the opts + for (int i = 0; i < pctx->config->ids_count; i++) { + for ( int v =0; v < pctx->config->ids[i].values_count; v++) { + config_ids_t* id = &pctx->config->ids[i]; + config_value_t* value = &id->values[v]; + printf("Config ID %s Value %s Expr %s\n", + id->name, + value->value, + value->expr); + int rc = fds_ipfix_filter_create(&value->filter, ipx_ctx_iemgr_get(ipx_ctx), value->expr); + if (rc != FDS_OK) { + const char *error = fds_ipfix_filter_get_error(value->filter); + IPX_CTX_ERROR(ipx_ctx, "Error creating filter: %s", error); + destroy_plugin_ctx(pctx); + return IPX_ERR_DENIED; + } + + // from the id lookup the iana template id and data length + const struct fds_iemgr_elem * elem = fds_iemgr_elem_find_name(ipx_ctx_iemgr_get(ipx_ctx), id->name); + if (!elem) { + IPX_CTX_ERROR(ipx_ctx, "Unknown ID (make sure case is correct): %s", id->name); + destroy_plugin_ctx(pctx); + return IPX_ERR_DENIED; + } + id->id = elem->id; + id->data_type = elem->data_type; + } + } + size_t tmp_len = 0; + for (int i = 0; i < pctx->config->ids_count; i++) { + const uint16_t size = size_of_data_type(pctx->config->ids[i].data_type); + if( size == UINT16_MAX ) { + size_t id_len = get_max_len(&pctx->config->ids[i]); + if (id_len < 255) + id_len += 1; + else + id_len += 3; // 255 + 2 bytes len + tmp_len += id_len; + } else { + tmp_len += size; + } + } + printf("Maximum extension length per record: %zu bytes\n", tmp_len); + pctx->config->max_extension_len = tmp_len; + ipx_ctx_private_set(ipx_ctx, pctx); + return IPX_OK; +} + +void +ipx_plugin_destroy(ipx_ctx_t *ipx_ctx, void *data) +{ + (void) ipx_ctx; + destroy_plugin_ctx(data); +} + + // Helper to find or create an extended template +struct fds_template * +get_or_create_extended_template(struct plugin_ctx *pctx, + const struct fds_template *old_tmplt, + msg_builder_s *builder, + const config_ids_t *extension, + int extension_count) +{ + // Search cache + for (struct template_node *node = pctx->templates; node != NULL; node = node->next) { + if (node->old_id == old_tmplt->id) { + return node->new_tmplt; + } + } + + if (old_tmplt->type == FDS_TYPE_TEMPLATE_OPTS) { + IPX_CTX_WARNING(pctx->ipx_ctx, "Skipping extension of Options Template ID %u", old_tmplt->id); + return NULL; + } + + // Create new template + uint16_t old_len = old_tmplt->raw.length; + uint16_t new_len = old_len + (4 * extension_count); // Add IE ID (2B) + Length (2B) + + // Debug logging + // Ensure we are reading 2 bytes at offset 2 for Count + if (old_len < 4) { + IPX_CTX_ERROR(pctx->ipx_ctx, "Template %u too short (%u bytes)", old_tmplt->id, old_len); + return NULL; + } + + uint8_t *buffer = calloc(1, new_len); + if (!buffer) { + return NULL; + } + + // Copy existing template record + memcpy(buffer, old_tmplt->raw.data, old_len); + + uint16_t old_count_n = *(uint16_t *)(old_tmplt->raw.data + 2); + uint16_t old_count = ntohs(old_count_n); + + // Update Template ID (first 2 bytes) + uint16_t new_id = pctx->next_template_id++; + *(uint16_t *)buffer = htons(new_id); + + // Update Field Count (next 2 bytes) + // We modify the buffer we just copied to + *(uint16_t *)(buffer + 2) = htons(old_count + extension_count); + + printf("Cloning Template %u -> %u. Old Count: %u, New Count: %u. Len: %u -> %u\n", + old_tmplt->id, new_id, old_count, old_count + extension_count, old_len, new_len); + + uint8_t *ptr = buffer + old_len; + for( int i = 0; i < extension_count; i++) { + *(uint16_t *)ptr = htons(extension[i].id); + *(uint16_t *)(ptr + 2) = htons(size_of_data_type(extension[i].data_type)); // TOTO Make function that returns length based on data type + ptr += 4; + } + + // Parse new template + struct fds_template *new_tmplt = NULL; + uint16_t parsed_len = new_len; + if (fds_template_parse(FDS_TYPE_TEMPLATE, buffer, &parsed_len, &new_tmplt) != FDS_OK) { + IPX_CTX_ERROR(pctx->ipx_ctx, "Failed to parse new template %u", new_id); + free(buffer); + return NULL; + } + + // Link fields to IE Manager definitions so plugins know how to print them + if (fds_template_ies_define(new_tmplt, ipx_ctx_iemgr_get(pctx->ipx_ctx), false) != FDS_OK) { + IPX_CTX_ERROR(pctx->ipx_ctx, "Failed to define IEs for template %u", new_id); + fds_template_destroy(new_tmplt); + free(buffer); + return NULL; + } + + // Note: fds_template might reference the buffer (zero-copy), so we must NOT free it yet. + // We store it in the cache node to be freed on plugin destroy. + + // Cache it + struct template_node *node = malloc(sizeof(struct template_node)); + if (!node) { + fds_template_destroy(new_tmplt); + free(buffer); + return NULL; + } + node->old_id = old_tmplt->id; + node->new_id = new_id; + node->new_tmplt = new_tmplt; + node->raw_buffer = buffer; // Take ownership + node->next = pctx->templates; + pctx->templates = node; + + // Write Template Set to output + // Set Header (4B) + Template Record (new_len) + // Padding to 4B boundary + uint16_t set_len = 4 + new_len; + uint16_t padding = 0; + if (set_len % 4 != 0) { + padding = 4 - (set_len % 4); + set_len += padding; + } + + struct fds_ipfix_set_hdr set_hdr; + set_hdr.flowset_id = htons(FDS_IPFIX_SET_TMPLT); + set_hdr.length = htons(set_len); + + msg_builder_write(builder, &set_hdr, 4); + msg_builder_write(builder, new_tmplt->raw.data, new_tmplt->raw.length); + if (padding > 0) { + uint8_t pad[4] = {0}; + msg_builder_write(builder, pad, padding); + } + return new_tmplt; +} + +// If there is no match we still need to add a default value, e.g., zero for integers, empty string for strings. +static void add_value(tmp_match_t* match, msg_builder_s* builder, struct ipx_ipfix_record* ref) +{ + switch (match->data_type) + { + case FDS_ET_STRING: + case FDS_ET_OCTET_ARRAY: + { + const char *val = match->matched ? match->value : ""; + size_t len = strlen(val); + + if (len < 255) { + uint8_t l = (uint8_t)len; + msg_builder_write(builder, &l, 1); + msg_builder_write(builder, val, len); + ref->rec.size += (1 + len); + } else { + uint8_t header[3]; + header[0] = 255; + *(uint16_t*)(header+1) = htons((uint16_t)len); + msg_builder_write(builder, header, 3); + msg_builder_write(builder, val, len); + ref->rec.size += (3 + len); + } + break; + } + case FDS_ET_UNSIGNED_8: + { + uint8_t v = match->matched ? (uint8_t)strtoul(match->value, NULL, 10) : 0; + msg_builder_write(builder, &v, 1); + ref->rec.size += 1; + break; + } + case FDS_ET_UNSIGNED_16: + { + uint16_t v = match->matched ? (uint16_t)strtoul(match->value, NULL, 10) : 0; + v = htons(v); + msg_builder_write(builder, &v, 2); + ref->rec.size += 2; + break; + } + case FDS_ET_UNSIGNED_32: + { + uint32_t v = match->matched ? (uint32_t)strtoul(match->value, NULL, 10) : 0; + v = htonl(v); + msg_builder_write(builder, &v, 4); + ref->rec.size += 4; + break; + } + case FDS_ET_UNSIGNED_64: + { + uint64_t v = match->matched ? (uint64_t)strtoull(match->value, NULL, 10) : 0; + v = htobe64(v); + msg_builder_write(builder, &v, 8); + ref->rec.size += 8; + break; + } + default: + printf("Unsupported data type for extension: %d\n", match->data_type); + break; + } +} + + + +int +ipx_plugin_process(ipx_ctx_t *ipx_ctx, void *data, ipx_msg_t *base_msg) +{ + struct plugin_ctx *pctx = (struct plugin_ctx *) data; + ipx_msg_ipfix_t *msg = (ipx_msg_ipfix_t *) base_msg; + + // Calculate maximum extension details for buffer allocation + size_t max_ext_len = pctx->config->max_extension_len; + + // Calculate maximum buffer size + struct fds_ipfix_msg_hdr *orig_hdr = (struct fds_ipfix_msg_hdr *) ipx_msg_ipfix_get_packet(msg); + uint32_t drec_cnt = ipx_msg_ipfix_get_drec_cnt(msg); + size_t orig_size = ntohs(orig_hdr->length); + // Add extra space for potential new template sets + extended records + size_t max_buffer_size = orig_size + (max_ext_len * drec_cnt) + 4096; + + uint8_t *buffer = calloc(1, max_buffer_size); + if (!buffer) { + IPX_CTX_ERROR(ipx_ctx, "Failed to allocate buffer"); + return IPX_ERR_NOMEM; + } + + msg_builder_s builder; + builder.msg = ipx_msg_ipfix_create(ipx_ctx, ipx_msg_ipfix_get_ctx(msg), buffer, 0); + if (!builder.msg) { + free(buffer); + return IPX_ERR_NOMEM; + } + + builder.buffer = buffer; + builder.msg_len = 0; + + msg_builder_write(&builder, orig_hdr, sizeof(struct fds_ipfix_msg_hdr)); + + struct ipx_ipfix_set *sets; + size_t set_cnt; + ipx_msg_ipfix_get_sets(msg, &sets, &set_cnt); + + int rc = IPX_OK; + size_t drec_idx = 0; + + for (size_t s = 0; s < set_cnt; s++) { + struct ipx_ipfix_set set = sets[s]; + uint16_t flowset_id = ntohs(set.ptr->flowset_id); + + if (flowset_id < FDS_IPFIX_SET_MIN_DSET) { + rc = msg_builder_copy_set(&builder, &set); + if (rc != IPX_OK) goto cleanup; + continue; + } + + // Data Set logic with splitting + uint16_t current_set_id = 0; + + for (;;) { + struct ipx_ipfix_record *record = ipx_msg_ipfix_get_drec(msg, drec_idx); + if (!record || !record_belongs_to_set(set.ptr, &record->rec)) break; + + struct fds_drec *drec = &record->rec; + tmp_match_t match[CONFIG_IDS_MAX]; + memset(match, 0, sizeof(match)); + int extension_count = 0; + + uint16_t target_id = flowset_id; + struct fds_template *new_tmplt = get_or_create_extended_template(pctx, + drec->tmplt, + &builder, + pctx->config->ids, + pctx->config->ids_count); + if (!new_tmplt) { + rc = IPX_ERR_NOMEM; + goto cleanup; + } + target_id = new_tmplt->id; + + // Manage Set Boundaries + if (target_id != current_set_id) { + if (current_set_id != 0) { + rc = msg_builder_end_dset(&builder); + if (rc != IPX_OK) goto cleanup; + } + msg_builder_begin_dset(&builder, target_id); + current_set_id = target_id; + } + + struct ipx_ipfix_record *ref = ipx_msg_ipfix_add_drec_ref(&builder.msg); + if (!ref) { + rc = IPX_ERR_NOMEM; + goto cleanup; + } + + memcpy(&ref->rec, drec, sizeof(struct fds_drec)); + ref->rec.data = builder.buffer + builder.msg_len; + + msg_builder_write(&builder, drec->data, drec->size); + + + for (int j = 0; j < pctx->config->ids_count; j++) { + match[extension_count].id = pctx->config->ids[j].id; + match[extension_count].data_type = pctx->config->ids[j].data_type; + match[extension_count].matched = false; + for( int v =0; v < pctx->config->ids[j].values_count; v++) { + config_value_t* value = &pctx->config->ids[j].values[v]; + if (fds_ipfix_filter_eval_biflow(value->filter, drec) != FDS_IPFIX_FILTER_NO_MATCH) { + match[extension_count].matched = true; + match[extension_count].value = value->value; + break; + } + } + extension_count++; + } + + // Update template pointer in the new record + ref->rec.tmplt = new_tmplt; + drec->tmplt = new_tmplt; + // Encode Variable Length String + for( int k = 0; k < extension_count; k++) { + add_value(&match[k], &builder, ref); + } + + drec_idx++; + } + + if (current_set_id != 0) { + rc = msg_builder_end_dset(&builder); + if (rc != IPX_OK) goto cleanup; + } + } + + msg_builder_finish(&builder); + ipx_msg_ipfix_destroy(msg); + if (msg_builder_is_empty_msg(&builder)) { + ipx_msg_ipfix_destroy(builder.msg); + } else { + ipx_ctx_msg_pass(ipx_ctx, (ipx_msg_t *) builder.msg); + } + return IPX_OK; + +cleanup: + ipx_msg_ipfix_destroy(builder.msg); + IPX_CTX_ERROR(ipx_ctx, "Failed to build extended message"); + return rc; +} \ No newline at end of file diff --git a/src/plugins/intermediate/extender/msg_builder.h b/src/plugins/intermediate/extender/msg_builder.h new file mode 100644 index 00000000..79ca2258 --- /dev/null +++ b/src/plugins/intermediate/extender/msg_builder.h @@ -0,0 +1,183 @@ +/** + * \file src/plugins/intermediate/filter/msg_builder.h + * \author Michal Sedlak + * \brief Helper for building new IPFIX messages + * \date 2020 + */ + +/* Copyright (C) 2020 CESNET, z.s.p.o. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * ALTERNATIVELY, provided that this notice is retained in full, this + * product may be distributed under the terms of the GNU General Public + * License (GPL) version 2 or later, in which case the provisions + * of the GPL apply INSTEAD OF those given above. + * + * This software is provided ``as is'', and any express or implied + * warranties, including, but not limited to, the implied warranties of + * merchantability and fitness for a particular purpose are disclaimed. + * In no event shall the company or contributors be liable for any + * direct, indirect, incidental, special, exemplary, or consequential + * damages (including, but not limited to, procurement of substitute + * goods or services; loss of use, data, or profits; or business + * interruption) however caused and on any theory of liability, whether + * in contract, strict liability, or tort (including negligence or + * otherwise) arising in any way out of the use of this software, even + * if advised of the possibility of such damage. + * + */ + +#ifndef MSG_BUILDER_H +#define MSG_BUILDER_H + +#include +#include +#include + +typedef struct { + ipx_msg_ipfix_t *msg; // The message being builded + + uint8_t *buffer; // The raw message bytes + size_t msg_len; // The number of bytes written so far + + struct fds_ipfix_set_hdr *current_set; // The current data set being created +} msg_builder_s; + +/** + * Write message bytes + */ +static inline void +msg_builder_write(msg_builder_s *b, const void *bytes, int count) +{ + memcpy(b->buffer + b->msg_len, bytes, count); + b->msg_len += count; +} + +/** + * Initialize the message builder from the original message + */ +static inline int +msg_builder_init(msg_builder_s *b, ipx_ctx_t *ipx_ctx, ipx_msg_ipfix_t *orig_msg) +{ + // Allocate buffer same as the size of the original message because the new message cannot be bigger + struct fds_ipfix_msg_hdr *orig_hdr = (struct fds_ipfix_msg_hdr *) ipx_msg_ipfix_get_packet(orig_msg); + b->buffer = malloc(ntohs(orig_hdr->length)); + if (!b->buffer) { + return IPX_ERR_NOMEM; + } + + // Create new message from the original + b->msg = ipx_msg_ipfix_create(ipx_ctx, ipx_msg_ipfix_get_ctx(orig_msg), b->buffer, 0); + if (!b->msg) { + free(b->buffer); + return IPX_ERR_NOMEM; + } + + // Initialize builder fields + b->msg_len = 0; + + // Copy the original header + msg_builder_write(b, orig_hdr, sizeof(struct fds_ipfix_msg_hdr)); + + return IPX_OK; +} + +/** + * Copy ipfix set from the original + */ +static inline int +msg_builder_copy_set(msg_builder_s *b, struct ipx_ipfix_set *set) +{ + struct ipx_ipfix_set *ref = ipx_msg_ipfix_add_set_ref(b->msg); + if (!ref) { + return IPX_ERR_NOMEM; + } + ref->ptr = (struct fds_ipfix_set_hdr *) (b->buffer + b->msg_len); + msg_builder_write(b, set->ptr, ntohs(set->ptr->length)); + return IPX_OK; +} + +/** + * Begin new data set + */ +static inline void +msg_builder_begin_dset(msg_builder_s *b, uint16_t flowset_id) +{ + b->current_set = (struct fds_ipfix_set_hdr *) (b->buffer + b->msg_len); + const struct fds_ipfix_set_hdr hdr = { .flowset_id = htons(flowset_id) }; + msg_builder_write(b, &hdr, sizeof(hdr)); +} + +/** + * Copy data record to the message + */ +static inline int +msg_builder_copy_drec(msg_builder_s *b, struct ipx_ipfix_record *drec) +{ + struct ipx_ipfix_record *ref = ipx_msg_ipfix_add_drec_ref(&b->msg); + if (!ref) { + return IPX_ERR_NOMEM; + } + memcpy(&ref->rec, &drec->rec, sizeof(struct fds_drec)); + ref->rec.data = b->buffer + b->msg_len; + msg_builder_write(b, drec->rec.data, drec->rec.size); + return IPX_OK; +} + +/** + * End the current data set + */ +static inline int +msg_builder_end_dset(msg_builder_s *b) +{ + uint16_t set_len = (uint8_t *) (b->buffer + b->msg_len) - (uint8_t *) b->current_set; + if (set_len <= sizeof(struct fds_ipfix_set_hdr)) { + // No data records written, don't even write the set header, rewind the buffer to the set start + b->msg_len -= set_len; + return IPX_OK; + } + + b->current_set->length = htons(set_len); + struct ipx_ipfix_set *ref = ipx_msg_ipfix_add_set_ref(b->msg); + if (!ref) { + return IPX_ERR_NOMEM; + } + // b->current_set is a pointer to the beginning of the data set + ref->ptr = b->current_set; + return IPX_OK; +} + +/** + * Finish the ipfix message + */ +static inline void +msg_builder_finish(msg_builder_s *b) +{ + // Set the correct message length in the message header + struct fds_ipfix_msg_hdr *hdr = (struct fds_ipfix_msg_hdr *) b->buffer; + hdr->length = htons(b->msg_len); + ipx_msg_ipfix_set_raw_size(b->msg, b->msg_len); +} + +/** + * Check if the message is empty meaning nothing was written except the header + */ +static inline bool +msg_builder_is_empty_msg(msg_builder_s *b) +{ + return b->msg_len <= sizeof(struct fds_ipfix_msg_hdr); +} + +#endif