79
77
dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
83
parser = argparse.ArgumentParser()
85
add_command_line_options(parser)
87
options = parser.parse_args()
89
check_option_syntax(parser, options)
91
clientnames = options.client
94
log.setLevel(logging.DEBUG)
97
bus = dbus.SystemBus()
98
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
99
dbus_busname, server_dbus_path)
100
mandos_dbus_objc = bus.get_object(dbus_busname,
102
except dbus.exceptions.DBusException:
103
log.critical("Could not connect to Mandos server")
106
mandos_serv = dbus.Interface(mandos_dbus_objc,
107
dbus_interface=server_dbus_interface)
108
mandos_serv_object_manager = dbus.Interface(
109
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
111
# Filter out log message from dbus module
112
dbus_logger = logging.getLogger("dbus.proxies")
113
class NullFilter(logging.Filter):
114
def filter(self, record):
116
dbus_filter = NullFilter()
118
dbus_logger.addFilter(dbus_filter)
119
log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
120
server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
121
mandos_clients = {path: ifs_and_props[client_dbus_interface]
122
for path, ifs_and_props in
123
mandos_serv_object_manager
124
.GetManagedObjects().items()
125
if client_dbus_interface in ifs_and_props}
126
except dbus.exceptions.DBusException as e:
127
log.critical("Failed to access Mandos server through D-Bus:"
131
# restore dbus logger
132
dbus_logger.removeFilter(dbus_filter)
134
# Compile dict of (clients: properties) to process
138
clients = {objpath: properties
139
for objpath, properties in mandos_clients.items()}
141
for name in clientnames:
142
for objpath, properties in mandos_clients.items():
143
if properties["Name"] == name:
144
clients[objpath] = properties
147
log.critical("Client not found on server: %r", name)
150
# Run all commands on clients
151
commands = commands_from_options(options)
152
for command in commands:
153
command.run(clients, bus, mandos_serv)
156
def add_command_line_options(parser):
157
parser.add_argument("--version", action="version",
158
version="%(prog)s {}".format(version),
159
help="show version number and exit")
160
parser.add_argument("-a", "--all", action="store_true",
161
help="Select all clients")
162
parser.add_argument("-v", "--verbose", action="store_true",
163
help="Print all fields")
164
parser.add_argument("-j", "--dump-json", action="store_true",
165
help="Dump client data in JSON format")
166
enable_disable = parser.add_mutually_exclusive_group()
167
enable_disable.add_argument("-e", "--enable", action="store_true",
168
help="Enable client")
169
enable_disable.add_argument("-d", "--disable",
171
help="disable client")
172
parser.add_argument("-b", "--bump-timeout", action="store_true",
173
help="Bump timeout for client")
174
start_stop_checker = parser.add_mutually_exclusive_group()
175
start_stop_checker.add_argument("--start-checker",
177
help="Start checker for client")
178
start_stop_checker.add_argument("--stop-checker",
180
help="Stop checker for client")
181
parser.add_argument("-V", "--is-enabled", action="store_true",
182
help="Check if client is enabled")
183
parser.add_argument("-r", "--remove", action="store_true",
184
help="Remove client")
185
parser.add_argument("-c", "--checker",
186
help="Set checker command for client")
187
parser.add_argument("-t", "--timeout", type=string_to_delta,
188
help="Set timeout for client")
189
parser.add_argument("--extended-timeout", type=string_to_delta,
190
help="Set extended timeout for client")
191
parser.add_argument("-i", "--interval", type=string_to_delta,
192
help="Set checker interval for client")
193
approve_deny_default = parser.add_mutually_exclusive_group()
194
approve_deny_default.add_argument(
195
"--approve-by-default", action="store_true",
196
default=None, dest="approved_by_default",
197
help="Set client to be approved by default")
198
approve_deny_default.add_argument(
199
"--deny-by-default", action="store_false",
200
dest="approved_by_default",
201
help="Set client to be denied by default")
202
parser.add_argument("--approval-delay", type=string_to_delta,
203
help="Set delay before client approve/deny")
204
parser.add_argument("--approval-duration", type=string_to_delta,
205
help="Set duration of one client approval")
206
parser.add_argument("-H", "--host", help="Set host for client")
207
parser.add_argument("-s", "--secret",
208
type=argparse.FileType(mode="rb"),
209
help="Set password blob (file) for client")
210
approve_deny = parser.add_mutually_exclusive_group()
211
approve_deny.add_argument(
212
"-A", "--approve", action="store_true",
213
help="Approve any current client request")
214
approve_deny.add_argument("-D", "--deny", action="store_true",
215
help="Deny any current client request")
216
parser.add_argument("--debug", action="store_true",
217
help="Debug mode (show D-Bus commands)")
218
parser.add_argument("--check", action="store_true",
219
help="Run self-test")
220
parser.add_argument("client", nargs="*", help="Client name")
223
def string_to_delta(interval):
224
"""Parse a string and return a datetime.timedelta"""
227
return rfc3339_duration_to_delta(interval)
228
except ValueError as e:
229
log.warning("%s - Parsing as pre-1.6.1 interval instead",
231
return parse_pre_1_6_1_interval(interval)
80
def milliseconds_to_string(ms):
    """Format a duration in milliseconds as "[<days>T]HH:MM:SS".

    The day count and its "T" separator are omitted when the duration
    is shorter than one day.  Fractions of a second are dropped, not
    rounded.
    """
    td = datetime.timedelta(milliseconds=ms)
    # td.seconds only holds the part of the duration below one day;
    # whole days are kept separately in td.days.
    hours, remainder = divmod(td.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
            .format(days="{}T".format(td.days) if td.days else "",
                    hours=hours,
                    minutes=minutes,
                    seconds=seconds))
234
89
def rfc3339_duration_to_delta(duration):
407
def check_option_syntax(parser, options):
408
"""Apply additional restrictions on options, not expressible in
411
def has_actions(options):
412
return any((options.enable,
414
options.bump_timeout,
415
options.start_checker,
416
options.stop_checker,
419
options.checker is not None,
420
options.timeout is not None,
421
options.extended_timeout is not None,
422
options.interval is not None,
423
options.approved_by_default is not None,
424
options.approval_delay is not None,
425
options.approval_duration is not None,
426
options.host is not None,
427
options.secret is not None,
431
if has_actions(options) and not (options.client or options.all):
432
parser.error("Options require clients names or --all.")
433
if options.verbose and has_actions(options):
434
parser.error("--verbose can only be used alone.")
435
if options.dump_json and (options.verbose
436
or has_actions(options)):
437
parser.error("--dump-json can only be used alone.")
438
if options.all and not has_actions(options):
439
parser.error("--all requires an action.")
440
if options.is_enabled and len(options.client) > 1:
441
parser.error("--is-enabled requires exactly one client")
443
options.remove = False
444
if has_actions(options) and not options.deny:
445
parser.error("--remove can only be combined with --deny")
446
options.remove = True
449
def commands_from_options(options):
453
if options.is_enabled:
454
commands.append(IsEnabledCmd())
457
commands.append(ApproveCmd())
460
commands.append(DenyCmd())
463
commands.append(RemoveCmd())
465
if options.dump_json:
466
commands.append(DumpJSONCmd())
469
commands.append(EnableCmd())
472
commands.append(DisableCmd())
474
if options.bump_timeout:
475
commands.append(BumpTimeoutCmd())
477
if options.start_checker:
478
commands.append(StartCheckerCmd())
480
if options.stop_checker:
481
commands.append(StopCheckerCmd())
483
if options.approved_by_default is not None:
484
if options.approved_by_default:
485
commands.append(ApproveByDefaultCmd())
487
commands.append(DenyByDefaultCmd())
489
if options.checker is not None:
490
commands.append(SetCheckerCmd(options.checker))
492
if options.host is not None:
493
commands.append(SetHostCmd(options.host))
495
if options.secret is not None:
496
commands.append(SetSecretCmd(options.secret))
498
if options.timeout is not None:
499
commands.append(SetTimeoutCmd(options.timeout))
501
if options.extended_timeout:
503
SetExtendedTimeoutCmd(options.extended_timeout))
505
if options.interval is not None:
506
commands.append(SetIntervalCmd(options.interval))
508
if options.approval_delay is not None:
509
commands.append(SetApprovalDelayCmd(options.approval_delay))
511
if options.approval_duration is not None:
513
SetApprovalDurationCmd(options.approval_duration))
515
# If no command option has been given, show table of clients,
516
# optionally verbosely
518
commands.append(PrintTableCmd(verbose=options.verbose))
273
## Classes for commands.
275
# Abstract classes first
523
276
class Command(object):
524
277
"""Abstract class for commands"""
525
def run(self, clients, bus=None, mandos=None):
278
def run(self, mandos, clients):
526
279
"""Normal commands should implement run_on_one_client(), but
527
280
commands which want to operate on all clients at the same time
528
281
can override this run() method instead."""
529
282
self.mandos = mandos
530
for clientpath, properties in clients.items():
531
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
532
dbus_busname, str(clientpath))
533
client = bus.get_object(dbus_busname, clientpath)
283
for client, properties in clients.items():
534
284
self.run_on_one_client(client, properties)
537
class IsEnabledCmd(Command):
538
def run(self, clients, bus=None, mandos=None):
539
client, properties = next(iter(clients.items()))
540
if self.is_enabled(client, properties):
543
def is_enabled(self, client, properties):
544
return properties["Enabled"]
547
class ApproveCmd(Command):
    """Command calling each client's Approve D-Bus method with True"""

    def run_on_one_client(self, client, properties):
        """Log the D-Bus call, then call Approve(True) on CLIENT.

        PROPERTIES is part of the common signature and is not used
        here.
        """
        log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
                  client.__dbus_object_path__, client_dbus_interface)
        client.Approve(dbus.Boolean(True),
                       dbus_interface=client_dbus_interface)
555
class DenyCmd(Command):
    """Command calling each client's Approve D-Bus method with False"""

    def run_on_one_client(self, client, properties):
        """Log the D-Bus call, then call Approve(False) on CLIENT.

        PROPERTIES is part of the common signature and is not used
        here.
        """
        log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
                  client.__dbus_object_path__, client_dbus_interface)
        client.Approve(dbus.Boolean(False),
                       dbus_interface=client_dbus_interface)
563
class RemoveCmd(Command):
    """Command calling RemoveClient on the server for each client"""

    def run_on_one_client(self, client, properties):
        """Log the D-Bus call, then ask the server to remove CLIENT.

        The call goes to self.mandos (the server object), passing
        the client's D-Bus object path; PROPERTIES is not used.
        """
        log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
                  server_dbus_path, server_dbus_interface,
                  str(client.__dbus_object_path__))
        self.mandos.RemoveClient(client.__dbus_object_path__)
571
286
class PrintCmd(Command):
572
287
"""Abstract class for commands printing client details"""
573
288
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
577
292
"LastApprovalRequest", "ApprovalDelay",
578
293
"ApprovalDuration", "Checker", "ExtendedTimeout",
579
294
"Expires", "LastCheckerStatus")
580
def run(self, clients, bus=None, mandos=None):
581
print(self.output(clients.values()))
582
def output(self, clients):
583
raise NotImplementedError()
586
class DumpJSONCmd(PrintCmd):
587
def output(self, clients):
588
data = {client["Name"]:
589
{key: self.dbus_boolean_to_bool(client[key])
590
for key in self.all_keywords}
591
for client in clients.values()}
592
return json.dumps(data, indent=4, separators=(',', ': '))
594
def dbus_boolean_to_bool(value):
595
if isinstance(value, dbus.Boolean):
295
def run(self, mandos, clients):
296
print(self.output(clients))
298
class PropertyCmd(Command):
    """Abstract class for Actions for setting one client property"""

    def run_on_one_client(self, client, properties):
        """Set the Client's D-Bus property

        The property name is read from self.property and the new
        value from self.value_to_set; both are expected to be
        supplied by the concrete subclass.  PROPERTIES is not used.
        """
        client.Set(client_interface, self.property, self.value_to_set,
                   dbus_interface=dbus.PROPERTIES_IFACE)
305
class ValueArgumentMixIn(object):
    """Mixin class for commands taking a value as argument"""

    def __init__(self, value):
        """Store VALUE as the value_to_set attribute."""
        self.value_to_set = value
310
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
311
"""Mixin class for commands taking a value argument as
314
def value_to_set(self):
317
def value_to_set(self, value):
318
"""When setting, convert value to a datetime.timedelta"""
319
self._vts = int(round(value.total_seconds() * 1000))
321
# Actual (non-abstract) command classes
600
323
class PrintTableCmd(PrintCmd):
601
324
def __init__(self, verbose=False):
602
325
self.verbose = verbose
604
327
def output(self, clients):
605
default_keywords = ("Name", "Enabled", "Timeout",
328
default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
607
329
keywords = default_keywords
609
331
keywords = self.all_keywords
610
return str(self.TableOfClients(clients, keywords))
332
return str(self.TableOfClients(clients.values(), keywords))
612
334
class TableOfClients(object):
684
406
for key in self.keywords})
687
def milliseconds_to_string(ms):
    """Format a duration in milliseconds as "[<days>T]HH:MM:SS".

    The day count and its "T" separator are omitted when the duration
    is shorter than one day.  Fractions of a second are dropped, not
    rounded.
    """
    td = datetime.timedelta(milliseconds=ms)
    # td.seconds only holds the part of the duration below one day;
    # whole days are kept separately in td.days.
    hours, remainder = divmod(td.seconds, 3600)
    minutes, seconds = divmod(remainder, 60)
    return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
            .format(days="{}T".format(td.days) if td.days else "",
                    hours=hours,
                    minutes=minutes,
                    seconds=seconds))
696
class PropertyCmd(Command):
697
"""Abstract class for Actions for setting one client property"""
698
def run_on_one_client(self, client, properties):
699
"""Set the Client's D-Bus property"""
700
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
701
client.__dbus_object_path__,
702
dbus.PROPERTIES_IFACE, client_dbus_interface,
703
self.propname, self.value_to_set
704
if not isinstance(self.value_to_set, dbus.Boolean)
705
else bool(self.value_to_set))
706
client.Set(client_dbus_interface, self.propname,
708
dbus_interface=dbus.PROPERTIES_IFACE)
711
raise NotImplementedError()
410
class DumpJSONCmd(PrintCmd):
411
def output(self, clients):
412
data = {client["Name"]:
413
{key: self.dbus_boolean_to_bool(client[key])
414
for key in self.all_keywords}
415
for client in clients.values()}
416
return json.dumps(data, indent=4, separators=(',', ': '))
418
def dbus_boolean_to_bool(value):
419
if isinstance(value, dbus.Boolean):
423
class IsEnabledCmd(Command):
424
def run_on_one_client(self, client, properties):
425
if self.is_enabled(client, properties):
428
def is_enabled(self, client, properties):
429
return bool(properties["Enabled"])
431
class RemoveCmd(Command):
    """Command calling RemoveClient on the server for each client"""

    def run_on_one_client(self, client, properties):
        """Ask the server object in self.mandos to remove CLIENT.

        The client is identified by its D-Bus object path;
        PROPERTIES is not used.
        """
        self.mandos.RemoveClient(client.__dbus_object_path__)
435
class ApproveCmd(Command):
    """Command calling each client's Approve D-Bus method with True"""

    def run_on_one_client(self, client, properties):
        """Call Approve(True) on CLIENT; PROPERTIES is not used."""
        client.Approve(dbus.Boolean(True),
                       dbus_interface=client_interface)
440
class DenyCmd(Command):
    """Command calling each client's Approve D-Bus method with False"""

    def run_on_one_client(self, client, properties):
        """Call Approve(False) on CLIENT; PROPERTIES is not used."""
        client.Approve(dbus.Boolean(False),
                       dbus_interface=client_interface)
714
445
class EnableCmd(PropertyCmd):
716
447
value_to_set = dbus.Boolean(True)
719
449
class DisableCmd(PropertyCmd):
721
451
value_to_set = dbus.Boolean(False)
724
453
class BumpTimeoutCmd(PropertyCmd):
725
propname = "LastCheckedOK"
454
property = "LastCheckedOK"
726
455
value_to_set = ""
729
457
class StartCheckerCmd(PropertyCmd):
730
propname = "CheckerRunning"
458
property = "CheckerRunning"
731
459
value_to_set = dbus.Boolean(True)
734
461
class StopCheckerCmd(PropertyCmd):
735
propname = "CheckerRunning"
462
property = "CheckerRunning"
736
463
value_to_set = dbus.Boolean(False)
739
465
class ApproveByDefaultCmd(PropertyCmd):
740
propname = "ApprovedByDefault"
466
property = "ApprovedByDefault"
741
467
value_to_set = dbus.Boolean(True)
744
469
class DenyByDefaultCmd(PropertyCmd):
745
propname = "ApprovedByDefault"
470
property = "ApprovedByDefault"
746
471
value_to_set = dbus.Boolean(False)
749
class PropertyValueCmd(PropertyCmd):
    """Abstract class for PropertyCmd receiving a value as argument"""

    def __init__(self, value):
        """Store VALUE as the value_to_set attribute."""
        self.value_to_set = value
755
class SetCheckerCmd(PropertyValueCmd):
759
class SetHostCmd(PropertyValueCmd):
763
class SetSecretCmd(PropertyValueCmd):
473
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
476
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
479
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
766
481
def value_to_set(self):
770
485
"""When setting, read data from supplied file object"""
771
486
self._vts = value.read()
775
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
776
"""Abstract class for PropertyValueCmd taking a value argument as
777
a datetime.timedelta() but should store it as milliseconds."""
779
def value_to_set(self):
782
def value_to_set(self, value):
783
"""When setting, convert value from a datetime.timedelta"""
784
self._vts = int(round(value.total_seconds() * 1000))
787
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
791
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
792
propname = "ExtendedTimeout"
795
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
796
propname = "Interval"
799
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
800
propname = "ApprovalDelay"
803
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
804
propname = "ApprovalDuration"
490
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
493
class SetExtendedTimeoutCmd(PropertyCmd,
494
MillisecondsValueArgumentMixIn):
495
property = "ExtendedTimeout"
497
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
498
property = "Interval"
500
class SetApprovalDelayCmd(PropertyCmd,
501
MillisecondsValueArgumentMixIn):
502
property = "ApprovalDelay"
504
class SetApprovalDurationCmd(PropertyCmd,
505
MillisecondsValueArgumentMixIn):
506
property = "ApprovalDuration"
508
def add_command_line_options(parser):
509
parser.add_argument("--version", action="version",
510
version="%(prog)s {}".format(version),
511
help="show version number and exit")
512
parser.add_argument("-a", "--all", action="store_true",
513
help="Select all clients")
514
parser.add_argument("-v", "--verbose", action="store_true",
515
help="Print all fields")
516
parser.add_argument("-j", "--dump-json", action="store_true",
517
help="Dump client data in JSON format")
518
enable_disable = parser.add_mutually_exclusive_group()
519
enable_disable.add_argument("-e", "--enable", action="store_true",
520
help="Enable client")
521
enable_disable.add_argument("-d", "--disable",
523
help="disable client")
524
parser.add_argument("-b", "--bump-timeout", action="store_true",
525
help="Bump timeout for client")
526
start_stop_checker = parser.add_mutually_exclusive_group()
527
start_stop_checker.add_argument("--start-checker",
529
help="Start checker for client")
530
start_stop_checker.add_argument("--stop-checker",
532
help="Stop checker for client")
533
parser.add_argument("-V", "--is-enabled", action="store_true",
534
help="Check if client is enabled")
535
parser.add_argument("-r", "--remove", action="store_true",
536
help="Remove client")
537
parser.add_argument("-c", "--checker",
538
help="Set checker command for client")
539
parser.add_argument("-t", "--timeout", type=string_to_delta,
540
help="Set timeout for client")
541
parser.add_argument("--extended-timeout", type=string_to_delta,
542
help="Set extended timeout for client")
543
parser.add_argument("-i", "--interval", type=string_to_delta,
544
help="Set checker interval for client")
545
approve_deny_default = parser.add_mutually_exclusive_group()
546
approve_deny_default.add_argument(
547
"--approve-by-default", action="store_true",
548
default=None, dest="approved_by_default",
549
help="Set client to be approved by default")
550
approve_deny_default.add_argument(
551
"--deny-by-default", action="store_false",
552
dest="approved_by_default",
553
help="Set client to be denied by default")
554
parser.add_argument("--approval-delay", type=string_to_delta,
555
help="Set delay before client approve/deny")
556
parser.add_argument("--approval-duration", type=string_to_delta,
557
help="Set duration of one client approval")
558
parser.add_argument("-H", "--host", help="Set host for client")
559
parser.add_argument("-s", "--secret",
560
type=argparse.FileType(mode="rb"),
561
help="Set password blob (file) for client")
562
approve_deny = parser.add_mutually_exclusive_group()
563
approve_deny.add_argument(
564
"-A", "--approve", action="store_true",
565
help="Approve any current client request")
566
approve_deny.add_argument("-D", "--deny", action="store_true",
567
help="Deny any current client request")
568
parser.add_argument("--check", action="store_true",
569
help="Run self-test")
570
parser.add_argument("client", nargs="*", help="Client name")
573
def commands_from_options(options):
577
if options.dump_json:
578
commands.append(DumpJSONCmd())
581
commands.append(EnableCmd())
584
commands.append(DisableCmd())
586
if options.bump_timeout:
587
commands.append(BumpTimeoutCmd())
589
if options.start_checker:
590
commands.append(StartCheckerCmd())
592
if options.stop_checker:
593
commands.append(StopCheckerCmd())
595
if options.is_enabled:
596
commands.append(IsEnabledCmd())
599
commands.append(RemoveCmd())
601
if options.checker is not None:
602
commands.append(SetCheckerCmd(options.checker))
604
if options.timeout is not None:
605
commands.append(SetTimeoutCmd(options.timeout))
607
if options.extended_timeout:
609
SetExtendedTimeoutCmd(options.extended_timeout))
611
if options.interval is not None:
612
commands.append(SetIntervalCmd(options.interval))
614
if options.approved_by_default is not None:
615
if options.approved_by_default:
616
commands.append(ApproveByDefaultCmd())
618
commands.append(DenyByDefaultCmd())
620
if options.approval_delay is not None:
621
commands.append(SetApprovalDelayCmd(options.approval_delay))
623
if options.approval_duration is not None:
625
SetApprovalDurationCmd(options.approval_duration))
627
if options.host is not None:
628
commands.append(SetHostCmd(options.host))
630
if options.secret is not None:
631
commands.append(SetSecretCmd(options.secret))
634
commands.append(ApproveCmd())
637
commands.append(DenyCmd())
639
# If no command option has been given, show table of clients,
640
# optionally verbosely
642
commands.append(PrintTableCmd(verbose=options.verbose))
648
parser = argparse.ArgumentParser()
650
add_command_line_options(parser)
652
options = parser.parse_args()
654
def has_actions(options):
655
return any((options.enable,
657
options.bump_timeout,
658
options.start_checker,
659
options.stop_checker,
662
options.checker is not None,
663
options.timeout is not None,
664
options.extended_timeout is not None,
665
options.interval is not None,
666
options.approved_by_default is not None,
667
options.approval_delay is not None,
668
options.approval_duration is not None,
669
options.host is not None,
670
options.secret is not None,
674
if has_actions(options) and not (options.client or options.all):
675
parser.error("Options require clients names or --all.")
676
if options.verbose and has_actions(options):
677
parser.error("--verbose can only be used alone.")
678
if options.dump_json and (options.verbose
679
or has_actions(options)):
680
parser.error("--dump-json can only be used alone.")
681
if options.all and not has_actions(options):
682
parser.error("--all requires an action.")
683
if options.is_enabled and len(options.client) > 1:
684
parser.error("--is-enabled requires exactly one client")
686
clientnames = options.client
689
bus = dbus.SystemBus()
690
mandos_dbus_objc = bus.get_object(busname, server_path)
691
except dbus.exceptions.DBusException:
692
log.critical("Could not connect to Mandos server")
695
mandos_serv = dbus.Interface(mandos_dbus_objc,
696
dbus_interface=server_interface)
697
mandos_serv_object_manager = dbus.Interface(
698
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
700
# Filter out log message from dbus module
701
dbus_logger = logging.getLogger("dbus.proxies")
702
class NullFilter(logging.Filter):
703
def filter(self, record):
705
dbus_filter = NullFilter()
707
dbus_logger.addFilter(dbus_filter)
708
mandos_clients = {path: ifs_and_props[client_interface]
709
for path, ifs_and_props in
710
mandos_serv_object_manager
711
.GetManagedObjects().items()
712
if client_interface in ifs_and_props}
713
except dbus.exceptions.DBusException as e:
714
log.critical("Failed to access Mandos server through D-Bus:"
718
# restore dbus logger
719
dbus_logger.removeFilter(dbus_filter)
721
# Compile dict of (clients: properties) to process
725
clients = {bus.get_object(busname, path): properties
726
for path, properties in mandos_clients.items()}
728
for name in clientnames:
729
for path, client in mandos_clients.items():
730
if client["Name"] == name:
731
client_objc = bus.get_object(busname, path)
732
clients[client_objc] = client
735
log.critical("Client not found on server: %r", name)
738
# Run all commands on clients
739
commands = commands_from_options(options)
740
for command in commands:
741
command.run(mandos_serv, clients)
744
class Test_milliseconds_to_string(unittest.TestCase):
746
self.assertEqual(milliseconds_to_string(93785000),
748
def test_no_days(self):
749
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
750
def test_all_zero(self):
751
self.assertEqual(milliseconds_to_string(0), "00:00:00")
752
def test_no_fractional_seconds(self):
753
self.assertEqual(milliseconds_to_string(400), "00:00:00")
754
self.assertEqual(milliseconds_to_string(900), "00:00:00")
755
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
808
757
class Test_string_to_delta(unittest.TestCase):
809
758
def test_handles_basic_rfc3339(self):
810
759
self.assertEqual(string_to_delta("PT0S"),
838
787
self.assertEqual(value, datetime.timedelta(0, 7200))
841
class Test_check_option_syntax(unittest.TestCase):
842
# This mostly corresponds to the definition from has_actions() in
843
# check_option_syntax()
845
# The actual values set here are not that important, but we do
846
# at least stick to the correct types, even though they are
850
"bump_timeout": True,
851
"start_checker": True,
852
"stop_checker": True,
856
"timeout": datetime.timedelta(),
857
"extended_timeout": datetime.timedelta(),
858
"interval": datetime.timedelta(),
859
"approved_by_default": True,
860
"approval_delay": datetime.timedelta(),
861
"approval_duration": datetime.timedelta(),
863
"secret": io.BytesIO(b"x"),
869
self.parser = argparse.ArgumentParser()
870
add_command_line_options(self.parser)
872
@contextlib.contextmanager
873
def assertParseError(self):
874
with self.assertRaises(SystemExit) as e:
875
with self.temporarily_suppress_stderr():
877
# Exit code from argparse is guaranteed to be "2". Reference:
878
# https://docs.python.org/3/library
879
# /argparse.html#exiting-methods
880
self.assertEqual(e.exception.code, 2)
883
@contextlib.contextmanager
884
def temporarily_suppress_stderr():
885
null = os.open(os.path.devnull, os.O_RDWR)
886
stderrcopy = os.dup(sys.stderr.fileno())
887
os.dup2(null, sys.stderr.fileno())
893
os.dup2(stderrcopy, sys.stderr.fileno())
896
def check_option_syntax(self, options):
897
check_option_syntax(self.parser, options)
899
def test_actions_requires_client_or_all(self):
900
for action, value in self.actions.items():
901
options = self.parser.parse_args()
902
setattr(options, action, value)
903
with self.assertParseError():
904
self.check_option_syntax(options)
906
def test_actions_conflicts_with_verbose(self):
907
for action, value in self.actions.items():
908
options = self.parser.parse_args()
909
setattr(options, action, value)
910
options.verbose = True
911
with self.assertParseError():
912
self.check_option_syntax(options)
914
def test_dump_json_conflicts_with_verbose(self):
915
options = self.parser.parse_args()
916
options.dump_json = True
917
options.verbose = True
918
with self.assertParseError():
919
self.check_option_syntax(options)
921
def test_dump_json_conflicts_with_action(self):
922
for action, value in self.actions.items():
923
options = self.parser.parse_args()
924
setattr(options, action, value)
925
options.dump_json = True
926
with self.assertParseError():
927
self.check_option_syntax(options)
929
def test_all_can_not_be_alone(self):
930
options = self.parser.parse_args()
932
with self.assertParseError():
933
self.check_option_syntax(options)
935
def test_all_is_ok_with_any_action(self):
936
for action, value in self.actions.items():
937
options = self.parser.parse_args()
938
setattr(options, action, value)
940
self.check_option_syntax(options)
942
def test_is_enabled_fails_without_client(self):
943
options = self.parser.parse_args()
944
options.is_enabled = True
945
with self.assertParseError():
946
self.check_option_syntax(options)
948
def test_is_enabled_works_with_one_client(self):
949
options = self.parser.parse_args()
950
options.is_enabled = True
951
options.client = ["foo"]
952
self.check_option_syntax(options)
954
def test_is_enabled_fails_with_two_clients(self):
955
options = self.parser.parse_args()
956
options.is_enabled = True
957
options.client = ["foo", "barbar"]
958
with self.assertParseError():
959
self.check_option_syntax(options)
961
def test_remove_can_only_be_combined_with_action_deny(self):
962
for action, value in self.actions.items():
963
if action in {"remove", "deny"}:
965
options = self.parser.parse_args()
966
setattr(options, action, value)
968
options.remove = True
969
with self.assertParseError():
970
self.check_option_syntax(options)
973
class Test_command_from_options(unittest.TestCase):
975
self.parser = argparse.ArgumentParser()
976
add_command_line_options(self.parser)
977
def assert_command_from_args(self, args, command_cls,
979
"""Assert that parsing ARGS should result in an instance of
980
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
981
options = self.parser.parse_args(args)
982
check_option_syntax(self.parser, options)
983
commands = commands_from_options(options)
984
self.assertEqual(len(commands), 1)
985
command = commands[0]
986
self.assertIsInstance(command, command_cls)
987
for key, value in cmd_attrs.items():
988
self.assertEqual(getattr(command, key), value)
989
def test_print_table(self):
990
self.assert_command_from_args([], PrintTableCmd,
993
def test_print_table_verbose(self):
994
self.assert_command_from_args(["--verbose"], PrintTableCmd,
997
def test_print_table_verbose_short(self):
998
self.assert_command_from_args(["-v"], PrintTableCmd,
1001
def test_enable(self):
1002
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1004
def test_enable_short(self):
1005
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1007
def test_disable(self):
1008
self.assert_command_from_args(["--disable", "foo"],
1011
def test_disable_short(self):
1012
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1014
def test_bump_timeout(self):
1015
self.assert_command_from_args(["--bump-timeout", "foo"],
1018
def test_bump_timeout_short(self):
1019
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1021
def test_start_checker(self):
1022
self.assert_command_from_args(["--start-checker", "foo"],
1025
def test_stop_checker(self):
1026
self.assert_command_from_args(["--stop-checker", "foo"],
1029
def test_remove(self):
1030
self.assert_command_from_args(["--remove", "foo"],
1033
def test_remove_short(self):
1034
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1036
def test_checker(self):
1037
self.assert_command_from_args(["--checker", ":", "foo"],
1038
SetCheckerCmd, value_to_set=":")
1040
def test_checker_empty(self):
1041
self.assert_command_from_args(["--checker", "", "foo"],
1042
SetCheckerCmd, value_to_set="")
1044
def test_checker_short(self):
1045
self.assert_command_from_args(["-c", ":", "foo"],
1046
SetCheckerCmd, value_to_set=":")
1048
def test_timeout(self):
1049
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1051
value_to_set=300000)
1053
def test_timeout_short(self):
1054
self.assert_command_from_args(["-t", "PT5M", "foo"],
1056
value_to_set=300000)
1058
def test_extended_timeout(self):
1059
self.assert_command_from_args(["--extended-timeout", "PT15M",
1061
SetExtendedTimeoutCmd,
1062
value_to_set=900000)
1064
def test_interval(self):
1065
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1067
value_to_set=120000)
1069
def test_interval_short(self):
1070
self.assert_command_from_args(["-i", "PT2M", "foo"],
1072
value_to_set=120000)
1074
def test_approve_by_default(self):
1075
self.assert_command_from_args(["--approve-by-default", "foo"],
1076
ApproveByDefaultCmd)
1078
def test_deny_by_default(self):
1079
self.assert_command_from_args(["--deny-by-default", "foo"],
1082
def test_approval_delay(self):
1083
self.assert_command_from_args(["--approval-delay", "PT30S",
1084
"foo"], SetApprovalDelayCmd,
1087
def test_approval_duration(self):
1088
self.assert_command_from_args(["--approval-duration", "PT1S",
1089
"foo"], SetApprovalDurationCmd,
1092
def test_host(self):
1093
self.assert_command_from_args(["--host", "foo.example.org",
1095
value_to_set="foo.example.org")
1097
def test_host_short(self):
1098
self.assert_command_from_args(["-H", "foo.example.org",
1100
value_to_set="foo.example.org")
1102
def test_secret_devnull(self):
1103
self.assert_command_from_args(["--secret", os.path.devnull,
1104
"foo"], SetSecretCmd,
1107
def test_secret_tempfile(self):
1108
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1109
value = b"secret\0xyzzy\nbar"
1112
self.assert_command_from_args(["--secret", f.name,
1113
"foo"], SetSecretCmd,
1116
def test_secret_devnull_short(self):
1117
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1118
SetSecretCmd, value_to_set=b"")
1120
def test_secret_tempfile_short(self):
1121
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1122
value = b"secret\0xyzzy\nbar"
1125
self.assert_command_from_args(["-s", f.name, "foo"],
1129
def test_approve(self):
1130
self.assert_command_from_args(["--approve", "foo"],
1133
def test_approve_short(self):
1134
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1136
def test_deny(self):
1137
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1139
def test_deny_short(self):
1140
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1142
def test_dump_json(self):
1143
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1145
def test_is_enabled(self):
1146
self.assert_command_from_args(["--is-enabled", "foo"],
1149
def test_is_enabled_short(self):
1150
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1152
def test_deny_before_remove(self):
1153
options = self.parser.parse_args(["--deny", "--remove",
1155
check_option_syntax(self.parser, options)
1156
commands = commands_from_options(options)
1157
self.assertEqual(len(commands), 2)
1158
self.assertIsInstance(commands[0], DenyCmd)
1159
self.assertIsInstance(commands[1], RemoveCmd)
1161
def test_deny_before_remove_reversed(self):
1162
options = self.parser.parse_args(["--remove", "--deny",
1164
check_option_syntax(self.parser, options)
1165
commands = commands_from_options(options)
1166
self.assertEqual(len(commands), 2)
1167
self.assertIsInstance(commands[0], DenyCmd)
1168
self.assertIsInstance(commands[1], RemoveCmd)
1171
790
class TestCmd(unittest.TestCase):
1172
791
"""Abstract class for tests of command classes"""
1173
792
def setUp(self):
1175
794
class MockClient(object):
1176
795
def __init__(self, name, **attributes):
1177
self.__dbus_object_path__ = "/clients/{}".format(name)
796
self.__dbus_object_path__ = "objpath_{}".format(name)
1178
797
self.attributes = attributes
1179
798
self.attributes["Name"] = name
1181
def Set(self, interface, propname, value, dbus_interface):
1182
testcase.assertEqual(interface, client_dbus_interface)
1183
testcase.assertEqual(dbus_interface,
1184
dbus.PROPERTIES_IFACE)
1185
self.attributes[propname] = value
1186
def Get(self, interface, propname, dbus_interface):
1187
testcase.assertEqual(interface, client_dbus_interface)
1188
testcase.assertEqual(dbus_interface,
1189
dbus.PROPERTIES_IFACE)
1190
return self.attributes[propname]
800
def Set(self, interface, property, value, dbus_interface):
801
testcase.assertEqual(interface, client_interface)
802
testcase.assertEqual(dbus_interface,
803
dbus.PROPERTIES_IFACE)
804
self.attributes[property] = value
805
def Get(self, interface, property, dbus_interface):
806
testcase.assertEqual(interface, client_interface)
807
testcase.assertEqual(dbus_interface,
808
dbus.PROPERTIES_IFACE)
809
return self.attributes[property]
1191
810
def Approve(self, approve, dbus_interface):
1192
testcase.assertEqual(dbus_interface,
1193
client_dbus_interface)
811
testcase.assertEqual(dbus_interface, client_interface)
1194
812
self.calls.append(("Approve", (approve,
1195
813
dbus_interface)))
1196
814
self.client = MockClient(
1243
861
LastCheckerStatus=-2)
1244
862
self.clients = collections.OrderedDict(
1246
("/clients/foo", self.client.attributes),
1247
("/clients/barbar", self.other_client.attributes),
864
(self.client, self.client.attributes),
865
(self.other_client, self.other_client.attributes),
1249
self.one_client = {"/clients/foo": self.client.attributes}
1254
def get_object(client_bus_name, path):
1255
self.assertEqual(client_bus_name, dbus_busname)
1257
"/clients/foo": self.client,
1258
"/clients/barbar": self.other_client,
1263
class TestIsEnabledCmd(TestCmd):
1264
def test_is_enabled(self):
1265
self.assertTrue(all(IsEnabledCmd().is_enabled(client,
1267
for client, properties
1268
in self.clients.items()))
1269
def test_is_enabled_run_exits_successfully(self):
1270
with self.assertRaises(SystemExit) as e:
1271
IsEnabledCmd().run(self.one_client)
1272
if e.exception.code is not None:
1273
self.assertEqual(e.exception.code, 0)
1275
self.assertIsNone(e.exception.code)
1276
def test_is_enabled_run_exits_with_failure(self):
1277
self.client.attributes["Enabled"] = dbus.Boolean(False)
1278
with self.assertRaises(SystemExit) as e:
1279
IsEnabledCmd().run(self.one_client)
1280
if isinstance(e.exception.code, int):
1281
self.assertNotEqual(e.exception.code, 0)
1283
self.assertIsNotNone(e.exception.code)
1286
class TestApproveCmd(TestCmd):
1287
def test_approve(self):
1288
ApproveCmd().run(self.clients, self.bus)
1289
for clientpath in self.clients:
1290
client = self.bus.get_object(dbus_busname, clientpath)
1291
self.assertIn(("Approve", (True, client_dbus_interface)),
1295
class TestDenyCmd(TestCmd):
1296
def test_deny(self):
1297
DenyCmd().run(self.clients, self.bus)
1298
for clientpath in self.clients:
1299
client = self.bus.get_object(dbus_busname, clientpath)
1300
self.assertIn(("Approve", (False, client_dbus_interface)),
1303
class TestRemoveCmd(TestCmd):
1304
def test_remove(self):
1305
class MockMandos(object):
1308
def RemoveClient(self, dbus_path):
1309
self.calls.append(("RemoveClient", (dbus_path,)))
1310
mandos = MockMandos()
1311
super(TestRemoveCmd, self).setUp()
1312
RemoveCmd().run(self.clients, self.bus, mandos)
1313
self.assertEqual(len(mandos.calls), 2)
1314
for clientpath in self.clients:
1315
self.assertIn(("RemoveClient", (clientpath,)),
867
self.one_client = {self.client: self.client.attributes}
869
class TestPrintTableCmd(TestCmd):
870
def test_normal(self):
871
output = PrintTableCmd().output(self.clients)
872
expected_output = """
873
Name Enabled Timeout Last Successful Check
874
foo Yes 00:05:00 2019-02-03T00:00:00
875
barbar Yes 00:05:00 2019-02-04T00:00:00
877
self.assertEqual(output, expected_output)
878
def test_verbose(self):
879
output = PrintTableCmd(verbose=True).output(self.clients)
880
expected_output = """
881
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
882
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
883
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
885
self.assertEqual(output, expected_output)
886
def test_one_client(self):
887
output = PrintTableCmd().output(self.one_client)
888
expected_output = """
889
Name Enabled Timeout Last Successful Check
890
foo Yes 00:05:00 2019-02-03T00:00:00
892
self.assertEqual(output, expected_output)
1319
894
class TestDumpJSONCmd(TestCmd):
1320
895
def setUp(self):
1378
953
expected_json = {"foo": self.expected_json["foo"]}
1379
954
self.assertDictEqual(json_data, expected_json)
1382
class TestPrintTableCmd(TestCmd):
1383
def test_normal(self):
1384
output = PrintTableCmd().output(self.clients.values())
1385
expected_output = "\n".join((
1386
"Name Enabled Timeout Last Successful Check",
1387
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1388
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1390
self.assertEqual(output, expected_output)
1391
def test_verbose(self):
1392
output = PrintTableCmd(verbose=True).output(
1393
self.clients.values())
1408
"Last Successful Check ",
1409
"2019-02-03T00:00:00 ",
1410
"2019-02-04T00:00:00 ",
1413
"2019-01-02T00:00:00 ",
1414
"2019-01-03T00:00:00 ",
1426
("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8"
1428
("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e"
1432
"778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ",
1433
"3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ",
1435
"Check Is Running ",
1440
"2019-01-03T00:00:00 ",
1441
"2019-01-04T00:00:00 ",
1443
"Approval Is Pending ",
1447
"Approved By Default ",
1451
"Last Approval Request ",
1453
"2019-01-03T00:00:00 ",
1459
"Approval Duration ",
1464
"fping -q -- %(host)s ",
1467
"Extended Timeout ",
1472
"2019-02-04T00:00:00 ",
1473
"2019-02-05T00:00:00 ",
1475
"Last Checker Status",
1480
num_lines = max(len(rows) for rows in columns)
1481
expected_output = "\n".join("".join(rows[line]
1482
for rows in columns)
1483
for line in range(num_lines))
1484
self.assertEqual(output, expected_output)
1485
def test_one_client(self):
1486
output = PrintTableCmd().output(self.one_client.values())
1487
expected_output = """
1488
Name Enabled Timeout Last Successful Check
1489
foo Yes 00:05:00 2019-02-03T00:00:00
1491
self.assertEqual(output, expected_output)
1494
class Test_milliseconds_to_string(unittest.TestCase):
1496
self.assertEqual(milliseconds_to_string(93785000),
1498
def test_no_days(self):
1499
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
1500
def test_all_zero(self):
1501
self.assertEqual(milliseconds_to_string(0), "00:00:00")
1502
def test_no_fractional_seconds(self):
1503
self.assertEqual(milliseconds_to_string(400), "00:00:00")
1504
self.assertEqual(milliseconds_to_string(900), "00:00:00")
1505
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
956
class TestIsEnabledCmd(TestCmd):
957
def test_is_enabled(self):
958
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
959
for client, properties in self.clients.items()))
960
def test_is_enabled_run_exits_successfully(self):
961
with self.assertRaises(SystemExit) as e:
962
IsEnabledCmd().run(None, self.one_client)
963
if e.exception.code is not None:
964
self.assertEqual(e.exception.code, 0)
966
self.assertIsNone(e.exception.code)
967
def test_is_enabled_run_exits_with_failure(self):
968
self.client.attributes["Enabled"] = dbus.Boolean(False)
969
with self.assertRaises(SystemExit) as e:
970
IsEnabledCmd().run(None, self.one_client)
971
if isinstance(e.exception.code, int):
972
self.assertNotEqual(e.exception.code, 0)
974
self.assertIsNotNone(e.exception.code)
976
class TestRemoveCmd(TestCmd):
977
def test_remove(self):
978
class MockMandos(object):
981
def RemoveClient(self, dbus_path):
982
self.calls.append(("RemoveClient", (dbus_path,)))
983
mandos = MockMandos()
984
super(TestRemoveCmd, self).setUp()
985
RemoveCmd().run(mandos, self.clients)
986
self.assertEqual(len(mandos.calls), 2)
987
for client in self.clients:
988
self.assertIn(("RemoveClient",
989
(client.__dbus_object_path__,)),
992
class TestApproveCmd(TestCmd):
993
def test_approve(self):
994
ApproveCmd().run(None, self.clients)
995
for client in self.clients:
996
self.assertIn(("Approve", (True, client_interface)),
999
class TestDenyCmd(TestCmd):
1000
def test_deny(self):
1001
DenyCmd().run(None, self.clients)
1002
for client in self.clients:
1003
self.assertIn(("Approve", (False, client_interface)),
1006
class TestEnableCmd(TestCmd):
1007
def test_enable(self):
1008
for client in self.clients:
1009
client.attributes["Enabled"] = False
1011
EnableCmd().run(None, self.clients)
1013
for client in self.clients:
1014
self.assertTrue(client.attributes["Enabled"])
1016
class TestDisableCmd(TestCmd):
1017
def test_disable(self):
1018
DisableCmd().run(None, self.clients)
1020
for client in self.clients:
1021
self.assertFalse(client.attributes["Enabled"])
1508
1023
class Unique(object):
1509
1024
"""Class for objects which exist only to be unique objects, since
1510
1025
unittest.mock.sentinel only exists in Python 3.3"""
1513
1027
class TestPropertyCmd(TestCmd):
1514
1028
"""Abstract class for tests of PropertyCmd classes"""
1515
1029
def runTest(self):
1519
1033
self.values_to_set)
1520
1034
for value_to_set, value_to_get in zip(self.values_to_set,
1521
1035
values_to_get):
1522
for clientpath in self.clients:
1523
client = self.bus.get_object(dbus_busname, clientpath)
1524
old_value = client.attributes[self.propname]
1036
for client in self.clients:
1037
old_value = client.attributes[self.property]
1525
1038
self.assertNotIsInstance(old_value, Unique)
1526
client.attributes[self.propname] = Unique()
1039
client.attributes[self.property] = Unique()
1527
1040
self.run_command(value_to_set, self.clients)
1528
for clientpath in self.clients:
1529
client = self.bus.get_object(dbus_busname, clientpath)
1530
value = client.attributes[self.propname]
1041
for client in self.clients:
1042
value = client.attributes[self.property]
1531
1043
self.assertNotIsInstance(value, Unique)
1532
1044
self.assertEqual(value, value_to_get)
1533
1045
def run_command(self, value, clients):
1534
self.command().run(clients, self.bus)
1537
class TestEnableCmd(TestCmd):
1538
def test_enable(self):
1539
for clientpath in self.clients:
1540
client = self.bus.get_object(dbus_busname, clientpath)
1541
client.attributes["Enabled"] = False
1543
EnableCmd().run(self.clients, self.bus)
1545
for clientpath in self.clients:
1546
client = self.bus.get_object(dbus_busname, clientpath)
1547
self.assertTrue(client.attributes["Enabled"])
1550
class TestDisableCmd(TestCmd):
1551
def test_disable(self):
1552
DisableCmd().run(self.clients, self.bus)
1553
for clientpath in self.clients:
1554
client = self.bus.get_object(dbus_busname, clientpath)
1555
self.assertFalse(client.attributes["Enabled"])
1046
self.command().run(None, clients)
1558
1048
class TestBumpTimeoutCmd(TestPropertyCmd):
1559
1049
command = BumpTimeoutCmd
1560
propname = "LastCheckedOK"
1050
property = "LastCheckedOK"
1561
1051
values_to_set = [""]
1564
1053
class TestStartCheckerCmd(TestPropertyCmd):
1565
1054
command = StartCheckerCmd
1566
propname = "CheckerRunning"
1055
property = "CheckerRunning"
1567
1056
values_to_set = [dbus.Boolean(True)]
1570
1058
class TestStopCheckerCmd(TestPropertyCmd):
1571
1059
command = StopCheckerCmd
1572
propname = "CheckerRunning"
1060
property = "CheckerRunning"
1573
1061
values_to_set = [dbus.Boolean(False)]
1576
1063
class TestApproveByDefaultCmd(TestPropertyCmd):
1577
1064
command = ApproveByDefaultCmd
1578
propname = "ApprovedByDefault"
1065
property = "ApprovedByDefault"
1579
1066
values_to_set = [dbus.Boolean(True)]
1582
1068
class TestDenyByDefaultCmd(TestPropertyCmd):
1583
1069
command = DenyByDefaultCmd
1584
propname = "ApprovedByDefault"
1070
property = "ApprovedByDefault"
1585
1071
values_to_set = [dbus.Boolean(False)]
1588
class TestPropertyValueCmd(TestPropertyCmd):
1589
"""Abstract class for tests of PropertyValueCmd classes"""
1073
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1074
"""Abstract class for tests of PropertyCmd classes using the
1075
ValueArgumentMixIn"""
1590
1076
def runTest(self):
1591
if type(self) is TestPropertyValueCmd:
1077
if type(self) is TestValueArgumentPropertyCmd:
1593
return super(TestPropertyValueCmd, self).runTest()
1079
return super(TestValueArgumentPropertyCmd, self).runTest()
1594
1080
def run_command(self, value, clients):
1595
self.command(value).run(clients, self.bus)
1598
class TestSetCheckerCmd(TestPropertyValueCmd):
1081
self.command(value).run(None, clients)
1083
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1599
1084
command = SetCheckerCmd
1600
propname = "Checker"
1085
property = "Checker"
1601
1086
values_to_set = ["", ":", "fping -q -- %s"]
1604
class TestSetHostCmd(TestPropertyValueCmd):
1088
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1605
1089
command = SetHostCmd
1607
1091
values_to_set = ["192.0.2.3", "foo.example.org"]
1610
class TestSetSecretCmd(TestPropertyValueCmd):
1093
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1611
1094
command = SetSecretCmd
1613
values_to_set = [io.BytesIO(b""),
1096
values_to_set = [open("/dev/null", "rb"),
1614
1097
io.BytesIO(b"secret\0xyzzy\nbar")]
1615
1098
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1618
class TestSetTimeoutCmd(TestPropertyValueCmd):
1100
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1619
1101
command = SetTimeoutCmd
1620
propname = "Timeout"
1102
property = "Timeout"
1621
1103
values_to_set = [datetime.timedelta(),
1622
1104
datetime.timedelta(minutes=5),
1623
1105
datetime.timedelta(seconds=1),
1669
1147
datetime.timedelta(weeks=52)]
1670
1148
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1150
class Test_command_from_options(unittest.TestCase):
1152
self.parser = argparse.ArgumentParser()
1153
add_command_line_options(self.parser)
1154
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1155
"""Assert that parsing ARGS should result in an instance of
1156
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1157
options = self.parser.parse_args(args)
1158
commands = commands_from_options(options)
1159
self.assertEqual(len(commands), 1)
1160
command = commands[0]
1161
self.assertIsInstance(command, command_cls)
1162
for key, value in cmd_attrs.items():
1163
self.assertEqual(getattr(command, key), value)
1164
def test_print_table(self):
1165
self.assert_command_from_args([], PrintTableCmd,
1168
def test_print_table_verbose(self):
1169
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1172
def test_print_table_verbose_short(self):
1173
self.assert_command_from_args(["-v"], PrintTableCmd,
1176
def test_enable(self):
1177
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1179
def test_enable_short(self):
1180
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1182
def test_disable(self):
1183
self.assert_command_from_args(["--disable", "foo"],
1186
def test_disable_short(self):
1187
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1189
def test_bump_timeout(self):
1190
self.assert_command_from_args(["--bump-timeout", "foo"],
1193
def test_bump_timeout_short(self):
1194
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1196
def test_start_checker(self):
1197
self.assert_command_from_args(["--start-checker", "foo"],
1200
def test_stop_checker(self):
1201
self.assert_command_from_args(["--stop-checker", "foo"],
1204
def test_remove(self):
1205
self.assert_command_from_args(["--remove", "foo"],
1208
def test_remove_short(self):
1209
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1211
def test_checker(self):
1212
self.assert_command_from_args(["--checker", ":", "foo"],
1213
SetCheckerCmd, value_to_set=":")
1215
def test_checker_empty(self):
1216
self.assert_command_from_args(["--checker", "", "foo"],
1217
SetCheckerCmd, value_to_set="")
1219
def test_checker_short(self):
1220
self.assert_command_from_args(["-c", ":", "foo"],
1221
SetCheckerCmd, value_to_set=":")
1223
def test_timeout(self):
1224
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1226
value_to_set=300000)
1228
def test_timeout_short(self):
1229
self.assert_command_from_args(["-t", "PT5M", "foo"],
1231
value_to_set=300000)
1233
def test_extended_timeout(self):
1234
self.assert_command_from_args(["--extended-timeout", "PT15M",
1236
SetExtendedTimeoutCmd,
1237
value_to_set=900000)
1239
def test_interval(self):
1240
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1242
value_to_set=120000)
1244
def test_interval_short(self):
1245
self.assert_command_from_args(["-i", "PT2M", "foo"],
1247
value_to_set=120000)
1249
def test_approve_by_default(self):
1250
self.assert_command_from_args(["--approve-by-default", "foo"],
1251
ApproveByDefaultCmd)
1253
def test_deny_by_default(self):
1254
self.assert_command_from_args(["--deny-by-default", "foo"],
1257
def test_approval_delay(self):
1258
self.assert_command_from_args(["--approval-delay", "PT30S",
1259
"foo"], SetApprovalDelayCmd,
1262
def test_approval_duration(self):
1263
self.assert_command_from_args(["--approval-duration", "PT1S",
1264
"foo"], SetApprovalDurationCmd,
1267
def test_host(self):
1268
self.assert_command_from_args(["--host", "foo.example.org",
1270
value_to_set="foo.example.org")
1272
def test_host_short(self):
1273
self.assert_command_from_args(["-H", "foo.example.org",
1275
value_to_set="foo.example.org")
1277
def test_secret_devnull(self):
1278
self.assert_command_from_args(["--secret", os.path.devnull,
1279
"foo"], SetSecretCmd,
1282
def test_secret_tempfile(self):
1283
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1284
value = b"secret\0xyzzy\nbar"
1287
self.assert_command_from_args(["--secret", f.name,
1288
"foo"], SetSecretCmd,
1291
def test_secret_devnull_short(self):
1292
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1293
SetSecretCmd, value_to_set=b"")
1295
def test_secret_tempfile_short(self):
1296
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1297
value = b"secret\0xyzzy\nbar"
1300
self.assert_command_from_args(["-s", f.name, "foo"],
1304
def test_approve(self):
1305
self.assert_command_from_args(["--approve", "foo"],
1308
def test_approve_short(self):
1309
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1311
def test_deny(self):
1312
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1314
def test_deny_short(self):
1315
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1317
def test_dump_json(self):
1318
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1320
def test_is_enabled(self):
1321
self.assert_command_from_args(["--is-enabled", "foo"],
1324
def test_is_enabled_short(self):
1325
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1674
1329
def should_only_run_tests():