From 8a5340aee4044433e4f9a62522783d4408a8c440 Mon Sep 17 00:00:00 2001 From: Brannon King Date: Fri, 13 Aug 2021 17:24:34 -0400 Subject: [PATCH] [lbry] refactored parallel hash computation location, other hash code formatting --- claimtrie/claimtrie.go | 104 +++++++++++++++++++++++----- claimtrie/claimtrie_test.go | 4 +- claimtrie/cmd/cmd/merkletrie.go | 4 +- claimtrie/merkletrie/merkletrie.go | 16 ++--- claimtrie/merkletrie/ramtrie.go | 37 ++++------ claimtrie/node/hashfork_manager.go | 39 +++++++++++ claimtrie/node/hashfunc.go | 30 ++++++++- claimtrie/node/manager.go | 105 +---------------------------- 8 files changed, 178 insertions(+), 161 deletions(-) create mode 100644 claimtrie/node/hashfork_manager.go diff --git a/claimtrie/claimtrie.go b/claimtrie/claimtrie.go index 0d3847bbcc8..fe08644df50 100644 --- a/claimtrie/claimtrie.go +++ b/claimtrie/claimtrie.go @@ -2,8 +2,11 @@ package claimtrie import ( "bytes" + "fmt" "path/filepath" + "runtime" "sort" + "sync" "github.com/pkg/errors" @@ -80,12 +83,13 @@ func New(cfg config.Config) (*ClaimTrie, error) { if err != nil { return nil, errors.Wrap(err, "creating node base manager") } - nodeManager := node.NewNormalizingManager(baseManager) + normalizingManager := node.NewNormalizingManager(baseManager) + nodeManager := &node.HashV2Manager{Manager: normalizingManager} cleanups = append(cleanups, nodeManager.Close) var trie merkletrie.MerkleTrie if cfg.RamTrie { - trie = merkletrie.NewRamTrie(nodeManager) + trie = merkletrie.NewRamTrie() } else { // Initialize repository for MerkleTrie. The cleanup is delegated to MerkleTrie. @@ -95,7 +99,7 @@ func New(cfg config.Config) (*ClaimTrie, error) { return nil, errors.Wrap(err, "creating trie repo") } - persistentTrie := merkletrie.NewPersistentTrie(nodeManager, trieRepo) + persistentTrie := merkletrie.NewPersistentTrie(trieRepo) cleanups = append(cleanups, persistentTrie.Close) trie = persistentTrie } @@ -129,8 +133,11 @@ func New(cfg config.Config) (*ClaimTrie, error) { ct.Close() return nil, errors.Wrap(err, "increment height to") } - // TODO: pass in the interrupt signal here: - trie.SetRoot(hash, nil) // keep this after IncrementHeightTo + err = trie.SetRoot(hash) // keep this after IncrementHeightTo + if err == merkletrie.ErrFullRebuildRequired { + // TODO: pass in the interrupt signal here: + ct.runFullTrieRebuild(nil) + } if !ct.MerkleHash().IsEqual(hash) { ct.Close() @@ -235,7 +242,7 @@ func (ct *ClaimTrie) AppendBlock() error { names = append(names, expirations...) names = removeDuplicates(names) - nhns := ct.nodeManager.MakeNameHashNext(names, false) + nhns := ct.makeNameHashNext(names, false) for nhn := range nhns { ct.merkleTrie.Update(nhn.Name, nhn.Hash, true) @@ -260,10 +267,10 @@ func (ct *ClaimTrie) AppendBlock() error { ct.blockRepo.Set(ct.height, h) if hitFork { - ct.merkleTrie.SetRoot(h, names) // for clearing the memory entirely + err = ct.merkleTrie.SetRoot(h) // for clearing the memory entirely } - return nil + return errors.Wrap(err, "merkle trie clear memory") } func (ct *ClaimTrie) updateTrieForHashForkIfNecessary() bool { @@ -271,15 +278,8 @@ func (ct *ClaimTrie) updateTrieForHashForkIfNecessary() bool { return false } - node.LogOnce("Marking all trie nodes as dirty for the hash fork...") - - // invalidate all names because we have to recompute the hash on everything - pairs := ct.nodeManager.MakeNameHashNext(nil, true) - for pair := range pairs { - ct.merkleTrie.Update(pair.Name, pair.Hash, false) - } - - node.LogOnce("Done. Now recomputing all hashes...") + node.LogOnce(fmt.Sprintf("Rebuilding all trie nodes for the hash fork at %d...", ct.height)) + ct.runFullTrieRebuild(nil) return true } @@ -322,7 +322,10 @@ func (ct *ClaimTrie) ResetHeight(height int32) error { if passedHashFork { names = nil // force them to reconsider all names } - ct.merkleTrie.SetRoot(hash, names) + err = ct.merkleTrie.SetRoot(hash) + if err == merkletrie.ErrFullRebuildRequired { + ct.runFullTrieRebuild(names) + } if !ct.MerkleHash().IsEqual(hash) { return errors.Errorf("unable to restore the hash at height %d", height) @@ -330,6 +333,21 @@ func (ct *ClaimTrie) ResetHeight(height int32) error { return nil } +func (ct *ClaimTrie) runFullTrieRebuild(names [][]byte) { + var nhns chan NameHashNext + if names == nil { + node.LogOnce("Building the entire claim trie in RAM...") + + nhns = ct.makeNameHashNext(nil, true) + } else { + nhns = ct.makeNameHashNext(names, false) + } + + for nhn := range nhns { + ct.merkleTrie.Update(nhn.Name, nhn.Hash, false) + } +} + // MerkleHash returns the Merkle Hash of the claimTrie. func (ct *ClaimTrie) MerkleHash() *chainhash.Hash { if ct.height >= param.ActiveParams.AllClaimsInMerkleForkHeight { @@ -392,3 +410,53 @@ func (ct *ClaimTrie) FlushToDisk() { node.Warn("During blockRepo flush: " + err.Error()) } } + +type NameHashNext struct { + Name []byte + Hash *chainhash.Hash + Next int32 +} + +func (ct *ClaimTrie) makeNameHashNext(names [][]byte, all bool) chan NameHashNext { + inputs := make(chan []byte, 512) + outputs := make(chan NameHashNext, 512) + + var wg sync.WaitGroup + computeHash := func() { + for name := range inputs { + hash, next := ct.nodeManager.Hash(name) + outputs <- NameHashNext{name, hash, next} + } + wg.Done() + } + + threads := int(0.8 * float32(runtime.NumCPU())) + if threads < 1 { + threads = 1 + } + for threads >= 0 { + threads-- + wg.Add(1) + go computeHash() + } + go func() { + if all { + ct.nodeManager.IterateNames(func(name []byte) bool { + clone := make([]byte, len(name)) + copy(clone, name) // iteration name buffer is reused on future loops + inputs <- clone + return true + }) + } else { + for _, name := range names { + inputs <- name + } + } + close(inputs) + }() + go func() { + wg.Wait() + close(outputs) + }() + return outputs +} diff --git a/claimtrie/claimtrie_test.go b/claimtrie/claimtrie_test.go index ab77e28749c..7b56372bb47 100644 --- a/claimtrie/claimtrie_test.go +++ b/claimtrie/claimtrie_test.go @@ -254,8 +254,8 @@ func TestRebuild(t *testing.T) { r.NotNil(m) r.NotEqual(*merkletrie.EmptyTrieHash, *m) - ct.merkleTrie = merkletrie.NewRamTrie(ct.nodeManager) - ct.merkleTrie.SetRoot(m, nil) + ct.merkleTrie = merkletrie.NewRamTrie() + ct.runFullTrieRebuild(nil) m2 := ct.MerkleHash() r.NotNil(m2) diff --git a/claimtrie/cmd/cmd/merkletrie.go b/claimtrie/cmd/cmd/merkletrie.go index 4d6b4079568..503a8762de8 100644 --- a/claimtrie/cmd/cmd/merkletrie.go +++ b/claimtrie/cmd/cmd/merkletrie.go @@ -66,10 +66,10 @@ func NewTrieNameCommand() *cobra.Command { return errors.Wrapf(err, "open merkle trie repo") } - trie := merkletrie.NewPersistentTrie(nil, trieRepo) + trie := merkletrie.NewPersistentTrie(trieRepo) defer trie.Close() - trie.SetRoot(&hash, nil) + trie.SetRoot(&hash) if len(name) > 1 { trie.Dump(name) diff --git a/claimtrie/merkletrie/merkletrie.go b/claimtrie/merkletrie/merkletrie.go index 107fd302bd5..a611fac7605 100644 --- a/claimtrie/merkletrie/merkletrie.go +++ b/claimtrie/merkletrie/merkletrie.go @@ -20,26 +20,19 @@ var ( NoClaimsHash = &chainhash.Hash{3} ) -// ValueStore enables PersistentTrie to query node values from different implementations. -type ValueStore interface { - MakeNameHashNext(names [][]byte, all bool) chan node.NameHashNext -} - // PersistentTrie implements a 256-way prefix tree. type PersistentTrie struct { - store ValueStore - repo Repo + repo Repo root *vertex bufs *sync.Pool } // NewPersistentTrie returns a PersistentTrie. -func NewPersistentTrie(store ValueStore, repo Repo) *PersistentTrie { +func NewPersistentTrie(repo Repo) *PersistentTrie { tr := &PersistentTrie{ - store: store, - repo: repo, + repo: repo, bufs: &sync.Pool{ New: func() interface{} { return new(bytes.Buffer) @@ -52,9 +45,10 @@ func NewPersistentTrie(store ValueStore, repo Repo) *PersistentTrie { } // SetRoot drops all resolved nodes in the PersistentTrie, and set the Root with specified hash. -func (t *PersistentTrie) SetRoot(h *chainhash.Hash, names [][]byte) { +func (t *PersistentTrie) SetRoot(h *chainhash.Hash) error { t.root = newVertex(h) runtime.GC() + return nil } // Update updates the nodes along the path to the key. diff --git a/claimtrie/merkletrie/ramtrie.go b/claimtrie/merkletrie/ramtrie.go index dfcba75ab71..c86aa407387 100644 --- a/claimtrie/merkletrie/ramtrie.go +++ b/claimtrie/merkletrie/ramtrie.go @@ -2,6 +2,7 @@ package merkletrie import ( "bytes" + "errors" "runtime" "sync" @@ -10,7 +11,7 @@ import ( ) type MerkleTrie interface { - SetRoot(h *chainhash.Hash, names [][]byte) + SetRoot(h *chainhash.Hash) error Update(name []byte, h *chainhash.Hash, restoreChildren bool) MerkleHash() *chainhash.Hash MerkleHashAllClaims() *chainhash.Hash @@ -19,13 +20,11 @@ type MerkleTrie interface { type RamTrie struct { collapsedTrie - store ValueStore - bufs *sync.Pool + bufs *sync.Pool } -func NewRamTrie(s ValueStore) *RamTrie { +func NewRamTrie() *RamTrie { return &RamTrie{ - store: s, bufs: &sync.Pool{ New: func() interface{} { return new(bytes.Buffer) @@ -35,30 +34,22 @@ func NewRamTrie(s ValueStore) *RamTrie { } } -func (rt *RamTrie) SetRoot(h *chainhash.Hash, names [][]byte) { +var ErrFullRebuildRequired = errors.New("a full rebuild is required") + +func (rt *RamTrie) SetRoot(h *chainhash.Hash) error { if rt.Root.merkleHash.IsEqual(h) { runtime.GC() - return + return nil } - var nhns chan node.NameHashNext - if names == nil { - node.LogOnce("Building the entire claim trie in RAM...") // could put this in claimtrie.go - - // should technically clear the old trie first: - if rt.Nodes > 1 { - rt.Root = &collapsedVertex{key: make(KeyType, 0)} - rt.Nodes = 1 - runtime.GC() - } - nhns = rt.store.MakeNameHashNext(nil, true) - } else { - nhns = rt.store.MakeNameHashNext(names, false) + // should technically clear the old trie first: + if rt.Nodes > 1 { + rt.Root = &collapsedVertex{key: make(KeyType, 0)} + rt.Nodes = 1 + runtime.GC() } - for nhn := range nhns { - rt.Update(nhn.Name, nhn.Hash, false) - } + return ErrFullRebuildRequired } func (rt *RamTrie) Update(name []byte, h *chainhash.Hash, _ bool) { diff --git a/claimtrie/node/hashfork_manager.go b/claimtrie/node/hashfork_manager.go new file mode 100644 index 00000000000..b3271e2c8d1 --- /dev/null +++ b/claimtrie/node/hashfork_manager.go @@ -0,0 +1,39 @@ +package node + +import ( + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/claimtrie/param" +) + +type HashV2Manager struct { + Manager +} + +func (nm *HashV2Manager) computeClaimHashes(name []byte) (*chainhash.Hash, int32) { + + n, err := nm.NodeAt(nm.Height(), name) + if err != nil || n == nil { + return nil, 0 + } + + n.SortClaimsByBid() + claimHashes := make([]*chainhash.Hash, 0, len(n.Claims)) + for _, c := range n.Claims { + if c.Status == Activated { // TODO: unit test this line + claimHashes = append(claimHashes, calculateNodeHash(c.OutPoint, n.TakenOverAt)) + } + } + if len(claimHashes) > 0 { + return ComputeMerkleRoot(claimHashes), n.NextUpdate() + } + return nil, n.NextUpdate() +} + +func (nm *HashV2Manager) Hash(name []byte) (*chainhash.Hash, int32) { + + if nm.Height() >= param.ActiveParams.AllClaimsInMerkleForkHeight { + return nm.computeClaimHashes(name) + } + + return nm.Manager.Hash(name) +} diff --git a/claimtrie/node/hashfunc.go b/claimtrie/node/hashfunc.go index 7c401e5dcf8..9c9784927c5 100644 --- a/claimtrie/node/hashfunc.go +++ b/claimtrie/node/hashfunc.go @@ -1,6 +1,13 @@ package node -import "github.com/btcsuite/btcd/chaincfg/chainhash" +import ( + "crypto/sha256" + "encoding/binary" + "strconv" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" +) func HashMerkleBranches(left *chainhash.Hash, right *chainhash.Hash) *chainhash.Hash { // Concatenate the left and right nodes. @@ -27,3 +34,24 @@ func ComputeMerkleRoot(hashes []*chainhash.Hash) *chainhash.Hash { } return hashes[0] } + +func calculateNodeHash(op wire.OutPoint, takeover int32) *chainhash.Hash { + + txHash := chainhash.DoubleHashH(op.Hash[:]) + + nOut := []byte(strconv.Itoa(int(op.Index))) + nOutHash := chainhash.DoubleHashH(nOut) + + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, uint64(takeover)) + heightHash := chainhash.DoubleHashH(buf) + + h := make([]byte, 0, sha256.Size*3) + h = append(h, txHash[:]...) + h = append(h, nOutHash[:]...) + h = append(h, heightHash[:]...) + + hh := chainhash.DoubleHashH(h) + + return &hh +} diff --git a/claimtrie/node/manager.go b/claimtrie/node/manager.go index 76d9f9e3e98..88c12127abb 100644 --- a/claimtrie/node/manager.go +++ b/claimtrie/node/manager.go @@ -1,20 +1,13 @@ package node import ( - "crypto/sha256" - "encoding/binary" "fmt" - "runtime" - "sort" - "strconv" - "sync" - "github.com/pkg/errors" + "sort" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/claimtrie/change" "github.com/btcsuite/btcd/claimtrie/param" - "github.com/btcsuite/btcd/wire" ) type Manager interface { @@ -26,7 +19,6 @@ type Manager interface { NodeAt(height int32, name []byte) (*Node, error) IterateNames(predicate func(name []byte) bool) Hash(name []byte) (*chainhash.Hash, int32) - MakeNameHashNext(names [][]byte, all bool) chan NameHashNext Flush() error } @@ -361,32 +353,8 @@ func (nm *BaseManager) IterateNames(predicate func(name []byte) bool) { nm.repo.IterateAll(predicate) } -func (nm *BaseManager) claimHashes(name []byte) (*chainhash.Hash, int32) { - - n, err := nm.node(name) - if err != nil || n == nil { - return nil, 0 - } - - n.SortClaimsByBid() - claimHashes := make([]*chainhash.Hash, 0, len(n.Claims)) - for _, c := range n.Claims { - if c.Status == Activated { // TODO: unit test this line - claimHashes = append(claimHashes, calculateNodeHash(c.OutPoint, n.TakenOverAt)) - } - } - if len(claimHashes) > 0 { - return ComputeMerkleRoot(claimHashes), n.NextUpdate() - } - return nil, n.NextUpdate() -} - func (nm *BaseManager) Hash(name []byte) (*chainhash.Hash, int32) { - if nm.height >= param.ActiveParams.AllClaimsInMerkleForkHeight { - return nm.claimHashes(name) - } - n, err := nm.node(name) if err != nil || n == nil { return nil, 0 @@ -400,77 +368,6 @@ func (nm *BaseManager) Hash(name []byte) (*chainhash.Hash, int32) { return nil, n.NextUpdate() } -type NameHashNext struct { - Name []byte - Hash *chainhash.Hash - Next int32 -} - -func (nm *BaseManager) MakeNameHashNext(names [][]byte, all bool) chan NameHashNext { - inputs := make(chan []byte, 512) - outputs := make(chan NameHashNext, 512) - - var wg sync.WaitGroup - computeHash := func() { - for name := range inputs { - hash, next := nm.Hash(name) - outputs <- NameHashNext{name, hash, next} - } - wg.Done() - } - - threads := int(0.8 * float32(runtime.NumCPU())) - if threads < 1 { - threads = 1 - } - for threads >= 0 { - threads-- - wg.Add(1) - go computeHash() - } - go func() { - if all { - nm.IterateNames(func(name []byte) bool { - clone := make([]byte, len(name)) - copy(clone, name) // iteration name buffer is reused on future loops - inputs <- clone - return true - }) - } else { - for _, name := range names { - inputs <- name - } - } - close(inputs) - }() - go func() { - wg.Wait() - close(outputs) - }() - return outputs -} - -func calculateNodeHash(op wire.OutPoint, takeover int32) *chainhash.Hash { - - txHash := chainhash.DoubleHashH(op.Hash[:]) - - nOut := []byte(strconv.Itoa(int(op.Index))) - nOutHash := chainhash.DoubleHashH(nOut) - - buf := make([]byte, 8) - binary.BigEndian.PutUint64(buf, uint64(takeover)) - heightHash := chainhash.DoubleHashH(buf) - - h := make([]byte, 0, sha256.Size*3) - h = append(h, txHash[:]...) - h = append(h, nOutHash[:]...) - h = append(h, heightHash[:]...) - - hh := chainhash.DoubleHashH(h) - - return &hh -} - func (nm *BaseManager) Flush() error { return nm.repo.Flush() }