feat(batcher): altda->ethda failover #24

Closed
8 changes: 8 additions & 0 deletions op-alt-da/daclient.go
@@ -16,6 +16,11 @@ var ErrNotFound = errors.New("not found")
// ErrInvalidInput is returned when the input is not valid for posting to the DA storage.
var ErrInvalidInput = errors.New("invalid input")

// ErrAltDADown is returned when the alt DA returns a 503 status code.
// It is used to signify that the alt DA is down and the client should fail over to the eth DA.
// See https://github.com/ethereum-optimism/specs/issues/434
var ErrAltDADown = errors.New("alt DA is down: failover to eth DA")

// DAClient is an HTTP client to communicate with a DA storage service.
// It creates commitments and retrieves input data + verifies if needed.
type DAClient struct {
@@ -131,6 +136,9 @@ func (c *DAClient) setInput(ctx context.Context, img []byte) (CommitmentData, er
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusServiceUnavailable {
return nil, ErrAltDADown
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to store data: %v", resp.StatusCode)
}
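Note: callers are expected to branch on this sentinel with `errors.Is`, which is exactly what `recordFailedDARequest` in `op-batcher/batcher/driver.go` does later in this diff. A minimal sketch of the contract, assuming the exported `SetInput` wrapper around the `setInput` shown above:

```go
package altdautil

import (
	"context"
	"errors"

	altda "github.com/ethereum-optimism/optimism/op-alt-da"
)

// shouldFailover is a hypothetical helper: it reports whether a failed
// submission means the alt DA is down (HTTP 503), in which case the caller
// should switch to eth DA rather than keep retrying against the alt DA.
func shouldFailover(ctx context.Context, c *altda.DAClient, img []byte) (bool, error) {
	if _, err := c.SetInput(ctx, img); err != nil {
		// errors.Is matches the ErrAltDADown sentinel added above.
		return errors.Is(err, altda.ErrAltDADown), err
	}
	return false, nil
}
```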
18 changes: 16 additions & 2 deletions op-alt-da/damock.go
@@ -105,12 +105,16 @@ func (d *AltDADisabled) AdvanceL1Origin(ctx context.Context, l1 L1Fetcher, block
}

// FakeDAServer is a fake DA server for e2e tests.
// It is a small wrapper around DAServer that allows for setting request latencies,
// to mimic a DA service with slow responses (eg. eigenDA with 10 min batching interval).
// It is a small wrapper around DAServer that allows for setting:
// - request latencies, to mimic a DA service with slow responses
// (e.g. eigenDA with a 10 min batching interval).
// - response status codes, to mimic a DA service that is down.
type FakeDAServer struct {
*DAServer
putRequestLatency time.Duration
getRequestLatency time.Duration
// the next failoverCount Put requests will return a 503 status code, for failover testing
failoverCount uint64
}

func NewFakeDAServer(host string, port int, log log.Logger) *FakeDAServer {
@@ -130,6 +134,11 @@ func (s *FakeDAServer) HandleGet(w http.ResponseWriter, r *http.Request) {

func (s *FakeDAServer) HandlePut(w http.ResponseWriter, r *http.Request) {
time.Sleep(s.putRequestLatency)
if s.failoverCount > 0 {
w.WriteHeader(http.StatusServiceUnavailable)
s.failoverCount--
return
}
s.DAServer.HandlePut(w, r)
}

@@ -154,6 +163,11 @@ func (s *FakeDAServer) SetGetRequestLatency(latency time.Duration) {
s.getRequestLatency = latency
}

// SetPutFailoverForNRequests sets the next n Put requests to return a 503 status code.
func (s *FakeDAServer) SetPutFailoverForNRequests(n uint64) {
s.failoverCount = n
}
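Note: a sketch of how an e2e test might exercise this knob; `Start`/`Stop` are assumed to be inherited from the embedded `DAServer` and are not part of this diff:

```go
package altda_test

import (
	"testing"

	altda "github.com/ethereum-optimism/optimism/op-alt-da"
	"github.com/ethereum-optimism/optimism/op-service/testlog"
	"github.com/ethereum/go-ethereum/log"
	"github.com/stretchr/testify/require"
)

// Hypothetical e2e-style sketch: the next two Put requests answer 503, after
// which the fake server heals, letting a test drive a full failover window.
func TestBatcherFailsOverWhenAltDAIsDown(t *testing.T) {
	server := altda.NewFakeDAServer("127.0.0.1", 0, testlog.Logger(t, log.LevelInfo))
	require.NoError(t, server.Start())
	defer server.Stop()

	server.SetPutFailoverForNRequests(2)

	// Point the batcher at the fake server and submit three channels: the
	// first two Puts return 503 (failover to eth DA), the third succeeds.
}
```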

type MemStore struct {
db map[string][]byte
lock sync.RWMutex
26 changes: 18 additions & 8 deletions op-batcher/batcher/channel.go
@@ -45,8 +45,9 @@ func newChannel(log log.Logger, metr metrics.Metricer, cfg ChannelConfig, rollup
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (c *channel) TxFailed(id string) {
// in the failed transaction. failoverToEthDA should be set to true when using altDA
// and altDA is down. This will switch the channel to submit frames to ethDA instead.
func (c *channel) TxFailed(id string, failoverToEthDA bool) {
if data, ok := c.pendingTransactions[id]; ok {
c.log.Trace("marked transaction as failed", "id", id)
// Rewind to the first frame of the failed tx
@@ -57,7 +58,16 @@ } else {
} else {
c.log.Warn("unknown transaction marked as failed", "id", id)
}

if failoverToEthDA {
// We fail over to calldata txs because in altda mode the channel and channelManager
// are configured to use a calldataConfigManager, as opposed to DynamicEthChannelConfig
// which can use both calldata and blobs. Failover should happen extremely rarely,
// and is only used while the altDA is down, so we can afford to be inefficient here.
// TODO: figure out how to switch to blobs/auto instead. Might need to make
// batcherService.initChannelConfig function stateless so that we can reuse it.
c.log.Info("Failing over to calldata txs", "id", c.ID())
c.cfg.DaType = DaTypeCalldata
}
c.metr.RecordBatchTxFailed()
}
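Note: the observable contract of the new flag, sketched as a hypothetical helper for a test in this package; `ch` and `id` are assumed to come from existing setup such as `TestChannelTxFailed`:

```go
import (
	"testing"

	"github.com/stretchr/testify/require"
)

// assertFailoverToCalldata is a hypothetical batcher-package test helper.
func assertFailoverToCalldata(t *testing.T, ch *channel, id string) {
	ch.TxFailed(id, true) // altDA down: rewind frames and switch DA type
	require.Equal(t, DaTypeCalldata, ch.cfg.DaType)
	if ch.HasTxData() { // requeued frames are emitted as calldata from now on
		require.Equal(t, DaTypeCalldata, ch.NextTxData().daType)
	}
}
```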

@@ -125,29 +135,29 @@ func (c *channel) ID() derive.ChannelID {
}

// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet.
// If cfg.UseBlobs is false, it returns txData with a single frame.
// If cfg.UseBlobs is true, it will read frames from its channel builder
// If cfg.DaType == DaTypeCalldata, it returns txData with a single frame.
// Otherwise, when cfg.DaType == DaTypeBlob or DaTypeAltDA, it will read frames from its channel builder
// until it either doesn't have more frames or the target number of frames is reached.
//
// NextTxData should only be called after HasTxData returned true.
func (c *channel) NextTxData() txData {
nf := c.cfg.MaxFramesPerTx()
txdata := txData{frames: make([]frameData, 0, nf), asBlob: c.cfg.UseBlobs}
txdata := txData{frames: make([]frameData, 0, nf), daType: c.cfg.DaType}
for i := 0; i < nf && c.channelBuilder.HasPendingFrame(); i++ {
frame := c.channelBuilder.NextFrame()
txdata.frames = append(txdata.frames, frame)
}

id := txdata.ID().String()
c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "as_blob", txdata.asBlob)
c.log.Debug("returning next tx data", "id", id, "num_frames", len(txdata.frames), "da_type", txdata.daType)
c.pendingTransactions[id] = txdata

return txdata
}

func (c *channel) HasTxData() bool {
if c.IsFull() || // If the channel is full, we should start to submit it
!c.cfg.UseBlobs { // If using calldata, we only send one frame per tx
c.cfg.DaType == DaTypeCalldata { // If using calldata, we only send one frame per tx
return c.channelBuilder.HasPendingFrame()
}
// Collect enough frames if channel is not full yet
11 changes: 7 additions & 4 deletions op-batcher/batcher/channel_config.go
@@ -46,9 +46,12 @@ type ChannelConfig struct {
// BatchType indicates whether the channel uses SingularBatch or SpanBatch.
BatchType uint

// UseBlobs indicates that this channel should be sent as a multi-blob
// transaction with one blob per frame.
UseBlobs bool
// DaType indicates how the frames in this channel should be sent to the L1.
DaType DaType
}

func (cc ChannelConfig) UseBlobs() bool {
return cc.DaType == DaTypeBlob
}
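Note for reviewers: the hunks in this file reference DaType constants whose declaration is outside the diff. A sketch of the assumed shape, with names taken from the usages below (e.g. cfg.DaType.String() in channel_manager.go):

```go
// Sketch of the assumed DaType enum; the real declaration (presumably next
// to txData) is not shown in this diff.
type DaType int

const (
	// DaTypeCalldata: frames are posted to L1 as transaction calldata.
	DaTypeCalldata DaType = iota
	// DaTypeBlob: frames are posted to L1 as EIP-4844 blobs.
	DaTypeBlob
	// DaTypeAltDA: frames go to the alt DA provider; only the commitment
	// is posted to L1.
	DaTypeAltDA
)

func (d DaType) String() string {
	switch d {
	case DaTypeCalldata:
		return "calldata"
	case DaTypeBlob:
		return "blob"
	case DaTypeAltDA:
		return "alt_da"
	default:
		return "unknown"
	}
}
```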

// ChannelConfig returns a copy of the receiver.
@@ -93,7 +96,7 @@ func (cc *ChannelConfig) ReinitCompressorConfig() {
}

func (cc *ChannelConfig) MaxFramesPerTx() int {
if !cc.UseBlobs {
if cc.DaType == DaTypeCalldata {
return 1
}
return cc.TargetNumFrames
3 changes: 2 additions & 1 deletion op-batcher/batcher/channel_config_provider_test.go
@@ -31,11 +31,12 @@ func TestDynamicEthChannelConfig_ChannelConfig(t *testing.T) {
calldataCfg := ChannelConfig{
MaxFrameSize: 120_000 - 1,
TargetNumFrames: 1,
DaType: DaTypeCalldata,
}
blobCfg := ChannelConfig{
MaxFrameSize: eth.MaxBlobDataSize - 1,
TargetNumFrames: 3, // gets closest to amortized fixed tx costs
UseBlobs: true,
DaType: DaTypeBlob,
}

tests := []struct {
17 changes: 9 additions & 8 deletions op-batcher/batcher/channel_manager.go
@@ -92,12 +92,13 @@ func (s *channelManager) pendingBlocks() int {
}

// TxFailed records a transaction as failed. It will attempt to resubmit the data
// in the failed transaction.
func (s *channelManager) TxFailed(_id txID) {
// in the failed transaction. failoverToEthDA should be set to true when using altDA
// and altDA is down. This will switch the channel to submit frames to ethDA instead.
func (s *channelManager) TxFailed(_id txID, failoverToEthDA bool) {
id := _id.String()
if channel, ok := s.txChannels[id]; ok {
delete(s.txChannels, id)
channel.TxFailed(id)
channel.TxFailed(id, failoverToEthDA)
} else {
s.log.Warn("transaction from unknown channel marked as failed", "id", id)
}
@@ -207,16 +208,16 @@ func (s *channelManager) TxData(l1Head eth.BlockID, isPectra bool) (txData, erro
newCfg := s.cfgProvider.ChannelConfig(isPectra)

// No change:
if newCfg.UseBlobs == s.defaultCfg.UseBlobs {
if newCfg.UseBlobs() == s.defaultCfg.UseBlobs() {
s.log.Debug("Recomputing optimal ChannelConfig: no need to switch DA type",
"useBlobs", s.defaultCfg.UseBlobs)
"useBlobs", s.defaultCfg.UseBlobs())
return s.nextTxData(channel)
}

// Change:
s.log.Info("Recomputing optimal ChannelConfig: changing DA type and requeing blocks...",
"useBlobsBefore", s.defaultCfg.UseBlobs,
"useBlobsAfter", newCfg.UseBlobs)
"useBlobsBefore", s.defaultCfg.UseBlobs(),
"useBlobsAfter", newCfg.UseBlobs())

// Invalidate the channel so its blocks
// get requeued:
@@ -317,7 +318,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
"compression_algo", cfg.CompressorConfig.CompressionAlgo,
"target_num_frames", cfg.TargetNumFrames,
"max_frame_size", cfg.MaxFrameSize,
"use_blobs", cfg.UseBlobs,
"da_type", cfg.DaType.String(),
)
s.metr.RecordChannelOpened(pc.ID(), s.pendingBlocks())

11 changes: 6 additions & 5 deletions op-batcher/batcher/channel_manager_test.go
@@ -211,7 +211,7 @@ func ChannelManager_TxResend(t *testing.T, batchType uint) {
require.ErrorIs(err, io.EOF)

// requeue frame
m.TxFailed(txdata0.ID())
m.TxFailed(txdata0.ID(), false)

txdata1, err := m.TxData(eth.BlockID{}, false)
require.NoError(err)
@@ -290,11 +290,12 @@ func newFakeDynamicEthChannelConfig(lgr log.Logger,
calldataCfg := ChannelConfig{
MaxFrameSize: 120_000 - 1,
TargetNumFrames: 1,
DaType: DaTypeCalldata,
}
blobCfg := ChannelConfig{
MaxFrameSize: eth.MaxBlobDataSize - 1,
TargetNumFrames: 3, // gets closest to amortized fixed tx costs
UseBlobs: true,
DaType: DaTypeBlob,
}
calldataCfg.InitNoneCompressor()
blobCfg.InitNoneCompressor()
@@ -348,7 +349,7 @@ func TestChannelManager_TxData(t *testing.T) {

cfg.chooseBlobs = tc.chooseBlobsWhenChannelCreated
m := NewChannelManager(l, metrics.NoopMetrics, cfg, defaultTestRollupConfig)
require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.UseBlobs)
require.Equal(t, tc.chooseBlobsWhenChannelCreated, m.defaultCfg.DaType == DaTypeBlob)

// Seed channel manager with a block
rng := rand.New(rand.NewSource(99))
@@ -385,8 +386,8 @@ }
}

require.Equal(t, tc.numExpectedAssessments, cfg.assessments)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.asBlob)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.UseBlobs)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, data.daType == DaTypeBlob)
require.Equal(t, tc.chooseBlobsWhenChannelSubmitted, m.defaultCfg.DaType == DaTypeBlob)
})
}

8 changes: 4 additions & 4 deletions op-batcher/batcher/channel_test.go
@@ -131,7 +131,7 @@ func TestChannel_NextTxData_singleFrameTx(t *testing.T) {
const n = 6
lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{
UseBlobs: false,
DaType: DaTypeCalldata,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
@@ -172,7 +172,7 @@ func TestChannel_NextTxData_multiFrameTx(t *testing.T) {
const n = eth.MaxBlobsPerBlobTx
lgr := testlog.Logger(t, log.LevelWarn)
ch, err := newChannelWithChannelOut(lgr, metrics.NoopMetrics, ChannelConfig{
UseBlobs: true,
DaType: DaTypeBlob,
TargetNumFrames: n,
CompressorConfig: compressor.Config{
CompressionAlgo: derive.Zlib,
@@ -305,13 +305,13 @@ func TestChannelTxFailed(t *testing.T) {

// Trying to mark an unknown pending transaction as failed
// shouldn't modify state
m.TxFailed(zeroFrameTxID(0))
m.TxFailed(zeroFrameTxID(0), false)
require.Equal(t, 0, m.currentChannel.PendingFrames())
require.Equal(t, expectedTxData, m.currentChannel.pendingTransactions[expectedChannelID.String()])

// Now we still have a pending transaction
// Let's mark it as failed
m.TxFailed(expectedChannelID)
m.TxFailed(expectedChannelID, false)
require.Empty(t, m.currentChannel.pendingTransactions)
// There should be a frame in the pending channel now
require.Equal(t, 1, m.currentChannel.PendingFrames())
36 changes: 16 additions & 20 deletions op-batcher/batcher/driver.go
@@ -780,14 +780,6 @@ func (l *BatchSubmitter) cancelBlockingTx(queue *txmgr.Queue[txRef], receiptsCh

// publishToAltDAAndL1 posts the txdata to the DA Provider and then sends the commitment to L1.
func (l *BatchSubmitter) publishToAltDAAndL1(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) {
// sanity checks
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
if txdata.asBlob {
l.Log.Crit("Unexpected blob txdata with AltDA enabled")
}

// when posting txdata to an external DA Provider, we use a goroutine to avoid blocking the main loop
// since it may take a while for the request to return.
goroutineSpawned := daGroup.TryGo(func() error {
@@ -827,29 +819,32 @@
// The method will block if the queue's MaxPendingTransactions is exceeded.
func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef], receiptsCh chan txmgr.TxReceipt[txRef], daGroup *errgroup.Group) error {
var err error

// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if l.Config.UseAltDA {
var candidate *txmgr.TxCandidate
switch txdata.daType {
case DaTypeAltDA:
if !l.Config.UseAltDA {
l.Log.Crit("Received AltDA type txdata without AltDA being enabled")
}
// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
l.publishToAltDAAndL1(txdata, queue, receiptsCh, daGroup)
// we return nil to allow publishStateToL1 to keep processing the next txdata
return nil
}

var candidate *txmgr.TxCandidate
if txdata.asBlob {
case DaTypeBlob:
if candidate, err = l.blobTxCandidate(txdata); err != nil {
// We could potentially fall through and try a calldata tx instead, but this would
// likely result in the chain spending more in gas fees than it is tuned for, so best
// to just fail. We do not expect this error to trigger unless there is a serious bug
// or configuration issue.
return fmt.Errorf("could not create blob tx candidate: %w", err)
}
} else {
case DaTypeCalldata:
// sanity check
if nf := len(txdata.frames); nf != 1 {
l.Log.Crit("Unexpected number of frames in calldata tx", "num_frames", nf)
}
candidate = l.calldataTxCandidate(txdata.CallData())
default:
l.Log.Crit("Unknown DA type", "da_type", txdata.daType)
}

l.sendTx(txdata, false, candidate, queue, receiptsCh)
@@ -867,7 +862,7 @@ func (l *BatchSubmitter) sendTx(txdata txData, isCancel bool, candidate *txmgr.T
candidate.GasLimit = intrinsicGas
}

queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.asBlob}, *candidate, receiptsCh)
queue.Send(txRef{id: txdata.ID(), isCancel: isCancel, isBlob: txdata.daType == DaTypeBlob}, *candidate, receiptsCh)
}

func (l *BatchSubmitter) blobTxCandidate(data txData) (*txmgr.TxCandidate, error) {
Expand Down Expand Up @@ -906,17 +901,18 @@ func (l *BatchSubmitter) handleReceipt(r txmgr.TxReceipt[txRef]) {
func (l *BatchSubmitter) recordFailedDARequest(id txID, err error) {
l.channelMgrMutex.Lock()
defer l.channelMgrMutex.Unlock()
failover := errors.Is(err, altda.ErrAltDADown)
if err != nil {
l.Log.Warn("DA request failed", logFields(id, err)...)
l.Log.Warn("DA request failed", append([]interface{}{"failoverToEthDA", failover}, logFields(id, err)...)...)
}
l.channelMgr.TxFailed(id)
l.channelMgr.TxFailed(id, failover)
}

func (l *BatchSubmitter) recordFailedTx(id txID, err error) {
l.channelMgrMutex.Lock()
defer l.channelMgrMutex.Unlock()
l.Log.Warn("Transaction failed to send", logFields(id, err)...)
l.channelMgr.TxFailed(id)
l.channelMgr.TxFailed(id, false)
}

func (l *BatchSubmitter) recordConfirmedTx(id txID, receipt *types.Receipt) {