Skip to content

Instantly share code, notes, and snippets.

@csom
Forked from pklaus/ddnsserver.py
Last active July 30, 2022 17:06
Show Gist options
  • Select an option

  • Save csom/c927ed2d93dc30476487f948ee3145be to your computer and use it in GitHub Desktop.

Select an option

Save csom/c927ed2d93dc30476487f948ee3145be to your computer and use it in GitHub Desktop.
Simple DNS server with EDNS (UDP and TCP) in Python using dnslib.py that replicate the service that o-o.myaddr.google.com provide
#!/usr/bin/env python
"""
LICENSE http://www.apache.org/licenses/LICENSE-2.0
"""
import argparse
from ast import And
import datetime
from ossaudiodev import SNDCTL_DSP_BIND_CHANNEL
import sys
import time
import threading
import traceback
import socketserver
import struct
try:
from dnslib import *
except ImportError:
print("Missing dependency dnslib: <https://pypi.python.org/pypi/dnslib>. Please install it with `pip`.")
sys.exit(2)
class DomainName(str):
def __getattr__(self, item):
return DomainName(item + '.' + self)
D = DomainName('dnsapp.example.se.')
IP = '127.0.0.1'
TTL = 60
CLIENT = "0.0.0.0"
soa_record = SOA(
mname='dnsserver.example.se.', # primary name server
rname=D.admin, # email of the domain administrator
times=(
201307231, # serial number
60 * 60 * 1, # refresh
60 * 60 * 3, # retry
60 * 60 * 24, # expire
60 * 60 * 1, # minimum
)
)
ns_records = [NS('dnsserver.example.se.')]
records = {
D: [A(IP), AAAA((0,) * 16), soa_record] + ns_records,
D.testapp: [TXT(CLIENT)],
}
def dns_response(data, rawdata):
request = DNSRecord.parse(data)
print(request)
reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q)
qname = request.q.qname
qn = str(qname)
qtype = request.q.qtype
qt = QTYPE[qtype]
for opts in request.ar:
for line in str(opts).split("\n"):
if line.find('0001') == 23:
pos = line.find('0001')
print ("Recieved EDNS8DATA: ", line[pos:])
cdir = line[pos+4:pos+6]
if (len(line[pos+4:]) == 6):
first = line[-2:]
edns8cs = str(int(first,16)) + ".0.0.0/" + str(int(cdir,16))
print ("Recieved EDNS CLIENT SUBNET: ", edns8cs)
edns8csdata = '0001' + cdir + cdir + first
elif (len(line[pos+4:]) == 8):
first = line[-4:-2]
second = line[-2:]
edns8cs = str(int(first,16)) + "." + str(int(second,16)) + ".0.0/" + str(int(cdir,16))
print ("Recieved EDNS CLIENT SUBNET: ", edns8cs)
edns8csdata = '0001' + cdir + cdir + first + second
elif (len(line[pos+4:]) == 10):
first = line[-6:-4]
second = line[-4:-2]
third = line[-2:]
edns8cs = str(int(first,16)) + "." + str(int(second,16)) + "." + str(int(third,16)) + ".0/" + str(int(cdir,16))
print ("Recieved EDNS CLIENT SUBNET: ", edns8cs)
edns8csdata = '0001' + cdir + cdir + first + second + third
elif (len(line[pos+4:]) == 12):
first = line[-8:-6]
second = line[-6:-4]
third = line[-4:-2]
fourth = line[-2:]
edns8cs = str(int(first,16)) + "." + str(int(second,16)) + "." + str(int(third,16)) + "." + str(int(fourth,16)) + "/" + str(int(cdir,16))
print ("Recieved EDNS CLIENT SUBNET: ", edns8cs)
edns8csdata = '0001' + cdir + cdir + first + second + third
if int(cdir,16) > 24:
edns8csdata = '0001' + cdir + '18' + first + second + third + '00'
if line.find('code: 10') == 8:
pos = line.find('code: 10') + 16
edns10data = line[pos:]
print ("Recieved EDNS10DATA: ", edns10data)
if qn == D or qn.endswith('.' + D):
for name, rrs in records.items():
if name == qn:
for rdata in rrs:
rqt = rdata.__class__.__name__
if (qn == D.testapp) and (qt in ['*', rqt]):
reply.add_answer(RR(rname=qname, rtype=getattr(QTYPE, rqt), rclass=1, ttl=TTL, rdata=TXT(rawdata[0])))
reply.add_answer(RR(rname=qname, rtype=QTYPE.TXT, rclass=1, ttl=TTL, rdata=TXT("1. Hello, the TXT shows either your DNS-resolvers public IP")))
reply.add_answer(RR(rname=qname, rtype=QTYPE.TXT, rclass=1, ttl=TTL, rdata=TXT("2. or yours if you use dnsserver.mackapaer.se as resolver")))
reply.add_answer(RR(rname=qname, rtype=QTYPE.TXT, rclass=1, ttl=TTL, rdata=TXT("3. It also shows an extra TXT-record with the edns0-client-subnet if provided")))
try:
if edns8cs:
reply.add_answer(RR(rname=qname, rtype=QTYPE.TXT, rclass=1, ttl=TTL, rdata=TXT("edns0-client-subnet: %s" % (edns8cs))))
reply.add_ar(EDNS0(udp_len=4096,opts=[EDNSOption(8,bytes.fromhex(edns8csdata))]))
except:
reply.add_ar(EDNS0(udp_len=4096,flags="do"))
elif qt in ['*', rqt]:
reply.add_answer(RR(rname=qname, rtype=getattr(QTYPE, rqt), rclass=1, ttl=TTL, rdata=rdata))
elif qn.endswith('.myaddr.google.com.'):
try:
if edns8cs:
print ("---- Proxy Request to 9.9.9.11#53 :\n", DNSRecord.parse(data))
response = send_udp(data,"9.9.9.11",53)
except:
firsth = '{:02x}'.format(int(str(rawdata[0]).split('.')[0]))
secondh = '{:02x}'.format(int(str(rawdata[0]).split('.')[1]))
thirdh = '{:02x}'.format(int(str(rawdata[0]).split('.')[2]))
cdirh = '18'
edns8data = '0001{}00{}{}{}'.format(cdirh,firsth,secondh,thirdh)
print ("Generated EDNS8DATA: ", edns8data)
requestproxy = DNSRecord(DNSHeader(id=request.header.id, ad=1, rd=1), q=request.q)
requestproxy.add_ar(EDNS0(udp_len=4096,opts=[EDNSOption(8,bytes.fromhex(edns8data))]))
print ("---- Proxyed request sent to 9.9.9.11#53 :\n", requestproxy)
response = send_udp(requestproxy.pack(),"9.9.9.11",53)
print ("---- Proxyed reply recieved from {} :\n{}".format(response[1], DNSRecord.parse(response[0])))
return response[0]
print("---- Reply:\n", reply)
return reply.pack()
class BaseRequestHandler(socketserver.BaseRequestHandler):
def get_data(self):
raise NotImplementedError
def send_data(self, data):
raise NotImplementedError
def handle(self):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
print("\n\n%s request %s (%s %s):" % (self.__class__.__name__[:3], now, self.client_address[0],
self.client_address[1]))
try:
data = self.get_data()
print(len(data), data) # repr(data).replace('\\x', '')[1:-1]
self.send_data(dns_response(data,self.client_address))
except Exception:
traceback.print_exc(file=sys.stderr)
class TCPRequestHandler(BaseRequestHandler):
def get_data(self):
data = self.request.recv(8192).strip()
sz = struct.unpack('>H', data[:2])[0]
if sz < len(data) - 2:
raise Exception("Wrong size of TCP packet")
elif sz > len(data) - 2:
raise Exception("Too big TCP packet")
return data[2:]
def send_data(self, data):
sz = struct.pack('>H', len(data))
return self.request.sendall(sz + data)
class UDPRequestHandler(BaseRequestHandler):
def get_data(self):
return self.request[0]
def send_data(self, data):
return self.request[1].sendto(data, self.client_address)
def send_udp(data,host,port):
"""
Helper function to send/receive DNS UDP request
"""
sock = None
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.sendto(data,(host,port))
response,server = sock.recvfrom(8192)
return (response,server)
finally:
if (sock is not None):
sock.close()
def main():
parser = argparse.ArgumentParser(description='Start a DNS implemented in Python.')
parser = argparse.ArgumentParser(description='Start a DNS implemented in Python. Usually DNSs use UDP on port 53.')
parser.add_argument('--port', default=5053, type=int, help='The port to listen on.')
parser.add_argument('--tcp', action='store_true', help='Listen to TCP connections.')
parser.add_argument('--udp', action='store_true', help='Listen to UDP datagrams.')
args = parser.parse_args()
if not (args.udp or args.tcp): parser.error("Please select at least one of --udp or --tcp.")
print("Starting nameserver...")
servers = []
if args.udp: servers.append(socketserver.ThreadingUDPServer(('', args.port), UDPRequestHandler))
if args.tcp: servers.append(socketserver.ThreadingTCPServer(('', args.port), TCPRequestHandler))
for s in servers:
thread = threading.Thread(target=s.serve_forever) # that thread will start one more thread for each request
thread.daemon = True # exit the server thread when the main thread terminates
thread.start()
print("%s server loop running in thread: %s" % (s.RequestHandlerClass.__name__[:3], thread.name))
try:
while 1:
time.sleep(1)
sys.stderr.flush()
sys.stdout.flush()
except KeyboardInterrupt:
pass
finally:
for s in servers:
s.shutdown()
if __name__ == '__main__':
main()
@csom
Copy link
Copy Markdown
Author

csom commented Jul 30, 2022

dig testapp.dnsapp.example.se TXT @dnsserver.example.se +subnet=22.22.22.22/24

; <<>> DiG 9.16.1-Ubuntu <<>> testapp.dnsapp.example.se TXT @dnsserver.example.se +subnet=22.22.22.22/24
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 10414
;; flags: qr aa rd ra; QUERY: 1, ANSWER: 5, AUTHORITY: 0, ADDITIONAL: 1

;; OPT PSEUDOSECTION:
; EDNS: version: 0, flags:; udp: 4096
; CLIENT-SUBNET: 22.22.22.0/24/24
;; QUESTION SECTION:
;testapp.dnsapp.example.se. IN TXT

;; ANSWER SECTION:
testapp.dnsapp.example.se. 60 IN TXT "xxx.xxx.xxx.xxx"
testapp.dnsapp.example.se. 60 IN TXT "1. Hello, the TXT shows either your DNS-resolvers public IP"
testapp.dnsapp.example.se. 60 IN TXT "2. or yours if you use dnsserver.example.se as resolver"
testapp.dnsapp.example.se. 60 IN TXT "3. It also shows an extra TXT-record with the edns0-client-subnet if provided"
testapp.dnsapp.example.se. 60 IN TXT "edns0-client-subnet: 22.22.22.0/24"

;; Query time: 7 msec
;; SERVER: xxx.xxx.xxx.xxx#53(xxx.xxx.xxx.xxx)
;; WHEN: lör jul 30 18:21:08 CEST 2022
;; MSG SIZE rcvd: 369

@csom
Copy link
Copy Markdown
Author

csom commented Jul 30, 2022

This is just a small fork that allows the original dns-server to serve as google's dns service: o-o.myaddr.google.com
If you use this dns to relay to googles service this server adds the edns-client-subnet in the relaying request.

In this script the service is located at: testapp.dnsapp.domain.com. Use TXT query to get the info.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment