diff --git a/sources/ftypes.h b/sources/ftypes.h index c2273c13..73d10a3d 100644 --- a/sources/ftypes.h +++ b/sources/ftypes.h @@ -971,9 +971,6 @@ typedef int (*TFUN1)(UBYTE *,int); #define BUCKETTERMINATED 3 #define BUCKETRELEASED 4 -#define NUMBEROFBLOCKSINSORT 10 -#define MINIMUMNUMBEROFTERMS 10 - #define BUCKETDOINGTERM 1 #define BUCKETASSIGNED -1 #define BUCKETTOBERELEASED -2 @@ -981,6 +978,23 @@ typedef int (*TFUN1)(UBYTE *,int); #define BUCKETDOINGTERMS 0 #define BUCKETDOINGBRACKET 1 + +/* + Sortblock config +*/ +// The number of blocks in the ring-buffer for data transfer between +// workers or sortbots and the master thread or other sortbots. With +// sortbots, each thread has half of this number. +// The merging algorithm requires at least 4 blocks, so this must be +// at least 8, with sortbots. +#define NUMBEROFBLOCKSINSORT 8 +// The minimum number of terms which must fit in a block, i.e. each +// block must be at least this number times MaxTermSize in size. +#define MINIMUMNUMBEROFTERMS 1 +// The minimum number of terms which will be written in a block, +// before potentially yielding the block to a waiting reading thread. 
+#define MINWRITENUMBEROFTERMS 1 + #endif /* diff --git a/sources/structs.h b/sources/structs.h index 4c2fe359..82e2cbbd 100644 --- a/sources/structs.h +++ b/sources/structs.h @@ -1184,6 +1184,7 @@ typedef struct SoRtBlOcK { WORD **MasterStart; WORD **MasterFill; WORD **MasterStop; + LONG *BlockTerms; int MasterNumBlocks; int MasterBlock; int FillBlock; diff --git a/sources/threads.c b/sources/threads.c index eabd35eb..b0d0a3d7 100644 --- a/sources/threads.c +++ b/sources/threads.c @@ -3537,6 +3537,7 @@ intercepted:; int PutToMaster(PHEAD WORD *term) { int i,j,nexti,ret = 0; + int urgent = 0; WORD *t, *fill, *top, zero = 0; if ( term == 0 ) { /* Mark the end of the expression */ t = &zero; j = 1; @@ -3548,29 +3549,48 @@ int PutToMaster(PHEAD WORD *term) i = AT.SB.FillBlock; /* The block we are working at */ fill = AT.SB.MasterFill[i]; /* Where we are filling */ top = AT.SB.MasterStop[i]; /* End of the block */ - while ( j > 0 ) { - LONG copy = MiN(top - fill, j); - j -= copy; - NCOPY(fill, t, copy); - if ( j > 0 ) { -/* - We reached the end of the block. - Get the next block and release this block. - The order is important. This is why there should be at least - 4 blocks or deadlocks can occur. -*/ - nexti = i+1; - if ( nexti > AT.SB.MasterNumBlocks ) { - nexti = 1; - } - LOCK(AT.SB.MasterBlockLock[nexti]); - UNLOCK(AT.SB.MasterBlockLock[i]); - AT.SB.MasterFill[i] = AT.SB.MasterStart[i]; - AT.SB.FillBlock = i = nexti; - fill = AT.SB.MasterStart[i]; - top = AT.SB.MasterStop[i]; + + // If there is space in the block, and we have already written MINWRITENUMBEROFTERMS, + // determine if the reading thread is waiting for us by trying to lock the previous + // block. If we manage to lock it, then we still have time to continue filling this + // block. If we can't lock it, the reading thread is waiting for us and we should + // move to the next block ASAP. + if ( j < top - fill && AT.SB.BlockTerms[i] > MINWRITENUMBEROFTERMS ) { + const int prev = ( i == 1 ? 
AT.SB.MasterNumBlocks : i-1 ); + if ( ! pthread_mutex_trylock(&(AT.SB.MasterBlockLock[prev])) ) { + UNLOCK(AT.SB.MasterBlockLock[prev]); + } + else { + urgent = 1; + } + } + + // If the term doesn't fit in the current block, or a thread is waiting for us + // (and we've already written at least MINWRITENUMBEROFTERMS), move to the next: + if ( ( j >= top - fill ) || urgent ) { + nexti = i+1; + if ( nexti > AT.SB.MasterNumBlocks ) { + nexti = 1; + } + LOCK(AT.SB.MasterBlockLock[nexti]); + UNLOCK(AT.SB.MasterBlockLock[i]); + AT.SB.MasterFill[i] = AT.SB.MasterStart[i]; + AT.SB.FillBlock = i = nexti; + fill = AT.SB.MasterStart[i]; + top = AT.SB.MasterStop[i]; + if ( AT.SB.BlockTerms[i] != 0 ) { + // In this case, there has been an accounting error in a previous use + // of this block. Blocks that have been read from and unlocked, should + // have BlockTerms == 0. + MLOCK(ErrorMessageLock); + MesPrint("Error in PutToMaster, starting a block with BlockTerms != 0"); + MUNLOCK(ErrorMessageLock); + Terminate(-1); } } + + NCOPY(fill, t, j); + AT.SB.BlockTerms[i]++; AT.SB.MasterFill[i] = fill; return(ret); } @@ -3773,7 +3793,15 @@ int MasterMerge(void) k = S->used[level]; i = k + lpat - 1; if ( !*(poin[k]) ) { - do { if ( !( i >>= 1 ) ) goto EndOfMerge; } while ( !S->tree[i] ); + // Stream k has hit the end-of-stream "0". We still need to decrement + // BlockTerms, which includes the marker in the count. + ki = S->ktoi[k]; + AB[ki+1]->T.SB.BlockTerms[AB[ki+1]->T.SB.MasterBlock]--; + do { + if ( !( i >>= 1 ) ) { + goto EndOfMerge; + } + } while ( !S->tree[i] ); if ( S->tree[i] == -1 ) { S->tree[i] = 0; level--; @@ -3851,6 +3879,12 @@ int MasterMerge(void) poin[S->tree[i]] = m1; } else { + // Here we are writing the new merged term *before* the original start of term1. + // We can always do this, since before term1 there is previous term data of this + // block, or the previous block, for which we are holding a lock. 
This requires + // the existence of "block 0", if term1 is the first term of block 1! + // It also requires the blocks to be contiguous in memory; we can't allocate + // separate memory regions for each block without larger-scale changes. r2 = r1 - m1[1]; m2 = tt1 - r2; r1 = S->PolyWise; @@ -3914,9 +3948,8 @@ int MasterMerge(void) im = *poin2[ul]; poin[ul] = poin2[ul]; ki = S->ktoi[ul]; - if ( (poin[ul] + im + COMPINC) >= - AB[ki+1]->T.SB.MasterStop[AB[ki+1]->T.SB.MasterBlock] - && im > 0 ) { + AB[ki+1]->T.SB.BlockTerms[AB[ki+1]->T.SB.MasterBlock]--; + if ( AB[ki+1]->T.SB.BlockTerms[AB[ki+1]->T.SB.MasterBlock] == 0 ) { /* We made it to the end of the block. We have to release the previous block and claim the next. @@ -3930,19 +3963,13 @@ int MasterMerge(void) UNLOCK(AT.SB.MasterBlockLock[i-1]); } if ( i == AT.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - WORD *from, *to; - to = AT.SB.MasterStart[1]; - from = AT.SB.MasterStop[i]; - while ( from > poin[ul] ) *--to = *--from; - poin[ul] = to; i = 1; } else { i++; } LOCK(AT.SB.MasterBlockLock[i]); AT.SB.MasterBlock = i; + poin[ul] = AT.SB.MasterStart[i]; + im = *poin[ul]; poin2[ul] = poin[ul] + im; } else { @@ -3994,9 +4021,8 @@ int MasterMerge(void) im = poin2[k][0]; poin[k] = poin2[k]; ki = S->ktoi[k]; - if ( (poin[k] + im + COMPINC) >= - AB[ki+1]->T.SB.MasterStop[AB[ki+1]->T.SB.MasterBlock] - && im > 0 ) { + AB[ki+1]->T.SB.BlockTerms[AB[ki+1]->T.SB.MasterBlock]--; + if ( AB[ki+1]->T.SB.BlockTerms[AB[ki+1]->T.SB.MasterBlock] == 0 ) { /* We made it to the end of the block. We have to release the previous block and claim the next. 
@@ -4010,19 +4036,13 @@ int MasterMerge(void) UNLOCK(AT.SB.MasterBlockLock[i-1]); } if ( i == AT.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - WORD *from, *to; - to = AT.SB.MasterStart[1]; - from = AT.SB.MasterStop[i]; - while ( from > poin[k] ) *--to = *--from; - poin[k] = to; i = 1; } else { i++; } LOCK(AT.SB.MasterBlockLock[i]); AT.SB.MasterBlock = i; + poin[k] = AT.SB.MasterStart[i]; + im = *poin[k]; poin2[k] = poin[k] + im; } else { @@ -4238,7 +4258,7 @@ int SortBotMerge(PHEAD0) { GETBIDENTITY ALLPRIVATES *Bin1 = AB[AT.SortBotIn1],*Bin2 = AB[AT.SortBotIn2]; - WORD *term1, *term2, *next, *wp; + WORD *term1, *term2, *wp; int blin1, blin2; /* Current block numbers */ int error = 0; WORD l1, l2, *m1, *m2, *w, r1, r2, r3, r33, r31, *tt1, ii; @@ -4279,6 +4299,7 @@ int SortBotMerge(PHEAD0) /* #[ One is smallest : */ + Bin1->T.SB.BlockTerms[blin1]--; if ( SortBotOut(BHEAD term1) < 0 ) { MLOCK(ErrorMessageLock); MesPrint("Called from SortBotMerge with thread = %d",AT.identity); @@ -4286,10 +4307,8 @@ int SortBotMerge(PHEAD0) error = -1; goto ReturnError; } - im = *term1; - next = term1 + im; - if ( next >= Bin1->T.SB.MasterStop[blin1] || ( *next && - next+*next+COMPINC > Bin1->T.SB.MasterStop[blin1] ) ) { + term1 += *term1; + if ( Bin1->T.SB.BlockTerms[blin1] == 0 ) { if ( blin1 == 1 ) { UNLOCK(Bin1->T.SB.MasterBlockLock[Bin1->T.SB.MasterNumBlocks]); } @@ -4297,13 +4316,6 @@ int SortBotMerge(PHEAD0) UNLOCK(Bin1->T.SB.MasterBlockLock[blin1-1]); } if ( blin1 == Bin1->T.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - to = Bin1->T.SB.MasterStart[1]; - from = Bin1->T.SB.MasterStop[Bin1->T.SB.MasterNumBlocks]; - while ( from > next ) *--to = *--from; - next = to; blin1 = 1; } else { @@ -4311,8 +4323,8 @@ int SortBotMerge(PHEAD0) } LOCK(Bin1->T.SB.MasterBlockLock[blin1]); Bin1->T.SB.MasterBlock = blin1; + term1 = Bin1->T.SB.MasterStart[blin1]; } - term1 = next; /* #] One is smallest : */ @@ -4321,6 +4333,7 @@ int 
SortBotMerge(PHEAD0) /* #[ Two is smallest : */ + Bin2->T.SB.BlockTerms[blin2]--; if ( SortBotOut(BHEAD term2) < 0 ) { MLOCK(ErrorMessageLock); MesPrint("Called from SortBotMerge with thread = %d",AT.identity); @@ -4328,10 +4341,9 @@ int SortBotMerge(PHEAD0) error = -1; goto ReturnError; } -next2: im = *term2; - next = term2 + im; - if ( next >= Bin2->T.SB.MasterStop[blin2] || ( *next - && next+*next+COMPINC > Bin2->T.SB.MasterStop[blin2] ) ) { +next2: + term2 += *term2; + if ( Bin2->T.SB.BlockTerms[blin2] == 0 ) { if ( blin2 == 1 ) { UNLOCK(Bin2->T.SB.MasterBlockLock[Bin2->T.SB.MasterNumBlocks]); } @@ -4339,13 +4351,6 @@ next2: im = *term2; UNLOCK(Bin2->T.SB.MasterBlockLock[blin2-1]); } if ( blin2 == Bin2->T.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - to = Bin2->T.SB.MasterStart[1]; - from = Bin2->T.SB.MasterStop[Bin2->T.SB.MasterNumBlocks]; - while ( from > next ) *--to = *--from; - next = to; blin2 = 1; } else { @@ -4353,8 +4358,8 @@ next2: im = *term2; } LOCK(Bin2->T.SB.MasterBlockLock[blin2]); Bin2->T.SB.MasterBlock = blin2; + term2 = Bin2->T.SB.MasterStart[blin2]; } - term2 = next; /* #] Two is smallest : */ @@ -4363,6 +4368,8 @@ next2: im = *term2; /* #[ Equal : */ + Bin1->T.SB.BlockTerms[blin1]--; + Bin2->T.SB.BlockTerms[blin2]--; l1 = *( m1 = term1 ); l2 = *( m2 = term2 ); if ( S->PolyWise ) { /* Here we work with PolyFun */ @@ -4417,6 +4424,12 @@ next2: im = *term2; term1 = m1; } else { + // Here we are writing the new merged term *before* the original start of term1. + // We can always do this, since before term1 there is previous term data of this + // block, or the previous block, for which we are holding a lock. This requires + // the existence of "block 0", if term1 is the first term of block 1! + // It also requires the blocks to be contiguous in memory; we can't allocate + // separate memory regions for each block without larger-scale changes. 
r2 = r1 - m1[1]; m2 = tt1 - r2; r1 = S->PolyWise; @@ -4580,10 +4593,9 @@ next2: im = *term2; goto ReturnError; } cancelled:; /* Now we need two new terms */ - im = *term1; - next = term1 + im; - if ( next >= Bin1->T.SB.MasterStop[blin1] || ( *next && - next+*next+COMPINC > Bin1->T.SB.MasterStop[blin1] ) ) { + term1 += *term1; + if ( Bin1->T.SB.BlockTerms[blin1] == 0 ) { + if ( blin1 == 1 ) { UNLOCK(Bin1->T.SB.MasterBlockLock[Bin1->T.SB.MasterNumBlocks]); } @@ -4591,13 +4603,6 @@ cancelled:; /* Now we need two new terms */ UNLOCK(Bin1->T.SB.MasterBlockLock[blin1-1]); } if ( blin1 == Bin1->T.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - to = Bin1->T.SB.MasterStart[1]; - from = Bin1->T.SB.MasterStop[Bin1->T.SB.MasterNumBlocks]; - while ( from > next ) *--to = *--from; - next = to; blin1 = 1; } else { @@ -4605,8 +4610,8 @@ cancelled:; /* Now we need two new terms */ } LOCK(Bin1->T.SB.MasterBlockLock[blin1]); Bin1->T.SB.MasterBlock = blin1; + term1 = Bin1->T.SB.MasterStart[blin1]; } - term1 = next; goto next2; /* #] Equal : @@ -4621,6 +4626,7 @@ cancelled:; /* Now we need two new terms */ #[ Tail in one : */ while ( *term1 ) { + Bin1->T.SB.BlockTerms[blin1]--; if ( SortBotOut(BHEAD term1) < 0 ) { MLOCK(ErrorMessageLock); MesPrint("Called from SortBotMerge with thread = %d",AT.identity); @@ -4628,10 +4634,8 @@ cancelled:; /* Now we need two new terms */ error = -1; goto ReturnError; } - im = *term1; - next = term1 + im; - if ( next >= Bin1->T.SB.MasterStop[blin1] || ( *next && - next+*next+COMPINC > Bin1->T.SB.MasterStop[blin1] ) ) { + if ( Bin1->T.SB.BlockTerms[blin1] == 0 ) { + if ( blin1 == 1 ) { UNLOCK(Bin1->T.SB.MasterBlockLock[Bin1->T.SB.MasterNumBlocks]); } @@ -4639,13 +4643,6 @@ cancelled:; /* Now we need two new terms */ UNLOCK(Bin1->T.SB.MasterBlockLock[blin1-1]); } if ( blin1 == Bin1->T.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - to = Bin1->T.SB.MasterStart[1]; - from = 
Bin1->T.SB.MasterStop[Bin1->T.SB.MasterNumBlocks]; - while ( from > next ) *--to = *--from; - next = to; blin1 = 1; } else { @@ -4653,8 +4650,11 @@ cancelled:; /* Now we need two new terms */ } LOCK(Bin1->T.SB.MasterBlockLock[blin1]); Bin1->T.SB.MasterBlock = blin1; + term1 = Bin1->T.SB.MasterStart[blin1]; + } + else { + term1 += *term1; } - term1 = next; } /* #] Tail in one : @@ -4665,6 +4665,7 @@ cancelled:; /* Now we need two new terms */ #[ Tail in two : */ while ( *term2 ) { + Bin2->T.SB.BlockTerms[blin2]--; if ( SortBotOut(BHEAD term2) < 0 ) { MLOCK(ErrorMessageLock); MesPrint("Called from SortBotMerge with thread = %d",AT.identity); @@ -4672,10 +4673,8 @@ cancelled:; /* Now we need two new terms */ error = -1; goto ReturnError; } - im = *term2; - next = term2 + im; - if ( next >= Bin2->T.SB.MasterStop[blin2] || ( *next - && next+*next+COMPINC > Bin2->T.SB.MasterStop[blin2] ) ) { + if ( Bin2->T.SB.BlockTerms[blin2] == 0 ) { + if ( blin2 == 1 ) { UNLOCK(Bin2->T.SB.MasterBlockLock[Bin2->T.SB.MasterNumBlocks]); } @@ -4683,13 +4682,6 @@ cancelled:; /* Now we need two new terms */ UNLOCK(Bin2->T.SB.MasterBlockLock[blin2-1]); } if ( blin2 == Bin2->T.SB.MasterNumBlocks ) { -/* - Move the remainder down into block 0 -*/ - to = Bin2->T.SB.MasterStart[1]; - from = Bin2->T.SB.MasterStop[Bin2->T.SB.MasterNumBlocks]; - while ( from > next ) *--to = *--from; - next = to; blin2 = 1; } else { @@ -4697,17 +4689,27 @@ cancelled:; /* Now we need two new terms */ } LOCK(Bin2->T.SB.MasterBlockLock[blin2]); Bin2->T.SB.MasterBlock = blin2; + term2 = Bin2->T.SB.MasterStart[blin2]; + } + else { + term2 += *term2; } - term2 = next; } /* #] Tail in two : */ } + + // Both streams have hit the end-of-stream marker "0". We still need to + // decrement the BlockTerms counters a final time, the marker is included + // in the count. + Bin1->T.SB.BlockTerms[blin1]--; + Bin2->T.SB.BlockTerms[blin2]--; + SortBotOut(BHEAD 0); ReturnError:; /* - Release all locks + Release all locks. 
*/ UNLOCK(Bin1->T.SB.MasterBlockLock[blin1]); if ( blin1 > 1 ) { @@ -4799,14 +4801,17 @@ int IniSortBlocks(int numworkers) AT.SB.MasterFill = AT.SB.MasterStart + (numberofblocks+1); AT.SB.MasterStop = AT.SB.MasterFill + (numberofblocks+1); AT.SB.MasterNumBlocks = numberofblocks; + AT.SB.BlockTerms = (LONG*)Malloc1(sizeof(LONG)*(numberofblocks+1),"BlockTerms"); AT.SB.MasterBlock = 0; AT.SB.FillBlock = 0; AT.SB.MasterFill[0] = AT.SB.MasterStart[0] = w; + AT.SB.BlockTerms[0] = 0; w += maxter; AT.SB.MasterStop[0] = w; AT.SB.MasterBlockLock[0] = dummylock; for ( j = 1; j <= numberofblocks; j++ ) { AT.SB.MasterFill[j] = AT.SB.MasterStart[j] = w; + AT.SB.BlockTerms[j] = 0; w += blocksize; AT.SB.MasterStop[j] = w; AT.SB.MasterBlockLock[j] = dummylock;