Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: relax indexedChainState to ChainState for retrieval #943

Closed
wants to merge 35 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9201e3c
feat: relax indexedChainState to ChainState for retrieval
hopeyen Dec 20, 2024
8f88c8e
feat: index operator socket registry with filter logs
hopeyen Jan 2, 2025
f009aaf
fix: lint for mocks
hopeyen Jan 2, 2025
33c33f2
fix: unit tests for retrieval client
hopeyen Jan 2, 2025
0438b5d
chore: contract binding update
hopeyen Jan 2, 2025
2ddee5e
refactor: initialize socket map with direct reads
hopeyen Jan 2, 2025
76898c0
fix: properly read filtered logs for params
hopeyen Jan 3, 2025
6099a2f
fix: make sure to query socket for other operators in the quorum
hopeyen Jan 3, 2025
f67ce0f
refactor: clean up
hopeyen Jan 3, 2025
05760b5
chore: regenerate contracts
hopeyen Feb 11, 2025
a6e0b89
rfr: remove old code and update new client usage
hopeyen Feb 11, 2025
56ead50
fix: test v2 client update
hopeyen Feb 11, 2025
f0508b7
rfr: variable comments and error msgs
hopeyen Feb 11, 2025
adad91d
refactor: chainState constructor returns error
hopeyen Feb 13, 2025
1cd5ce6
refactor: remove old file, socketUpdate indexing per log
hopeyen Feb 13, 2025
f2d88f3
refactor: check length for direct indexing access
hopeyen Feb 13, 2025
44ac427
fix: protect socket update lock, refactor getOperatorState helper
hopeyen Feb 13, 2025
993f3a2
chore: merge master
hopeyen Feb 18, 2025
c23fd54
chore: rm duplicate import
hopeyen Feb 18, 2025
c80f85e
refactor: chainState comments, socket map checks, rename, loggers
hopeyen Feb 19, 2025
ce594bf
feat: chainState logger arg
hopeyen Feb 19, 2025
23b3efd
refactor: separate log fetching and processing
hopeyen Feb 19, 2025
d939826
fix: lint
hopeyen Feb 19, 2025
663b8fa
refactor: edge cases + helpers
hopeyen Feb 19, 2025
b463cfa
refactor: names and logs, atomic int update
hopeyen Feb 20, 2025
5a02ef7
fix: logs
hopeyen Feb 21, 2025
001ee66
refactor: reduce tx call
hopeyen Feb 21, 2025
ca4848d
refactor: clean up locking
hopeyen Feb 21, 2025
d386c9d
fix: lint and registryCoordinator addr
hopeyen Feb 21, 2025
7e1d5a9
fix: fmt error format
hopeyen Feb 21, 2025
90023b5
fix: function sig
hopeyen Feb 21, 2025
94636ec
fix: typo
hopeyen Feb 21, 2025
ad98ac4
feat: use generated binding filter
hopeyen Feb 25, 2025
021a776
fix: rm an accident file
hopeyen Feb 25, 2025
a010cf0
fix: update mock writer
hopeyen Feb 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/clients/mock/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *MockNodeClient) GetBlobHeader(ctx context.Context, socket string, batch
func (c *MockNodeClient) GetChunks(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
opInfo *core.OperatorInfo,
batchHeaderHash [32]byte,
blobIndex uint32,
quorumID core.QuorumID,
Expand Down
4 changes: 2 additions & 2 deletions api/clients/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type RetrievedChunks struct {

type NodeClient interface {
GetBlobHeader(ctx context.Context, socket string, batchHeaderHash [32]byte, blobIndex uint32) (*core.BlobHeader, *merkletree.Proof, error)
GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.IndexedOperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks)
GetChunks(ctx context.Context, opID core.OperatorID, opInfo *core.OperatorInfo, batchHeaderHash [32]byte, blobIndex uint32, quorumID core.QuorumID, chunksChan chan RetrievedChunks)
}

type client struct {
Expand Down Expand Up @@ -79,7 +79,7 @@ func (c client) GetBlobHeader(
func (c client) GetChunks(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
opInfo *core.OperatorInfo,
batchHeaderHash [32]byte,
blobIndex uint32,
quorumID core.QuorumID,
Expand Down
16 changes: 8 additions & 8 deletions api/clients/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type BlobChunks struct {

type retrievalClient struct {
logger logging.Logger
indexedChainState core.IndexedChainState
chainState core.ChainState
assignmentCoordinator core.AssignmentCoordinator
nodeClient NodeClient
verifier encoding.Verifier
Expand All @@ -63,15 +63,15 @@ type retrievalClient struct {
// NewRetrievalClient creates a new retrieval client.
func NewRetrievalClient(
logger logging.Logger,
chainState core.IndexedChainState,
chainState core.ChainState,
assignmentCoordinator core.AssignmentCoordinator,
nodeClient NodeClient,
verifier encoding.Verifier,
numConnections int) (RetrievalClient, error) {

return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
indexedChainState: chainState,
chainState: chainState,
assignmentCoordinator: assignmentCoordinator,
nodeClient: nodeClient,
verifier: verifier,
Expand Down Expand Up @@ -104,11 +104,11 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
batchRoot [32]byte,
quorumID core.QuorumID) (*BlobChunks, error) {

indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
operatorState, err := r.chainState.GetOperatorState(ctx, referenceBlockNumber, []core.QuorumID{quorumID})
if err != nil {
return nil, err
}
operators, ok := indexedOperatorState.Operators[quorumID]
operators, ok := operatorState.Operators[quorumID]
if !ok {
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
}
Expand All @@ -118,7 +118,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
var proof *merkletree.Proof
var proofVerified bool
for opID := range operators {
opInfo := indexedOperatorState.IndexedOperators[opID]
opInfo := operators[opID]
blobHeader, proof, err = r.nodeClient.GetBlobHeader(ctx, opInfo.Socket, batchHeaderHash, blobIndex)
if err != nil {
// try another operator
Expand Down Expand Up @@ -172,7 +172,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
return nil, err
}

assignments, info, err := r.assignmentCoordinator.GetAssignments(indexedOperatorState.OperatorState, blobHeader.Length, quorumHeader)
assignments, info, err := r.assignmentCoordinator.GetAssignments(operatorState, blobHeader.Length, quorumHeader)
if err != nil {
return nil, errors.New("failed to get assignments")
}
Expand All @@ -182,7 +182,7 @@ func (r *retrievalClient) RetrieveBlobChunks(ctx context.Context,
pool := workerpool.New(r.numConnections)
for opID := range operators {
opID := opID
opInfo := indexedOperatorState.IndexedOperators[opID]
opInfo := operators[opID]
pool.Submit(func() {
r.nodeClient.GetChunks(ctx, opID, opInfo, batchHeaderHash, blobIndex, quorumID, chunksChan)
})
Expand Down
35 changes: 10 additions & 25 deletions api/clients/retrieval_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,14 @@ func makeTestComponents() (encoding.Prover, encoding.Verifier, error) {
}

var (
indexedChainState core.IndexedChainState
chainState core.ChainState
indexer *indexermock.MockIndexer
operatorState *core.OperatorState
nodeClient *clientsmock.MockNodeClient
coordinator *core.StdAssignmentCoordinator
retrievalClient clients.RetrievalClient
blobHeader *core.BlobHeader
encodedBlob core.EncodedBlob = core.EncodedBlob{
chainState core.ChainState
indexer *indexermock.MockIndexer
operatorState *core.OperatorState
nodeClient *clientsmock.MockNodeClient
coordinator *core.StdAssignmentCoordinator
retrievalClient clients.RetrievalClient
blobHeader *core.BlobHeader
encodedBlob core.EncodedBlob = core.EncodedBlob{
BlobHeader: nil,
EncodedBundlesByOperator: make(map[core.OperatorID]core.EncodedBundles),
}
Expand All @@ -81,15 +80,6 @@ func setup(t *testing.T) {
t.Fatalf("failed to create new mocked chain data: %s", err)
}

indexedChainState, err = coremock.MakeChainDataMock(map[uint8]int{
0: numOperators,
1: numOperators,
2: numOperators,
})
if err != nil {
t.Fatalf("failed to create new mocked indexed chain data: %s", err)
}

nodeClient = clientsmock.NewNodeClient()
coordinator = &core.StdAssignmentCoordinator{}
p, v, err := makeTestComponents()
Expand All @@ -100,12 +90,7 @@ func setup(t *testing.T) {
indexer = &indexermock.MockIndexer{}
indexer.On("Index").Return(nil).Once()

ics, err := coreindexer.NewIndexedChainState(chainState, indexer)
if err != nil {
panic("failed to create a new indexed chain state")
}

retrievalClient, err = clients.NewRetrievalClient(logger, ics, coordinator, nodeClient, v, 2)
retrievalClient, err = clients.NewRetrievalClient(logger, chainState, coordinator, nodeClient, v, 2)
if err != nil {
panic("failed to create a new retrieval client")
}
Expand All @@ -132,7 +117,7 @@ func setup(t *testing.T) {
},
Data: codec.ConvertByPaddingEmptyByte(gettysburgAddressBytes),
}
operatorState, err = indexedChainState.GetOperatorState(context.Background(), (0), []core.QuorumID{quorumID})
operatorState, err = chainState.GetOperatorState(context.Background(), (0), []core.QuorumID{quorumID})
if err != nil {
t.Fatalf("failed to get operator state: %s", err)
}
Expand Down
33 changes: 16 additions & 17 deletions api/clients/v2/retrieval_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ type RetrievalClient interface {
}

type retrievalClient struct {
logger logging.Logger
ethClient core.Reader
indexedChainState core.IndexedChainState
verifier encoding.Verifier
numConnections int
logger logging.Logger
ethClient core.Reader
chainState core.ChainState
verifier encoding.Verifier
numConnections int
}

var _ RetrievalClient = &retrievalClient{}
Expand All @@ -46,16 +46,16 @@ var _ RetrievalClient = &retrievalClient{}
func NewRetrievalClient(
logger logging.Logger,
ethClient core.Reader,
chainState core.IndexedChainState,
chainState core.ChainState,
verifier encoding.Verifier,
numConnections int,
) RetrievalClient {
return &retrievalClient{
logger: logger.With("component", "RetrievalClient"),
ethClient: ethClient,
indexedChainState: chainState,
verifier: verifier,
numConnections: numConnections,
logger: logger.With("component", "RetrievalClient"),
ethClient: ethClient,
chainState: chainState,
verifier: verifier,
numConnections: numConnections,
}
}

Expand All @@ -74,11 +74,11 @@ func (r *retrievalClient) GetBlob(
return nil, err
}

indexedOperatorState, err := r.indexedChainState.GetIndexedOperatorState(ctx, uint(referenceBlockNumber), []core.QuorumID{quorumID})
operatorState, err := r.chainState.GetOperatorState(ctx, uint(referenceBlockNumber), []core.QuorumID{quorumID})
if err != nil {
return nil, err
}
operators, ok := indexedOperatorState.Operators[quorumID]
operators, ok := operatorState.Operators[quorumID]
if !ok {
return nil, fmt.Errorf("no quorum with ID: %d", quorumID)
}
Expand All @@ -98,7 +98,7 @@ func (r *retrievalClient) GetBlob(
return nil, err
}

assignments, err := corev2.GetAssignments(indexedOperatorState.OperatorState, blobParam, quorumID)
assignments, err := corev2.GetAssignments(operatorState, blobParam, quorumID)
if err != nil {
return nil, errors.New("failed to get assignments")
}
Expand All @@ -107,8 +107,7 @@ func (r *retrievalClient) GetBlob(
chunksChan := make(chan clients.RetrievedChunks, len(operators))
pool := workerpool.New(r.numConnections)
for opID := range operators {
opID := opID
opInfo := indexedOperatorState.IndexedOperators[opID]
opInfo := operators[opID]
pool.Submit(func() {
r.getChunksFromOperator(ctx, opID, opInfo, blobKey, quorumID, chunksChan)
})
Expand Down Expand Up @@ -160,7 +159,7 @@ func (r *retrievalClient) GetBlob(
func (r *retrievalClient) getChunksFromOperator(
ctx context.Context,
opID core.OperatorID,
opInfo *core.IndexedOperatorInfo,
opInfo *core.OperatorInfo,
blobKey corev2.BlobKey,
quorumID core.QuorumID,
chunksChan chan clients.RetrievedChunks,
Expand Down
10 changes: 5 additions & 5 deletions api/clients/v2/validator_payload_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/Layr-Labs/eigenda/common/geth"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigenda/core/thegraph"
corev2 "github.com/Layr-Labs/eigenda/core/v2"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/kzg"
Expand Down Expand Up @@ -37,7 +36,6 @@ func BuildValidatorPayloadRetriever(
logger logging.Logger,
validatorPayloadRetrieverConfig ValidatorPayloadRetrieverConfig,
ethConfig geth.EthClientConfig,
thegraphConfig thegraph.Config,
kzgConfig kzg.KzgConfig,
) (*ValidatorPayloadRetriever, error) {
err := validatorPayloadRetrieverConfig.checkAndSetDefaults()
Expand All @@ -59,8 +57,10 @@ func BuildValidatorPayloadRetriever(
return nil, fmt.Errorf("new reader: %w", err)
}

chainState := eth.NewChainState(reader, ethClient)
indexedChainState := thegraph.MakeIndexedChainState(thegraphConfig, chainState, logger)
chainState, err := eth.NewChainState(reader, ethClient, logger)
if err != nil {
return nil, fmt.Errorf("new chain state: %w", err)
}

kzgVerifier, err := verifier.NewVerifier(&kzgConfig, nil)
if err != nil {
Expand All @@ -70,7 +70,7 @@ func BuildValidatorPayloadRetriever(
retrievalClient := NewRetrievalClient(
logger,
reader,
indexedChainState,
chainState,
kzgVerifier,
int(validatorPayloadRetrieverConfig.MaxConnectionCount))

Expand Down
7 changes: 7 additions & 0 deletions common/abi.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,11 @@ import (
//go:embed abis/EigenDAServiceManager.json
var ServiceManagerAbi []byte

//go:embed abis/RegistryCoordinator.json
var RegistryCoordinatorAbi []byte

var BatchConfirmedEventSigHash = crypto.Keccak256Hash([]byte("BatchConfirmed(bytes32,uint32)"))
var OperatorSocketUpdateEventSigHash = crypto.Keccak256Hash([]byte("OperatorSocketUpdate(bytes32,string)"))

// TODO: consider adding deregistration for limiting size of socket map
// var OperatorDeregisteredEventSigHash = crypto.Keccak256Hash([]byte("OperatorDeregistered(address,bytes32)"))
Loading
Loading