forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathMergeTreeData.h
More file actions
1963 lines (1544 loc) · 91.4 KB
/
MergeTreeData.h
File metadata and controls
1963 lines (1544 loc) · 91.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#pragma once
#include <mutex>
#include <base/defines.h>
#include <Common/SimpleIncrement.h>
#include <Common/SharedMutex.h>
#include <Common/MultiVersion.h>
#include <Common/Logger.h>
#include <Storages/IStorage.h>
#include <Interpreters/ExpressionActionsSettings.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/StoragePolicy.h>
#include <Processors/Merges/Algorithms/Graphite.h>
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/ExportList.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Storages/MergeTree/TemporaryParts.h>
#include <Storages/MergeTree/AlterConversions.h>
#include <Storages/MergeTree/RangesInDataPart.h>
#include <Storages/IndicesDescription.h>
#include <Storages/DataDestinationType.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/EphemeralLockInZooKeeper.h>
#include <Storages/MarkCache.h>
#include <Interpreters/PartLog.h>
#include <Poco/Timestamp.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Storages/MergeTree/PatchParts/PatchPartsUtils.h>
#include <Storages/MergeTree/MergeTreePartExportStatus.h>
#include <Storages/MergeTree/MergeTreePartExportManifest.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/global_fun.hpp>
#include <boost/range/iterator_range_core.hpp>
namespace DB
{
/// Default number of delayed streams for parallel write.
/// The number of streams is not the number of parts but the number of parts * files, hence 100.
/// `inline constexpr` gives a single compile-time constant shared by every translation unit including this header.
inline constexpr size_t DEFAULT_DELAYED_STREAMS_FOR_PARALLEL_WRITE = 100;
/// Forward declarations of types referenced by name in this header.
struct AlterCommand;
class AlterCommands;
class InterpreterSelectQuery;
class MergeTreePartsMover;
class MergeTreeDataMergerMutator;
class MutationCommands;
class Context;
struct JobAndPool;
struct ZeroCopyLock;
struct MergeTreeSettings;
struct WriteSettings;

/// Backup support: a list of (name, entry) pairs.
class IBackupEntry;
using BackupEntries = std::vector<std::pair<String, std::shared_ptr<const IBackupEntry>>>;

/// Transactions. Declared exactly once; a duplicate forward declaration was redundant.
class MergeTreeTransaction;
using MergeTreeTransactionPtr = std::shared_ptr<MergeTreeTransaction>;
/// Auxiliary struct holding information about the future merged or mutated part.
struct EmergingPartInfo
{
    /// Name of the disk associated with the future part (presumably where it will be written — confirm at use sites).
    String disk_name;
    /// Partition the future part belongs to.
    String partition_id;
    /// Estimated size of the future part in bytes.
    /// Defaults to 0 so that a default-constructed instance never holds an indeterminate value;
    /// aggregate initialization `{disk, partition, bytes}` keeps working unchanged.
    size_t estimate_bytes = 0;
};
/// More forward declarations used by MergeTreeData.
struct SelectQueryOptions;
struct CurrentlySubmergingEmergingTagger;
class MergeTreeDeduplicationLog;
class ExpressionActions;

/// Shared handle to expression actions, and a list of such handles.
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;

/// Partition ID -> maximum block number.
using PartitionIdToMaxBlock = std::unordered_map<String, Int64>;

/// Error codes referenced by this header.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
/// Lock on the table's data parts mutex; MergeTreeData::lockParts() returns one by value.
/// The mutex is held through the std::unique_lock member `lock`, so destroying this object
/// releases the mutex if it is still owned.
/// NOTE(review): member declaration order is also the initialization order used by the
/// out-of-line constructor (presumably wait_watch starts before `lock` is acquired) — do not reorder.
struct DataPartsLock
{
/// Presumably measures the time spent waiting for the mutex — TODO confirm in the .cpp.
std::optional<Stopwatch> wait_watch;
/// Owns the data parts mutex while locked; owns nothing in a default-constructed instance.
std::unique_lock<std::mutex> lock;
/// Presumably measures how long the mutex is held — TODO confirm in the .cpp.
std::optional<Stopwatch> lock_watch;
/// Holds no lock; both stopwatches are disengaged.
DataPartsLock() = default;
/// Defined out of line; presumably acquires data_parts_mutex_ — confirm in the .cpp.
explicit DataPartsLock(std::mutex & data_parts_mutex_);
/// Defined out of line.
/// Because a destructor is user-declared, no implicit move constructor is declared. Because
/// std::unique_lock is not copyable, the implicit copy constructor is deleted. The type is
/// therefore neither copyable nor movable. Returning it by value (as lockParts() does) relies
/// on C++17 guaranteed copy elision.
~DataPartsLock();
};
/// Data structure for *MergeTree engines.
/// Merge tree is used for incremental sorting of data.
/// The table consists of several sorted parts.
/// During insertion new data is sorted according to the primary key and is written to the new part.
/// Parts are merged in the background according to a heuristic algorithm.
/// For each part the index file is created containing primary key values for every n-th row.
/// This allows efficient selection by primary key range predicate.
///
/// Additionally:
///
/// Optionally, a Date column can be specified (used by old-style tables partitioned by month).
/// For each part, the min and max dates are remembered; essentially, this is an index too.
///
/// Data is partitioned by the value of the partitioning expression.
/// Parts belonging to different partitions are not merged - for the ease of administration (data sync and backup).
///
/// File structure of old-style month-partitioned tables (format_version = 0):
/// Part directory - / min-date _ max-date _ min-id _ max-id _ level /
/// Inside the part directory:
/// checksums.txt - contains the list of all files along with their sizes and checksums.
/// columns.txt - contains the list of all columns and their types.
/// primary.idx - contains the primary index.
/// [Column].bin - contains compressed column data.
/// [Column].mrk - marks, pointing to seek positions allowing to skip n * k rows.
///
/// File structure of tables with custom partitioning (format_version >= 1):
/// Part directory - / partition-id _ min-id _ max-id _ level /
/// Inside the part directory:
/// The same files as for month-partitioned tables, plus
/// count.txt - contains total number of rows in this part.
/// partition.dat - contains the value of the partitioning expression.
/// minmax_[Column].idx - MinMax indexes (see IMergeTreeDataPart::MinMaxIndex class) for the columns required by the partitioning expression.
///
/// Several modes are implemented. Modes determine additional actions during merge:
/// - Ordinary - don't do anything special
/// - Collapsing - collapse pairs of rows with the opposite values of sign_column for the same values
/// of the primary key (cf. CollapsingSortedTransform.h)
/// - Replacing - for all rows with the same primary key keep only the latest one. Or, if the version
/// column is set, keep the latest row with the maximal version.
/// - Summing - sum all numeric columns not contained in the primary key for all rows with the same primary key.
/// - Aggregating - merge columns containing aggregate function states for all rows with the same primary key.
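/// - Graphite - performs coarsening of historical data for Graphite (a system for quantitative monitoring).
/// - VersionedCollapsing - like Collapsing, but additionally uses a version column.
/// See MergingParams::Mode for the complete list of modes.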
/// The MergeTreeData class contains a list of parts and the data structure parameters.
/// To read and modify the data use other classes:
/// - MergeTreeDataSelectExecutor
/// - MergeTreeDataWriter
/// - MergeTreeDataMergerMutator
class MergeTreeData : public IStorage, public WithMutableContext
{
public:
/// Function to call if the part is suspected to contain corrupt data.
using BrokenPartCallback = std::function<void (const String &)>;
using DataPart = IMergeTreeDataPart;
using MutableDataPartPtr = std::shared_ptr<DataPart>;
using MutableDataPartsVector = std::vector<MutableDataPartPtr>;
/// After the DataPart is added to the working set, it cannot be changed.
using DataPartPtr = std::shared_ptr<const DataPart>;
using DataPartKind = MergeTreePartInfo::Kind;
using DataPartsKinds = std::initializer_list<DataPartKind>;
using DataPartState = MergeTreeDataPartState;
using DataPartStates = std::initializer_list<DataPartState>;
using DataPartStateVector = std::vector<DataPartState>;
using PinnedPartUUIDsPtr = std::shared_ptr<const PinnedPartUUIDs>;
using PartitionIdToMinBlock = std::unordered_map<String, Int64>;
using PartitionIdToMinBlockPtr = std::shared_ptr<const PartitionIdToMinBlock>;
constexpr static auto FORMAT_VERSION_FILE_NAME = "format_version.txt";
constexpr static auto DETACHED_DIR_NAME = "detached";
constexpr static auto MOVING_DIR_NAME = "moving";
/// Auxiliary structure for index comparison. Keep in mind lifetime of MergeTreePartInfo.
/// `info` is a reference, so an instance must not outlive the MergeTreePartInfo it was built from.
/// LessStateDataPart orders these by (state, info).
struct DataPartStateAndInfo
{
DataPartState state;
const MergeTreePartInfo & info;
};
/// Heterogeneous lookup key: LessStateDataPart compares it against DataPartStateAndInfo by (state, kind).
struct DataPartStateAndKind
{
DataPartState state;
DataPartKind kind;
};
/// Auxiliary structure for index comparison
struct DataPartStateAndPartitionID
{
DataPartStateAndPartitionID(DataPartState state_, const String & partition_id_)
: state(state_), kind(MergeTreePartInfo::getKind(partition_id_)), partition_id(partition_id_)
{
}
DataPartState state;
MergeTreePartInfo::Kind kind;
String partition_id;
};
struct PartitionID
{
explicit PartitionID(const String & partition_id_)
: kind(MergeTreePartInfo::getKind(partition_id_)), partition_id(partition_id_)
{
}
MergeTreePartInfo::Kind kind;
String partition_id;
};
struct LessDataPart
{
using is_transparent = void;
bool operator()(const DataPartPtr & lhs, const MergeTreePartInfo & rhs) const { return lhs->info < rhs; }
bool operator()(const MergeTreePartInfo & lhs, const DataPartPtr & rhs) const { return lhs < rhs->info; }
bool operator()(const DataPartPtr & lhs, const DataPartPtr & rhs) const { return lhs->info < rhs->info; }
bool operator()(const MergeTreePartInfo & lhs, const PartitionID & rhs) const
{
return std::forward_as_tuple(lhs.getKind(), lhs.getPartitionId()) < std::forward_as_tuple(rhs.kind, rhs.partition_id);
}
bool operator()(const PartitionID & lhs, const MergeTreePartInfo & rhs) const
{
return std::forward_as_tuple(lhs.kind, lhs.partition_id) < std::forward_as_tuple(rhs.getKind(), rhs.getPartitionId());
}
};
/// Transparent comparator ordering parts by (state, kind, partition ID, ...) via their
/// DataPartStateAndInfo key. The extra overloads enable heterogeneous lookup
/// (`is_transparent`) by state alone, by (state, partition ID) and by (state, kind).
/// All overloads take their arguments by const reference for consistency.
struct LessStateDataPart
{
    using is_transparent = void;

    /// Full ordering: state first, then the part info.
    bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndInfo & rhs) const
    {
        return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info)
            < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info);
    }

    /// Lookup by state only.
    bool operator() (const DataPartStateAndInfo & info, const DataPartState & state) const
    {
        return static_cast<UInt8>(info.state) < static_cast<UInt8>(state);
    }

    bool operator() (const DataPartState & state, const DataPartStateAndInfo & info) const
    {
        return static_cast<UInt8>(state) < static_cast<UInt8>(info.state);
    }

    /// Lookup by (state, kind, partition ID).
    bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndPartitionID & rhs) const
    {
        return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info.getKind(), lhs.info.getPartitionId())
            < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.kind, rhs.partition_id);
    }

    bool operator() (const DataPartStateAndPartitionID & lhs, const DataPartStateAndInfo & rhs) const
    {
        return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.kind, lhs.partition_id)
            < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info.getKind(), rhs.info.getPartitionId());
    }

    /// Lookup by (state, kind).
    bool operator() (const DataPartStateAndKind & lhs, const DataPartStateAndInfo & rhs) const
    {
        return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.kind)
            < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.info.getKind());
    }

    bool operator() (const DataPartStateAndInfo & lhs, const DataPartStateAndKind & rhs) const
    {
        return std::forward_as_tuple(static_cast<UInt8>(lhs.state), lhs.info.getKind())
            < std::forward_as_tuple(static_cast<UInt8>(rhs.state), rhs.kind);
    }
};
using DataParts = std::set<DataPartPtr, LessDataPart>;
using MutableDataParts = std::set<MutableDataPartPtr, LessDataPart>;
using DataPartsVector = std::vector<DataPartPtr>;
DataPartsLock lockParts() const { return DataPartsLock(data_parts_mutex); }
using OperationDataPartsLock = std::unique_lock<std::mutex>;
OperationDataPartsLock lockOperationsWithParts() const { return OperationDataPartsLock(operation_with_data_parts_mutex); }
MergeTreeDataPartFormat choosePartFormat(size_t bytes_uncompressed, size_t rows_count) const;
MergeTreeDataPartFormat choosePartFormatOnDisk(size_t bytes_uncompressed, size_t rows_count) const;
/// return pair <exists, pointer to part>. Sometimes we may fail to load existing part (network issues, oom and so on),
/// in this case pair of <true, nullptr> is returned.
std::pair<bool, MutableDataPartPtr> loadDataPartForRemovalIfExists(const String & name, bool ignore_no_such_key = false);
MergeTreeDataPartBuilder getDataPartBuilder(
const String & name,
const VolumePtr & volume,
const String & part_dir,
const ReadSettings & read_settings) const;
/// Auxiliary object to add a set of parts into the working set in two steps:
/// * First, as PreActive parts (the parts are ready, but not yet in the active set).
/// * Next, if commit() is called, the parts are added to the active set and the parts that are
/// covered by them are marked Outdated.
/// If neither commit() nor rollback() was called, the destructor rolls back the operation.
/// Non-copyable (boost::noncopyable).
class Transaction : private boost::noncopyable
{
public:
/// `data_` and `txn_` are stored as a reference and a raw pointer (non-owning);
/// both must outlive this object.
Transaction(MergeTreeData & data_, MergeTreeTransaction * txn_);
/// Commits the precommitted parts. Pass the data parts lock if the caller already holds it,
/// otherwise nullptr (presumably the lock is then taken internally — confirm in the .cpp).
/// NOTE(review): meaning of the returned parts is defined out of line — presumably the parts
/// covered by the committed ones; confirm.
DataPartsVector commit(DataPartsLock * acquired_parts_lock = nullptr);
/// Rename should be done explicitly, before calling commit(), to
/// guarantee that no lock is held during rename (since rename is IO
/// bound, while data parts lock is the bottleneck)
void renameParts();
/// Adds a part to this transaction; `need_rename` presumably marks it for renameParts()
/// (see precommitted_parts_need_rename) — confirm in the .cpp.
void addPart(MutableDataPartPtr & part, bool need_rename);
/// Rolls back the precommitted parts; `lock` is an optional already-acquired data parts lock.
void rollback(DataPartsLock * lock = nullptr);
/// Immediately remove parts from table's data_parts set and change part
/// state to temporary. Useful for new parts which are not present in the table.
void rollbackPartsToTemporaryState();
/// Number of precommitted parts.
size_t size() const { return precommitted_parts.size(); }
/// True if there are no precommitted parts.
bool isEmpty() const { return precommitted_parts.empty(); }
/// Best-effort rollback: any exception is logged and swallowed, since a destructor must not throw.
/// NOTE(review): rollback() is called unconditionally, so it is presumably a no-op after
/// commit()/rollback() — confirm in the .cpp.
~Transaction()
{
try
{
rollback();
}
catch (...)
{
tryLogCurrentException("~MergeTreeData::Transaction");
}
}
/// Defined out of line.
void clear();
/// Defined out of line; presumably derived from `txn` — confirm.
TransactionID getTID() const;
private:
friend class MergeTreeData;
/// Non-owning references; see the constructor.
MergeTreeData & data;
MergeTreeTransaction * txn;
/// Parts added via addPart().
MutableDataParts precommitted_parts;
/// Subset of added parts that still need renaming (see renameParts()).
MutableDataParts precommitted_parts_need_rename;
};
using TransactionUniquePtr = std::unique_ptr<Transaction>;
using PathWithDisk = std::pair<String, DiskPtr>;
struct PartsTemporaryRename : private boost::noncopyable
{
PartsTemporaryRename(
const MergeTreeData & storage_,
const String & source_dir_)
: storage(storage_)
, source_dir(source_dir_)
{
}
/// Adds part to rename. Both names are relative to relative_data_path.
void addPart(const String & old_name, const String & new_name, const DiskPtr & disk);
/// Renames part from old_name to new_name
void tryRenameAll();
void rollBackAll();
/// Renames all added parts from new_name to old_name if old name is not empty
~PartsTemporaryRename();
struct RenameInfo
{
String old_name;
String new_name;
/// Disk cannot be changed
DiskPtr disk;
};
const MergeTreeData & storage;
const String source_dir;
std::vector<RenameInfo> old_and_new_names;
bool renamed = false;
};
/// Parameters for the various merging modes.
struct MergingParams
{
    /// Merging mode. See the MergeTreeData class comment for what the modes do during merges.
    enum Mode
    {
        Ordinary = 0,    /// Enum values are saved. Do not change them.
        Collapsing = 1,
        Summing = 2,
        Aggregating = 3,
        /// NOTE: value 4 is not assigned here. Because values are persisted, do not reuse it
        /// without checking its history.
        Replacing = 5,
        Graphite = 6,
        VersionedCollapsing = 7,
        Coalescing = 8,
    };

    /// Defaults to Ordinary so that a default-constructed MergingParams never holds an
    /// indeterminate mode. The struct remains an aggregate, so existing initialization still works.
    Mode mode = Ordinary;

    /// For Collapsing and VersionedCollapsing mode.
    String sign_column;

    /// For Replacing mode. Can be empty for Replacing.
    String is_deleted_column;

    /// For Summing mode. If empty - columns_to_sum is determined automatically.
    Names columns_to_sum;

    /// For Replacing and VersionedCollapsing mode. Can be empty for Replacing.
    String version_column;

    /// For Graphite mode.
    Graphite::Params graphite_params;

    /// Check that needed columns are present and have correct types.
    void check(const MergeTreeSettings & settings, const StorageInMemoryMetadata & metadata) const;

    /// Human-readable name of `mode` (defined out of line).
    String getModeName() const;
};
/// Attach the table corresponding to the directory in full_path inside policy (must end with /), with the given columns.
/// Correctness of names and paths is not checked.
///
/// date_column_name - if not empty, the name of the Date column used for partitioning by month.
/// Otherwise, partition_by_ast is used for partitioning.
///
/// order_by_ast - a single expression or a tuple. It is used as a sorting key
/// (an ASTExpressionList used for sorting data in parts);
/// primary_key_ast - can be nullptr, an expression, or a tuple.
/// Used to determine an ASTExpressionList values of which are written in the primary.idx file
/// for one row in every `index_granularity` rows to speed up range queries.
/// Primary key must be a prefix of the sorting key;
/// If it is nullptr, then it will be determined from order_by_ast.
///
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
/// attach - whether the existing table is attached or the new table is created.
MergeTreeData(const StorageID & table_id_,
const StorageInMemoryMetadata & metadata_,
ContextMutablePtr context_,
const String & date_column_name,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool require_part_metadata_,
LoadingStrictnessLevel mode,
BrokenPartCallback broken_part_callback_ = [](const String &){});
/// Build a block of minmax and count values of a MergeTree table. These values are extracted
/// from minmax_indices, the first expression of primary key, and part rows.
///
/// has_filter - if query has no filter, bypass partition pruning completely
///
/// query_info - used to filter unneeded parts
///
/// parts - part set to filter
Block getMinMaxCountProjectionBlock(
const StorageMetadataPtr & metadata_snapshot,
const Names & required_columns,
const ActionsDAG * filter_dag,
const RangesInDataParts & parts,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
ContextPtr query_context) const;
QueryProcessingStage::Enum getQueryProcessingStage(
ContextPtr query_context,
QueryProcessingStage::Enum to_stage,
const StorageSnapshotPtr &,
SelectQueryInfo & info) const override;
ReservationPtr reserveSpace(UInt64 expected_size, VolumePtr & volume) const;
static ReservationPtr tryReserveSpace(UInt64 expected_size, const IDataPartStorage & data_part_storage);
static ReservationPtr reserveSpace(UInt64 expected_size, const IDataPartStorage & data_part_storage);
StoragePolicyPtr getStoragePolicy() const override;
bool isMergeTree() const override { return true; }
bool supportsPrewhere() const override { return true; }
ConditionSelectivityEstimator getConditionSelectivityEstimatorByPredicate(const StorageSnapshotPtr &, const ActionsDAG *, ContextPtr) const override;
bool supportsFinal() const override;
bool supportsSubcolumns() const override { return true; }
bool supportsTTL() const override { return true; }
bool supportsDynamicSubcolumnsDeprecated() const override { return true; }
bool supportsDynamicSubcolumns() const override { return true; }
bool supportsSparseSerialization() const override { return true; }
bool supportsLightweightDelete() const override;
bool hasProjection() const override;
bool areAsynchronousInsertsEnabled() const override;
bool supportsTrivialCountOptimization(const StorageSnapshotPtr & storage_snapshot, ContextPtr query_context) const override;
/// A snapshot of pending mutations that weren't applied to some of the parts yet
/// and should be applied on the fly (i.e. when reading from the part).
/// Mutations not supported by AlterConversions (isSupported*Mutation) can be omitted.
struct IMutationsSnapshot
{
/// Contains info that doesn't depend on state of mutations.
struct Params
{
Int64 metadata_version = -1;
Int64 min_part_metadata_version = -1;
PartitionIdToMinBlockPtr min_part_data_versions = nullptr;
PartitionIdToMaxBlockPtr max_mutation_versions = nullptr;
bool need_data_mutations = false;
bool need_alter_mutations = false;
bool need_patch_parts = false;
};
static Int64 getMinPartDataVersionForPartition(const Params & params, const String & partition_id);
static Int64 getMaxMutationVersionForPartition(const Params & params, const String & partition_id);
static bool needIncludeMutationToSnapshot(const Params & params, const MutationCommands & commands);
virtual ~IMutationsSnapshot() = default;
virtual void addPatches(DataPartsVector patches_) = 0;
/// Returns mutation commands that are required to be applied to the `part`.
/// @return list of mutation commands in order: oldest to newest.
virtual MutationCommands getOnFlyMutationCommandsForPart(const DataPartPtr & part) const = 0;
virtual PatchParts getPatchesForPart(const DataPartPtr & part) const = 0;
virtual std::shared_ptr<IMutationsSnapshot> cloneEmpty() const = 0;
virtual NameSet getAllUpdatedColumns() const = 0;
virtual bool hasPatchParts() const = 0;
virtual bool hasDataMutations() const = 0;
virtual bool hasAlterMutations() const = 0;
virtual bool hasMetadataMutations() const = 0;
};
struct MutationsSnapshotBase : public IMutationsSnapshot
{
public:
Params params;
MutationCounters counters;
PatchesByPartition patches_by_partition;
MutationsSnapshotBase() = default;
MutationsSnapshotBase(Params params_, MutationCounters counters_, DataPartsVector patches_);
void addPatches(DataPartsVector patches_) override;
PatchParts getPatchesForPart(const DataPartPtr & part) const final;
bool hasPatchParts() const final { return !patches_by_partition.empty(); }
bool hasDataMutations() const final { return counters.num_data > 0; }
bool hasAlterMutations() const final { return counters.num_alter > 0; }
bool hasMetadataMutations() const final { return counters.num_metadata > 0; }
bool hasAnyMutations() const { return hasDataMutations() || hasAlterMutations() || hasMetadataMutations(); }
protected:
NameSet getColumnsUpdatedInPatches() const;
void addSupportedCommands(const MutationCommands & commands, UInt64 mutation_version, MutationCommands & result_commands) const;
};
using MutationsSnapshotPtr = std::shared_ptr<const IMutationsSnapshot>;
/// Snapshot for MergeTree contains the current set of data parts
/// and mutations required to be applied at the moment of the start of query.
struct SnapshotData : public StorageSnapshot::Data
{
RangesInDataParts parts;
MutationsSnapshotPtr mutations_snapshot;
};
StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
/// The same as above but does not hold vector of data parts.
StorageSnapshotPtr getStorageSnapshotWithoutData(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
/// Load the set of data parts from disk. Call once - immediately after the object is created.
void loadDataParts(bool skip_sanity_checks, std::optional<std::unordered_set<std::string>> expected_parts);
/// Check the set of data parts on disk and load if needed, assuming the data on disk can change under the hood.
/// This method allows read-only replicas of tables on a shared storage.
void refreshDataParts(UInt64 interval_milliseconds);
/// Returns a pointer to primary index cache if it is enabled.
PrimaryIndexCachePtr getPrimaryIndexCache() const;
/// Returns a pointer to primary index cache if it is enabled and required to be prewarmed.
PrimaryIndexCachePtr getPrimaryIndexCacheToPrewarm(size_t part_uncompressed_bytes) const;
/// Returns a pointer to primary mark cache if it is required to be prewarmed.
MarkCachePtr getMarkCacheToPrewarm(size_t part_uncompressed_bytes) const;
/// Prewarm mark cache and primary index cache for the most recent data parts.
void prewarmCaches(ThreadPool & pool, MarkCachePtr mark_cache, PrimaryIndexCachePtr index_cache);
String getLogName() const { return log.loadName(); }
Int64 getMaxBlockNumber() const;
struct ProjectionPartsVector
{
DataPartsVector data_parts;
DataPartsVector projection_parts;
DataPartStateVector projection_parts_states;
DataPartsVector broken_projection_parts;
DataPartStateVector broken_projection_parts_states;
};
/// Returns a copy of the list so that the caller shouldn't worry about locks.
DataParts getDataParts(const DataPartStates & affordable_states, const DataPartsKinds & affordable_kinds) const;
/// Returns sorted list of the parts with specified states
/// out_states will contain snapshot of each part state
DataPartsVector getDataPartsVectorForInternalUsage(const DataPartStates & affordable_states, const DataPartsKinds & affordable_kinds, const DataPartsLock & lock, DataPartStateVector * out_states = nullptr) const;
DataPartsVector getDataPartsVectorForInternalUsage(const DataPartStates & affordable_states, const DataPartsKinds & affordable_kinds, DataPartStateVector * out_states = nullptr) const;
DataPartsVector getDataPartsVectorForInternalUsage(const DataPartStates & affordable_states, const DataPartsLock & lock, DataPartStateVector * out_states = nullptr) const;
DataPartsVector getDataPartsVectorForInternalUsage(const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
/// Returns parts in Active state.
DataParts getDataPartsForInternalUsage() const;
DataPartsVector getDataPartsVectorForInternalUsage() const;
/// Returns patch parts.
DataPartsVector getPatchPartsVectorForInternalUsage(const DataPartStates & affordable_states, const DataPartsLock & lock, DataPartStateVector * out_states = nullptr) const;
DataPartsVector getPatchPartsVectorForInternalUsage(const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
/// Returns patch parts in Active state
DataPartsVector getPatchPartsVectorForInternalUsage() const;
/// Returns patch parts in Active state that relate to partition_id.
DataPartsVector getPatchPartsVectorForPartition(const String & partition_id, const DataPartsLock & lock) const;
DataPartsVector getPatchPartsVectorForPartition(const String & partition_id) const;
/// Returns absolutely all parts (and snapshot of their states)
DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr) const;
DataPartsVector getDataPartsVectorInPartitionForInternalUsage(const DataPartState & state, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
DataPartsVector getDataPartsVectorInPartitionForInternalUsage(const DataPartStates & affordable_states, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
/// Returns the number of data mutations suitable for applying on the fly.
virtual MutationCounters getMutationCounters() const = 0;
/// Same as above but only returns projection parts
ProjectionPartsVector getAllProjectionPartsVector(MergeTreeData::DataPartStateVector * out_states = nullptr) const;
/// Same as above but only returns projection parts
ProjectionPartsVector getProjectionPartsVectorForInternalUsage(
const DataPartStates & affordable_states,
MergeTreeData::DataPartStateVector * out_states) const;
void filterVisibleDataParts(DataPartsVector & maybe_visible_parts, CSN snapshot_version, TransactionID current_tid) const;
/// Returns parts that visible with current snapshot
DataPartsVector getVisibleDataPartsVector(ContextPtr local_context) const;
DataPartsVector getVisibleDataPartsVectorUnlocked(ContextPtr local_context, const DataPartsLock & lock) const;
DataPartsVector getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const;
DataPartsVector getVisibleDataPartsVector(CSN snapshot_version, TransactionID current_tid) const;
/// Returns all parts in specified partition
DataPartsVector getVisibleDataPartsVectorInPartition(MergeTreeTransaction * txn, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
DataPartsVector getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id, DataPartsLock & lock) const;
DataPartsVector getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const;
DataPartsVector getVisibleDataPartsVectorInPartitions(ContextPtr local_context, const std::unordered_set<String> & partition_ids) const;
/// Returns the number of marks in all parts.
size_t getTotalMarksCount() const;
/// Adjusts the set of data parts that should be used for SELECT queries.
/// Skips very new parts if they're not in cache yet (ignore_cold_parts_seconds), and replaces
/// recently merged parts with the pre-merge source parts if the merged part is not in cache yet
/// (prefer_warmed_unmerged_parts_seconds). This improves cache hit rate and latency when cache
/// warmer is enabled. `parts` is modified in place; the caller must hold `lock`.
void adjustDataPartsVectorBasedOnCacheWarmness(DataPartsVector & parts, const ContextPtr & local_context, const DataPartsLock & lock) const;
/// Returns a part in Active state with the given name, or a part containing it. If there is no such part, returns nullptr.
/// Overloads taking a DataPartsLock use the caller's lock.
/// NOTE(review): the overload with `state` presumably searches parts in that state instead of Active — confirm.
DataPartPtr getActiveContainingPart(const String & part_name) const;
DataPartPtr getActiveContainingPart(const String & part_name, DataPartsLock & lock) const;
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const;
DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const;
/// Swaps a part with its identical copy (possibly with another path on another disk).
/// If the original part is not active or doesn't exist, an exception is thrown.
void swapActivePart(MergeTreeData::DataPartPtr part_copy, DataPartsLock &);
/// Returns the part with the given name (or part info) whose state is one of `valid_states`,
/// or nullptr if there is no such part. The *Unlocked overloads use the caller's `acquired_lock`.
DataPartPtr getPartIfExistsUnlocked(const String & part_name, const DataPartStates & valid_states, DataPartsLock & acquired_lock) const;
DataPartPtr getPartIfExistsUnlocked(const MergeTreePartInfo & part_info, const DataPartStates & valid_states, DataPartsLock & acquired_lock) const;
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states) const;
DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states) const;
/// Total size of active parts in bytes.
size_t getTotalActiveSizeInBytes() const;
size_t getTotalActiveSizeInRows() const;
size_t getTotalUncompressedBytesInPatches() const;
size_t getAllPartsCount() const;
size_t getActivePartsCount() const;
size_t getOutdatedPartsCount() const;
size_t getNumberOfOutdatedPartsWithExpiredRemovalTime() const;
/// Returns a pair of: (the maximum number of parts in a single partition, across all partitions;
/// the total size of the parts in that partition).
/// If several partitions share the maximum number of parts, the total size of an arbitrary one of them is returned.
/// NOTE(review): the *WithState variant presumably counts only parts in `state` — confirm.
std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartitionWithState(DataPartState state) const;
virtual std::pair<size_t, size_t> getMaxPartsCountAndSizeForPartition() const;
virtual size_t getMaxOutdatedPartsCountForPartition() const;
/// Returns the minimum value of part->info.getDataVersion() over all active parts.
/// Makes sense only for ordinary MergeTree engines, because for them block numbering doesn't depend on partition.
/// NOTE(review): the result is std::optional — presumably empty when there are no active parts; confirm.
std::optional<Int64> getMinPartDataVersion() const;
/// Returns all detached parts.
DetachedPartsInfo getDetachedParts() const;
static void validateDetachedPartName(const String & name);
void dropDetached(const ASTPtr & partition, bool part, ContextPtr context);
MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
ContextPtr context, PartsTemporaryRename & renamed_parts);
/// NOTE(review): with throw_on_error = false this presumably reports the problem through the
/// returned bool instead of throwing — confirm in MergeTreeData.cpp.
bool assertNoPatchesForParts(const DataPartsVector & parts, const DataPartsVector & patches, std::string_view command, bool throw_on_error = true) const;
/// If the table contains too many active parts, sleep for a while to give them time to merge.
/// If `until` is non-null, wake up from the sleep earlier if the event happened.
/// The decision to delay or throw is made according to settings 'parts_to_delay_insert' and 'parts_to_throw_insert'.
/// NOTE(review): with allow_throw = false the method presumably only delays — confirm.
void delayInsertOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context, bool allow_throw) const;
/// If the table contains too many unfinished mutations, sleep for a while to give them time to execute.
/// If `until` is non-null, wake up from the sleep earlier if the event happened.
/// The decision to delay or throw is made according to settings 'number_of_mutations_to_delay' and 'number_of_mutations_to_throw'.
void delayMutationOrThrowIfNeeded(Poco::Event * until, const ContextPtr & query_context) const;
/// If the table contains too many uncompressed bytes in patches, throws an exception.
void throwLightweightUpdateIfNeeded(UInt64 added_uncompressed_bytes) const;
/// Renames a temporary part to a permanent part and adds it to the parts set.
/// It is assumed that the part does not intersect with existing parts.
/// Adds the part in the PreActive state (the part will be added to the active set later, when `transaction` is committed).
/// Returns true if the part was added. Returns false if the part is covered by a bigger part.
bool renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & transaction,
DataPartsLock & lock,
bool rename_in_transaction);
/// The same as renameTempPartAndAdd, but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part,
Transaction & out_transaction,
bool rename_in_transaction);
/// Unlocked version of renameTempPartAndReplace: uses the caller's `lock`. Useful when adding multiple parts under a single lock.
/// NOTE(review): unlike renameTempPartAndReplace this returns bool, not the covered parts — confirm
/// whether the bool has the same meaning as in renameTempPartAndAdd.
bool renameTempPartAndReplaceUnlocked(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock,
bool rename_in_transaction);
/// Removes parts from the working set immediately (without waiting for the background
/// process) and transfers their state to Temporary. Has very limited usage: only
/// for new parts which aren't already present in the table.
void removePartsFromWorkingSetImmediatelyAndSetTemporaryState(const DataPartsVector & remove, DataPartsLock * acquired_lock = nullptr);
/// Removes parts from the working set.
/// Parts in `remove` must already be in data_parts with PreActive, Active, or Outdated states.
/// If clear_without_timeout is true, the parts will be deleted at once, or during the next call to
/// clearOldParts (ignoring old_parts_lifetime).
void removePartsFromWorkingSet(MergeTreeTransaction * txn, const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock * acquired_lock = nullptr);
void removePartsFromWorkingSet(MergeTreeTransaction * txn, const DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock);
/// Removes all parts covered by drop_range from the working set.
/// Used in the REPLACE PARTITION command.
void removePartsInRangeFromWorkingSet(MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
/// NOTE(review): presumably returns the Active parts covered by `drop_range` that are to be removed — confirm.
DataPartsVector grabActivePartsToRemoveForDropRange(
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock);
/// Wrapper around a part that is going to be removed from ZooKeeper.
/// It exists to restrict access to parts in Deleting state: the part name
/// is always available, but the part itself is handed out only if it was active.
class PartToRemoveFromZooKeeper
{
public:
    explicit PartToRemoveFromZooKeeper(DataPartPtr && part_, bool was_active_ = true)
        : part(std::move(part_))
        , was_active(was_active_)
    {
    }

    /// Reading the name is safe for any part, whatever its state.
    const String & getPartName() const
    {
        return part->name;
    }

    /// Returns the wrapped part if it was active, nullptr otherwise.
    DataPartPtr getPartIfItWasActive() const
    {
        if (!was_active)
            return nullptr;
        return part;
    }

private:
    DataPartPtr part;
    bool was_active;
};

/// List of parts scheduled for removal from ZooKeeper.
using PartsToRemoveFromZooKeeper = std::vector<PartToRemoveFromZooKeeper>;
/// Same as removePartsInRangeFromWorkingSet(), but also returns the list of parts to remove from ZooKeeper.
/// It includes parts that have just been removed by this method
/// and Outdated parts covered by drop_range that were removed earlier for any reason.
PartsToRemoveFromZooKeeper removePartsInRangeFromWorkingSetAndGetPartsToRemoveFromZooKeeper(
MergeTreeTransaction * txn, const MergeTreePartInfo & drop_range, DataPartsLock & lock, bool create_empty_part = true);
/// Restores an Outdated part and adds it to the working set.
void restoreAndActivatePart(const DataPartPtr & part, DataPartsLock * acquired_lock = nullptr);
/// Renames the part to detached/<prefix>_<part> and removes it from data_parts,
/// so it will not be deleted in clearOldParts.
/// NOTE: This method is safe to use only for parts which nobody else holds (like on server start or for parts which were not committed).
/// For active parts it's unsafe because this method modifies fields of the part (rename) while some other thread can try to read it.
void forcefullyMovePartToDetachedAndRemoveFromMemory(const DataPartPtr & part, const String & prefix = "");
/// This method should not be here, but async loading of Outdated parts is implemented in MergeTreeData.
/// The default implementation does nothing.
/// NOTE(review): presumably overridden by storages that keep part metadata in ZooKeeper — confirm.
virtual void forcefullyRemoveBrokenOutdatedPartFromZooKeeperBeforeDetaching(const String & /*part_name*/) {}
/// Outdates a broken part, sets its remove time to zero (remove as fast as possible) and makes a clone in the detached directory.
void outdateUnexpectedPartAndCloneToDetached(const DataPartPtr & part);
/// If the part is Obsolete and not used by anybody else, immediately deletes it from the filesystem and removes it from memory.
bool tryRemovePartImmediately(DataPartPtr && part);
/// Returns old inactive parts that can be deleted. At the same time removes them from the list of parts, but not from the disk.
/// If 'force' is true, doesn't wait for old_parts_lifetime.
DataPartsVector grabOldParts(bool force = false);
/// Reverts the changes made by grabOldParts(); the parts should be in Deleting state.
void rollbackDeletingParts(const DataPartsVector & parts);
/// Removes parts from data_parts; they should be in Deleting state.
void removePartsFinally(const MergeTreeData::DataPartsVector & parts, MergeTreeData::DataPartsVector * removed_parts = nullptr);
/// Tries to clear parts from the filesystem.
/// If removing some part fails and throw_on_error is `true`, an exception is thrown on the first failed part.
/// NOTE(review): when non-null, `parts_failed_to_delete` presumably collects the names of parts that could not be removed — confirm.
void clearPartsFromFilesystemImpl(const DataPartsVector & parts, bool throw_on_error, NameSet * parts_failed_to_delete);
/// Removes parts from disk by calling part->remove(). Can do it in parallel for a big set of parts when the settings allow it.
/// Throws an exception in case of errors.
/// Otherwise, in the non-parallel case, will break and return.
/// NOTE(review): the "Otherwise" sentence is ambiguous; `part_names_succeed` presumably receives the names of removed parts — confirm.
void clearPartsFromFilesystemImplMaybeInParallel(const DataPartsVector & parts_to_remove, NameSet * part_names_succeed);
/// Tries to clear parts from the filesystem.
/// If an error occurs at some point, the state of the remaining parts is rolled back from Deleting to Outdated.
/// That allows scheduling them for deletion a bit later.
size_t clearPartsFromFilesystemAndRollbackIfError(const DataPartsVector & parts_to_delete, const String & parts_type);
/// Deletes old temporary directories whose names begin with one of `valid_prefixes`
/// (by default "tmp_" and "tmp-fetch_"), optionally under an explicit `root_path`.
/// NOTE(review): "old" presumably means older than custom_directories_lifetime_seconds — confirm.
/// Must be called with locked lockForShare() because it's using relative_data_path.
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", "tmp-fetch_"});
size_t clearOldTemporaryDirectories(const String & root_path, size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes);
size_t clearEmptyParts();
/// Moves patch parts that do not need to be applied to regular parts to the Outdated state.
size_t clearUnusedPatchParts();
/// After dropAllData() is called, no other method may be called.
/// Deletes the data directory and flushes the uncompressed blocks cache and the marks cache.
void dropAllData();
/// This flag is for hardening and assertions.
bool all_data_dropped = false;
/// Drops data directories if they are empty. It is safe to call this method if table creation was unsuccessful.
void dropIfEmpty();
/// Moves the entire data directory. Flushes the uncompressed blocks cache
/// and the marks cache. Must be called with locked lockExclusively()
/// because it changes relative_data_path.
void rename(const String & new_table_path, const StorageID & new_table_id) override;
/// Also renames log names.
void renameInMemory(const StorageID & new_table_id) override;
/// Checks if the ALTER can be performed:
/// - all needed columns are present.
/// - all type conversions can be done.
/// - columns corresponding to primary key, indices, sign, sampling expression, summed columns, and date are not affected.
/// If something is wrong, throws an exception.
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;
/// Throws an exception if the command is some kind of DROP command (drop column, drop index, etc.) or a rename command,
/// and there is an unfinished mutation that needs the affected column to finish.
void checkDropOrRenameCommandDoesntAffectInProgressMutations(
const AlterCommand & command, const std::map<std::string, MutationCommands> & unfinished_mutations, ContextPtr context) const;
/// Returns a mapping from unfinished mutation name to its mutation commands.
virtual std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const = 0;
/// Checks if the mutation can be performed.
/// (currently no additional checks: always ok)
void checkMutationIsPossible(const MutationCommands & commands, const Settings & settings) const override;
/// Checks that the partition name in all commands is valid.
void checkAlterPartitionIsPossible(
const PartitionCommands & commands,
const StorageMetadataPtr & metadata_snapshot,
const Settings & settings,
ContextPtr local_context) const override;
/// Changes MergeTreeSettings.
void changeSettings(
const ASTPtr & new_settings,
AlterLockHolder & table_lock_holder);
/// NOTE(review): presumably throws if `sorting_key` is not acceptable as a sorting key — confirm.
static void verifySortingKey(const KeyDescription & sorting_key);
/// Should be called if part data is suspected to be corrupted.
/// Has the ability to check all other parts
/// which reside on the same disk as the suspicious part.
virtual void reportBrokenPart(MergeTreeData::DataPartPtr data_part) const;
/// Compatibility shim: forwards unchanged to the free function DB::extractKeyExpressionList().
/// TODO (alesap): this duplicate exists only for compatibility and must be removed.
static ASTPtr extractKeyExpressionList(const ASTPtr & node) { return DB::extractKeyExpressionList(node); }
/** Creates a local backup (snapshot) of the parts of the specified partition.
* The backup is created in directory clickhouse_dir/shadow/i/, where i is an incremental number,
* or, if 'with_name' is specified, in a directory with the specified name.
*/
PartitionCommandsResultInfo freezePartition(
const ASTPtr & partition,
const String & with_name,
ContextPtr context,
TableLockHolder & table_lock_holder);
/// Freezes all parts.
PartitionCommandsResultInfo freezeAll(
const String & with_name,
ContextPtr context,
TableLockHolder & table_lock_holder);
/// Unfreezes a particular partition.
PartitionCommandsResultInfo unfreezePartition(
const ASTPtr & partition,
const String & backup_name,
ContextPtr context,
TableLockHolder & table_lock_holder);
/// Unfreezes all parts.
PartitionCommandsResultInfo unfreezeAll(
const String & backup_name,
ContextPtr context,
TableLockHolder & table_lock_holder);
/// Extracts data from the backup and puts it into the storage.
void restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & partitions) override;
/// Returns true if the storage supports backup/restore for specific partitions.
bool supportsBackupPartition() const override { return true; }
/// Moves a partition to the specified disk.
/// NOTE(review): with moving_part = true, `partition` presumably names a single part — confirm.
void movePartitionToDisk(const ASTPtr & partition, const String & name, bool moving_part, ContextPtr context);
/// Moves a partition to the specified volume.
void movePartitionToVolume(const ASTPtr & partition, const String & name, bool moving_part, ContextPtr context);
/// Moves a partition to the specified table.
void movePartitionToTable(const PartitionCommand & command, ContextPtr query_context);
/// NOTE(review): exports a part to `destination_storage_id`; `completion_callback`, if set, is presumably
/// invoked when the export finishes, and killExportPart() cancels an export by `transaction_id` — confirm.
void exportPartToTable(const PartitionCommand & command, ContextPtr query_context);
void exportPartToTable(
const std::string & part_name,
const StorageID & destination_storage_id,
const String & transaction_id,
ContextPtr query_context,
bool allow_outdated_parts = false,
std::function<void(MergeTreePartExportManifest::CompletionCallbackResult)> completion_callback = {});
void killExportPart(const String & transaction_id);
/// Default implementation of EXPORT PARTITION: always throws NOT_IMPLEMENTED,
/// naming the engine in the message. Engines that support the command override it.
virtual void exportPartitionToTable(const PartitionCommand & /*command*/, ContextPtr /*query_context*/)
{
    throw Exception(
        ErrorCodes::NOT_IMPLEMENTED,
        "EXPORT PARTITION is not implemented for engine {}",
        getName());
}