-
Notifications
You must be signed in to change notification settings - Fork 117
Refactor RoutingManager #741
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
* request object. Default implementation comes here. | ||
*/ | ||
public abstract class RoutingManager | ||
public interface RoutingManager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we have java doc for this and method as this becomes our primary interface for all RoutingManager implementations. ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a good idea to have an interface to enforce the rules.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added javadoc
CacheBuilder.newBuilder() | ||
.maximumSize(10000) | ||
.expireAfterAccess(30, TimeUnit.MINUTES) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: you can extract this into separate builder and reuse 3 time while building cache.
for example
private final CacheBuilder<Object, Object> builder = CacheBuilder.newBuilder()
.maximumSize(10000)
.expireAfterAccess(30, TimeUnit.MINUTES);
queryIdBackendCache = builder.build(
new CacheLoader<>()
{
@Override
public String load(String queryId)
{
return findBackendForUnknownQueryId(queryId);
}
});
queryIdRoutingGroupCache = builder.build(
new CacheLoader<>()
{
@Override
public String load(String queryId)
{
return findRoutingGroupForUnknownQueryId(queryId);
}
});
queryIdExternalUrlCache = builder.build(
new CacheLoader<>()
{
@Override
public String load(String queryId)
{
return findExternalUrlForUnknownQueryId(queryId);
}
});
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
TrinoStatus status = backendToStatus.get(backendId); | ||
if (status == null) { | ||
return true; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TrinoStatus status = backendToStatus.get(backendId); | |
if (status == null) { | |
return true; | |
} | |
TrinoStatus status = backendToStatus.getOrDefault(backendId, TrinoStatus.UNKNOWN); | |
return status != TrinoStatus.HEALTHY; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
public ProxyBackendConfiguration provideDefaultBackendConfiguration(String user) | ||
{ | ||
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getActiveDefaultBackends(); | ||
backends.removeIf(backend -> isBackendNotHealthy(backend.getName())); | ||
return selectBackend(backends, user).orElseThrow(() -> new IllegalStateException("Number of active backends found zero")); | ||
} | ||
|
||
/** | ||
* Performs routing to a given cluster group. This falls back to a default backend, if no scheduled | ||
* backend is found. | ||
*/ | ||
@Override | ||
public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user) | ||
{ | ||
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getActiveBackends(routingGroup); | ||
backends.removeIf(backend -> isBackendNotHealthy(backend.getName())); | ||
return selectBackend(backends, user).orElseGet(() -> provideDefaultBackendConfiguration(user)); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are removing the unhealthy backends from candidate list. You can use lambda to filter out unwanted entries. It will simplify the predicate function that identifies healthy backends.
For example:
public ProxyBackendConfiguration provideDefaultBackendConfiguration(String user) | |
{ | |
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getActiveDefaultBackends(); | |
backends.removeIf(backend -> isBackendNotHealthy(backend.getName())); | |
return selectBackend(backends, user).orElseThrow(() -> new IllegalStateException("Number of active backends found zero")); | |
} | |
/** | |
* Performs routing to a given cluster group. This falls back to a default backend, if no scheduled | |
* backend is found. | |
*/ | |
@Override | |
public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user) | |
{ | |
List<ProxyBackendConfiguration> backends = gatewayBackendManager.getActiveBackends(routingGroup); | |
backends.removeIf(backend -> isBackendNotHealthy(backend.getName())); | |
return selectBackend(backends, user).orElseGet(() -> provideDefaultBackendConfiguration(user)); | |
} | |
public ProxyBackendConfiguration provideDefaultBackendConfiguration(String user) { | |
var backends = gatewayBackendManager.getActiveDefaultBackends() | |
.stream() | |
.filter(backend -> isBackendHealthy(backend.getName())) | |
.toList(); | |
return selectBackend(backends, user).orElseThrow(() -> new IllegalStateException("Number of active backends found zero")); | |
} | |
@Override | |
public ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user) { | |
var backends = gatewayBackendManager.getActiveBackends(routingGroup) | |
.stream() | |
.filter(backend -> isBackendHealthy(backend.getName())) | |
.toList(); | |
return selectBackend(backends, user).orElseGet(() -> provideDefaultBackendConfiguration(user)); | |
} | |
private boolean isBackendHealthy(String backendId) { | |
TrinoStatus status = backendToStatus.getOrDefault(backendId, TrinoStatus.UNKNOWN); | |
return status == TrinoStatus.HEALTHY; | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
* This class performs health check, stats counts for each backend and provides a backend given | ||
* request object. Default implementation comes here. | ||
*/ | ||
public abstract class BaseRoutingManager |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggestion IMO, for ease of reading you may want to group methods by visibility and order them.
1/ constructor
2/ abstract methods
3/ public method
4/ protected - package
5/ private
I see the abstract methods as an interface for subclasses so it is easier to discover them by future maintainers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
return routingGroup; | ||
} | ||
|
||
protected void updateBackEndHealth(List<ClusterStats> stats) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we delete this method ?
It get confusing as we have 3 methods that update the state in cache.
You move this method logic into the interface method public void updateClusterStats
. This would simplify and we avoid method overloading of interface method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if (entry.getValue().isDone()) { | ||
int responseCode = entry.getValue().get(); | ||
if (responseCode == 200) { | ||
log.info("Found query [%s] on backend [%s]", queryId, entry.getKey()); | ||
setBackendForQueryId(queryId, entry.getKey()); | ||
return entry.getKey(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Q: Curious, Why did you add condition check for future.isDone()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of these code are existing - https://github.com/trinodb/trino-gateway/blob/main/gateway-ha/src/main/java/io/trino/gateway/ha/router/RoutingManager.java
I changed the file name from RoutingManager to BaseRoutingManager and created interface file with name RoutingManager. So git instead of marking it as file name change, assumed everything in the file as new changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we separate out this PR into 2 different ones.
(1) Just fix the bug and add the minimal interface needed, and the fixed tests.
(2) Rest of the refactoring
8853b9b
to
7cdbcbf
Compare
Tried splitting it into 2 commits but feels bit complicated as the refactoring kind of took care of the bug. |
I'm concerned that having a default implementation in the A better approach would be to move the default logic into its own class, say On a related note, could you detail what new runtime tests are being added to catch these kinds of integration failures? |
composition might makes things very complicated. Interface with an abstract base implementation is a common pattern which should be ok. For instance in Trino there is a TrinoCatalog interface with AbtsractTrinoCatalog implementation that has common methods implemented. The primary problem I see is the interface is bloated and can further be made lean. There should only be 4 methods -
Maintaining 3 separate cache and separately setting every cache makes no sense as they all point to same data. One cache with all data together should be enough and exposing just one method to get and set the backEnd configuration. This should make things less confusing to be overridden and implemented. If this new lean interface sounds good I can make the changes. |
70113c2
to
ed4c772
Compare
return externalUrl; | ||
} | ||
|
||
private static LoadingCache<String, String> buildCache(Function<String, String> loader) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this have to be a static
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
this is definitely a good improvement to our routing logic. Thanks for the java doc. |
gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java
Outdated
Show resolved
Hide resolved
gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java
Outdated
Show resolved
Hide resolved
gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java
Outdated
Show resolved
Hide resolved
@maswin Do you still have bandwidth to work on this? |
Sorry about the delay. Just addressed the review comments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for making this change.
int backendId = Math.abs(RANDOM.nextInt()) % backends.size(); | ||
return backends.get(backendId); | ||
} | ||
void updateBackEndHealth(String backendId, TrinoStatus value); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we rename this to updateClusterHealth
since we want to refer Trino clusters as "cluster" instead of "backend"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a number of places in which we use backend, including the UI code -
i.e,
ProxyBackendConfiguration
BackendStateManager
GatewayBackendManager
Isn't it better to change them all at once in a separate commit?
} | ||
return externalUrl; | ||
} | ||
void setBackendForQueryId(String queryId, String backend); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
setClusterForQueryId
// Fallback on first active backend if queryId mapping not found. | ||
return gatewayBackendManager.getActiveBackends(defaultRoutingGroup).get(0).getProxyTo(); | ||
} | ||
String findBackendForQueryId(String queryId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
findClusterForQueryId
@sourcery-ai review |
Reviewer's GuideThis PR refactors the routing framework by splitting the monolithic RoutingManager into a RoutingManager interface and a BaseRoutingManager implementation, fixes a bug in QueryCountBasedRouter by overriding the correct routing selection method and replacing a synchronized list with ConcurrentHashMap, renames and unifies stats and health‐update methods, and updates all affected tests to align with the new API. Sequence diagram for backend health and stats update flowsequenceDiagram
participant HealthCheckObserver
participant RoutingManager
HealthCheckObserver->>RoutingManager: updateClusterStats(List<ClusterStats>)
RoutingManager->>RoutingManager: updateBackEndHealth(clusterId, trinoStatus) (for each ClusterStats)
RoutingManager->>RoutingManager: update internal backendToStatus map
Sequence diagram for backend selection via provideBackendConfigurationsequenceDiagram
participant Client
participant RoutingManager
participant BaseRoutingManager
participant QueryCountBasedRouter
Client->>RoutingManager: provideBackendConfiguration(routingGroup, user)
RoutingManager->>BaseRoutingManager: provideBackendConfiguration(routingGroup, user)
BaseRoutingManager->>QueryCountBasedRouter: selectBackend(backends, user)
QueryCountBasedRouter-->>BaseRoutingManager: Optional<ProxyBackendConfiguration>
BaseRoutingManager-->>Client: ProxyBackendConfiguration
Class diagram for refactored RoutingManager hierarchyclassDiagram
class RoutingManager {
<<interface>>
+void updateBackEndHealth(String backendId, TrinoStatus value)
+void updateClusterStats(List<ClusterStats> stats)
+void setBackendForQueryId(String queryId, String backend)
+void setRoutingGroupForQueryId(String queryId, String routingGroup)
+String findBackendForQueryId(String queryId)
+String findExternalUrlForQueryId(String queryId)
+String findRoutingGroupForQueryId(String queryId)
+ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user)
}
class BaseRoutingManager {
-GatewayBackendManager gatewayBackendManager
-ConcurrentHashMap<String, TrinoStatus> backendToStatus
-String defaultRoutingGroup
-QueryHistoryManager queryHistoryManager
-LoadingCache<String, String> queryIdBackendCache
-LoadingCache<String, String> queryIdRoutingGroupCache
-LoadingCache<String, String> queryIdExternalUrlCache
+ProxyBackendConfiguration provideDefaultBackendConfiguration(String user)
+ProxyBackendConfiguration provideBackendConfiguration(String routingGroup, String user)
+void updateBackEndHealth(String backendId, TrinoStatus value)
+void updateClusterStats(List<ClusterStats> stats)
+void setBackendForQueryId(String queryId, String backend)
+void setRoutingGroupForQueryId(String queryId, String routingGroup)
+String findBackendForQueryId(String queryId)
+String findExternalUrlForQueryId(String queryId)
+String findRoutingGroupForQueryId(String queryId)
+abstract Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user)
}
class StochasticRoutingManager {
+Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user)
}
class QueryCountBasedRouter {
-ConcurrentHashMap<String, LocalStats> clusterStats
+synchronized Map<String, LocalStats> clusterStats()
+synchronized void updateClusterStats(List<ClusterStats> stats)
+protected synchronized Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user)
}
RoutingManager <|.. BaseRoutingManager
BaseRoutingManager <|-- StochasticRoutingManager
BaseRoutingManager <|-- QueryCountBasedRouter
Class diagram for QueryCountBasedRouter stats refactorclassDiagram
class QueryCountBasedRouter {
-ConcurrentHashMap<String, LocalStats> clusterStats
+synchronized Map<String, LocalStats> clusterStats()
+synchronized void updateClusterStats(List<ClusterStats> stats)
+protected synchronized Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user)
}
class LocalStats {
+LocalStats(ClusterStats stats)
+int runningQueryCount()
+int queuedQueryCount()
+String routingGroup()
+TrinoStatus trinoStatus()
+ProxyBackendConfiguration backendConfiguration()
}
QueryCountBasedRouter "1" *-- "*" LocalStats
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey there - I've reviewed your changes and they look great!
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `gateway-ha/src/main/java/io/trino/gateway/ha/router/QueryCountBasedRouter.java:232` </location>
<code_context>
+ // queries
@Override
- public synchronized void updateBackEndStats(List<ClusterStats> stats)
+ protected synchronized Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user)
{
- clusterStats = stats.stream().map(a -> new LocalStats(a)).collect(Collectors.toList());
</code_context>
<issue_to_address>
**suggestion:** selectBackend implementation does not handle missing clusterStats for a backend gracefully.
Consider adding logging or error handling for backends missing from clusterStats to help identify potential monitoring or initialization issues.
Suggested implementation:
```java
@Override
protected synchronized Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user)
{
// Log missing clusterStats for backends
backends.stream()
.filter(backend -> !clusterStats.containsKey(backend.getName()))
.forEach(backend -> {
logger.warn("Missing clusterStats for backend: {}", backend.getName());
});
Optional<ProxyBackendConfiguration> cluster = backends.stream()
.filter(backend -> clusterStats.containsKey(backend.getName()))
.min((a, b) -> compareStats(clusterStats.get(a.getName()), clusterStats.get(b.getName()), user));
cluster.ifPresent(c -> updateLocalStats(clusterStats.get(c.getName()), user));
return cluster;
}
}
```
Make sure that a suitable logger (e.g., `private static final Logger logger = LoggerFactory.getLogger(QueryCountBasedRouter.class);`) is defined in the class if it is not already present.
If you use a different logging framework, adjust the logging statement accordingly.
</issue_to_address>
### Comment 2
<location> `gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java:52` </location>
<code_context>
+ implements RoutingManager
+{
+ private static final Logger log = Logger.get(BaseRoutingManager.class);
+ private final ExecutorService executorService = Executors.newFixedThreadPool(5);
+ private final GatewayBackendManager gatewayBackendManager;
+ private final ConcurrentHashMap<String, TrinoStatus> backendToStatus;
</code_context>
<issue_to_address>
**issue (bug_risk):** ExecutorService is never shut down, which may lead to resource leaks.
Consider adding a shutdown method or integrating ExecutorService lifecycle management to prevent resource leaks when RoutingManager instances are disposed.
</issue_to_address>
### Comment 3
<location> `gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java:278` </location>
<code_context>
+ });
+ }
+
+ private boolean isBackendHealthy(String backendId)
+ {
+ TrinoStatus status = backendToStatus.getOrDefault(backendId, TrinoStatus.UNKNOWN);
</code_context>
<issue_to_address>
**suggestion:** isBackendHealthy defaults to TrinoStatus.UNKNOWN, which may mask unhealthy backends.
Since UNKNOWN is used as the default when a backend's status is missing, please confirm if UNKNOWN should be considered unhealthy. Also, consider adding logging to highlight when backend health is not tracked.
Suggested implementation:
```java
private boolean isBackendHealthy(String backendId)
{
TrinoStatus status = backendToStatus.getOrDefault(backendId, TrinoStatus.UNKNOWN);
if (status == TrinoStatus.UNKNOWN) {
// Log a warning when backend health is not tracked
logger.warn("Backend health for '{}' is UNKNOWN and not tracked.", backendId);
return false;
}
return status == TrinoStatus.HEALTHY;
}
}
```
Make sure that a suitable logger (e.g., `private static final Logger logger = LoggerFactory.getLogger(BaseRoutingManager.class);`) is defined in the class if it isn't already. If not, add the logger declaration at the top of the class.
</issue_to_address>
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
// queries | ||
@Override | ||
public synchronized void updateBackEndStats(List<ClusterStats> stats) | ||
protected synchronized Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: selectBackend implementation does not handle missing clusterStats for a backend gracefully.
Consider adding logging or error handling for backends missing from clusterStats to help identify potential monitoring or initialization issues.
Suggested implementation:
@Override
protected synchronized Optional<ProxyBackendConfiguration> selectBackend(List<ProxyBackendConfiguration> backends, String user)
{
// Log missing clusterStats for backends
backends.stream()
.filter(backend -> !clusterStats.containsKey(backend.getName()))
.forEach(backend -> {
logger.warn("Missing clusterStats for backend: {}", backend.getName());
});
Optional<ProxyBackendConfiguration> cluster = backends.stream()
.filter(backend -> clusterStats.containsKey(backend.getName()))
.min((a, b) -> compareStats(clusterStats.get(a.getName()), clusterStats.get(b.getName()), user));
cluster.ifPresent(c -> updateLocalStats(clusterStats.get(c.getName()), user));
return cluster;
}
}
Make sure that a suitable logger (e.g., private static final Logger logger = LoggerFactory.getLogger(QueryCountBasedRouter.class);
) is defined in the class if it is not already present.
If you use a different logging framework, adjust the logging statement accordingly.
gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java
Show resolved
Hide resolved
gateway-ha/src/main/java/io/trino/gateway/ha/router/BaseRoutingManager.java
Show resolved
Hide resolved
sourcery-ai has 2 good suggestions. Otherwise LGTM, don't forget to rebase with the main branch. |
Done |
Description
This commit addresses a number of items
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required, with the following suggested text:
* Fix some things.
Summary by Sourcery
Refactor the routing manager hierarchy to improve extensibility and concurrency, fix a broken query-count-based router, and update related tests and dependencies
Bug Fixes:
Enhancements:
Tests:
Chores: