Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.mongodb.client.vault.ClientEncryption;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.event.ConnectionReadyEvent;
import com.mongodb.event.TestServerMonitorListener;
import com.mongodb.internal.connection.ServerMonitoringModeUtil;
import com.mongodb.internal.connection.TestClusterListener;
Expand All @@ -64,6 +65,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -82,6 +84,7 @@
import static com.mongodb.client.unified.UnifiedCrudHelper.asReadPreference;
import static com.mongodb.client.unified.UnifiedCrudHelper.asWriteConcern;
import static com.mongodb.internal.connection.AbstractConnectionPoolTest.waitForPoolAsyncWorkManagerStart;
import static java.lang.String.format;
import static java.lang.System.getenv;
import static java.util.Arrays.asList;
import static org.junit.Assume.assumeTrue;
Expand All @@ -90,7 +93,8 @@ public final class Entities {
private static final Set<String> SUPPORTED_CLIENT_ENTITY_OPTIONS = new HashSet<>(
asList(
"id", "autoEncryptOpts", "uriOptions", "serverApi", "useMultipleMongoses", "storeEventsAsEntities",
"observeEvents", "observeLogMessages", "observeSensitiveCommands", "ignoreCommandMonitoringEvents"));
"observeEvents", "observeLogMessages", "observeSensitiveCommands", "ignoreCommandMonitoringEvents",
"awaitMinPoolSizeMS"));
private final Set<String> entityNames = new HashSet<>();
private final Map<String, ExecutorService> threads = new HashMap<>();
private final Map<String, ArrayList<Future<?>>> tasks = new HashMap<>();
Expand Down Expand Up @@ -306,6 +310,7 @@ private void initClient(final BsonDocument entity, final String id,
throw new UnsupportedOperationException("Client entity contains unsupported options: " + entity.keySet()
+ ". Supported options are " + SUPPORTED_CLIENT_ENTITY_OPTIONS);
}
boolean waitForMinPoolSizeToPopulate = isWaitForMinPoolSizeToPopulate(entity);
MongoClientSettings.Builder clientSettingsBuilder;
if (entity.getBoolean("useMultipleMongoses", BsonBoolean.FALSE).getValue() && (isSharded() || isLoadBalanced())) {
assumeTrue("Multiple mongos connection string not available for sharded cluster",
Expand All @@ -331,6 +336,9 @@ private void initClient(final BsonDocument entity, final String id,
if (entity.containsKey("observeEvents")) {
List<String> observeEvents = entity.getArray("observeEvents").stream()
.map(type -> type.asString().getValue()).collect(Collectors.toList());
if (waitForMinPoolSizeToPopulate) {
observeEvents.add("connectionReadyEvent");
}
List<String> ignoreCommandMonitoringEvents = entity
.getArray("ignoreCommandMonitoringEvents", new BsonArray()).stream()
.map(type -> type.asString().getValue()).collect(Collectors.toList());
Expand All @@ -341,7 +349,6 @@ private void initClient(final BsonDocument entity, final String id,
null);
clientSettingsBuilder.addCommandListener(testCommandListener);
putEntity(id + "-command-listener", testCommandListener, clientCommandListeners);

TestConnectionPoolListener testConnectionPoolListener = new TestConnectionPoolListener(observeEvents);
clientSettingsBuilder.applyToConnectionPoolSettings(builder ->
builder.addConnectionPoolListener(testConnectionPoolListener));
Expand Down Expand Up @@ -583,6 +590,35 @@ private void initClient(final BsonDocument entity, final String id,
if (waitForPoolAsyncWorkManagerStart) {
waitForPoolAsyncWorkManagerStart();
}
if (waitForMinPoolSizeToPopulate) {
waitForMinPoolSizeToPopulate(entity, id, clientSettings);
}
}

private void waitForMinPoolSizeToPopulate(final BsonDocument entity, final String id, final MongoClientSettings clientSettings) {
int minSize = clientSettings.getConnectionPoolSettings().getMinSize();
int awaitMinPoolSizeMS = entity.getInt32("awaitMinPoolSizeMS").getValue();
TestConnectionPoolListener testConnectionPoolListener = getConnectionPoolListener(id);
try {
/*
From the spec:
Any CMAP and SDAM event/log listeners configured on the client SHOULD ignore any events that occur before the pool is populated.

Currently, no flaky or racy behavior is caused by not clearing pre-pool-population CMAP and SDAM events.
If any race behavior is observed, consider clearing events.
*/
testConnectionPoolListener.waitForEvent(ConnectionReadyEvent.class, minSize, awaitMinPoolSizeMS, TimeUnit.MILLISECONDS);
} catch (TimeoutException | InterruptedException e) {
throw new RuntimeException(format("Error waiting for awaitMinPoolSizeMS [%s] to establish minPoolSize [%s] connections",
awaitMinPoolSizeMS, minSize));
}
}

private static boolean isWaitForMinPoolSizeToPopulate(final BsonDocument clientEntity) {
int minPoolSize = clientEntity.getDocument("uriOptions", new BsonDocument())
.get("minPoolSize", new BsonInt32(0))
.asInt32().getValue();
return minPoolSize != 0 && clientEntity.containsKey("awaitMinPoolSizeMS");
}

private static LogMessage.Component toComponent(final Map.Entry<String, BsonValue> entry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public abstract class UnifiedTest {
private static final Set<String> PRESTART_POOL_ASYNC_WORK_MANAGER_FILE_DESCRIPTIONS = Collections.singleton(
"wait queue timeout errors include details about checked out connections");

private static final String MAX_SUPPORTED_SCHEMA_VERSION = "1.25";
private static final String MAX_SUPPORTED_SCHEMA_VERSION = "1.26";
private static final List<Integer> MAX_SUPPORTED_SCHEMA_VERSION_COMPONENTS = Arrays.stream(MAX_SUPPORTED_SCHEMA_VERSION.split("\\."))
.map(Integer::parseInt)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,18 @@ public static void applyCustomizations(final TestDef def) {
.file("client-side-encryption/tests/unified", "client bulkWrite with queryable encryption");

// client-side-operation-timeout (CSOT)

/*
As to the background connection pooling section:
timeoutMS set at the MongoClient level MUST be used as the timeout for all commands sent as part of the handshake.
We first configure a failpoint to block all hello/isMaster commands for 50 ms, then set timeoutMS = 10 ms on MongoClient
and wait for awaitMinPoolSize = 1000. So that means the background thread tries to populate connections under a 10ms timeout
cap while the failpoint blocks for 50ms, so all attempts effectively fail.
*/
def.skipAccordingToSpec("background connection pooling section")
.test("client-side-operations-timeout", "timeoutMS behaves correctly during command execution",
"short-circuit is not enabled with only 1 RTT measurement")
.test("client-side-operations-timeout", "timeoutMS behaves correctly during command execution",
"command is not sent if RTT is greater than timeoutMS");
def.skipNoncompliantReactive("No good way to fulfill tryNext() requirement with a Publisher<T>")
.test("client-side-operations-timeout", "timeoutMS behaves correctly for tailable awaitData cursors",
"apply remaining timeoutMS if less than maxAwaitTimeMS");
Expand Down