=== modified file 'mandos-monitor'
--- mandos-monitor 2016-06-23 20:10:40 +0000
+++ mandos-monitor 2016-08-25 17:06:42 +0000
@@ -1,11 +1,11 @@
#!/usr/bin/python
# -*- mode: python; coding: utf-8 -*-
-#
+#
# Mandos Monitor - Control and monitor the Mandos server
-#
+#
# Copyright © 2009-2016 Teddy Hogeborn
# Copyright © 2009-2016 Björn Påhlsson
-#
+#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
@@ -15,13 +15,13 @@
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
-#
+#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see
# .
-#
+#
# Contact the authors at .
-#
+#
from __future__ import (division, absolute_import, print_function,
unicode_literals)
@@ -45,12 +45,13 @@
import locale
+import logging
+
if sys.version_info.major == 2:
str = unicode
locale.setlocale(locale.LC_ALL, '')
-import logging
logging.getLogger('dbus.proxies').setLevel(logging.CRITICAL)
# Some useful constants
@@ -64,6 +65,7 @@
except AttributeError:
dbus.OBJECT_MANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
+
def isoformat_to_datetime(iso):
"Parse an ISO 8601 date string to a datetime.datetime()"
if not iso:
@@ -77,8 +79,9 @@
int(day),
int(hour),
int(minute),
- int(second), # Whole seconds
- int(fraction*1000000)) # Microseconds
+ int(second), # Whole seconds
+ int(fraction*1000000)) # Microseconds
+
class MandosClientPropertyCache(object):
"""This wraps a Mandos Client D-Bus proxy object, caches the
@@ -86,22 +89,21 @@
changed.
"""
def __init__(self, proxy_object=None, properties=None, **kwargs):
- self.proxy = proxy_object # Mandos Client proxy object
+ self.proxy = proxy_object # Mandos Client proxy object
self.properties = dict() if properties is None else properties
self.property_changed_match = (
self.proxy.connect_to_signal("PropertiesChanged",
self.properties_changed,
dbus.PROPERTIES_IFACE,
byte_arrays=True))
-
+
if properties is None:
- self.properties.update(
- self.proxy.GetAll(client_interface,
- dbus_interface
- = dbus.PROPERTIES_IFACE))
-
+ self.properties.update(self.proxy.GetAll(
+ client_interface,
+ dbus_interface=dbus.PROPERTIES_IFACE))
+
super(MandosClientPropertyCache, self).__init__(**kwargs)
-
+
def properties_changed(self, interface, properties, invalidated):
"""This is called whenever we get a PropertiesChanged signal
It updates the changed properties in the "properties" dict.
@@ -109,7 +111,7 @@
# Update properties dict with new value
if interface == client_interface:
self.properties.update(properties)
-
+
def delete(self):
self.property_changed_match.remove()
@@ -117,7 +119,7 @@
class MandosClientWidget(urwid.FlowWidget, MandosClientPropertyCache):
"""A Mandos Client which is visible on the screen.
"""
-
+
def __init__(self, server_proxy_object=None, update_hook=None,
delete_hook=None, logger=None, **kwargs):
# Called on update
@@ -128,9 +130,9 @@
self.server_proxy_object = server_proxy_object
# Logger
self.logger = logger
-
+
self._update_timer_callback_tag = None
-
+
# The widget shown normally
self._text_widget = urwid.Text("")
# The widget shown when we have focus
@@ -138,7 +140,7 @@
super(MandosClientWidget, self).__init__(**kwargs)
self.update()
self.opened = False
-
+
self.match_objects = (
self.proxy.connect_to_signal("CheckerCompleted",
self.checker_completed,
@@ -162,7 +164,7 @@
byte_arrays=True))
self.logger('Created client {}'
.format(self.properties["Name"]), level=0)
-
+
def using_timer(self, flag):
"""Call this method with True or False when timer should be
activated or deactivated.
@@ -175,7 +177,7 @@
elif not (flag or self._update_timer_callback_tag is None):
GLib.source_remove(self._update_timer_callback_tag)
self._update_timer_callback_tag = None
-
+
def checker_completed(self, exitstatus, condition, command):
if exitstatus == 0:
self.logger('Checker for client {} (command "{}")'
@@ -195,17 +197,17 @@
.format(self.properties["Name"], command,
os.WTERMSIG(condition)))
self.update()
-
+
def checker_started(self, command):
"""Server signals that a checker started."""
self.logger('Client {} started checker "{}"'
.format(self.properties["Name"],
command), level=0)
-
+
def got_secret(self):
self.logger('Client {} received its secret'
.format(self.properties["Name"]))
-
+
def need_approval(self, timeout, default):
if not default:
message = 'Client {} needs approval within {} seconds'
@@ -213,49 +215,48 @@
message = 'Client {} will get its secret in {} seconds'
self.logger(message.format(self.properties["Name"],
timeout/1000))
-
+
def rejected(self, reason):
self.logger('Client {} was rejected; reason: {}'
.format(self.properties["Name"], reason))
-
+
def selectable(self):
"""Make this a "selectable" widget.
This overrides the method from urwid.FlowWidget."""
return True
-
+
def rows(self, maxcolrow, focus=False):
"""How many rows this widget will occupy might depend on
whether we have focus or not.
This overrides the method from urwid.FlowWidget"""
return self.current_widget(focus).rows(maxcolrow, focus=focus)
-
+
def current_widget(self, focus=False):
if focus or self.opened:
return self._focus_widget
return self._widget
-
+
def update(self):
"Called when what is visible on the screen should be updated."
# How to add standout mode to a style
- with_standout = { "normal": "standout",
- "bold": "bold-standout",
- "underline-blink":
- "underline-blink-standout",
- "bold-underline-blink":
- "bold-underline-blink-standout",
- }
-
+ with_standout = {"normal": "standout",
+ "bold": "bold-standout",
+ "underline-blink":
+ "underline-blink-standout",
+ "bold-underline-blink":
+ "bold-underline-blink-standout",
+ }
+
# Rebuild focus and non-focus widgets using current properties
-
+
# Base part of a client. Name!
base = '{name}: '.format(name=self.properties["Name"])
if not self.properties["Enabled"]:
message = "DISABLED"
self.using_timer(False)
elif self.properties["ApprovalPending"]:
- timeout = datetime.timedelta(milliseconds
- = self.properties
- ["ApprovalDelay"])
+ timeout = datetime.timedelta(
+ milliseconds=self.properties["ApprovalDelay"])
last_approval_request = isoformat_to_datetime(
self.properties["LastApprovalRequest"])
if last_approval_request is not None:
@@ -288,7 +289,7 @@
message = "enabled"
self.using_timer(False)
self._text = "{}{}".format(base, message)
-
+
if not urwid.supports_unicode():
self._text = self._text.encode("ascii", "replace")
textlist = [("normal", self._text)]
@@ -304,14 +305,14 @@
# Run update hook, if any
if self.update_hook is not None:
self.update_hook()
-
+
def update_timer(self):
"""called by GLib. Will indefinitely loop until
GLib.source_remove() on tag is called
"""
self.update()
return True # Keep calling this
-
+
def delete(self, **kwargs):
if self._update_timer_callback_tag is not None:
GLib.source_remove(self._update_timer_callback_tag)
@@ -322,31 +323,31 @@
if self.delete_hook is not None:
self.delete_hook(self)
return super(MandosClientWidget, self).delete(**kwargs)
-
+
def render(self, maxcolrow, focus=False):
"""Render differently if we have focus.
This overrides the method from urwid.FlowWidget"""
return self.current_widget(focus).render(maxcolrow,
focus=focus)
-
+
def keypress(self, maxcolrow, key):
"""Handle keys.
This overrides the method from urwid.FlowWidget"""
if key == "+":
self.proxy.Set(client_interface, "Enabled",
- dbus.Boolean(True), ignore_reply = True,
- dbus_interface = dbus.PROPERTIES_IFACE)
+ dbus.Boolean(True), ignore_reply=True,
+ dbus_interface=dbus.PROPERTIES_IFACE)
elif key == "-":
self.proxy.Set(client_interface, "Enabled", False,
- ignore_reply = True,
- dbus_interface = dbus.PROPERTIES_IFACE)
+ ignore_reply=True,
+ dbus_interface=dbus.PROPERTIES_IFACE)
elif key == "a":
self.proxy.Approve(dbus.Boolean(True, variant_level=1),
- dbus_interface = client_interface,
+ dbus_interface=client_interface,
ignore_reply=True)
elif key == "d":
self.proxy.Approve(dbus.Boolean(False, variant_level=1),
- dbus_interface = client_interface,
+ dbus_interface=client_interface,
ignore_reply=True)
elif key == "R" or key == "_" or key == "ctrl k":
self.server_proxy_object.RemoveClient(self.proxy
@@ -354,14 +355,14 @@
ignore_reply=True)
elif key == "s":
self.proxy.Set(client_interface, "CheckerRunning",
- dbus.Boolean(True), ignore_reply = True,
- dbus_interface = dbus.PROPERTIES_IFACE)
+ dbus.Boolean(True), ignore_reply=True,
+ dbus_interface=dbus.PROPERTIES_IFACE)
elif key == "S":
self.proxy.Set(client_interface, "CheckerRunning",
- dbus.Boolean(False), ignore_reply = True,
- dbus_interface = dbus.PROPERTIES_IFACE)
+ dbus.Boolean(False), ignore_reply=True,
+ dbus_interface=dbus.PROPERTIES_IFACE)
elif key == "C":
- self.proxy.CheckedOK(dbus_interface = client_interface,
+ self.proxy.CheckedOK(dbus_interface=client_interface,
ignore_reply=True)
# xxx
# elif key == "p" or key == "=":
@@ -372,12 +373,12 @@
# self.open()
else:
return key
-
+
def properties_changed(self, interface, properties, invalidated):
"""Call self.update() if any properties changed.
This overrides the method from MandosClientPropertyCache"""
- old_values = { key: self.properties.get(key)
- for key in properties.keys() }
+ old_values = {key: self.properties.get(key)
+ for key in properties.keys()}
super(MandosClientWidget, self).properties_changed(
interface, properties, invalidated)
if any(old_values[key] != self.properties.get(key)
@@ -391,7 +392,8 @@
use them as an excuse to shift focus away from this widget.
"""
def keypress(self, *args, **kwargs):
- ret = super(ConstrainedListBox, self).keypress(*args, **kwargs)
+ ret = (super(ConstrainedListBox, self)
+ .keypress(*args, **kwargs))
if ret in ("up", "down"):
return
return ret
@@ -403,9 +405,9 @@
"""
def __init__(self, max_log_length=1000, log_level=1):
DBusGMainLoop(set_as_default=True)
-
+
self.screen = urwid.curses_display.Screen()
-
+
self.screen.register_palette((
("normal",
"default", "default", None),
@@ -416,7 +418,8 @@
("standout",
"standout", "default", "standout"),
("bold-underline-blink",
- "bold,underline,blink", "default", "bold,underline,blink"),
+ "bold,underline,blink", "default",
+ "bold,underline,blink"),
("bold-standout",
"bold,standout", "default", "bold,standout"),
("underline-blink-standout",
@@ -426,66 +429,63 @@
"bold,underline,blink,standout", "default",
"bold,underline,blink,standout"),
))
-
+
if urwid.supports_unicode():
- self.divider = "─" # \u2500
- #self.divider = "━" # \u2501
+ self.divider = "─" # \u2500
else:
- #self.divider = "-" # \u002d
- self.divider = "_" # \u005f
-
+ self.divider = "_" # \u005f
+
self.screen.start()
-
+
self.size = self.screen.get_cols_rows()
-
+
self.clients = urwid.SimpleListWalker([])
self.clients_dict = {}
-
+
# We will add Text widgets to this list
self.log = []
self.max_log_length = max_log_length
-
+
self.log_level = log_level
-
+
# We keep a reference to the log widget so we can remove it
# from the ListWalker without it getting destroyed
self.logbox = ConstrainedListBox(self.log)
-
+
# This keeps track of whether self.uilist currently has
# self.logbox in it or not
self.log_visible = True
self.log_wrap = "any"
-
+
self.rebuild()
self.log_message_raw(("bold",
"Mandos Monitor version " + version))
self.log_message_raw(("bold",
"q: Quit ?: Help"))
-
+
self.busname = domain + '.Mandos'
self.main_loop = GLib.MainLoop()
-
+
def client_not_found(self, fingerprint, address):
self.log_message("Client with address {} and fingerprint {}"
" could not be found"
.format(address, fingerprint))
-
+
def rebuild(self):
"""This rebuilds the User Interface.
Call this when the widget layout needs to change"""
self.uilist = []
- #self.uilist.append(urwid.ListBox(self.clients))
+ # self.uilist.append(urwid.ListBox(self.clients))
self.uilist.append(urwid.Frame(ConstrainedListBox(self.
clients),
- #header=urwid.Divider(),
+ # header=urwid.Divider(),
header=None,
- footer=
- urwid.Divider(div_char=
- self.divider)))
+ footer=urwid.Divider(
+ div_char=self.divider)))
if self.log_visible:
self.uilist.append(self.logbox)
self.topwidget = urwid.Pile(self.uilist)
-
+
def log_message(self, message, level=1):
"""Log message formatted with timestamp"""
if level < self.log_level:
@@ -493,26 +493,26 @@
timestamp = datetime.datetime.now().isoformat()
self.log_message_raw("{}: {}".format(timestamp, message),
level=level)
-
+
def log_message_raw(self, markup, level=1):
"""Add a log message to the log buffer."""
if level < self.log_level:
return
self.log.append(urwid.Text(markup, wrap=self.log_wrap))
- if (self.max_log_length
- and len(self.log) > self.max_log_length):
- del self.log[0:len(self.log)-self.max_log_length-1]
+ if self.max_log_length:
+ if len(self.log) > self.max_log_length:
+ del self.log[0:len(self.log)-self.max_log_length-1]
self.logbox.set_focus(len(self.logbox.body.contents),
coming_from="above")
self.refresh()
-
+
def toggle_log_display(self):
"""Toggle visibility of the log buffer."""
self.log_visible = not self.log_visible
self.rebuild()
self.log_message("Log visibility changed to: {}"
.format(self.log_visible), level=0)
-
+
def change_log_display(self):
"""Change type of log display.
Currently, this toggles wrapping of text lines."""
@@ -524,10 +524,10 @@
textwidget.set_wrap_mode(self.log_wrap)
self.log_message("Wrap mode: {}".format(self.log_wrap),
level=0)
-
+
def find_and_remove_client(self, path, interfaces):
"""Find a client by its object path and remove it.
-
+
This is connected to the InterfacesRemoved signal from the
Mandos server object."""
if client_interface not in interfaces:
@@ -541,10 +541,10 @@
.format(path))
return
client.delete()
-
+
def add_new_client(self, path, ifs_and_props):
"""Find a client by its object path and remove it.
-
+
This is connected to the InterfacesAdded signal from the
Mandos server object.
"""
@@ -552,21 +552,15 @@
# Not a Mandos client object; ignore
return
client_proxy_object = self.bus.get_object(self.busname, path)
- self.add_client(MandosClientWidget(server_proxy_object
- =self.mandos_serv,
- proxy_object
- =client_proxy_object,
- update_hook
- =self.refresh,
- delete_hook
- =self.remove_client,
- logger
- =self.log_message,
- properties
- = dict(ifs_and_props[
- client_interface])),
+ self.add_client(MandosClientWidget(
+ server_proxy_object=self.mandos_serv,
+ proxy_object=client_proxy_object,
+ update_hook=self.refresh,
+ delete_hook=self.remove_client,
+ logger=self.log_message,
+ properties=dict(ifs_and_props[client_interface])),
path=path)
-
+
def add_client(self, client, path=None):
self.clients.append(client)
if path is None:
@@ -574,47 +568,46 @@
self.clients_dict[path] = client
self.clients.sort(key=lambda c: c.properties["Name"])
self.refresh()
-
+
def remove_client(self, client, path=None):
self.clients.remove(client)
if path is None:
path = client.proxy.object_path
del self.clients_dict[path]
self.refresh()
-
+
def refresh(self):
"""Redraw the screen"""
canvas = self.topwidget.render(self.size, focus=True)
self.screen.draw_screen(self.size, canvas)
-
+
def run(self):
"""Start the main loop and exit when it's done."""
self.bus = dbus.SystemBus()
mandos_dbus_objc = self.bus.get_object(
self.busname, "/", follow_name_owner_changes=True)
- self.mandos_serv = dbus.Interface(mandos_dbus_objc,
- dbus_interface
- = server_interface)
+ self.mandos_serv = dbus.Interface(
+ mandos_dbus_objc, dbus_interface=server_interface)
try:
mandos_clients = (self.mandos_serv
.GetAllClientsWithProperties())
if not mandos_clients:
- self.log_message_raw(("bold", "Note: Server has no clients."))
+ self.log_message_raw(("bold",
+ "Note: Server has no clients."))
except dbus.exceptions.DBusException:
- self.log_message_raw(("bold", "Note: No Mandos server running."))
+ self.log_message_raw(("bold",
+ "Note: No Mandos server running."))
mandos_clients = dbus.Dictionary()
-
+
(self.mandos_serv
.connect_to_signal("InterfacesRemoved",
self.find_and_remove_client,
- dbus_interface
- = dbus.OBJECT_MANAGER_IFACE,
+ dbus_interface=dbus.OBJECT_MANAGER_IFACE,
byte_arrays=True))
(self.mandos_serv
.connect_to_signal("InterfacesAdded",
self.add_new_client,
- dbus_interface
- = dbus.OBJECT_MANAGER_IFACE,
+ dbus_interface=dbus.OBJECT_MANAGER_IFACE,
byte_arrays=True))
(self.mandos_serv
.connect_to_signal("ClientNotFound",
@@ -624,19 +617,15 @@
for path, client in mandos_clients.items():
client_proxy_object = self.bus.get_object(self.busname,
path)
- self.add_client(MandosClientWidget(server_proxy_object
- =self.mandos_serv,
- proxy_object
- =client_proxy_object,
- properties=client,
- update_hook
- =self.refresh,
- delete_hook
- =self.remove_client,
- logger
- =self.log_message),
+ self.add_client(MandosClientWidget(
+ server_proxy_object=self.mandos_serv,
+ proxy_object=client_proxy_object,
+ properties=client,
+ update_hook=self.refresh,
+ delete_hook=self.remove_client,
+ logger=self.log_message),
path=path)
-
+
self.refresh()
self._input_callback_tag = (GLib.io_add_watch
(sys.stdin.fileno(),
@@ -646,28 +635,28 @@
# Main loop has finished, we should close everything now
GLib.source_remove(self._input_callback_tag)
self.screen.stop()
-
+
def stop(self):
self.main_loop.quit()
-
+
def process_input(self, source, condition):
keys = self.screen.get_input()
- translations = { "ctrl n": "down", # Emacs
- "ctrl p": "up", # Emacs
- "ctrl v": "page down", # Emacs
- "meta v": "page up", # Emacs
- " ": "page down", # less
- "f": "page down", # less
- "b": "page up", # less
- "j": "down", # vi
- "k": "up", # vi
- }
+ translations = {"ctrl n": "down", # Emacs
+ "ctrl p": "up", # Emacs
+ "ctrl v": "page down", # Emacs
+ "meta v": "page up", # Emacs
+ " ": "page down", # less
+ "f": "page down", # less
+ "b": "page up", # less
+ "j": "down", # vi
+ "k": "up", # vi
+ }
for key in keys:
try:
key = translations[key]
except KeyError: # :-)
pass
-
+
if key == "q" or key == "Q":
self.stop()
break
@@ -721,24 +710,24 @@
else:
self.log_level = 0
self.log_message("Verbose mode: On")
- #elif (key == "end" or key == "meta >" or key == "G"
- # or key == ">"):
- # pass # xxx end-of-buffer
- #elif (key == "home" or key == "meta <" or key == "g"
- # or key == "<"):
- # pass # xxx beginning-of-buffer
- #elif key == "ctrl e" or key == "$":
- # pass # xxx move-end-of-line
- #elif key == "ctrl a" or key == "^":
- # pass # xxx move-beginning-of-line
- #elif key == "ctrl b" or key == "meta (" or key == "h":
- # pass # xxx left
- #elif key == "ctrl f" or key == "meta )" or key == "l":
- # pass # xxx right
- #elif key == "a":
- # pass # scroll up log
- #elif key == "z":
- # pass # scroll down log
+ # elif (key == "end" or key == "meta >" or key == "G"
+ # or key == ">"):
+ # pass # xxx end-of-buffer
+ # elif (key == "home" or key == "meta <" or key == "g"
+ # or key == "<"):
+ # pass # xxx beginning-of-buffer
+ # elif key == "ctrl e" or key == "$":
+ # pass # xxx move-end-of-line
+ # elif key == "ctrl a" or key == "^":
+ # pass # xxx move-beginning-of-line
+ # elif key == "ctrl b" or key == "meta (" or key == "h":
+ # pass # xxx left
+ # elif key == "ctrl f" or key == "meta )" or key == "l":
+ # pass # xxx right
+ # elif key == "a":
+ # pass # scroll up log
+ # elif key == "z":
+ # pass # scroll down log
elif self.topwidget.selectable():
self.topwidget.keypress(self.size, key)
self.refresh()