Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,13 @@ public HttpClientRequest body(HttpRequest.BodyPublisher body) {
return this;
}

@Override
public HttpClientRequest body(OutputStreamBodyWriter writer) {
this.body =
new OutputStreamBodyPublisher(writer, context.httpClient().executor().orElseThrow());
return this;
}

@Override
public HttpClientRequest errorMapper(Function<HttpException, RuntimeException> errorMapper) {
this.errorMapper = errorMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,34 @@ default HttpClientRequest queryParam(String name, Collection<String> values) {
*/
HttpClientRequest body(Path file);


/**
* Set the body content using a callback that writes to an {@link java.io.OutputStream}.
* <p>
* This allows streaming large or dynamically generated content directly to the HTTP request body,
* without buffering the entire payload in memory. The provided {@code OutputStreamWriter} is called
* with an {@link java.io.OutputStream} that writes to the request body. Data written to the stream
* is sent as the request body.
* <p>
* Example usage:
* <pre>{@code
* client.request()
* .url("http://example.com/upload")
* .body(outputStream -> {
* // Write data in chunks
* for (byte[] chunk : getChunks()) {
* outputStream.write(chunk);
* }
* })
* .POST()
* .asPlainString();
* }</pre>
*
* @param writer Callback to write data to the request body output stream
* @return The request being built
*/
HttpClientRequest body(OutputStreamBodyWriter writer);

/**
* Set the body content using http BodyPublisher.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package io.avaje.http.client;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A BodyPublisher that allows writing to an OutputStream. Data written to the OutputStream is
* published to the HTTP request body.
*/
final class OutputStreamBodyPublisher implements HttpRequest.BodyPublisher {

private final PipedOutputStream outputStream;
private final PipedInputStream inputStream;
private final int bufferSize;
private final AtomicBoolean streamClosed = new AtomicBoolean(false);
private final Executor executor;
private final OutputStreamBodyWriter writer;

OutputStreamBodyPublisher(OutputStreamBodyWriter writer, Executor executor) {
this.bufferSize = 8192;
this.writer = writer;
this.outputStream = new PipedOutputStream();
this.inputStream = new PipedInputStream(bufferSize);
this.executor = executor;
}

@Override
public long contentLength() {
return -1;
}

@Override
public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
try {
outputStream.connect(inputStream);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
subscriber.onSubscribe(new OutputStreamSubscription(subscriber));
}

private class OutputStreamSubscription implements Flow.Subscription {
private final Flow.Subscriber<? super ByteBuffer> subscriber;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private volatile boolean completed = false;
private CompletableFuture<Void> writeTask;

OutputStreamSubscription(Flow.Subscriber<? super ByteBuffer> subscriber) {
this.subscriber = subscriber;

// Start a background thread to write to the output stream
writeTask =
CompletableFuture.runAsync(
() -> {
try {
writer.write(outputStream);
} catch (Throwable t) {
subscriber.onError(t);
} finally {
try {
outputStream.close();
} catch (IOException e) {
subscriber.onError(e);
}
}
},
executor);
}

@Override
public void request(long n) {
if (cancelled.get() || completed) {
return;
}
try {
byte[] buffer = new byte[bufferSize];
for (long i = 0; i < n && !cancelled.get(); i++) {
int bytesRead = inputStream.read(buffer);
if (bytesRead == -1) {
// End of stream
completed = true;
subscriber.onComplete();
closeStreams();
return;
}
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer, 0, bytesRead);
subscriber.onNext(byteBuffer);
}
} catch (IOException e) {
completed = true;
subscriber.onError(e);
closeStreams();
}
}

@Override
public void cancel() {
cancelled.set(true);
writeTask.cancel(true);
closeStreams();
}

private void closeStreams() {
if (streamClosed.compareAndSet(false, true)) {
try (outputStream;
inputStream; ) {
} catch (IOException e) {
// Ignore
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.avaje.http.client;

import java.io.IOException;
import java.io.OutputStream;

/**
* Use to set the body content using a callback that writes to an {@link java.io.OutputStream}.
* <p>
* This allows streaming large or dynamically generated content directly to the HTTP request body,
* without buffering the entire payload in memory. The provided {@code OutputStreamWriter} is called
* with an {@link java.io.OutputStream} that writes to the request body. Data written to the stream
* is sent as the request body.
* <p>
* Example usage:
* <pre>{@code
* client.request()
* .url("http://example.com/upload")
* .body(outputStream -> {
* // Write data in chunks
* for (byte[] chunk : getChunks()) {
* outputStream.write(chunk);
* }
* })
* .POST()
* .asPlainString();
* }</pre>
*
* @see HttpClientRequest#body(OutputStreamBodyWriter)
*/
public interface OutputStreamBodyWriter {

/**
* Write body content to the outputStream.
*/
void write(OutputStream outputStream) throws IOException;
}
1 change: 1 addition & 0 deletions http-client/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
requires static com.fasterxml.jackson.core;
requires static io.avaje.jsonb;
requires static io.avaje.inject;
requires static jdk.httpserver;

exports io.avaje.http.client;
}
127 changes: 127 additions & 0 deletions http-client/src/test/java/io/avaje/http/client/OutPutStreamTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package io.avaje.http.client;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import com.sun.net.httpserver.HttpServer;

public class OutPutStreamTest {

private static HttpServer server;
private static int port;
private static final AtomicReference<String> receivedBody = new AtomicReference<>();

@BeforeAll
static void startServer() throws IOException {
server = HttpServer.create(new InetSocketAddress(0), 0);
port = server.getAddress().getPort();
server.createContext(
"/test",
exchange -> {
try (var is = exchange.getRequestBody();
ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

is.transferTo(baos);

receivedBody.set(baos.toString());
}
exchange.sendResponseHeaders(204, -1);
});
server.setExecutor(Executors.newSingleThreadExecutor());
server.start();
}

@AfterAll
static void stopServer() {
if (server != null) {
server.stop(0);
}
}

@Test
void testOutputStreamBodyPublisher() throws Exception {
String data = "Hello OutputStreamBodyPublisher!";

HttpClient client = HttpClient.builder().requestTimeout(Duration.ofDays(1)).build();

HttpResponse<String> response =
client
.request()
.url("http://localhost:" + port + "/test")
.header("Content-Type", "text/plain")
.body(
outputStream -> {
outputStream.write(data.getBytes());
})
.POST()
.asPlainString();

assertEquals(204, response.statusCode());
assertEquals(data, receivedBody.get());
}

@Test
void testOutputStreamBodyPublisherLargeData() throws Exception {
int repeat = 100_000; // much larger than buffer (8192)
String chunk = "abcdefghij"; // 10 bytes
StringBuilder sb = new StringBuilder(repeat * chunk.length());
for (int i = 0; i < repeat; i++) {
sb.append(chunk);
}
String data = sb.toString();

receivedBody.set(null); // reset

HttpClient client = HttpClient.builder().requestTimeout(Duration.ofMinutes(2)).build();

HttpResponse<String> response =
client
.request()
.url("http://localhost:" + port + "/test")
.header("Content-Type", "text/plain")
.body(
os -> {
for (int i = 0; i < repeat; i++) {
os.write(chunk.getBytes());
}
})
.POST()
.asPlainString();

assertEquals(204, response.statusCode());
assertEquals(data, receivedBody.get());
}

@Test
void testError() throws Exception {

HttpClient client = HttpClient.builder().requestTimeout(Duration.ofDays(1)).build();
try {

client
.request()
.url("http://localhost:" + port + "/test")
.header("Content-Type", "text/plain")
.body(
outputStream -> {
outputStream.write(" Output".getBytes());
throw new IOException("test error");
})
.POST()
.asPlainString();
} catch (HttpException e) {
assertEquals("test error", e.getCause().getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ private static boolean directBodyType(String type) {
|| "java.util.function.Supplier<?extendsjava.io.InputStream>".equals(type)
|| "java.util.function.Supplier<java.io.InputStream>".equals(type)
|| "java.nio.file.Path".equals(type)
|| "io.avaje.http.client.BodyContent".equals(type);
|| "io.avaje.http.client.BodyContent".equals(type)
|| "io.avaje.http.client.OutputStreamBodyWriter".equals(type);
}

private void writePaths(Set<PathSegments.Segment> segments) {
Expand Down