diff --git a/sources/ftypes.h b/sources/ftypes.h index c2273c13..29ed7f20 100644 --- a/sources/ftypes.h +++ b/sources/ftypes.h @@ -981,6 +981,9 @@ typedef int (*TFUN1)(UBYTE *,int); #define BUCKETDOINGTERMS 0 #define BUCKETDOINGBRACKET 1 + +// The minimum number of terms which should fit in a bucket +#define BUCKETMINTERMS 2 #endif /* diff --git a/sources/structs.h b/sources/structs.h index 4c2fe359..489d030d 100644 --- a/sources/structs.h +++ b/sources/structs.h @@ -1211,6 +1211,7 @@ typedef struct ThReAdBuCkEt { WORD *threadbuffer; /* Here are the (primary) terms */ WORD *compressbuffer; /* For keep brackets we need the compressbuffer */ LONG threadbuffersize; /* Number of words in threadbuffer */ + LONG compressbuffersize; /* Number of words in compressbuffer */ LONG ddterms; /* Number of primary+secondary terms represented */ LONG firstterm; /* The number of the first term in the bucket */ LONG firstbracket; /* When doing complete brackets */ diff --git a/sources/threads.c b/sources/threads.c index eabd35eb..f7839d78 100644 --- a/sources/threads.c +++ b/sources/threads.c @@ -62,6 +62,7 @@ */ #include "form3.h" +#include #ifdef WITH_ALARM // This is only required if we are blocking SIG_ALRM in the worker threads. @@ -888,7 +889,9 @@ void TerminateAllThreads(void) * Creates 2*number thread buckets. We want double the number because * we want to prepare number of them while another number are occupied. * - * Each bucket should have about AC.ThreadBucketSize*AM.MaxTerm words. + * Each bucket should have about AC.ThreadBucketSize/4*AM.MaxTer words, + * for default values of these parameters. Particlularly for large + * AM.MaxTer we need to be more restrictive. * * When loading a thread we only have to pass the address of a full bucket. * This gives more overlap between the master and the workers and hence @@ -900,8 +903,6 @@ void TerminateAllThreads(void) * buckets while the workers are processing the contents of the buckets * they have been assigned. In practise often the processing can go faster * than that the master can fill the buckets for all workers. - * It should be possible to improve this bucket system, but the trivial - * idea * * @param number The number of workers * @param par par = 0: First allocation @@ -914,28 +915,36 @@ int MakeThreadBuckets(int number, int par) int i; LONG sizethreadbuckets; THREADBUCKET *thr; -/* - First we need a decent estimate. Not all terms should be maximal. - Note that AM.MaxTer is in bytes!!! - Maybe we should try to limit the size here a bit more effectively. - This is a great consumer of memory. -*/ - sizethreadbuckets = ( AC.ThreadBucketSize + 1 ) * AM.MaxTer + 2*sizeof(WORD); - if ( AC.ThreadBucketSize >= 250 ) sizethreadbuckets /= 4; - else if ( AC.ThreadBucketSize >= 90 ) sizethreadbuckets /= 3; - else if ( AC.ThreadBucketSize >= 40 ) sizethreadbuckets /= 2; - sizethreadbuckets /= sizeof(WORD); + + // Here we divide by 4, which has been the behaviour with the default + // AC.ThreadBucketSize for a long time. + // MAXTER is the default value of AM.MaxTer, in units of sizeof(WORD). + sizethreadbuckets = (AC.ThreadBucketSize*MAXTER)/4; + // Now we scale up the buffer logarithmically with the user AM.MaxTer. + // If we scale linearly, we end up with really enormous buffers here + // when AM.MaxTer is of the order of millions of WORDs. + float scale = 1.0; + if ( AM.MaxTer/sizeof(WORD) > MAXTER ) { + scale += log(((float)AM.MaxTer/sizeof(WORD))/MAXTER); + } + sizethreadbuckets = (LONG)((float)sizethreadbuckets*scale); + // Nonetheless, we must fit at least BUCKETMINTERMS terms in each bucket! + // So the buffer will eventually scale linearly with MaxTer anyway, but + // much less aggressively than the old code. + sizethreadbuckets = MaX((ULONG)sizethreadbuckets, BUCKETMINTERMS*AM.MaxTer/sizeof(WORD)); if ( par == 0 ) { numthreadbuckets = 2*(number-1); threadbuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"threadbuckets"); - freebuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"threadbuckets"); + freebuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"freebuckets"); } if ( par > 0 ) { if ( sizethreadbuckets <= threadbuckets[0]->threadbuffersize ) return(0); for ( i = 0; i < numthreadbuckets; i++ ) { thr = threadbuckets[i]; M_free(thr->deferbuffer,"deferbuffer"); + M_free(thr->threadbuffer,"threadbuffer"); + M_free(thr->compressbuffer,"compressbuffer"); } } else { @@ -947,11 +956,13 @@ int MakeThreadBuckets(int number, int par) for ( i = 0; i < numthreadbuckets; i++ ) { thr = threadbuckets[i]; thr->threadbuffersize = sizethreadbuckets; + // This buffer does not need to be so large, start it at MaxTer. + // We'll double it if necessary. + thr->compressbuffersize = AM.MaxTer/sizeof(WORD); thr->free = BUCKETFREE; - thr->deferbuffer = (POSITION *)Malloc1(2*sizethreadbuckets*sizeof(WORD) - +(AC.ThreadBucketSize+1)*sizeof(POSITION),"deferbuffer"); - thr->threadbuffer = (WORD *)(thr->deferbuffer+AC.ThreadBucketSize+1); - thr->compressbuffer = (WORD *)(thr->threadbuffer+sizethreadbuckets); + thr->deferbuffer = (POSITION *)Malloc1((AC.ThreadBucketSize+1)*sizeof(POSITION),"deferbuffer"); + thr->threadbuffer = (WORD *)Malloc1(sizethreadbuckets*sizeof(WORD),"threadbuffer"); + thr->compressbuffer = (WORD *)Malloc1(thr->compressbuffersize*sizeof(WORD),"compressbuffer"); thr->busy = BUCKETPREPARINGTERM; thr->usenum = thr->totnum = 0; thr->type = BUCKETDOINGTERMS; @@ -2825,6 +2836,17 @@ Found2:; defcount = 0; thr->deferbuffer[defcount++] = AR0.DefPosition; ttco = thr->compressbuffer; t1 = AR0.CompressBuffer; j = *t1; + while ( thr->compressbuffersize <= j ) { + // the compressbuffer is not large enough! + WORD *top = thr->compressbuffer+thr->compressbuffersize; + DoubleBuffer((void**)&(thr->compressbuffer),(void**)&(top), + sizeof(*(thr->compressbuffer)), "double compressbuffer"); + ttco = thr->compressbuffer; + thr->compressbuffersize *= 2; + MLOCK(ErrorMessageLock); + MesPrint("double compressbuffer 1"); + MUNLOCK(ErrorMessageLock); + } NCOPY(ttco,t1,j); } else if ( first && ( AC.CollectFun == 0 ) ) { /* Brackets ? */ @@ -2889,6 +2911,18 @@ Found2:; if ( AR0.DeferFlag ) { thr->deferbuffer[defcount++] = AR0.DefPosition; t1 = AR0.CompressBuffer; j = *t1; + while ( thr->compressbuffer+thr->compressbuffersize-ttco <= j ) { + // the compressbuffer is not large enough! + const ptrdiff_t oldoffset = ttco - thr->compressbuffer; + WORD *top = thr->compressbuffer+thr->compressbuffersize; + DoubleBuffer((void**)&(thr->compressbuffer),(void**)&(top), + sizeof(*(thr->compressbuffer)), "double compressbuffer"); + ttco = thr->compressbuffer + oldoffset; + thr->compressbuffersize *= 2; + MLOCK(ErrorMessageLock); + MesPrint("double compressbuffer 2"); + MUNLOCK(ErrorMessageLock); + } NCOPY(ttco,t1,j); } if ( AC.CollectFun && *tt < (AM.MaxTer/((LONG)sizeof(WORD))-10) ) {