Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add streaming support for Moshi request bodies #4281

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@

- First-party converters now support deferring serialization to happen when the request body is written (i.e., during HTTP execution) rather than when the HTTP request is created. In some cases this moves conversion from a calling thread to a background thread, such as in the case when using `Call.enqueue` directly.

The following converters support this feature through a new `createStreaming()` factory:
The following converters support this feature through a new `withStreaming()` factory method:
- Moshi
- Wire

**Changed**
Expand Down
1 change: 1 addition & 0 deletions retrofit-converters/moshi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies {
testImplementation libs.junit
testImplementation libs.truth
testImplementation libs.okhttp.mockwebserver
testImplementation libs.testParameterInjector
}

jar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import retrofit2.Call;
import retrofit2.Converter;
import retrofit2.Retrofit;

Expand All @@ -52,35 +53,51 @@ public static MoshiConverterFactory create() {
@SuppressWarnings("ConstantConditions") // Guarding public API nullability.
public static MoshiConverterFactory create(Moshi moshi) {
if (moshi == null) throw new NullPointerException("moshi == null");
return new MoshiConverterFactory(moshi, false, false, false);
return new MoshiConverterFactory(moshi, false, false, false, false);
}

private final Moshi moshi;
private final boolean lenient;
private final boolean failOnUnknown;
private final boolean serializeNulls;
private final boolean streaming;

private MoshiConverterFactory(
Moshi moshi, boolean lenient, boolean failOnUnknown, boolean serializeNulls) {
Moshi moshi,
boolean lenient,
boolean failOnUnknown,
boolean serializeNulls,
boolean streaming) {
this.moshi = moshi;
this.lenient = lenient;
this.failOnUnknown = failOnUnknown;
this.serializeNulls = serializeNulls;
this.streaming = streaming;
}

/** Return a new factory which uses {@linkplain JsonAdapter#lenient() lenient} adapters. */
public MoshiConverterFactory asLenient() {
return new MoshiConverterFactory(moshi, true, failOnUnknown, serializeNulls);
return new MoshiConverterFactory(moshi, true, failOnUnknown, serializeNulls, streaming);
}

/** Return a new factory which uses {@link JsonAdapter#failOnUnknown()} adapters. */
public MoshiConverterFactory failOnUnknown() {
return new MoshiConverterFactory(moshi, lenient, true, serializeNulls);
return new MoshiConverterFactory(moshi, lenient, true, serializeNulls, streaming);
}

/** Return a new factory which includes null values into the serialized JSON. */
public MoshiConverterFactory withNullSerialization() {
return new MoshiConverterFactory(moshi, lenient, failOnUnknown, true);
return new MoshiConverterFactory(moshi, lenient, failOnUnknown, true, streaming);
}

/**
* Return a new factory which streams serialization of request messages to bytes on the HTTP thread
* This is either the calling thread for {@link Call#execute()}, or one of OkHttp's background
* threads for {@link Call#enqueue}. Response bytes are always converted to message instances on
* one of OkHttp's background threads.
*/
public MoshiConverterFactory withStreaming() {
return new MoshiConverterFactory(moshi, lenient, failOnUnknown, serializeNulls, true);
}

@Override
Expand Down Expand Up @@ -115,7 +132,7 @@ public Converter<?, RequestBody> requestBodyConverter(
if (serializeNulls) {
adapter = adapter.serializeNulls();
}
return new MoshiRequestBodyConverter<>(adapter);
return new MoshiRequestBodyConverter<>(adapter, streaming);
}

private static Set<? extends Annotation> jsonAnnotations(Annotation[] annotations) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,22 @@
import retrofit2.Converter;

final class MoshiRequestBodyConverter<T> implements Converter<T, RequestBody> {
private static final MediaType MEDIA_TYPE = MediaType.get("application/json; charset=UTF-8");
static final MediaType MEDIA_TYPE = MediaType.get("application/json; charset=UTF-8");

private final JsonAdapter<T> adapter;
private final boolean streaming;

MoshiRequestBodyConverter(JsonAdapter<T> adapter) {
MoshiRequestBodyConverter(JsonAdapter<T> adapter, boolean streaming) {
this.adapter = adapter;
this.streaming = streaming;
}

@Override
public RequestBody convert(T value) throws IOException {
if (streaming) {
return new MoshiStreamingRequestBody<>(adapter, value);
}

Buffer buffer = new Buffer();
JsonWriter writer = JsonWriter.of(buffer);
adapter.toJson(writer, value);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (C) 2025 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.converter.moshi;

import static retrofit2.converter.moshi.MoshiRequestBodyConverter.MEDIA_TYPE;

import com.squareup.moshi.JsonAdapter;
import java.io.IOException;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import okio.BufferedSink;

final class MoshiStreamingRequestBody<T> extends RequestBody {
private final JsonAdapter<T> adapter;
private final T value;

public MoshiStreamingRequestBody(JsonAdapter<T> adapter, T value) {
this.adapter = adapter;
this.value = value;
}

@Override
public MediaType contentType() {
return MEDIA_TYPE;
}

@Override
public void writeTo(BufferedSink sink) throws IOException {
adapter.toJson(sink, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,41 @@
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;

import com.google.testing.junit.testparameterinjector.TestParameter;
import com.google.testing.junit.testparameterinjector.TestParameterInjector;
import com.squareup.moshi.FromJson;
import com.squareup.moshi.JsonDataException;
import com.squareup.moshi.JsonQualifier;
import com.squareup.moshi.JsonReader;
import com.squareup.moshi.JsonWriter;
import com.squareup.moshi.Moshi;
import com.squareup.moshi.ToJson;
import java.io.EOFException;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.annotation.Retention;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import okio.Buffer;
import okio.ByteString;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import retrofit2.Call;
import retrofit2.Callback;
import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.http.Body;
import retrofit2.http.GET;
import retrofit2.http.POST;

@RunWith(TestParameterInjector.class)
public final class MoshiConverterFactoryTest {
@Retention(RUNTIME)
@JsonQualifier
Expand All @@ -71,10 +79,10 @@ public String getName() {
}
}

static final class Value {
static final class ErroringValue {
final String theName;

Value(String theName) {
ErroringValue(String theName) {
this.theName = theName;
}
}
Expand Down Expand Up @@ -120,11 +128,16 @@ public String readQualified(JsonReader reader) throws IOException {
}

@FromJson
public Value readWithoutEndingObject(JsonReader reader) throws IOException {
public ErroringValue readWithoutEndingObject(JsonReader reader) throws IOException {
reader.beginObject();
reader.skipName();
String theName = reader.nextString();
return new Value(theName);
return new ErroringValue(theName);
}

@ToJson
public void write(JsonWriter writer, ErroringValue value) throws IOException {
throw new EOFException("oops!");
}
}

Expand All @@ -136,7 +149,10 @@ interface Service {
Call<AnInterface> anInterface(@Body AnInterface impl);

@GET("/")
Call<Value> value();
Call<ErroringValue> readErroringValue();

@POST("/")
Call<Void> writeErroringValue(@Body ErroringValue value);

@POST("/")
@Qualifier
Expand All @@ -146,13 +162,15 @@ interface Service {

@Rule public final MockWebServer server = new MockWebServer();

private Service service;
private Service serviceLenient;
private Service serviceNulls;
private Service serviceFailOnUnknown;
private final Service service;
private final Service serviceLenient;
private final Service serviceNulls;
private final Service serviceFailOnUnknown;
private final boolean streaming;

public MoshiConverterFactoryTest(@TestParameter boolean streaming) {
this.streaming = streaming;

@Before
public void setUp() {
Moshi moshi =
new Moshi.Builder()
.add(
Expand All @@ -166,7 +184,12 @@ public void setUp() {
})
.add(new Adapters())
.build();

MoshiConverterFactory factory = MoshiConverterFactory.create(moshi);
if (streaming) {
factory = factory.withStreaming();
}

MoshiConverterFactory factoryLenient = factory.asLenient();
MoshiConverterFactory factoryNulls = factory.withNullSerialization();
MoshiConverterFactory factoryFailOnUnknown = factory.failOnUnknown();
Expand Down Expand Up @@ -306,12 +329,42 @@ public void nonUtf8BomIsNotSkipped() throws IOException {
public void requireFullResponseDocumentConsumption() throws Exception {
server.enqueue(new MockResponse().setBody("{\"theName\":\"value\"}"));

Call<Value> call = service.value();
Call<ErroringValue> call = service.readErroringValue();
try {
call.execute();
fail();
} catch (JsonDataException e) {
assertThat(e).hasMessageThat().isEqualTo("JSON document was not fully consumed.");
}
}

@Test
public void serializeIsStreamed() throws InterruptedException {
assumeTrue(streaming);

Call<Void> call = service.writeErroringValue(new ErroringValue("hi"));

final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);

// If streaming were broken, the call to enqueue would throw the exception synchronously.
call.enqueue(
new Callback<Void>() {
@Override
public void onResponse(Call<Void> call, Response<Void> response) {
latch.countDown();
}

@Override
public void onFailure(Call<Void> call, Throwable t) {
throwableRef.set(t);
latch.countDown();
}
});
latch.await();

Throwable throwable = throwableRef.get();
assertThat(throwable).isInstanceOf(EOFException.class);
assertThat(throwable).hasMessageThat().isEqualTo("oops!");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,28 @@ public final class WireConverterFactory extends Converter.Factory {
/**
* Create an instance which serializes request messages to bytes eagerly on the caller thread
* when either {@link Call#execute()} or {@link Call#enqueue} is called. Response bytes are
* always converted to message instances on one of OKHttp's background threads.
* always converted to message instances on one of OkHttp's background threads.
*/
public static WireConverterFactory create() {
return new WireConverterFactory(false);
}

/**
* Create an instance which streams serialization of request messages to bytes on the HTTP thread
* This is either the calling thread for {@link Call#execute()}, or one of OKHttp's background
* threads for {@link Call#enqueue}. Response bytes are always converted to message instances on
* one of OKHttp's background threads.
*/
public static WireConverterFactory createStreaming() {
return new WireConverterFactory(true);
}

private final boolean streaming;

private WireConverterFactory(boolean streaming) {
this.streaming = streaming;
}

/**
* Return a new factory which streams serialization of request messages to bytes on the HTTP thread
* This is either the calling thread for {@link Call#execute()}, or one of OkHttp's background
* threads for {@link Call#enqueue}. Response bytes are always converted to message instances on
* one of OkHttp's background threads.
*/
public WireConverterFactory withStreaming() {
return new WireConverterFactory(true);
}

@Override
public @Nullable Converter<ResponseBody, ?> responseBodyConverter(
Type type, Annotation[] annotations, Retrofit retrofit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ interface Service {
public WireConverterFactoryTest(@TestParameter boolean streaming) {
this.streaming = streaming;

WireConverterFactory factory = WireConverterFactory.create();
Retrofit retrofit =
new Retrofit.Builder()
.baseUrl(server.url("/"))
.addConverterFactory(
streaming ? WireConverterFactory.createStreaming() : WireConverterFactory.create())
.addConverterFactory(streaming ? factory.withStreaming() : factory)
.build();
service = retrofit.create(Service.class);
}
Expand Down
Loading