diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java index 6f6e5bb66c..2e6c5f0b54 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/Entities.java @@ -38,6 +38,7 @@ import com.mongodb.client.vault.ClientEncryption; import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.connection.ClusterDescription; +import com.mongodb.event.ConnectionReadyEvent; import com.mongodb.event.TestServerMonitorListener; import com.mongodb.internal.connection.ServerMonitoringModeUtil; import com.mongodb.internal.connection.TestClusterListener; @@ -64,6 +65,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -82,6 +84,7 @@ import static com.mongodb.client.unified.UnifiedCrudHelper.asReadPreference; import static com.mongodb.client.unified.UnifiedCrudHelper.asWriteConcern; import static com.mongodb.internal.connection.AbstractConnectionPoolTest.waitForPoolAsyncWorkManagerStart; +import static java.lang.String.format; import static java.lang.System.getenv; import static java.util.Arrays.asList; import static org.junit.Assume.assumeTrue; @@ -90,7 +93,8 @@ public final class Entities { private static final Set SUPPORTED_CLIENT_ENTITY_OPTIONS = new HashSet<>( asList( "id", "autoEncryptOpts", "uriOptions", "serverApi", "useMultipleMongoses", "storeEventsAsEntities", - "observeEvents", "observeLogMessages", "observeSensitiveCommands", "ignoreCommandMonitoringEvents")); + "observeEvents", "observeLogMessages", "observeSensitiveCommands", "ignoreCommandMonitoringEvents", + "awaitMinPoolSizeMS")); private final Set entityNames = new HashSet<>(); private final Map threads = new HashMap<>(); private final Map>> tasks = new HashMap<>(); @@ -306,6 +310,7 @@ private void initClient(final BsonDocument entity, final String id, throw new UnsupportedOperationException("Client entity contains unsupported options: " + entity.keySet() + ". Supported options are " + SUPPORTED_CLIENT_ENTITY_OPTIONS); } + boolean waitForMinPoolSizeToPopulate = isWaitForMinPoolSizeToPopulate(entity); MongoClientSettings.Builder clientSettingsBuilder; if (entity.getBoolean("useMultipleMongoses", BsonBoolean.FALSE).getValue() && (isSharded() || isLoadBalanced())) { assumeTrue("Multiple mongos connection string not available for sharded cluster", @@ -331,6 +336,9 @@ private void initClient(final BsonDocument entity, final String id, if (entity.containsKey("observeEvents")) { List observeEvents = entity.getArray("observeEvents").stream() .map(type -> type.asString().getValue()).collect(Collectors.toList()); + if (waitForMinPoolSizeToPopulate) { + observeEvents.add("connectionReadyEvent"); + } List ignoreCommandMonitoringEvents = entity .getArray("ignoreCommandMonitoringEvents", new BsonArray()).stream() .map(type -> type.asString().getValue()).collect(Collectors.toList()); @@ -341,7 +349,6 @@ private void initClient(final BsonDocument entity, final String id, null); clientSettingsBuilder.addCommandListener(testCommandListener); putEntity(id + "-command-listener", testCommandListener, clientCommandListeners); - TestConnectionPoolListener testConnectionPoolListener = new TestConnectionPoolListener(observeEvents); clientSettingsBuilder.applyToConnectionPoolSettings(builder -> builder.addConnectionPoolListener(testConnectionPoolListener)); @@ -583,6 +590,35 @@ private void initClient(final BsonDocument entity, final String id, if (waitForPoolAsyncWorkManagerStart) { waitForPoolAsyncWorkManagerStart(); } + if (waitForMinPoolSizeToPopulate) { + waitForMinPoolSizeToPopulate(entity, id, clientSettings); + } + } + + private void waitForMinPoolSizeToPopulate(final BsonDocument entity, final String id, final MongoClientSettings clientSettings) { + int minSize = clientSettings.getConnectionPoolSettings().getMinSize(); + int awaitMinPoolSizeMS = entity.getInt32("awaitMinPoolSizeMS").getValue(); + TestConnectionPoolListener testConnectionPoolListener = getConnectionPoolListener(id); + try { + /* + From the spec: + Any CMAP and SDAM event/log listeners configured on the client SHOULD ignore any events that occur before the pool is populated. + + Currently, no flaky or racy behavior is caused by not clearing pre-pool-population CMAP and SDAM events. + If any race behavior is observed, consider clearing events. + */ + testConnectionPoolListener.waitForEvent(ConnectionReadyEvent.class, minSize, awaitMinPoolSizeMS, TimeUnit.MILLISECONDS); + } catch (TimeoutException | InterruptedException e) { + throw new RuntimeException(format("Error waiting for awaitMinPoolSizeMS [%s] to establish minPoolSize [%s] connections", + awaitMinPoolSizeMS, minSize)); + } + } + + private static boolean isWaitForMinPoolSizeToPopulate(final BsonDocument clientEntity) { + int minPoolSize = clientEntity.getDocument("uriOptions", new BsonDocument()) + .get("minPoolSize", new BsonInt32(0)) + .asInt32().getValue(); + return minPoolSize != 0 && clientEntity.containsKey("awaitMinPoolSizeMS"); } private static LogMessage.Component toComponent(final Map.Entry entry) { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index 79b2a9c9da..62d641d12d 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -109,7 +109,7 @@ public abstract class UnifiedTest { private static final Set PRESTART_POOL_ASYNC_WORK_MANAGER_FILE_DESCRIPTIONS = Collections.singleton( "wait queue timeout errors include details about checked out connections"); - private static final String MAX_SUPPORTED_SCHEMA_VERSION = "1.25"; + private static final String MAX_SUPPORTED_SCHEMA_VERSION = "1.26"; private static final List MAX_SUPPORTED_SCHEMA_VERSION_COMPONENTS = Arrays.stream(MAX_SUPPORTED_SCHEMA_VERSION.split("\\.")) .map(Integer::parseInt) .collect(Collectors.toList()); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 327cc3f3da..d1408ac29a 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -60,7 +60,18 @@ public static void applyCustomizations(final TestDef def) { .file("client-side-encryption/tests/unified", "client bulkWrite with queryable encryption"); // client-side-operation-timeout (CSOT) - + /* + As to the background connection pooling section: + timeoutMS set at the MongoClient level MUST be used as the timeout for all commands sent as part of the handshake. + We first configure a failpoint to block all hello/isMaster commands for 50 ms, then set timeoutMS = 10 ms on MongoClient + and wait for awaitMinPoolSize = 1000. So that means the background thread tries to populate connections under a 10ms timeout + cap while the failpoint blocks for 50ms, so all attempts effectively fail. + */ + def.skipAccordingToSpec("background connection pooling section") + .test("client-side-operations-timeout", "timeoutMS behaves correctly during command execution", + "short-circuit is not enabled with only 1 RTT measurement") + .test("client-side-operations-timeout", "timeoutMS behaves correctly during command execution", + "command is not sent if RTT is greater than timeoutMS"); def.skipNoncompliantReactive("No good way to fulfill tryNext() requirement with a Publisher") .test("client-side-operations-timeout", "timeoutMS behaves correctly for tailable awaitData cursors", "apply remaining timeoutMS if less than maxAwaitTimeMS");