From 398334038dfc2c207a20538c6f89acfc747db77e Mon Sep 17 00:00:00 2001 From: Yura Sorokin Date: Tue, 10 Mar 2026 03:06:50 +0100 Subject: [PATCH] PS-10246 feature: Implement fixing next_event_position fields / checksums in the event common_header / footer in case of reconnecting to another node (part 4) https://perconadev.atlassian.net/browse/PS-10246 Implemented binlog event data rewrite in the 'gtid' replication mode. When user continues streaming from a different MySQL server, there is no guarantee that binlog files on the new server are physically identical to those on the old one and therefore may have different values of 'next_event_position' in the common header and 'crc' in the footer. That is why we had to implement the logic of updating certain fields in the serialized event data blocks before writing them to our own binlog files. Moreover, the old and the new servers may have different names for their binlog files and that is why in this mode we simply cannot react to file names extracted from the ROTATE events. As the result, we implemented our own binlog file name generation and event data splitting. We can now specify a new section '' in the main configuration file to enable this rewrite mode. If it is not present, then this mode is turned off. '' has 2 parameters: - '' - base binlog file name (e.g. 'binlog' if we want to get 'binlog.000001') - '' - the file size after reaching which we will perform binlog file rotation on the receiving side. Added new class 'binsrv::rewrite_config' that represents this '' section in the configuration file. Significantly refactored 'binsrv::events::reader_context' class. - It now operates on instances of 'binsrv::events::event_view' instead of 'binsrv::events::event'. - It now materialized required post-headers / bodies only when necessary. - It now has a helper method method for generating event post-header length array in the FDE post-header by providing hardcoded data extracted from 8.0 and 8.4 MySQL Servers ('get_hardcoded_post_header_lengths()'). The old existing 'get_post_header_lengths()' method renamed into 'get_known_post_header_lengths()'. - Reworked the way how we identify "info-only" events (those that should not be written to the binary log) - instead of keeping this value for the last processed event in the context, we now return it immediately in the 'process_event_view()' method. Added new class 'binsrv::composite_binlog_name' that helps with parsing / generating of the binlog file names in the 'binlog.000001' format. 'binsrv::storage' reworked to store binlog names as instances of this class instead of strings. Added new 'binsrv::events::materialize()' free function that is supposed to copy data from the binlog events represented via 'binsrv::events::event_view' into new buffers performing some basic adjustments (like adding / removing footers with checksums). Changed the way how binlog events interacts with 'binsrv::events::reader_context' - instead of automatically invoking 'process_event()' method in the 'binsrv::events::event' constructor, we now do this manually via 'process_event_view()' method. In the 'set_up_binsrv_environment.inc' MTR include file it is now possible to specify '$binsrv_rewrite_file_size' parameter before including that will be translated into '' configuration parameter. 'binlog_streaming.resume_streaming' MTR test case extended with two additional combinations: 'gtid_rewrite_buffered' and 'gtid_rewrite_unbuffered' that check streaming resume operations in the gtid mode with rewrite enabled. Updated README.md documentation file : we now have new section describing 'rewrite' configuration parameters - '' section. 'event_test' unit test (BOOST_TEST_MODULE EventTests) extended with additional test case 'EventMaterialization' that checks various combinations of the 'event_view' materializations (from a view with / without a footer to a new data block with / without a footer). --- CMakeLists.txt | 9 + README.md | 19 +- main_config.json | 6 +- .../include/set_up_binsrv_environment.inc | 7 + .../t/resume_streaming.combinations | 18 +- mtr/binlog_streaming/t/resume_streaming.test | 14 +- src/app.cpp | 377 +++++++++-- src/binsrv/composite_binlog_name.cpp | 107 +++ src/binsrv/composite_binlog_name.hpp | 65 ++ src/binsrv/composite_binlog_name_fwd.hpp | 30 + src/binsrv/events/common_header_view.hpp | 2 +- src/binsrv/events/event.cpp | 5 +- src/binsrv/events/event.hpp | 4 +- src/binsrv/events/event_view.cpp | 91 ++- src/binsrv/events/event_view.hpp | 36 +- src/binsrv/events/event_view_fwd.hpp | 12 + src/binsrv/events/footer_view.hpp | 2 +- src/binsrv/events/protocol_traits.cpp | 13 +- src/binsrv/events/protocol_traits.hpp | 7 +- src/binsrv/events/reader_context.cpp | 620 ++++++++++++++---- src/binsrv/events/reader_context.hpp | 47 +- src/binsrv/events/rotate_body_impl.cpp | 19 +- src/binsrv/events/rotate_body_impl.hpp | 8 +- src/binsrv/main_config.cpp | 5 +- src/binsrv/replication_config.cpp | 39 ++ src/binsrv/replication_config.hpp | 8 +- src/binsrv/rewrite_config.cpp | 35 + src/binsrv/rewrite_config.hpp | 41 ++ src/binsrv/rewrite_config_fwd.hpp | 28 + src/binsrv/storage.cpp | 65 +- src/binsrv/storage.hpp | 26 +- src/binsrv/storage_fwd.hpp | 2 +- tests/CMakeLists.txt | 3 + tests/event_test.cpp | 147 ++++- 34 files changed, 1602 insertions(+), 315 deletions(-) create mode 100644 src/binsrv/composite_binlog_name.cpp create mode 100644 src/binsrv/composite_binlog_name.hpp create mode 100644 src/binsrv/composite_binlog_name_fwd.hpp create mode 100644 src/binsrv/replication_config.cpp create mode 100644 src/binsrv/rewrite_config.cpp create mode 100644 src/binsrv/rewrite_config.hpp create mode 100644 src/binsrv/rewrite_config_fwd.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 69e6398..815e6c1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -255,6 +255,10 @@ set(source_files src/binsrv/binlog_file_metadata.hpp src/binsrv/binlog_file_metadata.cpp + src/binsrv/composite_binlog_name_fwd.hpp + src/binsrv/composite_binlog_name.hpp + src/binsrv/composite_binlog_name.cpp + src/binsrv/cout_logger.hpp src/binsrv/cout_logger.cpp @@ -293,10 +297,15 @@ set(source_files src/binsrv/replication_config_fwd.hpp src/binsrv/replication_config.hpp + src/binsrv/replication_config.cpp src/binsrv/replication_mode_type_fwd.hpp src/binsrv/replication_mode_type.hpp + src/binsrv/rewrite_config_fwd.hpp + src/binsrv/rewrite_config.hpp + src/binsrv/rewrite_config.cpp + src/binsrv/size_unit_fwd.hpp src/binsrv/size_unit.hpp src/binsrv/size_unit.cpp diff --git a/README.md b/README.md index d3d04d8..2f2594f 100644 --- a/README.md +++ b/README.md @@ -226,7 +226,7 @@ The `` may be one of the following (but not limited to): #### 'search_by_gtid_set' operation mode -In this mode the utility requires one additional command line parameter `` and will print to the standard output the minimal set of binlog files stored in the Binary Log Server data directory required to cover the specidfied GTID set ``. This operation makes sense only when the storage we are querying was created in GTID-based replication mode. +In this mode the utility requires one additional command line parameter `` and will print to the standard output the minimal set of binlog files stored in the Binary Log Server data directory required to cover the specified GTID set ``. This operation makes sense only when the storage we are querying was created in GTID-based replication mode. Along with the file name the output will also return its current size in bytes, timestamps, URI and optional initial / added GTIDs. For instance, ```bash @@ -339,7 +339,11 @@ The Percona Binary Log Server configuration file has the following format. "server_id": 42, "idle_time": 10, "verify_checksum": true, - "mode": "position" + "mode": "gtid", + "rewrite": { + "base_file_name": "binlog", + "file_size": "128M" + } }, "storage": { "backend": "s3", @@ -376,7 +380,7 @@ Currently we use the following mapping: - `` - the number of seconds the MySQL client library will wait to read data from a remote server (this parameter may affect the responsiveness of the program to graceful termination - see below). - `` - the number of seconds the MySQL client library will wait to write data to a remote server. -Note: you should specify either `` / `` pair or single ``. +Note: you should specify either `` / `` pair or single ``. #### \ optional section - `` - specifies the desired security state of the connection to the MySQL server, can be one of the `disabled` / `preferred` / `required` / `verify_ca` / `verify_identity` ([--ssl-mode](https://dev.mysql.com/doc/refman/8.4/en/connection-options.html#option_general_ssl-mode) `mysql` utility command line option). @@ -389,7 +393,7 @@ Note: you should specify either `` / `` pair o - `` (optional) - specifies the list of permissible ciphers for connection encryption ([--ssl-cipher](https://dev.mysql.com/doc/refman/8.4/en/connection-options.html#option_general_ssl-cipher) `mysql` utility command line option). #### \ optional section -- `` (optional) - specifies the list of permissible TLSv1.3 ciphersuites for encrypted connections ([--tls-ciphersuites](https://dev.mysql.com/doc/refman/8.4/en/connection-options.html#option_general_tls-ciphersuites) `mysql` utility command line option). +- `` (optional) - specifies the list of permissible TLSv1.3 cipher suites for encrypted connections ([--tls-ciphersuites](https://dev.mysql.com/doc/refman/8.4/en/connection-options.html#option_general_tls-ciphersuites) `mysql` utility command line option). - `` (optional) - specifies the list of permissible TLS protocols for encrypted connections ([--tls-version](https://dev.mysql.com/doc/refman/8.4/en/connection-options.html#option_general_tls-version) `mysql` utility command line option). #### \ section @@ -398,6 +402,11 @@ Note: you should specify either `` / `` pair o - `` - a boolean value which specifies whether the utility should verify event checksums. - `` - the replication mode, can be either `position` for position-based replication or `gtid` for GTID-based replication. +#### \ section +If this section is present, then the utility will not split binlog events the same way as they were on the original MySQL server. Instead, it will generate its own binlog file name sequence (based on the ``) and will change to a new binary log file when the size of the previous one riches the specified value (``). Having this section requires `` to be set to `gtid`. +- `` - the base name of the generated binlog file names in the "rewrite" mode. E.g. `rewritten_binlog` will cause `rewritten_binlog.000001`, `rewritten_binlog.000002`, etc. file names to be generated. +- `` - the maximum individual binlog file size after reaching which the utility will switch to a new one. The value is expected to be a string containing an integer followed by an optional suffix 'K' / 'M' / 'G' / 'T' / 'P', e.g. /\d+\[KMGTP\]?/. The minimal allowed value of this parameter is `1024` bytes. + #### \ section - `` - the type of the storage where the received binary logs should be stored: - `file` - local filesystem @@ -456,7 +465,7 @@ For example: - `https://key_id:secret@192.168.0.100:9000/binsrv-bucket/vault` - `key_id` will be used as `AWS_ACCESS_KEY_ID`, `secret` will be used as `AWS_SECRET_ACCESS_KEY`, `binsrv-bucket` will be the name of the bucket, `/vault` will be the virtual directory, `192.168.0.100:9000` will be the custom endpoint of the `S3`-compatible server, the connection will be established via secure HTTPS protocol. ##### Checkpointing on S3 -Please note that S3 API does not provide a way to append a portion of data to an existing object. Currently, in our S3 storage backend "append" operations are implemented as complete object overwrites meaning data re-uploads. Practically, if your typical binlog file size is '1G' and you set `` to '256M', you will upload '256M + 512M + 768M + 1024M = 2560M' (about 2.5 times more then your binlog file size in this example). So, keep balance between the value of this parameter and your tipical binlog size. Similar concerns can be rised regarding enabling ``. +Please note that S3 API does not provide a way to append a portion of data to an existing object. Currently, in our S3 storage backend "append" operations are implemented as complete object overwrites meaning data re-uploads. Practically, if your typical binlog file size is '1G' and you set `` to '256M', you will upload '256M + 512M + 768M + 1024M = 2560M' (about 2.5 times more then your binlog file size in this example). So, keep balance between the value of this parameter and your typical binlog size. Similar concerns can be raised regarding enabling ``. ### Resuming previous operation diff --git a/main_config.json b/main_config.json index 9217bb2..0d88637 100644 --- a/main_config.json +++ b/main_config.json @@ -30,7 +30,11 @@ "server_id": 42, "idle_time": 10, "verify_checksum": true, - "mode": "position" + "mode": "gtid", + "rewrite": { + "base_file_name": "binlog", + "file_size": "128M" + } }, "storage": { "backend": "file", diff --git a/mtr/binlog_streaming/include/set_up_binsrv_environment.inc b/mtr/binlog_streaming/include/set_up_binsrv_environment.inc index 7c611b1..6f8d5e1 100644 --- a/mtr/binlog_streaming/include/set_up_binsrv_environment.inc +++ b/mtr/binlog_streaming/include/set_up_binsrv_environment.inc @@ -17,6 +17,7 @@ # --let $binsrv_replication_mode = position | gtid # --let $binsrv_checkpoint_size = 2M (optional) # --let $binsrv_checkpoint_interval = 30s (optional) +# --let $binsrv_rewrite_file_size = 1K (optional) # --source set_up_binsrv_environment.inc --echo @@ -90,6 +91,12 @@ eval SET @binsrv_config_json = JSON_OBJECT( 'uri', @storage_uri ) ); + +if ($binsrv_rewrite_file_size != "") +{ + eval SET @binsrv_config_json = JSON_INSERT(@binsrv_config_json, '$.replication.rewrite', JSON_OBJECT('base_file_name', 'bnlg', 'file_size', '$binsrv_rewrite_file_size')); +} + if ($storage_backend == s3) { eval SET @binsrv_config_json = JSON_INSERT(@binsrv_config_json, '$.storage.fs_buffer_directory', '$binsrv_buffer_path'); diff --git a/mtr/binlog_streaming/t/resume_streaming.combinations b/mtr/binlog_streaming/t/resume_streaming.combinations index fa1c14d..09b9437 100644 --- a/mtr/binlog_streaming/t/resume_streaming.combinations +++ b/mtr/binlog_streaming/t/resume_streaming.combinations @@ -1,15 +1,25 @@ [position_buffered] -init-connect = SET @binsrv_buffering_mode = 'buffered' +init-connect = SET @binsrv_utility_mode = 'buffered' [position_unbuffered] -init-connect = SET @binsrv_buffering_mode = 'unbuffered' +init-connect = SET @binsrv_utility_mode = 'unbuffered' [gtid_buffered] gtid-mode=on enforce-gtid-consistency -init-connect = SET @binsrv_buffering_mode = 'buffered' +init-connect = SET @binsrv_utility_mode = 'buffered' [gtid_unbuffered] gtid-mode=on enforce-gtid-consistency -init-connect = SET @binsrv_buffering_mode = 'unbuffered' +init-connect = SET @binsrv_utility_mode = 'unbuffered' + +[gtid_rewrite_buffered] +gtid-mode=on +enforce-gtid-consistency +init-connect = SET @binsrv_utility_mode = 'rewrite_buffered' + +[gtid_rewrite_unbuffered] +gtid-mode=on +enforce-gtid-consistency +init-connect = SET @binsrv_utility_mode = 'rewrite_unbuffered' diff --git a/mtr/binlog_streaming/t/resume_streaming.test b/mtr/binlog_streaming/t/resume_streaming.test index f59d60a..1fc2ee8 100644 --- a/mtr/binlog_streaming/t/resume_streaming.test +++ b/mtr/binlog_streaming/t/resume_streaming.test @@ -12,8 +12,8 @@ eval $stmt_reset_binary_logs_and_gtids; # identifying backend storage type ('file' or 's3') --source ../include/identify_storage_backend.inc -# identifying utility buffering mode from the conbination ---let $extracted_init_connect_variable_name = binsrv_buffering_mode +# identifying utility buffering mode from the combination +--let $extracted_init_connect_variable_name = binsrv_utility_mode --source ../include/extract_init_connect_variable_value.inc # creating data directory, configuration file, etc. @@ -30,6 +30,16 @@ if ($extracted_init_connect_variable_value == 'unbuffered') { --let $binsrv_checkpoint_size = 1 } +if ($extracted_init_connect_variable_value == 'rewrite_buffered') +{ + --let $binsrv_rewrite_file_size = 1K + --let $binsrv_checkpoint_size = 1G +} +if ($extracted_init_connect_variable_value == 'rewrite_unbuffered') +{ + --let $binsrv_rewrite_file_size = 1K + --let $binsrv_checkpoint_size = 1 +} --source ../include/set_up_binsrv_environment.inc --echo diff --git a/src/app.cpp b/src/app.cpp index 8da9bca..95c8b35 100644 --- a/src/app.cpp +++ b/src/app.cpp @@ -59,9 +59,12 @@ #include "binsrv/models/error_response.hpp" #include "binsrv/models/search_response.hpp" +#include "binsrv/events/checksum_algorithm_type.hpp" #include "binsrv/events/code_type.hpp" #include "binsrv/events/common_header_flag_type.hpp" #include "binsrv/events/event.hpp" +#include "binsrv/events/event_view.hpp" +#include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/events/reader_context.hpp" #include "easymysql/connection.hpp" @@ -77,6 +80,7 @@ #include "util/ct_string.hpp" #include "util/exception_location_helpers.hpp" #include "util/nv_tuple.hpp" +#include "util/semantic_version.hpp" namespace { @@ -215,6 +219,13 @@ void log_connection_config_info( } } +void log_rewrite_config_info(binsrv::basic_logger &logger, + const binsrv::rewrite_config &rewrite_config) { + log_config_param<"base_file_name">(logger, rewrite_config, + "rewrite base binlog file name"); + log_config_param<"file_size">(logger, rewrite_config, + "rewrite binlog file size"); +} void log_replication_config_info( binsrv::basic_logger &logger, const binsrv::replication_config &replication_config) { @@ -227,6 +238,10 @@ void log_replication_config_info( logger, replication_config, "mysql replication checksum verification"); log_config_param<"mode">(logger, replication_config, "mysql replication mode"); + const auto &optional_rewrite_config{replication_config.get<"rewrite">()}; + if (optional_rewrite_config.has_value()) { + log_rewrite_config_info(logger, *optional_rewrite_config); + } } void log_storage_config_info(binsrv::basic_logger &logger, @@ -261,7 +276,7 @@ void log_storage_info(binsrv::basic_logger &logger, msg = "binlog storage initialized on an empty directory"; } else { msg = "binlog storage initialized at \""; - msg += storage.get_current_binlog_name(); + msg += storage.get_current_binlog_name().str(); msg += "\":"; msg += std::to_string(storage.get_current_position()); } @@ -320,7 +335,7 @@ void log_replication_info( if (storage.is_empty()) { msg += "the very beginning"; } else { - msg += storage.get_current_binlog_name(); + msg += storage.get_current_binlog_name().str(); msg += ":"; msg += std::to_string(storage.get_current_position()); } @@ -375,16 +390,16 @@ void log_span_dump(binsrv::basic_logger &logger, } } -void process_artificial_rotate_event(const binsrv::events::event ¤t_event, - binsrv::basic_logger &logger, - binsrv::storage &storage) { - assert(current_event.get_common_header().get_type_code() == +void process_artificial_rotate_event( + const binsrv::events::event_view ¤t_event_v, + binsrv::basic_logger &logger, binsrv::storage &storage) { + assert(current_event_v.get_common_header_view().get_type_code() == binsrv::events::code_type::rotate); - assert(current_event.get_common_header().get_flags().has_element( + assert(current_event_v.get_common_header_view().get_flags().has_element( binsrv::events::common_header_flag_type::artificial)); - const auto ¤t_rotate_body = - current_event.get_body(); + const binsrv::events::generic_body + current_rotate_body{current_event_v.get_body_raw()}; bool binlog_opening_needed{true}; @@ -401,11 +416,14 @@ void process_artificial_rotate_event(const binsrv::events::event ¤t_event, // have ROTATE or STOP event as the last one in the binlog, so here we // handle this case by closing the old binlog and opening a new one - if (current_rotate_body.get_binlog() == storage.get_current_binlog_name()) { + if (current_rotate_body.get_parsed_binlog() == + storage.get_current_binlog_name()) { // in addition, in position-based replication mode we also need to check // the position - const auto ¤t_rotate_post_header = - current_event.get_post_header(); + const binsrv::events::generic_post_header< + binsrv::events::code_type::rotate> + current_rotate_post_header{current_event_v.get_post_header_raw()}; + if (current_rotate_post_header.get_position_raw() != storage.get_current_position()) { util::exception_location().raise( @@ -414,7 +432,8 @@ void process_artificial_rotate_event(const binsrv::events::event ¤t_event, binlog_opening_needed = false; - const std::string current_binlog_name{storage.get_current_binlog_name()}; + const std::string current_binlog_name{ + storage.get_current_binlog_name().str()}; logger.log(binsrv::log_severity::info, "storage: reused already open binlog file: " + current_binlog_name); @@ -423,7 +442,8 @@ void process_artificial_rotate_event(const binsrv::events::event ¤t_event, // if names do not match, we need to close the currently open // binlog and make sure that binlog_opening_needed is set to true, so // that we will open a new one later - const std::string old_binlog_name{storage.get_current_binlog_name()}; + const std::string old_binlog_name{ + storage.get_current_binlog_name().str()}; storage.close_binlog(); logger.log(binsrv::log_severity::info, "storage: closed binlog file left open: " + old_binlog_name); @@ -433,7 +453,7 @@ void process_artificial_rotate_event(const binsrv::events::event ¤t_event, } if (binlog_opening_needed) { const auto binlog_open_result{ - storage.open_binlog(current_rotate_body.get_binlog())}; + storage.open_binlog(current_rotate_body.get_parsed_binlog())}; std::string message{"storage: "}; if (binlog_open_result == binsrv::open_binlog_status::created) { @@ -443,45 +463,77 @@ void process_artificial_rotate_event(const binsrv::events::event ¤t_event, if (binlog_open_result == binsrv::open_binlog_status::opened_empty) { message += " (empty)"; } else if (binlog_open_result == - binsrv::open_binlog_status::opened_at_magic_paylod_offset) { + binsrv::open_binlog_status::opened_at_magic_payload_offset) { message += " (with magic payload only)"; } } message += " binlog file: "; - message += current_rotate_body.get_binlog(); + message += current_rotate_body.get_readable_binlog(); logger.log(binsrv::log_severity::info, message); } } void process_rotate_or_stop_event(binsrv::basic_logger &logger, binsrv::storage &storage) { - const std::string old_binlog_name{storage.get_current_binlog_name()}; + const std::string old_binlog_name{storage.get_current_binlog_name().str()}; storage.close_binlog(); logger.log(binsrv::log_severity::info, "storage: closed binlog file: " + old_binlog_name); } -void process_binlog_event(const binsrv::events::event ¤t_event, - util::const_byte_span portion, +void process_binlog_event(const binsrv::events::event_view ¤t_event_v, binsrv::basic_logger &logger, binsrv::events::reader_context &context, binsrv::storage &storage) { - const auto ¤t_common_header = current_event.get_common_header(); - const auto code = current_common_header.get_type_code(); + const auto current_common_header_v{current_event_v.get_common_header_view()}; + const auto readable_flags{current_common_header_v.get_readable_flags()}; + logger.log(binsrv::log_severity::info, + "event : " + + std::string{current_common_header_v.get_readable_type_code()} + + (readable_flags.empty() ? "" : " (" + readable_flags + ")")); + logger.log(binsrv::log_severity::debug, + "event : [parsed view] " + + boost::lexical_cast(current_event_v)); + + const bool info_only{context.process_event_view(current_event_v)}; + + if (info_only) { + logger.log( + binsrv::log_severity::info, + "event : [info_only] - will not be written to the binary log file"); + } + + if (context.is_at_transaction_boundary()) { + logger.log( + binsrv::log_severity::info, + "event : [end_of_transaction] " + + boost::lexical_cast(context.get_transaction_gtid())); + } + + // here we additionally check for log level because event materialization + // is not a trivial operation + if (binsrv::log_severity::debug >= logger.get_min_level()) { + const binsrv::events::event current_event{context, current_event_v}; + logger.log(binsrv::log_severity::debug, + "event : [parsed] " + + boost::lexical_cast(current_event)); + } - const auto is_artificial{current_common_header.get_flags().has_element( + const auto code = current_common_header_v.get_type_code(); + const auto is_artificial{current_common_header_v.get_flags().has_element( binsrv::events::common_header_flag_type::artificial)}; // processing the very first event in the sequence - artificial ROTATE event if (code == binsrv::events::code_type::rotate && is_artificial) { - process_artificial_rotate_event(current_event, logger, storage); + process_artificial_rotate_event(current_event_v, logger, storage); } // checking if the event needs to be written to the binlog - if (!context.is_event_info_only()) { - storage.write_event(portion, context.is_at_transaction_boundary(), + if (!info_only) { + storage.write_event(current_event_v.get_portion(), + context.is_at_transaction_boundary(), context.get_transaction_gtid(), - current_common_header.get_timestamp()); + current_common_header_v.get_timestamp()); } // processing the very last event in the sequence - either a non-artificial @@ -492,6 +544,224 @@ void process_binlog_event(const binsrv::events::event ¤t_event, } } +[[nodiscard]] binsrv::events::event_view +generate_rotate_event(binsrv::events::event_storage &event_buffer, + const binsrv::events::reader_context &context, + std::uint32_t offset, bool current_timestamp, + std::uint32_t server_id, bool artificial, + const binsrv::composite_binlog_name &binlog_name) { + const binsrv::events::generic_post_header + post_header{binsrv::events::magic_binlog_offset}; + const binsrv::events::generic_body body{ + binlog_name}; + + binsrv::ctime_timestamp timestamp{}; + if (current_timestamp) { + timestamp = binsrv::ctime_timestamp::now(); + } + + binsrv::events::common_header_flag_set flags{}; + if (artificial) { + flags |= binsrv::events::common_header_flag_type::artificial; + } + + // the value of the 'include_checksum' parameters is taken from the + // 'reader_context': immediately after reconnection it will be equal to + // the '' configuration parameter and after + // that will be taken from the FORMAT_DESCRIPTION events, which in the + // rewrite mode will be generated by us and therefore will always include + // 'checksum_algorithm' set to 'crc32' + const auto generated_event{ + binsrv::events::event::create_event( + offset, timestamp, server_id, flags, post_header, body, + context.is_footer_expected(), event_buffer)}; + + return binsrv::events::event_view{context, + util::const_byte_span{event_buffer}}; +} + +[[nodiscard]] binsrv::events::event_view +generate_format_description_event(binsrv::events::event_storage &event_buffer, + const binsrv::events::reader_context &context, + std::uint32_t offset, + std::uint32_t server_id) { + const util::semantic_version server_version{ + context.get_current_encoded_server_version()}; + const binsrv::events::generic_post_header< + binsrv::events::code_type::format_description> + post_header{ + binsrv::events::default_binlog_version, server_version, + binsrv::ctime_timestamp::now(), + binsrv::events::default_common_header_length, + binsrv::events::reader_context::get_hardcoded_post_header_lengths( + server_version.get_encoded())}; + const binsrv::events::generic_body< + binsrv::events::code_type::format_description> + body{binsrv::events::checksum_algorithm_type::crc32}; + + // enforcing checksums for all rewritten upcoming events + const auto generated_event{binsrv::events::event::create_event< + binsrv::events::code_type::format_description>( + offset, binsrv::ctime_timestamp::now(), server_id, + binsrv::events::common_header_flag_set{}, post_header, body, + true /* include_checksum */, event_buffer)}; + + return binsrv::events::event_view{context, + util::const_byte_span{event_buffer}}; +} + +[[nodiscard]] binsrv::events::event_view +generate_previous_gtids_log_event(binsrv::events::event_storage &event_buffer, + const binsrv::events::reader_context &context, + std::uint32_t offset, std::uint32_t server_id, + const binsrv::gtids::gtid_set >ids) { + const binsrv::events::generic_post_header< + binsrv::events::code_type::previous_gtids_log> + post_header{}; + const binsrv::events::generic_body< + binsrv::events::code_type::previous_gtids_log> + body{gtids}; + const auto generated_previous_gtids_log_event{ + binsrv::events::event::create_event< + binsrv::events::code_type::previous_gtids_log>( + offset, binsrv::ctime_timestamp::now(), server_id, + binsrv::events::common_header_flag_set{}, post_header, body, true, + event_buffer)}; + + return binsrv::events::event_view{context, + util::const_byte_span{event_buffer}}; +} + +void rewrite_and_process_binlog_event( + const binsrv::events::event_view ¤t_event_v, + binsrv::basic_logger &logger, binsrv::events::reader_context &context, + binsrv::storage &storage, std::uint32_t server_id, + std::string_view base_file_name, std::uint64_t file_size) { + const auto current_common_header_v = current_event_v.get_common_header_view(); + const auto code = current_common_header_v.get_type_code(); + + // for ROTATE (both artificial and non-artificial), FORMAT_DESCRIPTION, + // PREVIOUS_GTIDS_LOG, and STOP events we don't have to do anything - + // simply return early from this function + if (code == binsrv::events::code_type::format_description || + code == binsrv::events::code_type::previous_gtids_log || + code == binsrv::events::code_type::rotate || + code == binsrv::events::code_type::stop) { + const auto readable_flags{current_common_header_v.get_readable_flags()}; + logger.log( + binsrv::log_severity::info, + "rewrite: encountered " + + std::string{current_common_header_v.get_readable_type_code()} + + (readable_flags.empty() ? "" : " (" + readable_flags + ")") + + " event in the rewrite mode - skipping"); + return; + } + + // the very first step is to check if we need to close the old binary log + // file and open a new one in case when we reached the file size specified + // in the 'rewrite_config' or this is the very first event we are going to + // write to an empty storage + + // in case of an empty storage we need to generate the following: + // 1. ROTATE(artificial ) .000001:4 + // 2. FORMAT_DESCRIPTION + // 3. PREVIOUS_GTIDS_LOG + + // in case when the storage is not empty, we are at transaction boundary, + // and current binlog file reached the file size specified in the + // 'rewrite_config', we need to generate the following: + // 0. ROTATE(non-artificial) .:4 + // 1. ROTATE(artificial ) .:4 + // 2. FORMAT_DESCRIPTION + // 3. PREVIOUS_GTIDS_LOG + + if (context.is_fresh() || (context.is_at_transaction_boundary() && + storage.get_current_position() >= file_size)) { + binsrv::events::event_storage event_buffer; + std::uint32_t offset{0U}; + + // generating next binlog file name based on base file name from the + // configuration file and current binlog file + // sequence number from the storage + + // please notice that if storage is empty, then the sequence number will be + // zero + binsrv::composite_binlog_name binlog_name{}; + if (storage.is_empty()) { + // the very first time we receive an event on an empty storage + binlog_name = binsrv::composite_binlog_name{base_file_name, 1U}; + } else if (context.is_fresh()) { + // this is the very first event we received after reconnection + // (the storage is not empty and we have an active binlog in it) + binlog_name = storage.get_current_binlog_name(); + } else { + // we are at transaction boundary and reached max binlog file size + binlog_name = storage.get_current_binlog_name().next(); + } + + if (!context.is_fresh()) { + // generate and process ROTATE(non-artificial) event + offset = static_cast(storage.get_current_position()); + const auto generated_rotate_event_v{generate_rotate_event( + event_buffer, context, offset, true /* current timestamp */, + server_id, false /* non-artificial */, binlog_name)}; + logger.log(binsrv::log_severity::info, + "rewrite: generated rotate event in the rewrite mode"); + process_binlog_event(generated_rotate_event_v, logger, context, storage); + } + + // generate and process ROTATE(artificial) event + offset = 0U; + // artificial ROTATE event must include zero timestamp + const auto generated_artificial_rotate_event_v{generate_rotate_event( + event_buffer, context, offset, false /* zero timestamp */, server_id, + true /* artificial */, binlog_name)}; + logger.log( + binsrv::log_severity::info, + "rewrite: generated artificial rotate event in the rewrite mode"); + process_binlog_event(generated_artificial_rotate_event_v, logger, context, + storage); + + // generate and process FORMAT_DESCRIPTION event + offset = binsrv::events::magic_binlog_offset; + const auto generated_format_description_event_v{ + generate_format_description_event(event_buffer, context, offset, + server_id)}; + logger.log( + binsrv::log_severity::info, + "rewrite: generated format description event in the rewrite mode"); + process_binlog_event(generated_format_description_event_v, logger, context, + storage); + + // generate and process PREVIOUS_GTIDS_LOG event + offset += static_cast( + generated_format_description_event_v.get_total_size()); + const auto generated_previous_gtids_log_event_v{ + generate_previous_gtids_log_event(event_buffer, context, offset, + server_id, storage.get_gtids())}; + logger.log( + binsrv::log_severity::info, + "rewrite: generated previous gtids log event in the rewrite mode"); + process_binlog_event(generated_previous_gtids_log_event_v, logger, context, + storage); + } + + // in rewrite mode we need to update next_event_position (and optional + // checksum in the footer) in the received event data portion + binsrv::events::event_storage buffer{}; + const auto event_copy_uv{binsrv::events::materialize( + current_event_v, buffer, + binsrv::events::materialization_type::force_add_checksum)}; + { + // TODO: optimize redundant checksum recalculation + const auto proxy{event_copy_uv.get_write_proxy()}; + proxy.get_common_header_updatable_view().set_next_event_position_raw( + static_cast(storage.get_current_position() + + event_copy_uv.get_total_size())); + } + process_binlog_event(event_copy_uv, logger, context, storage); +} + bool open_connection_and_switch_to_replication( binsrv::operation_mode_type operation_mode, binsrv::basic_logger &logger, const easymysql::library &mysql_lib, @@ -537,7 +807,7 @@ bool open_connection_and_switch_to_replication( blocking_mode); } else { connection.switch_to_position_replication( - server_id, storage.get_current_binlog_name(), + server_id, storage.get_current_binlog_name().str(), storage.get_current_position(), verify_checksum, blocking_mode); } } @@ -559,7 +829,8 @@ void receive_binlog_events( const volatile std::atomic_flag &termination_flag, binsrv::basic_logger &logger, const easymysql::library &mysql_lib, const easymysql::connection_config &connection_config, - std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage) { + std::uint32_t server_id, bool verify_checksum, binsrv::storage &storage, + const binsrv::optional_rewrite_config &optional_rewrite_config) { easymysql::connection connection{}; if (!open_connection_and_switch_to_replication( operation_mode, logger, mysql_lib, connection_config, server_id, @@ -575,7 +846,7 @@ void receive_binlog_events( binsrv::events::reader_context context{ connection.get_server_version(), verify_checksum, - storage.get_replication_mode(), storage.get_current_binlog_name(), + storage.get_replication_mode(), storage.get_current_binlog_name().str(), static_cast(storage.get_current_position())}; bool fetch_result{}; @@ -590,29 +861,19 @@ void receive_binlog_events( portion = portion.subspan(1U); log_span_dump(logger, portion); - // TODO: just for redirection to another byte stream we need to parse - // the ROTATE and FORMAT_DESCRIPTION events only, every other one - // can be just considered as a data portion (unless we want to do - // basic integrity checks like event sizes / position and CRC) - const binsrv::events::event current_event{context, portion}; - const auto ¤t_header{current_event.get_common_header()}; - auto readable_flags{current_header.get_readable_flags()}; - logger.log( - binsrv::log_severity::info, - "event: " + std::string{current_header.get_readable_type_code()} + - (readable_flags.empty() ? "" : " (" + readable_flags + ")") + - (context.is_event_info_only() ? " [info_only]" : "")); - logger.log(binsrv::log_severity::debug, - "Parsed event:\n" + - boost::lexical_cast(current_event)); - if (context.is_at_transaction_boundary()) { - logger.log( - binsrv::log_severity::info, - "encountered the end of transaction " + - boost::lexical_cast(context.get_transaction_gtid())); - } + const binsrv::events::event_view current_event_v{context, portion}; - process_binlog_event(current_event, portion, logger, context, storage); + if (optional_rewrite_config.has_value()) { + // in rewrite mode we need to ignore ROTATE (artificial), + // FORMAT_DESCRIPTION, PREVIOUS_GTIDS_LOG, ROTATE (non-artificial), + // and STOP events + rewrite_and_process_binlog_event( + current_event_v, logger, context, storage, server_id, + optional_rewrite_config->get<"base_file_name">(), + optional_rewrite_config->get<"file_size">().get_value()); + } else { + process_binlog_event(current_event_v, logger, context, storage); + } } if (termination_flag.test()) { logger.log(binsrv::log_severity::info, @@ -634,7 +895,7 @@ void receive_binlog_events( // continue operation from the transaction boundary // in position-based replication mode this is not needed as it is not a - // problem to resume streaming fron a position that does not correspond to + // problem to resume streaming from a position that does not correspond to // transaction boundary if (storage.is_in_gtid_replication_mode()) { storage.discard_incomplete_transaction_events(); @@ -706,7 +967,7 @@ bool handle_search_by_timestamp(std::string_view config_file_path, if (record.timestamps.get_min_timestamp() > timestamp) { break; } - response.add_record(record.name, record.size, + response.add_record(record.name.str(), record.size, storage.get_binlog_uri(record.name), record.previous_gtids, record.added_gtids, record.timestamps.get_min_timestamp().get_value(), @@ -766,7 +1027,7 @@ bool handle_search_by_gtid_set(std::string_view config_file_path, } remaining_gtids.subtract(*record.added_gtids); - response.add_record(record.name, record.size, + response.add_record(record.name.str(), record.size, storage.get_binlog_uri(record.name), record.previous_gtids, record.added_gtids, record.timestamps.get_min_timestamp().get_value(), @@ -914,6 +1175,7 @@ int main(int argc, char *argv[]) { const auto idle_time_seconds{replication_config.get<"idle_time">()}; const auto verify_checksum{replication_config.get<"verify_checksum">()}; const auto replication_mode{replication_config.get<"mode">()}; + const auto optional_rewrite_config{replication_config.get<"rewrite">()}; binsrv::storage storage{storage_config, binsrv::storage_construction_mode_type::streaming, @@ -927,7 +1189,7 @@ int main(int argc, char *argv[]) { receive_binlog_events(operation_mode, termination_flag, *logger, mysql_lib, connection_config, server_id, verify_checksum, - storage); + storage, optional_rewrite_config); if (operation_mode == binsrv::operation_mode_type::pull) { std::size_t iteration_number{1U}; @@ -948,7 +1210,8 @@ int main(int argc, char *argv[]) { receive_binlog_events(operation_mode, termination_flag, *logger, mysql_lib, connection_config, server_id, - verify_checksum, storage); + verify_checksum, storage, + optional_rewrite_config); ++iteration_number; } } diff --git a/src/binsrv/composite_binlog_name.cpp b/src/binsrv/composite_binlog_name.cpp new file mode 100644 index 0000000..3465e2e --- /dev/null +++ b/src/binsrv/composite_binlog_name.cpp @@ -0,0 +1,107 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/composite_binlog_name.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +#include "util/exception_location_helpers.hpp" + +namespace binsrv { + +composite_binlog_name::composite_binlog_name(std::string_view base_name, + std::uint32_t sequence_number) + : base_name_{base_name}, sequence_number_{sequence_number} { + if (base_name_.empty()) { + util::exception_location().raise( + "binlog base name cannot be empty"); + } + // currently checking only that the name does not include a filesystem + // separator + if (base_name_.find(std::filesystem::path::preferred_separator) != + std::string::npos) { + util::exception_location().raise( + "binlog base name contains filesystem separator"); + } + if (sequence_number_ == 0U) { + util::exception_location().raise( + "binlog sequence number cannot be zero"); + } + static constexpr std::uint32_t decimal_base{10U}; + if (sequence_number_ >= + boost::math::pow(decimal_base)) { + util::exception_location().raise( + "binlog sequence number is too large"); + } +} + +[[nodiscard]] composite_binlog_name +composite_binlog_name::parse(std::string_view composite_name) { + if (std::size(composite_name) <= number_of_sequence_number_characters + 1U) { + util::exception_location().raise( + "binlog composite name is too short"); + } + const auto separator_position{std::size(composite_name) - + number_of_sequence_number_characters - 1U}; + if (composite_name[separator_position] != components_separator) { + util::exception_location().raise( + "binlog composite name does not contain a base name / sequence number " + "separator at expected position"); + } + + const auto sequence_number_sv{composite_name.substr(separator_position + 1U)}; + + std::uint32_t sequence_number{}; + const auto [ptr, parse_ec]{std::from_chars(std::begin(sequence_number_sv), + std::end(sequence_number_sv), + sequence_number)}; + if (parse_ec != std::errc{} || ptr != std::end(sequence_number_sv)) { + util::exception_location().raise( + "binlog composite name contains invalid characters in its sequence " + "number part"); + } + + const auto base_name_sv{composite_name.substr(0U, separator_position)}; + return composite_binlog_name{base_name_sv, sequence_number}; +} + +[[nodiscard]] std::string composite_binlog_name::str() const { + return boost::lexical_cast(*this); +} + +std::ostream &operator<<(std::ostream &output, + const composite_binlog_name &obj) { + static constexpr char sequence_number_filler{'0'}; + return output + << obj.get_base_name() << composite_binlog_name::components_separator + << std::setfill(sequence_number_filler) + << std::setw( + composite_binlog_name::number_of_sequence_number_characters) + << obj.get_sequence_number(); +} + +} // namespace binsrv diff --git a/src/binsrv/composite_binlog_name.hpp b/src/binsrv/composite_binlog_name.hpp new file mode 100644 index 0000000..19268e7 --- /dev/null +++ b/src/binsrv/composite_binlog_name.hpp @@ -0,0 +1,65 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_COMPOSITE_BINLOG_NAME_HPP +#define BINSRV_COMPOSITE_BINLOG_NAME_HPP + +#include "binsrv/composite_binlog_name_fwd.hpp" // IWYU pragma: export + +#include +#include +#include + +namespace binsrv { + +class [[nodiscard]] composite_binlog_name { +public: + static constexpr char components_separator{'.'}; + static constexpr std::size_t number_of_sequence_number_characters{6U}; + + composite_binlog_name() = default; + composite_binlog_name(std::string_view base_name, + std::uint32_t sequence_number); + [[nodiscard]] static composite_binlog_name + parse(std::string_view composite_name); + + [[nodiscard]] bool is_empty() const noexcept { + return sequence_number_ == 0U; + } + + [[nodiscard]] const std::string &get_base_name() const noexcept { + return base_name_; + } + [[nodiscard]] std::uint32_t get_sequence_number() const noexcept { + return sequence_number_; + } + + [[nodiscard]] std::string str() const; + + composite_binlog_name next() const { + return composite_binlog_name{get_base_name(), get_sequence_number() + 1U}; + } + + friend bool operator==(const composite_binlog_name &first, + const composite_binlog_name &second) = default; + +private: + std::string base_name_{}; + std::uint32_t sequence_number_{0U}; +}; + +} // namespace binsrv + +#endif // BINSRV_COMPOSITE_BINLOG_NAME_HPP diff --git a/src/binsrv/composite_binlog_name_fwd.hpp b/src/binsrv/composite_binlog_name_fwd.hpp new file mode 100644 index 0000000..3769c20 --- /dev/null +++ b/src/binsrv/composite_binlog_name_fwd.hpp @@ -0,0 +1,30 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_COMPOSITE_BINLOG_NAME_FWD_HPP +#define BINSRV_COMPOSITE_BINLOG_NAME_FWD_HPP + +#include + +namespace binsrv { + +class composite_binlog_name; + +std::ostream &operator<<(std::ostream &output, + const composite_binlog_name &obj); + +} // namespace binsrv + +#endif // BINSRV_COMPOSITE_BINLOG_NAME_FWD_HPP diff --git a/src/binsrv/events/common_header_view.hpp b/src/binsrv/events/common_header_view.hpp index e2f23a3..cb96e1e 100644 --- a/src/binsrv/events/common_header_view.hpp +++ b/src/binsrv/events/common_header_view.hpp @@ -162,7 +162,7 @@ class [[nodiscard]] common_header_view : private common_header_view_base { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) const_cast(std::data(portion)), std::size(portion)}} {} - // deliberately implicit to allow seamless convertion from + // deliberately implicit to allow seamless conversion from // common_header_updatable_view to common_header_view // NOLINTNEXTLINE(hicpp-explicit-conversions) common_header_view(const common_header_updatable_view &other) diff --git a/src/binsrv/events/event.cpp b/src/binsrv/events/event.cpp index 0c7acd8..30953ad 100644 --- a/src/binsrv/events/event.cpp +++ b/src/binsrv/events/event.cpp @@ -39,7 +39,7 @@ namespace binsrv::events { -event::event(reader_context &context, const event_view &view) +event::event(const reader_context &context, const event_view &view) : common_header_{view.get_common_header_raw()} { const auto encoded_server_version{ context.get_current_encoded_server_version()}; @@ -51,10 +51,9 @@ event::event(reader_context &context, const event_view &view) if (view.has_footer()) { footer_.emplace(view.get_footer_view()); }; - context.process_event(*this); } -event::event(reader_context &context, util::const_byte_span portion) +event::event(const reader_context &context, util::const_byte_span portion) : event{context, event_view{context, portion}} {} template diff --git a/src/binsrv/events/event.hpp b/src/binsrv/events/event.hpp index e82ceba..0e6382c 100644 --- a/src/binsrv/events/event.hpp +++ b/src/binsrv/events/event.hpp @@ -128,8 +128,8 @@ class [[nodiscard]] event { post_header, body, include_checksum, nullptr}; } - event(reader_context &context, const event_view &view); - event(reader_context &context, util::const_byte_span portion); + event(const reader_context &context, const event_view &view); + event(const reader_context &context, util::const_byte_span portion); [[nodiscard]] const common_header &get_common_header() const noexcept { return common_header_; diff --git a/src/binsrv/events/event_view.cpp b/src/binsrv/events/event_view.cpp index 9843bfb..f2483a4 100644 --- a/src/binsrv/events/event_view.cpp +++ b/src/binsrv/events/event_view.cpp @@ -15,16 +15,23 @@ #include "binsrv/events/event_view.hpp" +#include #include #include #include #include #include +// needed for 'event_storage' +#include // IWYU pragma: keep + #include "binsrv/events/code_type.hpp" #include "binsrv/events/common_header_view.hpp" +#include "binsrv/events/event_fwd.hpp" #include "binsrv/events/footer_view.hpp" -#include "binsrv/events/format_description_body_impl.hpp" +// needed for extracting info from the FDE body +#include "binsrv/events/format_description_body_impl.hpp" // IWYU pragma: keep +#include "binsrv/events/generic_body.hpp" #include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/events/reader_context.hpp" @@ -93,7 +100,7 @@ event_view_base::event_view_base(const reader_context &context, // we should verify checksum for them or not is determined by the value // of the 'checksum_algorithm' in their bodies, not by the reader_context // (as for all other events) - const generic_body_impl body{get_body_raw()}; + const generic_body body{get_body_raw()}; if (!body.has_checksum_algorithm()) { return; } @@ -129,14 +136,80 @@ event_view_base::get_common_header_updatable_view() const { event_view_base::get_footer_updatable_view() const { return footer_updatable_view{get_footer_updatable_raw()}; } -std::ostream &operator<<(std::ostream &output, const event_view &obj) { - output << "| common header | " << event_view::get_common_header_size() - << " byte(s) |\n" - << "| post header | " << obj.get_post_header_size() << " byte(s) |\n" - << "| body | " << obj.get_body_size() << " byte(s) |\n" - << "| footer | " << obj.get_footer_size() << " byte(s) |\n"; - return output; +[[nodiscard]] event_updatable_view materialize(const event_view &event_v, + event_storage &buffer, + materialization_type mode) { + // mode adjustments for cases when nothing has to be changed + if (mode == materialization_type::force_remove_checksum && + !event_v.has_footer()) { + mode = materialization_type::leave_checksum_as_is; + } + if (mode == materialization_type::force_add_checksum && + event_v.has_footer()) { + mode = materialization_type::leave_checksum_as_is; + } + + // source does not have checksum, destination should + if (mode == materialization_type::force_add_checksum) { + assert(!event_v.has_footer()); + const auto event_size_with_footer{event_v.get_total_size() + + footer_view_base::size_in_bytes}; + const auto source_portion{event_v.get_portion()}; + buffer.reserve(event_size_with_footer); + buffer.assign(std::cbegin(source_portion), std::cend(source_portion)); + buffer.resize(event_size_with_footer); + const util::byte_span destination_portion{std::begin(buffer), + std::size(buffer)}; + event_updatable_view result{destination_portion, + event_v.get_post_header_size(), + footer_view_base::size_in_bytes}; + { + const auto write_proxy{result.get_write_proxy()}; + write_proxy.get_common_header_updatable_view().set_event_size_raw( + static_cast(event_size_with_footer)); + } + return result; + } + + // source has checksum, destination should not + if (mode == materialization_type::force_remove_checksum) { + assert(event_v.has_footer()); + const auto event_size_wo_footer{event_v.get_total_size() - + footer_view_base::size_in_bytes}; + const auto source_portion{ + event_v.get_portion().subspan(0U, event_size_wo_footer)}; + buffer.assign(std::cbegin(source_portion), std::cend(source_portion)); + const util::byte_span destination_portion{std::begin(buffer), + std::size(buffer)}; + event_updatable_view result{destination_portion, + event_v.get_post_header_size(), 0U}; + { + const auto write_proxy{result.get_write_proxy()}; + write_proxy.get_common_header_updatable_view().set_event_size_raw( + static_cast(event_size_wo_footer)); + } + return result; + } + + // either source has checksum and destination should or + // source does not have checksum and destination should not + assert(mode == materialization_type::leave_checksum_as_is); + + const auto source_portion{event_v.get_portion()}; + buffer.assign(std::cbegin(source_portion), std::cend(source_portion)); + const util::byte_span destination_portion{std::begin(buffer), + std::size(buffer)}; + return event_updatable_view{destination_portion, + event_v.get_post_header_size(), + event_v.get_footer_size()}; +} + +std::ostream &operator<<(std::ostream &output, const event_view &obj) { + return output << "common header: " << event_view::get_common_header_size() + << " byte(s), post header: " << obj.get_post_header_size() + << " byte(s), body: " << obj.get_body_size() + << " byte(s), footer: " << obj.get_footer_size() << " byte(s)"; } } // namespace binsrv::events diff --git a/src/binsrv/events/event_view.hpp b/src/binsrv/events/event_view.hpp index 801a821..ada6f72 100644 --- a/src/binsrv/events/event_view.hpp +++ b/src/binsrv/events/event_view.hpp @@ -103,7 +103,13 @@ class [[nodiscard]] event_view_base { [[nodiscard]] footer_updatable_view get_footer_updatable_view() const; protected: + // NOLINTNEXTLINE(bugprone-easily-swappable-parameters) + event_view_base(util::byte_span portion, std::size_t post_header_size, + std::size_t footer_size) noexcept + : portion_{portion}, post_header_size_{post_header_size}, + footer_size_{footer_size} {} event_view_base(const reader_context &context, util::byte_span portion); + event_view_base(const event_view_base &other) = default; event_view_base(event_view_base &&other) = default; event_view_base &operator=(const event_view_base &other) = default; @@ -118,6 +124,9 @@ class [[nodiscard]] event_view_base { class [[nodiscard]] event_updatable_view : private event_view_base { friend class event_view; + friend event_updatable_view materialize(const event_view &event_v, + event_storage &buffer, + materialization_type mode); public: // an auxiliary proxy class that allows to update the content of an @@ -129,9 +138,9 @@ class [[nodiscard]] event_updatable_view : private event_view_base { // const binsrv::events::reader_context context{...}; // const util::const_byte_span ro_event_data_block{...}; // const binsrv::events::event_view event_v{context, ro_event_data_block}; - // using event_buffer_type = std::vector; - // event_buffer_type event_copy(std::cbegin(ro_event_data_block), - // std::cend(ro_event_data_block)); + // + // binsrv::events::event_storage event_copy( + // std::cbegin(ro_event_data_block), std::cend(ro_event_data_block)); // const binsrv::events::event_updatable_view event_copy_uv{context, // event_copy}; // { @@ -141,7 +150,7 @@ class [[nodiscard]] event_updatable_view : private event_view_base { // } // // please notice a new code block with the write_proxy object: it is - // crucial to keep it alive when you want to perform several modyfying + // crucial to keep it alive when you want to perform several modifying // operations - the checksum will be recalculated only at the end of // this code block when the proxy is destroyed class [[nodiscard]] write_proxy { @@ -159,6 +168,15 @@ class [[nodiscard]] event_updatable_view : private event_view_base { } } + [[nodiscard]] util::byte_span get_updatable_portion() const noexcept { + return parent_->get_updatable_portion(); + } + + [[nodiscard]] util::byte_span + get_common_header_updatable_raw() const noexcept { + return parent_->get_common_header_updatable_raw(); + } + [[nodiscard]] common_header_updatable_view get_common_header_updatable_view() const { return parent_->get_common_header_updatable_view(); @@ -184,6 +202,7 @@ class [[nodiscard]] event_updatable_view : private event_view_base { : event_view_base{context, portion} {} // clang-format off + using event_view_base::get_portion; using event_view_base::get_total_size; using event_view_base::calculate_crc; @@ -204,6 +223,9 @@ class [[nodiscard]] event_updatable_view : private event_view_base { [[nodiscard]] write_proxy get_write_proxy() const { return write_proxy{*this}; } + +private: + using event_view_base::event_view_base; }; class [[nodiscard]] event_view : private event_view_base { @@ -216,12 +238,13 @@ class [[nodiscard]] event_view : private event_view_base { const_cast(std::data(portion)), std::size(portion)}} {} - // deliberately implicit to allow seamless convertion from + // deliberately implicit to allow seamless conversion from // event_updatable_view to event_view // NOLINTNEXTLINE(hicpp-explicit-conversions) event_view(const event_updatable_view &other) : event_view_base{other} {} // clang-format off + using event_view_base::get_portion; using event_view_base::get_total_size; using event_view_base::calculate_crc; @@ -244,6 +267,9 @@ class [[nodiscard]] event_view : private event_view_base { using event_view_base::get_footer_raw; using event_view_base::get_footer_view; // clang-format on + +private: + using event_view_base::event_view_base; }; } // namespace binsrv::events diff --git a/src/binsrv/events/event_view_fwd.hpp b/src/binsrv/events/event_view_fwd.hpp index dbb876a..9fdd2ab 100644 --- a/src/binsrv/events/event_view_fwd.hpp +++ b/src/binsrv/events/event_view_fwd.hpp @@ -16,13 +16,25 @@ #ifndef BINSRV_EVENTS_EVENT_VIEW_FWD_HPP #define BINSRV_EVENTS_EVENT_VIEW_FWD_HPP +#include #include +#include "binsrv/events/event_fwd.hpp" + namespace binsrv::events { class event_updatable_view; class event_view; +enum class materialization_type : std::uint8_t { + force_add_checksum, + force_remove_checksum, + leave_checksum_as_is +}; +[[nodiscard]] event_updatable_view materialize(const event_view &event_v, + event_storage &buffer, + materialization_type mode); + std::ostream &operator<<(std::ostream &output, const event_view &obj); } // namespace binsrv::events diff --git a/src/binsrv/events/footer_view.hpp b/src/binsrv/events/footer_view.hpp index 6a019f6..2edb895 100644 --- a/src/binsrv/events/footer_view.hpp +++ b/src/binsrv/events/footer_view.hpp @@ -74,7 +74,7 @@ class [[nodiscard]] footer_view : private footer_view_base { // NOLINTNEXTLINE(cppcoreguidelines-pro-type-const-cast) const_cast(std::data(portion)), std::size(portion)}} {} - // deliberately implicit to allow seamless convertion from + // deliberately implicit to allow seamless conversion from // footer_updatable_view to footer_view // NOLINTNEXTLINE(hicpp-explicit-conversions) footer_view(const footer_updatable_view &other) : footer_view_base{other} {} diff --git a/src/binsrv/events/protocol_traits.cpp b/src/binsrv/events/protocol_traits.cpp index 67926dc..90b6d0d 100644 --- a/src/binsrv/events/protocol_traits.cpp +++ b/src/binsrv/events/protocol_traits.cpp @@ -88,23 +88,22 @@ print_post_header_lengths(std::ostream &output, return output; } -void validate_post_header_lengths( - std::uint32_t encoded_server_version, - const post_header_length_container &runtime, - const post_header_length_container &hardcoded) { +void validate_post_header_lengths(std::uint32_t encoded_server_version, + const post_header_length_container &runtime, + const post_header_length_container &known) { const auto number_of_events{get_number_of_events(encoded_server_version)}; const auto length_mismatch_result{std::ranges::mismatch( runtime | std::ranges::views::take(number_of_events), - hardcoded | std::ranges::views::take(number_of_events), + known | std::ranges::views::take(number_of_events), [](encoded_post_header_length_type real, encoded_post_header_length_type expected) { return expected == static_cast( unspecified_post_header_length) || real == expected; })}; - if (length_mismatch_result.in2 != std::cend(hardcoded)) { + if (length_mismatch_result.in2 != std::cend(known)) { const auto offset{static_cast( - std::distance(std::cbegin(hardcoded), length_mismatch_result.in2))}; + std::distance(std::cbegin(known), length_mismatch_result.in2))}; const std::string label{ to_string_view(util::index_to_enum(offset + 1U))}; util::exception_location().raise( diff --git a/src/binsrv/events/protocol_traits.hpp b/src/binsrv/events/protocol_traits.hpp index cdb6f32..a559973 100644 --- a/src/binsrv/events/protocol_traits.hpp +++ b/src/binsrv/events/protocol_traits.hpp @@ -42,10 +42,9 @@ print_post_header_lengths(std::ostream &output, std::uint32_t encoded_server_version, const post_header_length_container &obj); -void validate_post_header_lengths( - std::uint32_t encoded_server_version, - const post_header_length_container &runtime, - const post_header_length_container &hardcoded); +void validate_post_header_lengths(std::uint32_t encoded_server_version, + const post_header_length_container &runtime, + const post_header_length_container &known); } // namespace binsrv::events diff --git a/src/binsrv/events/reader_context.cpp b/src/binsrv/events/reader_context.cpp index b856efc..ce9438f 100644 --- a/src/binsrv/events/reader_context.cpp +++ b/src/binsrv/events/reader_context.cpp @@ -28,6 +28,7 @@ #include "binsrv/events/code_type.hpp" #include "binsrv/events/common_header_flag_type.hpp" #include "binsrv/events/event.hpp" +#include "binsrv/events/event_view.hpp" #include "binsrv/events/protocol_traits.hpp" #include "util/conversion_helpers.hpp" @@ -47,7 +48,7 @@ reader_context::reader_context(std::uint32_t encoded_server_version, position_{position == 0U ? static_cast(magic_binlog_offset) : position}, post_header_lengths_{ - get_hardcoded_post_header_lengths(encoded_server_version_)} {} + get_known_post_header_lengths(encoded_server_version_)} {} [[nodiscard]] std::size_t reader_context::get_current_post_header_length(code_type code) const noexcept { @@ -56,7 +57,7 @@ reader_context::get_current_post_header_length(code_type code) const noexcept { } [[nodiscard]] const post_header_length_container & -reader_context::get_hardcoded_post_header_lengths( +reader_context::get_known_post_header_lengths( std::uint32_t encoded_server_version) noexcept { // here we use a trick with a templated lambda to initialize constexpr // arrays which would have expected post header lengths for all @@ -82,7 +83,7 @@ reader_context::get_hardcoded_post_header_lengths( } }); - constexpr auto hardcoded_post_header_lengths_generator{ + constexpr auto known_post_header_lengths_generator{ [](std::uint32_t version, std::index_sequence) constexpr -> post_header_length_container { @@ -92,70 +93,435 @@ reader_context::get_hardcoded_post_header_lengths( }; }}; - // the remainder of the 'earliest_supported_post_header_lengths' container + // the remainder of the 'earliest' container (the gtid_tagged_log element) // will be default-initialized - static constexpr post_header_length_container - earliest_supported_post_header_lengths{ - hardcoded_post_header_lengths_generator( - earliest_supported_protocol_server_version, - std::make_index_sequence< - get_number_of_events( - earliest_supported_protocol_server_version) - - 1U>{})}; - static constexpr post_header_length_container - latest_known_post_header_lengths{hardcoded_post_header_lengths_generator( + static constexpr post_header_length_container earliest{ + known_post_header_lengths_generator( + earliest_supported_protocol_server_version, + std::make_index_sequence< + get_number_of_events(earliest_supported_protocol_server_version) - + 1U>{})}; + static constexpr post_header_length_container latest{ + known_post_header_lengths_generator( latest_known_protocol_server_version, std::make_index_sequence{})}; return encoded_server_version < latest_known_protocol_server_version - ? earliest_supported_post_header_lengths - : latest_known_post_header_lengths; + ? earliest + : latest; +} + +[[nodiscard]] const post_header_length_container & +reader_context::get_hardcoded_post_header_lengths( + std::uint32_t encoded_server_version) noexcept { + // clang-format off + // https://github.com/mysql/mysql-server/blob/mysql-8.0.45/libbinlogevents/src/control_events.cpp#L96 + // https://github.com/mysql/mysql-server/blob/mysql-8.4.8/libs/mysql/binlog/event/control_events.cpp#L104 + // https://github.com/mysql/mysql-server/blob/mysql-9.6.0/libs/mysql/binlog/event/control_events.cpp#L104 + + // here is a formatted version of 8.0 post header length array definition + // from the MySQL Server source code + + /* + static uint8_t server_event_header_length[] = { + 0, + QUERY_HEADER_LEN, + STOP_HEADER_LEN, + ROTATE_HEADER_LEN, + INTVAR_HEADER_LEN, + 0, + // Unused because the code for Slave log event was removed. + // (15th Oct. 2010) + 0, + 0, + APPEND_BLOCK_HEADER_LEN, + 0, + DELETE_FILE_HEADER_LEN, + 0, + RAND_HEADER_LEN, + USER_VAR_HEADER_LEN, + FORMAT_DESCRIPTION_HEADER_LEN, + XID_HEADER_LEN, + BEGIN_LOAD_QUERY_HEADER_LEN, + EXECUTE_LOAD_QUERY_HEADER_LEN, + TABLE_MAP_HEADER_LEN, + 0, + 0, + 0, + ROWS_HEADER_LEN_V1, // WRITE_ROWS_EVENT_V1 + ROWS_HEADER_LEN_V1, // UPDATE_ROWS_EVENT_V1 + ROWS_HEADER_LEN_V1, // DELETE_ROWS_EVENT_V1 + INCIDENT_HEADER_LEN, + 0, // HEARTBEAT_LOG_EVENT + IGNORABLE_HEADER_LEN, + IGNORABLE_HEADER_LEN, + ROWS_HEADER_LEN_V2, + ROWS_HEADER_LEN_V2, + ROWS_HEADER_LEN_V2, + Gtid_event::POST_HEADER_LENGTH, // GTID_EVENT + Gtid_event::POST_HEADER_LENGTH, // ANONYMOUS_GTID_EVENT + IGNORABLE_HEADER_LEN, + TRANSACTION_CONTEXT_HEADER_LEN, + VIEW_CHANGE_HEADER_LEN, + XA_PREPARE_HEADER_LEN, + ROWS_HEADER_LEN_V2, + TRANSACTION_PAYLOAD_EVENT, + 0, // HEARTBEAT_LOG_EVENT_V2 + }; + */ + + // here is a formatted version of 8.4 post header length array definition + // from the MySQL Server source code + /* + static uint8_t server_event_header_length[] = { + 0, + QUERY_HEADER_LEN, + STOP_HEADER_LEN, + ROTATE_HEADER_LEN, + INTVAR_HEADER_LEN, + 0, + // Unused because the code for Slave log event was removed. + // (15th Oct. 2010) + 0, + 0, + APPEND_BLOCK_HEADER_LEN, + 0, + DELETE_FILE_HEADER_LEN, + 0, + RAND_HEADER_LEN, + USER_VAR_HEADER_LEN, + FORMAT_DESCRIPTION_HEADER_LEN, + XID_HEADER_LEN, + BEGIN_LOAD_QUERY_HEADER_LEN, + EXECUTE_LOAD_QUERY_HEADER_LEN, + TABLE_MAP_HEADER_LEN, + 0, + 0, + 0, + // First three values are unused as the code for V1 Rows events were + // removed in 8.4.0 + 0, + 0, + 0, + INCIDENT_HEADER_LEN, + 0, // HEARTBEAT_LOG_EVENT + IGNORABLE_HEADER_LEN, + IGNORABLE_HEADER_LEN, + ROWS_HEADER_LEN_V2, + ROWS_HEADER_LEN_V2, + ROWS_HEADER_LEN_V2, + Gtid_event::POST_HEADER_LENGTH, // GTID_EVENT + Gtid_event::POST_HEADER_LENGTH, // ANONYMOUS_GTID_EVENT + IGNORABLE_HEADER_LEN, + TRANSACTION_CONTEXT_HEADER_LEN, + VIEW_CHANGE_HEADER_LEN, + XA_PREPARE_HEADER_LEN, + ROWS_HEADER_LEN_V2, + TRANSACTION_PAYLOAD_EVENT, + 0, // HEARTBEAT_LOG_EVENT_V2 + 0, // GTID_TAGGED_LOG_EVENT + }; + */ + + // in addition, here are dumps of the FORMAT_DESCRIPTION event generated + // by this utility + + // for the 8.0 MySQL Server + /* + start_v3 => 0 + query => 13 + stop => 0 + rotate => 8 + intvar => 0 + obsolete_6 => 0 + slave => 0 + obsolete_8 => 0 + append_block => 4 + obsolete_10 => 0 + delete_file => 4 + obsolete_12 => 0 + rand => 0 + user_var => 0 + format_description => 98 + xid => 0 + begin_load_query => 4 + execute_load_query => 26 + table_map => 8 + obsolete_20 => 0 + obsolete_21 => 0 + obsolete_22 => 0 + write_rows_v1 => 8 + update_rows_v1 => 8 + delete_rows_v1 => 8 + incident => 2 + heartbeat_log => 0 + ignorable_log => 0 + rows_query_log => 0 + write_rows => 10 + update_rows => 10 + delete_rows => 10 + gtid_log => 42 + anonymous_gtid_log => 42 + previous_gtids_log => 0 + transaction_context => 18 + view_change => 52 + xa_prepare_log => 0 + partial_update_rows => 10 + transaction_payload => 40 + heartbeat_log_v2 => 0 + */ + + // for the 8.4 MySQL Server + /* + start_v3 => 0 + query => 13 + stop => 0 + rotate => 8 + intvar => 0 + obsolete_6 => 0 + slave => 0 + obsolete_8 => 0 + append_block => 4 + obsolete_10 => 0 + delete_file => 4 + obsolete_12 => 0 + rand => 0 + user_var => 0 + format_description => 99 + xid => 0 + begin_load_query => 4 + execute_load_query => 26 + table_map => 8 + obsolete_20 => 0 + obsolete_21 => 0 + obsolete_22 => 0 + write_rows_v1 => 0 + update_rows_v1 => 0 + delete_rows_v1 => 0 + incident => 2 + heartbeat_log => 0 + ignorable_log => 0 + rows_query_log => 0 + write_rows => 10 + update_rows => 10 + delete_rows => 10 + gtid_log => 42 + anonymous_gtid_log => 42 + previous_gtids_log => 0 + transaction_context => 18 + view_change => 52 + xa_prepare_log => 0 + partial_update_rows => 10 + transaction_payload => 40 + heartbeat_log_v2 => 0 + gtid_tagged_log => 0 + */ + + // 8.0 - 8.4 comparison table + /* + +-----------+---------------------+-----------+-----------+----------+ + | Code | Mnemonic | 8.0 | 8.4 | Mismatch | + +-----------+---------------------+-----------+-----------+----------+ + | 1 | start_v3 | 0 | 0 | | + | 2 | query | 13 | 13 | | + | 3 | stop | 0 | 0 | | + | 4 | rotate | 8 | 8 | | + | 5 | intvar | 0 | 0 | | + | 6 | obsolete_6 | 0 | 0 | | + | 7 | slave | 0 | 0 | | + | 8 | obsolete_8 | 0 | 0 | | + | 9 | append_block | 4 | 4 | | + | 10 | obsolete_10 | 0 | 0 | | + | 11 | delete_file | 4 | 4 | | + | 12 | obsolete_12 | 0 | 0 | | + | 13 | rand | 0 | 0 | | + | 14 | user_var | 0 | 0 | | + | 15 | format_description | 98 | 99 | * | + | 16 | xid | 0 | 0 | | + | 17 | begin_load_query | 4 | 4 | | + | 18 | execute_load_query | 26 | 26 | | + | 19 | table_map | 8 | 8 | | + | 20 | obsolete_20 | 0 | 0 | | + | 21 | obsolete_21 | 0 | 0 | | + | 22 | obsolete_22 | 0 | 0 | | + | 23 | write_rows_v1 | 8 | 0 | * | + | 24 | update_rows_v1 | 8 | 0 | * | + | 25 | delete_rows_v1 | 8 | 0 | * | + | 26 | incident | 2 | 2 | | + | 27 | heartbeat_log | 0 | 0 | | + | 28 | ignorable_log | 0 | 0 | | + | 29 | rows_query_log | 0 | 0 | | + | 30 | write_rows | 10 | 10 | | + | 31 | update_rows | 10 | 10 | | + | 32 | delete_rows | 10 | 10 | | + | 33 | gtid_log | 42 | 42 | | + | 34 | anonymous_gtid_log | 42 | 42 | | + | 35 | previous_gtids_log | 0 | 0 | | + | 36 | transaction_context | 18 | 18 | | + | 37 | view_change | 52 | 52 | | + | 38 | xa_prepare_log | 0 | 0 | | + | 39 | partial_update_rows | 10 | 10 | | + | 40 | transaction_payload | 40 | 40 | | + | 41 | heartbeat_log_v2 | 0 | 0 | | + | 42 | gtid_tagged_log | | 0 | * | + +-----------+---------------------+-----------+-----------+----------+ + */ + // clang-format on + + // to sum up, 8.4 in comparison to 8.0 has: + // - one more event GTID_TAGGED_LOG_EVENT (code 42) + // - because of this the length of the FORMAT_DESCRIPTION_EVENT post-header + // (code 15) also increased by 1 byte + // - also WRITE_ROWS_V1 (code 23), UPDATE_ROWS_V1 (code 24), and + // DELETE_ROWS_V1 (code 25) are lo longer supported and their post-header + // lengths changed from 8 bytes to 0 + + // the remainder of the 'earliest' container (the gtid_tagged_log element) + // will be default-initialized + static constexpr post_header_length_container earliest{ + 0, // start_v3 + 13, // query + 0, // stop + 8, // rotate + 0, // intvar + 0, // obsolete_6 + 0, // slave + 0, // obsolete_8 + 4, // append_block + 0, // obsolete_10 + 4, // delete_file + 0, // obsolete_12 + 0, // rand + 0, // user_var + 98, // format_description + 0, // xid + 4, // begin_load_query + 26, // execute_load_query + 8, // table_map + 0, // obsolete_20 + 0, // obsolete_21 + 0, // obsolete_22 + 8, // write_rows_v1 + 8, // update_rows_v1 + 8, // delete_rows_v1 + 2, // incident + 0, // heartbeat_log + 0, // ignorable_log + 0, // rows_query_log + 10, // write_rows + 10, // update_rows + 10, // delete_rows + 42, // gtid_log + 42, // anonymous_gtid_log + 0, // previous_gtids_log + 18, // transaction_context + 52, // view_change + 0, // xa_prepare_log + 10, // partial_update_rows + 40, // transaction_payload + 0 // heartbeat_log_v2 + }; + static constexpr post_header_length_container latest{ + 0, // start_v3 + 13, // query + 0, // stop + 8, // rotate + 0, // intvar + 0, // obsolete_6 + 0, // slave + 0, // obsolete_8 + 4, // append_block + 0, // obsolete_10 + 4, // delete_file + 0, // obsolete_12 + 0, // rand + 0, // user_var + 99, // format_description + 0, // xid + 4, // begin_load_query + 26, // execute_load_query + 8, // table_map + 0, // obsolete_20 + 0, // obsolete_21 + 0, // obsolete_22 + 0, // write_rows_v1 + 0, // update_rows_v1 + 0, // delete_rows_v1 + 2, // incident + 0, // heartbeat_log + 0, // ignorable_log + 0, // rows_query_log + 10, // write_rows + 10, // update_rows + 10, // delete_rows + 42, // gtid_log + 42, // anonymous_gtid_log + 0, // previous_gtids_log + 18, // transaction_context + 52, // view_change + 0, // xa_prepare_log + 10, // partial_update_rows + 40, // transaction_payload + 0, // heartbeat_log_v2 + 0 // gtid_tagged_log + }; + + return encoded_server_version < latest_known_protocol_server_version + ? earliest + : latest; } -void reader_context::process_event(const event ¤t_event) { +[[nodiscard]] bool +reader_context::process_event_view(const event_view ¤t_event_v) { + bool info_only{false}; bool processed{false}; while (!processed) { switch (state_) { case state_type::rotate_artificial_expected: - processed = - process_event_in_rotate_artificial_expected_state(current_event); + processed = process_event_in_rotate_artificial_expected_state( + current_event_v, info_only); break; case state_type::format_description_expected: - processed = - process_event_in_format_description_expected_state(current_event); + processed = process_event_in_format_description_expected_state( + current_event_v, info_only); break; case state_type::previous_gtids_expected: - processed = process_event_in_previous_gtids_expected_state(current_event); + processed = process_event_in_previous_gtids_expected_state( + current_event_v, info_only); break; case state_type::gtid_log_expected: - processed = process_event_in_gtid_log_expected_state(current_event); + processed = + process_event_in_gtid_log_expected_state(current_event_v, info_only); break; case state_type::any_other_expected: - processed = process_event_in_any_other_expected_state(current_event); + processed = + process_event_in_any_other_expected_state(current_event_v, info_only); break; case state_type::rotate_or_stop_expected: - processed = process_event_in_rotate_or_stop_expected_state(current_event); + processed = process_event_in_rotate_or_stop_expected_state( + current_event_v, info_only); break; default: assert(false); } } + return info_only; } [[nodiscard]] bool reader_context::process_event_in_rotate_artificial_expected_state( - const event ¤t_event) { + const event_view ¤t_event_v, bool &info_only) { assert(state_ == state_type::rotate_artificial_expected); - const auto &common_header{current_event.get_common_header()}; + const auto common_header_v{current_event_v.get_common_header_view()}; // in the "rotate_artificial_expected" state we expect only artificial rotate // events - const auto is_artificial{common_header.get_flags().has_element( + const auto is_artificial{common_header_v.get_flags().has_element( common_header_flag_type::artificial)}; const auto is_artificial_rotate{ - common_header.get_type_code() == code_type::rotate && is_artificial}; + common_header_v.get_type_code() == code_type::rotate && is_artificial}; if (!is_artificial_rotate) { util::exception_location().raise( "an artificial rotate event must be the very first event in the " @@ -164,20 +530,21 @@ reader_context::process_event_in_rotate_artificial_expected_state( // artificial rotate events must always have next event position and // timestamp set to 0 - if (common_header.get_timestamp_raw() != 0U) { + if (common_header_v.get_timestamp_raw() != 0U) { util::exception_location().raise( "non-zero timestamp found in an artificial rotate event"); } - if (common_header.get_next_event_position_raw() != 0U) { + if (common_header_v.get_next_event_position_raw() != 0U) { util::exception_location().raise( "non-zero next event position found in an artificial rotate event"); } - const auto ¤t_post_header{ - current_event.get_post_header()}; - const auto ¤t_body{current_event.get_body()}; + const auto current_post_header{generic_post_header{ + current_event_v.get_post_header_raw()}}; + const auto current_body{ + generic_body{current_event_v.get_body_raw()}}; if (replication_mode_ == replication_mode_type::position) { - if (current_body.get_binlog() == binlog_name_) { + if (current_body.get_readable_binlog() == binlog_name_) { // in position-based replication mode, when we continue streaming to the // same binlog file, we expect the artificial rotate event to have the // same position as the one supplied to the constructor @@ -205,7 +572,15 @@ reader_context::process_event_in_rotate_artificial_expected_state( "GTID-based replication mode"); } } - if (current_body.get_binlog() != binlog_name_) { + // whether we should expect info-only FORMAT_DESCRIPTION and optional + // PREVIOUS_GTIDS_LOG events depends on whether we resumed streaming to an + // existing non-empty binlog file or not + expect_info_only_preamble_events_ = + (cycle_number_ == 0U) && + (current_body.get_readable_binlog() == binlog_name_) && + (position_ > magic_binlog_offset); + + if (current_body.get_readable_binlog() != binlog_name_) { // in the case when binlog name in the artificial rotate event does not // match the one specified in the last saved one, we should update it here // and reset the position to "magic_offset" (4) @@ -217,19 +592,12 @@ reader_context::process_event_in_rotate_artificial_expected_state( // also, when the storage objects is created on an empty directory, // 'storage.get_current_binlog_name()' returns an empty string which will // never be equal to any real binlog name - binlog_name_ = current_body.get_binlog(); + binlog_name_ = current_body.get_readable_binlog(); reset_position(); } - // whether we should expect info-only FORMAT_DESCRIPTION and optional - // PREVIOUS_GTIDS_LOG events depends on whether we resumed streaming to an - // existing non-empty binlog file or not - if (current_body.get_binlog() == binlog_name_ && - position_ > magic_binlog_offset) { - expect_ignorable_preamble_events_ = true; - } - - info_only_event_ = true; + ++cycle_number_; + info_only = true; // transition to the next state state_ = state_type::format_description_expected; return true; @@ -237,28 +605,29 @@ reader_context::process_event_in_rotate_artificial_expected_state( [[nodiscard]] bool reader_context::process_event_in_format_description_expected_state( - const event ¤t_event) { + const event_view ¤t_event_v, bool &info_only) { assert(state_ == state_type::format_description_expected); - const auto &common_header{current_event.get_common_header()}; + const auto common_header_v{current_event_v.get_common_header_view()}; // in the "format_description_expected" state we expect only format // description events - if (common_header.get_type_code() != code_type::format_description) { + if (common_header_v.get_type_code() != code_type::format_description) { // TODO: this check should be performed just after the common header is // parsed to make sure we rely on proper post_header lengths util::exception_location().raise( "format description event must follow an artificial rotate event"); } - const auto is_artificial{common_header.get_flags().has_element( + const auto is_artificial{common_header_v.get_flags().has_element( common_header_flag_type::artificial)}; if (is_artificial) { util::exception_location().raise( "format description event is not expected to be artificial"); } - const auto &post_header{ - current_event.get_post_header()}; + const auto post_header{generic_post_header{ + get_current_encoded_server_version(), + current_event_v.get_post_header_raw()}}; // check if FDE has expected binlog version number if (post_header.get_binlog_version_raw() != default_binlog_version) { @@ -282,11 +651,12 @@ reader_context::process_event_in_format_description_expected_state( // generic_post_header_impl::size_in_bytes for known events validate_post_header_lengths( encoded_server_version_, post_header.get_post_header_lengths_raw(), - get_hardcoded_post_header_lengths(encoded_server_version_)); + get_known_post_header_lengths(encoded_server_version_)); post_header_lengths_ = post_header.get_post_header_lengths_raw(); - const auto &body{current_event.get_body()}; + const auto body{generic_body{ + current_event_v.get_body_raw()}}; footer_expected_ = body.has_checksum_algorithm(); // here we differentiate format description events that were actually @@ -321,58 +691,52 @@ reader_context::process_event_in_format_description_expected_state( // in other words, in GTID-based mode there is no way to distinguish whether // the FDE / PREVIOUS_GTIDS_LOG should be written to the binlog file or // not - that is why we rely only on the value of the - // "expect_ignorable_preamble_events" calculated previously - info_only_event_ = expect_ignorable_preamble_events_; - if (replication_mode_ == replication_mode_type::position && - info_only_event_) { - if (common_header.get_next_event_position_raw() != 0U) { + // "expect_info_only_preamble_events" calculated previously + info_only = expect_info_only_preamble_events_; + if (replication_mode_ == replication_mode_type::position && info_only) { + if (common_header_v.get_next_event_position_raw() != 0U) { util::exception_location().raise( "expected next event position set to zero in pseudo format " "description event"); } } - if (!info_only_event_) { - validate_position_and_advance(common_header); + if (!info_only) { + validate_position_and_advance(common_header_v); } // the next expected event is PREVIOUS_GTIDS_LOG, unless we are in // position-based replication mode and we resumed streaming to an // existing non-empty binlog file, in which case we expect the next event // to be one of the GTID_LOG events - if (replication_mode_ == replication_mode_type::position && - info_only_event_) { - state_ = state_type::gtid_log_expected; - expect_ignorable_preamble_events_ = false; - } else { - state_ = state_type::previous_gtids_expected; - } + state_ = (replication_mode_ == replication_mode_type::position && info_only) + ? state_type::gtid_log_expected + : state_type::previous_gtids_expected; return true; } [[nodiscard]] bool reader_context::process_event_in_previous_gtids_expected_state( - const event ¤t_event) { + const event_view ¤t_event_v, bool &info_only) { assert(state_ == state_type::previous_gtids_expected); - const auto &common_header{current_event.get_common_header()}; + const auto common_header_v{current_event_v.get_common_header_view()}; // in the "previous_gtids_log_expected" state we expect only previous gtids // log events - if (common_header.get_type_code() != code_type::previous_gtids_log) { + if (common_header_v.get_type_code() != code_type::previous_gtids_log) { util::exception_location().raise( "previous gtids log event must follow a format description event"); } - const auto is_artificial{common_header.get_flags().has_element( + const auto is_artificial{common_header_v.get_flags().has_element( common_header_flag_type::artificial)}; if (is_artificial) { util::exception_location().raise( "previous gtids log event is not expected to be artificial"); } - info_only_event_ = expect_ignorable_preamble_events_; - expect_ignorable_preamble_events_ = false; - if (!info_only_event_) { - validate_position_and_advance(common_header); + info_only = expect_info_only_preamble_events_; + if (!info_only) { + validate_position_and_advance(common_header_v); } state_ = state_type::gtid_log_expected; @@ -380,11 +744,11 @@ reader_context::process_event_in_previous_gtids_expected_state( } [[nodiscard]] bool reader_context::process_event_in_gtid_log_expected_state( - const event ¤t_event) { + const event_view ¤t_event_v, bool &info_only) { assert(state_ == state_type::gtid_log_expected); - const auto &common_header{current_event.get_common_header()}; - const auto code{common_header.get_type_code()}; - const auto is_artificial{common_header.get_flags().has_element( + const auto common_header_v{current_event_v.get_common_header_view()}; + const auto code{common_header_v.get_type_code()}; + const auto is_artificial{common_header_v.get_flags().has_element( common_header_flag_type::artificial)}; // early return here with "false" return code so that the while loop @@ -419,20 +783,20 @@ reader_context::process_event_in_previous_gtids_expected_state( "unexpected anonymous gtid log event in gtid mode replication"); } - start_transaction(current_event); - validate_position_and_advance(common_header); + start_transaction(current_event_v); + validate_position_and_advance(common_header_v); - info_only_event_ = false; + info_only = false; state_ = state_type::any_other_expected; return true; } [[nodiscard]] bool reader_context::process_event_in_any_other_expected_state( - const event ¤t_event) { + const event_view ¤t_event_v, bool &info_only) { assert(state_ == state_type::any_other_expected); - const auto &common_header{current_event.get_common_header()}; - const auto code{common_header.get_type_code()}; - const auto is_artificial{common_header.get_flags().has_element( + const auto common_header_v{current_event_v.get_common_header_view()}; + const auto code{common_header_v.get_type_code()}; + const auto is_artificial{common_header_v.get_flags().has_element( common_header_flag_type::artificial)}; // early return here, the exit from the @@ -481,20 +845,20 @@ reader_context::process_event_in_previous_gtids_expected_state( "unexpected artificial event in the 'previous gtids log processed' " "state"); } - update_transaction(common_header); - validate_position_and_advance(common_header); + update_transaction(common_header_v); + validate_position_and_advance(common_header_v); - info_only_event_ = false; + info_only = false; // not changing the state here - remain in 'any_other_expected' return true; } [[nodiscard]] bool reader_context::process_event_in_rotate_or_stop_expected_state( - const event ¤t_event) { + const event_view ¤t_event_v, bool &info_only) { assert(state_ == state_type::rotate_or_stop_expected); - const auto &common_header{current_event.get_common_header()}; - const auto code{common_header.get_type_code()}; + const auto common_header_v{current_event_v.get_common_header_view()}; + const auto code{common_header_v.get_type_code()}; // in the "rotate_or_stop_expected" state we expect only rotate // (non-artificial) or stop events @@ -503,7 +867,7 @@ reader_context::process_event_in_rotate_or_stop_expected_state( "the very last event in the binlog must be either rotate or stop"); } - const auto is_artificial{common_header.get_flags().has_element( + const auto is_artificial{common_header_v.get_flags().has_element( common_header_flag_type::artificial)}; if (is_artificial) { util::exception_location().raise( @@ -513,8 +877,9 @@ reader_context::process_event_in_rotate_or_stop_expected_state( if (code == code_type::rotate) { // position in non-artificial rotate event post header must be equal to // magic_binlog_offset (4) - if (current_event.get_post_header().get_position_raw() != - magic_binlog_offset) { + const auto current_post_header{generic_post_header{ + current_event_v.get_post_header_raw()}}; + if (current_post_header.get_position_raw() != magic_binlog_offset) { util::exception_location().raise( "unexpected position in an non-artificial rotate event post " "header"); @@ -527,54 +892,58 @@ reader_context::process_event_in_rotate_or_stop_expected_state( // in this latter case, we also reset the position in order to indicate // the end of the current cycle and expect new ROTATE(artificial) and // FORMAT_DESCRIPTION - validate_position(common_header); + validate_position(common_header_v); reset_position(); - info_only_event_ = false; + info_only = false; state_ = state_type::rotate_artificial_expected; return true; } void reader_context::validate_position( - const common_header &common_header) const { - if (common_header.get_next_event_position_raw() == 0U) { + const common_header_view &common_header_v) const { + if (common_header_v.get_next_event_position_raw() == 0U) { util::exception_location().raise( "next event position in the event common header cannot be zero"); } // check if common_header.next_event_position matches current position // plus common_header.event_size - if (position_ + common_header.get_event_size_raw() != - common_header.get_next_event_position_raw()) { + if (position_ + common_header_v.get_event_size_raw() != + common_header_v.get_next_event_position_raw()) { util::exception_location().raise( "unexpected next event position in the event common header"); } } void reader_context::validate_position_and_advance( - const common_header &common_header) { - validate_position(common_header); + const common_header_view &common_header_v) { + validate_position(common_header_v); // simply advance current position - position_ = common_header.get_next_event_position_raw(); + position_ = common_header_v.get_next_event_position_raw(); } void reader_context::reset_position() { position_ = magic_binlog_offset; } -void reader_context::start_transaction(const event ¤t_event) { - switch (current_event.get_common_header().get_type_code()) { +void reader_context::start_transaction(const event_view ¤t_event_v) { + const common_header_view common_header_v{ + current_event_v.get_common_header_view()}; + switch (common_header_v.get_type_code()) { case code_type::anonymous_gtid_log: { // no need to update transaction_gtid_ as in anonymous gtid log event // the gtid part is expected to be empty - if (!current_event.get_post_header() - .get_gtid() - .is_empty()) { + const auto current_post_header{ + generic_post_header{ + current_event_v.get_post_header_raw()}}; + if (!current_post_header.get_gtid().is_empty()) { util::exception_location().raise( "encountered non-empty gtid in the anonymous gtid log event"); } + const auto current_body{generic_body{ + current_event_v.get_body_raw()}}; const auto expected_transaction_length_raw{ - current_event.get_body() - .get_transaction_length_raw()}; + current_body.get_transaction_length_raw()}; if (!std::in_range(expected_transaction_length_raw)) { util::exception_location().raise( "transaction length in the anonymous gtid log event is too large"); @@ -583,16 +952,17 @@ void reader_context::start_transaction(const event ¤t_event) { static_cast(expected_transaction_length_raw); } break; case code_type::gtid_log: { - transaction_gtid_ = - current_event.get_post_header() - .get_gtid(); + const auto current_post_header{generic_post_header{ + current_event_v.get_post_header_raw()}}; + transaction_gtid_ = current_post_header.get_gtid(); if (transaction_gtid_.is_empty()) { util::exception_location().raise( "encountered an empty gtid in the gtid log event"); } + const auto current_body{ + generic_body{current_event_v.get_body_raw()}}; const auto expected_transaction_length_raw{ - current_event.get_body() - .get_transaction_length_raw()}; + current_body.get_transaction_length_raw()}; if (!std::in_range(expected_transaction_length_raw)) { util::exception_location().raise( "transaction length in the gtid log event is too large"); @@ -601,15 +971,15 @@ void reader_context::start_transaction(const event ¤t_event) { static_cast(expected_transaction_length_raw); } break; case code_type::gtid_tagged_log: { - const auto >id_tagged_log_body{ - current_event.get_body()}; - transaction_gtid_ = gtid_tagged_log_body.get_gtid(); + const auto current_body{generic_body{ + current_event_v.get_body_raw()}}; + transaction_gtid_ = current_body.get_gtid(); if (transaction_gtid_.is_empty()) { util::exception_location().raise( "encountered an empty gtid in the gtid tagged log event"); } const auto expected_transaction_length_raw{ - gtid_tagged_log_body.get_transaction_length_raw()}; + current_body.get_transaction_length_raw()}; if (!std::in_range(expected_transaction_length_raw)) { util::exception_location().raise( "transaction length in the gtid tagged log event is too large"); @@ -620,12 +990,12 @@ void reader_context::start_transaction(const event ¤t_event) { default: assert(false); } - current_transaction_length_ = - current_event.get_common_header().get_event_size_raw(); + current_transaction_length_ = common_header_v.get_event_size_raw(); } -void reader_context::update_transaction(const common_header &common_header) { - current_transaction_length_ += common_header.get_event_size_raw(); +void reader_context::update_transaction( + const common_header_view &common_header_v) { + current_transaction_length_ += common_header_v.get_event_size_raw(); if (current_transaction_length_ > expected_transaction_length_) { util::exception_location().raise( "current event exceeds declared transaction length"); diff --git a/src/binsrv/events/reader_context.hpp b/src/binsrv/events/reader_context.hpp index 8f3f23b..966580a 100644 --- a/src/binsrv/events/reader_context.hpp +++ b/src/binsrv/events/reader_context.hpp @@ -24,21 +24,21 @@ #include "binsrv/gtids/gtid.hpp" -#include "binsrv/events/common_header_fwd.hpp" +#include "binsrv/events/common_header_view_fwd.hpp" #include "binsrv/events/event_fwd.hpp" +#include "binsrv/events/event_view_fwd.hpp" #include "binsrv/events/protocol_traits.hpp" namespace binsrv::events { class [[nodiscard]] reader_context { - friend class event; - public: reader_context(std::uint32_t encoded_server_version, bool checksum_verification_enabled, replication_mode_type replication_mode, std::string_view binlog_name, std::uint32_t position); + [[nodiscard]] bool is_fresh() const noexcept { return cycle_number_ == 0U; } [[nodiscard]] std::uint32_t get_current_encoded_server_version() const noexcept { return encoded_server_version_; @@ -65,14 +65,15 @@ class [[nodiscard]] reader_context { (state_ == state_type::rotate_artificial_expected); } - [[nodiscard]] bool is_event_info_only() const noexcept { - return info_only_event_; - } + [[nodiscard]] static const post_header_length_container & + get_known_post_header_lengths(std::uint32_t encoded_server_version) noexcept; [[nodiscard]] static const post_header_length_container & get_hardcoded_post_header_lengths( std::uint32_t encoded_server_version) noexcept; + [[nodiscard]] bool process_event_view(const event_view ¤t_event_v); + private: // this class implements the logic of the following state machine // ( @@ -110,29 +111,31 @@ class [[nodiscard]] reader_context { std::uint32_t expected_transaction_length_{0U}; std::uint32_t current_transaction_length_{0U}; - bool expect_ignorable_preamble_events_{false}; - bool info_only_event_{false}; + bool expect_info_only_preamble_events_{false}; - void process_event(const event ¤t_event); - [[nodiscard]] bool - process_event_in_rotate_artificial_expected_state(const event ¤t_event); + std::size_t cycle_number_{0U}; + + [[nodiscard]] bool process_event_in_rotate_artificial_expected_state( + const event_view ¤t_event_v, bool &info_only); [[nodiscard]] bool process_event_in_format_description_expected_state( - const event ¤t_event); - [[nodiscard]] bool - process_event_in_previous_gtids_expected_state(const event ¤t_event); - [[nodiscard]] bool - process_event_in_gtid_log_expected_state(const event ¤t_event); + const event_view ¤t_event_v, bool &info_only); + [[nodiscard]] bool process_event_in_previous_gtids_expected_state( + const event_view ¤t_event_v, bool &info_only); [[nodiscard]] bool - process_event_in_any_other_expected_state(const event ¤t_event); + process_event_in_gtid_log_expected_state(const event_view ¤t_event_v, + bool &info_only); [[nodiscard]] bool - process_event_in_rotate_or_stop_expected_state(const event ¤t_event); + process_event_in_any_other_expected_state(const event_view ¤t_event_v, + bool &info_only); + [[nodiscard]] bool process_event_in_rotate_or_stop_expected_state( + const event_view ¤t_event_v, bool &info_only); - void validate_position(const common_header &common_header) const; - void validate_position_and_advance(const common_header &common_header); + void validate_position(const common_header_view &common_header_v) const; + void validate_position_and_advance(const common_header_view &common_header_v); void reset_position(); - void start_transaction(const event ¤t_event); - void update_transaction(const common_header &common_header); + void start_transaction(const event_view ¤t_event_v); + void update_transaction(const common_header_view &common_header_v); void finish_transaction(); }; diff --git a/src/binsrv/events/rotate_body_impl.cpp b/src/binsrv/events/rotate_body_impl.cpp index 534d4b2..84d5fc2 100644 --- a/src/binsrv/events/rotate_body_impl.cpp +++ b/src/binsrv/events/rotate_body_impl.cpp @@ -21,6 +21,8 @@ #include #include +#include "binsrv/composite_binlog_name.hpp" + #include "binsrv/events/code_type.hpp" #include "util/byte_span.hpp" @@ -29,9 +31,13 @@ namespace binsrv::events { generic_body_impl::generic_body_impl( - std::string_view binlog_name) - : binlog_(std::cbegin(util::as_const_byte_span(binlog_name)), - std::cend(util::as_const_byte_span(binlog_name))) {} + const composite_binlog_name &binlog_name) + : binlog_{} { + const auto binlog_name_s{binlog_name.str()}; + const auto binlog_name_span{util::as_const_byte_span(binlog_name_s)}; + + binlog_.assign(std::cbegin(binlog_name_span), std::cend(binlog_name_span)); +} generic_body_impl::generic_body_impl( util::const_byte_span portion) { @@ -43,6 +49,11 @@ generic_body_impl::generic_body_impl( binlog_.assign(std::cbegin(portion), std::cend(portion)); } +[[nodiscard]] composite_binlog_name +generic_body_impl::get_parsed_binlog() const { + return composite_binlog_name::parse(get_readable_binlog()); +} + void generic_body_impl::encode_to( util::byte_span &destination) const { if (std::size(destination) < calculate_encoded_size()) { @@ -55,7 +66,7 @@ void generic_body_impl::encode_to( std::ostream &operator<<(std::ostream &output, const generic_body_impl &obj) { - return output << "binlog: " << obj.get_binlog(); + return output << "binlog: " << obj.get_readable_binlog(); } } // namespace binsrv::events diff --git a/src/binsrv/events/rotate_body_impl.hpp b/src/binsrv/events/rotate_body_impl.hpp index 5ecba8d..a029ffe 100644 --- a/src/binsrv/events/rotate_body_impl.hpp +++ b/src/binsrv/events/rotate_body_impl.hpp @@ -23,6 +23,8 @@ #include +#include "binsrv/composite_binlog_name_fwd.hpp" + #include "util/byte_span.hpp" namespace binsrv::events { @@ -35,16 +37,16 @@ template <> class [[nodiscard]] generic_body_impl { boost::container::small_vector; - explicit generic_body_impl(std::string_view binlog_name); + explicit generic_body_impl(const composite_binlog_name &binlog_name); explicit generic_body_impl(util::const_byte_span portion); [[nodiscard]] const binlog_storage &get_binlog_raw() noexcept { return binlog_; } - - [[nodiscard]] std::string_view get_binlog() const noexcept { + [[nodiscard]] std::string_view get_readable_binlog() const noexcept { return util::as_string_view(binlog_); } + [[nodiscard]] composite_binlog_name get_parsed_binlog() const; [[nodiscard]] std::size_t calculate_encoded_size() const noexcept { return std::size(binlog_); diff --git a/src/binsrv/main_config.cpp b/src/binsrv/main_config.cpp index c701ea6..03570be 100644 --- a/src/binsrv/main_config.cpp +++ b/src/binsrv/main_config.cpp @@ -72,6 +72,9 @@ main_config::main_config(std::string_view file_name) { validate(); } -void main_config::validate() const { root().get<"connection">().validate(); } +void main_config::validate() const { + root().get<"connection">().validate(); + root().get<"replication">().validate(); +} } // namespace binsrv diff --git a/src/binsrv/replication_config.cpp b/src/binsrv/replication_config.cpp new file mode 100644 index 0000000..0e95b45 --- /dev/null +++ b/src/binsrv/replication_config.cpp @@ -0,0 +1,39 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/replication_config.hpp" + +#include + +#include "binsrv/replication_mode_type.hpp" + +#include "util/exception_location_helpers.hpp" + +namespace binsrv { + +void replication_config::validate() const { + const auto &optional_rewrite{get<"rewrite">()}; + if (optional_rewrite.has_value()) { + if (get<"mode">() != replication_mode_type::gtid) { + util::exception_location().raise( + "error validating replication config: " + "rewrite can only be enabled in gtid replication mode"); + } + + optional_rewrite->validate(); + } +} + +} // namespace binsrv diff --git a/src/binsrv/replication_config.hpp b/src/binsrv/replication_config.hpp index edca23d..af699a5 100644 --- a/src/binsrv/replication_config.hpp +++ b/src/binsrv/replication_config.hpp @@ -21,6 +21,7 @@ #include #include "binsrv/replication_mode_type_fwd.hpp" +#include "binsrv/rewrite_config.hpp" // IWYU pragma: export #include "util/nv_tuple.hpp" @@ -32,9 +33,12 @@ struct [[nodiscard]] replication_config util::nv<"server_id", std::uint32_t>, util::nv<"idle_time", std::uint32_t>, util::nv<"verify_checksum", bool>, - util::nv<"mode", replication_mode_type> + util::nv<"mode", replication_mode_type>, + util::nv<"rewrite", optional_rewrite_config> // clang-format on - > {}; + > { + void validate() const; +}; } // namespace binsrv diff --git a/src/binsrv/rewrite_config.cpp b/src/binsrv/rewrite_config.cpp new file mode 100644 index 0000000..20e7ac4 --- /dev/null +++ b/src/binsrv/rewrite_config.cpp @@ -0,0 +1,35 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#include "binsrv/rewrite_config.hpp" + +#include +#include +#include + +#include "util/exception_location_helpers.hpp" + +namespace binsrv { + +void rewrite_config::validate() const { + static constexpr std::uint64_t min_file_size{1024ULL}; + if (get<"file_size">().get_value() < min_file_size) { + util::exception_location().raise( + "error validating rewrite config: file size must be >= " + + std::to_string(min_file_size) + " bytes"); + } +} + +} // namespace binsrv diff --git a/src/binsrv/rewrite_config.hpp b/src/binsrv/rewrite_config.hpp new file mode 100644 index 0000000..b801601 --- /dev/null +++ b/src/binsrv/rewrite_config.hpp @@ -0,0 +1,41 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_REWRITE_CONFIG_HPP +#define BINSRV_REWRITE_CONFIG_HPP + +#include "binsrv/rewrite_config_fwd.hpp" // IWYU pragma: export + +#include + +#include "binsrv/size_unit.hpp" + +#include "util/nv_tuple.hpp" + +namespace binsrv { + +// clang-format off +struct [[nodiscard]] rewrite_config + : util::nv_tuple< + util::nv<"base_file_name", std::string>, + util::nv<"file_size", size_unit> + > { + void validate() const; +}; +// clang-format on + +} // namespace binsrv + +#endif // BINSRV_REWRITE_CONFIG_HPP diff --git a/src/binsrv/rewrite_config_fwd.hpp b/src/binsrv/rewrite_config_fwd.hpp new file mode 100644 index 0000000..5ddffe2 --- /dev/null +++ b/src/binsrv/rewrite_config_fwd.hpp @@ -0,0 +1,28 @@ +// Copyright (c) 2023-2024 Percona and/or its affiliates. +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License, version 2.0, +// as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License, version 2.0, for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +#ifndef BINSRV_REWRITE_CONFIG_FWD_HPP +#define BINSRV_REWRITE_CONFIG_FWD_HPP + +#include + +namespace binsrv { + +struct rewrite_config; +using optional_rewrite_config = std::optional; + +} // namespace binsrv + +#endif // BINSRV_REWRITE_CONFIG_FWD_HPP diff --git a/src/binsrv/storage.cpp b/src/binsrv/storage.cpp index c82255a..b1cf4fc 100644 --- a/src/binsrv/storage.cpp +++ b/src/binsrv/storage.cpp @@ -29,6 +29,7 @@ #include "binsrv/basic_storage_backend.hpp" #include "binsrv/binlog_file_metadata.hpp" +#include "binsrv/composite_binlog_name.hpp" #include "binsrv/ctime_timestamp.hpp" #include "binsrv/replication_mode_type.hpp" #include "binsrv/storage_backend_factory.hpp" @@ -127,33 +128,17 @@ storage::~storage() { return replication_mode_ == replication_mode_type::gtid; } -[[nodiscard]] bool -storage::check_binlog_name(std::string_view binlog_name) noexcept { - // TODO: parse binlog name into "base name" and "rotation number" - // e.g. "binlog.000001" -> ("binlog", 1) - - // currently checking only that the name does not include a filesystem - // separator - return binlog_name.find(std::filesystem::path::preferred_separator) == - std::string_view::npos; -} - [[nodiscard]] bool storage::is_binlog_open() const noexcept { return backend_->is_stream_open(); } [[nodiscard]] open_binlog_status -storage::open_binlog(std::string_view binlog_name) { +storage::open_binlog(const composite_binlog_name &binlog_name) { ensure_streaming_mode(); auto result{open_binlog_status::opened_with_data_present}; - if (!check_binlog_name(binlog_name)) { - util::exception_location().raise( - "cannot create a binlog with invalid name"); - } - - // here we either create a new binlog file if its name is not presen in the + // here we either create a new binlog file if its name is not presentin the // "binlog_records_", or we open an existing one and append to it, in which // case we need to make sure that the current position is properly set const bool binlog_exists{ @@ -176,7 +161,7 @@ storage::open_binlog(std::string_view binlog_name) { const auto mode{binlog_exists ? storage_backend_open_stream_mode::append : storage_backend_open_stream_mode::create}; - const auto open_stream_offset{backend_->open_stream(binlog_name, mode)}; + const auto open_stream_offset{backend_->open_stream(binlog_name.str(), mode)}; if (!binlog_exists) { // writing the magic binlog footprint only if this is a newly @@ -190,10 +175,10 @@ storage::open_binlog(std::string_view binlog_name) { added_binlog_gtids = gtids::gtid_set{}; } - binlog_records_.emplace_back( - std::string{binlog_name}, events::magic_binlog_offset, - std::move(previous_binlog_gtids), std::move(added_binlog_gtids), - ctime_timestamp_range{}); + binlog_records_.emplace_back(binlog_name, events::magic_binlog_offset, + std::move(previous_binlog_gtids), + std::move(added_binlog_gtids), + ctime_timestamp_range{}); save_binlog_metadata(get_current_binlog_record()); save_binlog_index(); result = open_binlog_status::created; @@ -204,7 +189,7 @@ storage::open_binlog(std::string_view binlog_name) { get_current_binlog_record().size = events::magic_binlog_offset; result = open_binlog_status::opened_empty; } else if (open_stream_offset == events::magic_binlog_offset) { - result = open_binlog_status::opened_at_magic_paylod_offset; + result = open_binlog_status::opened_at_magic_payload_offset; } else { // position is beyond magic payload offset assert(open_stream_offset > events::magic_binlog_offset); @@ -308,8 +293,8 @@ void storage::flush_event_buffer() { } [[nodiscard]] std::string -storage::get_binlog_uri(std::string_view binlog_name) const { - return backend_->get_object_uri(binlog_name); +storage::get_binlog_uri(const composite_binlog_name &binlog_name) const { + return backend_->get_object_uri(binlog_name.str()); } void storage::ensure_streaming_mode() const { @@ -385,12 +370,10 @@ void storage::load_binlog_index() { util::exception_location().raise( "binlog index contains a reference to the binlog index name"); } - if (!check_binlog_name(current_binlog_name)) { - util::exception_location().raise( - "binlog index contains a reference to a binlog with invalid " - "name"); - } - if (std::ranges::find(std::as_const(binlog_records_), current_binlog_name, + const auto current_binlog_name_parsed{ + composite_binlog_name::parse(current_binlog_name)}; + if (std::ranges::find(std::as_const(binlog_records_), + current_binlog_name_parsed, &binlog_record::name) != std::cend(binlog_records_)) { util::exception_location().raise( "binlog index contains a duplicate entry"); @@ -402,7 +385,7 @@ void storage::load_binlog_index() { added_binlog_gtids = gtids::gtid_set{}; } binlog_records_.emplace_back( - current_binlog_name, 0ULL, std::move(previous_binlog_gtids), + current_binlog_name_parsed, 0ULL, std::move(previous_binlog_gtids), std::move(added_binlog_gtids), ctime_timestamp_range{}); } } @@ -410,7 +393,7 @@ void storage::load_binlog_index() { void storage::validate_binlog_index( const storage_object_name_container &object_names) const { for (auto const &record : binlog_records_) { - if (!object_names.contains(record.name)) { + if (!object_names.contains(record.name.str())) { util::exception_location().raise( "binlog index contains a reference to a non-existing object"); } @@ -430,7 +413,7 @@ void storage::save_binlog_index() const { std::ostringstream oss; for (const auto &record : binlog_records_) { std::filesystem::path binlog_path{default_binlog_index_entry_path}; - binlog_path /= record.name; + binlog_path /= record.name.str(); oss << binlog_path.generic_string() << '\n'; } const auto content{oss.str()}; @@ -459,20 +442,20 @@ void storage::save_metadata() const { backend_->put_object(metadata_name, util::as_const_byte_span(content)); } -[[nodiscard]] std::string -storage::generate_binlog_metadata_name(std::string_view binlog_name) { - std::string binlog_metadata_name{binlog_name}; +[[nodiscard]] std::string storage::generate_binlog_metadata_name( + const composite_binlog_name &binlog_name) { + std::string binlog_metadata_name{binlog_name.str()}; binlog_metadata_name += storage::binlog_metadata_extension; return binlog_metadata_name; } [[nodiscard]] storage::binlog_record -storage::load_binlog_metadata(std::string_view binlog_name) const { +storage::load_binlog_metadata(const composite_binlog_name &binlog_name) const { const auto content{ backend_->get_object(generate_binlog_metadata_name(binlog_name))}; binlog_file_metadata metadata{content}; - return binlog_record{.name = std::string(binlog_name), + return binlog_record{.name = binlog_name, .size = metadata.root().get<"size">(), .previous_gtids = metadata.root().get<"previous_gtids">(), @@ -536,7 +519,7 @@ void storage::load_and_validate_binlog_metadata_set( auto loaded_binlog_metadata{load_binlog_metadata(record.name)}; validate_binlog_metadata(loaded_binlog_metadata); // validating that the size stored in the metadata matches the actual size - if (loaded_binlog_metadata.size != object_names.at(record.name)) { + if (loaded_binlog_metadata.size != object_names.at(record.name.str())) { util::exception_location().raise( "size from the binlog metadata does not match the actual binlog " "size"); diff --git a/src/binsrv/storage.hpp b/src/binsrv/storage.hpp index 6419976..62be85f 100644 --- a/src/binsrv/storage.hpp +++ b/src/binsrv/storage.hpp @@ -24,6 +24,7 @@ #include #include "binsrv/basic_storage_backend_fwd.hpp" +#include "binsrv/composite_binlog_name.hpp" #include "binsrv/ctime_timestamp_fwd.hpp" #include "binsrv/ctime_timestamp_range.hpp" #include "binsrv/replication_mode_type_fwd.hpp" @@ -39,7 +40,7 @@ namespace binsrv { class [[nodiscard]] storage { private: struct binlog_record { - std::string name; + composite_binlog_name name; std::uint64_t size{0ULL}; gtids::optional_gtid_set previous_gtids{}; gtids::optional_gtid_set added_gtids{}; @@ -65,7 +66,8 @@ class [[nodiscard]] storage { storage(storage &&) = delete; storage &operator=(storage &&) = delete; - // desctuctor is explicitly defined as default here to complete the rule of 5 + // destructor is explicitly declared here and defined as default in .cpp + // file to complete the rule of 5 ~storage(); [[nodiscard]] std::string get_backend_description() const; @@ -82,8 +84,10 @@ class [[nodiscard]] storage { [[nodiscard]] bool is_empty() const noexcept { return binlog_records_.empty(); } - [[nodiscard]] std::string_view get_current_binlog_name() const noexcept { - return is_empty() ? std::string_view{} : get_current_binlog_record().name; + [[nodiscard]] const composite_binlog_name & + get_current_binlog_name() const noexcept { + return is_empty() ? binlog_name_sentinel_ + : get_current_binlog_record().name; } [[nodiscard]] std::uint64_t get_current_position() const noexcept { return get_flushed_position() + std::size(event_buffer_); @@ -107,12 +111,10 @@ class [[nodiscard]] storage { return result; } - [[nodiscard]] static bool - check_binlog_name(std::string_view binlog_name) noexcept; - [[nodiscard]] bool is_binlog_open() const noexcept; - [[nodiscard]] open_binlog_status open_binlog(std::string_view binlog_name); + [[nodiscard]] open_binlog_status + open_binlog(const composite_binlog_name &binlog_name); void write_event(util::const_byte_span event_data, bool at_transaction_boundary, const gtids::gtid &transaction_gtid, @@ -122,13 +124,15 @@ class [[nodiscard]] storage { void discard_incomplete_transaction_events(); void flush_event_buffer(); - [[nodiscard]] std::string get_binlog_uri(std::string_view binlog_name) const; + [[nodiscard]] std::string + get_binlog_uri(const composite_binlog_name &binlog_name) const; private: storage_construction_mode_type construction_mode_; basic_storage_backend_ptr backend_; replication_mode_type replication_mode_; + composite_binlog_name binlog_name_sentinel_{}; binlog_record_container binlog_records_{}; std::uint64_t checkpoint_size_bytes_{0ULL}; @@ -186,9 +190,9 @@ class [[nodiscard]] storage { void save_metadata() const; [[nodiscard]] static std::string - generate_binlog_metadata_name(std::string_view binlog_name); + generate_binlog_metadata_name(const composite_binlog_name &binlog_name); [[nodiscard]] binlog_record - load_binlog_metadata(std::string_view binlog_name) const; + load_binlog_metadata(const composite_binlog_name &binlog_name) const; void validate_binlog_metadata(const binlog_record &record) const; void save_binlog_metadata(const binlog_record &record) const; diff --git a/src/binsrv/storage_fwd.hpp b/src/binsrv/storage_fwd.hpp index 6adcb2e..f1a243f 100644 --- a/src/binsrv/storage_fwd.hpp +++ b/src/binsrv/storage_fwd.hpp @@ -28,7 +28,7 @@ enum class storage_construction_mode_type : std::uint8_t { enum class open_binlog_status : std::uint8_t { created, opened_empty, - opened_at_magic_paylod_offset, + opened_at_magic_payload_offset, opened_with_data_present }; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 0f3fbe9..23d92ac 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -50,6 +50,9 @@ set(gtid_set_source_files set(event_set_source_files ${PROJECT_SOURCE_DIR}/src/binsrv/ctime_timestamp.cpp + ${PROJECT_SOURCE_DIR}/src/binsrv/composite_binlog_name_fwd.hpp + ${PROJECT_SOURCE_DIR}/src/binsrv/composite_binlog_name.hpp + ${PROJECT_SOURCE_DIR}/src/binsrv/composite_binlog_name.cpp ${PROJECT_SOURCE_DIR}/src/binsrv/events/anonymous_gtid_log_body_impl_fwd.hpp ${PROJECT_SOURCE_DIR}/src/binsrv/events/anonymous_gtid_log_body_impl.hpp diff --git a/tests/event_test.cpp b/tests/event_test.cpp index 71582dd..0f19a4b 100644 --- a/tests/event_test.cpp +++ b/tests/event_test.cpp @@ -25,6 +25,7 @@ #include +#include "binsrv/composite_binlog_name.hpp" #include "binsrv/ctime_timestamp.hpp" #include "binsrv/replication_mode_type.hpp" @@ -35,6 +36,7 @@ #include "binsrv/events/common_header.hpp" #include "binsrv/events/common_header_flag_type.hpp" #include "binsrv/events/event.hpp" +#include "binsrv/events/event_view.hpp" #include "binsrv/events/protocol_traits_fwd.hpp" #include "binsrv/events/reader_context.hpp" @@ -59,8 +61,9 @@ BOOST_AUTO_TEST_CASE(EventRoundTrip) { const binsrv::events::generic_post_header rotate_post_header{binsrv::events::magic_binlog_offset}; + const binsrv::composite_binlog_name binlog_name{"binlog", 1U}; const binsrv::events::generic_body - rotate_body{"binlog.000001"}; + rotate_body{binlog_name}; const auto generated_rotate_event{ binsrv::events::event::create_event( @@ -69,8 +72,13 @@ BOOST_AUTO_TEST_CASE(EventRoundTrip) { binsrv::ctime_timestamp{}, default_server_id, flags, rotate_post_header, rotate_body, true, event_buffer)}; - const binsrv::events::event parsed_rotate_event{ + const binsrv::events::event_view generated_rotate_event_v{ context, util::const_byte_span{event_buffer}}; + const binsrv::events::event parsed_rotate_event{context, + generated_rotate_event_v}; + bool info_only{false}; + info_only = context.process_event_view(generated_rotate_event_v); + BOOST_CHECK(info_only); BOOST_CHECK_EQUAL(generated_rotate_event, parsed_rotate_event); @@ -96,8 +104,12 @@ BOOST_AUTO_TEST_CASE(EventRoundTrip) { format_description_post_header, format_description_body, true, event_buffer)}; - const binsrv::events::event parsed_format_description_event{ + const binsrv::events::event_view generated_format_description_event_v{ context, util::const_byte_span{event_buffer}}; + const binsrv::events::event parsed_format_description_event{ + context, generated_format_description_event_v}; + info_only = context.process_event_view(generated_format_description_event_v); + BOOST_CHECK(!info_only); BOOST_CHECK_EQUAL(generated_format_description_event, parsed_format_description_event); @@ -120,9 +132,136 @@ BOOST_AUTO_TEST_CASE(EventRoundTrip) { previous_gtids_log_post_header, previous_gtids_log_body, true, event_buffer)}; - const binsrv::events::event parsed_previous_gtids_log_event{ + const binsrv::events::event_view generated_previous_gtids_log_event_v{ context, util::const_byte_span{event_buffer}}; + const binsrv::events::event parsed_previous_gtids_log_event{ + context, generated_previous_gtids_log_event_v}; + info_only = context.process_event_view(generated_previous_gtids_log_event_v); + BOOST_CHECK(!info_only); BOOST_CHECK_EQUAL(generated_previous_gtids_log_event, parsed_previous_gtids_log_event); } + +BOOST_AUTO_TEST_CASE(EventMaterialization) { + const util::semantic_version server_version{"8.4.8"}; + std::uint32_t offset{0U}; + const binsrv::events::reader_context context_with_checksum{ + server_version.get_encoded(), true, binsrv::replication_mode_type::gtid, + "", offset}; + const binsrv::events::reader_context context_wo_checksum{ + server_version.get_encoded(), false, binsrv::replication_mode_type::gtid, + "", offset}; + + binsrv::events::event_storage event_with_footer_buffer; + + // artificial ROTATE event + offset = 0U; + const binsrv::events::common_header_flag_set flags{ + binsrv::events::common_header_flag_type::artificial}; + + const binsrv::events::generic_post_header + post_header{binsrv::events::magic_binlog_offset}; + const binsrv::composite_binlog_name binlog_name{"binlog", 1U}; + const binsrv::events::generic_body body{ + binlog_name}; + + const auto generated_event{ + binsrv::events::event::create_event( + offset, + // artificial ROTATE event must include zero timestamp + binsrv::ctime_timestamp{}, default_server_id, flags, post_header, + body, true, event_with_footer_buffer)}; + + const binsrv::events::event_view generated_event_v{ + context_with_checksum, util::const_byte_span{event_with_footer_buffer}}; + + BOOST_CHECK(generated_event_v.has_footer()); + BOOST_CHECK(generated_event_v.get_footer_view().get_crc_raw() == + generated_event_v.calculate_crc()); + BOOST_CHECK(generated_event_v.get_common_header_view().get_event_size_raw() == + generated_event_v.get_total_size()); + BOOST_CHECK(generated_event.calculate_encoded_size() == + generated_event_v.get_total_size()); + + binsrv::events::event_storage materialization_buffer; + + // checking materialization of an event that has checksum + const binsrv::events::event_view checksum_as_is_generated_v{ + binsrv::events::materialize( + generated_event_v, materialization_buffer, + binsrv::events::materialization_type::leave_checksum_as_is)}; + BOOST_CHECK(checksum_as_is_generated_v.has_footer()); + BOOST_CHECK(checksum_as_is_generated_v.get_footer_view().get_crc_raw() == + checksum_as_is_generated_v.calculate_crc()); + BOOST_CHECK(checksum_as_is_generated_v.get_common_header_view() + .get_event_size_raw() == + checksum_as_is_generated_v.get_total_size()); + const binsrv::events::event_view checked_checksum_as_is_generated_v{ + context_with_checksum, util::const_byte_span{materialization_buffer}}; + + const binsrv::events::event_view force_add_checksum_generated_v{ + binsrv::events::materialize( + generated_event_v, materialization_buffer, + binsrv::events::materialization_type::force_add_checksum)}; + BOOST_CHECK(force_add_checksum_generated_v.has_footer()); + BOOST_CHECK(force_add_checksum_generated_v.get_footer_view().get_crc_raw() == + force_add_checksum_generated_v.calculate_crc()); + BOOST_CHECK(force_add_checksum_generated_v.get_common_header_view() + .get_event_size_raw() == + force_add_checksum_generated_v.get_total_size()); + const binsrv::events::event_view checked_force_add_checksum_generated_v{ + context_with_checksum, util::const_byte_span{materialization_buffer}}; + + const binsrv::events::event_view force_remove_checksum_generated_v{ + binsrv::events::materialize( + generated_event_v, materialization_buffer, + binsrv::events::materialization_type::force_remove_checksum)}; + BOOST_CHECK(!force_remove_checksum_generated_v.has_footer()); + BOOST_CHECK(force_remove_checksum_generated_v.get_common_header_view() + .get_event_size_raw() == + force_remove_checksum_generated_v.get_total_size()); + const binsrv::events::event_view checked_force_remove_checksum_generated_v{ + context_wo_checksum, util::const_byte_span{materialization_buffer}}; + + // checking materialization of an event that does not have checksum + const binsrv::events::event_storage event_wo_footer_buffer{ + materialization_buffer}; + const binsrv::events::event_view copied_event_v{ + context_wo_checksum, util::const_byte_span{event_wo_footer_buffer}}; + + const binsrv::events::event_view checksum_as_is_copied_v{ + binsrv::events::materialize( + copied_event_v, materialization_buffer, + binsrv::events::materialization_type::leave_checksum_as_is)}; + BOOST_CHECK(!checksum_as_is_copied_v.has_footer()); + BOOST_CHECK( + checksum_as_is_copied_v.get_common_header_view().get_event_size_raw() == + checksum_as_is_copied_v.get_total_size()); + const binsrv::events::event_view checked_checksum_as_is_copied_v{ + context_wo_checksum, util::const_byte_span{materialization_buffer}}; + + const binsrv::events::event_view force_add_checksum_copied_v{ + binsrv::events::materialize( + copied_event_v, materialization_buffer, + binsrv::events::materialization_type::force_add_checksum)}; + BOOST_CHECK(force_add_checksum_copied_v.has_footer()); + BOOST_CHECK(force_add_checksum_copied_v.get_footer_view().get_crc_raw() == + force_add_checksum_copied_v.calculate_crc()); + BOOST_CHECK(force_add_checksum_copied_v.get_common_header_view() + .get_event_size_raw() == + force_add_checksum_copied_v.get_total_size()); + const binsrv::events::event_view checked_force_add_checksum_copied_v{ + context_with_checksum, util::const_byte_span{materialization_buffer}}; + + const binsrv::events::event_view force_remove_checksum_copied_v{ + binsrv::events::materialize( + copied_event_v, materialization_buffer, + binsrv::events::materialization_type::force_remove_checksum)}; + BOOST_CHECK(!force_remove_checksum_copied_v.has_footer()); + BOOST_CHECK(force_remove_checksum_copied_v.get_common_header_view() + .get_event_size_raw() == + force_remove_checksum_copied_v.get_total_size()); + const binsrv::events::event_view checked_force_remove_checksum_copied_v{ + context_wo_checksum, util::const_byte_span{materialization_buffer}}; +}