diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java index c0ed079d..5fc2f817 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java @@ -74,7 +74,7 @@ private static final class Replacement implements SerializableOnlyOverRemoting { } private Object readResolve() throws IOException { - return new BufferedBuildListener(new DelayBufferedOutputStream(ros, tuning)); + return new BufferedBuildListener(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning))); } } diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java index 2fd4c949..332a0f1d 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java @@ -95,11 +95,8 @@ void flushAndReschedule() { reschedule(); } - @SuppressWarnings("FinalizeDeclaration") // not ideal, but PhantomReference is more of a hassle - @Override protected void finalize() throws Throwable { - super.finalize(); - // Odd that this is not the default behavior for BufferedOutputStream. - flush(); + @Override public String toString() { + return "DelayBufferedOutputStream[" + out + "]"; } private static final class Flush implements Runnable { @@ -143,6 +140,10 @@ private static final class FlushControlledOutputStream extends FilterOutputStrea } } + @Override public String toString() { + return "FlushControlledOutputStream[" + out + "]"; + } + } } diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java b/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java index 5432a772..f2789a71 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java @@ -85,7 +85,7 @@ private FileLogStorage(File log) { private synchronized void open() throws IOException { if (os == null) { os = new FileOutputStream(log, true); - bos = new DelayBufferedOutputStream(os); + bos = new GCFlushedOutputStream(new DelayBufferedOutputStream(os)); if (index.isFile()) { try (BufferedReader r = Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8)) { // TODO would be faster to scan the file backwards for the penultimate \n, then convert the byte sequence from there to EOF to UTF-8 and set lastId accordingly diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java new file mode 100644 index 00000000..c794c60b --- /dev/null +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java @@ -0,0 +1,98 @@ +/* + * The MIT License + * + * Copyright 2018 CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package org.jenkinsci.plugins.workflow.log; + +import java.io.BufferedOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.ref.PhantomReference; +import java.lang.ref.ReferenceQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import jenkins.util.Timer; + +/** + * A stream which will be flushed before garbage collection. + * {@link BufferedOutputStream} does not do this automatically. + */ +final class GCFlushedOutputStream extends FilterOutputStream { + + private static final Logger LOGGER = Logger.getLogger(GCFlushedOutputStream.class.getName()); + + GCFlushedOutputStream(OutputStream out) { + super(out); + FlushRef.register(this, out); + } + + @Override public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); // super method is surprising + } + + @Override public String toString() { + return "GCFlushedOutputStream[" + out + "]"; + } + + /** + * Flushes streams prior to garbage collection. + * ({@link BufferedOutputStream} does not do this automatically.) + * TODO Java 9+ could use java.util.Cleaner + */ + private static final class FlushRef extends PhantomReference { + + private static final ReferenceQueue rq = new ReferenceQueue<>(); + + static { + Timer.get().scheduleWithFixedDelay(() -> { + while (true) { + FlushRef ref = (FlushRef) rq.poll(); + if (ref == null) { + break; + } + LOGGER.log(Level.FINE, "flushing {0}", ref.out); + try { + ref.out.flush(); + } catch (IOException x) { + LOGGER.log(Level.WARNING, null, x); + } + } + }, 0, 10, TimeUnit.SECONDS); + } + + static void register(GCFlushedOutputStream fos, OutputStream out) { + new FlushRef(fos, out, rq).enqueue(); + } + + private final OutputStream out; + + private FlushRef(GCFlushedOutputStream fos, OutputStream out, ReferenceQueue rq) { + super(fos, rq); + this.out = out; + } + + } + +} diff --git a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java index bcbe068d..7f1d763f 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java @@ -27,7 +27,9 @@ import hudson.console.AnnotatedLargeText; import hudson.console.HyperlinkNote; import hudson.model.TaskListener; +import hudson.remoting.Channel; import hudson.remoting.VirtualChannel; +import hudson.util.StreamTaskListener; import java.io.EOFException; import java.io.PrintWriter; import java.io.StringWriter; @@ -37,11 +39,18 @@ import java.util.Random; import java.util.concurrent.Callable; import java.util.function.BiFunction; +import java.util.logging.Formatter; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; +import java.util.logging.SimpleFormatter; import jenkins.security.MasterToSlaveCallable; import org.apache.commons.io.FileUtils; import org.apache.commons.io.output.NullOutputStream; import org.apache.commons.io.output.NullWriter; import org.apache.commons.io.output.WriterOutputStream; +import static org.hamcrest.Matchers.*; import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; import org.jenkinsci.plugins.workflow.graph.FlowNode; import org.jenkinsci.plugins.workflow.job.WorkflowJob; @@ -51,6 +60,7 @@ import org.junit.ClassRule; import org.junit.Test; import org.jvnet.hudson.test.JenkinsRule; +import org.jvnet.hudson.test.LoggerRule; /** * Foundation for compliance tests of {@link LogStorage} implementations. @@ -63,6 +73,8 @@ public abstract class LogStorageTestBase { @ClassRule public static JenkinsRule r = new JenkinsRule(); + @ClassRule public static LoggerRule logging = new LoggerRule(); + /** Create a new storage implementation, but potentially reusing any data initialized in the last {@link Before} setup. */ protected abstract LogStorage createStorage() throws Exception; @@ -142,6 +154,7 @@ protected static void close(TaskListener listener) throws Exception { } @Test public void remoting() throws Exception { + logging.capture(100).record(Channel.class, Level.WARNING); LogStorage ls = createStorage(); TaskListener overall = ls.overallListener(); overall.getLogger().println("overall from master"); @@ -150,12 +163,15 @@ protected static void close(TaskListener listener) throws Exception { long overallPos = assertOverallLog(0, "overall from master\nstep from master\n", true); long stepPos = assertStepLog("1", 0, "step from master\n", true); VirtualChannel channel = r.createOnlineSlave().getChannel(); + channel.call(new RemoteLogDumper("agent")); channel.call(new RemotePrint("overall from agent", overall)); channel.call(new RemotePrint("step from agent", step)); + channel.call(new GC()); overallPos = assertOverallLog(overallPos, "overall from agent\nstep from agent\n", true); stepPos = assertStepLog("1", stepPos, "step from agent\n", true); assertEquals(overallPos, assertOverallLog(overallPos, "", true)); assertEquals(stepPos, assertStepLog("1", stepPos, "", true)); + assertThat(logging.getMessages(), empty()); } private static final class RemotePrint extends MasterToSlaveCallable { static { @@ -173,6 +189,39 @@ private static final class RemotePrint extends MasterToSlaveCallable { + @Override public Void call() throws Exception { + System.gc(); + System.runFinalization(); + return null; + } + } + // TODO copied from pipeline-log-cloudwatch; consider whether this should be moved into LoggerRule + private static final class RemoteLogDumper extends MasterToSlaveCallable { + private final String name; + private final TaskListener stderr = StreamTaskListener.fromStderr(); + RemoteLogDumper(String name) { + this.name = name; + } + @Override public Void call() throws RuntimeException { + Handler handler = new Handler() { + final Formatter formatter = new SimpleFormatter(); + @Override public void publish(LogRecord record) { + if (isLoggable(record)) { + stderr.getLogger().print(formatter.format(record).replaceAll("(?m)^", "[" + name + "] ")); + } + } + @Override public void flush() {} + @Override public void close() throws SecurityException {} + }; + handler.setLevel(Level.ALL); + Logger logger = Logger.getLogger(LogStorageTestBase.class.getPackage().getName()); + logger.setLevel(Level.FINER); + logger.addHandler(handler); + return null; + } + } /** * Checks what happens when code using {@link TaskListener#getLogger} prints a line with inadequate synchronization.