From f95f5ea06b2811c7b58f5b695cf33a03ce07286e Mon Sep 17 00:00:00 2001 From: Josh Davies Date: Tue, 12 May 2026 23:20:55 +0100 Subject: [PATCH 1/2] refactor: split deferbufferallocation, make compressbuffer smaller Splitting the buffer will allow valgrind to detect out-of-bounds accesses of deferbuffer or threadbuffer. Start with a small compressbuffer, and double it as needed. This saves a lot of memory with many threads and large maxtermsize. --- sources/structs.h | 1 + sources/threads.c | 37 ++++++++++++++++++++++++++++++++----- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/sources/structs.h b/sources/structs.h index 4c2fe359..489d030d 100644 --- a/sources/structs.h +++ b/sources/structs.h @@ -1211,6 +1211,7 @@ typedef struct ThReAdBuCkEt { WORD *threadbuffer; /* Here are the (primary) terms */ WORD *compressbuffer; /* For keep brackets we need the compressbuffer */ LONG threadbuffersize; /* Number of words in threadbuffer */ + LONG compressbuffersize; /* Number of words in compressbuffer */ LONG ddterms; /* Number of primary+secondary terms represented */ LONG firstterm; /* The number of the first term in the bucket */ LONG firstbracket; /* When doing complete brackets */ diff --git a/sources/threads.c b/sources/threads.c index eabd35eb..d67be29e 100644 --- a/sources/threads.c +++ b/sources/threads.c @@ -929,13 +929,15 @@ int MakeThreadBuckets(int number, int par) if ( par == 0 ) { numthreadbuckets = 2*(number-1); threadbuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"threadbuckets"); - freebuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"threadbuckets"); + freebuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"freebuckets"); } if ( par > 0 ) { if ( sizethreadbuckets <= threadbuckets[0]->threadbuffersize ) return(0); for ( i = 0; i < numthreadbuckets; i++ ) { thr = threadbuckets[i]; M_free(thr->deferbuffer,"deferbuffer"); + M_free(thr->threadbuffer,"threadbuffer"); + M_free(thr->compressbuffer,"compressbuffer"); } } else { @@ -947,11 +949,13 @@ int MakeThreadBuckets(int number, int par) for ( i = 0; i < numthreadbuckets; i++ ) { thr = threadbuckets[i]; thr->threadbuffersize = sizethreadbuckets; + // This buffer does not need to be so large, start it at MaxTer. + // We'll double it if necessary. + thr->compressbuffersize = AM.MaxTer/sizeof(WORD); thr->free = BUCKETFREE; - thr->deferbuffer = (POSITION *)Malloc1(2*sizethreadbuckets*sizeof(WORD) - +(AC.ThreadBucketSize+1)*sizeof(POSITION),"deferbuffer"); - thr->threadbuffer = (WORD *)(thr->deferbuffer+AC.ThreadBucketSize+1); - thr->compressbuffer = (WORD *)(thr->threadbuffer+sizethreadbuckets); + thr->deferbuffer = (POSITION *)Malloc1((AC.ThreadBucketSize+1)*sizeof(POSITION),"deferbuffer"); + thr->threadbuffer = (WORD *)Malloc1(sizethreadbuckets*sizeof(WORD),"threadbuffer"); + thr->compressbuffer = (WORD *)Malloc1(thr->compressbuffersize*sizeof(WORD),"compressbuffer"); thr->busy = BUCKETPREPARINGTERM; thr->usenum = thr->totnum = 0; thr->type = BUCKETDOINGTERMS; @@ -2825,6 +2829,17 @@ Found2:; defcount = 0; thr->deferbuffer[defcount++] = AR0.DefPosition; ttco = thr->compressbuffer; t1 = AR0.CompressBuffer; j = *t1; + while ( thr->compressbuffersize <= j ) { + // the compressbuffer is not large enough! + WORD *top = thr->compressbuffer+thr->compressbuffersize; + DoubleBuffer((void**)&(thr->compressbuffer),(void**)&(top), + sizeof(*(thr->compressbuffer)), "double compressbuffer"); + ttco = thr->compressbuffer; + thr->compressbuffersize *= 2; + MLOCK(ErrorMessageLock); + MesPrint("double compressbuffer 1"); + MUNLOCK(ErrorMessageLock); + } NCOPY(ttco,t1,j); } else if ( first && ( AC.CollectFun == 0 ) ) { /* Brackets ? */ @@ -2889,6 +2904,18 @@ Found2:; if ( AR0.DeferFlag ) { thr->deferbuffer[defcount++] = AR0.DefPosition; t1 = AR0.CompressBuffer; j = *t1; + while ( thr->compressbuffer+thr->compressbuffersize-ttco <= j ) { + // the compressbuffer is not large enough! + const ptrdiff_t oldoffset = ttco - thr->compressbuffer; + WORD *top = thr->compressbuffer+thr->compressbuffersize; + DoubleBuffer((void**)&(thr->compressbuffer),(void**)&(top), + sizeof(*(thr->compressbuffer)), "double compressbuffer"); + ttco = thr->compressbuffer + oldoffset; + thr->compressbuffersize *= 2; + MLOCK(ErrorMessageLock); + MesPrint("double compressbuffer 2"); + MUNLOCK(ErrorMessageLock); + } NCOPY(ttco,t1,j); } if ( AC.CollectFun && *tt < (AM.MaxTer/((LONG)sizeof(WORD))-10) ) { From 2b0daca041920925352a12f5b3e6bd425c34cdfd Mon Sep 17 00:00:00 2001 From: Josh Davies Date: Wed, 13 May 2026 15:57:31 +0100 Subject: [PATCH 2/2] refactor: reduce the size of the thread buckets at larger maxtermsize Using "large" values of MaxTermSize (say, 5M, 10M, ...) results in very large allocations for the thread buffers, since they historically have been ThreadBucketCount(500)/4*MaxTermSize * 2 * workers, which easily reaches 100GB or more. Reduce the size of these allocations, such that with the default setup nothing changes, and then the buffers grow as log(MaxTermSize), up to the point where we enforce a BUCKETMINTERMS*MaxTermSize limit. Then they grow linearly, but BUCKETMINTERMS is much smaller than the default ThreadBucketCount. --- sources/ftypes.h | 3 +++ sources/threads.c | 35 +++++++++++++++++++++-------------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/sources/ftypes.h b/sources/ftypes.h index c2273c13..29ed7f20 100644 --- a/sources/ftypes.h +++ b/sources/ftypes.h @@ -981,6 +981,9 @@ typedef int (*TFUN1)(UBYTE *,int); #define BUCKETDOINGTERMS 0 #define BUCKETDOINGBRACKET 1 + +// The minimum number of terms which should fit in a bucket +#define BUCKETMINTERMS 2 #endif /* diff --git a/sources/threads.c b/sources/threads.c index d67be29e..f7839d78 100644 --- a/sources/threads.c +++ b/sources/threads.c @@ -62,6 +62,7 @@ */ #include "form3.h" +#include #ifdef WITH_ALARM // This is only required if we are blocking SIG_ALRM in the worker threads. @@ -888,7 +889,9 @@ void TerminateAllThreads(void) * Creates 2*number thread buckets. We want double the number because * we want to prepare number of them while another number are occupied. * - * Each bucket should have about AC.ThreadBucketSize*AM.MaxTerm words. + * Each bucket should have about AC.ThreadBucketSize/4*AM.MaxTer words, + * for default values of these parameters. Particlularly for large + * AM.MaxTer we need to be more restrictive. * * When loading a thread we only have to pass the address of a full bucket. * This gives more overlap between the master and the workers and hence @@ -900,8 +903,6 @@ void TerminateAllThreads(void) * buckets while the workers are processing the contents of the buckets * they have been assigned. In practise often the processing can go faster * than that the master can fill the buckets for all workers. - * It should be possible to improve this bucket system, but the trivial - * idea * * @param number The number of workers * @param par par = 0: First allocation @@ -914,17 +915,23 @@ int MakeThreadBuckets(int number, int par) int i; LONG sizethreadbuckets; THREADBUCKET *thr; -/* - First we need a decent estimate. Not all terms should be maximal. - Note that AM.MaxTer is in bytes!!! - Maybe we should try to limit the size here a bit more effectively. - This is a great consumer of memory. -*/ - sizethreadbuckets = ( AC.ThreadBucketSize + 1 ) * AM.MaxTer + 2*sizeof(WORD); - if ( AC.ThreadBucketSize >= 250 ) sizethreadbuckets /= 4; - else if ( AC.ThreadBucketSize >= 90 ) sizethreadbuckets /= 3; - else if ( AC.ThreadBucketSize >= 40 ) sizethreadbuckets /= 2; - sizethreadbuckets /= sizeof(WORD); + + // Here we divide by 4, which has been the behaviour with the default + // AC.ThreadBucketSize for a long time. + // MAXTER is the default value of AM.MaxTer, in units of sizeof(WORD). + sizethreadbuckets = (AC.ThreadBucketSize*MAXTER)/4; + // Now we scale up the buffer logarithmically with the user AM.MaxTer. + // If we scale linearly, we end up with really enormous buffers here + // when AM.MaxTer is of the order of millions of WORDs. + float scale = 1.0; + if ( AM.MaxTer/sizeof(WORD) > MAXTER ) { + scale += log(((float)AM.MaxTer/sizeof(WORD))/MAXTER); + } + sizethreadbuckets = (LONG)((float)sizethreadbuckets*scale); + // Nonetheless, we must fit at least BUCKETMINTERMS terms in each bucket! + // So the buffer will eventually scale linearly with MaxTer anyway, but + // much less aggressively than the old code. + sizethreadbuckets = MaX((ULONG)sizethreadbuckets, BUCKETMINTERMS*AM.MaxTer/sizeof(WORD)); if ( par == 0 ) { numthreadbuckets = 2*(number-1);