http://files.dns-sd.org/draft-cheshire-nat-pmp.txt␊ |
␊ |
Requires Python 2.3 or later.␊ |
Tested on Python 2.3, 2.4, 2.5 against Apple AirPort Express.␊ |
Tested on Python 2.5, 2.6 against Apple AirPort Express.␊ |
␊ |
0.2.2 - changed gateway autodetect, per github issue #1. thanks to jirib␊ |
0.2 - changed useException to use_exception, responseDataClass to response_data_class parameters in function calls for consistency␊ |
0.1 - repackaged via setuptools. Fixed major bug in gateway detection. Experimental gateway detection support for Windows 7. Python 2.6 testing.␊ |
0.0.1.2 - NT autodetection code. Thanks to roee shlomo for the gateway detection regex!␊ |
0.0.1.1 - Removed broken mutex code␊ |
0.0.1 - Initial release␊ |
␊ |
"""␊ |
␊ |
__version__ = "0.0.1.2"␊ |
__license__ = """Copyright (c) 2008, Yiming Liu, All rights reserved.␊ |
__version__ = "0.2"␊ |
__license__ = """Copyright (c) 2008-2010, Yiming Liu, All rights reserved.␊ |
␊ |
Redistribution and use in source and binary forms, with or without modification,␊ |
are permitted provided that the following conditions are met:␊ |
|
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE␊ |
POSSIBILITY OF SUCH DAMAGE."""␊ |
␊ |
__author__ = "Yiming Liu <http://sarth.thallos.org/>"␊ |
__author__ = "Yiming Liu <http://www.yimingliu.com/>"␊ |
␊ |
import struct, socket, select, time␊ |
import struct, socket, select, time, platform␊ |
import sys, os, re␊ |
␊ |
NATPMP_PORT = 5351␊ |
|
It does not guarantee correct results.␊ |
␊ |
This function requires the presence of␊ |
netstat on the path on POSIX and NT. It requires ip on␊ |
Linux.␊ |
netstat on the path on POSIX and NT.␊ |
"""␊ |
addr = ""␊ |
shell_command = 'netstat -rn'␊ |
if os.name == "posix":␊ |
pattern = re.compile('default\s+([\w.:]+)\s+\w')␊ |
if "linux" in sys.platform:␊ |
shell_command = "ip route show"␊ |
pattern = re.compile('default via\s+([\w.:]+)\s+\w')␊ |
pattern = re.compile('(?:default|0\.0\.0\.0|::/0)\s+([\w\.:]+)\s+.*UG')␊ |
elif os.name == "nt":␊ |
pattern = re.compile(".*?Default Gateway:[ ]+(.*?)\n")␊ |
system_out = os.popen(shell_command, 'r').read() # TODO: this could be a security issue␊ |
if platform.version().startswith("6.1"):␊ |
pattern = re.compile(".*?0.0.0.0[ ]+0.0.0.0[ ]+(.*?)[ ]+?.*?\n")␊ |
else:␊ |
pattern = re.compile(".*?Default Gateway:[ ]+(.*?)\n")␊ |
system_out = os.popen(shell_command, 'r').read()␊ |
if not system_out:␊ |
raise NATPMPNetworkError(NATPMP_GATEWAY_CANNOT_FIND, error_str(NATPMP_GATEWAY_CANNOT_FIND))␊ |
match = pattern.search(system_out)␊ |
|
response_socket.connect((gateway, NATPMP_PORT))␊ |
return response_socket␊ |
␊ |
def get_public_address(gateway_ip=get_gateway_addr(), retry=9):␊ |
def get_public_address(gateway_ip=None, retry=9):␊ |
"""A high-level function that returns the public interface IP of␊ |
the current host by querying the NAT-PMP gateway. IP is␊ |
returned as string.␊ |
|
retry - the number of times to retry the request if unsuccessful.␊ |
Defaults to 9 as per specification.␊ |
"""␊ |
if gateway_ip == None:␊ |
gateway_ip = get_gateway_addr()␊ |
addr = None␊ |
addr_request = PublicAddressRequest()␊ |
addr_response = send_request_with_retry(gateway_ip, addr_request, responseDataClass=PublicAddressResponse, retry=retry)␊ |
addr_response = send_request_with_retry(gateway_ip, addr_request, response_data_class=PublicAddressResponse, retry=retry)␊ |
if addr_response.result != 0:␊ |
#sys.stderr.write("NAT-PMP error %d: %s\n" % (addr_response.result, error_str(addr_response.result)))␊ |
#sys.stderr.flush()␊ |
|
addr = addr_response.ip␊ |
return addr␊ |
␊ |
def map_tcp_port(public_port, private_port, lifetime=3600, gateway_ip=get_gateway_addr(), retry=9, useException=True):␊ |
def map_tcp_port(public_port, private_port, lifetime=3600, gateway_ip=None, retry=9, use_exception=True):␊ |
"""A high-level wrapper to map_port() that requests a mapping␊ |
for a public TCP port on the NAT to a private TCP port on this host.␊ |
Returns the complete response on success.␊ |
|
get_gateway_addr()␊ |
retry - the number of times to retry the request if unsuccessful.␊ |
Defaults to 9 as per specification.␊ |
useException - throw an exception if an error result is␊ |
use_exception - throw an exception if an error result is␊ |
received from the gateway. Defaults to True.␊ |
"""␊ |
return map_port(NATPMP_PROTOCOL_TCP, public_port, private_port, lifetime, gateway_ip=gateway_ip, retry=retry, useException=useException)␊ |
return map_port(NATPMP_PROTOCOL_TCP, public_port, private_port, lifetime, gateway_ip=gateway_ip, retry=retry, use_exception=use_exception)␊ |
␊ |
def map_udp_port(public_port, private_port, lifetime=3600, gateway_ip=get_gateway_addr(), retry=9, useException=True):␊ |
def map_udp_port(public_port, private_port, lifetime=3600, gateway_ip=None, retry=9, use_exception=True):␊ |
"""A high-level wrapper to map_port() that requests a mapping for␊ |
a public UDP port on the NAT to a private UDP port on this host.␊ |
Returns the complete response on success.␊ |
|
get_gateway_addr()␊ |
retry - the number of times to retry the request if unsuccessful.␊ |
Defaults to 9 as per specification.␊ |
useException - throw an exception if an error result is␊ |
use_exception - throw an exception if an error result is␊ |
received from the gateway. Defaults to True.␊ |
"""␊ |
return map_port(NATPMP_PROTOCOL_UDP, public_port, private_port, lifetime, gateway_ip=gateway_ip, retry=retry, useException=useException)␊ |
return map_port(NATPMP_PROTOCOL_UDP, public_port, private_port, lifetime, gateway_ip=gateway_ip, retry=retry, use_exception=use_exception)␊ |
␊ |
def map_port(protocol, public_port, private_port, lifetime=3600, gateway_ip=get_gateway_addr(), retry=9, useException=True):␊ |
def map_port(protocol, public_port, private_port, lifetime=3600, gateway_ip=None, retry=9, use_exception=True):␊ |
"""A function to map public_port to private_port of protocol.␊ |
Returns the complete response on success.␊ |
␊ |
|
get_gateway_addr()␊ |
retry - the number of times to retry the request if unsuccessful.␊ |
Defaults to 9 as per specification.␊ |
useException - throw an exception if an error result␊ |
use_exception - throw an exception if an error result␊ |
is received from the gateway. Defaults to True.␊ |
"""␊ |
if protocol not in [NATPMP_PROTOCOL_UDP, NATPMP_PROTOCOL_TCP]:␊ |
raise ValueError("Must be either NATPMP_PROTOCOL_UDP or NATPMP_PROTOCOL_TCP")␊ |
if gateway_ip == None:␊ |
gateway_ip = get_gateway_addr()␊ |
response = None␊ |
port_mapping_request = PortMapRequest(protocol, private_port, public_port, lifetime)␊ |
port_mapping_response = send_request_with_retry(gateway_ip, port_mapping_request, responseDataClass=PortMapResponse, retry=retry)␊ |
if port_mapping_response.result != 0 and useException:␊ |
port_mapping_response = send_request_with_retry(gateway_ip, port_mapping_request, response_data_class=PortMapResponse, retry=retry)␊ |
if port_mapping_response.result != 0 and use_exception:␊ |
raise NATPMPResultError(port_mapping_response.result, error_str(port_mapping_response.result), port_mapping_response)␊ |
return port_mapping_response␊ |
␊ |
|
data,source_addr = resp_socket.recvfrom(responseSize)␊ |
return data,source_addr␊ |
␊ |
def send_request_with_retry(gateway_ip, request, responseDataClass=None, retry=9):␊ |
def send_request_with_retry(gateway_ip, request, response_data_class=None, retry=9):␊ |
gateway_socket = get_gateway_socket(gateway_ip)␊ |
n = 1␊ |
data = ""␊ |
|
n += 1␊ |
if n >= retry and not data:␊ |
raise NATPMPUnsupportedError(NATPMP_GATEWAY_NO_SUPPORT, error_str(NATPMP_GATEWAY_NO_SUPPORT))␊ |
if data and responseDataClass:␊ |
data = responseDataClass(data)␊ |
if data and response_data_class:␊ |
data = response_data_class(data)␊ |
return data␊ |
␊ |
␊ |