1#!/usr/bin/env python3 2# SPDX-License-Identifier: LGPL-2.1-or-later 3# 4# networkd integration test 5# This uses temporary configuration in /run and temporary veth devices, and 6# does not write anything on disk or change any system configuration; 7# but it assumes (and checks at the beginning) that networkd is not currently 8# running. 9# 10# This can be run on a normal installation, in qemu, systemd-nspawn (with 11# --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"), 12# or LXC system containers. You need at least the "ip" tool from the iproute 13# package; it is recommended to install dnsmasq too to get full test coverage. 14# 15# ATTENTION: This uses the *installed* networkd, not the one from the built 16# source tree. 17# 18# © 2015 Canonical Ltd. 19# Author: Martin Pitt <martin.pitt@ubuntu.com> 20 21import errno 22import os 23import shutil 24import socket 25import subprocess 26import sys 27import tempfile 28import time 29import unittest 30 31HAVE_DNSMASQ = shutil.which('dnsmasq') is not None 32IS_CONTAINER = subprocess.call(['systemd-detect-virt', '--quiet', '--container']) == 0 33 34NETWORK_UNITDIR = '/run/systemd/network' 35 36NETWORKD_WAIT_ONLINE = shutil.which('systemd-networkd-wait-online', 37 path='/usr/lib/systemd:/lib/systemd') 38 39RESOLV_CONF = '/run/systemd/resolve/resolv.conf' 40 41tmpmounts = [] 42running_units = [] 43stopped_units = [] 44 45 46def setUpModule(): 47 global tmpmounts 48 49 """Initialize the environment, and perform sanity checks on it.""" 50 if NETWORKD_WAIT_ONLINE is None: 51 raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found') 52 53 # Do not run any tests if the system is using networkd already and it's not virtualized 54 if (subprocess.call(['systemctl', 'is-active', '--quiet', 'systemd-networkd.service']) == 0 and 55 subprocess.call(['systemd-detect-virt', '--quiet']) != 0): 56 raise unittest.SkipTest('not virtualized and networkd is already active') 57 58 # Ensure we don't mess with an existing networkd config 59 for u in ['systemd-networkd.socket', 'systemd-networkd', 'systemd-resolved']: 60 if subprocess.call(['systemctl', 'is-active', '--quiet', u]) == 0: 61 subprocess.call(['systemctl', 'stop', u]) 62 running_units.append(u) 63 else: 64 stopped_units.append(u) 65 66 # create static systemd-network user for networkd-test-router.service (it 67 # needs to do some stuff as root and can't start as user; but networkd 68 # still insists on the user) 69 if subprocess.call(['getent', 'passwd', 'systemd-network']) != 0: 70 subprocess.call(['useradd', '--system', '--no-create-home', 'systemd-network']) 71 72 for d in ['/etc/systemd/network', '/run/systemd/network', 73 '/run/systemd/netif', '/run/systemd/resolve']: 74 if os.path.isdir(d): 75 subprocess.check_call(["mount", "-t", "tmpfs", "none", d]) 76 tmpmounts.append(d) 77 if os.path.isdir('/run/systemd/resolve'): 78 os.chmod('/run/systemd/resolve', 0o755) 79 shutil.chown('/run/systemd/resolve', 'systemd-resolve', 'systemd-resolve') 80 if os.path.isdir('/run/systemd/netif'): 81 os.chmod('/run/systemd/netif', 0o755) 82 shutil.chown('/run/systemd/netif', 'systemd-network', 'systemd-network') 83 84 # Avoid "Failed to open /dev/tty" errors in containers. 85 os.environ['SYSTEMD_LOG_TARGET'] = 'journal' 86 87 # Ensure the unit directory exists so tests can dump files into it. 88 os.makedirs(NETWORK_UNITDIR, exist_ok=True) 89 90 91def tearDownModule(): 92 global tmpmounts 93 for d in tmpmounts: 94 subprocess.check_call(["umount", "--lazy", d]) 95 for u in stopped_units: 96 subprocess.call(["systemctl", "stop", u]) 97 for u in running_units: 98 subprocess.call(["systemctl", "restart", u]) 99 100 101class NetworkdTestingUtilities: 102 """Provide a set of utility functions to facilitate networkd tests. 103 104 This class must be inherited along with unittest.TestCase to define 105 some required methods. 106 """ 107 108 def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()): 109 """Add a veth interface pair, and queue them to be removed.""" 110 subprocess.check_call(['ip', 'link', 'add', 'name', veth] + 111 list(veth_options) + 112 ['type', 'veth', 'peer', 'name', peer] + 113 list(peer_options)) 114 self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer]) 115 116 def write_config(self, path, contents): 117 """"Write a configuration file, and queue it to be removed.""" 118 119 with open(path, 'w') as f: 120 f.write(contents) 121 122 self.addCleanup(os.remove, path) 123 124 def write_network(self, unit_name, contents): 125 """Write a network unit file, and queue it to be removed.""" 126 self.write_config(os.path.join(NETWORK_UNITDIR, unit_name), contents) 127 128 def write_network_dropin(self, unit_name, dropin_name, contents): 129 """Write a network unit drop-in, and queue it to be removed.""" 130 dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name)) 131 dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name)) 132 133 os.makedirs(dropin_dir, exist_ok=True) 134 self.addCleanup(os.rmdir, dropin_dir) 135 with open(dropin_path, 'w') as dropin: 136 dropin.write(contents) 137 self.addCleanup(os.remove, dropin_path) 138 139 def read_attr(self, link, attribute): 140 """Read a link attributed from the sysfs.""" 141 # Note we don't want to check if interface `link' is managed, we 142 # want to evaluate link variable and pass the value of the link to 143 # assert_link_states e.g. eth0=managed. 144 self.assert_link_states(**{link:'managed'}) 145 with open(os.path.join('/sys/class/net', link, attribute)) as f: 146 return f.readline().strip() 147 148 def assert_link_states(self, **kwargs): 149 """Match networkctl link states to the given ones. 150 151 Each keyword argument should be the name of a network interface 152 with its expected value of the "SETUP" column in output from 153 networkctl. The interfaces have five seconds to come online 154 before the check is performed. Every specified interface must 155 be present in the output, and any other interfaces found in the 156 output are ignored. 157 158 A special interface state "managed" is supported, which matches 159 any value in the "SETUP" column other than "unmanaged". 160 """ 161 if not kwargs: 162 return 163 interfaces = set(kwargs) 164 165 # Wait for the requested interfaces, but don't fail for them. 166 subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] + 167 ['--interface={}'.format(iface) for iface in kwargs]) 168 169 # Validate each link state found in the networkctl output. 170 out = subprocess.check_output(['networkctl', '--no-legend']).rstrip() 171 for line in out.decode('utf-8').split('\n'): 172 fields = line.split() 173 if len(fields) >= 5 and fields[1] in kwargs: 174 iface = fields[1] 175 expected = kwargs[iface] 176 actual = fields[-1] 177 if (actual != expected and 178 not (expected == 'managed' and actual != 'unmanaged')): 179 self.fail("Link {} expects state {}, found {}".format(iface, expected, actual)) 180 interfaces.remove(iface) 181 182 # Ensure that all requested interfaces have been covered. 183 if interfaces: 184 self.fail("Missing links in status output: {}".format(interfaces)) 185 186 187class BridgeTest(NetworkdTestingUtilities, unittest.TestCase): 188 """Provide common methods for testing networkd against servers.""" 189 190 def setUp(self): 191 self.write_network('port1.netdev', '''\ 192[NetDev] 193Name=port1 194Kind=dummy 195MACAddress=12:34:56:78:9a:bc 196''') 197 self.write_network('port2.netdev', '''\ 198[NetDev] 199Name=port2 200Kind=dummy 201MACAddress=12:34:56:78:9a:bd 202''') 203 self.write_network('mybridge.netdev', '''\ 204[NetDev] 205Name=mybridge 206Kind=bridge 207''') 208 self.write_network('port1.network', '''\ 209[Match] 210Name=port1 211[Network] 212Bridge=mybridge 213''') 214 self.write_network('port2.network', '''\ 215[Match] 216Name=port2 217[Network] 218Bridge=mybridge 219''') 220 self.write_network('mybridge.network', '''\ 221[Match] 222Name=mybridge 223[Network] 224DNS=192.168.250.1 225Address=192.168.250.33/24 226Gateway=192.168.250.1 227''') 228 subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved']) 229 subprocess.check_call(['systemctl', 'start', 'systemd-networkd']) 230 231 def tearDown(self): 232 subprocess.check_call(['systemctl', 'stop', 'systemd-networkd.socket']) 233 subprocess.check_call(['systemctl', 'stop', 'systemd-networkd.service']) 234 subprocess.check_call(['ip', 'link', 'del', 'mybridge']) 235 subprocess.check_call(['ip', 'link', 'del', 'port1']) 236 subprocess.check_call(['ip', 'link', 'del', 'port2']) 237 238 def test_bridge_init(self): 239 self.assert_link_states( 240 port1='managed', 241 port2='managed', 242 mybridge='managed') 243 244 def test_bridge_port_priority(self): 245 self.assertEqual(self.read_attr('port1', 'brport/priority'), '32') 246 self.write_network_dropin('port1.network', 'priority', '''\ 247[Bridge] 248Priority=28 249''') 250 subprocess.check_call(['ip', 'link', 'set', 'dev', 'port1', 'down']) 251 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd']) 252 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', 253 'port1', '--timeout=5']) 254 self.assertEqual(self.read_attr('port1', 'brport/priority'), '28') 255 256 def test_bridge_port_priority_set_zero(self): 257 """It should be possible to set the bridge port priority to 0""" 258 self.assertEqual(self.read_attr('port2', 'brport/priority'), '32') 259 self.write_network_dropin('port2.network', 'priority', '''\ 260[Bridge] 261Priority=0 262''') 263 subprocess.check_call(['ip', 'link', 'set', 'dev', 'port2', 'down']) 264 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd']) 265 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', 266 'port2', '--timeout=5']) 267 self.assertEqual(self.read_attr('port2', 'brport/priority'), '0') 268 269 def test_bridge_port_property(self): 270 """Test the "[Bridge]" section keys""" 271 self.assertEqual(self.read_attr('port2', 'brport/priority'), '32') 272 self.write_network_dropin('port2.network', 'property', '''\ 273[Bridge] 274UnicastFlood=true 275HairPin=true 276Isolated=true 277UseBPDU=true 278FastLeave=true 279AllowPortToBeRoot=true 280Cost=555 281Priority=23 282''') 283 subprocess.check_call(['ip', 'link', 'set', 'dev', 'port2', 'down']) 284 subprocess.check_call(['systemctl', 'restart', 'systemd-networkd']) 285 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', 286 'port2', '--timeout=5']) 287 288 self.assertEqual(self.read_attr('port2', 'brport/priority'), '23') 289 self.assertEqual(self.read_attr('port2', 'brport/hairpin_mode'), '1') 290 self.assertEqual(self.read_attr('port2', 'brport/isolated'), '1') 291 self.assertEqual(self.read_attr('port2', 'brport/path_cost'), '555') 292 self.assertEqual(self.read_attr('port2', 'brport/multicast_fast_leave'), '1') 293 self.assertEqual(self.read_attr('port2', 'brport/unicast_flood'), '1') 294 self.assertEqual(self.read_attr('port2', 'brport/bpdu_guard'), '1') 295 self.assertEqual(self.read_attr('port2', 'brport/root_block'), '1') 296 297class ClientTestBase(NetworkdTestingUtilities): 298 """Provide common methods for testing networkd against servers.""" 299 300 @classmethod 301 def setUpClass(klass): 302 klass.orig_log_level = subprocess.check_output( 303 ['systemctl', 'show', '--value', '--property', 'LogLevel'], 304 universal_newlines=True).strip() 305 subprocess.check_call(['systemd-analyze', 'log-level', 'debug']) 306 307 @classmethod 308 def tearDownClass(klass): 309 subprocess.check_call(['systemd-analyze', 'log-level', klass.orig_log_level]) 310 311 def setUp(self): 312 self.iface = 'test_eth42' 313 self.if_router = 'router_eth42' 314 self.workdir_obj = tempfile.TemporaryDirectory() 315 self.workdir = self.workdir_obj.name 316 self.config = 'test_eth42.network' 317 318 # get current journal cursor 319 subprocess.check_output(['journalctl', '--sync']) 320 out = subprocess.check_output(['journalctl', '-b', '--quiet', 321 '--no-pager', '-n0', '--show-cursor'], 322 universal_newlines=True) 323 self.assertTrue(out.startswith('-- cursor:')) 324 self.journal_cursor = out.split()[-1] 325 326 subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved']) 327 328 def tearDown(self): 329 self.shutdown_iface() 330 subprocess.call(['systemctl', 'stop', 'systemd-networkd.socket']) 331 subprocess.call(['systemctl', 'stop', 'systemd-networkd.service']) 332 subprocess.call(['ip', 'link', 'del', 'dummy0'], 333 stderr=subprocess.DEVNULL) 334 335 def show_journal(self, unit): 336 '''Show journal of given unit since start of the test''' 337 338 print('---- {} ----'.format(unit)) 339 subprocess.check_output(['journalctl', '--sync']) 340 sys.stdout.flush() 341 subprocess.call(['journalctl', '-b', '--no-pager', '--quiet', 342 '--cursor', self.journal_cursor, '-u', unit]) 343 344 def create_iface(self, ipv6=False): 345 '''Create test interface with DHCP server behind it''' 346 347 raise NotImplementedError('must be implemented by a subclass') 348 349 def shutdown_iface(self): 350 '''Remove test interface and stop DHCP server''' 351 352 raise NotImplementedError('must be implemented by a subclass') 353 354 def print_server_log(self): 355 '''Print DHCP server log for debugging failures''' 356 357 raise NotImplementedError('must be implemented by a subclass') 358 359 def start_unit(self, unit): 360 try: 361 subprocess.check_call(['systemctl', 'start', unit]) 362 except subprocess.CalledProcessError: 363 self.show_journal(unit) 364 raise 365 366 def do_test(self, coldplug=True, ipv6=False, extra_opts='', 367 online_timeout=10, dhcp_mode='yes'): 368 self.start_unit('systemd-resolved') 369 self.write_network(self.config, '''\ 370[Match] 371Name={iface} 372[Network] 373DHCP={dhcp_mode} 374{extra_opts} 375'''.format(iface=self.iface, dhcp_mode=dhcp_mode, extra_opts=extra_opts)) 376 377 if coldplug: 378 # create interface first, then start networkd 379 self.create_iface(ipv6=ipv6) 380 self.start_unit('systemd-networkd') 381 elif coldplug is not None: 382 # start networkd first, then create interface 383 self.start_unit('systemd-networkd') 384 self.create_iface(ipv6=ipv6) 385 else: 386 # "None" means test sets up interface by itself 387 self.start_unit('systemd-networkd') 388 389 try: 390 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', 391 self.iface, '--timeout=%i' % online_timeout]) 392 393 if ipv6: 394 # check iface state and IP 6 address; FIXME: we need to wait a bit 395 # longer, as the iface is "configured" already with IPv4 *or* 396 # IPv6, but we want to wait for both 397 for _ in range(10): 398 out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface]) 399 if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out and b'tentative' not in out: 400 break 401 time.sleep(1) 402 else: 403 self.fail('timed out waiting for IPv6 configuration') 404 405 self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic') 406 self.assertRegex(out, b'inet6 fe80::.* scope link') 407 else: 408 # should have link-local address on IPv6 only 409 out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface]) 410 self.assertRegex(out, br'inet6 fe80::.* scope link') 411 self.assertNotIn(b'scope global', out) 412 413 # should have IPv4 address 414 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface]) 415 self.assertIn(b'state UP', out) 416 self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic') 417 418 # check networkctl state 419 out = subprocess.check_output(['networkctl']) 420 self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode()) 421 self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode()) 422 423 out = subprocess.check_output(['networkctl', '-n', '0', 'status', self.iface]) 424 self.assertRegex(out, br'Type:\s+ether') 425 self.assertRegex(out, br'State:\s+routable.*configured') 426 self.assertRegex(out, br'Online state:\s+online') 427 self.assertRegex(out, br'Address:\s+192.168.5.\d+') 428 if ipv6: 429 self.assertRegex(out, br'2600::') 430 else: 431 self.assertNotIn(br'2600::', out) 432 self.assertRegex(out, br'fe80::') 433 self.assertRegex(out, br'Gateway:\s+192.168.5.1') 434 self.assertRegex(out, br'DNS:\s+192.168.5.1') 435 except (AssertionError, subprocess.CalledProcessError): 436 # show networkd status, journal, and DHCP server log on failure 437 with open(os.path.join(NETWORK_UNITDIR, self.config)) as f: 438 print('\n---- {} ----\n{}'.format(self.config, f.read())) 439 print('---- interface status ----') 440 sys.stdout.flush() 441 subprocess.call(['ip', 'a', 'show', 'dev', self.iface]) 442 print('---- networkctl status {} ----'.format(self.iface)) 443 sys.stdout.flush() 444 rc = subprocess.call(['networkctl', '-n', '0', 'status', self.iface]) 445 if rc != 0: 446 print("'networkctl status' exited with an unexpected code {}".format(rc)) 447 self.show_journal('systemd-networkd.service') 448 self.print_server_log() 449 raise 450 451 for timeout in range(50): 452 with open(RESOLV_CONF) as f: 453 contents = f.read() 454 if 'nameserver 192.168.5.1\n' in contents: 455 break 456 time.sleep(0.1) 457 else: 458 self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF) 459 460 if coldplug is False: 461 # check post-down.d hook 462 self.shutdown_iface() 463 464 def test_coldplug_dhcp_yes_ip4(self): 465 # we have a 12s timeout on RA, so we need to wait longer 466 self.do_test(coldplug=True, ipv6=False, online_timeout=15) 467 468 def test_coldplug_dhcp_yes_ip4_no_ra(self): 469 # with disabling RA explicitly things should be fast 470 self.do_test(coldplug=True, ipv6=False, 471 extra_opts='IPv6AcceptRA=False') 472 473 def test_coldplug_dhcp_ip4_only(self): 474 # we have a 12s timeout on RA, so we need to wait longer 475 self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4', 476 online_timeout=15) 477 478 def test_coldplug_dhcp_ip4_only_no_ra(self): 479 # with disabling RA explicitly things should be fast 480 self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4', 481 extra_opts='IPv6AcceptRA=False') 482 483 def test_coldplug_dhcp_ip6(self): 484 self.do_test(coldplug=True, ipv6=True) 485 486 def test_hotplug_dhcp_ip4(self): 487 # With IPv4 only we have a 12s timeout on RA, so we need to wait longer 488 self.do_test(coldplug=False, ipv6=False, online_timeout=15) 489 490 def test_hotplug_dhcp_ip6(self): 491 self.do_test(coldplug=False, ipv6=True) 492 493 def test_route_only_dns(self): 494 self.write_network('myvpn.netdev', '''\ 495[NetDev] 496Name=dummy0 497Kind=dummy 498MACAddress=12:34:56:78:9a:bc 499''') 500 self.write_network('myvpn.network', '''\ 501[Match] 502Name=dummy0 503[Network] 504Address=192.168.42.100/24 505DNS=192.168.42.1 506Domains= ~company 507''') 508 509 try: 510 self.do_test(coldplug=True, ipv6=False, 511 extra_opts='IPv6AcceptRouterAdvertisements=False') 512 except subprocess.CalledProcessError as e: 513 # networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848 514 if IS_CONTAINER and e.cmd == ['systemctl', 'start', 'systemd-networkd']: 515 raise unittest.SkipTest('https://github.com/systemd/systemd/issues/11848') 516 else: 517 raise 518 519 with open(RESOLV_CONF) as f: 520 contents = f.read() 521 # ~company is not a search domain, only a routing domain 522 self.assertNotRegex(contents, 'search.*company') 523 # our global server should appear 524 self.assertIn('nameserver 192.168.5.1\n', contents) 525 # should not have domain-restricted server as global server 526 self.assertNotIn('nameserver 192.168.42.1\n', contents) 527 528 def test_route_only_dns_all_domains(self): 529 self.write_network('myvpn.netdev', '''[NetDev] 530Name=dummy0 531Kind=dummy 532MACAddress=12:34:56:78:9a:bc 533''') 534 self.write_network('myvpn.network', '''[Match] 535Name=dummy0 536[Network] 537Address=192.168.42.100/24 538DNS=192.168.42.1 539Domains= ~company ~. 540''') 541 542 try: 543 self.do_test(coldplug=True, ipv6=False, 544 extra_opts='IPv6AcceptRouterAdvertisements=False') 545 except subprocess.CalledProcessError as e: 546 # networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848 547 if IS_CONTAINER and e.cmd == ['systemctl', 'start', 'systemd-networkd']: 548 raise unittest.SkipTest('https://github.com/systemd/systemd/issues/11848') 549 else: 550 raise 551 552 with open(RESOLV_CONF) as f: 553 contents = f.read() 554 555 # ~company is not a search domain, only a routing domain 556 self.assertNotRegex(contents, 'search.*company') 557 558 # our global server should appear 559 self.assertIn('nameserver 192.168.5.1\n', contents) 560 # should have company server as global server due to ~. 561 self.assertIn('nameserver 192.168.42.1\n', contents) 562 563 564@unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed') 565class DnsmasqClientTest(ClientTestBase, unittest.TestCase): 566 '''Test networkd client against dnsmasq''' 567 568 def setUp(self): 569 super().setUp() 570 self.dnsmasq = None 571 self.iface_mac = 'de:ad:be:ef:47:11' 572 573 def create_iface(self, ipv6=False, dnsmasq_opts=None): 574 '''Create test interface with DHCP server behind it''' 575 576 # add veth pair 577 subprocess.check_call(['ip', 'link', 'add', 'name', self.iface, 578 'address', self.iface_mac, 579 'type', 'veth', 'peer', 'name', self.if_router]) 580 581 # give our router an IP 582 subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router]) 583 subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router]) 584 if ipv6: 585 subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router]) 586 subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up']) 587 588 # add DHCP server 589 self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log') 590 lease_file = os.path.join(self.workdir, 'dnsmasq.leases') 591 if ipv6: 592 extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20'] 593 else: 594 extra_opts = [] 595 if dnsmasq_opts: 596 extra_opts += dnsmasq_opts 597 self.dnsmasq = subprocess.Popen( 598 ['dnsmasq', '--keep-in-foreground', '--log-queries', 599 '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null', 600 '--dhcp-leasefile=' + lease_file, '--bind-interfaces', 601 '--interface=' + self.if_router, '--except-interface=lo', 602 '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts) 603 604 def shutdown_iface(self): 605 '''Remove test interface and stop DHCP server''' 606 607 if self.if_router: 608 subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router]) 609 self.if_router = None 610 if self.dnsmasq: 611 self.dnsmasq.kill() 612 self.dnsmasq.wait() 613 self.dnsmasq = None 614 615 def print_server_log(self): 616 '''Print DHCP server log for debugging failures''' 617 618 with open(self.dnsmasq_log) as f: 619 sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read())) 620 621 def test_resolved_domain_restricted_dns(self): 622 '''resolved: domain-restricted DNS servers''' 623 624 # enable DNSSEC in allow downgrade mode, and turn off stuff we don't want to test to make looking at logs easier 625 conf = '/run/systemd/resolved.conf.d/test-enable-dnssec.conf' 626 os.makedirs(os.path.dirname(conf), exist_ok=True) 627 with open(conf, 'w') as f: 628 f.write('[Resolve]\nDNSSEC=allow-downgrade\nLLMNR=no\nMulticastDNS=no\nDNSOverTLS=no\n') 629 self.addCleanup(os.remove, conf) 630 631 # create interface for generic connections; this will map all DNS names 632 # to 192.168.42.1 633 self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1']) 634 self.write_network('general.network', '''\ 635[Match] 636Name={} 637[Network] 638DHCP=ipv4 639IPv6AcceptRA=False 640DNSSECNegativeTrustAnchors=search.example.com 641'''.format(self.iface)) 642 643 # create second device/dnsmasq for a .company/.lab VPN interface 644 # static IPs for simplicity 645 self.add_veth_pair('testvpnclient', 'testvpnrouter') 646 subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter']) 647 subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter']) 648 subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up']) 649 650 vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log') 651 vpn_dnsmasq = subprocess.Popen( 652 ['dnsmasq', '--keep-in-foreground', '--log-queries', 653 '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null', 654 '--dhcp-leasefile=/dev/null', '--bind-interfaces', 655 '--interface=testvpnrouter', '--except-interface=lo', 656 '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4']) 657 self.addCleanup(vpn_dnsmasq.wait) 658 self.addCleanup(vpn_dnsmasq.kill) 659 660 self.write_network('vpn.network', '''\ 661[Match] 662Name=testvpnclient 663[Network] 664IPv6AcceptRA=False 665Address=10.241.3.2/24 666DNS=10.241.3.1 667Domains=~company ~lab 668DNSSECNegativeTrustAnchors=company lab 669''') 670 671 self.start_unit('systemd-networkd') 672 subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface, 673 '--interface=testvpnclient', '--timeout=20']) 674 675 # ensure we start fresh with every test 676 subprocess.check_call(['systemctl', 'restart', 'systemd-resolved']) 677 678 # test vpnclient specific domains; these should *not* be answered by 679 # the general DNS 680 out = subprocess.check_output(['resolvectl', 'query', 'math.lab']) 681 self.assertIn(b'math.lab: 10.241.3.3', out) 682 out = subprocess.check_output(['resolvectl', 'query', 'kettle.cantina.company']) 683 self.assertIn(b'kettle.cantina.company: 10.241.4.4', out) 684 685 # test general domains 686 out = subprocess.check_output(['resolvectl', 'query', 'search.example.com']) 687 self.assertIn(b'search.example.com: 192.168.42.1', out) 688 689 with open(self.dnsmasq_log) as f: 690 general_log = f.read() 691 with open(vpn_dnsmasq_log) as f: 692 vpn_log = f.read() 693 694 # VPN domains should only be sent to VPN DNS 695 self.assertRegex(vpn_log, 'query.*math.lab') 696 self.assertRegex(vpn_log, 'query.*cantina.company') 697 self.assertNotIn('.lab', general_log) 698 self.assertNotIn('.company', general_log) 699 700 # general domains should not be sent to the VPN DNS 701 self.assertRegex(general_log, 'query.*search.example.com') 702 self.assertNotIn('search.example.com', vpn_log) 703 704 def test_resolved_etc_hosts(self): 705 '''resolved queries to /etc/hosts''' 706 707 # enabled DNSSEC in allow-downgrade mode 708 conf = '/run/systemd/resolved.conf.d/test-enable-dnssec.conf' 709 os.makedirs(os.path.dirname(conf), exist_ok=True) 710 with open(conf, 'w') as f: 711 f.write('[Resolve]\nDNSSEC=allow-downgrade\nLLMNR=no\nMulticastDNS=no\nDNSOverTLS=no\n') 712 self.addCleanup(os.remove, conf) 713 714 # Add example.com to NTA list for this test 715 negative = '/run/dnssec-trust-anchors.d/example.com.negative' 716 os.makedirs(os.path.dirname(negative), exist_ok=True) 717 with open(negative, 'w') as f: 718 f.write('example.com\n16.172.in-addr.arpa\n') 719 self.addCleanup(os.remove, negative) 720 721 # create /etc/hosts bind mount which resolves my.example.com for IPv4 722 hosts = os.path.join(self.workdir, 'hosts') 723 with open(hosts, 'w') as f: 724 f.write('172.16.99.99 my.example.com\n') 725 subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts']) 726 self.addCleanup(subprocess.call, ['umount', '/etc/hosts']) 727 subprocess.check_call(['systemctl', 'restart', 'systemd-resolved.service']) 728 729 # note: different IPv4 address here, so that it's easy to tell apart 730 # what resolved the query 731 self.create_iface(dnsmasq_opts=['--host-record=my.example.com,172.16.99.1,2600::99:99', 732 '--host-record=other.example.com,172.16.0.42,2600::42', 733 '--mx-host=example.com,mail.example.com'], 734 ipv6=True) 735 self.do_test(coldplug=None, ipv6=True) 736 737 try: 738 # family specific queries 739 out = subprocess.check_output(['resolvectl', 'query', '-4', 'my.example.com']) 740 self.assertIn(b'my.example.com: 172.16.99.99', out) 741 # we don't expect an IPv6 answer; if /etc/hosts has any IP address, 742 # it's considered a sufficient source 743 self.assertNotEqual(subprocess.call(['resolvectl', 'query', '-6', 'my.example.com']), 0) 744 # "any family" query; IPv4 should come from /etc/hosts 745 out = subprocess.check_output(['resolvectl', 'query', 'my.example.com']) 746 self.assertIn(b'my.example.com: 172.16.99.99', out) 747 # IP → name lookup; again, takes the /etc/hosts one 748 out = subprocess.check_output(['resolvectl', 'query', '172.16.99.99']) 749 self.assertIn(b'172.16.99.99: my.example.com', out) 750 751 # non-address RRs should fall back to DNS 752 out = subprocess.check_output(['resolvectl', 'query', '--type=MX', 'example.com']) 753 self.assertIn(b'example.com IN MX 1 mail.example.com', out) 754 755 # other domains query DNS 756 out = subprocess.check_output(['resolvectl', 'query', 'other.example.com']) 757 self.assertIn(b'172.16.0.42', out) 758 out = subprocess.check_output(['resolvectl', 'query', '172.16.0.42']) 759 self.assertIn(b'172.16.0.42: other.example.com', out) 760 except (AssertionError, subprocess.CalledProcessError): 761 self.show_journal('systemd-resolved.service') 762 self.print_server_log() 763 raise 764 765 def test_transient_hostname(self): 766 '''networkd sets transient hostname from DHCP''' 767 768 orig_hostname = socket.gethostname() 769 self.addCleanup(socket.sethostname, orig_hostname) 770 # temporarily move /etc/hostname away; restart hostnamed to pick it up 771 if os.path.exists('/etc/hostname'): 772 subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname']) 773 self.addCleanup(subprocess.call, ['umount', '/etc/hostname']) 774 subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service']) 775 self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service']) 776 777 self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)]) 778 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4') 779 780 try: 781 # should have received the fixed IP above 782 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface]) 783 self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic') 784 # should have set transient hostname in hostnamed; this is 785 # sometimes a bit lagging (issue #4753), so retry a few times 786 for retry in range(1, 6): 787 out = subprocess.check_output(['hostnamectl']) 788 if b'testgreen' in out: 789 break 790 time.sleep(5) 791 sys.stdout.write('[retry %i] ' % retry) 792 sys.stdout.flush() 793 else: 794 self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode())) 795 # and also applied to the system 796 self.assertEqual(socket.gethostname(), 'testgreen') 797 except AssertionError: 798 self.show_journal('systemd-networkd.service') 799 self.show_journal('systemd-hostnamed.service') 800 self.print_server_log() 801 raise 802 803 def test_transient_hostname_with_static(self): 804 '''transient hostname is not applied if static hostname exists''' 805 806 orig_hostname = socket.gethostname() 807 self.addCleanup(socket.sethostname, orig_hostname) 808 809 if not os.path.exists('/etc/hostname'): 810 self.write_config('/etc/hostname', "foobarqux") 811 else: 812 self.write_config('/run/hostname.tmp', "foobarqux") 813 subprocess.check_call(['mount', '--bind', '/run/hostname.tmp', '/etc/hostname']) 814 self.addCleanup(subprocess.call, ['umount', '/etc/hostname']) 815 816 socket.sethostname("foobarqux"); 817 818 subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service']) 819 self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service']) 820 821 self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)]) 822 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4') 823 824 try: 825 # should have received the fixed IP above 826 out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface]) 827 self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic') 828 # static hostname wins over transient one, thus *not* applied 829 self.assertEqual(socket.gethostname(), "foobarqux") 830 except AssertionError: 831 self.show_journal('systemd-networkd.service') 832 self.show_journal('systemd-hostnamed.service') 833 self.print_server_log() 834 raise 835 836 837class NetworkdClientTest(ClientTestBase, unittest.TestCase): 838 '''Test networkd client against networkd server''' 839 840 def setUp(self): 841 super().setUp() 842 self.dnsmasq = None 843 844 def create_iface(self, ipv6=False, dhcpserver_opts=None): 845 '''Create test interface with DHCP server behind it''' 846 847 # run "router-side" networkd in own mount namespace to shield it from 848 # "client-side" configuration and networkd 849 (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh') 850 self.addCleanup(os.remove, script) 851 with os.fdopen(fd, 'w+') as f: 852 f.write('''\ 853#!/bin/sh 854set -eu 855mkdir -p /run/systemd/network 856mkdir -p /run/systemd/netif 857mount -t tmpfs none /run/systemd/network 858mount -t tmpfs none /run/systemd/netif 859[ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus 860# create router/client veth pair 861cat <<EOF >/run/systemd/network/test.netdev 862[NetDev] 863Name={ifr} 864Kind=veth 865 866[Peer] 867Name={ifc} 868EOF 869 870cat <<EOF >/run/systemd/network/test.network 871[Match] 872Name={ifr} 873 874[Network] 875Address=192.168.5.1/24 876{addr6} 877DHCPServer=yes 878 879[DHCPServer] 880PoolOffset=10 881PoolSize=50 882DNS=192.168.5.1 883{dhopts} 884EOF 885 886# run networkd as in systemd-networkd.service 887exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ {{ s/^.*=//; s/^[@+-]//; s/^!*//; p}}') 888'''.format(ifr=self.if_router, 889 ifc=self.iface, 890 addr6=('Address=2600::1/64' if ipv6 else ''), 891 dhopts=(dhcpserver_opts or ''))) 892 893 os.fchmod(fd, 0o755) 894 895 subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service', 896 '-p', 'InaccessibleDirectories=-/etc/systemd/network', 897 '-p', 'InaccessibleDirectories=-/run/systemd/network', 898 '-p', 'InaccessibleDirectories=-/run/systemd/netif', 899 '--service-type=notify', script]) 900 901 # wait until devices got created 902 for _ in range(50): 903 out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router]) 904 if b'state UP' in out and b'scope global' in out: 905 break 906 time.sleep(0.1) 907 908 def shutdown_iface(self): 909 '''Remove test interface and stop DHCP server''' 910 911 if self.if_router: 912 subprocess.check_call(['systemctl', 'stop', 'networkd-test-router.service']) 913 # ensure failed transient unit does not stay around 914 subprocess.call(['systemctl', 'reset-failed', 'networkd-test-router.service']) 915 subprocess.call(['ip', 'link', 'del', 'dev', self.if_router]) 916 self.if_router = None 917 918 def print_server_log(self): 919 '''Print DHCP server log for debugging failures''' 920 921 self.show_journal('networkd-test-router.service') 922 923 @unittest.skip('networkd does not have DHCPv6 server support') 924 def test_hotplug_dhcp_ip6(self): 925 pass 926 927 @unittest.skip('networkd does not have DHCPv6 server support') 928 def test_coldplug_dhcp_ip6(self): 929 pass 930 931 def test_search_domains(self): 932 933 # we don't use this interface for this test 934 self.if_router = None 935 936 self.write_network('test.netdev', '''\ 937[NetDev] 938Name=dummy0 939Kind=dummy 940MACAddress=12:34:56:78:9a:bc 941''') 942 self.write_network('test.network', '''\ 943[Match] 944Name=dummy0 945[Network] 946Address=192.168.42.100/24 947DNS=192.168.42.1 948Domains= one two three four five six seven eight nine ten 949''') 950 951 self.start_unit('systemd-networkd') 952 953 for timeout in range(50): 954 with open(RESOLV_CONF) as f: 955 contents = f.read() 956 if ' one' in contents: 957 break 958 time.sleep(0.1) 959 self.assertRegex(contents, 'search .*one two three four five six seven eight nine ten') 960 961 def test_dropin(self): 962 # we don't use this interface for this test 963 self.if_router = None 964 965 self.write_network('test.netdev', '''\ 966[NetDev] 967Name=dummy0 968Kind=dummy 969MACAddress=12:34:56:78:9a:bc 970''') 971 self.write_network('test.network', '''\ 972[Match] 973Name=dummy0 974[Network] 975Address=192.168.42.100/24 976DNS=192.168.42.1 977''') 978 self.write_network_dropin('test.network', 'dns', '''\ 979[Network] 980DNS=127.0.0.1 981''') 982 983 self.start_unit('systemd-resolved') 984 self.start_unit('systemd-networkd') 985 986 for timeout in range(50): 987 with open(RESOLV_CONF) as f: 988 contents = f.read() 989 if ' 127.0.0.1' in contents and '192.168.42.1' in contents: 990 break 991 time.sleep(0.1) 992 self.assertIn('nameserver 192.168.42.1\n', contents) 993 self.assertIn('nameserver 127.0.0.1\n', contents) 994 995 def test_dhcp_timezone(self): 996 '''networkd sets time zone from DHCP''' 997 998 def get_tz(): 999 out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1', 1000 '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone']) 1001 assert out.startswith(b's "') 1002 out = out.strip() 1003 assert out.endswith(b'"') 1004 return out[3:-1].decode() 1005 1006 orig_timezone = get_tz() 1007 self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone]) 1008 1009 self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu') 1010 self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4') 1011 1012 # should have applied the received timezone 1013 try: 1014 self.assertEqual(get_tz(), 'Pacific/Honolulu') 1015 except AssertionError: 1016 self.show_journal('systemd-networkd.service') 1017 self.show_journal('systemd-hostnamed.service') 1018 raise 1019 1020 1021class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities): 1022 """Test [Match] sections in .network files. 1023 1024 Be aware that matching the test host's interfaces will wipe their 1025 configuration, so as a precaution, all network files should have a 1026 restrictive [Match] section to only ever interfere with the 1027 temporary veth interfaces created here. 1028 """ 1029 1030 def tearDown(self): 1031 """Stop networkd.""" 1032 subprocess.call(['systemctl', 'stop', 'systemd-networkd.socket']) 1033 subprocess.call(['systemctl', 'stop', 'systemd-networkd.service']) 1034 1035 def test_basic_matching(self): 1036 """Verify the Name= line works throughout this class.""" 1037 self.add_veth_pair('test_if1', 'fake_if2') 1038 self.write_network('test.network', "[Match]\nName=test_*\n[Network]") 1039 subprocess.check_call(['systemctl', 'start', 'systemd-networkd']) 1040 self.assert_link_states(test_if1='managed', fake_if2='unmanaged') 1041 1042 def test_inverted_matching(self): 1043 """Verify that a '!'-prefixed value inverts the match.""" 1044 # Use a MAC address as the interfaces' common matching attribute 1045 # to avoid depending on udev, to support testing in containers. 1046 mac = '00:01:02:03:98:99' 1047 self.add_veth_pair('test_veth', 'test_peer', 1048 ['addr', mac], ['addr', mac]) 1049 self.write_network('no-veth.network', """\ 1050[Match] 1051MACAddress={} 1052Name=!nonexistent *peer* 1053[Network]""".format(mac)) 1054 subprocess.check_call(['systemctl', 'start', 'systemd-networkd']) 1055 self.assert_link_states(test_veth='managed', test_peer='unmanaged') 1056 1057 1058class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities): 1059 """Test if networkd manages the correct interfaces.""" 1060 1061 def setUp(self): 1062 """Write .network files to match the named veth devices.""" 1063 # Define the veth+peer pairs to be created. 1064 # Their pairing doesn't actually matter, only their names do. 1065 self.veths = { 1066 'm1def': 'm0unm', 1067 'm1man': 'm1unm', 1068 } 1069 1070 # Define the contents of .network files to be read in order. 1071 self.configs = ( 1072 "[Match]\nName=m1def\n", 1073 "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n", 1074 "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n", 1075 ) 1076 1077 # Write out the .network files to be cleaned up automatically. 1078 for i, config in enumerate(self.configs): 1079 self.write_network("%02d-test.network" % i, config) 1080 1081 def tearDown(self): 1082 """Stop networkd.""" 1083 subprocess.call(['systemctl', 'stop', 'systemd-networkd.socket']) 1084 subprocess.call(['systemctl', 'stop', 'systemd-networkd.service']) 1085 1086 def create_iface(self): 1087 """Create temporary veth pairs for interface matching.""" 1088 for veth, peer in self.veths.items(): 1089 self.add_veth_pair(veth, peer) 1090 1091 def test_unmanaged_setting(self): 1092 """Verify link states with Unmanaged= settings, hot-plug.""" 1093 subprocess.check_call(['systemctl', 'start', 'systemd-networkd']) 1094 self.create_iface() 1095 self.assert_link_states(m1def='managed', 1096 m1man='managed', 1097 m1unm='unmanaged', 1098 m0unm='unmanaged') 1099 1100 def test_unmanaged_setting_coldplug(self): 1101 """Verify link states with Unmanaged= settings, cold-plug.""" 1102 self.create_iface() 1103 subprocess.check_call(['systemctl', 'start', 'systemd-networkd']) 1104 self.assert_link_states(m1def='managed', 1105 m1man='managed', 1106 m1unm='unmanaged', 1107 m0unm='unmanaged') 1108 1109 def test_catchall_config(self): 1110 """Verify link states with a catch-all config, hot-plug.""" 1111 # Don't actually catch ALL interfaces. It messes up the host. 1112 self.write_network('all.network', "[Match]\nName=m[01]???\n") 1113 subprocess.check_call(['systemctl', 'start', 'systemd-networkd']) 1114 self.create_iface() 1115 self.assert_link_states(m1def='managed', 1116 m1man='managed', 1117 m1unm='unmanaged', 1118 m0unm='managed') 1119 1120 def test_catchall_config_coldplug(self): 1121 """Verify link states with a catch-all config, cold-plug.""" 1122 # Don't actually catch ALL interfaces. It messes up the host. 1123 self.write_network('all.network', "[Match]\nName=m[01]???\n") 1124 self.create_iface() 1125 subprocess.check_call(['systemctl', 'start', 'systemd-networkd']) 1126 self.assert_link_states(m1def='managed', 1127 m1man='managed', 1128 m1unm='unmanaged', 1129 m0unm='managed') 1130 1131 1132if __name__ == '__main__': 1133 unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, 1134 verbosity=2)) 1135