From 17d4749385dad34a9ba6d495880cf33a6d275756 Mon Sep 17 00:00:00 2001 From: Jake Wharton Date: Mon, 6 Jan 2025 11:47:58 -0500 Subject: [PATCH] Add streaming support for Moshi request bodies --- CHANGELOG.md | 3 +- retrofit-converters/moshi/build.gradle | 1 + .../moshi/MoshiConverterFactory.java | 29 +++++-- .../moshi/MoshiRequestBodyConverter.java | 10 ++- .../moshi/MoshiStreamingRequestBody.java | 44 +++++++++++ .../moshi/MoshiConverterFactoryTest.java | 79 ++++++++++++++++--- .../converter/wire/WireConverterFactory.java | 22 +++--- .../wire/WireConverterFactoryTest.java | 4 +- 8 files changed, 157 insertions(+), 35 deletions(-) create mode 100644 retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiStreamingRequestBody.java diff --git a/CHANGELOG.md b/CHANGELOG.md index eb2d6154e8..0d53d809a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,8 @@ - First-party converters now support deferring serialization to happen when the request body is written (i.e., during HTTP execution) rather than when the HTTP request is created. In some cases this moves conversion from a calling thread to a background thread, such as in the case when using `Call.enqueue` directly. - The following converters support this feature through a new `createStreaming()` factory: + The following converters support this feature through a new `withStreaming()` factory method: + - Moshi - Wire **Changed** diff --git a/retrofit-converters/moshi/build.gradle b/retrofit-converters/moshi/build.gradle index be2a9b70aa..12781a8fa0 100644 --- a/retrofit-converters/moshi/build.gradle +++ b/retrofit-converters/moshi/build.gradle @@ -9,6 +9,7 @@ dependencies { testImplementation libs.junit testImplementation libs.truth testImplementation libs.okhttp.mockwebserver + testImplementation libs.testParameterInjector } jar { diff --git a/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiConverterFactory.java b/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiConverterFactory.java index 1a48eecbf0..72f09f4f2e 100644 --- a/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiConverterFactory.java +++ b/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiConverterFactory.java @@ -27,6 +27,7 @@ import java.util.Set; import okhttp3.RequestBody; import okhttp3.ResponseBody; +import retrofit2.Call; import retrofit2.Converter; import retrofit2.Retrofit; @@ -52,35 +53,51 @@ public static MoshiConverterFactory create() { @SuppressWarnings("ConstantConditions") // Guarding public API nullability. public static MoshiConverterFactory create(Moshi moshi) { if (moshi == null) throw new NullPointerException("moshi == null"); - return new MoshiConverterFactory(moshi, false, false, false); + return new MoshiConverterFactory(moshi, false, false, false, false); } private final Moshi moshi; private final boolean lenient; private final boolean failOnUnknown; private final boolean serializeNulls; + private final boolean streaming; private MoshiConverterFactory( - Moshi moshi, boolean lenient, boolean failOnUnknown, boolean serializeNulls) { + Moshi moshi, + boolean lenient, + boolean failOnUnknown, + boolean serializeNulls, + boolean streaming) { this.moshi = moshi; this.lenient = lenient; this.failOnUnknown = failOnUnknown; this.serializeNulls = serializeNulls; + this.streaming = streaming; } /** Return a new factory which uses {@linkplain JsonAdapter#lenient() lenient} adapters. */ public MoshiConverterFactory asLenient() { - return new MoshiConverterFactory(moshi, true, failOnUnknown, serializeNulls); + return new MoshiConverterFactory(moshi, true, failOnUnknown, serializeNulls, streaming); } /** Return a new factory which uses {@link JsonAdapter#failOnUnknown()} adapters. */ public MoshiConverterFactory failOnUnknown() { - return new MoshiConverterFactory(moshi, lenient, true, serializeNulls); + return new MoshiConverterFactory(moshi, lenient, true, serializeNulls, streaming); } /** Return a new factory which includes null values into the serialized JSON. */ public MoshiConverterFactory withNullSerialization() { - return new MoshiConverterFactory(moshi, lenient, failOnUnknown, true); + return new MoshiConverterFactory(moshi, lenient, failOnUnknown, true, streaming); + } + + /** + * Return a new factory which streams serialization of request messages to bytes on the HTTP thread + * This is either the calling thread for {@link Call#execute()}, or one of OkHttp's background + * threads for {@link Call#enqueue}. Response bytes are always converted to message instances on + * one of OkHttp's background threads. + */ + public MoshiConverterFactory withStreaming() { + return new MoshiConverterFactory(moshi, lenient, failOnUnknown, serializeNulls, true); } @Override @@ -115,7 +132,7 @@ public Converter requestBodyConverter( if (serializeNulls) { adapter = adapter.serializeNulls(); } - return new MoshiRequestBodyConverter<>(adapter); + return new MoshiRequestBodyConverter<>(adapter, streaming); } private static Set jsonAnnotations(Annotation[] annotations) { diff --git a/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiRequestBodyConverter.java b/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiRequestBodyConverter.java index 39fb00e745..bd381abf29 100644 --- a/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiRequestBodyConverter.java +++ b/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiRequestBodyConverter.java @@ -24,16 +24,22 @@ import retrofit2.Converter; final class MoshiRequestBodyConverter implements Converter { - private static final MediaType MEDIA_TYPE = MediaType.get("application/json; charset=UTF-8"); + static final MediaType MEDIA_TYPE = MediaType.get("application/json; charset=UTF-8"); private final JsonAdapter adapter; + private final boolean streaming; - MoshiRequestBodyConverter(JsonAdapter adapter) { + MoshiRequestBodyConverter(JsonAdapter adapter, boolean streaming) { this.adapter = adapter; + this.streaming = streaming; } @Override public RequestBody convert(T value) throws IOException { + if (streaming) { + return new MoshiStreamingRequestBody<>(adapter, value); + } + Buffer buffer = new Buffer(); JsonWriter writer = JsonWriter.of(buffer); adapter.toJson(writer, value); diff --git a/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiStreamingRequestBody.java b/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiStreamingRequestBody.java new file mode 100644 index 0000000000..d979636183 --- /dev/null +++ b/retrofit-converters/moshi/src/main/java/retrofit2/converter/moshi/MoshiStreamingRequestBody.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2025 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.converter.moshi; + +import static retrofit2.converter.moshi.MoshiRequestBodyConverter.MEDIA_TYPE; + +import com.squareup.moshi.JsonAdapter; +import java.io.IOException; +import okhttp3.MediaType; +import okhttp3.RequestBody; +import okio.BufferedSink; + +final class MoshiStreamingRequestBody extends RequestBody { + private final JsonAdapter adapter; + private final T value; + + public MoshiStreamingRequestBody(JsonAdapter adapter, T value) { + this.adapter = adapter; + this.value = value; + } + + @Override + public MediaType contentType() { + return MEDIA_TYPE; + } + + @Override + public void writeTo(BufferedSink sink) throws IOException { + adapter.toJson(sink, value); + } +} diff --git a/retrofit-converters/moshi/src/test/java/retrofit2/converter/moshi/MoshiConverterFactoryTest.java b/retrofit-converters/moshi/src/test/java/retrofit2/converter/moshi/MoshiConverterFactoryTest.java index 21ce01b514..ba7df76c0d 100644 --- a/retrofit-converters/moshi/src/test/java/retrofit2/converter/moshi/MoshiConverterFactoryTest.java +++ b/retrofit-converters/moshi/src/test/java/retrofit2/converter/moshi/MoshiConverterFactoryTest.java @@ -19,7 +19,10 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; +import com.google.testing.junit.testparameterinjector.TestParameter; +import com.google.testing.junit.testparameterinjector.TestParameterInjector; import com.squareup.moshi.FromJson; import com.squareup.moshi.JsonDataException; import com.squareup.moshi.JsonQualifier; @@ -27,25 +30,30 @@ import com.squareup.moshi.JsonWriter; import com.squareup.moshi.Moshi; import com.squareup.moshi.ToJson; +import java.io.EOFException; import java.io.IOException; import java.lang.annotation.Annotation; import java.lang.annotation.Retention; import java.nio.charset.StandardCharsets; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; import okio.Buffer; import okio.ByteString; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; import retrofit2.Call; +import retrofit2.Callback; import retrofit2.Response; import retrofit2.Retrofit; import retrofit2.http.Body; import retrofit2.http.GET; import retrofit2.http.POST; +@RunWith(TestParameterInjector.class) public final class MoshiConverterFactoryTest { @Retention(RUNTIME) @JsonQualifier @@ -71,10 +79,10 @@ public String getName() { } } - static final class Value { + static final class ErroringValue { final String theName; - Value(String theName) { + ErroringValue(String theName) { this.theName = theName; } } @@ -120,11 +128,16 @@ public String readQualified(JsonReader reader) throws IOException { } @FromJson - public Value readWithoutEndingObject(JsonReader reader) throws IOException { + public ErroringValue readWithoutEndingObject(JsonReader reader) throws IOException { reader.beginObject(); reader.skipName(); String theName = reader.nextString(); - return new Value(theName); + return new ErroringValue(theName); + } + + @ToJson + public void write(JsonWriter writer, ErroringValue value) throws IOException { + throw new EOFException("oops!"); } } @@ -136,7 +149,10 @@ interface Service { Call anInterface(@Body AnInterface impl); @GET("/") - Call value(); + Call readErroringValue(); + + @POST("/") + Call writeErroringValue(@Body ErroringValue value); @POST("/") @Qualifier @@ -146,13 +162,15 @@ interface Service { @Rule public final MockWebServer server = new MockWebServer(); - private Service service; - private Service serviceLenient; - private Service serviceNulls; - private Service serviceFailOnUnknown; + private final Service service; + private final Service serviceLenient; + private final Service serviceNulls; + private final Service serviceFailOnUnknown; + private final boolean streaming; + + public MoshiConverterFactoryTest(@TestParameter boolean streaming) { + this.streaming = streaming; - @Before - public void setUp() { Moshi moshi = new Moshi.Builder() .add( @@ -166,7 +184,12 @@ public void setUp() { }) .add(new Adapters()) .build(); + MoshiConverterFactory factory = MoshiConverterFactory.create(moshi); + if (streaming) { + factory = factory.withStreaming(); + } + MoshiConverterFactory factoryLenient = factory.asLenient(); MoshiConverterFactory factoryNulls = factory.withNullSerialization(); MoshiConverterFactory factoryFailOnUnknown = factory.failOnUnknown(); @@ -306,7 +329,7 @@ public void nonUtf8BomIsNotSkipped() throws IOException { public void requireFullResponseDocumentConsumption() throws Exception { server.enqueue(new MockResponse().setBody("{\"theName\":\"value\"}")); - Call call = service.value(); + Call call = service.readErroringValue(); try { call.execute(); fail(); @@ -314,4 +337,34 @@ public void requireFullResponseDocumentConsumption() throws Exception { assertThat(e).hasMessageThat().isEqualTo("JSON document was not fully consumed."); } } + + @Test + public void serializeIsStreamed() throws InterruptedException { + assumeTrue(streaming); + + Call call = service.writeErroringValue(new ErroringValue("hi")); + + final AtomicReference throwableRef = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + + // If streaming were broken, the call to enqueue would throw the exception synchronously. + call.enqueue( + new Callback() { + @Override + public void onResponse(Call call, Response response) { + latch.countDown(); + } + + @Override + public void onFailure(Call call, Throwable t) { + throwableRef.set(t); + latch.countDown(); + } + }); + latch.await(); + + Throwable throwable = throwableRef.get(); + assertThat(throwable).isInstanceOf(EOFException.class); + assertThat(throwable).hasMessageThat().isEqualTo("oops!"); + } } diff --git a/retrofit-converters/wire/src/main/java/retrofit2/converter/wire/WireConverterFactory.java b/retrofit-converters/wire/src/main/java/retrofit2/converter/wire/WireConverterFactory.java index aaa4c26cf6..525192f8e2 100644 --- a/retrofit-converters/wire/src/main/java/retrofit2/converter/wire/WireConverterFactory.java +++ b/retrofit-converters/wire/src/main/java/retrofit2/converter/wire/WireConverterFactory.java @@ -35,28 +35,28 @@ public final class WireConverterFactory extends Converter.Factory { /** * Create an instance which serializes request messages to bytes eagerly on the caller thread * when either {@link Call#execute()} or {@link Call#enqueue} is called. Response bytes are - * always converted to message instances on one of OKHttp's background threads. + * always converted to message instances on one of OkHttp's background threads. */ public static WireConverterFactory create() { return new WireConverterFactory(false); } - /** - * Create an instance which streams serialization of request messages to bytes on the HTTP thread - * This is either the calling thread for {@link Call#execute()}, or one of OKHttp's background - * threads for {@link Call#enqueue}. Response bytes are always converted to message instances on - * one of OKHttp's background threads. - */ - public static WireConverterFactory createStreaming() { - return new WireConverterFactory(true); - } - private final boolean streaming; private WireConverterFactory(boolean streaming) { this.streaming = streaming; } + /** + * Return a new factory which streams serialization of request messages to bytes on the HTTP thread + * This is either the calling thread for {@link Call#execute()}, or one of OkHttp's background + * threads for {@link Call#enqueue}. Response bytes are always converted to message instances on + * one of OkHttp's background threads. + */ + public WireConverterFactory withStreaming() { + return new WireConverterFactory(true); + } + @Override public @Nullable Converter responseBodyConverter( Type type, Annotation[] annotations, Retrofit retrofit) { diff --git a/retrofit-converters/wire/src/test/java/retrofit2/converter/wire/WireConverterFactoryTest.java b/retrofit-converters/wire/src/test/java/retrofit2/converter/wire/WireConverterFactoryTest.java index 443a9dee91..e64eb5590d 100644 --- a/retrofit-converters/wire/src/test/java/retrofit2/converter/wire/WireConverterFactoryTest.java +++ b/retrofit-converters/wire/src/test/java/retrofit2/converter/wire/WireConverterFactoryTest.java @@ -69,11 +69,11 @@ interface Service { public WireConverterFactoryTest(@TestParameter boolean streaming) { this.streaming = streaming; + WireConverterFactory factory = WireConverterFactory.create(); Retrofit retrofit = new Retrofit.Builder() .baseUrl(server.url("/")) - .addConverterFactory( - streaming ? WireConverterFactory.createStreaming() : WireConverterFactory.create()) + .addConverterFactory(streaming ? factory.withStreaming() : factory) .build(); service = retrofit.create(Service.class); }