79
78
dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
83
parser = argparse.ArgumentParser()
85
add_command_line_options(parser)
87
options = parser.parse_args()
89
check_option_syntax(parser, options)
91
clientnames = options.client
94
log.setLevel(logging.DEBUG)
97
bus = dbus.SystemBus()
98
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
99
dbus_busname, server_dbus_path)
100
mandos_dbus_objc = bus.get_object(dbus_busname,
102
except dbus.exceptions.DBusException:
103
log.critical("Could not connect to Mandos server")
106
mandos_serv = dbus.Interface(mandos_dbus_objc,
107
dbus_interface=server_dbus_interface)
108
mandos_serv_object_manager = dbus.Interface(
109
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
111
# Filter out log message from dbus module
112
dbus_logger = logging.getLogger("dbus.proxies")
113
class NullFilter(logging.Filter):
114
def filter(self, record):
116
dbus_filter = NullFilter()
118
dbus_logger.addFilter(dbus_filter)
119
log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", dbus_busname,
120
server_dbus_path, dbus.OBJECT_MANAGER_IFACE)
121
mandos_clients = {path: ifs_and_props[client_dbus_interface]
122
for path, ifs_and_props in
123
mandos_serv_object_manager
124
.GetManagedObjects().items()
125
if client_dbus_interface in ifs_and_props}
126
except dbus.exceptions.DBusException as e:
127
log.critical("Failed to access Mandos server through D-Bus:"
131
# restore dbus logger
132
dbus_logger.removeFilter(dbus_filter)
134
# Compile dict of (clients: properties) to process
138
clients = {objpath: properties
139
for objpath, properties in mandos_clients.items()}
141
for name in clientnames:
142
for objpath, properties in mandos_clients.items():
143
if properties["Name"] == name:
144
clients[objpath] = properties
147
log.critical("Client not found on server: %r", name)
150
# Run all commands on clients
151
commands = commands_from_options(options)
152
for command in commands:
153
command.run(clients, bus, mandos_serv)
156
def add_command_line_options(parser):
157
parser.add_argument("--version", action="version",
158
version="%(prog)s {}".format(version),
159
help="show version number and exit")
160
parser.add_argument("-a", "--all", action="store_true",
161
help="Select all clients")
162
parser.add_argument("-v", "--verbose", action="store_true",
163
help="Print all fields")
164
parser.add_argument("-j", "--dump-json", action="store_true",
165
help="Dump client data in JSON format")
166
enable_disable = parser.add_mutually_exclusive_group()
167
enable_disable.add_argument("-e", "--enable", action="store_true",
168
help="Enable client")
169
enable_disable.add_argument("-d", "--disable",
171
help="disable client")
172
parser.add_argument("-b", "--bump-timeout", action="store_true",
173
help="Bump timeout for client")
174
start_stop_checker = parser.add_mutually_exclusive_group()
175
start_stop_checker.add_argument("--start-checker",
177
help="Start checker for client")
178
start_stop_checker.add_argument("--stop-checker",
180
help="Stop checker for client")
181
parser.add_argument("-V", "--is-enabled", action="store_true",
182
help="Check if client is enabled")
183
parser.add_argument("-r", "--remove", action="store_true",
184
help="Remove client")
185
parser.add_argument("-c", "--checker",
186
help="Set checker command for client")
187
parser.add_argument("-t", "--timeout", type=string_to_delta,
188
help="Set timeout for client")
189
parser.add_argument("--extended-timeout", type=string_to_delta,
190
help="Set extended timeout for client")
191
parser.add_argument("-i", "--interval", type=string_to_delta,
192
help="Set checker interval for client")
193
approve_deny_default = parser.add_mutually_exclusive_group()
194
approve_deny_default.add_argument(
195
"--approve-by-default", action="store_true",
196
default=None, dest="approved_by_default",
197
help="Set client to be approved by default")
198
approve_deny_default.add_argument(
199
"--deny-by-default", action="store_false",
200
dest="approved_by_default",
201
help="Set client to be denied by default")
202
parser.add_argument("--approval-delay", type=string_to_delta,
203
help="Set delay before client approve/deny")
204
parser.add_argument("--approval-duration", type=string_to_delta,
205
help="Set duration of one client approval")
206
parser.add_argument("-H", "--host", help="Set host for client")
207
parser.add_argument("-s", "--secret",
208
type=argparse.FileType(mode="rb"),
209
help="Set password blob (file) for client")
210
approve_deny = parser.add_mutually_exclusive_group()
211
approve_deny.add_argument(
212
"-A", "--approve", action="store_true",
213
help="Approve any current client request")
214
approve_deny.add_argument("-D", "--deny", action="store_true",
215
help="Deny any current client request")
216
parser.add_argument("--debug", action="store_true",
217
help="Debug mode (show D-Bus commands)")
218
parser.add_argument("--check", action="store_true",
219
help="Run self-test")
220
parser.add_argument("client", nargs="*", help="Client name")
223
def string_to_delta(interval):
224
"""Parse a string and return a datetime.timedelta"""
227
return rfc3339_duration_to_delta(interval)
228
except ValueError as e:
229
log.warning("%s - Parsing as pre-1.6.1 interval instead",
231
return parse_pre_1_6_1_interval(interval)
81
def milliseconds_to_string(ms):
82
td = datetime.timedelta(0, 0, 0, ms)
83
return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
84
.format(days="{}T".format(td.days) if td.days else "",
85
hours=td.seconds // 3600,
86
minutes=(td.seconds % 3600) // 60,
87
seconds=td.seconds % 60))
234
90
def rfc3339_duration_to_delta(duration):
407
def check_option_syntax(parser, options):
408
"""Apply additional restrictions on options, not expressible in
411
def has_actions(options):
412
return any((options.enable,
414
options.bump_timeout,
415
options.start_checker,
416
options.stop_checker,
419
options.checker is not None,
420
options.timeout is not None,
421
options.extended_timeout is not None,
422
options.interval is not None,
423
options.approved_by_default is not None,
424
options.approval_delay is not None,
425
options.approval_duration is not None,
426
options.host is not None,
427
options.secret is not None,
431
if has_actions(options) and not (options.client or options.all):
432
parser.error("Options require clients names or --all.")
433
if options.verbose and has_actions(options):
434
parser.error("--verbose can only be used alone.")
435
if options.dump_json and (options.verbose
436
or has_actions(options)):
437
parser.error("--dump-json can only be used alone.")
438
if options.all and not has_actions(options):
439
parser.error("--all requires an action.")
440
if options.is_enabled and len(options.client) > 1:
441
parser.error("--is-enabled requires exactly one client")
443
options.remove = False
444
if has_actions(options) and not options.deny:
445
parser.error("--remove can only be combined with --deny")
446
options.remove = True
449
def commands_from_options(options):
453
if options.is_enabled:
454
commands.append(IsEnabledCmd())
457
commands.append(ApproveCmd())
460
commands.append(DenyCmd())
463
commands.append(RemoveCmd())
465
if options.dump_json:
466
commands.append(DumpJSONCmd())
469
commands.append(EnableCmd())
472
commands.append(DisableCmd())
474
if options.bump_timeout:
475
commands.append(BumpTimeoutCmd())
477
if options.start_checker:
478
commands.append(StartCheckerCmd())
480
if options.stop_checker:
481
commands.append(StopCheckerCmd())
483
if options.approved_by_default is not None:
484
if options.approved_by_default:
485
commands.append(ApproveByDefaultCmd())
487
commands.append(DenyByDefaultCmd())
489
if options.checker is not None:
490
commands.append(SetCheckerCmd(options.checker))
492
if options.host is not None:
493
commands.append(SetHostCmd(options.host))
495
if options.secret is not None:
496
commands.append(SetSecretCmd(options.secret))
498
if options.timeout is not None:
499
commands.append(SetTimeoutCmd(options.timeout))
501
if options.extended_timeout:
503
SetExtendedTimeoutCmd(options.extended_timeout))
505
if options.interval is not None:
506
commands.append(SetIntervalCmd(options.interval))
508
if options.approval_delay is not None:
509
commands.append(SetApprovalDelayCmd(options.approval_delay))
511
if options.approval_duration is not None:
513
SetApprovalDurationCmd(options.approval_duration))
515
# If no command option has been given, show table of clients,
516
# optionally verbosely
518
commands.append(PrintTableCmd(verbose=options.verbose))
274
## Classes for commands.
276
# Abstract classes first
523
277
class Command(object):
524
278
"""Abstract class for commands"""
525
def run(self, clients, bus=None, mandos=None):
279
def run(self, mandos, clients):
526
280
"""Normal commands should implement run_on_one_client(), but
527
281
commands which want to operate on all clients at the same time
528
282
can override this run() method instead."""
529
283
self.mandos = mandos
530
for clientpath, properties in clients.items():
531
log.debug("D-Bus: Connect to: (busname=%r, path=%r)",
532
dbus_busname, str(clientpath))
533
client = bus.get_object(dbus_busname, clientpath)
284
for client, properties in clients.items():
534
285
self.run_on_one_client(client, properties)
537
class IsEnabledCmd(Command):
538
def run(self, clients, bus=None, mandos=None):
539
client, properties = next(iter(clients.items()))
540
if self.is_enabled(client, properties):
543
def is_enabled(self, client, properties):
544
return properties["Enabled"]
547
class ApproveCmd(Command):
548
def run_on_one_client(self, client, properties):
549
log.debug("D-Bus: %s:%s:%s.Approve(True)", dbus_busname,
550
client.__dbus_object_path__, client_dbus_interface)
551
client.Approve(dbus.Boolean(True),
552
dbus_interface=client_dbus_interface)
555
class DenyCmd(Command):
556
def run_on_one_client(self, client, properties):
557
log.debug("D-Bus: %s:%s:%s.Approve(False)", dbus_busname,
558
client.__dbus_object_path__, client_dbus_interface)
559
client.Approve(dbus.Boolean(False),
560
dbus_interface=client_dbus_interface)
563
class RemoveCmd(Command):
564
def run_on_one_client(self, client, properties):
565
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", dbus_busname,
566
server_dbus_path, server_dbus_interface,
567
str(client.__dbus_object_path__))
568
self.mandos.RemoveClient(client.__dbus_object_path__)
571
class OutputCmd(Command):
572
"""Abstract class for commands outputting client details"""
287
class PrintCmd(Command):
288
"""Abstract class for commands printing client details"""
573
289
all_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK",
574
290
"Created", "Interval", "Host", "KeyID",
575
291
"Fingerprint", "CheckerRunning", "LastEnabled",
577
293
"LastApprovalRequest", "ApprovalDelay",
578
294
"ApprovalDuration", "Checker", "ExtendedTimeout",
579
295
"Expires", "LastCheckerStatus")
580
def run(self, clients, bus=None, mandos=None):
581
print(self.output(clients.values()))
582
def output(self, clients):
583
raise NotImplementedError()
586
class DumpJSONCmd(OutputCmd):
587
def output(self, clients):
588
data = {client["Name"]:
589
{key: self.dbus_boolean_to_bool(client[key])
590
for key in self.all_keywords}
591
for client in clients.values()}
592
return json.dumps(data, indent=4, separators=(',', ': '))
594
def dbus_boolean_to_bool(value):
595
if isinstance(value, dbus.Boolean):
600
class PrintTableCmd(OutputCmd):
296
def run(self, mandos, clients):
297
print(self.output(clients))
299
class PropertyCmd(Command):
300
"""Abstract class for Actions for setting one client property"""
301
def run_on_one_client(self, client, properties):
302
"""Set the Client's D-Bus property"""
303
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", busname,
304
client.__dbus_object_path__,
305
dbus.PROPERTIES_IFACE, client_interface,
306
self.property, self.value_to_set
307
if not isinstance(self.value_to_set, dbus.Boolean)
308
else bool(self.value_to_set))
309
client.Set(client_interface, self.property, self.value_to_set,
310
dbus_interface=dbus.PROPERTIES_IFACE)
312
class ValueArgumentMixIn(object):
313
"""Mixin class for commands taking a value as argument"""
314
def __init__(self, value):
315
self.value_to_set = value
317
class MillisecondsValueArgumentMixIn(ValueArgumentMixIn):
318
"""Mixin class for commands taking a value argument as
321
def value_to_set(self):
324
def value_to_set(self, value):
325
"""When setting, convert value to a datetime.timedelta"""
326
self._vts = int(round(value.total_seconds() * 1000))
328
# Actual (non-abstract) command classes
330
class PrintTableCmd(PrintCmd):
601
331
def __init__(self, verbose=False):
602
332
self.verbose = verbose
604
334
def output(self, clients):
605
default_keywords = ("Name", "Enabled", "Timeout",
335
default_keywords = ("Name", "Enabled", "Timeout", "LastCheckedOK")
607
336
keywords = default_keywords
609
338
keywords = self.all_keywords
610
return str(self.TableOfClients(clients, keywords))
339
return str(self.TableOfClients(clients.values(), keywords))
612
341
class TableOfClients(object):
683
412
**{key: self.string_from_client(client, key)
684
413
for key in self.keywords})
687
def milliseconds_to_string(ms):
688
td = datetime.timedelta(0, 0, 0, ms)
689
return ("{days}{hours:02}:{minutes:02}:{seconds:02}"
690
.format(days="{}T".format(td.days)
692
hours=td.seconds // 3600,
693
minutes=(td.seconds % 3600) // 60,
694
seconds=td.seconds % 60))
697
class PropertyCmd(Command):
698
"""Abstract class for Actions for setting one client property"""
699
def run_on_one_client(self, client, properties):
700
"""Set the Client's D-Bus property"""
701
log.debug("D-Bus: %s:%s:%s.Set(%r, %r, %r)", dbus_busname,
702
client.__dbus_object_path__,
703
dbus.PROPERTIES_IFACE, client_dbus_interface,
704
self.propname, self.value_to_set
705
if not isinstance(self.value_to_set, dbus.Boolean)
706
else bool(self.value_to_set))
707
client.Set(client_dbus_interface, self.propname,
709
dbus_interface=dbus.PROPERTIES_IFACE)
712
raise NotImplementedError()
417
class DumpJSONCmd(PrintCmd):
418
def output(self, clients):
419
data = {client["Name"]:
420
{key: self.dbus_boolean_to_bool(client[key])
421
for key in self.all_keywords}
422
for client in clients.values()}
423
return json.dumps(data, indent=4, separators=(',', ': '))
425
def dbus_boolean_to_bool(value):
426
if isinstance(value, dbus.Boolean):
430
class IsEnabledCmd(Command):
431
def run_on_one_client(self, client, properties):
432
if self.is_enabled(client, properties):
435
def is_enabled(self, client, properties):
436
return bool(properties["Enabled"])
438
class RemoveCmd(Command):
439
def run_on_one_client(self, client, properties):
440
log.debug("D-Bus: %s:%s:%s.RemoveClient(%r)", busname,
441
server_path, server_interface,
442
str(client.__dbus_object_path__))
443
self.mandos.RemoveClient(client.__dbus_object_path__)
445
class ApproveCmd(Command):
446
def run_on_one_client(self, client, properties):
447
log.debug("D-Bus: %s:%s.Approve(True)",
448
client.__dbus_object_path__, client_interface)
449
client.Approve(dbus.Boolean(True),
450
dbus_interface=client_interface)
452
class DenyCmd(Command):
453
def run_on_one_client(self, client, properties):
454
log.debug("D-Bus: %s:%s.Approve(False)",
455
client.__dbus_object_path__, client_interface)
456
client.Approve(dbus.Boolean(False),
457
dbus_interface=client_interface)
715
459
class EnableCmd(PropertyCmd):
717
461
value_to_set = dbus.Boolean(True)
720
463
class DisableCmd(PropertyCmd):
722
465
value_to_set = dbus.Boolean(False)
725
467
class BumpTimeoutCmd(PropertyCmd):
726
propname = "LastCheckedOK"
468
property = "LastCheckedOK"
727
469
value_to_set = ""
730
471
class StartCheckerCmd(PropertyCmd):
731
propname = "CheckerRunning"
472
property = "CheckerRunning"
732
473
value_to_set = dbus.Boolean(True)
735
475
class StopCheckerCmd(PropertyCmd):
736
propname = "CheckerRunning"
476
property = "CheckerRunning"
737
477
value_to_set = dbus.Boolean(False)
740
479
class ApproveByDefaultCmd(PropertyCmd):
741
propname = "ApprovedByDefault"
480
property = "ApprovedByDefault"
742
481
value_to_set = dbus.Boolean(True)
745
483
class DenyByDefaultCmd(PropertyCmd):
746
propname = "ApprovedByDefault"
484
property = "ApprovedByDefault"
747
485
value_to_set = dbus.Boolean(False)
750
class PropertyValueCmd(PropertyCmd):
751
"""Abstract class for PropertyCmd recieving a value as argument"""
752
def __init__(self, value):
753
self.value_to_set = value
756
class SetCheckerCmd(PropertyValueCmd):
760
class SetHostCmd(PropertyValueCmd):
764
class SetSecretCmd(PropertyValueCmd):
487
class SetCheckerCmd(PropertyCmd, ValueArgumentMixIn):
490
class SetHostCmd(PropertyCmd, ValueArgumentMixIn):
493
class SetSecretCmd(PropertyCmd, ValueArgumentMixIn):
767
495
def value_to_set(self):
771
499
"""When setting, read data from supplied file object"""
772
500
self._vts = value.read()
776
class MillisecondsPropertyValueArgumentCmd(PropertyValueCmd):
777
"""Abstract class for PropertyValueCmd taking a value argument as
778
a datetime.timedelta() but should store it as milliseconds."""
780
def value_to_set(self):
783
def value_to_set(self, value):
784
"""When setting, convert value from a datetime.timedelta"""
785
self._vts = int(round(value.total_seconds() * 1000))
788
class SetTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
792
class SetExtendedTimeoutCmd(MillisecondsPropertyValueArgumentCmd):
793
propname = "ExtendedTimeout"
796
class SetIntervalCmd(MillisecondsPropertyValueArgumentCmd):
797
propname = "Interval"
800
class SetApprovalDelayCmd(MillisecondsPropertyValueArgumentCmd):
801
propname = "ApprovalDelay"
804
class SetApprovalDurationCmd(MillisecondsPropertyValueArgumentCmd):
805
propname = "ApprovalDuration"
504
class SetTimeoutCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
507
class SetExtendedTimeoutCmd(PropertyCmd,
508
MillisecondsValueArgumentMixIn):
509
property = "ExtendedTimeout"
511
class SetIntervalCmd(PropertyCmd, MillisecondsValueArgumentMixIn):
512
property = "Interval"
514
class SetApprovalDelayCmd(PropertyCmd,
515
MillisecondsValueArgumentMixIn):
516
property = "ApprovalDelay"
518
class SetApprovalDurationCmd(PropertyCmd,
519
MillisecondsValueArgumentMixIn):
520
property = "ApprovalDuration"
522
def add_command_line_options(parser):
523
parser.add_argument("--version", action="version",
524
version="%(prog)s {}".format(version),
525
help="show version number and exit")
526
parser.add_argument("-a", "--all", action="store_true",
527
help="Select all clients")
528
parser.add_argument("-v", "--verbose", action="store_true",
529
help="Print all fields")
530
parser.add_argument("-j", "--dump-json", action="store_true",
531
help="Dump client data in JSON format")
532
enable_disable = parser.add_mutually_exclusive_group()
533
enable_disable.add_argument("-e", "--enable", action="store_true",
534
help="Enable client")
535
enable_disable.add_argument("-d", "--disable",
537
help="disable client")
538
parser.add_argument("-b", "--bump-timeout", action="store_true",
539
help="Bump timeout for client")
540
start_stop_checker = parser.add_mutually_exclusive_group()
541
start_stop_checker.add_argument("--start-checker",
543
help="Start checker for client")
544
start_stop_checker.add_argument("--stop-checker",
546
help="Stop checker for client")
547
parser.add_argument("-V", "--is-enabled", action="store_true",
548
help="Check if client is enabled")
549
parser.add_argument("-r", "--remove", action="store_true",
550
help="Remove client")
551
parser.add_argument("-c", "--checker",
552
help="Set checker command for client")
553
parser.add_argument("-t", "--timeout", type=string_to_delta,
554
help="Set timeout for client")
555
parser.add_argument("--extended-timeout", type=string_to_delta,
556
help="Set extended timeout for client")
557
parser.add_argument("-i", "--interval", type=string_to_delta,
558
help="Set checker interval for client")
559
approve_deny_default = parser.add_mutually_exclusive_group()
560
approve_deny_default.add_argument(
561
"--approve-by-default", action="store_true",
562
default=None, dest="approved_by_default",
563
help="Set client to be approved by default")
564
approve_deny_default.add_argument(
565
"--deny-by-default", action="store_false",
566
dest="approved_by_default",
567
help="Set client to be denied by default")
568
parser.add_argument("--approval-delay", type=string_to_delta,
569
help="Set delay before client approve/deny")
570
parser.add_argument("--approval-duration", type=string_to_delta,
571
help="Set duration of one client approval")
572
parser.add_argument("-H", "--host", help="Set host for client")
573
parser.add_argument("-s", "--secret",
574
type=argparse.FileType(mode="rb"),
575
help="Set password blob (file) for client")
576
approve_deny = parser.add_mutually_exclusive_group()
577
approve_deny.add_argument(
578
"-A", "--approve", action="store_true",
579
help="Approve any current client request")
580
approve_deny.add_argument("-D", "--deny", action="store_true",
581
help="Deny any current client request")
582
parser.add_argument("--debug", action="store_true",
583
help="Debug mode (show D-Bus commands)")
584
parser.add_argument("--check", action="store_true",
585
help="Run self-test")
586
parser.add_argument("client", nargs="*", help="Client name")
589
def commands_from_options(options):
593
if options.dump_json:
594
commands.append(DumpJSONCmd())
597
commands.append(EnableCmd())
600
commands.append(DisableCmd())
602
if options.bump_timeout:
603
commands.append(BumpTimeoutCmd())
605
if options.start_checker:
606
commands.append(StartCheckerCmd())
608
if options.stop_checker:
609
commands.append(StopCheckerCmd())
611
if options.is_enabled:
612
commands.append(IsEnabledCmd())
614
if options.checker is not None:
615
commands.append(SetCheckerCmd(options.checker))
617
if options.timeout is not None:
618
commands.append(SetTimeoutCmd(options.timeout))
620
if options.extended_timeout:
622
SetExtendedTimeoutCmd(options.extended_timeout))
624
if options.interval is not None:
625
commands.append(SetIntervalCmd(options.interval))
627
if options.approved_by_default is not None:
628
if options.approved_by_default:
629
commands.append(ApproveByDefaultCmd())
631
commands.append(DenyByDefaultCmd())
633
if options.approval_delay is not None:
634
commands.append(SetApprovalDelayCmd(options.approval_delay))
636
if options.approval_duration is not None:
638
SetApprovalDurationCmd(options.approval_duration))
640
if options.host is not None:
641
commands.append(SetHostCmd(options.host))
643
if options.secret is not None:
644
commands.append(SetSecretCmd(options.secret))
647
commands.append(ApproveCmd())
650
commands.append(DenyCmd())
653
commands.append(RemoveCmd())
655
# If no command option has been given, show table of clients,
656
# optionally verbosely
658
commands.append(PrintTableCmd(verbose=options.verbose))
663
def check_option_syntax(parser, options):
664
"""Apply additional restrictions on options, not expressible in
667
def has_actions(options):
668
return any((options.enable,
670
options.bump_timeout,
671
options.start_checker,
672
options.stop_checker,
675
options.checker is not None,
676
options.timeout is not None,
677
options.extended_timeout is not None,
678
options.interval is not None,
679
options.approved_by_default is not None,
680
options.approval_delay is not None,
681
options.approval_duration is not None,
682
options.host is not None,
683
options.secret is not None,
687
if has_actions(options) and not (options.client or options.all):
688
parser.error("Options require clients names or --all.")
689
if options.verbose and has_actions(options):
690
parser.error("--verbose can only be used alone.")
691
if options.dump_json and (options.verbose
692
or has_actions(options)):
693
parser.error("--dump-json can only be used alone.")
694
if options.all and not has_actions(options):
695
parser.error("--all requires an action.")
696
if options.is_enabled and len(options.client) > 1:
697
parser.error("--is-enabled requires exactly one client")
701
parser = argparse.ArgumentParser()
703
add_command_line_options(parser)
705
options = parser.parse_args()
707
check_option_syntax(parser, options)
709
clientnames = options.client
712
log.setLevel(logging.DEBUG)
715
bus = dbus.SystemBus()
716
log.debug("D-Bus: Connect to: (name=%r, path=%r)", busname,
718
mandos_dbus_objc = bus.get_object(busname, server_path)
719
except dbus.exceptions.DBusException:
720
log.critical("Could not connect to Mandos server")
723
mandos_serv = dbus.Interface(mandos_dbus_objc,
724
dbus_interface=server_interface)
725
mandos_serv_object_manager = dbus.Interface(
726
mandos_dbus_objc, dbus_interface=dbus.OBJECT_MANAGER_IFACE)
728
# Filter out log message from dbus module
729
dbus_logger = logging.getLogger("dbus.proxies")
730
class NullFilter(logging.Filter):
731
def filter(self, record):
733
dbus_filter = NullFilter()
735
dbus_logger.addFilter(dbus_filter)
736
log.debug("D-Bus: %s:%s:%s.GetManagedObjects()", busname,
737
server_path, dbus.OBJECT_MANAGER_IFACE)
738
mandos_clients = {path: ifs_and_props[client_interface]
739
for path, ifs_and_props in
740
mandos_serv_object_manager
741
.GetManagedObjects().items()
742
if client_interface in ifs_and_props}
743
except dbus.exceptions.DBusException as e:
744
log.critical("Failed to access Mandos server through D-Bus:"
748
# restore dbus logger
749
dbus_logger.removeFilter(dbus_filter)
751
# Compile dict of (clients: properties) to process
755
clients = {bus.get_object(busname, path): properties
756
for path, properties in mandos_clients.items()}
758
for name in clientnames:
759
for path, client in mandos_clients.items():
760
if client["Name"] == name:
761
client_objc = bus.get_object(busname, path)
762
clients[client_objc] = client
765
log.critical("Client not found on server: %r", name)
768
# Run all commands on clients
769
commands = commands_from_options(options)
770
for command in commands:
771
command.run(mandos_serv, clients)
774
class Test_milliseconds_to_string(unittest.TestCase):
776
self.assertEqual(milliseconds_to_string(93785000),
778
def test_no_days(self):
779
self.assertEqual(milliseconds_to_string(7385000), "02:03:05")
780
def test_all_zero(self):
781
self.assertEqual(milliseconds_to_string(0), "00:00:00")
782
def test_no_fractional_seconds(self):
783
self.assertEqual(milliseconds_to_string(400), "00:00:00")
784
self.assertEqual(milliseconds_to_string(900), "00:00:00")
785
self.assertEqual(milliseconds_to_string(1900), "00:00:01")
809
787
class Test_string_to_delta(unittest.TestCase):
810
788
def test_handles_basic_rfc3339(self):
811
789
self.assertEqual(string_to_delta("PT0S"),
839
817
self.assertEqual(value, datetime.timedelta(0, 7200))
842
class Test_check_option_syntax(unittest.TestCase):
844
self.parser = argparse.ArgumentParser()
845
add_command_line_options(self.parser)
847
def test_actions_requires_client_or_all(self):
848
for action, value in self.actions.items():
849
options = self.parser.parse_args()
850
setattr(options, action, value)
851
with self.assertParseError():
852
self.check_option_syntax(options)
854
# This mostly corresponds to the definition from has_actions() in
855
# check_option_syntax()
857
# The actual values set here are not that important, but we do
858
# at least stick to the correct types, even though they are
862
"bump_timeout": True,
863
"start_checker": True,
864
"stop_checker": True,
868
"timeout": datetime.timedelta(),
869
"extended_timeout": datetime.timedelta(),
870
"interval": datetime.timedelta(),
871
"approved_by_default": True,
872
"approval_delay": datetime.timedelta(),
873
"approval_duration": datetime.timedelta(),
875
"secret": io.BytesIO(b"x"),
880
@contextlib.contextmanager
881
def assertParseError(self):
882
with self.assertRaises(SystemExit) as e:
883
with self.temporarily_suppress_stderr():
885
# Exit code from argparse is guaranteed to be "2". Reference:
886
# https://docs.python.org/3/library
887
# /argparse.html#exiting-methods
888
self.assertEqual(e.exception.code, 2)
891
@contextlib.contextmanager
892
def temporarily_suppress_stderr():
893
null = os.open(os.path.devnull, os.O_RDWR)
894
stderrcopy = os.dup(sys.stderr.fileno())
895
os.dup2(null, sys.stderr.fileno())
901
os.dup2(stderrcopy, sys.stderr.fileno())
904
def check_option_syntax(self, options):
905
check_option_syntax(self.parser, options)
907
def test_actions_conflicts_with_verbose(self):
908
for action, value in self.actions.items():
909
options = self.parser.parse_args()
910
setattr(options, action, value)
911
options.verbose = True
912
with self.assertParseError():
913
self.check_option_syntax(options)
915
def test_dump_json_conflicts_with_verbose(self):
916
options = self.parser.parse_args()
917
options.dump_json = True
918
options.verbose = True
919
with self.assertParseError():
920
self.check_option_syntax(options)
922
def test_dump_json_conflicts_with_action(self):
923
for action, value in self.actions.items():
924
options = self.parser.parse_args()
925
setattr(options, action, value)
926
options.dump_json = True
927
with self.assertParseError():
928
self.check_option_syntax(options)
930
def test_all_can_not_be_alone(self):
931
options = self.parser.parse_args()
933
with self.assertParseError():
934
self.check_option_syntax(options)
936
def test_all_is_ok_with_any_action(self):
937
for action, value in self.actions.items():
938
options = self.parser.parse_args()
939
setattr(options, action, value)
941
self.check_option_syntax(options)
943
def test_is_enabled_fails_without_client(self):
944
options = self.parser.parse_args()
945
options.is_enabled = True
946
with self.assertParseError():
947
self.check_option_syntax(options)
949
def test_is_enabled_works_with_one_client(self):
950
options = self.parser.parse_args()
951
options.is_enabled = True
952
options.client = ["foo"]
953
self.check_option_syntax(options)
955
def test_is_enabled_fails_with_two_clients(self):
956
options = self.parser.parse_args()
957
options.is_enabled = True
958
options.client = ["foo", "barbar"]
959
with self.assertParseError():
960
self.check_option_syntax(options)
962
def test_remove_can_only_be_combined_with_action_deny(self):
963
for action, value in self.actions.items():
964
if action in {"remove", "deny"}:
966
options = self.parser.parse_args()
967
setattr(options, action, value)
969
options.remove = True
970
with self.assertParseError():
971
self.check_option_syntax(options)
974
class Test_commands_from_options(unittest.TestCase):
976
self.parser = argparse.ArgumentParser()
977
add_command_line_options(self.parser)
979
def test_is_enabled(self):
980
self.assert_command_from_args(["--is-enabled", "foo"],
983
def assert_command_from_args(self, args, command_cls,
985
"""Assert that parsing ARGS should result in an instance of
986
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
987
options = self.parser.parse_args(args)
988
check_option_syntax(self.parser, options)
989
commands = commands_from_options(options)
990
self.assertEqual(len(commands), 1)
991
command = commands[0]
992
self.assertIsInstance(command, command_cls)
993
for key, value in cmd_attrs.items():
994
self.assertEqual(getattr(command, key), value)
996
def test_is_enabled_short(self):
997
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
999
def test_approve(self):
1000
self.assert_command_from_args(["--approve", "foo"],
1003
def test_approve_short(self):
1004
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1006
def test_deny(self):
1007
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1009
def test_deny_short(self):
1010
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1012
def test_remove(self):
1013
self.assert_command_from_args(["--remove", "foo"],
1016
def test_deny_before_remove(self):
1017
options = self.parser.parse_args(["--deny", "--remove",
1019
check_option_syntax(self.parser, options)
1020
commands = commands_from_options(options)
1021
self.assertEqual(len(commands), 2)
1022
self.assertIsInstance(commands[0], DenyCmd)
1023
self.assertIsInstance(commands[1], RemoveCmd)
1025
def test_deny_before_remove_reversed(self):
1026
options = self.parser.parse_args(["--remove", "--deny",
1028
check_option_syntax(self.parser, options)
1029
commands = commands_from_options(options)
1030
self.assertEqual(len(commands), 2)
1031
self.assertIsInstance(commands[0], DenyCmd)
1032
self.assertIsInstance(commands[1], RemoveCmd)
1034
def test_remove_short(self):
1035
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1037
def test_dump_json(self):
1038
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1040
def test_enable(self):
1041
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1043
def test_enable_short(self):
1044
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1046
def test_disable(self):
1047
self.assert_command_from_args(["--disable", "foo"],
1050
def test_disable_short(self):
1051
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1053
def test_bump_timeout(self):
1054
self.assert_command_from_args(["--bump-timeout", "foo"],
1057
def test_bump_timeout_short(self):
1058
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1060
def test_start_checker(self):
1061
self.assert_command_from_args(["--start-checker", "foo"],
1064
def test_stop_checker(self):
1065
self.assert_command_from_args(["--stop-checker", "foo"],
1068
def test_approve_by_default(self):
1069
self.assert_command_from_args(["--approve-by-default", "foo"],
1070
ApproveByDefaultCmd)
1072
def test_deny_by_default(self):
1073
self.assert_command_from_args(["--deny-by-default", "foo"],
1076
def test_checker(self):
1077
self.assert_command_from_args(["--checker", ":", "foo"],
1078
SetCheckerCmd, value_to_set=":")
1080
def test_checker_empty(self):
1081
self.assert_command_from_args(["--checker", "", "foo"],
1082
SetCheckerCmd, value_to_set="")
1084
def test_checker_short(self):
1085
self.assert_command_from_args(["-c", ":", "foo"],
1086
SetCheckerCmd, value_to_set=":")
1088
def test_host(self):
1089
self.assert_command_from_args(["--host", "foo.example.org",
1091
value_to_set="foo.example.org")
1093
def test_host_short(self):
1094
self.assert_command_from_args(["-H", "foo.example.org",
1096
value_to_set="foo.example.org")
1098
def test_secret_devnull(self):
1099
self.assert_command_from_args(["--secret", os.path.devnull,
1100
"foo"], SetSecretCmd,
1103
def test_secret_tempfile(self):
1104
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1105
value = b"secret\0xyzzy\nbar"
1108
self.assert_command_from_args(["--secret", f.name,
1109
"foo"], SetSecretCmd,
1112
def test_secret_devnull_short(self):
1113
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1114
SetSecretCmd, value_to_set=b"")
1116
def test_secret_tempfile_short(self):
1117
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1118
value = b"secret\0xyzzy\nbar"
1121
self.assert_command_from_args(["-s", f.name, "foo"],
1125
def test_timeout(self):
1126
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1128
value_to_set=300000)
1130
def test_timeout_short(self):
1131
self.assert_command_from_args(["-t", "PT5M", "foo"],
1133
value_to_set=300000)
1135
def test_extended_timeout(self):
1136
self.assert_command_from_args(["--extended-timeout", "PT15M",
1138
SetExtendedTimeoutCmd,
1139
value_to_set=900000)
1141
def test_interval(self):
1142
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1144
value_to_set=120000)
1146
def test_interval_short(self):
1147
self.assert_command_from_args(["-i", "PT2M", "foo"],
1149
value_to_set=120000)
1151
def test_approval_delay(self):
1152
self.assert_command_from_args(["--approval-delay", "PT30S",
1153
"foo"], SetApprovalDelayCmd,
1156
def test_approval_duration(self):
1157
self.assert_command_from_args(["--approval-duration", "PT1S",
1158
"foo"], SetApprovalDurationCmd,
1161
def test_print_table(self):
1162
self.assert_command_from_args([], PrintTableCmd,
1165
def test_print_table_verbose(self):
1166
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1169
def test_print_table_verbose_short(self):
1170
self.assert_command_from_args(["-v"], PrintTableCmd,
1174
820
class TestCmd(unittest.TestCase):
1175
821
"""Abstract class for tests of command classes"""
1176
822
def setUp(self):
1178
824
class MockClient(object):
1179
825
def __init__(self, name, **attributes):
1180
self.__dbus_object_path__ = "/clients/{}".format(name)
826
self.__dbus_object_path__ = "objpath_{}".format(name)
1181
827
self.attributes = attributes
1182
828
self.attributes["Name"] = name
1184
def Set(self, interface, propname, value, dbus_interface):
1185
testcase.assertEqual(interface, client_dbus_interface)
1186
testcase.assertEqual(dbus_interface,
1187
dbus.PROPERTIES_IFACE)
1188
self.attributes[propname] = value
1189
def Get(self, interface, propname, dbus_interface):
1190
testcase.assertEqual(interface, client_dbus_interface)
1191
testcase.assertEqual(dbus_interface,
1192
dbus.PROPERTIES_IFACE)
1193
return self.attributes[propname]
830
def Set(self, interface, property, value, dbus_interface):
831
testcase.assertEqual(interface, client_interface)
832
testcase.assertEqual(dbus_interface,
833
dbus.PROPERTIES_IFACE)
834
self.attributes[property] = value
835
def Get(self, interface, property, dbus_interface):
836
testcase.assertEqual(interface, client_interface)
837
testcase.assertEqual(dbus_interface,
838
dbus.PROPERTIES_IFACE)
839
return self.attributes[property]
1194
840
def Approve(self, approve, dbus_interface):
1195
testcase.assertEqual(dbus_interface,
1196
client_dbus_interface)
841
testcase.assertEqual(dbus_interface, client_interface)
1197
842
self.calls.append(("Approve", (approve,
1198
843
dbus_interface)))
1199
844
self.client = MockClient(
1239
884
ApprovedByDefault=dbus.Boolean(False),
1240
885
LastApprovalRequest="2019-01-03T00:00:00",
1241
886
ApprovalDelay=30000,
1242
ApprovalDuration=93785000,
887
ApprovalDuration=1000,
1244
889
ExtendedTimeout=900000,
1245
890
Expires="2019-02-05T00:00:00",
1246
891
LastCheckerStatus=-2)
1247
892
self.clients = collections.OrderedDict(
1249
("/clients/foo", self.client.attributes),
1250
("/clients/barbar", self.other_client.attributes),
894
(self.client, self.client.attributes),
895
(self.other_client, self.other_client.attributes),
1252
self.one_client = {"/clients/foo": self.client.attributes}
1257
def get_object(client_bus_name, path):
1258
self.assertEqual(client_bus_name, dbus_busname)
1260
"/clients/foo": self.client,
1261
"/clients/barbar": self.other_client,
1266
class TestIsEnabledCmd(TestCmd):
1267
def test_is_enabled(self):
1268
self.assertTrue(all(IsEnabledCmd().is_enabled(client,
1270
for client, properties
1271
in self.clients.items()))
1272
def test_is_enabled_run_exits_successfully(self):
1273
with self.assertRaises(SystemExit) as e:
1274
IsEnabledCmd().run(self.one_client)
1275
if e.exception.code is not None:
1276
self.assertEqual(e.exception.code, 0)
1278
self.assertIsNone(e.exception.code)
1279
def test_is_enabled_run_exits_with_failure(self):
1280
self.client.attributes["Enabled"] = dbus.Boolean(False)
1281
with self.assertRaises(SystemExit) as e:
1282
IsEnabledCmd().run(self.one_client)
1283
if isinstance(e.exception.code, int):
1284
self.assertNotEqual(e.exception.code, 0)
1286
self.assertIsNotNone(e.exception.code)
1289
class TestApproveCmd(TestCmd):
1290
def test_approve(self):
1291
ApproveCmd().run(self.clients, self.bus)
1292
for clientpath in self.clients:
1293
client = self.bus.get_object(dbus_busname, clientpath)
1294
self.assertIn(("Approve", (True, client_dbus_interface)),
1298
class TestDenyCmd(TestCmd):
1299
def test_deny(self):
1300
DenyCmd().run(self.clients, self.bus)
1301
for clientpath in self.clients:
1302
client = self.bus.get_object(dbus_busname, clientpath)
1303
self.assertIn(("Approve", (False, client_dbus_interface)),
1306
class TestRemoveCmd(TestCmd):
1307
def test_remove(self):
1308
class MockMandos(object):
1311
def RemoveClient(self, dbus_path):
1312
self.calls.append(("RemoveClient", (dbus_path,)))
1313
mandos = MockMandos()
1314
super(TestRemoveCmd, self).setUp()
1315
RemoveCmd().run(self.clients, self.bus, mandos)
1316
self.assertEqual(len(mandos.calls), 2)
1317
for clientpath in self.clients:
1318
self.assertIn(("RemoveClient", (clientpath,)),
897
self.one_client = {self.client: self.client.attributes}
899
class TestPrintTableCmd(TestCmd):
900
def test_normal(self):
901
output = PrintTableCmd().output(self.clients)
902
expected_output = """
903
Name Enabled Timeout Last Successful Check
904
foo Yes 00:05:00 2019-02-03T00:00:00
905
barbar Yes 00:05:00 2019-02-04T00:00:00
907
self.assertEqual(output, expected_output)
908
def test_verbose(self):
909
output = PrintTableCmd(verbose=True).output(self.clients)
910
expected_output = """
911
Name Enabled Timeout Last Successful Check Created Interval Host Key ID Fingerprint Check Is Running Last Enabled Approval Is Pending Approved By Default Last Approval Request Approval Delay Approval Duration Checker Extended Timeout Expires Last Checker Status
912
foo Yes 00:05:00 2019-02-03T00:00:00 2019-01-02T00:00:00 00:02:00 foo.example.org 92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8ea90a27cddb12 778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 No 2019-01-03T00:00:00 No Yes 00:00:00 00:00:01 fping -q -- %(host)s 00:15:00 2019-02-04T00:00:00 0
913
barbar Yes 00:05:00 2019-02-04T00:00:00 2019-01-03T00:00:00 00:02:00 192.0.2.3 0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e46c2b020f441c 3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 Yes 2019-01-04T00:00:00 No No 2019-01-03T00:00:00 00:00:30 00:00:01 : 00:15:00 2019-02-05T00:00:00 -2
915
self.assertEqual(output, expected_output)
916
def test_one_client(self):
917
output = PrintTableCmd().output(self.one_client)
918
expected_output = """
919
Name Enabled Timeout Last Successful Check
920
foo Yes 00:05:00 2019-02-03T00:00:00
922
self.assertEqual(output, expected_output)
1322
924
class TestDumpJSONCmd(TestCmd):
1323
925
def setUp(self):
1381
983
expected_json = {"foo": self.expected_json["foo"]}
1382
984
self.assertDictEqual(json_data, expected_json)
1385
class TestPrintTableCmd(TestCmd):
1386
def test_normal(self):
1387
output = PrintTableCmd().output(self.clients.values())
1388
expected_output = "\n".join((
1389
"Name Enabled Timeout Last Successful Check",
1390
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1391
"barbar Yes 00:05:00 2019-02-04T00:00:00 ",
1393
self.assertEqual(output, expected_output)
1394
def test_verbose(self):
1395
output = PrintTableCmd(verbose=True).output(
1396
self.clients.values())
1411
"Last Successful Check ",
1412
"2019-02-03T00:00:00 ",
1413
"2019-02-04T00:00:00 ",
1416
"2019-01-02T00:00:00 ",
1417
"2019-01-03T00:00:00 ",
1429
("92ed150794387c03ce684574b1139a6594a34f895daaaf09fd8"
1431
("0558568eedd67d622f5c83b35a115f796ab612cff5ad227247e"
1435
"778827225BA7DE539C5A7CFA59CFF7CDBD9A5920 ",
1436
"3E393AEAEFB84C7E89E2F547B3A107558FCA3A27 ",
1438
"Check Is Running ",
1443
"2019-01-03T00:00:00 ",
1444
"2019-01-04T00:00:00 ",
1446
"Approval Is Pending ",
1450
"Approved By Default ",
1454
"Last Approval Request ",
1456
"2019-01-03T00:00:00 ",
1462
"Approval Duration ",
1467
"fping -q -- %(host)s ",
1470
"Extended Timeout ",
1475
"2019-02-04T00:00:00 ",
1476
"2019-02-05T00:00:00 ",
1478
"Last Checker Status",
1483
num_lines = max(len(rows) for rows in columns)
1484
expected_output = "\n".join("".join(rows[line]
1485
for rows in columns)
1486
for line in range(num_lines))
1487
self.assertEqual(output, expected_output)
1488
def test_one_client(self):
1489
output = PrintTableCmd().output(self.one_client.values())
1490
expected_output = "\n".join((
1491
"Name Enabled Timeout Last Successful Check",
1492
"foo Yes 00:05:00 2019-02-03T00:00:00 ",
1494
self.assertEqual(output, expected_output)
986
class TestIsEnabledCmd(TestCmd):
987
def test_is_enabled(self):
988
self.assertTrue(all(IsEnabledCmd().is_enabled(client, properties)
989
for client, properties in self.clients.items()))
990
def test_is_enabled_run_exits_successfully(self):
991
with self.assertRaises(SystemExit) as e:
992
IsEnabledCmd().run(None, self.one_client)
993
if e.exception.code is not None:
994
self.assertEqual(e.exception.code, 0)
996
self.assertIsNone(e.exception.code)
997
def test_is_enabled_run_exits_with_failure(self):
998
self.client.attributes["Enabled"] = dbus.Boolean(False)
999
with self.assertRaises(SystemExit) as e:
1000
IsEnabledCmd().run(None, self.one_client)
1001
if isinstance(e.exception.code, int):
1002
self.assertNotEqual(e.exception.code, 0)
1004
self.assertIsNotNone(e.exception.code)
1006
class TestRemoveCmd(TestCmd):
1007
def test_remove(self):
1008
class MockMandos(object):
1011
def RemoveClient(self, dbus_path):
1012
self.calls.append(("RemoveClient", (dbus_path,)))
1013
mandos = MockMandos()
1014
super(TestRemoveCmd, self).setUp()
1015
RemoveCmd().run(mandos, self.clients)
1016
self.assertEqual(len(mandos.calls), 2)
1017
for client in self.clients:
1018
self.assertIn(("RemoveClient",
1019
(client.__dbus_object_path__,)),
1022
class TestApproveCmd(TestCmd):
1023
def test_approve(self):
1024
ApproveCmd().run(None, self.clients)
1025
for client in self.clients:
1026
self.assertIn(("Approve", (True, client_interface)),
1029
class TestDenyCmd(TestCmd):
1030
def test_deny(self):
1031
DenyCmd().run(None, self.clients)
1032
for client in self.clients:
1033
self.assertIn(("Approve", (False, client_interface)),
1036
class TestEnableCmd(TestCmd):
1037
def test_enable(self):
1038
for client in self.clients:
1039
client.attributes["Enabled"] = False
1041
EnableCmd().run(None, self.clients)
1043
for client in self.clients:
1044
self.assertTrue(client.attributes["Enabled"])
1046
class TestDisableCmd(TestCmd):
1047
def test_disable(self):
1048
DisableCmd().run(None, self.clients)
1050
for client in self.clients:
1051
self.assertFalse(client.attributes["Enabled"])
1497
1053
class Unique(object):
1498
1054
"""Class for objects which exist only to be unique objects, since
1499
1055
unittest.mock.sentinel only exists in Python 3.3"""
1502
1057
class TestPropertyCmd(TestCmd):
1503
1058
"""Abstract class for tests of PropertyCmd classes"""
1504
1059
def runTest(self):
1508
1063
self.values_to_set)
1509
1064
for value_to_set, value_to_get in zip(self.values_to_set,
1510
1065
values_to_get):
1511
for clientpath in self.clients:
1512
client = self.bus.get_object(dbus_busname, clientpath)
1513
old_value = client.attributes[self.propname]
1066
for client in self.clients:
1067
old_value = client.attributes[self.property]
1514
1068
self.assertNotIsInstance(old_value, Unique)
1515
client.attributes[self.propname] = Unique()
1069
client.attributes[self.property] = Unique()
1516
1070
self.run_command(value_to_set, self.clients)
1517
for clientpath in self.clients:
1518
client = self.bus.get_object(dbus_busname, clientpath)
1519
value = client.attributes[self.propname]
1071
for client in self.clients:
1072
value = client.attributes[self.property]
1520
1073
self.assertNotIsInstance(value, Unique)
1521
1074
self.assertEqual(value, value_to_get)
1522
1075
def run_command(self, value, clients):
1523
self.command().run(clients, self.bus)
1526
class TestEnableCmd(TestPropertyCmd):
1528
propname = "Enabled"
1529
values_to_set = [dbus.Boolean(True)]
1532
class TestDisableCmd(TestPropertyCmd):
1533
command = DisableCmd
1534
propname = "Enabled"
1535
values_to_set = [dbus.Boolean(False)]
1076
self.command().run(None, clients)
1538
1078
class TestBumpTimeoutCmd(TestPropertyCmd):
1539
1079
command = BumpTimeoutCmd
1540
propname = "LastCheckedOK"
1080
property = "LastCheckedOK"
1541
1081
values_to_set = [""]
1544
1083
class TestStartCheckerCmd(TestPropertyCmd):
1545
1084
command = StartCheckerCmd
1546
propname = "CheckerRunning"
1085
property = "CheckerRunning"
1547
1086
values_to_set = [dbus.Boolean(True)]
1550
1088
class TestStopCheckerCmd(TestPropertyCmd):
1551
1089
command = StopCheckerCmd
1552
propname = "CheckerRunning"
1090
property = "CheckerRunning"
1553
1091
values_to_set = [dbus.Boolean(False)]
1556
1093
class TestApproveByDefaultCmd(TestPropertyCmd):
1557
1094
command = ApproveByDefaultCmd
1558
propname = "ApprovedByDefault"
1095
property = "ApprovedByDefault"
1559
1096
values_to_set = [dbus.Boolean(True)]
1562
1098
class TestDenyByDefaultCmd(TestPropertyCmd):
1563
1099
command = DenyByDefaultCmd
1564
propname = "ApprovedByDefault"
1100
property = "ApprovedByDefault"
1565
1101
values_to_set = [dbus.Boolean(False)]
1568
class TestPropertyValueCmd(TestPropertyCmd):
1569
"""Abstract class for tests of PropertyValueCmd classes"""
1103
class TestValueArgumentPropertyCmd(TestPropertyCmd):
1104
"""Abstract class for tests of PropertyCmd classes using the
1105
ValueArgumentMixIn"""
1570
1106
def runTest(self):
1571
if type(self) is TestPropertyValueCmd:
1107
if type(self) is TestValueArgumentPropertyCmd:
1573
return super(TestPropertyValueCmd, self).runTest()
1109
return super(TestValueArgumentPropertyCmd, self).runTest()
1574
1110
def run_command(self, value, clients):
1575
self.command(value).run(clients, self.bus)
1578
class TestSetCheckerCmd(TestPropertyValueCmd):
1111
self.command(value).run(None, clients)
1113
class TestSetCheckerCmd(TestValueArgumentPropertyCmd):
1579
1114
command = SetCheckerCmd
1580
propname = "Checker"
1115
property = "Checker"
1581
1116
values_to_set = ["", ":", "fping -q -- %s"]
1584
class TestSetHostCmd(TestPropertyValueCmd):
1118
class TestSetHostCmd(TestValueArgumentPropertyCmd):
1585
1119
command = SetHostCmd
1587
1121
values_to_set = ["192.0.2.3", "foo.example.org"]
1590
class TestSetSecretCmd(TestPropertyValueCmd):
1123
class TestSetSecretCmd(TestValueArgumentPropertyCmd):
1591
1124
command = SetSecretCmd
1593
1126
values_to_set = [io.BytesIO(b""),
1594
1127
io.BytesIO(b"secret\0xyzzy\nbar")]
1595
1128
values_to_get = [b"", b"secret\0xyzzy\nbar"]
1598
class TestSetTimeoutCmd(TestPropertyValueCmd):
1130
class TestSetTimeoutCmd(TestValueArgumentPropertyCmd):
1599
1131
command = SetTimeoutCmd
1600
propname = "Timeout"
1132
property = "Timeout"
1601
1133
values_to_set = [datetime.timedelta(),
1602
1134
datetime.timedelta(minutes=5),
1603
1135
datetime.timedelta(seconds=1),
1649
1177
datetime.timedelta(weeks=52)]
1650
1178
values_to_get = [0, 300000, 1000, 604800000, 31449600000]
1180
class Test_command_from_options(unittest.TestCase):
1182
self.parser = argparse.ArgumentParser()
1183
add_command_line_options(self.parser)
1184
def assert_command_from_args(self, args, command_cls, **cmd_attrs):
1185
"""Assert that parsing ARGS should result in an instance of
1186
COMMAND_CLS with (optionally) all supplied attributes (CMD_ATTRS)."""
1187
options = self.parser.parse_args(args)
1188
check_option_syntax(self.parser, options)
1189
commands = commands_from_options(options)
1190
self.assertEqual(len(commands), 1)
1191
command = commands[0]
1192
self.assertIsInstance(command, command_cls)
1193
for key, value in cmd_attrs.items():
1194
self.assertEqual(getattr(command, key), value)
1195
def test_print_table(self):
1196
self.assert_command_from_args([], PrintTableCmd,
1199
def test_print_table_verbose(self):
1200
self.assert_command_from_args(["--verbose"], PrintTableCmd,
1203
def test_print_table_verbose_short(self):
1204
self.assert_command_from_args(["-v"], PrintTableCmd,
1207
def test_enable(self):
1208
self.assert_command_from_args(["--enable", "foo"], EnableCmd)
1210
def test_enable_short(self):
1211
self.assert_command_from_args(["-e", "foo"], EnableCmd)
1213
def test_disable(self):
1214
self.assert_command_from_args(["--disable", "foo"],
1217
def test_disable_short(self):
1218
self.assert_command_from_args(["-d", "foo"], DisableCmd)
1220
def test_bump_timeout(self):
1221
self.assert_command_from_args(["--bump-timeout", "foo"],
1224
def test_bump_timeout_short(self):
1225
self.assert_command_from_args(["-b", "foo"], BumpTimeoutCmd)
1227
def test_start_checker(self):
1228
self.assert_command_from_args(["--start-checker", "foo"],
1231
def test_stop_checker(self):
1232
self.assert_command_from_args(["--stop-checker", "foo"],
1235
def test_remove(self):
1236
self.assert_command_from_args(["--remove", "foo"],
1239
def test_remove_short(self):
1240
self.assert_command_from_args(["-r", "foo"], RemoveCmd)
1242
def test_checker(self):
1243
self.assert_command_from_args(["--checker", ":", "foo"],
1244
SetCheckerCmd, value_to_set=":")
1246
def test_checker_empty(self):
1247
self.assert_command_from_args(["--checker", "", "foo"],
1248
SetCheckerCmd, value_to_set="")
1250
def test_checker_short(self):
1251
self.assert_command_from_args(["-c", ":", "foo"],
1252
SetCheckerCmd, value_to_set=":")
1254
def test_timeout(self):
1255
self.assert_command_from_args(["--timeout", "PT5M", "foo"],
1257
value_to_set=300000)
1259
def test_timeout_short(self):
1260
self.assert_command_from_args(["-t", "PT5M", "foo"],
1262
value_to_set=300000)
1264
def test_extended_timeout(self):
1265
self.assert_command_from_args(["--extended-timeout", "PT15M",
1267
SetExtendedTimeoutCmd,
1268
value_to_set=900000)
1270
def test_interval(self):
1271
self.assert_command_from_args(["--interval", "PT2M", "foo"],
1273
value_to_set=120000)
1275
def test_interval_short(self):
1276
self.assert_command_from_args(["-i", "PT2M", "foo"],
1278
value_to_set=120000)
1280
def test_approve_by_default(self):
1281
self.assert_command_from_args(["--approve-by-default", "foo"],
1282
ApproveByDefaultCmd)
1284
def test_deny_by_default(self):
1285
self.assert_command_from_args(["--deny-by-default", "foo"],
1288
def test_approval_delay(self):
1289
self.assert_command_from_args(["--approval-delay", "PT30S",
1290
"foo"], SetApprovalDelayCmd,
1293
def test_approval_duration(self):
1294
self.assert_command_from_args(["--approval-duration", "PT1S",
1295
"foo"], SetApprovalDurationCmd,
1298
def test_host(self):
1299
self.assert_command_from_args(["--host", "foo.example.org",
1301
value_to_set="foo.example.org")
1303
def test_host_short(self):
1304
self.assert_command_from_args(["-H", "foo.example.org",
1306
value_to_set="foo.example.org")
1308
def test_secret_devnull(self):
1309
self.assert_command_from_args(["--secret", os.path.devnull,
1310
"foo"], SetSecretCmd,
1313
def test_secret_tempfile(self):
1314
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1315
value = b"secret\0xyzzy\nbar"
1318
self.assert_command_from_args(["--secret", f.name,
1319
"foo"], SetSecretCmd,
1322
def test_secret_devnull_short(self):
1323
self.assert_command_from_args(["-s", os.path.devnull, "foo"],
1324
SetSecretCmd, value_to_set=b"")
1326
def test_secret_tempfile_short(self):
1327
with tempfile.NamedTemporaryFile(mode="r+b") as f:
1328
value = b"secret\0xyzzy\nbar"
1331
self.assert_command_from_args(["-s", f.name, "foo"],
1335
def test_approve(self):
1336
self.assert_command_from_args(["--approve", "foo"],
1339
def test_approve_short(self):
1340
self.assert_command_from_args(["-A", "foo"], ApproveCmd)
1342
def test_deny(self):
1343
self.assert_command_from_args(["--deny", "foo"], DenyCmd)
1345
def test_deny_short(self):
1346
self.assert_command_from_args(["-D", "foo"], DenyCmd)
1348
def test_dump_json(self):
1349
self.assert_command_from_args(["--dump-json"], DumpJSONCmd)
1351
def test_is_enabled(self):
1352
self.assert_command_from_args(["--is-enabled", "foo"],
1355
def test_is_enabled_short(self):
1356
self.assert_command_from_args(["-V", "foo"], IsEnabledCmd)
1358
def test_deny_before_remove(self):
1359
options = self.parser.parse_args(["--deny", "--remove", "foo"])
1360
check_option_syntax(self.parser, options)
1361
commands = commands_from_options(options)
1362
self.assertEqual(len(commands), 2)
1363
self.assertIsInstance(commands[0], DenyCmd)
1364
self.assertIsInstance(commands[1], RemoveCmd)
1366
def test_deny_before_remove_reversed(self):
1367
options = self.parser.parse_args(["--remove", "--deny", "--all"])
1368
check_option_syntax(self.parser, options)
1369
commands = commands_from_options(options)
1370
self.assertEqual(len(commands), 2)
1371
self.assertIsInstance(commands[0], DenyCmd)
1372
self.assertIsInstance(commands[1], RemoveCmd)
1375
class Test_check_option_syntax(unittest.TestCase):
1376
# This mostly corresponds to the definition from has_actions() in
1377
# check_option_syntax()
1379
# The actual values set here are not that important, but we do
1380
# at least stick to the correct types, even though they are
1384
"bump_timeout": True,
1385
"start_checker": True,
1386
"stop_checker": True,
1390
"timeout": datetime.timedelta(),
1391
"extended_timeout": datetime.timedelta(),
1392
"interval": datetime.timedelta(),
1393
"approved_by_default": True,
1394
"approval_delay": datetime.timedelta(),
1395
"approval_duration": datetime.timedelta(),
1397
"secret": io.BytesIO(b"x"),
1403
self.parser = argparse.ArgumentParser()
1404
add_command_line_options(self.parser)
1406
@contextlib.contextmanager
1407
def assertParseError(self):
1408
with self.assertRaises(SystemExit) as e:
1409
with self.temporarily_suppress_stderr():
1411
# Exit code from argparse is guaranteed to be "2". Reference:
1412
# https://docs.python.org/3/library/argparse.html#exiting-methods
1413
self.assertEqual(e.exception.code, 2)
1416
@contextlib.contextmanager
1417
def temporarily_suppress_stderr():
1418
null = os.open(os.path.devnull, os.O_RDWR)
1419
stderrcopy = os.dup(sys.stderr.fileno())
1420
os.dup2(null, sys.stderr.fileno())
1426
os.dup2(stderrcopy, sys.stderr.fileno())
1427
os.close(stderrcopy)
1429
def check_option_syntax(self, options):
1430
check_option_syntax(self.parser, options)
1432
def test_actions_requires_client_or_all(self):
1433
for action, value in self.actions.items():
1434
options = self.parser.parse_args()
1435
setattr(options, action, value)
1436
with self.assertParseError():
1437
self.check_option_syntax(options)
1439
def test_actions_conflicts_with_verbose(self):
1440
for action, value in self.actions.items():
1441
options = self.parser.parse_args()
1442
setattr(options, action, value)
1443
options.verbose = True
1444
with self.assertParseError():
1445
self.check_option_syntax(options)
1447
def test_dump_json_conflicts_with_verbose(self):
1448
options = self.parser.parse_args()
1449
options.dump_json = True
1450
options.verbose = True
1451
with self.assertParseError():
1452
self.check_option_syntax(options)
1454
def test_dump_json_conflicts_with_action(self):
1455
for action, value in self.actions.items():
1456
options = self.parser.parse_args()
1457
setattr(options, action, value)
1458
options.dump_json = True
1459
with self.assertParseError():
1460
self.check_option_syntax(options)
1462
def test_all_can_not_be_alone(self):
1463
options = self.parser.parse_args()
1465
with self.assertParseError():
1466
self.check_option_syntax(options)
1468
def test_all_is_ok_with_any_action(self):
1469
for action, value in self.actions.items():
1470
options = self.parser.parse_args()
1471
setattr(options, action, value)
1473
self.check_option_syntax(options)
1475
def test_is_enabled_fails_without_client(self):
1476
options = self.parser.parse_args()
1477
options.is_enabled = True
1478
with self.assertParseError():
1479
self.check_option_syntax(options)
1481
def test_is_enabled_works_with_one_client(self):
1482
options = self.parser.parse_args()
1483
options.is_enabled = True
1484
options.client = ["foo"]
1485
self.check_option_syntax(options)
1487
def test_is_enabled_fails_with_two_clients(self):
1488
options = self.parser.parse_args()
1489
options.is_enabled = True
1490
options.client = ["foo", "barbar"]
1491
with self.assertParseError():
1492
self.check_option_syntax(options)
1654
1496
def should_only_run_tests():