Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(batcher): altda concurrent blob submissions respect holocene strict ordering rules #14

Closed
8 changes: 6 additions & 2 deletions op-alt-da/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ func (c CLIConfig) Check() error {
return nil
}

func (c CLIConfig) NewDAClient() *DAClient {
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}
func (c CLIConfig) NewDAClient() (*DAClient, error) {
err := c.Check()
if err != nil {
return nil, err
}
return &DAClient{url: c.DAServerURL, verify: c.VerifyOnRead, precompute: !c.GenericDA, getTimeout: c.GetTimeout, putTimeout: c.PutTimeout}, nil
}

func ReadCLIConfig(c *cli.Context) CLIConfig {
Expand Down
6 changes: 4 additions & 2 deletions op-alt-da/daclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ func TestDAClientPrecomputed(t *testing.T) {
}
require.NoError(t, cfg.Check())

client := cfg.NewDAClient()
client, err := cfg.NewDAClient()
require.NoError(t, err)

rng := rand.New(rand.NewSource(1234))

Expand Down Expand Up @@ -85,7 +86,8 @@ func TestDAClientService(t *testing.T) {
}
require.NoError(t, cfg.Check())

client := cfg.NewDAClient()
client, err := cfg.NewDAClient()
require.NoError(t, err)

rng := rand.New(rand.NewSource(1234))

Expand Down
8 changes: 6 additions & 2 deletions op-alt-da/damgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ type DA struct {
}

// NewAltDA creates a new AltDA instance with the given log and CLIConfig.
func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) *DA {
return NewAltDAWithStorage(log, cfg, cli.NewDAClient(), metrics)
func NewAltDA(log log.Logger, cli CLIConfig, cfg Config, metrics Metricer) (*DA, error) {
daClient, err := cli.NewDAClient()
if err != nil {
return nil, err
}
return NewAltDAWithStorage(log, cfg, daClient, metrics), nil
}

// NewAltDAWithStorage creates a new AltDA instance with the given log and DAStorage interface.
Expand Down
108 changes: 101 additions & 7 deletions op-alt-da/damock.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package altda

import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"sync"
Expand All @@ -16,22 +18,53 @@ import (
)

// MockDAClient mocks a DA storage provider to avoid running an HTTP DA server
// in unit tests.
// in unit tests. MockDAClient is goroutine-safe.
type MockDAClient struct {
CommitmentType CommitmentType
store ethdb.KeyValueStore
log log.Logger
mu sync.Mutex
CommitmentType CommitmentType
GenericCommitmentCount uint16 // next generic commitment (use counting commitment instead of hash to help with testing)
store ethdb.KeyValueStore
StoreCount int
log log.Logger
dropEveryNthPut uint // 0 means nothing gets dropped, 1 means every put errors, etc.
setInputRequestCount uint // number of put requests received, irrespective of whether they were successful
}

func NewMockDAClient(log log.Logger) *MockDAClient {
return &MockDAClient{
CommitmentType: Keccak256CommitmentType,
store: memorydb.New(),
StoreCount: 0,
log: log,
}
}

// NewCountingGenericCommitmentMockDAClient creates a MockDAClient that uses counting commitments.
// It's commitments are big-endian encoded uint16s of 0, 1, 2, etc. instead of actual hash or altda-layer related commitments.
// Used for testing to make sure we receive commitments in order following Holocene strict ordering rules.
func NewCountingGenericCommitmentMockDAClient(log log.Logger) *MockDAClient {
return &MockDAClient{
CommitmentType: GenericCommitmentType,
GenericCommitmentCount: 0,
store: memorydb.New(),
StoreCount: 0,
log: log,
}
}

// Fakes a da server that drops/errors on every Nth put request.
// Useful for testing the batcher's error handling.
// 0 means nothing gets dropped, 1 means every put errors, etc.
func (c *MockDAClient) DropEveryNthPut(n uint) {
c.mu.Lock()
defer c.mu.Unlock()
c.dropEveryNthPut = n
}

func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Getting input", "key", key)
bytes, err := c.store.Get(key.Encode())
if err != nil {
return nil, ErrNotFound
Expand All @@ -40,12 +73,42 @@ func (c *MockDAClient) GetInput(ctx context.Context, key CommitmentData) ([]byte
}

func (c *MockDAClient) SetInput(ctx context.Context, data []byte) (CommitmentData, error) {
key := NewCommitmentData(c.CommitmentType, data)
return key, c.store.Put(key.Encode(), data)
c.mu.Lock()
defer c.mu.Unlock()
c.setInputRequestCount++
var key CommitmentData
if c.CommitmentType == GenericCommitmentType {
countCommitment := make([]byte, 2)
binary.BigEndian.PutUint16(countCommitment, c.GenericCommitmentCount)
key = NewGenericCommitment(countCommitment)
} else {
key = NewKeccak256Commitment(data)
}
var action string = "put"
if c.dropEveryNthPut > 0 && c.setInputRequestCount%c.dropEveryNthPut == 0 {
action = "dropped"
}
c.log.Debug("Setting input", "action", action, "key", key, "data", fmt.Sprintf("%x", data))
if action == "dropped" {
return nil, errors.New("put dropped")
}
err := c.store.Put(key.Encode(), data)
if err == nil {
c.GenericCommitmentCount++
c.StoreCount++
}
return key, err
}

func (c *MockDAClient) DeleteData(key []byte) error {
return c.store.Delete(key)
c.mu.Lock()
defer c.mu.Unlock()
c.log.Debug("Deleting data", "key", key)
err := c.store.Delete(key)
if err == nil {
c.StoreCount--
}
return err
}

type DAErrFaker struct {
Expand Down Expand Up @@ -111,6 +174,12 @@ type FakeDAServer struct {
*DAServer
putRequestLatency time.Duration
getRequestLatency time.Duration
// outOfOrderResponses is a flag that, when set, causes the server to send responses out of order.
// It will only respond to pairs of request, returning the second response first, and waiting 1 second before sending the first response.
// This is used to test the batcher's ability to handle out of order responses, while still ensuring holocene's strict ordering rules.
outOfOrderResponses bool
oooMu sync.Mutex
oooWaitChan chan struct{}
}

func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
Expand All @@ -130,6 +199,21 @@ func (s *FakeDAServer) HandleGet(w http.ResponseWriter, r *http.Request) {

func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
time.Sleep(s.putRequestLatency)
if s.outOfOrderResponses {
s.oooMu.Lock()
if s.oooWaitChan == nil {
s.log.Info("Received put request while in out-of-order mode, waiting for next request")
s.oooWaitChan = make(chan struct{})
s.oooMu.Unlock()
<-s.oooWaitChan
time.Sleep(1 * time.Second)
} else {
s.log.Info("Received second put request in out-of-order mode, responding to this one first, then the first one")
close(s.oooWaitChan)
s.oooWaitChan = nil
s.oooMu.Unlock()
}
}
s.DAServer.HandlePut(w, r)
}

Expand All @@ -147,13 +231,23 @@ func (s *FakeDAServer) Start() error {
}

func (s *FakeDAServer) SetPutRequestLatency(latency time.Duration) {
s.log.Info("Setting put request latency", "latency", latency)
s.putRequestLatency = latency
}

func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
s.log.Info("Setting get request latency", "latency", latency)
s.getRequestLatency = latency
}

// When ooo=true, causes the server to send responses out of order.
// It will only respond to pairs of request, returning the second response first, and waiting 1 second before sending the first response.
// This is used to test the batcher's ability to handle out of order responses, while still ensuring holocene's strict ordering rules.
func (s *FakeDAServer) SetOutOfOrderResponses(ooo bool) {
s.log.Info("Setting out of order responses", "ooo", ooo)
s.outOfOrderResponses = ooo
}

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
Expand Down
65 changes: 65 additions & 0 deletions op-alt-da/damock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package altda

import (
"net/http/httptest"
"sync"
"testing"
"time"

"github.com/ethereum-optimism/optimism/op-service/testlog"
"github.com/ethereum/go-ethereum/log"
)

func TestFakeDAServer_OutOfOrderResponses(t *testing.T) {
logger := testlog.Logger(t, log.LevelDebug)
daServer := NewFakeDAServer("localhost", 0, logger)
daServer.SetOutOfOrderResponses(true)

// Channel to track completion order
completionOrder := make(chan int, 2)

// Start two concurrent requests
var wg sync.WaitGroup
wg.Add(2)

// First request
go func() {
defer wg.Done()
w := httptest.NewRecorder()
r := httptest.NewRequest("PUT", "/data", nil)

daServer.HandlePut(w, r)
completionOrder <- 1
}()

// Small delay to ensure first request starts first
time.Sleep(100 * time.Millisecond)

// Second request
go func() {
defer wg.Done()
w := httptest.NewRecorder()
r := httptest.NewRequest("PUT", "/data", nil)

daServer.HandlePut(w, r)
completionOrder <- 2
}()

// Wait for both requests to complete
wg.Wait()
close(completionOrder)

// Check completion order
var order []int
for n := range completionOrder {
order = append(order, n)
}

// Second request should complete before first
if len(order) != 2 {
t.Fatalf("expected 2 requests to complete, got %d", len(order))
}
if order[0] != 2 || order[1] != 1 {
t.Errorf("expected completion order [2,1], got %v", order)
}
}
Loading