Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ private static final class Replacement implements SerializableOnlyOverRemoting {
}

private Object readResolve() throws IOException {
return new BufferedBuildListener(new DelayBufferedOutputStream(ros, tuning));
return new BufferedBuildListener(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning)));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,8 @@ void flushAndReschedule() {
reschedule();
}

@SuppressWarnings("FinalizeDeclaration") // not ideal, but PhantomReference is more of a hassle
@Override protected void finalize() throws Throwable {
super.finalize();
// Odd that this is not the default behavior for BufferedOutputStream.
flush();
@Override public String toString() {
return "DelayBufferedOutputStream[" + out + "]";
}

private static final class Flush implements Runnable {
Expand Down Expand Up @@ -143,6 +140,10 @@ private static final class FlushControlledOutputStream extends FilterOutputStrea
}
}

@Override public String toString() {
return "FlushControlledOutputStream[" + out + "]";
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private FileLogStorage(File log) {
private synchronized void open() throws IOException {
if (os == null) {
os = new FileOutputStream(log, true);
bos = new DelayBufferedOutputStream(os);
bos = new GCFlushedOutputStream(new DelayBufferedOutputStream(os));
if (index.isFile()) {
try (BufferedReader r = Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8)) {
// TODO would be faster to scan the file backwards for the penultimate \n, then convert the byte sequence from there to EOF to UTF-8 and set lastId accordingly
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* The MIT License
*
* Copyright 2018 CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package org.jenkinsci.plugins.workflow.log;

import java.io.BufferedOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import jenkins.util.Timer;

/**
* A stream which will be flushed before garbage collection.
* {@link BufferedOutputStream} does not do this automatically.
*/
final class GCFlushedOutputStream extends FilterOutputStream {

private static final Logger LOGGER = Logger.getLogger(GCFlushedOutputStream.class.getName());

GCFlushedOutputStream(OutputStream out) {
super(out);
FlushRef.register(this, out);
}

@Override public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len); // super method is surprising
}

@Override public String toString() {
return "GCFlushedOutputStream[" + out + "]";
}

/**
* Flushes streams prior to garbage collection.
* ({@link BufferedOutputStream} does not do this automatically.)
* TODO Java 9+ could use java.util.Cleaner
*/
private static final class FlushRef extends PhantomReference<GCFlushedOutputStream> {

private static final ReferenceQueue<GCFlushedOutputStream> rq = new ReferenceQueue<>();

static {
Timer.get().scheduleWithFixedDelay(() -> {
while (true) {
FlushRef ref = (FlushRef) rq.poll();
if (ref == null) {
break;
}
LOGGER.log(Level.FINE, "flushing {0}", ref.out);
try {
ref.out.flush();
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, x);
}
}
}, 0, 10, TimeUnit.SECONDS);
}

static void register(GCFlushedOutputStream fos, OutputStream out) {
new FlushRef(fos, out, rq).enqueue();
Copy link
Member

@dwnusbaum dwnusbaum Mar 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jglick Do you remember if there are any tests that specifically exercise GCFlushedOutputStream? Calling PhantomReference.enqueue here ourselves (called in turn by the GCFlushedOutputStream constructor) causes the cleanup code to run the next time the timer task executes (at most ~10 seconds later) regardless of the reachability of the GCFlushedOutputStream and then never again, so I am not sure if this class is doing anything meaningful right now.

You can test the behavior with a class like this.
public class Test {
    private static class Ref extends PhantomReference<Object> {
        static final ReferenceQueue<Object> queue = new ReferenceQueue<>();
        
        private final Runnable cleanup;

        public Ref(Object referent, Runnable cleanup) {
            super(referent, queue);
            this.cleanup = cleanup;
        }
    }

    public static void main(String[] args) throws Exception {
        Object o = new Object();
        var ref = new Ref(o, () -> {
            System.out.println("Cleaned up");
        });
        // Uncomment the next line to see what happens when `PhantomReference.enqueue` is called manually.
        // System.out.println(ref.enqueue());
        System.out.println("GC 1");
        System.gc();
        while (Ref.queue.poll() instanceof Ref ref2) {
            System.out.println("Running cleanup up while reference still exists!");
            ref2.cleanup.run();
        }
        Thread.sleep(2 * 1000);
        System.out.println(o); // Make sure o cannot be GC'd prior to this point.
        o = null;
        System.out.println("GC 2");
        System.gc();
        Thread.sleep(2 * 1000);
        while (Ref.queue.poll() instanceof Ref ref2) {
            System.out.println("Running cleanup at correct time.");
            ref2.cleanup.run();
        }
    }
}

Alternatively, you can add logging and Thread.sleep calls in FlushRef.register to observe the cleanup code running within the extent of the GCFlushedOutputStream constructor, when the object must still be strongly reachable.

I think we can make the class work as intended by switching to Cleaner. See #388. Trying to fix the issue directly by dropping the explicit call to enqueue runs into the problem that we need something to hold a reference to the FlushRef that outlives the GCFlushedOutputStream, otherwise the FlushRef itself will just get GC'd and the cleanup code will never run.

We could also just delete the class, given this has apparently never worked as intended. See #387.

Any thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I recall little of this. It is possible tests in pipeline-cloudwatch-logs implicitly exercise this behavior. There are also some JEP-210-related tests in workflow-durable-task-step which deal with remote TaskListenerDecorators or something like that.

}

private final OutputStream out;

private FlushRef(GCFlushedOutputStream fos, OutputStream out, ReferenceQueue<GCFlushedOutputStream> rq) {
super(fos, rq);
this.out = out;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import hudson.console.AnnotatedLargeText;
import hudson.console.HyperlinkNote;
import hudson.model.TaskListener;
import hudson.remoting.Channel;
import hudson.remoting.VirtualChannel;
import hudson.util.StreamTaskListener;
import java.io.EOFException;
import java.io.PrintWriter;
import java.io.StringWriter;
Expand All @@ -37,11 +39,18 @@
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import jenkins.security.MasterToSlaveCallable;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.output.NullOutputStream;
import org.apache.commons.io.output.NullWriter;
import org.apache.commons.io.output.WriterOutputStream;
import static org.hamcrest.Matchers.*;
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
import org.jenkinsci.plugins.workflow.graph.FlowNode;
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
Expand All @@ -51,6 +60,7 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.jvnet.hudson.test.JenkinsRule;
import org.jvnet.hudson.test.LoggerRule;

/**
* Foundation for compliance tests of {@link LogStorage} implementations.
Expand All @@ -63,6 +73,8 @@ public abstract class LogStorageTestBase {

@ClassRule public static JenkinsRule r = new JenkinsRule();

@ClassRule public static LoggerRule logging = new LoggerRule();

/** Create a new storage implementation, but potentially reusing any data initialized in the last {@link Before} setup. */
protected abstract LogStorage createStorage() throws Exception;

Expand Down Expand Up @@ -142,6 +154,7 @@ protected static void close(TaskListener listener) throws Exception {
}

@Test public void remoting() throws Exception {
logging.capture(100).record(Channel.class, Level.WARNING);
LogStorage ls = createStorage();
TaskListener overall = ls.overallListener();
overall.getLogger().println("overall from master");
Expand All @@ -150,12 +163,15 @@ protected static void close(TaskListener listener) throws Exception {
long overallPos = assertOverallLog(0, "overall from master\n<span class=\"pipeline-node-1\">step from master\n</span>", true);
long stepPos = assertStepLog("1", 0, "step from master\n", true);
VirtualChannel channel = r.createOnlineSlave().getChannel();
channel.call(new RemoteLogDumper("agent"));
channel.call(new RemotePrint("overall from agent", overall));
channel.call(new RemotePrint("step from agent", step));
channel.call(new GC());
overallPos = assertOverallLog(overallPos, "overall from agent\n<span class=\"pipeline-node-1\">step from agent\n</span>", true);
stepPos = assertStepLog("1", stepPos, "step from agent\n", true);
assertEquals(overallPos, assertOverallLog(overallPos, "", true));
assertEquals(stepPos, assertStepLog("1", stepPos, "", true));
assertThat(logging.getMessages(), empty());
}
private static final class RemotePrint extends MasterToSlaveCallable<Void, Exception> {
static {
Expand All @@ -173,6 +189,39 @@ private static final class RemotePrint extends MasterToSlaveCallable<Void, Excep
return null;
}
}
/** Checking behavior of {@link DelayBufferedOutputStream} garbage collection. */
private static final class GC extends MasterToSlaveCallable<Void, Exception> {
@Override public Void call() throws Exception {
System.gc();
System.runFinalization();
return null;
}
}
// TODO copied from pipeline-log-cloudwatch; consider whether this should be moved into LoggerRule
private static final class RemoteLogDumper extends MasterToSlaveCallable<Void, RuntimeException> {
private final String name;
private final TaskListener stderr = StreamTaskListener.fromStderr();
RemoteLogDumper(String name) {
this.name = name;
}
@Override public Void call() throws RuntimeException {
Handler handler = new Handler() {
final Formatter formatter = new SimpleFormatter();
@Override public void publish(LogRecord record) {
if (isLoggable(record)) {
stderr.getLogger().print(formatter.format(record).replaceAll("(?m)^", "[" + name + "] "));
}
}
@Override public void flush() {}
@Override public void close() throws SecurityException {}
};
handler.setLevel(Level.ALL);
Logger logger = Logger.getLogger(LogStorageTestBase.class.getPackage().getName());
logger.setLevel(Level.FINER);
logger.addHandler(handler);
return null;
}
}

/**
* Checks what happens when code using {@link TaskListener#getLogger} prints a line with inadequate synchronization.
Expand Down