引言

代码基于 optimism 最新的 bedrock 版本。

Driver 几个

主要逻辑

Driver.eventLoop 是一个事件循环,用于同步 L1 层事件以及内部定时器来产生 L2 block。

step

func (s *state) loop() {
    ...
    // stepReqCh is used to request that the driver attempts to step forward by one L1 block.
    stepReqCh := make(chan struct{}, 1)
    ...
    // reqStep requests that a driver step be taken. Won't deadlock if the channel is full.
    // TODO: Rename step request
    reqStep := func() {
        select {
        case stepReqCh <- struct{}{}:
        // Don't deadlock if the channel is already full
        default:
        }
    }

    // We call reqStep right away to finish syncing to the tip of the chain if we're behind.
    // reqStep will also be triggered when the L1 head moves forward or if there was a reorg on the
    // L1 chain that we need to handle.
    reqStep()

    for {
        select {
        ...
        case <-stepReqCh:
            s.metrics.SetDerivationIdle(false)
            s.idleDerivation = false
            s.log.Debug("Derivation process step", "onto_origin", s.derivation.Origin(), "attempts", stepAttempts)
            err := s.derivation.Step(context.Background())
            stepAttempts += 1 // count as attempt by default. We reset to 0 if we are making healthy progress.
            if err == io.EOF {
                s.log.Debug("Derivation process went idle", "progress", s.derivation.Origin())
                s.idleDerivation = true
                stepAttempts = 0
                s.metrics.SetDerivationIdle(true)
                continue
            } else if err != nil && errors.Is(err, derive.ErrReset) {
                // If the pipeline corrupts, e.g. due to a reorg, simply reset it
                s.log.Warn("Derivation pipeline is reset", "err", err)
                s.derivation.Reset()
                s.metrics.RecordPipelineReset()
                continue
            } else if err != nil && errors.Is(err, derive.ErrTemporary) {
                s.log.Warn("Derivation process temporary error", "attempts", stepAttempts, "err", err)
                reqStep()
                continue
            } else if err != nil && errors.Is(err, derive.ErrCritical) {
                s.log.Error("Derivation process critical error", "err", err)
                return
            } else if err != nil && errors.Is(err, derive.NotEnoughData) {
                stepAttempts = 0 // don't do a backoff for this error
                reqStep()
                continue
            } else if err != nil {
                s.log.Error("Derivation process error", "attempts", stepAttempts, "err", err)
                reqStep()
                continue
            } else {
                stepAttempts = 0
                reqStep() // continue with the next step if we can
            }
            ...
        }
    }
}

loop 启动后就会执行一次 reqStep,如果:

  • op-geth 开启 p2p 同步情况下, s.derivation.Step 将触发 L2 状态进行 p2p 同步到最新的 state(happy-path sync1)。
  • op-geth 开启 p2p 同步情况下,应该怎么处理?(todo)

创建新块

// createNewL2Block builds a L2 block on top of the L2 Head (unsafe). Used by Sequencer nodes to
// construct new L2 blocks. Verifier nodes will use handleEpoch instead.
func (s *Driver) createNewL2Block(ctx context.Context) error {
    l2Head := s.derivation.UnsafeL2Head()
    // Actually create the new block.
    newUnsafeL2Head, payload, err := s.sequencer.CreateNewBlock(ctx, l2Head, l2Safe.ID(), l2Finalized.ID(), l1Origin)

    // Update our L2 head block based on the new unsafe block we just generated.
    s.derivation.SetUnsafeHead(newUnsafeL2Head)

    s.log.Info("Sequenced new l2 block", "l2_unsafe", newUnsafeL2Head, "l1_origin", newUnsafeL2Head.L1Origin, "txs", len
(payload.Transactions), "time", newUnsafeL2Head.Time)
    s.metrics.CountSequencedTxs(len(payload.Transactions))

    if s.network != nil {
        if err := s.network.PublishL2Payload(ctx, payload); err != nil {
            s.log.Warn("failed to publish newly created block", "id", payload.ID(), "err", err)
            s.metrics.RecordPublishingError()
            // publishing of unsafe data via p2p is optional. Errors are not severe enough to change/halt sequencing
but should be logged and metered.
        }
    }

    return nil
}

收到广播的 payload

func (s *Driver) OnUnsafeL2Payload(ctx context.Context, payload *eth.ExecutionPayload) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case s.unsafeL2Payloads <- payload:
        return nil
    }
}

op-node 通过 OnUnsafeL2Payload 处理广播的 unsafePayload,最终交给 EngineQueue.Step 插入到到 L2 上。

func (eq *EngineQueue) Step(ctx context.Context) error {
    ...
    if eq.unsafePayloads.Len() > 0 {
        return eq.tryNextUnsafePayload(ctx)
    }
    ...
}

总结