Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 84 additions & 155 deletions libindex/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,16 @@ import (
"os"
"runtime"
"strings"
"sync"

"github.com/quay/zlog"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/singleflight"

"github.com/quay/claircore"
"github.com/quay/claircore/indexer"
"github.com/quay/claircore/internal/cache"
"github.com/quay/claircore/internal/httputil"
"github.com/quay/claircore/internal/wart"
"github.com/quay/claircore/internal/zreader"
Expand All @@ -39,13 +38,10 @@ var (
// all users are done with the layers.
type RemoteFetchArena struct {
wc *http.Client
sf *singleflight.Group

// Rc holds (string, *rc).
//
// The string is a layer digest.
rc sync.Map
root string
files cache.Live[string, os.File]
root *os.Root
}

// NewRemoteFetchArena returns an initialized RemoteFetchArena.
Expand Down Expand Up @@ -75,147 +71,74 @@ type RemoteFetchArena struct {
// [O_TMPFILE]: https://man7.org/linux/man-pages/man2/open.2.html
// [systemd-tmpfiles(8)]: https://www.freedesktop.org/software/systemd/man/latest/systemd-tmpfiles.html
func NewRemoteFetchArena(wc *http.Client, root string) *RemoteFetchArena {
return &RemoteFetchArena{
wc: wc,
sf: &singleflight.Group{},
root: fixTemp(root),
}
}

// Rc is a reference counter.
type rc struct {
sync.Mutex
val *tempFile
count int
done func()
}

// NewRc makes an rc.
func newRc(v *tempFile, done func()) *rc {
return &rc{
val: v,
done: done,
}
}

// Dec decrements the reference count, closing the inner file and calling the
// cleanup hook if necessary.
func (r *rc) dec() (err error) {
r.Lock()
defer r.Unlock()
if r.count == 0 {
return errors.New("close botch: count already 0")
}
r.count--
if r.count == 0 {
r.done()
err = r.val.Close()
name := fixTemp(root)
dir, err := os.OpenRoot(name)
if err != nil {
// Backed ourselves into a corner on this API 🙃
panic(fmt.Errorf("fetcher: unable to OpenRoot(%q): %w", root, err))
}
return err
}

// Ref increments the reference count.
func (r *rc) Ref() *ref {
r.Lock()
r.count++
r.Unlock()
n := &ref{rc: r}
runtime.SetFinalizer(n, (*ref).Close)
return n
}

// Ref is a reference handle, RAII-style.
type ref struct {
once sync.Once
rc *rc
}

// Val clones the inner File.
func (r *ref) Val() (*os.File, error) {
r.rc.Lock()
defer r.rc.Unlock()
return r.rc.val.Reopen()
}

// Close decrements the refcount.
func (r *ref) Close() (err error) {
did := false
r.once.Do(func() {
err = r.rc.dec()
did = true
})
if !did {
return errClosed
a := &RemoteFetchArena{
wc: wc,
root: dir,
}
return err
return a
}

// Errors out of the rc/ref types.
var (
errClosed = errors.New("Ref already Closed")
errStale = errors.New("stale file reference")
)

// FetchInto populates "l" and "cl" via a [singleflight.Group].
// FetchInto populates "l" and "cl" via a cache.
//
// It returns a closure to be used with an [errgroup.Group]
func (a *RemoteFetchArena) fetchInto(ctx context.Context, l *claircore.Layer, cl *io.Closer, desc *claircore.LayerDescription) (do func() error) {
key := desc.Digest
// All the refcounting needs to happen _outside_ the singleflight, because
// the result of a singleflight call can be shared. Without doing it this
// way, the refcount would be incorrect.
do = func() error {

return func() (err error) {
ctx, span := tracer.Start(ctx, "RemoteFetchArena.fetchInto", trace.WithAttributes(attribute.String("key", key)))
defer span.End()
var c *rc
var err error
defer func() {
span.RecordError(err)
if err == nil {
span.SetStatus(codes.Ok, "")
} else {
span.SetStatus(codes.Error, "fetchInto error")
}
return
}()

try := func() (any, error) {
return a.fetchUnlinkedFile(ctx, key, desc)
}
select {
case res := <-a.sf.DoChan(key, try):
if e := res.Err; e != nil {
err = fmt.Errorf("error realizing layer %s: %w", key, e)
return err
}
c = res.Val.(*rc)
span.AddEvent("got value from singleflight")
span.SetAttributes(attribute.Bool("shared", res.Shared))
case <-ctx.Done():
err = context.Cause(ctx)
// NB This is not closed on purpose. The [io.Closer] populated by this
// function holds the pointer until that function is cleaned up. Once
// nothing has a copy of this [*os.File], the runtime will run all the
// cleanup logic associated with the pointer.
//
// Every new [*claircore.Layer] gets its own file descriptor via the
// [reopen] helper.
var spool *os.File
spool, err = a.files.Get(ctx, key, func(ctx context.Context, _ string) (*os.File, error) {
return a.fetchFileForCache(ctx, desc)
})
if err != nil {
return err
}

r := c.Ref()
f, err := r.Val()
switch {
case errors.Is(err, nil):
case errors.Is(err, errStale):
zlog.Debug(ctx).Str("key", key).Msg("managed to get stale ref, retrying")
return do()
default:
r.Close()
// This is an owned, independent descriptor for the passed [*os.File].
f, err := reopen(spool)
if err != nil {
return err
}

// If this succeeds, "f" is now owned by "l"
if err := l.Init(ctx, desc, f); err != nil {
return errors.Join(err, f.Close(), r.Close())
return errors.Join(err, f.Close())
}
*cl = closeFunc(func() error {
return errors.Join(l.Close(), f.Close(), r.Close())
*cl = closeFunc(func() (err error) {
err = errors.Join(l.Close(), f.Close())
// Using this KeepAlive keeps the cached file descriptor live until
// all users of the blob have cleaned up. This should be after "f"
// is closed so that the cached-owned file descriptor outlives any
// reopened copies. There's no explicit association of these file
// descriptors, it's all kernel-side book-keeping.
runtime.KeepAlive(spool)
return err
})

return nil
}
return do
}

// CloseFunc is an adapter in the vein of [http.HandlerFunc].
Expand All @@ -226,17 +149,17 @@ func (f closeFunc) Close() error {
return f()
}

// FetchUnlinkedFile is the inner function used inside the singleflight.
// FetchFileForCache is the inner function used inside the [cache.Live].
//
// Because we know we're the only concurrent call that's dealing with this key,
// Because we know we're the only concurrent call that's dealing with this blob,
// we can be a bit more lax.
func (a *RemoteFetchArena) fetchUnlinkedFile(ctx context.Context, key string, desc *claircore.LayerDescription) (*rc, error) {
func (a *RemoteFetchArena) fetchFileForCache(ctx context.Context, desc *claircore.LayerDescription) (*os.File, error) {
ctx = zlog.ContextWithValues(ctx,
"component", "libindex/fetchArena.fetchUnlinkedFile",
"arena", a.root,
"component", "libindex/RemoteFetchArena.fetchFileForCache",
"arena", a.root.Name(),
"layer", desc.Digest,
"uri", desc.URI)
ctx, span := tracer.Start(ctx, "RemoteFetchArena.fetchUnlinkedFile")
ctx, span := tracer.Start(ctx, "RemoteFetchArena.fetchFileForCache")
defer span.End()
span.SetStatus(codes.Error, "")
zlog.Debug(ctx).Msg("layer fetch start")
Expand All @@ -253,16 +176,6 @@ func (a *RemoteFetchArena) fetchUnlinkedFile(ctx context.Context, key string, de
if err != nil {
return nil, fmt.Errorf("failed to parse remote path uri: %v", err)
}
v, ok := a.rc.Load(key)
if ok {
span.SetStatus(codes.Ok, "")
return v.(*rc), nil
}
// Otherwise, it needs to be populated.
f, err := openTemp(a.root)
if err != nil {
return nil, err
}
vh, want := digest.Hash(), digest.Checksum()

// It'd be nice to be able to pre-allocate our file on disk, but we can't
Expand Down Expand Up @@ -348,9 +261,33 @@ func (a *RemoteFetchArena) fetchUnlinkedFile(ctx context.Context, key string, de
return nil, fmt.Errorf("fetcher: mismatched compression (%q) and content-type (%q)", kind.String(), ct)
}

f, err := openTemp(a.root)
if err != nil {
return nil, err
}
// Track whether the file was fully populated.
// Doing this (instead of waiting for GC to clean up the fd associated with
// the [*os.File]) allows the system to eagerly reclaim disk space and
// handle disk contention better.
fileOK := false
defer func() {
if fileOK {
return
}
if err := f.Close(); err != nil {
zlog.Warn(ctx).
Err(err).
Msg("error closing spoolfile in error return")
}
}()
buf := bufio.NewWriter(f)
n, err := io.Copy(buf, zr)
zlog.Debug(ctx).Int64("size", n).Msg("wrote file")
zlog.Debug(ctx).
Int64("size", n).
Bool("big", n >= bigLayerSize).
AnErr("copy_error", err).
Msg("wrote file")
// TODO(hank) Add a metric for "big files" and a histogram for size.
if err != nil {
return nil, err
}
Expand All @@ -364,30 +301,22 @@ func (a *RemoteFetchArena) fetchUnlinkedFile(ctx context.Context, key string, de
return nil, err
}

rc := newRc(f, func() {
a.rc.Delete(key)
})
if _, ok := a.rc.Swap(key, rc); ok {
rc.Ref().Close()
return nil, fmt.Errorf("fetcher: double-store for key %q", key)
}

zlog.Debug(ctx).Msg("layer fetch ok")
span.SetStatus(codes.Ok, "")
return rc, nil
fileOK = true
return f, nil
}

const bigLayerSize = 1024 * 1024 * 1024 // 1 GiB

// Close forgets all references in the arena.
//
// Any outstanding Layers may cause keys to be forgotten at unpredictable times.
func (a *RemoteFetchArena) Close(ctx context.Context) error {
ctx = zlog.ContextWithValues(ctx,
"component", "libindex/fetchArena.Close",
"arena", a.root)
a.rc.Range(func(k, _ any) bool {
a.rc.Delete(k)
return true
})
func (a *RemoteFetchArena) Close(_ context.Context) error {
a.files.Clear()
if err := a.root.Close(); err != nil {
return fmt.Errorf("fetcher: RemoteFetchArena: unable to close os.Root: %w", err)
}
return nil
}

Expand Down Expand Up @@ -420,7 +349,7 @@ func (p *FetchProxy) Realize(ctx context.Context, ls []*claircore.Layer) error {
return nil
}

// RealizeDesciptions returns [claircore.Layer] structs populated according to
// RealizeDescriptions returns [claircore.Layer] structs populated according to
// the passed slice of [claircore.LayerDescription].
func (p *FetchProxy) RealizeDescriptions(ctx context.Context, descs []claircore.LayerDescription) ([]claircore.Layer, error) {
ctx = zlog.ContextWithValues(ctx,
Expand Down
19 changes: 19 additions & 0 deletions libindex/reopen_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package libindex

import (
"errors"
"fmt"
"os"
)

func reopen(f *os.File) (*os.File, error) {
fd := int(f.Fd())
if fd == -1 {
return nil, errors.New("stale file descriptor")
}
p := fmt.Sprintf("/proc/self/fd/%d", fd)
// Need to use OpenFile so that the symlink is not dereferenced.
// There's some proc magic so that opening that symlink itself copies the
// description.
return os.OpenFile(p, os.O_RDONLY, 0o644)
}
9 changes: 9 additions & 0 deletions libindex/reopen_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
//go:build !linux

package libindex

import "os"

func reopen(f *os.File) (*os.File, error) {
return os.OpenFile(f.Name(), os.O_RDONLY, 0o644)
}
9 changes: 2 additions & 7 deletions libindex/tempdir_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@
package libindex

import (
"cmp"
"os"
)

// FixTemp modifies "dir" according to the documented defaults.
//
// See [NewRemoteFetchArena].
func fixTemp(dir string) string {
if dir != "" {
return dir
}
if d, ok := os.LookupEnv("TMPDIR"); ok && d != "" {
return d
}
return "/var/tmp"
return cmp.Or(dir, os.Getenv("TMPDIR"), "/var/tmp")
}
Loading