"""Module that actually performs the ping, sending and receiving packets"""

import os
import sys
import time

from . import icmp
from . import network

# Compatibility with Python versions older than 3.6, where enum.auto() is not available
if sys.version_info < (3, 6):
    from enum import IntEnum, Enum

    class AutoNumber(Enum):
        """Enum base class that numbers its members 1, 2, 3, ... in definition order"""

        def __new__(cls):
            value = len(cls.__members__) + 1
            obj = object.__new__(cls)
            # Must be ``_value_`` (trailing underscore): that is the attribute Enum reads.
            # Without it every member falls back to the value ``()`` and becomes an alias
            # of the first member.
            obj._value_ = value
            return obj

    class SuccessOn(AutoNumber):
        """Threshold used by ResponseList.success to decide whether a ping run succeeded"""
        One = ()
        Most = ()
        All = ()
else:
    from enum import IntEnum, auto

    class SuccessOn(IntEnum):
        """Threshold used by ResponseList.success to decide whether a ping run succeeded"""
        One = auto()
        Most = auto()
        All = auto()


class Message:
    """Represents an ICMP message with destination socket"""

    def __init__(self, target, packet, source):
        """Creates a message that may be sent, or used to represent a response

        :param target: Target IP or hostname of the message
        :type target: str
        :param packet: ICMP packet composing the message
        :type packet: icmp.ICMP
        :param source: Source IP or hostname of the message
        :type source: str"""
        self.target = target
        self.packet = packet
        self.source = source

    def send(self, source_socket):
        """Places the message on a socket

        :param source_socket: The socket to place the message on
        :type source_socket: network.Socket"""
        source_socket.send(self.packet.packet)

    def __repr__(self):
        return repr(self.packet)


def represent_seconds_in_ms(seconds):
    """Converts seconds into human-readable milliseconds with 2 digits decimal precision

    :param seconds: Seconds to convert
    :type seconds: Union[int, float]
    :return: The same time expressed in milliseconds, with 2 digits of decimal precision
    :rtype: float"""
    return round(seconds * 1000, 2)


class Response:
    """Represents a response to an ICMP message, with metadata like timing"""

    # Human-readable text for ICMP type 3 (Destination Unreachable), indexed by message code
    _UNREACHABLE_MESSAGES = (
        'Network Unreachable',
        'Host Unreachable',
        'Protocol Unreachable',
        'Port Unreachable',
        'Fragmentation Required',
        'Source Route Failed',
        'Network Unknown',
        'Host Unknown',
        'Source Host Isolated',
        'Communication with Destination Network is Administratively Prohibited',
        'Communication with Destination Host is Administratively Prohibited',
        'Network Unreachable for ToS',
        'Host Unreachable for ToS',
        'Communication Administratively Prohibited',
        'Host Precedence Violation',
        'Precedence Cutoff in Effect',
    )

    def __init__(self, message, time_elapsed, source_request=None, repr_format=None):
        """Creates a representation of ICMP message received in response

        :param message: The message received, or None when nothing was received
        :type message: Union[None, Message]
        :param time_elapsed: Time elapsed since the original request was sent, in seconds
        :type time_elapsed: float
        :param source_request: ICMP packet representing the request that originated this response
        :type source_request: ICMP
        :param repr_format: How to __repr__ the response. Allowed: legacy, None
        :type repr_format: str"""
        self.message = message
        self.time_elapsed = time_elapsed
        self.source_request = source_request
        self.repr_format = repr_format

    @property
    def success(self):
        """True when the response carries no error (see error_message)"""
        return self.error_message is None

    @property
    def error_message(self):
        """Describes why the response is not a success

        :return: None for an Echo Reply, otherwise a short description of the error
        :rtype: Union[None, str]"""
        if self.message is None:
            return 'No response'
        packet = self.message.packet
        if packet.message_type == 0 and packet.message_code == 0:
            # Echo Reply, response OK - no error
            return None
        if packet.message_type == 3:
            # Destination unreachable, returning more details based on message code
            try:
                return self._UNREACHABLE_MESSAGES[packet.message_code]
            except IndexError:
                # Should never generate IndexError, this serves as additional protection
                return 'Unreachable'
        # Error was not identified
        return 'Network Error'

    @property
    def time_elapsed_ms(self):
        """Elapsed time in milliseconds, rounded to 2 decimal digits"""
        return represent_seconds_in_ms(self.time_elapsed)

    def legacy_repr(self):
        """Builds the 'legacy' textual representation of the response

        :return: One line describing the reply, the error or the timeout
        :rtype: str"""
        if self.message is None:
            return 'Request timed out'
        if self.success:
            return 'Reply from {0}, {1} bytes in {2}ms'.format(self.message.source,
                                                               len(self.message.packet.raw),
                                                               self.time_elapsed_ms)
        # Not successful, but with some code (e.g. destination unreachable)
        return '{0} from {1} in {2}ms'.format(self.error_message, self.message.source, self.time_elapsed_ms)

    def __repr__(self):
        """Represents the response as text, honouring repr_format

        The non-legacy representation of a successful response reads ``source_request.raw``,
        so it requires ``source_request`` to have been provided."""
        if self.repr_format == 'legacy':
            return self.legacy_repr()
        if self.message is None:
            return 'Timed out'
        if self.success:
            # NOTE(review): the +20 presumably accounts for the IP header of the sent packet - confirm
            return 'status=OK\tfrom={0}\tms={1}\t\tbytes\tsnt={2}\trcv={3}'.format(
                self.message.source,
                self.time_elapsed_ms,
                len(self.source_request.raw)+20,
                len(self.message.packet.raw)
            )
        # "from" carries the responder address and "error" the description
        return 'status=ERR\tfrom={0}\terror="{1}"'.format(self.message.source, self.error_message)


class ResponseList:
    """Represents a series of ICMP responses"""

    def __init__(self, initial_set=None, verbose=False, output=sys.stdout):
        """Creates a ResponseList with initial data if available

        :param initial_set: Already existing responses, defaults to no responses
        :type initial_set: Union[None, list]
        :param verbose: Flag to enable verbose mode, defaults to False
        :type verbose: bool
        :param output: File where to write verbose output, defaults to stdout
        :type output: file"""
        # verbose/output are set before appending, so initial responses are printed when verbose
        self.verbose = verbose
        self.output = output
        self.clear()
        if initial_set is not None:
            for response in initial_set:
                self.append(response)

    def success(self, option=SuccessOn.One):
        """Check success state of the request.

        :param option: Sets a threshold for success sign. ( 1 - SuccessOn.One, 2 - SuccessOn.Most, 3 - SuccessOn.All )
        :type option: int
        :return: Whether this set of responses is successful
        :rtype: bool
        """
        success_list = [resp.success for resp in self._responses]
        if option == SuccessOn.One:
            return any(success_list)
        if option == SuccessOn.Most:
            # No responses at all cannot be "mostly successful"; also avoids dividing by zero
            if not success_list:
                return False
            return success_list.count(True) / len(success_list) > 0.5
        if option == SuccessOn.All:
            return all(success_list)
        # Unknown threshold
        return False

    @property
    def packet_loss(self):
        """Ratio of lost packets, alias of packets_lost"""
        return self.packets_lost

    @property
    def rtt_min_ms(self):
        """Minimum round trip time in milliseconds"""
        return represent_seconds_in_ms(self.rtt_min)

    @property
    def rtt_max_ms(self):
        """Maximum round trip time in milliseconds"""
        return represent_seconds_in_ms(self.rtt_max)

    @property
    def rtt_avg_ms(self):
        """Average round trip time in milliseconds"""
        return represent_seconds_in_ms(self.rtt_avg)

    def clear(self):
        """Removes all the responses and resets every statistic"""
        self._responses = []
        self.rtt_avg = 0
        self.rtt_min = 0
        self.rtt_max = 0
        self.stats_packets_sent = 0
        self.stats_packets_returned = 0

    def append(self, value):
        """Adds a response and updates round trip times and packet counters

        :param value: The response to add
        :type value: Response"""
        self._responses.append(value)
        self.stats_packets_sent += 1
        count = len(self)
        elapsed = value.time_elapsed
        if count == 1:
            self.rtt_avg = elapsed
            self.rtt_max = elapsed
            self.rtt_min = elapsed
        else:
            # Calculate the total of time, add the new value and divide for the new number
            self.rtt_avg = ((self.rtt_avg * (count - 1)) + elapsed) / count
            if elapsed > self.rtt_max:
                self.rtt_max = elapsed
            if elapsed < self.rtt_min:
                self.rtt_min = elapsed
        if value.success:
            self.stats_packets_returned += 1
        if self.verbose:
            print(value, file=self.output)

    @property
    def stats_packets_lost(self):
        """Number of packets sent that did not get a successful response"""
        return self.stats_packets_sent - self.stats_packets_returned

    @property
    def stats_success_ratio(self):
        """Ratio (0 to 1) of successful responses over packets sent, 0.0 when nothing was sent"""
        if not self.stats_packets_sent:
            return 0.0
        return self.stats_packets_returned / self.stats_packets_sent

    @property
    def stats_lost_ratio(self):
        """Ratio (0 to 1) of lost packets over packets sent, 0.0 when nothing was sent"""
        if not self.stats_packets_sent:
            return 0.0
        return 1 - self.stats_success_ratio

    @property
    def packets_lost(self):
        """Ratio of lost packets, alias of stats_lost_ratio"""
        return self.stats_lost_ratio

    def __len__(self):
        return len(self._responses)

    def __repr__(self):
        lines = ''.join('{0}\r\n'.format(response) for response in self._responses)
        summary = 'Round Trip Times min/avg/max is {0}/{1}/{2} ms'.format(self.rtt_min_ms,
                                                                          self.rtt_avg_ms,
                                                                          self.rtt_max_ms)
        return lines + '\r\n' + summary

    def __iter__(self):
        for response in self._responses:
            yield response


class Communicator:
    """Instance actually communicating over the network, sending messages and handling responses"""

    def __init__(self, target, payload_provider, timeout, interval, socket_options=(), seed_id=None,
                 verbose=False, output=sys.stdout, source=None, repr_format=None):
        """Creates an instance that can handle communication with the target device

        :param target: IP or hostname of the remote device
        :type target: str
        :param payload_provider: An iterable list of payloads to send
        :type payload_provider: PayloadProvider
        :param timeout: Timeout that will apply to all ping messages, in seconds
        :type timeout: Union[int, float]
        :param interval: Interval to wait between pings, in seconds
        :type interval: int
        :param socket_options: Options to specify for the network.Socket
        :type socket_options: tuple
        :param seed_id: The first ICMP packet ID to use
        :type seed_id: Union[None, int]
        :param verbose: Flag to enable verbose mode, defaults to False
        :type verbose: bool
        :param output: File where to write verbose output, defaults to stdout
        :type output: file
        :param source: Forwarded as-is to network.Socket as its ``source`` argument
        :param repr_format: How to __repr__ the response. Allowed: legacy, None
        :type repr_format: str"""
        self.socket = network.Socket(target, 'icmp', options=socket_options, source=source)
        self.provider = payload_provider
        self.timeout = timeout
        self.interval = interval
        self.responses = ResponseList(verbose=verbose, output=output)
        self.seed_id = seed_id
        self.repr_format = repr_format
        # note that to make Communicator instances thread safe, the seed ID must be unique per thread
        if self.seed_id is None:
            # The ICMP identifier is 2 bytes wide, hence the mask
            self.seed_id = os.getpid() & 0xFFFF

    def __del__(self):
        pass

    def send_ping(self, packet_id, sequence_number, payload):
        """Sends one ICMP Echo Request on the socket

        :param packet_id: The ID to use for the packet
        :type packet_id: int
        :param sequence_number: The sequence number to use for the packet
        :type sequence_number: int
        :param payload: The payload of the ICMP message
        :type payload: Union[str, bytes]
        :return: The ICMP packet that was sent
        :rtype: ICMP"""
        i = icmp.ICMP(
            icmp.Types.EchoRequest,
            payload=payload,
            identifier=packet_id,
            sequence_number=sequence_number)
        self.socket.send(i.packet)
        return i

    def listen_for(self, packet_id, timeout, payload_pattern=None, source_request=None):
        """Listens for a packet of a given id for a given timeout

        :param packet_id: The ID of the packet to listen for, the same for request and response
        :type packet_id: int
        :param timeout: How long to listen for the specified packet, in seconds
        :type timeout: float
        :param payload_pattern: Payload reply pattern to match to request, if set to None, match by ID only
        :type payload_pattern: Union[None, bytes]
        :param source_request: The request packet, stored on the returned Response
        :type source_request: Union[None, ICMP]
        :return: The response to the request with the specified packet_id
        :rtype: Response"""
        time_left = timeout
        response = icmp.ICMP()
        while time_left > 0:
            # Keep listening until a packet arrives
            raw_packet, source_socket, time_left = self.socket.receive(time_left)
            if raw_packet == b'':
                # Nothing was actually received
                continue
            response.unpack(raw_packet)
            # Ensure we have not unpacked the packet we sent (RHEL will also listen to outgoing packets)
            if response.id != packet_id or response.message_type == icmp.Types.EchoRequest.type_id:
                continue
            # To allow Windows-like behaviour (no payload inspection, but only match packet identifiers),
            # a None pattern always matches (legacy usage case)
            if payload_pattern is None or payload_pattern == response.payload:
                return Response(Message('', response, source_socket[0]), timeout - time_left, source_request,
                                repr_format=self.repr_format)
        return Response(None, timeout, source_request, repr_format=self.repr_format)

    @staticmethod
    def increase_seq(sequence_number):
        """Increases an ICMP sequence number and reset if it gets bigger than 2 bytes

        :param sequence_number: The sequence number to increase
        :type sequence_number: int
        :return: The increased sequence number, or 1 in case an increase was not possible
        :rtype: int"""
        sequence_number += 1
        if sequence_number > 0xFFFF:
            sequence_number = 1
        return sequence_number

    def run(self, match_payloads=False):
        """Performs all the pings and stores the responses

        :param match_payloads: optional to set to True to make sure requests and replies have equivalent payloads
        :type match_payloads: bool"""
        self.responses.clear()
        identifier = self.seed_id
        seq = 1
        for payload in self.provider:
            icmp_out = self.send_ping(identifier, seq, payload)
            # Without payload matching, replies are matched on the identifier only
            payload_pattern = icmp_out.payload if match_payloads else None
            self.responses.append(self.listen_for(identifier, self.timeout, payload_pattern, icmp_out))
            seq = self.increase_seq(seq)
            if self.interval:
                time.sleep(self.interval)