Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add flatmapConcat with parallelism support #1702

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package org.apache.pekko.stream
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory
Expand Down Expand Up @@ -76,6 +76,16 @@ class FlatMapConcatBenchmark {
awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def sourceDotSingleP1(): Unit = {
val latch = new CountDownLatch(1)

testSource.flatMapConcat(1, Source.single).runWith(new LatchSink(OperationsPerInvocation, latch))

awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def internalSingleSource(): Unit = {
Expand All @@ -88,6 +98,18 @@ class FlatMapConcatBenchmark {
awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def internalSingleSourceP1(): Unit = {
val latch = new CountDownLatch(1)

testSource
.flatMapConcat(1, elem => new GraphStages.SingleSource(elem))
.runWith(new LatchSink(OperationsPerInvocation, latch))

awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def oneElementList(): Unit = {
Expand All @@ -98,6 +120,64 @@ class FlatMapConcatBenchmark {
awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def oneElementListP1(): Unit = {
val latch = new CountDownLatch(1)

testSource.flatMapConcat(1, n => Source(n :: Nil)).runWith(new LatchSink(OperationsPerInvocation, latch))

awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def completedFuture(): Unit = {
val latch = new CountDownLatch(1)

testSource
.flatMapConcat(n => Source.future(Future.successful(n)))
.runWith(new LatchSink(OperationsPerInvocation, latch))

awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def completedFutureP1(): Unit = {
val latch = new CountDownLatch(1)

testSource
.flatMapConcat(1, n => Source.future(Future.successful(n)))
.runWith(new LatchSink(OperationsPerInvocation, latch))

awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def normalFuture(): Unit = {
val latch = new CountDownLatch(1)

testSource
.flatMapConcat(n => Source.future(Future(n)(system.dispatcher)))
.runWith(new LatchSink(OperationsPerInvocation, latch))

awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def normalFutureP1(): Unit = {
val latch = new CountDownLatch(1)

testSource
.flatMapConcat(1, n => Source.future(Future(n)(system.dispatcher)))
.runWith(new LatchSink(OperationsPerInvocation, latch))

awaitLatch(latch)
}

@Benchmark
@OperationsPerInvocation(OperationsPerInvocation)
def mapBaseline(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ import org.apache.pekko
import pekko.NotUsed
import pekko.stream._
import pekko.stream.impl.TraversalTestUtils._
import pekko.stream.scaladsl.Keep
import pekko.stream.impl.fusing.IterableSource
import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
import pekko.stream.scaladsl.{ Keep, Source }
import pekko.util.OptionVal
import pekko.testkit.PekkoSpec

import scala.concurrent.Future

class TraversalBuilderSpec extends PekkoSpec {

"CompositeTraversalBuilder" must {
Expand Down Expand Up @@ -447,4 +452,93 @@ class TraversalBuilderSpec extends PekkoSpec {
}
}

"find Source.single via TraversalBuilder" in {
TraversalBuilder.getSingleSource(Source.single("a")).get.elem should ===("a")
TraversalBuilder.getSingleSource(Source(List("a", "b"))) should be(OptionVal.None)

val singleSourceA = new SingleSource("a")
TraversalBuilder.getSingleSource(singleSourceA) should be(OptionVal.Some(singleSourceA))

TraversalBuilder.getSingleSource(Source.single("c").async) should be(OptionVal.None)
TraversalBuilder.getSingleSource(Source.single("d").mapMaterializedValue(_ => "Mat")) should be(OptionVal.None)
}

"find Source.single via TraversalBuilder with getValuePresentedSource" in {
TraversalBuilder.getValuePresentedSource(Source.single("a")).get.asInstanceOf[SingleSource[String]].elem should ===(
"a")
val singleSourceA = new SingleSource("a")
TraversalBuilder.getValuePresentedSource(singleSourceA) should be(OptionVal.Some(singleSourceA))

TraversalBuilder.getValuePresentedSource(Source.single("c").async) should be(OptionVal.None)
TraversalBuilder.getValuePresentedSource(Source.single("d").mapMaterializedValue(_ => "Mat")) should be(
OptionVal.None)
}

"find Source.empty via TraversalBuilder with getValuePresentedSource" in {
val emptySource = EmptySource
TraversalBuilder.getValuePresentedSource(emptySource) should be(OptionVal.Some(emptySource))

TraversalBuilder.getValuePresentedSource(Source.empty.async) should be(OptionVal.None)
TraversalBuilder.getValuePresentedSource(Source.empty.mapMaterializedValue(_ => "Mat")) should be(OptionVal.None)
}

"find javadsl Source.empty via TraversalBuilder with getValuePresentedSource" in {
import pekko.stream.javadsl.Source
val emptySource = Source.empty()
TraversalBuilder.getValuePresentedSource(Source.empty()) should be(OptionVal.Some(emptySource))

TraversalBuilder.getValuePresentedSource(Source.empty().async) should be(OptionVal.None)
TraversalBuilder.getValuePresentedSource(Source.empty().mapMaterializedValue(_ => "Mat")) should be(OptionVal.None)
}

"find Source.future via TraversalBuilder with getValuePresentedSource" in {
val future = Future.successful("a")
TraversalBuilder.getValuePresentedSource(Source.future(future)).get.asInstanceOf[FutureSource[String]].future should ===(
future)
val futureSourceA = new FutureSource(future)
TraversalBuilder.getValuePresentedSource(futureSourceA) should be(OptionVal.Some(futureSourceA))

TraversalBuilder.getValuePresentedSource(Source.future(future).async) should be(OptionVal.None)
TraversalBuilder.getValuePresentedSource(Source.future(future).mapMaterializedValue(_ => "Mat")) should be(
OptionVal.None)
}

"find Source.iterable via TraversalBuilder with getValuePresentedSource" in {
val iterable = List("a")
TraversalBuilder.getValuePresentedSource(Source(iterable)).get.asInstanceOf[IterableSource[String]].elements should ===(
iterable)
val iterableSource = new IterableSource(iterable)
TraversalBuilder.getValuePresentedSource(iterableSource) should be(OptionVal.Some(iterableSource))

TraversalBuilder.getValuePresentedSource(Source(iterable).async) should be(OptionVal.None)
TraversalBuilder.getValuePresentedSource(Source(iterable).mapMaterializedValue(_ => "Mat")) should be(
OptionVal.None)
}

"find Source.javaStreamSource via TraversalBuilder with getValuePresentedSource" in {
val javaStream = java.util.stream.Stream.empty[String]()
TraversalBuilder.getValuePresentedSource(Source.fromJavaStream(() => javaStream)).get
.asInstanceOf[JavaStreamSource[String, _]].open() shouldEqual javaStream
val streamSource = new JavaStreamSource(() => javaStream)
TraversalBuilder.getValuePresentedSource(streamSource) should be(OptionVal.Some(streamSource))

TraversalBuilder.getValuePresentedSource(Source.fromJavaStream(() => javaStream).async) should be(OptionVal.None)
TraversalBuilder.getValuePresentedSource(
Source.fromJavaStream(() => javaStream).mapMaterializedValue(_ => "Mat")) should be(
OptionVal.None)
}

"find Source.failed via TraversalBuilder with getValuePresentedSource" in {
val failure = new RuntimeException("failure")
TraversalBuilder.getValuePresentedSource(Source.failed(failure)).get.asInstanceOf[FailedSource[String]]
.failure should ===(
failure)
val failedSourceA = new FailedSource(failure)
TraversalBuilder.getValuePresentedSource(failedSourceA) should be(OptionVal.Some(failedSourceA))

TraversalBuilder.getValuePresentedSource(Source.failed(failure).async) should be(OptionVal.None)
TraversalBuilder.getValuePresentedSource(Source.failed(failure).mapMaterializedValue(_ => "Mat")) should be(
OptionVal.None)
}

}
Loading
Loading