Files
Status/Lib/site-packages/pythonping/icmp.py

222 lines
7.3 KiB
Python

import os
import struct
def checksum(data):
"""Creates the ICMP checksum as in RFC 1071
:param data: Data to calculate the checksum ofs
:type data: bytes
:return: Calculated checksum
:rtype: int
Divides the data in 16-bits chunks, then make their 1's complement sum"""
subtotal = 0
for i in range(0, len(data)-1, 2):
subtotal += ((data[i] << 8) + data[i+1]) # Sum 16 bits chunks together
if len(data) % 2: # If length is odd
subtotal += (data[len(data)-1] << 8) # Sum the last byte plus one empty byte of padding
while subtotal >> 16: # Add carry on the right until fits in 16 bits
subtotal = (subtotal & 0xFFFF) + (subtotal >> 16)
check = ~subtotal # Performs the one complement
return ((check << 8) & 0xFF00) | ((check >> 8) & 0x00FF) # Swap bytes
class ICMPType:
"""Represents an ICMP type, as combination of type and code
ICMP Types should inherit from this class so that the code can identify them easily.
This is a static class, not meant to be instantiated"""
def __init__(self):
raise TypeError('ICMPType may not be instantiated')
class Types(ICMPType):
class EchoReply(ICMPType):
type_id = 0
ECHO_REPLY = (type_id, 0,)
class DestinationUnreachable(ICMPType):
type_id = 3
NETWORK_UNREACHABLE = (type_id, 0,)
HOST_UNREACHABLE = (type_id, 1,)
PROTOCOL_UNREACHABLE = (type_id, 2,)
PORT_UNREACHABLE = (type_id, 3,)
FRAGMENTATION_REQUIRED = (type_id, 4,)
SOURCE_ROUTE_FAILED = (type_id, 5,)
NETWORK_UNKNOWN = (type_id, 6,)
HOST_UNKNOWN = (type_id, 7,)
SOURCE_HOST_ISOLATED = (type_id, 8,)
NETWORK_ADMINISTRATIVELY_PROHIBITED = (type_id, 9,)
HOST_ADMINISTRATIVELY_PROHIBITED = (type_id, 10,)
NETWORK_UNREACHABLE_TOS = (type_id, 11,)
HOST_UNREACHABLE_TOS = (type_id, 12,)
COMMUNICATION_ADMINISTRATIVELY_PROHIBITED = (type_id, 13,)
HOST_PRECEDENCE_VIOLATION = (type_id, 14,)
PRECEDENCE_CUTOFF = (type_id, 15,)
class SourceQuench(ICMPType):
type_id = 4
SOURCE_QUENCH = (type_id, 0,)
class Redirect(ICMPType):
type_id = 5
FOR_NETWORK = (type_id, 0,)
FOR_HOST = (type_id, 1,)
FOR_TOS_AND_NETWORK = (type_id, 2,)
FOR_TOS_AND_HOST = (type_id, 3,)
class EchoRequest(ICMPType):
type_id = 8
ECHO_REQUEST = (type_id, 0,)
class RouterAdvertisement(ICMPType):
type_id = 9
ROUTER_ADVERTISEMENT = (type_id, 0,)
class RouterSolicitation(ICMPType):
type_id = 10
ROUTER_SOLICITATION = (type_id, 0)
# Aliases
ROUTER_DISCOVERY = ROUTER_SOLICITATION
ROUTER_SELECTION = ROUTER_SOLICITATION
class TimeExceeded(ICMPType):
type_id = 11
TTL_EXPIRED_IN_TRANSIT = (type_id, 0)
FRAGMENT_REASSEMBLY_TIME_EXCEEDED = (type_id, 1)
class BadIPHeader(ICMPType):
type_id = 12
POINTER_INDICATES_ERROR = (type_id, 0)
MISSING_REQUIRED_OPTION = (type_id, 1)
BAD_LENGTH = (type_id, 2)
class Timestamp(ICMPType):
type_id = 13
TIMESTAMP = (type_id, 0)
class TimestampReply(ICMPType):
type_id = 14
TIMESTAMP_REPLY = (type_id, 0)
class InformationRequest(ICMPType):
type_id = 15
INFORMATION_REQUEST = (type_id, 0)
class InformationReply(ICMPType):
type_id = 16
INFORMATION_REPLY = (type_id, 0)
class AddressMaskRequest(ICMPType):
type_id = 17
ADDRESS_MASK_REQUEST = (type_id, 0)
class AddressMaskReply(ICMPType):
type_id = 18
ADDRESS_MASK_REPLY = (type_id, 0)
class Traceroute(ICMPType):
type_id = 30
INFORMATION_REQUEST = (type_id, 30)
class ICMP:
LEN_TO_PAYLOAD = 41 # Ethernet, IP and ICMP header lengths combined
def __init__(self, message_type=Types.EchoReply, payload=None, identifier=None, sequence_number=1):
"""Creates an ICMP packet
:param message_type: Type of ICMP message to send
:type message_type: Union[ICMPType, (int, int), int]
:param payload: utf8 string or bytes payload
:type payload: Union[str, bytes]
:param identifier: ID of this ICMP packet
:type identifier: int"""
self.message_code = 0
if issubclass(message_type, ICMPType):
self.message_type = message_type.type_id
elif isinstance(message_type, tuple):
self.message_type = message_type[0]
self.message_code = message_type[1]
elif isinstance(message_type, int):
self.message_type = message_type
if payload is None:
payload = bytes('1', 'utf8')
elif isinstance(payload, str):
payload = bytes(payload, 'utf8')
self.payload = payload
if identifier is None:
identifier = os.getpid()
self.id = identifier & 0xFFFF # Prevent identifiers bigger than 16 bits
self.sequence_number = sequence_number
self.received_checksum = None
self.raw = None
@property
def packet(self):
"""The raw packet with header, ready to be sent from a socket"""
p = self._header(check=self.expected_checksum) + self.payload
if self.raw is None:
self.raw = p
return p
def _header(self, check=0):
"""The raw ICMP header
:param check: Checksum value
:type check: int
:return: The packed header
:rtype: bytes"""
# TODO implement sequence number
return struct.pack("BBHHH",
self.message_type,
self.message_code,
check,
self.id,
self.sequence_number)
def __repr__(self):
return ' '.join('{:02x}'.format(b) for b in self.raw)
@property
def is_valid(self):
"""True if the received checksum is valid, otherwise False"""
if self.received_checksum is None:
return True
return self.expected_checksum == self.received_checksum
@property
def expected_checksum(self):
"""The checksum expected for this packet, calculated with checksum field set to 0"""
return checksum(self._header() + self.payload)
@property
def header_length(self):
"""Length of the ICMP header"""
return len(self._header())
@staticmethod
def generate_from_raw(raw):
"""Creates a new ICMP representation from the raw bytes
:param raw: The raw packet including payload
:type raw: bytes
:return: An ICMP instance representing the packet
:rtype: ICMP"""
packet = ICMP()
packet.unpack(raw)
return packet
def unpack(self, raw):
"""Unpacks a raw packet and stores it in this object
:param raw: The raw packet, including payload
:type raw: bytes"""
self.raw = raw
self.message_type, \
self.message_code, \
self.received_checksum, \
self.id, \
self.sequence_number = struct.unpack("BBHHH", raw[20:28])
self.payload = raw[28:]