1#!/usr/bin/env python3
2# SPDX-License-Identifier: LGPL-2.1-or-later
3#
4# networkd integration test
5# This uses temporary configuration in /run and temporary veth devices, and
6# does not write anything on disk or change any system configuration;
7# but it assumes (and checks at the beginning) that networkd is not currently
8# running.
9#
10# This can be run on a normal installation, in qemu, systemd-nspawn (with
11# --private-network), LXD (with "--config raw.lxc=lxc.aa_profile=unconfined"),
12# or LXC system containers. You need at least the "ip" tool from the iproute
13# package; it is recommended to install dnsmasq too to get full test coverage.
14#
15# ATTENTION: This uses the *installed* networkd, not the one from the built
16# source tree.
17#
18# © 2015 Canonical Ltd.
19# Author: Martin Pitt <martin.pitt@ubuntu.com>
20
21import errno
22import os
23import shutil
24import socket
25import subprocess
26import sys
27import tempfile
28import time
29import unittest
30
31HAVE_DNSMASQ = shutil.which('dnsmasq') is not None
32IS_CONTAINER = subprocess.call(['systemd-detect-virt', '--quiet', '--container']) == 0
33
34NETWORK_UNITDIR = '/run/systemd/network'
35
36NETWORKD_WAIT_ONLINE = shutil.which('systemd-networkd-wait-online',
37                                    path='/usr/lib/systemd:/lib/systemd')
38
39RESOLV_CONF = '/run/systemd/resolve/resolv.conf'
40
41tmpmounts = []
42running_units = []
43stopped_units = []
44
45
46def setUpModule():
47    global tmpmounts
48
49    """Initialize the environment, and perform sanity checks on it."""
50    if NETWORKD_WAIT_ONLINE is None:
51        raise OSError(errno.ENOENT, 'systemd-networkd-wait-online not found')
52
53    # Do not run any tests if the system is using networkd already and it's not virtualized
54    if (subprocess.call(['systemctl', 'is-active', '--quiet', 'systemd-networkd.service']) == 0 and
55            subprocess.call(['systemd-detect-virt', '--quiet']) != 0):
56        raise unittest.SkipTest('not virtualized and networkd is already active')
57
58    # Ensure we don't mess with an existing networkd config
59    for u in ['systemd-networkd.socket', 'systemd-networkd', 'systemd-resolved']:
60        if subprocess.call(['systemctl', 'is-active', '--quiet', u]) == 0:
61            subprocess.call(['systemctl', 'stop', u])
62            running_units.append(u)
63        else:
64            stopped_units.append(u)
65
66    # create static systemd-network user for networkd-test-router.service (it
67    # needs to do some stuff as root and can't start as user; but networkd
68    # still insists on the user)
69    if subprocess.call(['getent', 'passwd', 'systemd-network']) != 0:
70        subprocess.call(['useradd', '--system', '--no-create-home', 'systemd-network'])
71
72    for d in ['/etc/systemd/network', '/run/systemd/network',
73              '/run/systemd/netif', '/run/systemd/resolve']:
74        if os.path.isdir(d):
75            subprocess.check_call(["mount", "-t", "tmpfs", "none", d])
76            tmpmounts.append(d)
77    if os.path.isdir('/run/systemd/resolve'):
78        os.chmod('/run/systemd/resolve', 0o755)
79        shutil.chown('/run/systemd/resolve', 'systemd-resolve', 'systemd-resolve')
80    if os.path.isdir('/run/systemd/netif'):
81        os.chmod('/run/systemd/netif', 0o755)
82        shutil.chown('/run/systemd/netif', 'systemd-network', 'systemd-network')
83
84    # Avoid "Failed to open /dev/tty" errors in containers.
85    os.environ['SYSTEMD_LOG_TARGET'] = 'journal'
86
87    # Ensure the unit directory exists so tests can dump files into it.
88    os.makedirs(NETWORK_UNITDIR, exist_ok=True)
89
90
91def tearDownModule():
92    global tmpmounts
93    for d in tmpmounts:
94        subprocess.check_call(["umount", "--lazy", d])
95    for u in stopped_units:
96        subprocess.call(["systemctl", "stop", u])
97    for u in running_units:
98        subprocess.call(["systemctl", "restart", u])
99
100
101class NetworkdTestingUtilities:
102    """Provide a set of utility functions to facilitate networkd tests.
103
104    This class must be inherited along with unittest.TestCase to define
105    some required methods.
106    """
107
108    def add_veth_pair(self, veth, peer, veth_options=(), peer_options=()):
109        """Add a veth interface pair, and queue them to be removed."""
110        subprocess.check_call(['ip', 'link', 'add', 'name', veth] +
111                              list(veth_options) +
112                              ['type', 'veth', 'peer', 'name', peer] +
113                              list(peer_options))
114        self.addCleanup(subprocess.call, ['ip', 'link', 'del', 'dev', peer])
115
116    def write_config(self, path, contents):
117        """"Write a configuration file, and queue it to be removed."""
118
119        with open(path, 'w') as f:
120            f.write(contents)
121
122        self.addCleanup(os.remove, path)
123
124    def write_network(self, unit_name, contents):
125        """Write a network unit file, and queue it to be removed."""
126        self.write_config(os.path.join(NETWORK_UNITDIR, unit_name), contents)
127
128    def write_network_dropin(self, unit_name, dropin_name, contents):
129        """Write a network unit drop-in, and queue it to be removed."""
130        dropin_dir = os.path.join(NETWORK_UNITDIR, "{}.d".format(unit_name))
131        dropin_path = os.path.join(dropin_dir, "{}.conf".format(dropin_name))
132
133        os.makedirs(dropin_dir, exist_ok=True)
134        self.addCleanup(os.rmdir, dropin_dir)
135        with open(dropin_path, 'w') as dropin:
136            dropin.write(contents)
137        self.addCleanup(os.remove, dropin_path)
138
139    def read_attr(self, link, attribute):
140        """Read a link attributed from the sysfs."""
141        # Note we don't want to check if interface `link' is managed, we
142        # want to evaluate link variable and pass the value of the link to
143        # assert_link_states e.g. eth0=managed.
144        self.assert_link_states(**{link:'managed'})
145        with open(os.path.join('/sys/class/net', link, attribute)) as f:
146            return f.readline().strip()
147
148    def assert_link_states(self, **kwargs):
149        """Match networkctl link states to the given ones.
150
151        Each keyword argument should be the name of a network interface
152        with its expected value of the "SETUP" column in output from
153        networkctl.  The interfaces have five seconds to come online
154        before the check is performed.  Every specified interface must
155        be present in the output, and any other interfaces found in the
156        output are ignored.
157
158        A special interface state "managed" is supported, which matches
159        any value in the "SETUP" column other than "unmanaged".
160        """
161        if not kwargs:
162            return
163        interfaces = set(kwargs)
164
165        # Wait for the requested interfaces, but don't fail for them.
166        subprocess.call([NETWORKD_WAIT_ONLINE, '--timeout=5'] +
167                        ['--interface={}'.format(iface) for iface in kwargs])
168
169        # Validate each link state found in the networkctl output.
170        out = subprocess.check_output(['networkctl', '--no-legend']).rstrip()
171        for line in out.decode('utf-8').split('\n'):
172            fields = line.split()
173            if len(fields) >= 5 and fields[1] in kwargs:
174                iface = fields[1]
175                expected = kwargs[iface]
176                actual = fields[-1]
177                if (actual != expected and
178                        not (expected == 'managed' and actual != 'unmanaged')):
179                    self.fail("Link {} expects state {}, found {}".format(iface, expected, actual))
180                interfaces.remove(iface)
181
182        # Ensure that all requested interfaces have been covered.
183        if interfaces:
184            self.fail("Missing links in status output: {}".format(interfaces))
185
186
187class BridgeTest(NetworkdTestingUtilities, unittest.TestCase):
188    """Provide common methods for testing networkd against servers."""
189
190    def setUp(self):
191        self.write_network('port1.netdev', '''\
192[NetDev]
193Name=port1
194Kind=dummy
195MACAddress=12:34:56:78:9a:bc
196''')
197        self.write_network('port2.netdev', '''\
198[NetDev]
199Name=port2
200Kind=dummy
201MACAddress=12:34:56:78:9a:bd
202''')
203        self.write_network('mybridge.netdev', '''\
204[NetDev]
205Name=mybridge
206Kind=bridge
207''')
208        self.write_network('port1.network', '''\
209[Match]
210Name=port1
211[Network]
212Bridge=mybridge
213''')
214        self.write_network('port2.network', '''\
215[Match]
216Name=port2
217[Network]
218Bridge=mybridge
219''')
220        self.write_network('mybridge.network', '''\
221[Match]
222Name=mybridge
223[Network]
224DNS=192.168.250.1
225Address=192.168.250.33/24
226Gateway=192.168.250.1
227''')
228        subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
229        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
230
231    def tearDown(self):
232        subprocess.check_call(['systemctl', 'stop', 'systemd-networkd.socket'])
233        subprocess.check_call(['systemctl', 'stop', 'systemd-networkd.service'])
234        subprocess.check_call(['ip', 'link', 'del', 'mybridge'])
235        subprocess.check_call(['ip', 'link', 'del', 'port1'])
236        subprocess.check_call(['ip', 'link', 'del', 'port2'])
237
238    def test_bridge_init(self):
239        self.assert_link_states(
240            port1='managed',
241            port2='managed',
242            mybridge='managed')
243
244    def test_bridge_port_priority(self):
245        self.assertEqual(self.read_attr('port1', 'brport/priority'), '32')
246        self.write_network_dropin('port1.network', 'priority', '''\
247[Bridge]
248Priority=28
249''')
250        subprocess.check_call(['ip', 'link', 'set', 'dev', 'port1', 'down'])
251        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
252        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
253                               'port1', '--timeout=5'])
254        self.assertEqual(self.read_attr('port1', 'brport/priority'), '28')
255
256    def test_bridge_port_priority_set_zero(self):
257        """It should be possible to set the bridge port priority to 0"""
258        self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
259        self.write_network_dropin('port2.network', 'priority', '''\
260[Bridge]
261Priority=0
262''')
263        subprocess.check_call(['ip', 'link', 'set', 'dev', 'port2', 'down'])
264        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
265        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
266                               'port2', '--timeout=5'])
267        self.assertEqual(self.read_attr('port2', 'brport/priority'), '0')
268
269    def test_bridge_port_property(self):
270        """Test the "[Bridge]" section keys"""
271        self.assertEqual(self.read_attr('port2', 'brport/priority'), '32')
272        self.write_network_dropin('port2.network', 'property', '''\
273[Bridge]
274UnicastFlood=true
275HairPin=true
276Isolated=true
277UseBPDU=true
278FastLeave=true
279AllowPortToBeRoot=true
280Cost=555
281Priority=23
282''')
283        subprocess.check_call(['ip', 'link', 'set', 'dev', 'port2', 'down'])
284        subprocess.check_call(['systemctl', 'restart', 'systemd-networkd'])
285        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
286                               'port2', '--timeout=5'])
287
288        self.assertEqual(self.read_attr('port2', 'brport/priority'), '23')
289        self.assertEqual(self.read_attr('port2', 'brport/hairpin_mode'), '1')
290        self.assertEqual(self.read_attr('port2', 'brport/isolated'), '1')
291        self.assertEqual(self.read_attr('port2', 'brport/path_cost'), '555')
292        self.assertEqual(self.read_attr('port2', 'brport/multicast_fast_leave'), '1')
293        self.assertEqual(self.read_attr('port2', 'brport/unicast_flood'), '1')
294        self.assertEqual(self.read_attr('port2', 'brport/bpdu_guard'), '1')
295        self.assertEqual(self.read_attr('port2', 'brport/root_block'), '1')
296
297class ClientTestBase(NetworkdTestingUtilities):
298    """Provide common methods for testing networkd against servers."""
299
300    @classmethod
301    def setUpClass(klass):
302        klass.orig_log_level = subprocess.check_output(
303            ['systemctl', 'show', '--value', '--property', 'LogLevel'],
304            universal_newlines=True).strip()
305        subprocess.check_call(['systemd-analyze', 'log-level', 'debug'])
306
307    @classmethod
308    def tearDownClass(klass):
309        subprocess.check_call(['systemd-analyze', 'log-level', klass.orig_log_level])
310
311    def setUp(self):
312        self.iface = 'test_eth42'
313        self.if_router = 'router_eth42'
314        self.workdir_obj = tempfile.TemporaryDirectory()
315        self.workdir = self.workdir_obj.name
316        self.config = 'test_eth42.network'
317
318        # get current journal cursor
319        subprocess.check_output(['journalctl', '--sync'])
320        out = subprocess.check_output(['journalctl', '-b', '--quiet',
321                                       '--no-pager', '-n0', '--show-cursor'],
322                                      universal_newlines=True)
323        self.assertTrue(out.startswith('-- cursor:'))
324        self.journal_cursor = out.split()[-1]
325
326        subprocess.call(['systemctl', 'reset-failed', 'systemd-networkd', 'systemd-resolved'])
327
328    def tearDown(self):
329        self.shutdown_iface()
330        subprocess.call(['systemctl', 'stop', 'systemd-networkd.socket'])
331        subprocess.call(['systemctl', 'stop', 'systemd-networkd.service'])
332        subprocess.call(['ip', 'link', 'del', 'dummy0'],
333                        stderr=subprocess.DEVNULL)
334
335    def show_journal(self, unit):
336        '''Show journal of given unit since start of the test'''
337
338        print('---- {} ----'.format(unit))
339        subprocess.check_output(['journalctl', '--sync'])
340        sys.stdout.flush()
341        subprocess.call(['journalctl', '-b', '--no-pager', '--quiet',
342                         '--cursor', self.journal_cursor, '-u', unit])
343
344    def create_iface(self, ipv6=False):
345        '''Create test interface with DHCP server behind it'''
346
347        raise NotImplementedError('must be implemented by a subclass')
348
349    def shutdown_iface(self):
350        '''Remove test interface and stop DHCP server'''
351
352        raise NotImplementedError('must be implemented by a subclass')
353
354    def print_server_log(self):
355        '''Print DHCP server log for debugging failures'''
356
357        raise NotImplementedError('must be implemented by a subclass')
358
359    def start_unit(self, unit):
360        try:
361            subprocess.check_call(['systemctl', 'start', unit])
362        except subprocess.CalledProcessError:
363            self.show_journal(unit)
364            raise
365
366    def do_test(self, coldplug=True, ipv6=False, extra_opts='',
367                online_timeout=10, dhcp_mode='yes'):
368        self.start_unit('systemd-resolved')
369        self.write_network(self.config, '''\
370[Match]
371Name={iface}
372[Network]
373DHCP={dhcp_mode}
374{extra_opts}
375'''.format(iface=self.iface, dhcp_mode=dhcp_mode, extra_opts=extra_opts))
376
377        if coldplug:
378            # create interface first, then start networkd
379            self.create_iface(ipv6=ipv6)
380            self.start_unit('systemd-networkd')
381        elif coldplug is not None:
382            # start networkd first, then create interface
383            self.start_unit('systemd-networkd')
384            self.create_iface(ipv6=ipv6)
385        else:
386            # "None" means test sets up interface by itself
387            self.start_unit('systemd-networkd')
388
389        try:
390            subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface',
391                                   self.iface, '--timeout=%i' % online_timeout])
392
393            if ipv6:
394                # check iface state and IP 6 address; FIXME: we need to wait a bit
395                # longer, as the iface is "configured" already with IPv4 *or*
396                # IPv6, but we want to wait for both
397                for _ in range(10):
398                    out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.iface])
399                    if b'state UP' in out and b'inet6 2600' in out and b'inet 192.168' in out and b'tentative' not in out:
400                        break
401                    time.sleep(1)
402                else:
403                    self.fail('timed out waiting for IPv6 configuration')
404
405                self.assertRegex(out, b'inet6 2600::.* scope global .*dynamic')
406                self.assertRegex(out, b'inet6 fe80::.* scope link')
407            else:
408                # should have link-local address on IPv6 only
409                out = subprocess.check_output(['ip', '-6', 'a', 'show', 'dev', self.iface])
410                self.assertRegex(out, br'inet6 fe80::.* scope link')
411                self.assertNotIn(b'scope global', out)
412
413            # should have IPv4 address
414            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
415            self.assertIn(b'state UP', out)
416            self.assertRegex(out, br'inet 192.168.5.\d+/.* scope global dynamic')
417
418            # check networkctl state
419            out = subprocess.check_output(['networkctl'])
420            self.assertRegex(out, (r'{}\s+ether\s+[a-z-]+\s+unmanaged'.format(self.if_router)).encode())
421            self.assertRegex(out, (r'{}\s+ether\s+routable\s+configured'.format(self.iface)).encode())
422
423            out = subprocess.check_output(['networkctl', '-n', '0', 'status', self.iface])
424            self.assertRegex(out, br'Type:\s+ether')
425            self.assertRegex(out, br'State:\s+routable.*configured')
426            self.assertRegex(out, br'Online state:\s+online')
427            self.assertRegex(out, br'Address:\s+192.168.5.\d+')
428            if ipv6:
429                self.assertRegex(out, br'2600::')
430            else:
431                self.assertNotIn(br'2600::', out)
432            self.assertRegex(out, br'fe80::')
433            self.assertRegex(out, br'Gateway:\s+192.168.5.1')
434            self.assertRegex(out, br'DNS:\s+192.168.5.1')
435        except (AssertionError, subprocess.CalledProcessError):
436            # show networkd status, journal, and DHCP server log on failure
437            with open(os.path.join(NETWORK_UNITDIR, self.config)) as f:
438                print('\n---- {} ----\n{}'.format(self.config, f.read()))
439            print('---- interface status ----')
440            sys.stdout.flush()
441            subprocess.call(['ip', 'a', 'show', 'dev', self.iface])
442            print('---- networkctl status {} ----'.format(self.iface))
443            sys.stdout.flush()
444            rc = subprocess.call(['networkctl', '-n', '0', 'status', self.iface])
445            if rc != 0:
446                print("'networkctl status' exited with an unexpected code {}".format(rc))
447            self.show_journal('systemd-networkd.service')
448            self.print_server_log()
449            raise
450
451        for timeout in range(50):
452            with open(RESOLV_CONF) as f:
453                contents = f.read()
454            if 'nameserver 192.168.5.1\n' in contents:
455                break
456            time.sleep(0.1)
457        else:
458            self.fail('nameserver 192.168.5.1 not found in ' + RESOLV_CONF)
459
460        if coldplug is False:
461            # check post-down.d hook
462            self.shutdown_iface()
463
464    def test_coldplug_dhcp_yes_ip4(self):
465        # we have a 12s timeout on RA, so we need to wait longer
466        self.do_test(coldplug=True, ipv6=False, online_timeout=15)
467
468    def test_coldplug_dhcp_yes_ip4_no_ra(self):
469        # with disabling RA explicitly things should be fast
470        self.do_test(coldplug=True, ipv6=False,
471                     extra_opts='IPv6AcceptRA=False')
472
473    def test_coldplug_dhcp_ip4_only(self):
474        # we have a 12s timeout on RA, so we need to wait longer
475        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
476                     online_timeout=15)
477
478    def test_coldplug_dhcp_ip4_only_no_ra(self):
479        # with disabling RA explicitly things should be fast
480        self.do_test(coldplug=True, ipv6=False, dhcp_mode='ipv4',
481                     extra_opts='IPv6AcceptRA=False')
482
483    def test_coldplug_dhcp_ip6(self):
484        self.do_test(coldplug=True, ipv6=True)
485
486    def test_hotplug_dhcp_ip4(self):
487        # With IPv4 only we have a 12s timeout on RA, so we need to wait longer
488        self.do_test(coldplug=False, ipv6=False, online_timeout=15)
489
490    def test_hotplug_dhcp_ip6(self):
491        self.do_test(coldplug=False, ipv6=True)
492
493    def test_route_only_dns(self):
494        self.write_network('myvpn.netdev', '''\
495[NetDev]
496Name=dummy0
497Kind=dummy
498MACAddress=12:34:56:78:9a:bc
499''')
500        self.write_network('myvpn.network', '''\
501[Match]
502Name=dummy0
503[Network]
504Address=192.168.42.100/24
505DNS=192.168.42.1
506Domains= ~company
507''')
508
509        try:
510            self.do_test(coldplug=True, ipv6=False,
511                         extra_opts='IPv6AcceptRouterAdvertisements=False')
512        except subprocess.CalledProcessError as e:
513            # networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848
514            if IS_CONTAINER and e.cmd == ['systemctl', 'start', 'systemd-networkd']:
515                raise unittest.SkipTest('https://github.com/systemd/systemd/issues/11848')
516            else:
517                raise
518
519        with open(RESOLV_CONF) as f:
520            contents = f.read()
521            # ~company is not a search domain, only a routing domain
522            self.assertNotRegex(contents, 'search.*company')
523            # our global server should appear
524            self.assertIn('nameserver 192.168.5.1\n', contents)
525            # should not have domain-restricted server as global server
526            self.assertNotIn('nameserver 192.168.42.1\n', contents)
527
528    def test_route_only_dns_all_domains(self):
529        self.write_network('myvpn.netdev', '''[NetDev]
530Name=dummy0
531Kind=dummy
532MACAddress=12:34:56:78:9a:bc
533''')
534        self.write_network('myvpn.network', '''[Match]
535Name=dummy0
536[Network]
537Address=192.168.42.100/24
538DNS=192.168.42.1
539Domains= ~company ~.
540''')
541
542        try:
543            self.do_test(coldplug=True, ipv6=False,
544                         extra_opts='IPv6AcceptRouterAdvertisements=False')
545        except subprocess.CalledProcessError as e:
546            # networkd often fails to start in LXC: https://github.com/systemd/systemd/issues/11848
547            if IS_CONTAINER and e.cmd == ['systemctl', 'start', 'systemd-networkd']:
548                raise unittest.SkipTest('https://github.com/systemd/systemd/issues/11848')
549            else:
550                raise
551
552        with open(RESOLV_CONF) as f:
553            contents = f.read()
554
555        # ~company is not a search domain, only a routing domain
556        self.assertNotRegex(contents, 'search.*company')
557
558        # our global server should appear
559        self.assertIn('nameserver 192.168.5.1\n', contents)
560        # should have company server as global server due to ~.
561        self.assertIn('nameserver 192.168.42.1\n', contents)
562
563
564@unittest.skipUnless(HAVE_DNSMASQ, 'dnsmasq not installed')
565class DnsmasqClientTest(ClientTestBase, unittest.TestCase):
566    '''Test networkd client against dnsmasq'''
567
568    def setUp(self):
569        super().setUp()
570        self.dnsmasq = None
571        self.iface_mac = 'de:ad:be:ef:47:11'
572
573    def create_iface(self, ipv6=False, dnsmasq_opts=None):
574        '''Create test interface with DHCP server behind it'''
575
576        # add veth pair
577        subprocess.check_call(['ip', 'link', 'add', 'name', self.iface,
578                               'address', self.iface_mac,
579                               'type', 'veth', 'peer', 'name', self.if_router])
580
581        # give our router an IP
582        subprocess.check_call(['ip', 'a', 'flush', 'dev', self.if_router])
583        subprocess.check_call(['ip', 'a', 'add', '192.168.5.1/24', 'dev', self.if_router])
584        if ipv6:
585            subprocess.check_call(['ip', 'a', 'add', '2600::1/64', 'dev', self.if_router])
586        subprocess.check_call(['ip', 'link', 'set', self.if_router, 'up'])
587
588        # add DHCP server
589        self.dnsmasq_log = os.path.join(self.workdir, 'dnsmasq.log')
590        lease_file = os.path.join(self.workdir, 'dnsmasq.leases')
591        if ipv6:
592            extra_opts = ['--enable-ra', '--dhcp-range=2600::10,2600::20']
593        else:
594            extra_opts = []
595        if dnsmasq_opts:
596            extra_opts += dnsmasq_opts
597        self.dnsmasq = subprocess.Popen(
598            ['dnsmasq', '--keep-in-foreground', '--log-queries',
599             '--log-facility=' + self.dnsmasq_log, '--conf-file=/dev/null',
600             '--dhcp-leasefile=' + lease_file, '--bind-interfaces',
601             '--interface=' + self.if_router, '--except-interface=lo',
602             '--dhcp-range=192.168.5.10,192.168.5.200'] + extra_opts)
603
604    def shutdown_iface(self):
605        '''Remove test interface and stop DHCP server'''
606
607        if self.if_router:
608            subprocess.check_call(['ip', 'link', 'del', 'dev', self.if_router])
609            self.if_router = None
610        if self.dnsmasq:
611            self.dnsmasq.kill()
612            self.dnsmasq.wait()
613            self.dnsmasq = None
614
615    def print_server_log(self):
616        '''Print DHCP server log for debugging failures'''
617
618        with open(self.dnsmasq_log) as f:
619            sys.stdout.write('\n\n---- dnsmasq log ----\n{}\n------\n\n'.format(f.read()))
620
621    def test_resolved_domain_restricted_dns(self):
622        '''resolved: domain-restricted DNS servers'''
623
624        # enable DNSSEC in allow downgrade mode, and turn off stuff we don't want to test to make looking at logs easier
625        conf = '/run/systemd/resolved.conf.d/test-enable-dnssec.conf'
626        os.makedirs(os.path.dirname(conf), exist_ok=True)
627        with open(conf, 'w') as f:
628            f.write('[Resolve]\nDNSSEC=allow-downgrade\nLLMNR=no\nMulticastDNS=no\nDNSOverTLS=no\n')
629        self.addCleanup(os.remove, conf)
630
631        # create interface for generic connections; this will map all DNS names
632        # to 192.168.42.1
633        self.create_iface(dnsmasq_opts=['--address=/#/192.168.42.1'])
634        self.write_network('general.network', '''\
635[Match]
636Name={}
637[Network]
638DHCP=ipv4
639IPv6AcceptRA=False
640DNSSECNegativeTrustAnchors=search.example.com
641'''.format(self.iface))
642
643        # create second device/dnsmasq for a .company/.lab VPN interface
644        # static IPs for simplicity
645        self.add_veth_pair('testvpnclient', 'testvpnrouter')
646        subprocess.check_call(['ip', 'a', 'flush', 'dev', 'testvpnrouter'])
647        subprocess.check_call(['ip', 'a', 'add', '10.241.3.1/24', 'dev', 'testvpnrouter'])
648        subprocess.check_call(['ip', 'link', 'set', 'testvpnrouter', 'up'])
649
650        vpn_dnsmasq_log = os.path.join(self.workdir, 'dnsmasq-vpn.log')
651        vpn_dnsmasq = subprocess.Popen(
652            ['dnsmasq', '--keep-in-foreground', '--log-queries',
653             '--log-facility=' + vpn_dnsmasq_log, '--conf-file=/dev/null',
654             '--dhcp-leasefile=/dev/null', '--bind-interfaces',
655             '--interface=testvpnrouter', '--except-interface=lo',
656             '--address=/math.lab/10.241.3.3', '--address=/cantina.company/10.241.4.4'])
657        self.addCleanup(vpn_dnsmasq.wait)
658        self.addCleanup(vpn_dnsmasq.kill)
659
660        self.write_network('vpn.network', '''\
661[Match]
662Name=testvpnclient
663[Network]
664IPv6AcceptRA=False
665Address=10.241.3.2/24
666DNS=10.241.3.1
667Domains=~company ~lab
668DNSSECNegativeTrustAnchors=company lab
669''')
670
671        self.start_unit('systemd-networkd')
672        subprocess.check_call([NETWORKD_WAIT_ONLINE, '--interface', self.iface,
673                               '--interface=testvpnclient', '--timeout=20'])
674
675        # ensure we start fresh with every test
676        subprocess.check_call(['systemctl', 'restart', 'systemd-resolved'])
677
678        # test vpnclient specific domains; these should *not* be answered by
679        # the general DNS
680        out = subprocess.check_output(['resolvectl', 'query', 'math.lab'])
681        self.assertIn(b'math.lab: 10.241.3.3', out)
682        out = subprocess.check_output(['resolvectl', 'query', 'kettle.cantina.company'])
683        self.assertIn(b'kettle.cantina.company: 10.241.4.4', out)
684
685        # test general domains
686        out = subprocess.check_output(['resolvectl', 'query', 'search.example.com'])
687        self.assertIn(b'search.example.com: 192.168.42.1', out)
688
689        with open(self.dnsmasq_log) as f:
690            general_log = f.read()
691        with open(vpn_dnsmasq_log) as f:
692            vpn_log = f.read()
693
694        # VPN domains should only be sent to VPN DNS
695        self.assertRegex(vpn_log, 'query.*math.lab')
696        self.assertRegex(vpn_log, 'query.*cantina.company')
697        self.assertNotIn('.lab', general_log)
698        self.assertNotIn('.company', general_log)
699
700        # general domains should not be sent to the VPN DNS
701        self.assertRegex(general_log, 'query.*search.example.com')
702        self.assertNotIn('search.example.com', vpn_log)
703
704    def test_resolved_etc_hosts(self):
705        '''resolved queries to /etc/hosts'''
706
707        # enabled DNSSEC in allow-downgrade mode
708        conf = '/run/systemd/resolved.conf.d/test-enable-dnssec.conf'
709        os.makedirs(os.path.dirname(conf), exist_ok=True)
710        with open(conf, 'w') as f:
711            f.write('[Resolve]\nDNSSEC=allow-downgrade\nLLMNR=no\nMulticastDNS=no\nDNSOverTLS=no\n')
712        self.addCleanup(os.remove, conf)
713
714        # Add example.com to NTA list for this test
715        negative = '/run/dnssec-trust-anchors.d/example.com.negative'
716        os.makedirs(os.path.dirname(negative), exist_ok=True)
717        with open(negative, 'w') as f:
718            f.write('example.com\n16.172.in-addr.arpa\n')
719        self.addCleanup(os.remove, negative)
720
721        # create /etc/hosts bind mount which resolves my.example.com for IPv4
722        hosts = os.path.join(self.workdir, 'hosts')
723        with open(hosts, 'w') as f:
724            f.write('172.16.99.99  my.example.com\n')
725        subprocess.check_call(['mount', '--bind', hosts, '/etc/hosts'])
726        self.addCleanup(subprocess.call, ['umount', '/etc/hosts'])
727        subprocess.check_call(['systemctl', 'restart', 'systemd-resolved.service'])
728
729        # note: different IPv4 address here, so that it's easy to tell apart
730        # what resolved the query
731        self.create_iface(dnsmasq_opts=['--host-record=my.example.com,172.16.99.1,2600::99:99',
732                                        '--host-record=other.example.com,172.16.0.42,2600::42',
733                                        '--mx-host=example.com,mail.example.com'],
734                          ipv6=True)
735        self.do_test(coldplug=None, ipv6=True)
736
737        try:
738            # family specific queries
739            out = subprocess.check_output(['resolvectl', 'query', '-4', 'my.example.com'])
740            self.assertIn(b'my.example.com: 172.16.99.99', out)
741            # we don't expect an IPv6 answer; if /etc/hosts has any IP address,
742            # it's considered a sufficient source
743            self.assertNotEqual(subprocess.call(['resolvectl', 'query', '-6', 'my.example.com']), 0)
744            # "any family" query; IPv4 should come from /etc/hosts
745            out = subprocess.check_output(['resolvectl', 'query', 'my.example.com'])
746            self.assertIn(b'my.example.com: 172.16.99.99', out)
747            # IP → name lookup; again, takes the /etc/hosts one
748            out = subprocess.check_output(['resolvectl', 'query', '172.16.99.99'])
749            self.assertIn(b'172.16.99.99: my.example.com', out)
750
751            # non-address RRs should fall back to DNS
752            out = subprocess.check_output(['resolvectl', 'query', '--type=MX', 'example.com'])
753            self.assertIn(b'example.com IN MX 1 mail.example.com', out)
754
755            # other domains query DNS
756            out = subprocess.check_output(['resolvectl', 'query', 'other.example.com'])
757            self.assertIn(b'172.16.0.42', out)
758            out = subprocess.check_output(['resolvectl', 'query', '172.16.0.42'])
759            self.assertIn(b'172.16.0.42: other.example.com', out)
760        except (AssertionError, subprocess.CalledProcessError):
761            self.show_journal('systemd-resolved.service')
762            self.print_server_log()
763            raise
764
765    def test_transient_hostname(self):
766        '''networkd sets transient hostname from DHCP'''
767
768        orig_hostname = socket.gethostname()
769        self.addCleanup(socket.sethostname, orig_hostname)
770        # temporarily move /etc/hostname away; restart hostnamed to pick it up
771        if os.path.exists('/etc/hostname'):
772            subprocess.check_call(['mount', '--bind', '/dev/null', '/etc/hostname'])
773            self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
774        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
775        self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])
776
777        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
778        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
779
780        try:
781            # should have received the fixed IP above
782            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
783            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
784            # should have set transient hostname in hostnamed; this is
785            # sometimes a bit lagging (issue #4753), so retry a few times
786            for retry in range(1, 6):
787                out = subprocess.check_output(['hostnamectl'])
788                if b'testgreen' in out:
789                    break
790                time.sleep(5)
791                sys.stdout.write('[retry %i] ' % retry)
792                sys.stdout.flush()
793            else:
794                self.fail('Transient hostname not found in hostnamectl:\n{}'.format(out.decode()))
795            # and also applied to the system
796            self.assertEqual(socket.gethostname(), 'testgreen')
797        except AssertionError:
798            self.show_journal('systemd-networkd.service')
799            self.show_journal('systemd-hostnamed.service')
800            self.print_server_log()
801            raise
802
803    def test_transient_hostname_with_static(self):
804        '''transient hostname is not applied if static hostname exists'''
805
806        orig_hostname = socket.gethostname()
807        self.addCleanup(socket.sethostname, orig_hostname)
808
809        if not os.path.exists('/etc/hostname'):
810            self.write_config('/etc/hostname', "foobarqux")
811        else:
812            self.write_config('/run/hostname.tmp', "foobarqux")
813            subprocess.check_call(['mount', '--bind', '/run/hostname.tmp', '/etc/hostname'])
814            self.addCleanup(subprocess.call, ['umount', '/etc/hostname'])
815
816        socket.sethostname("foobarqux");
817
818        subprocess.check_call(['systemctl', 'stop', 'systemd-hostnamed.service'])
819        self.addCleanup(subprocess.call, ['systemctl', 'stop', 'systemd-hostnamed.service'])
820
821        self.create_iface(dnsmasq_opts=['--dhcp-host={},192.168.5.210,testgreen'.format(self.iface_mac)])
822        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=False', dhcp_mode='ipv4')
823
824        try:
825            # should have received the fixed IP above
826            out = subprocess.check_output(['ip', '-4', 'a', 'show', 'dev', self.iface])
827            self.assertRegex(out, b'inet 192.168.5.210/24 .* scope global dynamic')
828            # static hostname wins over transient one, thus *not* applied
829            self.assertEqual(socket.gethostname(), "foobarqux")
830        except AssertionError:
831            self.show_journal('systemd-networkd.service')
832            self.show_journal('systemd-hostnamed.service')
833            self.print_server_log()
834            raise
835
836
837class NetworkdClientTest(ClientTestBase, unittest.TestCase):
838    '''Test networkd client against networkd server'''
839
840    def setUp(self):
841        super().setUp()
842        self.dnsmasq = None
843
844    def create_iface(self, ipv6=False, dhcpserver_opts=None):
845        '''Create test interface with DHCP server behind it'''
846
847        # run "router-side" networkd in own mount namespace to shield it from
848        # "client-side" configuration and networkd
849        (fd, script) = tempfile.mkstemp(prefix='networkd-router.sh')
850        self.addCleanup(os.remove, script)
851        with os.fdopen(fd, 'w+') as f:
852            f.write('''\
853#!/bin/sh
854set -eu
855mkdir -p /run/systemd/network
856mkdir -p /run/systemd/netif
857mount -t tmpfs none /run/systemd/network
858mount -t tmpfs none /run/systemd/netif
859[ ! -e /run/dbus ] || mount -t tmpfs none /run/dbus
860# create router/client veth pair
861cat <<EOF >/run/systemd/network/test.netdev
862[NetDev]
863Name={ifr}
864Kind=veth
865
866[Peer]
867Name={ifc}
868EOF
869
870cat <<EOF >/run/systemd/network/test.network
871[Match]
872Name={ifr}
873
874[Network]
875Address=192.168.5.1/24
876{addr6}
877DHCPServer=yes
878
879[DHCPServer]
880PoolOffset=10
881PoolSize=50
882DNS=192.168.5.1
883{dhopts}
884EOF
885
886# run networkd as in systemd-networkd.service
887exec $(systemctl cat systemd-networkd.service | sed -n '/^ExecStart=/ {{ s/^.*=//; s/^[@+-]//; s/^!*//; p}}')
888'''.format(ifr=self.if_router,
889           ifc=self.iface,
890           addr6=('Address=2600::1/64' if ipv6 else ''),
891           dhopts=(dhcpserver_opts or '')))
892
893            os.fchmod(fd, 0o755)
894
895        subprocess.check_call(['systemd-run', '--unit=networkd-test-router.service',
896                               '-p', 'InaccessibleDirectories=-/etc/systemd/network',
897                               '-p', 'InaccessibleDirectories=-/run/systemd/network',
898                               '-p', 'InaccessibleDirectories=-/run/systemd/netif',
899                               '--service-type=notify', script])
900
901        # wait until devices got created
902        for _ in range(50):
903            out = subprocess.check_output(['ip', 'a', 'show', 'dev', self.if_router])
904            if b'state UP' in out and b'scope global' in out:
905                break
906            time.sleep(0.1)
907
908    def shutdown_iface(self):
909        '''Remove test interface and stop DHCP server'''
910
911        if self.if_router:
912            subprocess.check_call(['systemctl', 'stop', 'networkd-test-router.service'])
913            # ensure failed transient unit does not stay around
914            subprocess.call(['systemctl', 'reset-failed', 'networkd-test-router.service'])
915            subprocess.call(['ip', 'link', 'del', 'dev', self.if_router])
916            self.if_router = None
917
918    def print_server_log(self):
919        '''Print DHCP server log for debugging failures'''
920
921        self.show_journal('networkd-test-router.service')
922
923    @unittest.skip('networkd does not have DHCPv6 server support')
924    def test_hotplug_dhcp_ip6(self):
925        pass
926
927    @unittest.skip('networkd does not have DHCPv6 server support')
928    def test_coldplug_dhcp_ip6(self):
929        pass
930
931    def test_search_domains(self):
932
933        # we don't use this interface for this test
934        self.if_router = None
935
936        self.write_network('test.netdev', '''\
937[NetDev]
938Name=dummy0
939Kind=dummy
940MACAddress=12:34:56:78:9a:bc
941''')
942        self.write_network('test.network', '''\
943[Match]
944Name=dummy0
945[Network]
946Address=192.168.42.100/24
947DNS=192.168.42.1
948Domains= one two three four five six seven eight nine ten
949''')
950
951        self.start_unit('systemd-networkd')
952
953        for timeout in range(50):
954            with open(RESOLV_CONF) as f:
955                contents = f.read()
956            if ' one' in contents:
957                break
958            time.sleep(0.1)
959        self.assertRegex(contents, 'search .*one two three four five six seven eight nine ten')
960
961    def test_dropin(self):
962        # we don't use this interface for this test
963        self.if_router = None
964
965        self.write_network('test.netdev', '''\
966[NetDev]
967Name=dummy0
968Kind=dummy
969MACAddress=12:34:56:78:9a:bc
970''')
971        self.write_network('test.network', '''\
972[Match]
973Name=dummy0
974[Network]
975Address=192.168.42.100/24
976DNS=192.168.42.1
977''')
978        self.write_network_dropin('test.network', 'dns', '''\
979[Network]
980DNS=127.0.0.1
981''')
982
983        self.start_unit('systemd-resolved')
984        self.start_unit('systemd-networkd')
985
986        for timeout in range(50):
987            with open(RESOLV_CONF) as f:
988                contents = f.read()
989            if ' 127.0.0.1' in contents and '192.168.42.1' in contents:
990                break
991            time.sleep(0.1)
992        self.assertIn('nameserver 192.168.42.1\n', contents)
993        self.assertIn('nameserver 127.0.0.1\n', contents)
994
995    def test_dhcp_timezone(self):
996        '''networkd sets time zone from DHCP'''
997
998        def get_tz():
999            out = subprocess.check_output(['busctl', 'get-property', 'org.freedesktop.timedate1',
1000                                           '/org/freedesktop/timedate1', 'org.freedesktop.timedate1', 'Timezone'])
1001            assert out.startswith(b's "')
1002            out = out.strip()
1003            assert out.endswith(b'"')
1004            return out[3:-1].decode()
1005
1006        orig_timezone = get_tz()
1007        self.addCleanup(subprocess.call, ['timedatectl', 'set-timezone', orig_timezone])
1008
1009        self.create_iface(dhcpserver_opts='EmitTimezone=yes\nTimezone=Pacific/Honolulu')
1010        self.do_test(coldplug=None, extra_opts='IPv6AcceptRA=false\n[DHCP]\nUseTimezone=true', dhcp_mode='ipv4')
1011
1012        # should have applied the received timezone
1013        try:
1014            self.assertEqual(get_tz(), 'Pacific/Honolulu')
1015        except AssertionError:
1016            self.show_journal('systemd-networkd.service')
1017            self.show_journal('systemd-hostnamed.service')
1018            raise
1019
1020
1021class MatchClientTest(unittest.TestCase, NetworkdTestingUtilities):
1022    """Test [Match] sections in .network files.
1023
1024    Be aware that matching the test host's interfaces will wipe their
1025    configuration, so as a precaution, all network files should have a
1026    restrictive [Match] section to only ever interfere with the
1027    temporary veth interfaces created here.
1028    """
1029
1030    def tearDown(self):
1031        """Stop networkd."""
1032        subprocess.call(['systemctl', 'stop', 'systemd-networkd.socket'])
1033        subprocess.call(['systemctl', 'stop', 'systemd-networkd.service'])
1034
1035    def test_basic_matching(self):
1036        """Verify the Name= line works throughout this class."""
1037        self.add_veth_pair('test_if1', 'fake_if2')
1038        self.write_network('test.network', "[Match]\nName=test_*\n[Network]")
1039        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1040        self.assert_link_states(test_if1='managed', fake_if2='unmanaged')
1041
1042    def test_inverted_matching(self):
1043        """Verify that a '!'-prefixed value inverts the match."""
1044        # Use a MAC address as the interfaces' common matching attribute
1045        # to avoid depending on udev, to support testing in containers.
1046        mac = '00:01:02:03:98:99'
1047        self.add_veth_pair('test_veth', 'test_peer',
1048                           ['addr', mac], ['addr', mac])
1049        self.write_network('no-veth.network', """\
1050[Match]
1051MACAddress={}
1052Name=!nonexistent *peer*
1053[Network]""".format(mac))
1054        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1055        self.assert_link_states(test_veth='managed', test_peer='unmanaged')
1056
1057
1058class UnmanagedClientTest(unittest.TestCase, NetworkdTestingUtilities):
1059    """Test if networkd manages the correct interfaces."""
1060
1061    def setUp(self):
1062        """Write .network files to match the named veth devices."""
1063        # Define the veth+peer pairs to be created.
1064        # Their pairing doesn't actually matter, only their names do.
1065        self.veths = {
1066            'm1def': 'm0unm',
1067            'm1man': 'm1unm',
1068        }
1069
1070        # Define the contents of .network files to be read in order.
1071        self.configs = (
1072            "[Match]\nName=m1def\n",
1073            "[Match]\nName=m1unm\n[Link]\nUnmanaged=yes\n",
1074            "[Match]\nName=m1*\n[Link]\nUnmanaged=no\n",
1075        )
1076
1077        # Write out the .network files to be cleaned up automatically.
1078        for i, config in enumerate(self.configs):
1079            self.write_network("%02d-test.network" % i, config)
1080
1081    def tearDown(self):
1082        """Stop networkd."""
1083        subprocess.call(['systemctl', 'stop', 'systemd-networkd.socket'])
1084        subprocess.call(['systemctl', 'stop', 'systemd-networkd.service'])
1085
1086    def create_iface(self):
1087        """Create temporary veth pairs for interface matching."""
1088        for veth, peer in self.veths.items():
1089            self.add_veth_pair(veth, peer)
1090
1091    def test_unmanaged_setting(self):
1092        """Verify link states with Unmanaged= settings, hot-plug."""
1093        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1094        self.create_iface()
1095        self.assert_link_states(m1def='managed',
1096                                m1man='managed',
1097                                m1unm='unmanaged',
1098                                m0unm='unmanaged')
1099
1100    def test_unmanaged_setting_coldplug(self):
1101        """Verify link states with Unmanaged= settings, cold-plug."""
1102        self.create_iface()
1103        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1104        self.assert_link_states(m1def='managed',
1105                                m1man='managed',
1106                                m1unm='unmanaged',
1107                                m0unm='unmanaged')
1108
1109    def test_catchall_config(self):
1110        """Verify link states with a catch-all config, hot-plug."""
1111        # Don't actually catch ALL interfaces.  It messes up the host.
1112        self.write_network('all.network', "[Match]\nName=m[01]???\n")
1113        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1114        self.create_iface()
1115        self.assert_link_states(m1def='managed',
1116                                m1man='managed',
1117                                m1unm='unmanaged',
1118                                m0unm='managed')
1119
1120    def test_catchall_config_coldplug(self):
1121        """Verify link states with a catch-all config, cold-plug."""
1122        # Don't actually catch ALL interfaces.  It messes up the host.
1123        self.write_network('all.network', "[Match]\nName=m[01]???\n")
1124        self.create_iface()
1125        subprocess.check_call(['systemctl', 'start', 'systemd-networkd'])
1126        self.assert_link_states(m1def='managed',
1127                                m1man='managed',
1128                                m1unm='unmanaged',
1129                                m0unm='managed')
1130
1131
1132if __name__ == '__main__':
1133    unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout,
1134                                                     verbosity=2))
1135