-
-
Notifications
You must be signed in to change notification settings - Fork 111
Description
First of all, what is the ServiceReference? It is a flat structure of each method in each service in the specified endpoint.
I have two different cases in the exchange project.
- I want to know when some qualifier will be available in the cluster to invoke (subscribe to) it. And it doesn't matter how many microservices will be available, at least 1.
Ok. Right now I have two approaches to achieve it:
Microservices ms = ...
GreetingService greetingService = ms.call().create().api(GreetingService.class);
ms.discovery().listen()
.filter(DiscoveryEvent::isRegistered)
.flatMap(discoveryEvent ->
Flux.fromIterable(discoveryEvent.serviceEndpoint().serviceRegistrations()))
.filter(serviceRegistration ->
serviceRegistration.namespace().equals(GreetingService.SERVICE_NAME))
.take(1)
.subscribe($ -> greetingService.invoke().subscribe())
and the next one (note it will be work correctly after merging #330):
Microservices ms = ...
GreetingService greetingService = ms.call().create().api(GreetingService.class);
ms.serviceRegistry().listen()
.filter(RegistryEvent::isAdded)
.filter(registryEvent ->
registryEvent.serviceReference().namespace().equals(GreetingService.SERVICE_NAME))
.take(1)
.subscribe($ -> greetingService.invoke().subscribe())
So these are very similar approaches, I think we need to leave only one of them because they look like a duplicate logic (two publishers does the same). And we shouldn't reveal the discovery instance at all, because it has a lot of important methods with side effects like start
and shutdown
.
- I want to subscribe to all events from some services, but such services are a few in the cluster and I want to use our service proxy for it. For now, the proxy uses some router strategy and it selects a ServiceReference itself to invoke it. Imagine we have a few equivalent services which produce some events and your new service should aggregate them into one stream. In a nutshell, you want to subscribe to all events from all services with the same qualifier. How can I achieve it? I know that ServiceReference contains a host and a port inside. So I'd like to do something like that:
private final FluxSink<Event> sink = subject.serialize().sink();
...
Microservices ms = ...
ms.serviceRegistry().listen()
.filter(RegistryEvent::isAdded)
.map(RegistryEvent::serviceReference)
.filter(serviceReference ->
serviceReference.namespace().equals(GreetingService.SERVICE_NAME))
.subscribe(serviceReference ->
ms.call().create()
.serviceReference(serviceReference)
.api(GreetingService.class)
.invoke()
.subscribe(FluxSink::next));
Of course, maybe I need to subscribe to RegistryEvent::isRemoved to manage the life cycle of the created proxy. Maybe it will be better if that all (manage life cycle, when it will be available/disconnected, aggregation process and to complete when all will be completed) will be inside the ServiceCall and I just use something like that:
Microservices ms = ...
ms.call().create()
.failFast(false)
// The `Fail-Fast` principle after invoking `subscribe`, for `Flux` and `Mono`.
// Is it necessary to wait for when this qualifier will be available or not?
.subscribeToAll(true) // only for `Flux`
.api(GreetingService.class)
.invoke()
.subscribe(event -> ...);
This one can resolve the first problem (failFast
).