From 8ee0788a6abe96f6f84d0e47a125f36f2541d0a0 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Thu, 15 Nov 2018 18:55:18 -0500 Subject: [PATCH 1/7] [JENKINS-54566] Reproducing warning sometimes from FileLogStorageTest. --- .../plugins/workflow/log/LogStorageTestBase.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java index bcbe068d..8a7e2751 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java @@ -152,6 +152,7 @@ protected static void close(TaskListener listener) throws Exception { VirtualChannel channel = r.createOnlineSlave().getChannel(); channel.call(new RemotePrint("overall from agent", overall)); channel.call(new RemotePrint("step from agent", step)); + channel.call(new GC()); overallPos = assertOverallLog(overallPos, "overall from agent\nstep from agent\n", true); stepPos = assertStepLog("1", stepPos, "step from agent\n", true); assertEquals(overallPos, assertOverallLog(overallPos, "", true)); @@ -173,6 +174,13 @@ private static final class RemotePrint extends MasterToSlaveCallable { + @Override public Void call() throws Exception { + System.gc(); + System.runFinalization(); + return null; + } + } /** * Checks what happens when code using {@link TaskListener#getLogger} prints a line with inadequate synchronization. From 5e845fe6b60507578775714c75a7148db9816302 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Thu, 15 Nov 2018 19:08:26 -0500 Subject: [PATCH 2/7] Figured out how to make test reliably fail unless running with remoting patch. --- .../plugins/workflow/log/DelayBufferedOutputStream.java | 1 + .../plugins/workflow/log/LogStorageTestBase.java | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java index 2fd4c949..0ca3d4dd 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java @@ -98,6 +98,7 @@ void flushAndReschedule() { @SuppressWarnings("FinalizeDeclaration") // not ideal, but PhantomReference is more of a hassle @Override protected void finalize() throws Throwable { super.finalize(); + Thread.sleep(100); // TODO for FileLogStorageTest#remoting // Odd that this is not the default behavior for BufferedOutputStream. flush(); } diff --git a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java index 8a7e2751..fdd4934e 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java @@ -27,6 +27,7 @@ import hudson.console.AnnotatedLargeText; import hudson.console.HyperlinkNote; import hudson.model.TaskListener; +import hudson.remoting.Channel; import hudson.remoting.VirtualChannel; import java.io.EOFException; import java.io.PrintWriter; @@ -37,11 +38,13 @@ import java.util.Random; import java.util.concurrent.Callable; import java.util.function.BiFunction; +import java.util.logging.Level; import jenkins.security.MasterToSlaveCallable; import org.apache.commons.io.FileUtils; import org.apache.commons.io.output.NullOutputStream; import org.apache.commons.io.output.NullWriter; import org.apache.commons.io.output.WriterOutputStream; +import static org.hamcrest.Matchers.*; import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition; import org.jenkinsci.plugins.workflow.graph.FlowNode; import org.jenkinsci.plugins.workflow.job.WorkflowJob; @@ -51,6 +54,7 @@ import org.junit.ClassRule; import org.junit.Test; import org.jvnet.hudson.test.JenkinsRule; +import org.jvnet.hudson.test.LoggerRule; /** * Foundation for compliance tests of {@link LogStorage} implementations. @@ -63,6 +67,8 @@ public abstract class LogStorageTestBase { @ClassRule public static JenkinsRule r = new JenkinsRule(); + @ClassRule public static LoggerRule logging = new LoggerRule(); + /** Create a new storage implementation, but potentially reusing any data initialized in the last {@link Before} setup. */ protected abstract LogStorage createStorage() throws Exception; @@ -142,6 +148,7 @@ protected static void close(TaskListener listener) throws Exception { } @Test public void remoting() throws Exception { + logging.capture(100).record(Channel.class, Level.WARNING); LogStorage ls = createStorage(); TaskListener overall = ls.overallListener(); overall.getLogger().println("overall from master"); @@ -157,6 +164,7 @@ protected static void close(TaskListener listener) throws Exception { stepPos = assertStepLog("1", stepPos, "step from agent\n", true); assertEquals(overallPos, assertOverallLog(overallPos, "", true)); assertEquals(stepPos, assertStepLog("1", stepPos, "", true)); + assertThat(logging.getMessages(), empty()); } private static final class RemotePrint extends MasterToSlaveCallable { static { From 8ab09d12b73465a80b3fc3fa91b56875d0e3f3f2 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Thu, 15 Nov 2018 21:30:03 -0500 Subject: [PATCH 3/7] Capture remote log messages. --- .../workflow/log/LogStorageTestBase.java | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java index fdd4934e..081d610b 100644 --- a/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java +++ b/src/test/java/org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java @@ -29,6 +29,7 @@ import hudson.model.TaskListener; import hudson.remoting.Channel; import hudson.remoting.VirtualChannel; +import hudson.util.StreamTaskListener; import java.io.EOFException; import java.io.PrintWriter; import java.io.StringWriter; @@ -38,7 +39,12 @@ import java.util.Random; import java.util.concurrent.Callable; import java.util.function.BiFunction; +import java.util.logging.Formatter; +import java.util.logging.Handler; import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; +import java.util.logging.SimpleFormatter; import jenkins.security.MasterToSlaveCallable; import org.apache.commons.io.FileUtils; import org.apache.commons.io.output.NullOutputStream; @@ -157,6 +163,7 @@ protected static void close(TaskListener listener) throws Exception { long overallPos = assertOverallLog(0, "overall from master\nstep from master\n", true); long stepPos = assertStepLog("1", 0, "step from master\n", true); VirtualChannel channel = r.createOnlineSlave().getChannel(); + channel.call(new RemoteLogDumper("agent")); channel.call(new RemotePrint("overall from agent", overall)); channel.call(new RemotePrint("step from agent", step)); channel.call(new GC()); @@ -189,6 +196,31 @@ private static final class GC extends MasterToSlaveCallable { return null; } } + // TODO copied from pipeline-log-cloudwatch; consider whether this should be moved into LoggerRule + private static final class RemoteLogDumper extends MasterToSlaveCallable { + private final String name; + private final TaskListener stderr = StreamTaskListener.fromStderr(); + RemoteLogDumper(String name) { + this.name = name; + } + @Override public Void call() throws RuntimeException { + Handler handler = new Handler() { + final Formatter formatter = new SimpleFormatter(); + @Override public void publish(LogRecord record) { + if (isLoggable(record)) { + stderr.getLogger().print(formatter.format(record).replaceAll("(?m)^", "[" + name + "] ")); + } + } + @Override public void flush() {} + @Override public void close() throws SecurityException {} + }; + handler.setLevel(Level.ALL); + Logger logger = Logger.getLogger(LogStorageTestBase.class.getPackage().getName()); + logger.setLevel(Level.FINER); + logger.addHandler(handler); + return null; + } + } /** * Checks what happens when code using {@link TaskListener#getLogger} prints a line with inadequate synchronization. From d33b987e4197f5c7c491052d219e30b673bd4804 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Thu, 15 Nov 2018 21:30:24 -0500 Subject: [PATCH 4/7] Use a phantom reference queue rather than overriding finalize. --- .../log/DelayBufferedOutputStream.java | 51 ++++++++++++++++--- 1 file changed, 43 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java index 0ca3d4dd..87cacb0a 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java @@ -28,7 +28,9 @@ import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.lang.ref.PhantomReference; import java.lang.ref.Reference; +import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -65,6 +67,7 @@ private Tuning() {} super(new FlushControlledOutputStream(out), tuning.bufferSize); this.tuning = tuning; recurrencePeriod = tuning.minRecurrencePeriod; + FlushRef.register(this, out); reschedule(); } @@ -95,14 +98,6 @@ void flushAndReschedule() { reschedule(); } - @SuppressWarnings("FinalizeDeclaration") // not ideal, but PhantomReference is more of a hassle - @Override protected void finalize() throws Throwable { - super.finalize(); - Thread.sleep(100); // TODO for FileLogStorageTest#remoting - // Odd that this is not the default behavior for BufferedOutputStream. - flush(); - } - private static final class Flush implements Runnable { /** Since there is no explicit close event, just keep flushing periodically until the stream is collected. */ @@ -121,6 +116,46 @@ private static final class Flush implements Runnable { } + // + /** + * Flushes streams prior to garbage collection. + * TODO Java 9+ could use java.util.Cleaner + */ + private static final class FlushRef extends PhantomReference { + + private static final ReferenceQueue rq = new ReferenceQueue<>(); + + static { + Timer.get().scheduleWithFixedDelay(() -> { + while (true) { + FlushRef ref = (FlushRef) rq.poll(); + if (ref == null) { + break; + } + LOGGER.log(Level.FINE, "cleaning up phantom {0}", ref.out); + try { + // Odd that this is not the default behavior for BufferedOutputStream. + ref.out.flush(); + } catch (IOException x) { + LOGGER.log(Level.WARNING, null, x); + } + } + }, 0, 10, TimeUnit.SECONDS); + } + + static void register(DelayBufferedOutputStream dbos, OutputStream out) { + new FlushRef(dbos, out, rq).enqueue(); + } + + private final OutputStream out; + + private FlushRef(DelayBufferedOutputStream dbos, OutputStream out, ReferenceQueue rq) { + super(dbos, rq); + this.out = out; + } + + } + /** @see DelayBufferedOutputStream#flushBuffer */ private static final class FlushControlledOutputStream extends FilterOutputStream { From 18f1b5341bebd1af409ae07986ecde18e810696a Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Thu, 15 Nov 2018 21:40:06 -0500 Subject: [PATCH 5/7] Comments. --- .../plugins/workflow/log/DelayBufferedOutputStream.java | 3 +-- .../org/jenkinsci/plugins/workflow/log/LogStorageTestBase.java | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java index 87cacb0a..311c01d5 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java @@ -116,9 +116,9 @@ private static final class Flush implements Runnable { } - // /** * Flushes streams prior to garbage collection. + * ({@link BufferedOutputStream} does not do this automatically.) * TODO Java 9+ could use java.util.Cleaner */ private static final class FlushRef extends PhantomReference { @@ -134,7 +134,6 @@ private static final class FlushRef extends PhantomReference { @Override public Void call() throws Exception { System.gc(); From 4bef56fe1ea425a8c477a9364b0d64ad97f52341 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Fri, 16 Nov 2018 13:48:18 -0500 Subject: [PATCH 6/7] Rewrote DelayBufferedOutputStream to not be based on BufferedOutputStream at all. --- .../log/DelayBufferedOutputStream.java | 130 ++++++++++-------- 1 file changed, 75 insertions(+), 55 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java index 311c01d5..73735f98 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java @@ -41,8 +41,10 @@ /** * Buffered output stream which is guaranteed to deliver content after some time even if idle and the buffer does not fill up. * The automatic “flushing” does not flush the underlying stream, for example via {@code ProxyOutputStream.Flush}. + * Also the stream will be flushed before garbage collection. + * Otherwise it is similar to {@link BufferedOutputStream}. */ -final class DelayBufferedOutputStream extends BufferedOutputStream { +final class DelayBufferedOutputStream extends FilterOutputStream { private static final Logger LOGGER = Logger.getLogger(DelayBufferedOutputStream.class.getName()); @@ -56,6 +58,45 @@ private Tuning() {} static final Tuning DEFAULT = new Tuning(); } + /** + * The interesting state of the buffered stream, kept as a separate object so that {@link FlushRef} can hold on to it. + */ + private static final class Buffer { + + final OutputStream out; + private final byte[] dat; + private int pos; + + Buffer(OutputStream out, int size) { + this.out = out; + dat = new byte[size]; + } + + synchronized void drain() throws IOException { + if (pos == 0) { + return; + } + out.write(dat, 0, pos); + pos = 0; + } + + void write(int b) throws IOException { + assert Thread.holdsLock(this); + if (pos == dat.length) { + drain(); + } + dat[pos++] = (byte) b; + } + + synchronized void write(byte[] b, int off, int len) throws IOException { + for (int i = 0; i < len; i++) { + write(b[off + i]); + } + } + + } + + private final Buffer buf; private final Tuning tuning; private long recurrencePeriod; @@ -64,53 +105,57 @@ private Tuning() {} } DelayBufferedOutputStream(OutputStream out, Tuning tuning) { - super(new FlushControlledOutputStream(out), tuning.bufferSize); + super(out); + buf = new Buffer(out, tuning.bufferSize); this.tuning = tuning; recurrencePeriod = tuning.minRecurrencePeriod; - FlushRef.register(this, out); + FlushRef.register(this); reschedule(); } - private void reschedule() { - Timer.get().schedule(new Flush(this), recurrencePeriod, TimeUnit.MILLISECONDS); - recurrencePeriod = Math.min((long) (recurrencePeriod * tuning.recurrencePeriodBackoff), tuning.maxRecurrencePeriod); + @Override public void write(int b) throws IOException { + synchronized (buf) { + buf.write(b); + } } - /** We can only call {@link BufferedOutputStream#flushBuffer} via {@link #flush}, but we do not wish to flush the underlying stream, only write out the buffer. */ - private void flushBuffer() throws IOException { - ThreadLocal enableFlush = ((FlushControlledOutputStream) out).enableFlush; - boolean orig = enableFlush.get(); - enableFlush.set(false); - try { - flush(); - } finally { - enableFlush.set(orig); - } + @Override public void write(byte[] b, int off, int len) throws IOException { + buf.write(b, off, len); + } + + @Override public void flush() throws IOException { + buf.drain(); + super.flush(); } - void flushAndReschedule() { + private void reschedule() { + Timer.get().schedule(new Drain(this), recurrencePeriod, TimeUnit.MILLISECONDS); + recurrencePeriod = Math.min((long) (recurrencePeriod * tuning.recurrencePeriodBackoff), tuning.maxRecurrencePeriod); + } + + void drainAndReschedule() { // TODO as an optimization, avoid flushing the buffer if it was recently flushed anyway due to filling up try { - flushBuffer(); + buf.drain(); } catch (IOException x) { LOGGER.log(Level.FINE, null, x); } reschedule(); } - private static final class Flush implements Runnable { + private static final class Drain implements Runnable { /** Since there is no explicit close event, just keep flushing periodically until the stream is collected. */ private final Reference osr; - Flush(DelayBufferedOutputStream os) { + Drain(DelayBufferedOutputStream os) { osr = new WeakReference<>(os); } @Override public void run() { DelayBufferedOutputStream os = osr.get(); if (os != null) { - os.flushAndReschedule(); + os.drainAndReschedule(); } } @@ -118,8 +163,7 @@ private static final class Flush implements Runnable { /** * Flushes streams prior to garbage collection. - * ({@link BufferedOutputStream} does not do this automatically.) - * TODO Java 9+ could use java.util.Cleaner + * In Java 9+ could use {@code java.util.Cleaner} instead. */ private static final class FlushRef extends PhantomReference { @@ -132,9 +176,10 @@ private static final class FlushRef extends PhantomReference rq) { + private FlushRef(DelayBufferedOutputStream dbos, ReferenceQueue rq) { super(dbos, rq); - this.out = out; - } - - } - - /** @see DelayBufferedOutputStream#flushBuffer */ - private static final class FlushControlledOutputStream extends FilterOutputStream { - - private final ThreadLocal enableFlush = new ThreadLocal() { - @Override protected Boolean initialValue() { - return true; - } - }; - - FlushControlledOutputStream(OutputStream out) { - super(out); - } - - @Override public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); // super method writes one byte at a time! - } - - @Override public void flush() throws IOException { - if (enableFlush.get()) { - super.flush(); - } + this.buf = dbos.buf; } } From b944d25027d8fd7c488b0575554efbc9fb703c78 Mon Sep 17 00:00:00 2001 From: Jesse Glick Date: Mon, 19 Nov 2018 14:24:12 -0500 Subject: [PATCH 7/7] Factored out GCFlushedOutputStream, and reverting 4bef56fe1ea425a8c477a9364b0d64ad97f52341 so that DelayBufferedOutputStream is again a BufferedOutputStream. --- .../workflow/log/BufferedBuildListener.java | 2 +- .../log/DelayBufferedOutputStream.java | 142 ++++++------------ .../plugins/workflow/log/FileLogStorage.java | 2 +- .../workflow/log/GCFlushedOutputStream.java | 98 ++++++++++++ 4 files changed, 144 insertions(+), 100 deletions(-) create mode 100644 src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java index c0ed079d..5fc2f817 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/BufferedBuildListener.java @@ -74,7 +74,7 @@ private static final class Replacement implements SerializableOnlyOverRemoting { } private Object readResolve() throws IOException { - return new BufferedBuildListener(new DelayBufferedOutputStream(ros, tuning)); + return new BufferedBuildListener(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning))); } } diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java index 73735f98..332a0f1d 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/DelayBufferedOutputStream.java @@ -28,9 +28,7 @@ import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.lang.ref.PhantomReference; import java.lang.ref.Reference; -import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -41,10 +39,8 @@ /** * Buffered output stream which is guaranteed to deliver content after some time even if idle and the buffer does not fill up. * The automatic “flushing” does not flush the underlying stream, for example via {@code ProxyOutputStream.Flush}. - * Also the stream will be flushed before garbage collection. - * Otherwise it is similar to {@link BufferedOutputStream}. */ -final class DelayBufferedOutputStream extends FilterOutputStream { +final class DelayBufferedOutputStream extends BufferedOutputStream { private static final Logger LOGGER = Logger.getLogger(DelayBufferedOutputStream.class.getName()); @@ -58,45 +54,6 @@ private Tuning() {} static final Tuning DEFAULT = new Tuning(); } - /** - * The interesting state of the buffered stream, kept as a separate object so that {@link FlushRef} can hold on to it. - */ - private static final class Buffer { - - final OutputStream out; - private final byte[] dat; - private int pos; - - Buffer(OutputStream out, int size) { - this.out = out; - dat = new byte[size]; - } - - synchronized void drain() throws IOException { - if (pos == 0) { - return; - } - out.write(dat, 0, pos); - pos = 0; - } - - void write(int b) throws IOException { - assert Thread.holdsLock(this); - if (pos == dat.length) { - drain(); - } - dat[pos++] = (byte) b; - } - - synchronized void write(byte[] b, int off, int len) throws IOException { - for (int i = 0; i < len; i++) { - write(b[off + i]); - } - } - - } - - private final Buffer buf; private final Tuning tuning; private long recurrencePeriod; @@ -105,97 +62,86 @@ synchronized void write(byte[] b, int off, int len) throws IOException { } DelayBufferedOutputStream(OutputStream out, Tuning tuning) { - super(out); - buf = new Buffer(out, tuning.bufferSize); + super(new FlushControlledOutputStream(out), tuning.bufferSize); this.tuning = tuning; recurrencePeriod = tuning.minRecurrencePeriod; - FlushRef.register(this); reschedule(); } - @Override public void write(int b) throws IOException { - synchronized (buf) { - buf.write(b); - } - } - - @Override public void write(byte[] b, int off, int len) throws IOException { - buf.write(b, off, len); - } - - @Override public void flush() throws IOException { - buf.drain(); - super.flush(); - } - private void reschedule() { - Timer.get().schedule(new Drain(this), recurrencePeriod, TimeUnit.MILLISECONDS); + Timer.get().schedule(new Flush(this), recurrencePeriod, TimeUnit.MILLISECONDS); recurrencePeriod = Math.min((long) (recurrencePeriod * tuning.recurrencePeriodBackoff), tuning.maxRecurrencePeriod); } - void drainAndReschedule() { + /** We can only call {@link BufferedOutputStream#flushBuffer} via {@link #flush}, but we do not wish to flush the underlying stream, only write out the buffer. */ + private void flushBuffer() throws IOException { + ThreadLocal enableFlush = ((FlushControlledOutputStream) out).enableFlush; + boolean orig = enableFlush.get(); + enableFlush.set(false); + try { + flush(); + } finally { + enableFlush.set(orig); + } + } + + void flushAndReschedule() { // TODO as an optimization, avoid flushing the buffer if it was recently flushed anyway due to filling up try { - buf.drain(); + flushBuffer(); } catch (IOException x) { LOGGER.log(Level.FINE, null, x); } reschedule(); } - private static final class Drain implements Runnable { + @Override public String toString() { + return "DelayBufferedOutputStream[" + out + "]"; + } + + private static final class Flush implements Runnable { /** Since there is no explicit close event, just keep flushing periodically until the stream is collected. */ private final Reference osr; - Drain(DelayBufferedOutputStream os) { + Flush(DelayBufferedOutputStream os) { osr = new WeakReference<>(os); } @Override public void run() { DelayBufferedOutputStream os = osr.get(); if (os != null) { - os.drainAndReschedule(); + os.flushAndReschedule(); } } } - /** - * Flushes streams prior to garbage collection. - * In Java 9+ could use {@code java.util.Cleaner} instead. - */ - private static final class FlushRef extends PhantomReference { - - private static final ReferenceQueue rq = new ReferenceQueue<>(); - - static { - Timer.get().scheduleWithFixedDelay(() -> { - while (true) { - FlushRef ref = (FlushRef) rq.poll(); - if (ref == null) { - break; - } - LOGGER.log(Level.FINE, "flushing {0} from a DelayBufferedOutputStream", ref.buf.out); - try { - ref.buf.drain(); - ref.buf.out.flush(); - } catch (IOException x) { - LOGGER.log(Level.WARNING, null, x); - } - } - }, 0, 10, TimeUnit.SECONDS); + /** @see DelayBufferedOutputStream#flushBuffer */ + private static final class FlushControlledOutputStream extends FilterOutputStream { + + private final ThreadLocal enableFlush = new ThreadLocal() { + @Override protected Boolean initialValue() { + return true; + } + }; + + FlushControlledOutputStream(OutputStream out) { + super(out); } - static void register(DelayBufferedOutputStream dbos) { - new FlushRef(dbos, rq).enqueue(); + @Override public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); // super method writes one byte at a time! } - private final Buffer buf; + @Override public void flush() throws IOException { + if (enableFlush.get()) { + super.flush(); + } + } - private FlushRef(DelayBufferedOutputStream dbos, ReferenceQueue rq) { - super(dbos, rq); - this.buf = dbos.buf; + @Override public String toString() { + return "FlushControlledOutputStream[" + out + "]"; } } diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java b/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java index 5432a772..f2789a71 100644 --- a/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/FileLogStorage.java @@ -85,7 +85,7 @@ private FileLogStorage(File log) { private synchronized void open() throws IOException { if (os == null) { os = new FileOutputStream(log, true); - bos = new DelayBufferedOutputStream(os); + bos = new GCFlushedOutputStream(new DelayBufferedOutputStream(os)); if (index.isFile()) { try (BufferedReader r = Files.newBufferedReader(index.toPath(), StandardCharsets.UTF_8)) { // TODO would be faster to scan the file backwards for the penultimate \n, then convert the byte sequence from there to EOF to UTF-8 and set lastId accordingly diff --git a/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java b/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java new file mode 100644 index 00000000..c794c60b --- /dev/null +++ b/src/main/java/org/jenkinsci/plugins/workflow/log/GCFlushedOutputStream.java @@ -0,0 +1,98 @@ +/* + * The MIT License + * + * Copyright 2018 CloudBees, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package org.jenkinsci.plugins.workflow.log; + +import java.io.BufferedOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.lang.ref.PhantomReference; +import java.lang.ref.ReferenceQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; +import jenkins.util.Timer; + +/** + * A stream which will be flushed before garbage collection. + * {@link BufferedOutputStream} does not do this automatically. + */ +final class GCFlushedOutputStream extends FilterOutputStream { + + private static final Logger LOGGER = Logger.getLogger(GCFlushedOutputStream.class.getName()); + + GCFlushedOutputStream(OutputStream out) { + super(out); + FlushRef.register(this, out); + } + + @Override public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); // super method is surprising + } + + @Override public String toString() { + return "GCFlushedOutputStream[" + out + "]"; + } + + /** + * Flushes streams prior to garbage collection. + * ({@link BufferedOutputStream} does not do this automatically.) + * TODO Java 9+ could use java.util.Cleaner + */ + private static final class FlushRef extends PhantomReference { + + private static final ReferenceQueue rq = new ReferenceQueue<>(); + + static { + Timer.get().scheduleWithFixedDelay(() -> { + while (true) { + FlushRef ref = (FlushRef) rq.poll(); + if (ref == null) { + break; + } + LOGGER.log(Level.FINE, "flushing {0}", ref.out); + try { + ref.out.flush(); + } catch (IOException x) { + LOGGER.log(Level.WARNING, null, x); + } + } + }, 0, 10, TimeUnit.SECONDS); + } + + static void register(GCFlushedOutputStream fos, OutputStream out) { + new FlushRef(fos, out, rq).enqueue(); + } + + private final OutputStream out; + + private FlushRef(GCFlushedOutputStream fos, OutputStream out, ReferenceQueue rq) { + super(fos, rq); + this.out = out; + } + + } + +}