Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion hivemind/dht/dht.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
import os
import signal
from functools import partial
from typing import Awaitable, Callable, Iterable, List, Optional, Sequence, TypeVar, Union
from typing import Awaitable, Callable, Iterable, Optional, Sequence, TypeVar, Union

from multiaddr import Multiaddr

from hivemind.dht.node import DEFAULT_NUM_WORKERS, DHTNode
from hivemind.dht.routing import DHTKey, DHTValue, Subkey
from hivemind.dht.validation import CompositeValidator, RecordValidatorBase
from hivemind.p2p import P2P, PeerID
from hivemind.p2p.p2p_daemon_bindings import BandwidthMetrics
from hivemind.utils import MPFuture, get_logger, switch_to_uvloop
from hivemind.utils.timed_storage import DHTExpiration, ValueWithExpiration

Expand Down Expand Up @@ -87,6 +88,48 @@ def __init__(
if start:
self.run_in_background(await_ready=await_ready)

def list_peers(self) -> str:
assert os.getpid() != self.pid, "calling *external* DHT interface from inside DHT will result in a deadlock"
future = MPFuture()
self._outer_pipe.send(("_list_peers", [future], dict()))
return future.result()

def get_bandwidth_metrics(
self, for_self: bool = False, for_all_peers: bool = False, peers: Sequence[Union[str, PeerID]] = []
) -> BandwidthMetrics:
"""
Get bandwidth rate metrics(bytes / sec, in and out):
for_self - for self(this host totals)
for_all_peers - for all active peers(MAY BE SLOW)
peers[..]: - for the list of peer ids
"""
assert os.getpid() != self.pid, "calling *external* DHT interface from inside DHT will result in a deadlock"
future = MPFuture()
self._outer_pipe.send(("_get_bandwidth_metrics", [future, for_self, for_all_peers, peers], dict()))
return future.result()

async def _get_bandwidth_metrics(
self, future: MPFuture, for_self: bool, for_all_peers: bool, peers: Sequence[Union[str, PeerID]]
):
try:
result = await self._node.p2p.get_bandwidth_metrics(for_self, for_all_peers, peers)
if not future.done():
future.set_result(result)
except BaseException as e:
if not future.done():
future.set_exception(e)
raise

async def _list_peers(self, future: MPFuture):
try:
result = await self._node.p2p.list_peers()
if not future.done():
future.set_result(result)
except BaseException as e:
if not future.done():
future.set_exception(e)
raise

def run(self) -> None:
"""Serve DHT forever. This function will not return until DHT node is shut down"""

Expand Down
15 changes: 14 additions & 1 deletion hivemind/p2p/p2p_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import hivemind.hivemind_cli as cli
import hivemind.p2p.p2p_daemon_bindings.p2pclient as p2pclient
from hivemind.p2p.p2p_daemon_bindings.control import DEFAULT_MAX_MSG_SIZE, P2PDaemonError, P2PHandlerError
from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
from hivemind.p2p.p2p_daemon_bindings.datastructures import BandwidthMetrics, PeerID, PeerInfo, StreamInfo
from hivemind.p2p.p2p_daemon_bindings.utils import ControlFailure
from hivemind.proto import crypto_pb2
from hivemind.proto.p2pd_pb2 import RPCError
Expand Down Expand Up @@ -335,6 +335,19 @@ async def get_visible_maddrs(self, latest: bool = False) -> List[Multiaddr]:
async def list_peers(self) -> List[PeerInfo]:
return list(await self._client.list_peers())

async def get_bandwidth_metrics(
self, for_self: bool = False, for_all_peers: bool = False, peers: Sequence[Union[str, PeerID]] = []
) -> BandwidthMetrics:
"""
Get bandwidth rate metrics(bytes / sec, in and out):
for_self - for self(this host totals)
for_all_peers - for all active peers(MAY BE SLOW)
peers[..]: - for the list of peer ids
"""

peer_ids = [PeerID.from_base58(peer) if isinstance(peer, str) else peer for peer in peers]
return await self._client.get_bandwidth_metrics(for_self, for_all_peers, peer_ids)

async def wait_for_at_least_n_peers(self, n_peers: int, attempts: int = 3, delay: float = 1) -> None:
for _ in range(attempts):
peers = await self._client.list_peers()
Expand Down
2 changes: 1 addition & 1 deletion hivemind/p2p/p2p_daemon_bindings/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo
from hivemind.p2p.p2p_daemon_bindings.datastructures import BandwidthMetrics, PeerID, PeerInfo
from hivemind.p2p.p2p_daemon_bindings.utils import P2PDaemonError, P2PHandlerError
21 changes: 20 additions & 1 deletion hivemind/p2p/p2p_daemon_bindings/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from multiaddr import Multiaddr, protocols

from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
from hivemind.p2p.p2p_daemon_bindings.datastructures import BandwidthMetrics, PeerID, PeerInfo, StreamInfo
from hivemind.p2p.p2p_daemon_bindings.utils import (
DispatchFailure,
P2PDaemonError,
Expand Down Expand Up @@ -357,6 +357,25 @@ async def list_peers(self) -> Tuple[PeerInfo, ...]:
peers = tuple(PeerInfo.from_protobuf(pinfo) for pinfo in resp.peers)
return peers

async def get_bandwidth_metrics(
self, for_self: bool, for_all_peers: bool, peer_ids: Sequence[PeerID]
) -> BandwidthMetrics:
bandwidth_req = p2pd_pb.BandwidthMetricsRequest(
forSelf=for_self, forAllPeers=for_all_peers, ids=[id.to_bytes() for id in peer_ids]
)
req = p2pd_pb.Request(type=p2pd_pb.Request.BANDWIDTH_METRICS, bwr=bandwidth_req)

reader, writer = await self.daemon_connector.open_connection()
await write_pbmsg(writer, req)
resp = p2pd_pb.Response() # type: ignore
await read_pbmsg_safe(reader, resp)
writer.close()
raise_if_failed(resp)

pb_bwr = resp.bwr
bwm = BandwidthMetrics.from_protobuf(pb_bwr)
return bwm

async def disconnect(self, peer_id: PeerID) -> None:
disconnect_req = p2pd_pb.DisconnectRequest(peer=peer_id.to_bytes())
req = p2pd_pb.Request(type=p2pd_pb.Request.DISCONNECT, disconnect=disconnect_req)
Expand Down
26 changes: 22 additions & 4 deletions hivemind/p2p/p2p_daemon_bindings/datastructures.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,11 @@ def from_protobuf(cls, pb_msg: p2pd_pb2.StreamInfo) -> "StreamInfo":


class PeerInfo:
def __init__(self, peer_id: PeerID, addrs: Sequence[Multiaddr]) -> None:
def __init__(self, peer_id: PeerID, addrs: Sequence[Multiaddr], rate_in=0, rate_out=0) -> None:
self.peer_id = peer_id
self.addrs = list(addrs)
self.rate_in = rate_in
self.rate_out = rate_out

def __eq__(self, other: Any) -> bool:
return isinstance(other, PeerInfo) and self.peer_id == other.peer_id and self.addrs == other.addrs
Expand All @@ -130,13 +132,15 @@ def __eq__(self, other: Any) -> bool:
def from_protobuf(cls, peer_info_pb: p2pd_pb2.PeerInfo) -> "PeerInfo":
peer_id = PeerID(peer_info_pb.id)
addrs = [Multiaddr(addr) for addr in peer_info_pb.addrs]
return PeerInfo(peer_id, addrs)
rate_in = peer_info_pb.ratein
rate_out = peer_info_pb.rateout
return PeerInfo(peer_id, addrs, rate_in, rate_out)

def __str__(self):
return f"{self.peer_id.pretty()} {','.join(str(a) for a in self.addrs)}"
return f"{self.peer_id.pretty()} {','.join(str(a) for a in self.addrs)}, rate_in = {repr(self.rate_in)}, rate_out = {repr(self.rate_out)})"

def __repr__(self):
return f"PeerInfo(peer_id={repr(self.peer_id)}, addrs={repr(self.addrs)})"
return f"PeerInfo(peer_id={repr(self.peer_id)}, addrs={repr(self.addrs)}, rate_in = {repr(self.rate_in)}, rate_out = {repr(self.rate_out)})"


class InvalidAddrError(ValueError):
Expand Down Expand Up @@ -165,3 +169,17 @@ def info_from_p2p_addr(addr: Multiaddr) -> PeerInfo:
addr = Multiaddr.join(*parts[:-1])

return PeerInfo(peer_id, [addr])


class BandwidthMetrics:
def __init__(self, rateIn: int = 0, rateOut: int = 0, peers: Sequence[PeerID] = []) -> None:
self.selfRateIn = rateIn
self.selfRateOut = rateOut
self.peers = peers

@classmethod
def from_protobuf(cls, bmr: p2pd_pb2.BandwidthMetricsResponse) -> "BandwidthMetrics":
rate_in = bmr.selfRateIn
rate_out = bmr.selfRateOut
peers = [PeerInfo.from_protobuf(peer_info) for peer_info in bmr.peers]
return BandwidthMetrics(rate_in, rate_out, peers)
14 changes: 13 additions & 1 deletion hivemind/p2p/p2p_daemon_bindings/p2pclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
StreamHandler,
TUnaryHandler,
)
from hivemind.p2p.p2p_daemon_bindings.datastructures import PeerID, PeerInfo, StreamInfo
from hivemind.p2p.p2p_daemon_bindings.datastructures import BandwidthMetrics, PeerID, PeerInfo, StreamInfo


class Client:
Expand Down Expand Up @@ -91,6 +91,18 @@ async def list_peers(self) -> Tuple[PeerInfo, ...]:
"""
return await self.control.list_peers()

async def get_bandwidth_metrics(
self, for_self: bool = False, for_all_peers: bool = False, peers: Sequence[PeerID] = []
) -> BandwidthMetrics:
"""
Get list of peers that node connect to
Get bandwidth rate metrics(bytes / sec, in and out):
for_self - for self(this host totals)
for_all_peers - for all active peers(MAY BE SLOW)
peers[..]: - for the list of peer ids
"""
return await self.control.get_bandwidth_metrics(for_self, for_all_peers, peers)

async def disconnect(self, peer_id: PeerID) -> None:
"""
Disconnect from node with specified peer id
Expand Down
17 changes: 17 additions & 0 deletions hivemind/proto/p2pd.proto
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ message Request {
DISCONNECT = 7;
PUBSUB = 8;
PERSISTENT_CONN_UPGRADE = 9;
BANDWIDTH_METRICS = 11;
}

required Type type = 1;
Expand All @@ -31,6 +32,7 @@ message Request {
optional ConnManagerRequest connManager = 6;
optional DisconnectRequest disconnect = 7;
optional PSRequest pubsub = 8;
optional BandwidthMetricsRequest bwr = 10;
}

message Response {
Expand All @@ -46,6 +48,7 @@ message Response {
optional DHTResponse dht = 5;
repeated PeerInfo peers = 6;
optional PSResponse pubsub = 7;
optional BandwidthMetricsResponse bwr = 8;
}

message PersistentConnectionRequest {
Expand Down Expand Up @@ -147,6 +150,8 @@ message DHTResponse {
message PeerInfo {
required bytes id = 1;
repeated bytes addrs = 2;
optional double ratein = 3;
optional double rateout = 4;
}

message ConnManagerRequest {
Expand Down Expand Up @@ -226,3 +231,15 @@ message Cancel {
message RPCError {
optional string message = 1;
}

message BandwidthMetricsRequest {
optional bool forSelf = 1 [default = false];
optional bool forAllPeers = 2 [default = false];
repeated bytes ids = 3;
}

message BandwidthMetricsResponse {
optional double selfRateIn = 1;
optional double selfRateOut = 2;
repeated PeerInfo peers = 3;
}
2 changes: 1 addition & 1 deletion tests/test_p2p_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async def test_check_if_identity_free():
"host_maddrs",
[
[Multiaddr("/ip4/127.0.0.1/tcp/0")],
[Multiaddr("/ip4/127.0.0.1/udp/0/quic-v1")],
[Multiaddr("/ip4/127.0.0.1/udp/0/quic")],
[Multiaddr("/ip4/127.0.0.1/tcp/0"), Multiaddr("/ip4/127.0.0.1/udp/0/quic")],
],
)
Expand Down