Skip to content

Commit

Permalink
Enh 37387064 - [37381796->24.09.1] Topics: general refactoring and ha…
Browse files Browse the repository at this point in the history
…rdening

(merge ce/main -> 24.09 113632)

[git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v24.09/": change = 113633]
  • Loading branch information
thegridman committed Jan 18, 2025
1 parent e948341 commit e856cf5
Showing 1 changed file with 7 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,15 @@
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.Cluster;
import com.tangosol.net.Coherence;
import com.tangosol.net.NamedCache;
import com.tangosol.net.Session;
import com.tangosol.net.TopicService;
import com.tangosol.net.topic.NamedTopic;
import com.tangosol.net.topic.Publisher;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.util.ExternalizableHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
Expand Down Expand Up @@ -357,9 +353,11 @@ optComplete, withIdentifyingName(sName)))
// shutdown the storage members
restartCluster();

IsServiceRunning isRunning = new IsServiceRunning(sServiceName);
IsServiceRunning isRunning = new IsServiceRunning(sServiceName);
IsCoherenceRunning isCohRunning = new IsCoherenceRunning();
for (CoherenceClusterMember m : s_storageCluster)
{
Eventually.assertDeferred(() -> m.invoke(isCohRunning), is(true));
Eventually.assertDeferred(() -> m.invoke(isRunning), is(true));
}
Logger.info(">>>> Restarted service " + sServiceName + " on all members");
Expand Down Expand Up @@ -388,7 +386,7 @@ optComplete, withIdentifyingName(sName)))
System.err.println(mapPublished.get(message) + " " + mapReceived.get(message));
}
}
assertThat(count, is(cPublished.get()));
assertThat(count, greaterThanOrEqualTo(cPublished.get()));
}
}

Expand All @@ -398,8 +396,6 @@ public void shouldRecoverWaitingSubscriberAfterCleanStorageRestart() throws Exce
{
NamedTopic<Message> topic = ensureTopic("test-three");
String sGroup = "group-one";
TopicService service = topic.getTopicService();
Cluster cluster = service.getCluster();
String sServiceName = s_storageCluster.getAny().invoke(new GetTopicServiceName(topic.getName()));

// create a subscriber group so that published messages are not lost before the subscriber subscribes
Expand Down Expand Up @@ -433,13 +429,11 @@ public void shouldRecoverWaitingSubscriberAfterCleanStorageRestart() throws Exce
// shutdown the storage members
restartCluster();

// we should eventually have three cluster members
Eventually.assertDeferred(() -> cluster.getMemberSet().size(), is(3));
Logger.info(">>>> Restarted storage.");

IsServiceRunning isRunning = new IsServiceRunning(sServiceName);
IsServiceRunning isRunning = new IsServiceRunning(sServiceName);
IsCoherenceRunning isCohRunning = new IsCoherenceRunning();
for (CoherenceClusterMember m : s_storageCluster)
{
Eventually.assertDeferred(() -> m.invoke(isCohRunning), is(true));
Eventually.assertDeferred(() -> m.invoke(isRunning), is(true));
}
Logger.info(">>>> Restarted service " + sServiceName + " on all members");
Expand Down

0 comments on commit e856cf5

Please sign in to comment.