Skip to content

Commit 69f0fd6

Browse files
committed
chore: reinstate goroutine path for macwin
1 parent fff35ab commit 69f0fd6

10 files changed

Lines changed: 549 additions & 101 deletions

pkg/bmt/bmt.go

Lines changed: 0 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -18,43 +18,6 @@ var (
1818
zerosection = make([]byte, 64)
1919
)
2020

21-
// Hasher is a reusable hasher for fixed maximum size chunks representing a BMT
22-
// It reuses a pool of trees for amortised memory allocation and resource control,
23-
// and supports order-agnostic concurrent segment writes and section (double segment) writes
24-
// as well as sequential read and write.
25-
//
26-
// The same hasher instance must not be called concurrently on more than one chunk.
27-
//
28-
// The same hasher instance is synchronously reusable.
29-
//
30-
// Sum gives back the tree to the pool and guaranteed to leave
31-
// the tree and itself in a state reusable for hashing a new chunk.
32-
type Hasher struct {
33-
*Conf // configuration
34-
bmt *tree // prebuilt BMT resource for flowcontrol and proofs
35-
size int // bytes written to Hasher since last Reset()
36-
span []byte // The span of the data subsumed under the chunk
37-
}
38-
39-
// NewHasher gives back an instance of a Hasher struct
40-
func NewHasher() *Hasher {
41-
return newHasherWithConf(NewConf(swarm.BmtBranches, 32))
42-
}
43-
44-
// NewPrefixHasher gives back an instance of a Hasher struct with the given prefix
45-
// prepended to every hash operation.
46-
func NewPrefixHasher(prefix []byte) *Hasher {
47-
return newHasherWithConf(NewConfWithPrefix(prefix, swarm.BmtBranches, 32))
48-
}
49-
50-
func newHasherWithConf(conf *Conf) *Hasher {
51-
return &Hasher{
52-
Conf: conf,
53-
span: make([]byte, SpanSize),
54-
bmt: newTree(conf.maxSize, conf.depth, conf.baseHasher, conf.prefix),
55-
}
56-
}
57-
5821
// Capacity returns the maximum amount of bytes that will be processed by this hasher implementation.
5922
// since BMT assumes a balanced binary tree, capacity is always a power of 2
6023
func (h *Hasher) Capacity() int {
@@ -94,58 +57,12 @@ func (h *Hasher) BlockSize() int {
9457
return 2 * h.segmentSize
9558
}
9659

97-
// Hash returns the BMT root hash of the buffer and an error
98-
// using Hash presupposes sequential synchronous writes (io.Writer interface).
99-
func (h *Hasher) Hash(b []byte) ([]byte, error) {
100-
if h.size == 0 {
101-
return doHash(h.baseHasher(), h.span, h.zerohashes[h.depth])
102-
}
103-
// zero-fill remainder so all sections have deterministic input
104-
for i := h.size; i < h.maxSize; i++ {
105-
h.bmt.buffer[i] = 0
106-
}
107-
if len(h.bmt.levels) == 1 {
108-
// single-level tree: hash the only section directly
109-
secsize := 2 * h.segmentSize
110-
root := h.bmt.levels[0][0]
111-
rootHash, err := doHash(root.hasher, h.bmt.buffer[:secsize])
112-
if err != nil {
113-
return nil, err
114-
}
115-
return doHash(h.baseHasher(), h.span, rootHash)
116-
}
117-
rootHash, err := h.hashSIMD()
118-
if err != nil {
119-
return nil, err
120-
}
121-
return doHash(h.baseHasher(), h.span, rootHash)
122-
}
123-
12460
// Sum returns the BMT root hash of the buffer, unsafe version of Hash
12561
func (h *Hasher) Sum(b []byte) []byte {
12662
s, _ := h.Hash(b)
12763
return s
12864
}
12965

130-
// Write calls sequentially add to the buffer to be hashed.
131-
// All hashing is deferred to Hash().
132-
func (h *Hasher) Write(b []byte) (int, error) {
133-
l := len(b)
134-
maxVal := h.maxSize - h.size
135-
if l > maxVal {
136-
l = maxVal
137-
}
138-
copy(h.bmt.buffer[h.size:], b)
139-
h.size += l
140-
return l, nil
141-
}
142-
143-
// Reset prepares the Hasher for reuse
144-
func (h *Hasher) Reset() {
145-
h.size = 0
146-
copy(h.span, zerospan)
147-
}
148-
14966
// calculates the Keccak256 SHA3 hash of the data
15067
func sha3hash(data ...[]byte) ([]byte, error) {
15168
return doHash(swarm.NewHasher(), data...)

pkg/bmt/bmt_simd.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// Use of this source code is governed by a BSD-style
33
// license that can be found in the LICENSE file.
44

5+
//go:build linux && amd64 && !purego
6+
57
package bmt
68

79
import (

pkg/bmt/export_goroutine_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
// Copyright 2021 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build !linux || !amd64 || purego
6+
7+
package bmt
8+
9+
// NewConfNoSIMD on the goroutine path just returns a regular Conf
10+
// since there is no SIMD to disable.
11+
func NewConfNoSIMD(segmentCount, capacity int) *Conf {
12+
return NewConf(segmentCount, capacity)
13+
}

pkg/bmt/export_simd_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright 2021 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build linux && amd64 && !purego
6+
7+
package bmt
8+
9+
// NewConfNoSIMD creates a Conf identical to NewConf but with SIMD disabled,
10+
// useful for benchmarking the non-SIMD path.
11+
func NewConfNoSIMD(segmentCount, capacity int) *Conf {
12+
c := NewConf(segmentCount, capacity)
13+
c.useSIMD = false
14+
c.batchWidth = 8 // use 8-wide batching with scalar fallback
15+
return c
16+
}

pkg/bmt/export_test.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,3 @@
44
package bmt
55

66
var Sha3hash = sha3hash
7-
8-
// NewConfNoSIMD creates a Conf identical to NewConf but with SIMD disabled,
9-
// useful for benchmarking the non-SIMD path.
10-
func NewConfNoSIMD(segmentCount, capacity int) *Conf {
11-
c := NewConf(segmentCount, capacity)
12-
c.useSIMD = false
13-
c.batchWidth = 8 // use 8-wide batching with scalar fallback
14-
return c
15-
}

pkg/bmt/hasher_goroutine.go

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
// Copyright 2021 The Swarm Authors. All rights reserved.
2+
// Use of this source code is governed by a BSD-style
3+
// license that can be found in the LICENSE file.
4+
5+
//go:build !linux || !amd64 || purego
6+
7+
package bmt
8+
9+
import (
10+
"github.com/ethersphere/bee/v2/pkg/swarm"
11+
)
12+
13+
// Hasher is a reusable hasher for fixed maximum size chunks representing a BMT.
// This implementation uses goroutine-based concurrent tree traversal.
//
// It reuses a pool of trees for amortised memory allocation and resource control,
// and supports order-agnostic concurrent segment writes and section (double segment) writes
// as well as sequential read and write.
//
// The same hasher instance must not be called concurrently on more than one chunk.
//
// The same hasher instance is synchronously reusable.
//
// Sum gives back the tree to the pool and is guaranteed to leave
// the tree and itself in a state reusable for hashing a new chunk.
type Hasher struct {
	*Conf              // configuration (embedded; supplies segmentSize, maxSize, depth, zerohashes, ...)
	bmt    *tree       // prebuilt BMT resource for flowcontrol and proofs
	size   int         // bytes written to Hasher since last Reset()
	pos    int         // index of rightmost currently open segment
	result chan []byte // result channel: root hash delivered by the winning goroutine
	errc   chan error  // error channel (buffered, capacity 1; see newHasherWithConf)
	span   []byte      // The span of the data subsumed under the chunk
}
35+
36+
// NewHasher gives back an instance of a Hasher struct
37+
func NewHasher() *Hasher {
38+
return newHasherWithConf(NewConf(swarm.BmtBranches, 32))
39+
}
40+
41+
// NewPrefixHasher gives back an instance of a Hasher struct with the given prefix
42+
// prepended to every hash operation.
43+
func NewPrefixHasher(prefix []byte) *Hasher {
44+
return newHasherWithConf(NewConfWithPrefix(prefix, swarm.BmtBranches, 32))
45+
}
46+
47+
func newHasherWithConf(conf *Conf) *Hasher {
48+
return &Hasher{
49+
Conf: conf,
50+
result: make(chan []byte),
51+
errc: make(chan error, 1),
52+
span: make([]byte, SpanSize),
53+
bmt: newTree(conf.maxSize, conf.depth, conf.hasherFunc),
54+
}
55+
}
56+
57+
// Write calls sequentially add to the buffer to be hashed,
// with every full section calling processSection in a goroutine.
//
// The rightmost (possibly partial) section is intentionally held back:
// it is hashed later by Hash with the final flag set, so only fully
// completed sections are dispatched here.
func (h *Hasher) Write(b []byte) (int, error) {
	l := len(b)
	// clamp to the remaining buffer capacity; excess input bytes are dropped
	maxVal := h.maxSize - h.size
	if l > maxVal {
		l = maxVal
	}
	copy(h.bmt.buffer[h.size:], b)
	secsize := 2 * h.segmentSize
	// index of the first section this write may complete
	from := h.size / secsize
	h.size += l
	// one past the last fully written section
	to := h.size / secsize
	if l == maxVal {
		// buffer is now full: keep the last section for the final pass in Hash
		to--
	}
	h.pos = to
	// hash every newly completed section concurrently; section h.pos stays open
	for i := from; i < to; i++ {
		go h.processSection(i, false)
	}
	return l, nil
}
79+
80+
// Hash returns the BMT root hash of the buffer and an error
// using Hash presupposes sequential synchronous writes (io.Writer interface).
func (h *Hasher) Hash(b []byte) ([]byte, error) {
	if h.size == 0 {
		// nothing written: the root is the precomputed all-zero-tree hash
		return doHash(h.baseHasher(), h.span, h.zerohashes[h.depth])
	}
	// zero-pad the remainder of the open section (zerosection is 64 bytes,
	// presumably one full section — enough to cover what section h.pos reads)
	copy(h.bmt.buffer[h.size:], zerosection)
	// write the last section with final flag set to true
	go h.processSection(h.pos, true)
	select {
	case result := <-h.result:
		// prepend the span and hash once more to obtain the chunk hash
		return doHash(h.baseHasher(), h.span, result)
	case err := <-h.errc:
		return nil, err
	}
}
96+
97+
// Reset prepares the Hasher for reuse
98+
func (h *Hasher) Reset() {
99+
h.pos = 0
100+
h.size = 0
101+
copy(h.span, zerospan)
102+
}
103+
104+
// processSection writes the hash of i-th section into level 1 node of the BMT tree.
// It runs in its own goroutine, one per completed section; final marks the
// rightmost section, whose path to the root must fill in zero-subtree hashes.
func (h *Hasher) processSection(i int, final bool) {
	secsize := 2 * h.segmentSize
	offset := i * secsize
	level := 1
	// select the leaf node for the section
	n := h.bmt.leaves[i]
	isLeft := n.isLeft
	hasher := n.hasher
	n = n.parent
	// hash the section
	section, err := doHash(hasher, h.bmt.buffer[offset:offset+secsize])
	if err != nil {
		// errc is buffered (capacity 1); if an error is already pending, drop this one
		select {
		case h.errc <- err:
		default:
		}
		return
	}
	// write hash into parent node
	if final {
		// for the last segment use writeFinalNode
		h.writeFinalNode(level, n, isLeft, section)
	} else {
		h.writeNode(n, isLeft, section)
	}
}
131+
132+
// writeNode pushes the data to the node.
// if it is the first of 2 sisters written, the routine terminates.
// if it is the second, it calculates the hash and writes it
// to the parent node recursively.
// since hashing the parent is synchronous the same hasher can be used.
func (h *Hasher) writeNode(n *node, isLeft bool, s []byte) {
	var err error
	for {
		// at the root of the bmt just write the result to the result channel
		if n == nil {
			h.result <- s
			return
		}
		// otherwise assign child hash to left or right segment
		if isLeft {
			n.left = s
		} else {
			n.right = s
		}
		// the child-thread first arriving will terminate
		if n.toggle() {
			return
		}
		// the thread coming second now can be sure both left and right children are written
		// so it calculates the hash of left|right and pushes it to the parent
		s, err = doHash(n.hasher, n.left, n.right)
		if err != nil {
			// non-blocking send: a pending error already explains the failure
			select {
			case h.errc <- err:
			default:
			}
			return
		}
		// climb one level and repeat from the parent's perspective
		isLeft = n.isLeft
		n = n.parent
	}
}
169+
170+
// writeFinalNode is following the path starting from the final datasegment to the
// BMT root via parents.
// For unbalanced trees it fills in the missing right sister nodes using
// the pool's lookup table for BMT subtree root hashes for all-zero sections.
// Otherwise behaves like `writeNode`.
//
// s == nil signals "the other sibling's goroutine will (or already did) the
// hashing at this node"; the final path then only maintains toggles and
// zero-subtree fills on its way up.
func (h *Hasher) writeFinalNode(level int, n *node, isLeft bool, s []byte) {
	var err error
	for {
		// at the root of the bmt just write the result to the result channel
		if n == nil {
			if s != nil {
				h.result <- s
			}
			return
		}
		var noHash bool
		if isLeft {
			// coming from the left sister branch:
			// when the final section's path goes via the left child,
			// fill the right sister with the all-zero subtree hash for this level.
			n.right = h.zerohashes[level]
			if s != nil {
				n.left = s
				// if a left final node carries a hash, it must be the first (and only) thread,
				// so the toggle is already in passive state — no need to call it;
				// yet this thread must carry on pushing the hash to the parent
				noHash = false
			} else {
				// if again first thread then propagate nil and calculate no hash
				noHash = n.toggle()
			}
		} else {
			// right sister branch
			if s != nil {
				// if hash was pushed from the right child node, write right segment and change state
				n.right = s
				// if toggle is true, we arrived first so no hashing; just push nil to parent
				noHash = n.toggle()
			} else {
				// if s is nil, then this thread arrived first at the previous node and here there will be two,
				// so no need to do anything and keep s = nil for the parent
				noHash = true
			}
		}
		// the child-thread first arriving will just continue resetting s to nil
		// the second thread now can be sure both left and right children are written
		// it calculates the hash of left|right and pushes it to the parent
		if noHash {
			s = nil
		} else {
			s, err = doHash(n.hasher, n.left, n.right)
			if err != nil {
				// non-blocking send into the buffered error channel
				select {
				case h.errc <- err:
				default:
				}
				return
			}
		}
		// iterate to parent
		isLeft = n.isLeft
		n = n.parent
		level++
	}
}

0 commit comments

Comments
 (0)