Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sources/ftypes.h
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,9 @@ typedef int (*TFUN1)(UBYTE *,int);

#define BUCKETDOINGTERMS 0
#define BUCKETDOINGBRACKET 1

// The minimum number of terms which should fit in a bucket
#define BUCKETMINTERMS 2
#endif

/*
Expand Down
1 change: 1 addition & 0 deletions sources/structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ typedef struct ThReAdBuCkEt {
WORD *threadbuffer; /* Here are the (primary) terms */
WORD *compressbuffer; /* For keep brackets we need the compressbuffer */
LONG threadbuffersize; /* Number of words in threadbuffer */
LONG compressbuffersize; /* Number of words in compressbuffer */
LONG ddterms; /* Number of primary+secondary terms represented */
LONG firstterm; /* The number of the first term in the bucket */
LONG firstbracket; /* When doing complete brackets */
Expand Down
72 changes: 53 additions & 19 deletions sources/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
*/

#include "form3.h"
#include <math.h>

#ifdef WITH_ALARM
// This is only required if we are blocking SIG_ALRM in the worker threads.
Expand Down Expand Up @@ -888,7 +889,9 @@ void TerminateAllThreads(void)
* Creates 2*number thread buckets. We want double the number because
* we want to prepare number of them while another number are occupied.
*
* Each bucket should have about AC.ThreadBucketSize*AM.MaxTerm words.
* Each bucket should have about AC.ThreadBucketSize/4*AM.MaxTer words,
* for default values of these parameters. Particlularly for large
* AM.MaxTer we need to be more restrictive.
*
* When loading a thread we only have to pass the address of a full bucket.
* This gives more overlap between the master and the workers and hence
Expand All @@ -900,8 +903,6 @@ void TerminateAllThreads(void)
* buckets while the workers are processing the contents of the buckets
* they have been assigned. In practise often the processing can go faster
* than that the master can fill the buckets for all workers.
* It should be possible to improve this bucket system, but the trivial
* idea
*
* @param number The number of workers
* @param par par = 0: First allocation
Expand All @@ -914,28 +915,36 @@ int MakeThreadBuckets(int number, int par)
int i;
LONG sizethreadbuckets;
THREADBUCKET *thr;
/*
First we need a decent estimate. Not all terms should be maximal.
Note that AM.MaxTer is in bytes!!!
Maybe we should try to limit the size here a bit more effectively.
This is a great consumer of memory.
*/
sizethreadbuckets = ( AC.ThreadBucketSize + 1 ) * AM.MaxTer + 2*sizeof(WORD);
if ( AC.ThreadBucketSize >= 250 ) sizethreadbuckets /= 4;
else if ( AC.ThreadBucketSize >= 90 ) sizethreadbuckets /= 3;
else if ( AC.ThreadBucketSize >= 40 ) sizethreadbuckets /= 2;
sizethreadbuckets /= sizeof(WORD);

// Here we divide by 4, which has been the behaviour with the default
// AC.ThreadBucketSize for a long time.
// MAXTER is the default value of AM.MaxTer, in units of sizeof(WORD).
sizethreadbuckets = (AC.ThreadBucketSize*MAXTER)/4;
// Now we scale up the buffer logarithmically with the user AM.MaxTer.
// If we scale linearly, we end up with really enormous buffers here
// when AM.MaxTer is of the order of millions of WORDs.
float scale = 1.0;
if ( AM.MaxTer/sizeof(WORD) > MAXTER ) {
scale += log(((float)AM.MaxTer/sizeof(WORD))/MAXTER);
}
sizethreadbuckets = (LONG)((float)sizethreadbuckets*scale);
// Nonetheless, we must fit at least BUCKETMINTERMS terms in each bucket!
// So the buffer will eventually scale linearly with MaxTer anyway, but
// much less aggressively than the old code.
sizethreadbuckets = MaX((ULONG)sizethreadbuckets, BUCKETMINTERMS*AM.MaxTer/sizeof(WORD));

if ( par == 0 ) {
numthreadbuckets = 2*(number-1);
threadbuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"threadbuckets");
freebuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"threadbuckets");
freebuckets = (THREADBUCKET **)Malloc1(numthreadbuckets*sizeof(THREADBUCKET *),"freebuckets");
}
if ( par > 0 ) {
if ( sizethreadbuckets <= threadbuckets[0]->threadbuffersize ) return(0);
for ( i = 0; i < numthreadbuckets; i++ ) {
thr = threadbuckets[i];
M_free(thr->deferbuffer,"deferbuffer");
M_free(thr->threadbuffer,"threadbuffer");
M_free(thr->compressbuffer,"compressbuffer");
}
}
else {
Expand All @@ -947,11 +956,13 @@ int MakeThreadBuckets(int number, int par)
for ( i = 0; i < numthreadbuckets; i++ ) {
thr = threadbuckets[i];
thr->threadbuffersize = sizethreadbuckets;
// This buffer does not need to be so large, start it at MaxTer.
// We'll double it if necessary.
thr->compressbuffersize = AM.MaxTer/sizeof(WORD);
thr->free = BUCKETFREE;
thr->deferbuffer = (POSITION *)Malloc1(2*sizethreadbuckets*sizeof(WORD)
+(AC.ThreadBucketSize+1)*sizeof(POSITION),"deferbuffer");
thr->threadbuffer = (WORD *)(thr->deferbuffer+AC.ThreadBucketSize+1);
thr->compressbuffer = (WORD *)(thr->threadbuffer+sizethreadbuckets);
thr->deferbuffer = (POSITION *)Malloc1((AC.ThreadBucketSize+1)*sizeof(POSITION),"deferbuffer");
thr->threadbuffer = (WORD *)Malloc1(sizethreadbuckets*sizeof(WORD),"threadbuffer");
thr->compressbuffer = (WORD *)Malloc1(thr->compressbuffersize*sizeof(WORD),"compressbuffer");
thr->busy = BUCKETPREPARINGTERM;
thr->usenum = thr->totnum = 0;
thr->type = BUCKETDOINGTERMS;
Expand Down Expand Up @@ -2825,6 +2836,17 @@ Found2:;
defcount = 0;
thr->deferbuffer[defcount++] = AR0.DefPosition;
ttco = thr->compressbuffer; t1 = AR0.CompressBuffer; j = *t1;
while ( thr->compressbuffersize <= j ) {
// the compressbuffer is not large enough!
WORD *top = thr->compressbuffer+thr->compressbuffersize;
DoubleBuffer((void**)&(thr->compressbuffer),(void**)&(top),
sizeof(*(thr->compressbuffer)), "double compressbuffer");
ttco = thr->compressbuffer;
thr->compressbuffersize *= 2;
MLOCK(ErrorMessageLock);
MesPrint("double compressbuffer 1");
MUNLOCK(ErrorMessageLock);
}
NCOPY(ttco,t1,j);
}
else if ( first && ( AC.CollectFun == 0 ) ) { /* Brackets ? */
Expand Down Expand Up @@ -2889,6 +2911,18 @@ Found2:;
if ( AR0.DeferFlag ) {
thr->deferbuffer[defcount++] = AR0.DefPosition;
t1 = AR0.CompressBuffer; j = *t1;
while ( thr->compressbuffer+thr->compressbuffersize-ttco <= j ) {
// the compressbuffer is not large enough!
const ptrdiff_t oldoffset = ttco - thr->compressbuffer;
WORD *top = thr->compressbuffer+thr->compressbuffersize;
DoubleBuffer((void**)&(thr->compressbuffer),(void**)&(top),
sizeof(*(thr->compressbuffer)), "double compressbuffer");
ttco = thr->compressbuffer + oldoffset;
thr->compressbuffersize *= 2;
MLOCK(ErrorMessageLock);
MesPrint("double compressbuffer 2");
MUNLOCK(ErrorMessageLock);
}
NCOPY(ttco,t1,j);
}
if ( AC.CollectFun && *tt < (AM.MaxTer/((LONG)sizeof(WORD))-10) ) {
Expand Down
Loading