From 9c6e44adb673f1a6a9bb242b8aa3f9abe3c59785 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Mon, 13 Oct 2025 02:58:06 -0400 Subject: [PATCH 1/6] Add outputstream bodypublisher Adds a body method that allows for writing to an outputstream. This is for cases such as certain libraries that can only write to an outputstream --- .../avaje/http/client/DHttpClientRequest.java | 6 + .../avaje/http/client/HttpClientRequest.java | 28 ++++ .../client/OutputStreamBodyPublisher.java | 121 +++++++++++++++++ .../avaje/http/client/OutputStreamWriter.java | 8 ++ .../avaje/http/client/OutPutStreamTest.java | 127 ++++++++++++++++++ .../generator/client/ClientMethodWriter.java | 3 +- 6 files changed, 292 insertions(+), 1 deletion(-) create mode 100644 http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java create mode 100644 http-client/src/main/java/io/avaje/http/client/OutputStreamWriter.java create mode 100644 http-client/src/test/java/io/avaje/http/client/OutPutStreamTest.java diff --git a/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java b/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java index 398ed19dd..620380609 100644 --- a/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java +++ b/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java @@ -363,6 +363,12 @@ public HttpClientRequest body(HttpRequest.BodyPublisher body) { return this; } + @Override + public HttpClientRequest body(OutputStreamWriter writer) { + this.body = new OutputStreamBodyPublisher(writer, context.httpClient().executor().orElse(null)); + return this; + } + @Override public HttpClientRequest errorMapper(Function errorMapper) { this.errorMapper = errorMapper; diff --git a/http-client/src/main/java/io/avaje/http/client/HttpClientRequest.java b/http-client/src/main/java/io/avaje/http/client/HttpClientRequest.java index 7e6ed9768..4becc17d8 100644 --- a/http-client/src/main/java/io/avaje/http/client/HttpClientRequest.java +++ b/http-client/src/main/java/io/avaje/http/client/HttpClientRequest.java @@ -403,6 +403,34 @@ default HttpClientRequest queryParam(String name, Collection values) { */ HttpClientRequest body(Path file); + + /** + * Set the body content using a callback that writes to an {@link java.io.OutputStream}. + *

+ * This allows streaming large or dynamically generated content directly to the HTTP request body, + * without buffering the entire payload in memory. The provided {@code OutputStreamWriter} is called + * with an {@link java.io.OutputStream} that writes to the request body. Data written to the stream + * is sent as the request body. + *

+ * Example usage: + *

{@code
+  *   client.request()
+  *     .url("http://example.com/upload")
+  *     .body(outputStream -> {
+  *       // Write data in chunks
+  *       for (byte[] chunk : getChunks()) {
+  *         outputStream.write(chunk);
+  *       }
+  *     })
+  *     .POST()
+  *     .asPlainString();
+  * }
+ * + * @param writer Callback to write data to the request body output stream + * @return The request being built + */ + HttpClientRequest body(OutputStreamWriter writer); + /** * Set the body content using http BodyPublisher. * diff --git a/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java b/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java new file mode 100644 index 000000000..a602f31ec --- /dev/null +++ b/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java @@ -0,0 +1,121 @@ +package io.avaje.http.client; + +import java.io.IOException; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.io.UncheckedIOException; +import java.net.http.HttpRequest; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A BodyPublisher that allows writing to an OutputStream. Data written to the OutputStream is + * published to the HTTP request body. + */ +final class OutputStreamBodyPublisher implements HttpRequest.BodyPublisher { + + private final PipedOutputStream outputStream; + private final PipedInputStream inputStream; + private final int bufferSize; + private final AtomicBoolean streamClosed = new AtomicBoolean(false); + private final Executor executor; + private final OutputStreamWriter writer; + + OutputStreamBodyPublisher(OutputStreamWriter writer, Executor executor) { + this.bufferSize = 8192; + this.writer = writer; + this.outputStream = new PipedOutputStream(); + this.inputStream = new PipedInputStream(bufferSize); + this.executor = executor; + } + + @Override + public long contentLength() { + return -1; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + try { + outputStream.connect(inputStream); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + subscriber.onSubscribe(new OutputStreamSubscription(subscriber)); + } + + private class OutputStreamSubscription implements Flow.Subscription { + private final Flow.Subscriber subscriber; + private final AtomicBoolean cancelled = new AtomicBoolean(false); + private volatile boolean completed = false; + private CompletableFuture writeTask; + + OutputStreamSubscription(Flow.Subscriber subscriber) { + this.subscriber = subscriber; + + // Start a background thread to write to the output stream + writeTask = + CompletableFuture.runAsync( + () -> { + try { + writer.write(outputStream); + } catch (Exception e) { + subscriber.onError(e); + } finally { + try { + outputStream.close(); + } catch (IOException e) { + subscriber.onError(e); + } + } + }, + executor); + } + + @Override + public void request(long n) { + if (cancelled.get() || completed) { + return; + } + try { + byte[] buffer = new byte[bufferSize]; + for (long i = 0; i < n && !cancelled.get(); i++) { + int bytesRead = inputStream.read(buffer); + if (bytesRead == -1) { + // End of stream + completed = true; + subscriber.onComplete(); + closeStreams(); + return; + } + ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead); + subscriber.onNext(byteBuffer); + } + } catch (IOException e) { + completed = true; + subscriber.onError(e); + closeStreams(); + } + } + + @Override + public void cancel() { + cancelled.set(true); + writeTask.cancel(true); + closeStreams(); + } + + private void closeStreams() { + if (streamClosed.compareAndSet(false, true)) { + try (outputStream; + inputStream; ) { + } catch (IOException e) { + // Ignore + } + } + } + } +} diff --git a/http-client/src/main/java/io/avaje/http/client/OutputStreamWriter.java b/http-client/src/main/java/io/avaje/http/client/OutputStreamWriter.java new file mode 100644 index 000000000..da31f4422 --- /dev/null +++ b/http-client/src/main/java/io/avaje/http/client/OutputStreamWriter.java @@ -0,0 +1,8 @@ +package io.avaje.http.client; + +import java.io.IOException; +import java.io.OutputStream; + +public interface OutputStreamWriter { + void write(OutputStream outputStream) throws IOException; +} diff --git a/http-client/src/test/java/io/avaje/http/client/OutPutStreamTest.java b/http-client/src/test/java/io/avaje/http/client/OutPutStreamTest.java new file mode 100644 index 000000000..d8e0b3d8e --- /dev/null +++ b/http-client/src/test/java/io/avaje/http/client/OutPutStreamTest.java @@ -0,0 +1,127 @@ +package io.avaje.http.client; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import com.sun.net.httpserver.HttpServer; + +public class OutPutStreamTest { + + private static HttpServer server; + private static int port; + private static final AtomicReference receivedBody = new AtomicReference<>(); + + @BeforeAll + static void startServer() throws IOException { + server = HttpServer.create(new InetSocketAddress(0), 0); + port = server.getAddress().getPort(); + server.createContext( + "/test", + exchange -> { + try (var is = exchange.getRequestBody(); + ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + + is.transferTo(baos); + + receivedBody.set(baos.toString()); + } + exchange.sendResponseHeaders(204, -1); + }); + server.setExecutor(Executors.newSingleThreadExecutor()); + server.start(); + } + + @AfterAll + static void stopServer() { + if (server != null) { + server.stop(0); + } + } + + @Test + void testOutputStreamBodyPublisher() throws Exception { + String data = "Hello OutputStreamBodyPublisher!"; + + HttpClient client = HttpClient.builder().requestTimeout(Duration.ofDays(1)).build(); + + HttpResponse response = + client + .request() + .url("http://localhost:" + port + "/test") + .header("Content-Type", "text/plain") + .body( + outputStream -> { + outputStream.write(data.getBytes()); + }) + .POST() + .asPlainString(); + + assertEquals(204, response.statusCode()); + assertEquals(data, receivedBody.get()); + } + + @Test + void testOutputStreamBodyPublisherLargeData() throws Exception { + int repeat = 100_000; // much larger than buffer (8192) + String chunk = "abcdefghij"; // 10 bytes + StringBuilder sb = new StringBuilder(repeat * chunk.length()); + for (int i = 0; i < repeat; i++) { + sb.append(chunk); + } + String data = sb.toString(); + + receivedBody.set(null); // reset + + HttpClient client = HttpClient.builder().requestTimeout(Duration.ofMinutes(2)).build(); + + HttpResponse response = + client + .request() + .url("http://localhost:" + port + "/test") + .header("Content-Type", "text/plain") + .body( + os -> { + for (int i = 0; i < repeat; i++) { + os.write(chunk.getBytes()); + } + }) + .POST() + .asPlainString(); + + assertEquals(204, response.statusCode()); + assertEquals(data, receivedBody.get()); + } + + @Test + void testError() throws Exception { + + HttpClient client = HttpClient.builder().requestTimeout(Duration.ofDays(1)).build(); + try { + + client + .request() + .url("http://localhost:" + port + "/test") + .header("Content-Type", "text/plain") + .body( + outputStream -> { + outputStream.write(" Output".getBytes()); + throw new IOException("test error"); + }) + .POST() + .asPlainString(); + } catch (HttpException e) { + assertEquals("test error", e.getCause().getMessage()); + } + } +} diff --git a/http-generator-client/src/main/java/io/avaje/http/generator/client/ClientMethodWriter.java b/http-generator-client/src/main/java/io/avaje/http/generator/client/ClientMethodWriter.java index 943d41849..a8eb32089 100644 --- a/http-generator-client/src/main/java/io/avaje/http/generator/client/ClientMethodWriter.java +++ b/http-generator-client/src/main/java/io/avaje/http/generator/client/ClientMethodWriter.java @@ -425,7 +425,8 @@ private static boolean directBodyType(String type) { || "java.util.function.Supplier".equals(type) || "java.util.function.Supplier".equals(type) || "java.nio.file.Path".equals(type) - || "io.avaje.http.client.BodyContent".equals(type); + || "io.avaje.http.client.BodyContent".equals(type) + || "io.avaje.http.client.OutputStreamWriter".equals(type); } private void writePaths(Set segments) { From d11a05d99fd7207691ffe7e9f0583d58fd5a14b4 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Mon, 13 Oct 2025 03:00:17 -0400 Subject: [PATCH 2/6] Update DHttpClientRequest.java --- .../src/main/java/io/avaje/http/client/DHttpClientRequest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java b/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java index 620380609..a937b60d7 100644 --- a/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java +++ b/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java @@ -365,7 +365,8 @@ public HttpClientRequest body(HttpRequest.BodyPublisher body) { @Override public HttpClientRequest body(OutputStreamWriter writer) { - this.body = new OutputStreamBodyPublisher(writer, context.httpClient().executor().orElse(null)); + this.body = + new OutputStreamBodyPublisher(writer, context.httpClient().executor().orElseThrow()); return this; } From 6610fe8ede981e02797715671c1fbfdd18530219 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Mon, 13 Oct 2025 03:01:42 -0400 Subject: [PATCH 3/6] Update module-info.java --- http-client/src/main/java/module-info.java | 1 + 1 file changed, 1 insertion(+) diff --git a/http-client/src/main/java/module-info.java b/http-client/src/main/java/module-info.java index 07527a063..25ba37d5e 100644 --- a/http-client/src/main/java/module-info.java +++ b/http-client/src/main/java/module-info.java @@ -30,6 +30,7 @@ requires static com.fasterxml.jackson.core; requires static io.avaje.jsonb; requires static io.avaje.inject; + requires static jdk.httpserver; exports io.avaje.http.client; } From e9656062fc32544f2ca18e6015532fa36858c9fd Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Mon, 13 Oct 2025 03:06:24 -0400 Subject: [PATCH 4/6] rename --- .../main/java/io/avaje/http/client/DHttpClientRequest.java | 2 +- .../src/main/java/io/avaje/http/client/HttpClientRequest.java | 2 +- .../java/io/avaje/http/client/OutputStreamBodyPublisher.java | 4 ++-- .../{OutputStreamWriter.java => OutputStreamBodyWriter.java} | 2 +- .../io/avaje/http/generator/client/ClientMethodWriter.java | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) rename http-client/src/main/java/io/avaje/http/client/{OutputStreamWriter.java => OutputStreamBodyWriter.java} (78%) diff --git a/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java b/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java index a937b60d7..f3375bdbe 100644 --- a/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java +++ b/http-client/src/main/java/io/avaje/http/client/DHttpClientRequest.java @@ -364,7 +364,7 @@ public HttpClientRequest body(HttpRequest.BodyPublisher body) { } @Override - public HttpClientRequest body(OutputStreamWriter writer) { + public HttpClientRequest body(OutputStreamBodyWriter writer) { this.body = new OutputStreamBodyPublisher(writer, context.httpClient().executor().orElseThrow()); return this; diff --git a/http-client/src/main/java/io/avaje/http/client/HttpClientRequest.java b/http-client/src/main/java/io/avaje/http/client/HttpClientRequest.java index 4becc17d8..9ba893fbc 100644 --- a/http-client/src/main/java/io/avaje/http/client/HttpClientRequest.java +++ b/http-client/src/main/java/io/avaje/http/client/HttpClientRequest.java @@ -429,7 +429,7 @@ default HttpClientRequest queryParam(String name, Collection values) { * @param writer Callback to write data to the request body output stream * @return The request being built */ - HttpClientRequest body(OutputStreamWriter writer); + HttpClientRequest body(OutputStreamBodyWriter writer); /** * Set the body content using http BodyPublisher. diff --git a/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java b/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java index a602f31ec..b26769644 100644 --- a/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java +++ b/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java @@ -22,9 +22,9 @@ final class OutputStreamBodyPublisher implements HttpRequest.BodyPublisher { private final int bufferSize; private final AtomicBoolean streamClosed = new AtomicBoolean(false); private final Executor executor; - private final OutputStreamWriter writer; + private final OutputStreamBodyWriter writer; - OutputStreamBodyPublisher(OutputStreamWriter writer, Executor executor) { + OutputStreamBodyPublisher(OutputStreamBodyWriter writer, Executor executor) { this.bufferSize = 8192; this.writer = writer; this.outputStream = new PipedOutputStream(); diff --git a/http-client/src/main/java/io/avaje/http/client/OutputStreamWriter.java b/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyWriter.java similarity index 78% rename from http-client/src/main/java/io/avaje/http/client/OutputStreamWriter.java rename to http-client/src/main/java/io/avaje/http/client/OutputStreamBodyWriter.java index da31f4422..1ea826c9e 100644 --- a/http-client/src/main/java/io/avaje/http/client/OutputStreamWriter.java +++ b/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyWriter.java @@ -3,6 +3,6 @@ import java.io.IOException; import java.io.OutputStream; -public interface OutputStreamWriter { +public interface OutputStreamBodyWriter { void write(OutputStream outputStream) throws IOException; } diff --git a/http-generator-client/src/main/java/io/avaje/http/generator/client/ClientMethodWriter.java b/http-generator-client/src/main/java/io/avaje/http/generator/client/ClientMethodWriter.java index a8eb32089..1e9cb7295 100644 --- a/http-generator-client/src/main/java/io/avaje/http/generator/client/ClientMethodWriter.java +++ b/http-generator-client/src/main/java/io/avaje/http/generator/client/ClientMethodWriter.java @@ -426,7 +426,7 @@ private static boolean directBodyType(String type) { || "java.util.function.Supplier".equals(type) || "java.nio.file.Path".equals(type) || "io.avaje.http.client.BodyContent".equals(type) - || "io.avaje.http.client.OutputStreamWriter".equals(type); + || "io.avaje.http.client.OutputStreamBodyWriter".equals(type); } private void writePaths(Set segments) { From 3efb51658620e8eceed7e0777c9f4602733195f7 Mon Sep 17 00:00:00 2001 From: Josiah Noel <32279667+SentryMan@users.noreply.github.com> Date: Mon, 13 Oct 2025 15:53:16 -0400 Subject: [PATCH 5/6] Update OutputStreamBodyPublisher.java --- .../java/io/avaje/http/client/OutputStreamBodyPublisher.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java b/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java index b26769644..f91d88b20 100644 --- a/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java +++ b/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyPublisher.java @@ -62,8 +62,8 @@ private class OutputStreamSubscription implements Flow.Subscription { () -> { try { writer.write(outputStream); - } catch (Exception e) { - subscriber.onError(e); + } catch (Throwable t) { + subscriber.onError(t); } finally { try { outputStream.close(); From c16176c7ca1d5e47d42d9ef1e45efcc0003f1161 Mon Sep 17 00:00:00 2001 From: Rob Bygrave Date: Wed, 15 Oct 2025 08:00:46 +1300 Subject: [PATCH 6/6] Add javadoc to OutputStreamBodyWriter --- .../http/client/OutputStreamBodyWriter.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyWriter.java b/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyWriter.java index 1ea826c9e..ebeedc184 100644 --- a/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyWriter.java +++ b/http-client/src/main/java/io/avaje/http/client/OutputStreamBodyWriter.java @@ -3,6 +3,34 @@ import java.io.IOException; import java.io.OutputStream; +/** + * Use to set the body content using a callback that writes to an {@link java.io.OutputStream}. + *

+ * This allows streaming large or dynamically generated content directly to the HTTP request body, + * without buffering the entire payload in memory. The provided {@code OutputStreamWriter} is called + * with an {@link java.io.OutputStream} that writes to the request body. Data written to the stream + * is sent as the request body. + *

+ * Example usage: + *

{@code
+ *   client.request()
+ *     .url("http://example.com/upload")
+ *     .body(outputStream -> {
+ *       // Write data in chunks
+ *       for (byte[] chunk : getChunks()) {
+ *         outputStream.write(chunk);
+ *       }
+ *     })
+ *     .POST()
+ *     .asPlainString();
+ * }
+ * + * @see HttpClientRequest#body(OutputStreamBodyWriter) + */ public interface OutputStreamBodyWriter { + + /** + * Write body content to the outputStream. + */ void write(OutputStream outputStream) throws IOException; }