/mandos/release

To get this branch, use:
bzr branch http://bzr.recompile.se/loggerhead/mandos/release

« back to all changes in this revision

Viewing changes to mandos-ctl

  • Committer: teddy at recompile
  • Date: 2020-01-12 01:42:09 UTC
  • mto: This revision was merged to the branch mainline in revision 396.
  • Revision ID: teddy@recompile.se-20200112014209-ktr3acloxzbmhbnt
mandos-ctl: Add DBussy support

Add support in mandos-ctl for the "DBussy" Python D-Bus module.  Use
it by default, if it is available.

* mandos-ctl: Try to import the "dbussy" module and its high-level
  module "ravel".
  (main): Use DBussy if import succeeded.
  (dbussy_adapter): New.
  (Test_dbussy_adapter_SystemBus): New test class.

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
1
#!/usr/bin/python3 -bbI
2
2
# -*- after-save-hook: (lambda () (let ((command (if (fboundp 'file-local-name) (file-local-name (buffer-file-name)) (or (file-remote-p (buffer-file-name) 'localname) (buffer-file-name))))) (if (= (progn (if (get-buffer "*Test*") (kill-buffer "*Test*")) (process-file-shell-command (format "%s --check" (shell-quote-argument command)) nil "*Test*")) 0) (let ((w (get-buffer-window "*Test*"))) (if w (delete-window w))) (progn (with-current-buffer "*Test*" (compilation-mode)) (display-buffer "*Test*" '(display-buffer-in-side-window)))))); coding: utf-8 -*-
3
3
#
4
 
# Mandos Control - Control or query the Mandos server
 
4
# Mandos Monitor - Control and monitor the Mandos server
5
5
#
6
 
# Copyright © 2008-2020 Teddy Hogeborn
7
 
# Copyright © 2008-2020 Björn Påhlsson
 
6
# Copyright © 2008-2019 Teddy Hogeborn
 
7
# Copyright © 2008-2019 Björn Påhlsson
8
8
#
9
9
# This file is part of Mandos.
10
10
#
89
89
 
90
90
locale.setlocale(locale.LC_ALL, "")
91
91
 
92
 
version = "1.8.14"
 
92
version = "1.8.9"
93
93
 
94
94
 
95
95
def main():
497
497
                             self.properties_iface, interface, key,
498
498
                             value)
499
499
 
500
 
        def call_method(self, methodname, busname, objectpath,
501
 
                        interface, *args):
502
 
            raise NotImplementedError()
503
 
 
504
 
 
505
500
    class MandosBus(SystemBus):
506
501
        busname_domain = "se.recompile"
507
502
        busname = busname_domain + ".Mandos"
726
721
            with self.convert_exception(dbus.Error):
727
722
                value =  method(*args)
728
723
            # DBussy returns values either as an empty list or as a
729
 
            # list of one element with the return value
 
724
            # tuple: (signature, value)
730
725
            if value:
731
726
                return self.type_filter(value[0])
732
727
 
738
733
 
739
734
        def type_filter(self, value):
740
735
            """Convert the most bothersome types to Python types"""
741
 
            # A D-Bus Variant value is represented as the Python type
742
 
            # Tuple[dbussy.DBUS.Signature, Any]
743
736
            if isinstance(value, tuple):
744
737
                if (len(value) == 2
745
738
                    and isinstance(value[0],
1634
1627
        finally:
1635
1628
            dbus_logger.removeFilter(counting_handler)
1636
1629
 
1637
 
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
 
1630
        self.assertNotIsInstance(e, dbus.ConnectFailed)
1638
1631
 
1639
1632
        # Make sure the dbus logger was suppressed
1640
1633
        self.assertEqual(0, counting_handler.count)
1777
1770
            self.call_method(bus, "methodname", "busname",
1778
1771
                             "objectpath", "interface")
1779
1772
 
1780
 
        self.assertNotIsInstance(e.exception, dbus.ConnectFailed)
 
1773
        self.assertNotIsInstance(e, dbus.ConnectFailed)
1781
1774
 
1782
1775
    def test_get_converts_to_correct_exception(self):
1783
1776
        bus = pydbus_adapter.SystemBus(
2439
2432
        busname = "se.recompile.Mandos"
2440
2433
        client_interface = "se.recompile.Mandos.Client"
2441
2434
        command.Approve().run(self.bus.clients, self.bus)
2442
 
        self.assertTrue(self.bus.clients)
2443
2435
        for clientpath in self.bus.clients:
2444
2436
            self.assertIn(("Approve", busname, clientpath,
2445
2437
                           client_interface, (True,)), self.bus.calls)
2448
2440
        busname = "se.recompile.Mandos"
2449
2441
        client_interface = "se.recompile.Mandos.Client"
2450
2442
        command.Deny().run(self.bus.clients, self.bus)
2451
 
        self.assertTrue(self.bus.clients)
2452
2443
        for clientpath in self.bus.clients:
2453
2444
            self.assertIn(("Approve", busname, clientpath,
2454
2445
                           client_interface, (False,)),
2455
2446
                          self.bus.calls)
2456
2447
 
2457
2448
    def test_Remove(self):
2458
 
        busname = "se.recompile.Mandos"
2459
 
        server_path = "/"
2460
 
        server_interface = "se.recompile.Mandos"
2461
 
        orig_clients = self.bus.clients.copy()
2462
2449
        command.Remove().run(self.bus.clients, self.bus)
2463
 
        self.assertFalse(self.bus.clients)
2464
 
        for clientpath in orig_clients:
2465
 
            self.assertIn(("RemoveClient", busname,
2466
 
                           server_path, server_interface,
 
2450
        for clientpath in self.bus.clients:
 
2451
            self.assertIn(("RemoveClient", dbus_busname,
 
2452
                           dbus_server_path, dbus_server_interface,
2467
2453
                           (clientpath,)), self.bus.calls)
2468
2454
 
2469
2455
    expected_json = {
2671
2657
        else:
2672
2658
            cmd_args = [() for x in range(len(self.values_to_get))]
2673
2659
            values_to_get = self.values_to_get
2674
 
        self.assertTrue(values_to_get)
2675
2660
        for value_to_get, cmd_arg in zip(values_to_get, cmd_args):
2676
2661
            for clientpath in self.bus.clients:
2677
2662
                self.bus.clients[clientpath][self.propname] = (
2678
2663
                    Unique())
2679
2664
            self.command(*cmd_arg).run(self.bus.clients, self.bus)
2680
 
            self.assertTrue(self.bus.clients)
2681
2665
            for clientpath in self.bus.clients:
2682
2666
                value = (self.bus.clients[clientpath]
2683
2667
                         [self.propname])