Skip to content

Commit

Permalink
Refactor test to avoid code duplciation, as suggested by @rgacogne
Browse files Browse the repository at this point in the history
  • Loading branch information
omoerbeek committed Jun 18, 2024
1 parent a7f8db9 commit 3aebfac
Showing 1 changed file with 25 additions and 92 deletions.
117 changes: 25 additions & 92 deletions regression-tests.recursor-dnssec/test_Protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,25 +985,8 @@ def testTagged(self):
self.checkProtobufTags(msg, tags)
self.checkNoRemainingMessage()

class ProtobufTagCacheTest(TestRecursorProtobuf):
"""
This test makes sure that we correctly cache tags (actually not cache them)
"""

_confdir = 'ProtobufTagCache'
_config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
_lua_config_file = """
protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=false, logResponses=true } )
""" % (protobufServersParameters[0].port, protobufServersParameters[1].port)
_lua_dns_script_file = """
function gettag(remote, ednssubnet, localip, qname, qtype, ednsoptions, tcp)
if qname:equal('tagged.example.') or qname:equal('taggedtcp.example.') then
return 0, { '' .. math.random() }
end
return 0
end
"""
class ProtobufTagCacheBase(TestRecursorProtobuf):
__test__ = False

def testTagged(self):
name = 'tagged.example.'
Expand Down Expand Up @@ -1077,11 +1060,33 @@ def testTaggedTCP(self):
ts2 = msg.response.tags[0]
self.assertNotEqual(ts1, ts2)

class ProtobufTagCacheFFITest(TestRecursorProtobuf):
class ProtobufTagCacheTest(ProtobufTagCacheBase):
"""
This test makes sure that we correctly cache tags (actually not cache them)
"""

__test__ = True
_confdir = 'ProtobufTagCache'
_config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
_lua_config_file = """
protobufServer({"127.0.0.1:%d", "127.0.0.1:%d"}, { logQueries=false, logResponses=true } )
""" % (protobufServersParameters[0].port, protobufServersParameters[1].port)
_lua_dns_script_file = """
function gettag(remote, ednssubnet, localip, qname, qtype, ednsoptions, tcp)
if qname:equal('tagged.example.') or qname:equal('taggedtcp.example.') then
return 0, { '' .. math.random() }
end
return 0
end
"""

class ProtobufTagCacheFFITest(ProtobufTagCacheBase):
"""
This test makes sure that we correctly cache tags (actually not cache them) for the FFI case
"""

__test__ = True
_confdir = 'ProtobufTagCacheFFI'
_config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
Expand All @@ -1107,78 +1112,6 @@ class ProtobufTagCacheFFITest(TestRecursorProtobuf):
end
"""

def testTagged(self):
name = 'tagged.example.'
expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.84')
query = dns.message.make_query(name, 'A', want_dnssec=True)
query.flags |= dns.flags.CD
res = self.sendUDPQuery(query)
self.assertRRsetInAnswer(res, expected)

msg = self.getFirstProtobufMessage()
self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
self.assertEqual(len(msg.response.rrs), 1)
rr = msg.response.rrs[0]
# we have max-cache-ttl set to 15
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.84')
self.checkNoRemainingMessage()
self.assertEqual(len(msg.response.tags), 1)
ts1 = msg.response.tags[0]

# Again to check PC case
res = self.sendUDPQuery(query)
self.assertRRsetInAnswer(res, expected)

msg = self.getFirstProtobufMessage()
self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.UDP, res)
self.assertEqual(len(msg.response.rrs), 1)
rr = msg.response.rrs[0]
# time may have passed, so do not check TTL
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.84')
self.checkNoRemainingMessage()
self.assertEqual(len(msg.response.tags), 1)
ts2 = msg.response.tags[0]
self.assertNotEqual(ts1, ts2)

def testTaggedTCP(self):
name = 'taggedtcp.example.'
expected = dns.rrset.from_text(name, 0, dns.rdataclass.IN, 'A', '192.0.2.87')
query = dns.message.make_query(name, 'A', want_dnssec=True)
query.flags |= dns.flags.CD
res = self.sendTCPQuery(query)
self.assertRRsetInAnswer(res, expected)

msg = self.getFirstProtobufMessage()
self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.TCP, res)
self.assertEqual(len(msg.response.rrs), 1)
rr = msg.response.rrs[0]
# we have max-cache-ttl set to 15
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.87')
self.checkNoRemainingMessage()
print(msg.response)
self.assertEqual(len(msg.response.tags), 1)
ts1 = msg.response.tags[0]

# Again to check PC case
res = self.sendTCPQuery(query)
self.assertRRsetInAnswer(res, expected)

msg = self.getFirstProtobufMessage()
self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.TCP, res)
print(msg.response)
self.assertEqual(len(msg.response.rrs), 1)
rr = msg.response.rrs[0]
# time may have passed, so do not check TTL
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15, checkTTL=False)
self.assertEqual(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.87')
self.checkNoRemainingMessage()
self.assertEqual(len(msg.response.tags), 1)
ts2 = msg.response.tags[0]
self.assertNotEqual(ts1, ts2)

class ProtobufSelectedFromLuaTest(TestRecursorProtobuf):
"""
This test makes sure that we correctly export queries and responses but only if they have been selected from Lua.
Expand Down

0 comments on commit 3aebfac

Please sign in to comment.