Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions cmd/bb_worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ func main() {
// currently only used by the virtual file system to store
// output files of build actions. Going forward, this may be
// used to store core dumps generated by build actions as well.
filePool, err := pool.NewFilePoolFromConfiguration(configuration.FilePool)
filePoolFactory, baseSectorAllocator, sectorSizeBytes, err := pool.NewFilePoolFactoryFromConfiguration(configuration.FilePool)
if err != nil {
return util.StatusWrap(err, "Failed to create file pool")
return util.StatusWrap(err, "Failed to create file pool factory")
}

// Storage access.
Expand Down Expand Up @@ -490,13 +490,21 @@ func main() {
return util.StatusWrapf(err, "Invalid instance name prefix %#v", runnerConfiguration.InstanceNamePrefix)
}

maximumSectors := runnerConfiguration.MaximumFilePoolSizeBytes / uint64(sectorSizeBytes)
runnerFilePool := pool.NewQuotaEnforcingFilePool(
filePoolFactory.NewFilePool(
pool.NewQuotaEnforcingSectorAllocator(
baseSectorAllocator,
int64(maximumSectors),
),
),
int64(runnerConfiguration.MaximumFilePoolFileCount),
)

buildClient := builder.NewBuildClient(
schedulerClient,
buildExecutor,
pool.NewQuotaEnforcingFilePool(
filePool,
runnerConfiguration.MaximumFilePoolFileCount,
runnerConfiguration.MaximumFilePoolSizeBytes),
runnerFilePool,
clock.SystemClock,
workerID,
instanceNamePrefix,
Expand Down
5 changes: 5 additions & 0 deletions pkg/filesystem/pool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ go_library(
"hole_source.go",
"metrics_file_pool.go",
"quota_enforcing_file_pool.go",
"quota_enforcing_sector_allocator.go",
"quota_metric.go",
"sector_allocator.go",
"sector_map.go",
],
importpath = "github.com/buildbarn/bb-remote-execution/pkg/filesystem/pool",
visibility = ["//visibility:public"],
Expand All @@ -33,6 +36,8 @@ go_test(
"block_device_backed_file_pool_test.go",
"empty_file_pool_test.go",
"quota_enforcing_file_pool_test.go",
"quota_enforcing_sector_allocator_test.go",
"sector_map_test.go",
],
deps = [
":pool",
Expand Down
195 changes: 80 additions & 115 deletions pkg/filesystem/pool/block_device_backed_file_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,20 @@ func (fp *blockDeviceBackedFilePool) NewFile(holeSource HoleSource, size uint64)
fp: fp,
holeSource: holeSource,
sizeBytes: size,
sm: SectorMap{},
}, nil
}

type blockDeviceBackedFile struct {
fp *blockDeviceBackedFilePool
holeSource HoleSource
sizeBytes uint64
sectors []uint32
sm SectorMap
}

func (f *blockDeviceBackedFile) Close() error {
if len(f.sectors) > 0 {
f.fp.sectorAllocator.FreeList(f.sectors)
}
f.sm.FreeSectors(0, f.fp.sectorAllocator.FreeList)
f.fp = nil
f.sectors = nil
err := f.holeSource.Close()
f.holeSource = nil
return err
Expand All @@ -66,44 +64,48 @@ func (f *blockDeviceBackedFile) toDeviceOffset(sector uint32, offsetWithinSector

// getInitialSectorIndex is called by ReadAt() and WriteAt() to
// determine which sectors in a file are affected by the operation.
func (f *blockDeviceBackedFile) getInitialSectorIndex(off int64, n int) (int, int, int) {
firstSectorIndex := int(off / int64(f.fp.sectorSizeBytes))
endSectorIndex := int((uint64(off) + uint64(n) + uint64(f.fp.sectorSizeBytes) - 1) / uint64(f.fp.sectorSizeBytes))
if endSectorIndex > len(f.sectors) {
endSectorIndex = len(f.sectors)
}
func (f *blockDeviceBackedFile) getInitialSectorIndex(off int64, n int) (uint64, uint64, int) {
firstSectorIndex := uint64(off / int64(f.fp.sectorSizeBytes))
endSectorIndex := (uint64(off) + uint64(n) + uint64(f.fp.sectorSizeBytes) - 1) / uint64(f.fp.sectorSizeBytes)
offsetWithinSector := int(off % int64(f.fp.sectorSizeBytes))
return firstSectorIndex, endSectorIndex - 1, offsetWithinSector
}

// incrementSectorIndex is called by ReadAt() and WriteAt() to progress
// to the next sequence of contiguously stored sectors.
func (f *blockDeviceBackedFile) incrementSectorIndex(sectorIndex, offsetWithinSector *int, n int) {
func (f *blockDeviceBackedFile) incrementSectorIndex(sectorIndex *uint64, offsetWithinSector *int, n int) {
if (*offsetWithinSector+n)%f.fp.sectorSizeBytes != 0 {
panic("Read or write did not finish at sector boundary")
}
*sectorIndex += (*offsetWithinSector + n) / f.fp.sectorSizeBytes
*sectorIndex += uint64((*offsetWithinSector + n) / f.fp.sectorSizeBytes)
*offsetWithinSector = 0
}

// getSectorsContiguous converts an index of a sector in a file to the
// on-disk sector number. It also computes how many sectors are stored
// contiguously starting at this point.
func (f *blockDeviceBackedFile) getSectorsContiguous(firstSectorIndex, lastSectorIndex int) (uint32, int) {
firstSector := f.sectors[firstSectorIndex]
nContiguous := 1
func (f *blockDeviceBackedFile) getSectorsContiguous(firstSectorIndex, lastSectorIndex uint64) (uint32, int) {
firstSector := f.sm.GetPhysicalIndex(firstSectorIndex)
nContiguous := 0
if firstSector == 0 {
// A hole in a sparse file. Determine the size of the hole.
for firstSectorIndex+nContiguous <= lastSectorIndex &&
f.sectors[firstSectorIndex+nContiguous] == 0 {
nContiguous++
nextMapped, err := f.sm.GetNextMappedSector(firstSectorIndex)
if err == io.EOF {
nextMapped = lastSectorIndex + 1
} else if err != nil {
panic(fmt.Errorf("unexpected error from SectorMap.GetNextMappedSector: %w"))
}
nextMapped = min(nextMapped, lastSectorIndex+1)
nContiguous = int(nextMapped - firstSectorIndex)
} else {
// A region that contains actual data. Determine how
// many sectors are contiguous.
for firstSectorIndex+nContiguous <= lastSectorIndex &&
uint64(f.sectors[firstSectorIndex+nContiguous]) == uint64(firstSector)+uint64(nContiguous) {
nContiguous++
n := lastSectorIndex - firstSectorIndex + 1
for i := uint64(0); i < n; i++ {
if uint64(f.sm.GetPhysicalIndex(i+firstSectorIndex)) != uint64(firstSector)+i {
break
}
nContiguous = int(i + 1)
}
}
return firstSector, nContiguous
Expand All @@ -129,72 +131,70 @@ func (f *blockDeviceBackedFile) GetNextRegionOffset(off int64, regionType filesy
}

sectorSizeBytes := int64(f.fp.sectorSizeBytes)
sectorIndex := uint64(off / sectorSizeBytes)
switch regionType {
case filesystem.Data:
sectorIndex := int(off / sectorSizeBytes)
if sectorIndex >= len(f.sectors) {
// Inside the hole at the end of the file.
return f.holeSource.GetNextRegionOffset(off, filesystem.Data)
}
if f.sectors[sectorIndex] != 0 {
if f.sm.GetPhysicalIndex(sectorIndex) != 0 {
// Already inside a sector containing data.
return off, nil
}

// Find the next sector containing data.
sectorIndex++
for f.sectors[sectorIndex] == 0 {
sectorIndex++
sectorIndex, err := f.sm.GetNextMappedSector(sectorIndex)
if err == io.EOF {
// No more mapped sectors. Defer to holeSource.
return f.holeSource.GetNextRegionOffset(off, filesystem.Data)
} else if err != nil {
return 0, status.Errorf(codes.Internal, "Failed to get next mapped sector: %v", err)
}
sectorOffsetBytes := int64(sectorIndex) * sectorSizeBytes

// Also consider data provided by the hole source.
holeSourceOffsetBytes, err := f.holeSource.GetNextRegionOffset(off, filesystem.Data)
if err != nil {
if err == io.EOF {
return sectorOffsetBytes, nil
}
return 0, err
if err == io.EOF {
return sectorOffsetBytes, nil
} else if err != nil {
return 0, status.Errorf(codes.Internal, "Failed to get next mapped region from hole source: %v", err)
}
return min(sectorOffsetBytes, holeSourceOffsetBytes), nil
case filesystem.Hole:
for {
// Progress to the next hole in the file.
sectorIndex := int(off / sectorSizeBytes)
if sectorIndex < len(f.sectors) && f.sectors[sectorIndex] != 0 {
for sectorIndex++; sectorIndex < len(f.sectors); sectorIndex++ {
if f.sectors[sectorIndex] == 0 {
break
}
}
off = int64(sectorIndex) * sectorSizeBytes
}
if uint64(off) >= f.sizeBytes {
sectorIndex := uint64(off / sectorSizeBytes)
index, err := f.sm.GetNextUnmappedSector(sectorIndex)
if err == io.EOF {
// This can not happen with the current
// implementation of SectorMap.
return int64(f.sizeBytes), nil
} else if err != nil {
return 0, status.Errorf(codes.Internal, "Failed to get next unmapped sector: %v", err)
}

// Progress to the next hole in the hole source.
holeSourceOffsetBytes, err := f.holeSource.GetNextRegionOffset(off, filesystem.Hole)
if err != nil {
if err == io.EOF {
return off, nil
}
return 0, err
nextSectorOffsetBytes := max(off, int64(index)*int64(sectorSizeBytes))
nextHoleOffsetBytes, err := f.holeSource.GetNextRegionOffset(off, filesystem.Hole)
if err == io.EOF {
// Hole source has given up, sector
// offset decides.
return min(int64(f.sizeBytes), nextSectorOffsetBytes), nil
} else if err != nil {
return 0, status.Errorf(codes.Internal, "Failed to get next unmapped region from hole source: %v", err)
}
off = max(nextHoleOffsetBytes, nextSectorOffsetBytes)
if off > int64(f.sizeBytes) {
// Next hole is at end of file.
return int64(f.sizeBytes), nil
}
if holeSourceOffsetBytes < int64(sectorIndex+1)*sectorSizeBytes {
// Found an offset that refers both to a
// hole in the file and one in the hole
// source.
return holeSourceOffsetBytes, nil
if nextHoleOffsetBytes == nextSectorOffsetBytes {
// Both the sector map and the hole
// source agrees that this is a hole.
return off, nil
}
off = holeSourceOffsetBytes
// Continue.
}
default:
panic("Unknown region type")
}
}

func (f *blockDeviceBackedFile) readFromHoleSource(p []byte, sectorIndex, offsetWithinSector int) (int, error) {
func (f *blockDeviceBackedFile) readFromHoleSource(p []byte, sectorIndex uint64, offsetWithinSector int) (int, error) {
n, err := f.holeSource.ReadAt(p, int64(sectorIndex)*int64(f.fp.sectorSizeBytes)+int64(offsetWithinSector))
if err != nil {
return n, err
Expand All @@ -209,14 +209,7 @@ func (f *blockDeviceBackedFile) readFromHoleSource(p []byte, sectorIndex, offset
// attempts to read as much data into the output buffer as is possible
// in a single read operation. If the file is fragmented, multiple reads
// are necessary, requiring this function to be called repeatedly.
func (f *blockDeviceBackedFile) readFromSectors(p []byte, sectorIndex, lastSectorIndex, offsetWithinSector int) (int, error) {
if sectorIndex >= len(f.sectors) {
// Attempted to read from a hole located at the
// end of the file. Redirect the read to the hole
// source, which usually produces zeroes.
return f.readFromHoleSource(p, sectorIndex, offsetWithinSector)
}

func (f *blockDeviceBackedFile) readFromSectors(p []byte, sectorIndex, lastSectorIndex uint64, offsetWithinSector int) (int, error) {
sector, sectorsToRead := f.getSectorsContiguous(sectorIndex, lastSectorIndex)
p = f.limitBufferToSectorBoundary(p, sectorsToRead, offsetWithinSector)
if sector == 0 {
Expand Down Expand Up @@ -278,18 +271,9 @@ func (f *blockDeviceBackedFile) ReadAt(p []byte, off int64) (int, error) {
}

// truncateSectors truncates a file to a given number of sectors.
func (f *blockDeviceBackedFile) truncateSectors(sectorCount int) {
if len(f.sectors) > sectorCount {
f.fp.sectorAllocator.FreeList(f.sectors[sectorCount:])
f.sectors = f.sectors[:sectorCount]

// Ensure that no hole remains at the end, as that would
// lead to unnecessary fragmentation when growing the
// file again.
for len(f.sectors) > 0 && f.sectors[len(f.sectors)-1] == 0 {
f.sectors = f.sectors[:len(f.sectors)-1]
}
}
func (f *blockDeviceBackedFile) truncateSectors(sectorCount uint64) {
f.sm.FreeSectors(sectorCount, f.fp.sectorAllocator.FreeList)
f.sm.Truncate(sectorCount)
}

func (f *blockDeviceBackedFile) Sync() error {
Expand All @@ -303,19 +287,19 @@ func (f *blockDeviceBackedFile) Truncate(size int64) error {
return status.Errorf(codes.InvalidArgument, "Negative truncation size: %d", size)
}

sectorIndex := int(size / int64(f.fp.sectorSizeBytes))
sectorIndex := uint64(size / int64(f.fp.sectorSizeBytes))
offsetWithinSector := int(size % int64(f.fp.sectorSizeBytes))
if offsetWithinSector == 0 {
// Truncating to an exact number of sectors.
f.truncateSectors(sectorIndex)
} else {
// Truncating to partially into a sector.
if uint64(size) < f.sizeBytes && sectorIndex < len(f.sectors) && f.sectors[sectorIndex] != 0 {
sector := f.sm.GetPhysicalIndex(sectorIndex)
if uint64(size) < f.sizeBytes && sector != 0 {
// The file is being shrunk and the new last
// sector is not a hole. Zero the trailing part
// of the last sector to ensure that growing the
// file later on doesn't bring back old data.
sector := f.sectors[sectorIndex]
zeroes := f.fp.zeroSector[:f.fp.sectorSizeBytes-offsetWithinSector]
if diff := f.sizeBytes - uint64(size); uint64(len(zeroes)) > diff {
zeroes = zeroes[:diff]
Expand Down Expand Up @@ -343,7 +327,7 @@ func (f *blockDeviceBackedFile) Truncate(size int64) error {
// writeToNewSectors is used to write data into new sectors. This
// function is called when holes in a sparse file are filled up or when
// data is appended to the end of a file.
func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, firstSectorIndex, offsetWithinSector int) (int, uint32, int, error) {
func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, firstSectorIndex uint64, offsetWithinSector int) (int, uint32, int, error) {
// Allocate space to store the data.
sectorsToAllocate := int((uint64(offsetWithinSector) + uint64(len(p)) + uint64(f.fp.sectorSizeBytes) - 1) / uint64(f.fp.sectorSizeBytes))
firstSector, sectorsAllocated, err := f.fp.sectorAllocator.AllocateContiguous(sectorsToAllocate)
Expand Down Expand Up @@ -395,7 +379,7 @@ func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, firstSectorIndex, of
}
p = p[fullSectorsSize:]
sector += uint32(fullSectors)
sectorIndex += int(fullSectors)
sectorIndex += uint64(fullSectors)
}

// Write the last sector separately when we need to introduce
Expand All @@ -415,38 +399,12 @@ func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, firstSectorIndex, of
return nWritten, firstSector, sectorsAllocated, nil
}

// insertSectorsContiguous inserts a series of contiguous sectors into a
// file. This function is used to update a file after appending data to
// it or filling up a hole in a sparse file.
func (f *blockDeviceBackedFile) insertSectorsContiguous(firstSectorIndex int, firstSector uint32, count int) {
for i := 0; i < count; i++ {
sectorIndex := firstSectorIndex + i
if f.sectors[sectorIndex] != 0 {
panic(fmt.Sprintf("Attempted to replace existing sector at index %d", sectorIndex))
}
f.sectors[sectorIndex] = firstSector + uint32(i)
}
}

// writeToSectors performs a single write against the block device. It
// attempts to write as much data from the input buffer as is possible
// in a single write operation. If the file is fragmented, multiple
// writes are necessary, requiring this function to be called
// repeatedly.
func (f *blockDeviceBackedFile) writeToSectors(p []byte, sectorIndex, lastSectorIndex, offsetWithinSector int) (int, error) {
if sectorIndex >= len(f.sectors) {
// Attempted to write past the end-of-file or within a
// hole located at the end of a sparse file. Allocate
// space and grow the file.
bytesWritten, firstSector, sectorsAllocated, err := f.writeToNewSectors(p, sectorIndex, offsetWithinSector)
if err != nil {
return 0, err
}
f.sectors = append(f.sectors, make([]uint32, sectorIndex+sectorsAllocated-len(f.sectors))...)
f.insertSectorsContiguous(sectorIndex, firstSector, sectorsAllocated)
return bytesWritten, nil
}

func (f *blockDeviceBackedFile) writeToSectors(p []byte, sectorIndex, lastSectorIndex uint64, offsetWithinSector int) (int, error) {
sector, sectorsToWrite := f.getSectorsContiguous(sectorIndex, lastSectorIndex)
p = f.limitBufferToSectorBoundary(p, sectorsToWrite, offsetWithinSector)
if sector == 0 {
Expand All @@ -456,7 +414,14 @@ func (f *blockDeviceBackedFile) writeToSectors(p []byte, sectorIndex, lastSector
if err != nil {
return 0, err
}
f.insertSectorsContiguous(sectorIndex, firstSector, sectorsAllocated)
err = f.sm.InsertSectorsContiguous(sectorIndex, firstSector, uint32(sectorsAllocated))
if err != nil {
// There was an error inserting the sector
// mapping, give the sectors back to the sector
// allocator.
f.fp.sectorAllocator.FreeContiguous(firstSector, sectorsAllocated)
return 0, status.Errorf(codes.Internal, "Unable to insert sectors into sector map: %v", err)
}
return bytesWritten, nil
}

Expand Down
Loading