From 38facf752b4e7d5fb43ed7c671eb91888b41bd10 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 2 Jan 2025 14:04:33 +0100 Subject: [PATCH] reimplement fix for akka/pekko cluster (#1594) (#1664) * Revert "revert #1568 due to test failures (#1587)" This reverts commit 7af03e5215dc69df9702803dba7f22d94b49ead4. * temp run nightly test in this PR * no need for square brackets because the set print adds them * logging to find issue * support tcp protocols * Update ClusterDaemon.scala * remove temp logging * try to fix issue in Remoting * extra tests * more tests * ignore udp tests * try to make tests tidy up after failures * Update MixedProtocolClusterSpec.scala * Update MixedProtocolClusterSpec.scala * run main cluster tests for PR --- .github/workflows/build-test-prValidation.yml | 47 +++++ .../apache/pekko/cluster/ClusterDaemon.scala | 29 +-- .../cluster/MixedProtocolClusterSpec.scala | 192 ++++++++++++++++++ .../org/apache/pekko/remote/Remoting.scala | 22 +- 4 files changed, 275 insertions(+), 15 deletions(-) create mode 100644 cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala diff --git a/.github/workflows/build-test-prValidation.yml b/.github/workflows/build-test-prValidation.yml index 1138fe1eecb..f3d9a18c183 100644 --- a/.github/workflows/build-test-prValidation.yml +++ b/.github/workflows/build-test-prValidation.yml @@ -101,6 +101,53 @@ jobs: -Dpekko.log.timestamps=true \ validatePullRequest + pekko-classic-remoting-tests: + name: Pekko Classic Remoting Tests + runs-on: ubuntu-22.04 + if: github.repository == 'apache/pekko' + strategy: + fail-fast: false + matrix: + command: + - cluster/test distributed-data/test cluster-tools/test cluster-metrics/test + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + # we don't know what commit the last tag was it's safer to get entire repo so previousStableVersion resolves + fetch-depth: 0 + fetch-tags: true + + - name: Setup Java 11 + uses: actions/setup-java@v4 + with: + distribution: temurin + java-version: 11 + + - name: Install sbt + uses: sbt/setup-sbt@v1 + + - name: Cache Coursier cache + uses: coursier/cache-action@v6 + + - name: Enable jvm-opts + run: cp .jvmopts-ci .jvmopts + + - name: sbt ${{ matrix.command }} + env: + DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }} + # note that this is not running any multi-jvm tests because multi-in-test=false + run: |- + sbt \ + -Djava.security.egd=file:/dev/./urandom \ + -Dpekko.remote.artery.enabled=off \ + -Dpekko.test.timefactor=2 \ + -Dpekko.actor.testkit.typed.timefactor=2 \ + -Dpekko.test.tags.exclude=gh-exclude,timing \ + -Dpekko.test.multi-in-test=false \ + -Dpekko.cluster.assert=on \ + clean ${{ matrix.command }} + jdk-21-extra-tests: name: Java 21 Extra Tests (including all tests that need Java 9+) runs-on: ubuntu-22.04 diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index 34320067be1..479d58e97bb 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -13,13 +13,13 @@ package org.apache.pekko.cluster +import scala.annotation.nowarn import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.control.NonFatal -import scala.annotation.nowarn import com.typesafe.config.Config import org.apache.pekko @@ -30,13 +30,11 @@ import pekko.annotation.InternalApi import pekko.cluster.ClusterEvent._ import pekko.cluster.MemberStatus._ import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } -import pekko.event.ActorWithLogClass -import pekko.event.Logging +import pekko.event.{ ActorWithLogClass, Logging } import pekko.pattern.ask -import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent } +import pekko.remote.{ QuarantinedEvent => ClassicQuarantinedEvent, RemoteSettings } import pekko.remote.artery.QuarantinedEvent -import pekko.util.Timeout -import pekko.util.Version +import pekko.util.{ Timeout, Version } /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -365,6 +363,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val statsEnabled = PublishStatsInterval.isFinite var gossipStats = GossipStats() + val acceptedProtocols: Set[String] = { + val remoteSettings: RemoteSettings = new RemoteSettings(context.system.settings.config) + val initSet = remoteSettings.AcceptProtocolNames + val tcpSet = initSet.map(protocol => s"$protocol.tcp") + initSet ++ tcpSet + } + var seedNodes = SeedNodes var seedNodeProcess: Option[ActorRef] = None var seedNodeProcessCounter = 0 // for unique names @@ -701,10 +706,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh * which will reply with a `Welcome` message. */ def join(address: Address): Unit = { - if (address.protocol != selfAddress.protocol) + if (!acceptedProtocols.contains(address.protocol)) logWarning( - "Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]", - selfAddress.protocol, + "Trying to join member with wrong protocol, but was ignored, expected any of {} but was [{}]", + acceptedProtocols, address.protocol) else if (address.system != selfAddress.system) logWarning( @@ -750,10 +755,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: Version): Unit = { if (!preparingForShutdown) { val selfStatus = latestGossip.member(selfUniqueAddress).status - if (joiningNode.address.protocol != selfAddress.protocol) + if (!acceptedProtocols.contains(joiningNode.address.protocol)) logWarning( - "Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]", - selfAddress.protocol, + "Member with wrong protocol tried to join, but was ignored, expected any of {} but was [{}]", + acceptedProtocols, joiningNode.address.protocol) else if (joiningNode.address.system != selfAddress.system) logWarning( diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala new file mode 100644 index 00000000000..f3c8c73adb9 --- /dev/null +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.cluster + +import com.typesafe.config.{ Config, ConfigFactory } + +import org.apache.pekko.testkit.{ LongRunningTest, PekkoSpec } + +object MixedProtocolClusterSpec { + + val baseConfig: Config = + ConfigFactory.parseString(""" + pekko.actor.provider = "cluster" + pekko.coordinated-shutdown.terminate-actor-system = on + + pekko.remote.artery.canonical.port = 0 + pekko.remote.classic.netty.tcp.port = 0 + pekko.remote.artery.advanced.aeron.idle-cpu-level = 3 + pekko.remote.accept-protocol-names = ["pekko", "akka"] + + pekko.cluster.jmx.multi-mbeans-in-same-jvm = on + pekko.cluster.configuration-compatibility-check.enforce-on-join = off + """) + + val configWithUdp: Config = + ConfigFactory.parseString(""" + pekko.remote.artery.transport = "aeron-udp" + """).withFallback(baseConfig) + + val configWithPekkoUdp: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "pekko" + """).withFallback(configWithUdp) + + val configWithAkkaUdp: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "akka" + """).withFallback(configWithUdp) + + val configWithPekkoTcp: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "pekko" + """).withFallback(baseConfig) + + val configWithAkkaTcp: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "akka" + """).withFallback(baseConfig) + + val configWithNetty: Config = + ConfigFactory.parseString(""" + pekko.remote.artery.enabled = false + pekko.remote.classic { + enabled-transports = ["pekko.remote.classic.netty.tcp"] + } + """).withFallback(baseConfig) + + val configWithPekkoNetty: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "pekko" + """).withFallback(configWithNetty) + + val configWithAkkaNetty: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "akka" + """).withFallback(configWithNetty) +} + +class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { + + import MixedProtocolClusterSpec._ + + "A node using the akka protocol" must { + + "be allowed to join a cluster with a node using the pekko protocol (udp)" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + try { + // start the first node with the "pekko" protocol + clusterTestUtil.newActorSystem(configWithPekkoUdp) + + // have a node using the "akka" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaUdp) + clusterTestUtil.formCluster() + + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + + "be allowed to join a cluster with a node using the pekko protocol (tcp)" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + try { + // start the first node with the "pekko" protocol + clusterTestUtil.newActorSystem(configWithPekkoTcp) + + // have a node using the "akka" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaTcp) + clusterTestUtil.formCluster() + + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + + "be allowed to join a cluster with a node using the pekko protocol (netty)" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + try { + // start the first node with the "pekko" protocol + clusterTestUtil.newActorSystem(configWithPekkoNetty) + + // have a node using the "akka" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithAkkaNetty) + clusterTestUtil.formCluster() + + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + + "allow a node using the pekko protocol to join the cluster (udp)" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + try { + // create the first node with the "akka" protocol + clusterTestUtil.newActorSystem(configWithAkkaUdp) + + // have a node using the "pekko" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoUdp) + clusterTestUtil.formCluster() + + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + + "allow a node using the pekko protocol to join the cluster (tcp)" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + try { + // create the first node with the "akka" protocol + clusterTestUtil.newActorSystem(configWithAkkaTcp) + + // have a node using the "pekko" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoTcp) + clusterTestUtil.formCluster() + + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + + "allow a node using the pekko protocol to join the cluster (netty)" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + try { + // create the first node with the "akka" protocol + clusterTestUtil.newActorSystem(configWithAkkaNetty) + + // have a node using the "pekko" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithPekkoNetty) + clusterTestUtil.formCluster() + + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + } +} diff --git a/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala b/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala index 85812471143..16811bc837a 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/Remoting.scala @@ -225,11 +225,12 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc Await.result(addressesPromise.future, StartupTimeout.duration) if (transports.isEmpty) throw new RemoteTransportException("No transport drivers were loaded.", null) - transportMapping = transports + val mapping = transports .groupBy { case (transport, _) => transport.schemeIdentifier } .map { case (k, v) => k -> v.toSet } + transportMapping = addProtocolsToMap(mapping) defaultAddress = transports.head._2 addresses = transports.map { _._2 }.toSet @@ -296,6 +297,21 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc } } } + + private def addProtocolsToMap( + map: Map[String, Set[(PekkoProtocolTransport, Address)]]): Map[String, Set[(PekkoProtocolTransport, Address)]] = { + if (AcceptProtocolNames.size > 1) { + map.flatMap { case (protocol, transports) => + val tcpProtocol = protocol.endsWith(".tcp") + AcceptProtocolNames.map { newProtocol => + if (tcpProtocol) + s"$newProtocol.tcp" -> transports + else + newProtocol -> transports + } + } + } else map + } } /** @@ -567,7 +583,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) } OneForOneStrategy(loggingEnabled = false) { - case InvalidAssociation(localAddress, remoteAddress, reason, disassiciationInfo) => + case InvalidAssociation(localAddress, remoteAddress, reason, disassociationInfo) => keepQuarantinedOr(remoteAddress) { val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]" log.warning( @@ -580,7 +596,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) causedBy) endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) } - disassiciationInfo.foreach { + disassociationInfo.foreach { case AssociationHandle.Quarantined => context.system.eventStream.publish(ThisActorSystemQuarantinedEvent(localAddress, remoteAddress)) case _ => // do nothing