Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 66 additions & 4 deletions op-alt-da/daclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ package altda
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"
)

// ErrNotFound is returned when the server could not find the input.
var ErrNotFound = errors.New("not found")
// =========== SetInput (PUT path) errors ===========

// ErrInvalidInput is returned when the input is not valid for posting to the DA storage.
var ErrInvalidInput = errors.New("invalid input")
Expand All @@ -21,6 +21,47 @@ var ErrInvalidInput = errors.New("invalid input")
// See https://github.com/ethereum-optimism/specs/issues/434
var ErrAltDADown = errors.New("alt DA is down: failover to eth DA")

// =========== GetInput (GET path) errors ===========

// ErrNotFound is returned when the server could not find the input.
// Note: this error only applies to keccak commitments, and not to EigenDA altda commitments,
// because a cert that parses correctly and passes the recency check by definition proves
// the availability of the blob that is certifies.
// See https://github.com/Layr-Labs/eigenda/blob/f4ef5cd5/docs/spec/src/integration/spec/6-secure-integration.md#derivation-process for more info.
var ErrNotFound = errors.New("not found")

// DropEigenDACommitmentError is returned when the eigenda-proxy returns a 418 TEAPOT error,
// which signifies that the commitment should be dropped/skipped from the derivation pipeline, as either:
// 1. the cert in the commitment is invalid
// 2. the cert's blob cannot be decoded into a frame (it was not encoded according to one of the supported codecs,
// see https://github.com/Layr-Labs/eigenda/blob/f4ef5cd5/api/clients/codecs/blob_codec.go#L7-L15)
//
// See https://github.com/Layr-Labs/eigenda/blob/f4ef5cd5/docs/spec/src/integration/spec/6-secure-integration.md#derivation-process for more info.
//
// This error is parsed from the json body of the 418 TEAPOT error response.
// DropEigenDACommitmentError is the only error that can lead to a cert being dropped from the derivation pipeline.
// It is needed to protect the rollup from liveness attacks (derivation pipeline stalled by malicious batcher).
type DropEigenDACommitmentError struct {
// The StatusCode field MUST be contained in the response body of the 418 TEAPOT error.
StatusCode int
// The Msg field is a human-readable string that explains the error.
// It is optional, but should ideally be set to a meaningful value.
Msg string
}

func (e DropEigenDACommitmentError) Error() string {
return fmt.Sprintf("Invalid AltDA Commitment: cert verification failed with status code %v: %v", e.StatusCode, e.Msg)
}

// Validate that the status code is an integer between 1 and 4, and panics if it is not.
func (e DropEigenDACommitmentError) Validate() {
if e.StatusCode < 1 || e.StatusCode > 4 {
panic(fmt.Sprintf("DropEigenDACommitmentError: invalid status code %d, must be between 1 and 4", e.StatusCode))
}
// The Msg field should ideally be a human-readable string that explains the error,
// but we don't enforce it.
}

// DAClient is an HTTP client to communicate with a DA storage service.
// It creates commitments and retrieves input data + verifies if needed.
type DAClient struct {
Expand All @@ -33,6 +74,8 @@ type DAClient struct {
putTimeout time.Duration
}

var _ DAStorage = (*DAClient)(nil)

func NewDAClient(url string, verify bool, pc bool) *DAClient {
return &DAClient{
url: url,
Expand All @@ -42,8 +85,12 @@ func NewDAClient(url string, verify bool, pc bool) *DAClient {
}

// GetInput returns the input data for the given encoded commitment bytes.
func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/get/0x%x", c.url, comm.Encode()), nil)
// The l1InclusionBlock at which the commitment was included in the batcher-inbox is submitted
// to the DA server as a query parameter.
// It is used to discard old commitments whose blobs have a risk of not being available anymore.
// It is optional, and passing a 0 value will tell the DA server to skip the check.
func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData, l1InclusionBlockNumber uint64) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/get/0x%x?l1_inclusion_block_number=%d", c.url, comm.Encode(), l1InclusionBlockNumber), nil)
if err != nil {
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
}
Expand All @@ -55,6 +102,21 @@ func (c *DAClient) GetInput(ctx context.Context, comm CommitmentData) ([]byte, e
if resp.StatusCode == http.StatusNotFound {
return nil, ErrNotFound
}
if resp.StatusCode == http.StatusTeapot {
defer resp.Body.Close()
// Limit the body to 5000 bytes to prevent being DDoSed with a large error message.
bytesLimitedBody := io.LimitReader(resp.Body, 5000)
bodyBytes, _ := io.ReadAll(bytesLimitedBody)

var invalidCommitmentErr DropEigenDACommitmentError
if err := json.Unmarshal(bodyBytes, &invalidCommitmentErr); err != nil {
return nil, fmt.Errorf("failed to decode 418 TEAPOT HTTP error body into a DropEigenDACommitmentError. "+
"Consider updating proxy to a more recent version that contains https://github.com/Layr-Labs/eigenda/pull/1736: "+
"%w", err)
}
invalidCommitmentErr.Validate()
return nil, invalidCommitmentErr
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to get preimage: %v", resp.StatusCode)
}
Expand Down
16 changes: 8 additions & 8 deletions op-alt-da/daclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,20 @@ func TestDAClientPrecomputed(t *testing.T) {

require.Equal(t, comm, NewKeccak256Commitment(input))

stored, err := client.GetInput(ctx, comm)
stored, err := client.GetInput(ctx, comm, 0)
require.NoError(t, err)

require.Equal(t, input, stored)

// set a bad commitment in the store
require.NoError(t, store.Put(ctx, comm.Encode(), []byte("bad data")))

_, err = client.GetInput(ctx, comm)
_, err = client.GetInput(ctx, comm, 0)
require.ErrorIs(t, err, ErrCommitmentMismatch)

// test not found error
comm = NewKeccak256Commitment(RandomData(rng, 32))
_, err = client.GetInput(ctx, comm)
_, err = client.GetInput(ctx, comm, 0)
require.ErrorIs(t, err, ErrNotFound)

// test storing bad data
Expand All @@ -64,7 +64,7 @@ func TestDAClientPrecomputed(t *testing.T) {
_, err = client.SetInput(ctx, input)
require.Error(t, err)

_, err = client.GetInput(ctx, NewKeccak256Commitment(input))
_, err = client.GetInput(ctx, NewKeccak256Commitment(input), 0)
require.Error(t, err)
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func TestDAClientService(t *testing.T) {

require.Equal(t, comm.String(), NewKeccak256Commitment(input).String())

stored, err := client.GetInput(ctx, comm)
stored, err := client.GetInput(ctx, comm, 0)
require.NoError(t, err)

require.Equal(t, input, stored)
Expand All @@ -107,12 +107,12 @@ func TestDAClientService(t *testing.T) {
require.NoError(t, store.Put(ctx, comm.Encode(), []byte("bad data")))

// assert no error as generic commitments cannot be verified client side
_, err = client.GetInput(ctx, comm)
_, err = client.GetInput(ctx, comm, 0)
require.NoError(t, err)

// test not found error
comm = NewKeccak256Commitment(RandomData(rng, 32))
_, err = client.GetInput(ctx, comm)
_, err = client.GetInput(ctx, comm, 0)
require.ErrorIs(t, err, ErrNotFound)

// test storing bad data
Expand All @@ -124,6 +124,6 @@ func TestDAClientService(t *testing.T) {
_, err = client.SetInput(ctx, input)
require.Error(t, err)

_, err = client.GetInput(ctx, NewKeccak256Commitment(input))
_, err = client.GetInput(ctx, NewKeccak256Commitment(input), 0)
require.Error(t, err)
}
8 changes: 5 additions & 3 deletions op-alt-da/damgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ type L1Fetcher interface {

// DAStorage interface for calling the DA storage server.
type DAStorage interface {
GetInput(ctx context.Context, key CommitmentData) ([]byte, error)
// L1InclusionBlockNumber is the block number at which the commitment was included in the batcher inbox.
// It is used to check if the commitment is expired, and should be sent as a query parameter
// to the DA server. It is optional, and passing a 0 value will tell the DA server to skip the check.
GetInput(ctx context.Context, key CommitmentData, L1InclusionBlockNumber uint64) ([]byte, error)
SetInput(ctx context.Context, img []byte) (CommitmentData, error)
}

Expand Down Expand Up @@ -220,12 +223,11 @@ func (d *DA) GetInput(ctx context.Context, l1 L1Fetcher, comm CommitmentData, bl
d.log.Info("getting input", "comm", comm, "status", status)

// Fetch the input from the DA storage.
data, err := d.storage.GetInput(ctx, comm)
data, err := d.storage.GetInput(ctx, comm, blockId.Number)
notFound := errors.Is(ErrNotFound, err)
if err != nil && !notFound {
d.log.Error("failed to get preimage", "err", err)
// the storage client request failed for some other reason
// in which case derivation pipeline should be retried
return nil, err
}

Expand Down
10 changes: 7 additions & 3 deletions op-alt-da/damock.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type MockDAClient struct {
setInputRequestCount uint // number of put requests received, irrespective of whether they were successful
}

var _ DAStorage = (*MockDAClient)(nil)

func NewMockDAClient(log log.Logger) *MockDAClient {
return &MockDAClient{
CommitmentType: Keccak256CommitmentType,
Expand Down Expand Up @@ -58,7 +60,7 @@ func (c *MockDAClient) DropEveryNthPut(n uint) {
c.dropEveryNthPut = n
}

func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte, error) {
func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData, _ uint64) ([]byte, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Getting input", "key", key)
Expand Down Expand Up @@ -121,12 +123,14 @@ type DAErrFaker struct {
setInputErr error
}

func (f *DAErrFaker) GetInput(ctx context.Context, key CommitmentData) ([]byte, error) {
var _ DAStorage = (*DAErrFaker)(nil)

func (f *DAErrFaker) GetInput(ctx context.Context, key CommitmentData, l1InclusionBlockNumber uint64) ([]byte, error) {
if err := f.getInputErr; err != nil {
f.getInputErr = nil
return nil, err
}
return f.Client.GetInput(ctx, key)
return f.Client.GetInput(ctx, key, l1InclusionBlockNumber)
}

func (f *DAErrFaker) SetInput(ctx context.Context, data []byte) (CommitmentData, error) {
Expand Down
6 changes: 3 additions & 3 deletions op-e2e/actions/altda/altda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (a *L2AltDA) ActResolveInput(t helpers.Testing, comm []byte, input []byte,

func (a *L2AltDA) ActResolveLastChallenge(t helpers.Testing) {
// remove derivation byte prefix
input, err := a.storage.GetInput(t.Ctx(), altda.Keccak256Commitment(a.lastComm[1:]))
input, err := a.storage.GetInput(t.Ctx(), altda.Keccak256Commitment(a.lastComm[1:]), 0)
require.NoError(t, err)

a.ActResolveInput(t, a.lastComm, input, a.lastCommBn)
Expand Down Expand Up @@ -476,7 +476,7 @@ func TestAltDA_SequencerStalledMultiChallenges(gt *testing.T) {

// keep track of the related commitment
comm1 := a.lastComm
input1, err := a.storage.GetInput(t.Ctx(), altda.Keccak256Commitment(comm1[1:]))
input1, err := a.storage.GetInput(t.Ctx(), altda.Keccak256Commitment(comm1[1:]), 0)
bn1 := a.lastCommBn
require.NoError(t, err)

Expand Down Expand Up @@ -525,7 +525,7 @@ func TestAltDA_SequencerStalledMultiChallenges(gt *testing.T) {

// keep track of the second commitment
comm2 := a.lastComm
_, err = a.storage.GetInput(t.Ctx(), altda.Keccak256Commitment(comm2[1:]))
_, err = a.storage.GetInput(t.Ctx(), altda.Keccak256Commitment(comm2[1:]), 0)
require.NoError(t, err)
a.lastCommBn = a.miner.L1Chain().CurrentBlock().Number.Uint64()

Expand Down
11 changes: 11 additions & 0 deletions op-node/rollup/derive/altda_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (s *AltDADataSource) Next(ctx context.Context) (eth.Data, error) {
}
// use the commitment to fetch the input from the AltDA provider.
data, err := s.fetcher.GetInput(ctx, s.l1, s.comm, s.id)
var dropEigenDACommitmentError altda.DropEigenDACommitmentError
// ========================= vvv keccak commitment errors ===========================
// GetInput may call for a reorg if the pipeline is stalled and the AltDA manager
// continued syncing origins detached from the pipeline origin.
if errors.Is(err, altda.ErrReorgRequired) {
Expand All @@ -91,6 +93,15 @@ func (s *AltDADataSource) Next(ctx context.Context) (eth.Data, error) {
} else if errors.Is(err, altda.ErrPendingChallenge) {
// continue stepping without slowing down.
return nil, NotEnoughData
// ========================= ^^^ keccak commitment errors ===========================
// ========================= vvv eigenDA commitment errors ===========================
} else if errors.As(err, &dropEigenDACommitmentError) {
// DropEigenDACommitmentError is the only error that can lead to a cert being dropped from the derivation pipeline.
// Any other error should be retried.
s.log.Warn("dropping invalid commitment", "comm", s.comm, "err", err)
s.comm = nil
return s.Next(ctx) // skip the input
// ========================= ^^^ eigenDA commitment errors ===========================
} else if err != nil {
// return temporary error so we can keep retrying.
return nil, NewTemporaryError(fmt.Errorf("failed to fetch input data with comm %s from da service: %w", s.comm, err))
Expand Down