# ==> msgbroker.py <==
"""
Asyncio message broker shared by the negi3mods modules.

The broker listens on a localhost TCP port and forwards every received line
to the module named by the first word of that line.
"""

import asyncio


class MsgBroker():
    """
    Asyncio message broker for negi3mods.

    Attributes:
        lock: serializes client handlers, so only one connection is
            processed at a time.
        mods: mapping "module name -> module object", set by mainloop().
    """

    lock = asyncio.Lock()

    @classmethod
    def mainloop(cls, loop, mods, port) -> None:
        """
        Start the TCP server on localhost and run the loop forever.

        Args:
            loop: asyncio event loop to run.
            mods: mapping of module names to objects with send_msg().
            port: TCP port to listen on.
        """
        cls.mods = mods
        loop.create_task(asyncio.start_server(
            cls.handle_client, 'localhost', port))
        loop.run_forever()

    @classmethod
    async def handle_client(cls, reader, writer) -> None:
        """
        Read messages from one client and dispatch them to the modules.

        Every line has the form "<module> <command> [args...]". An empty
        line or EOF finishes the handler.

        Args:
            reader: asyncio.StreamReader of the client connection.
            writer: asyncio.StreamWriter of the client connection, it is
                only used to close the connection when the handler ends.

        Raises:
            KeyError: when the message names an unknown module.
        """
        try:
            async with cls.lock:
                while True:
                    response = (await reader.readline()).decode(
                        'utf8'
                    ).split()
                    if not response:
                        return
                    name = response[0]
                    cls.mods[name].send_msg(response[1:])
        finally:
            # Do not leak the server side of the connection.
            writer.close()


# ==> negewmh.py <==
"""
In this module we have EWMH routines to detect dialog windows, visible
windows, etc using python-xlib and python-ewmh.
"""

from typing import List
from contextlib import contextmanager

import Xlib
import Xlib.display
from ewmh import EWMH


class NegEWMH():
    """ Custom EWMH support functions """
    disp = Xlib.display.Display()
    ewmh = EWMH()

    @staticmethod
    @contextmanager
    def window_obj(disp, win_id):
        """
        Simplify dealing with BadWindow (make it either valid or None).

        Args:
            disp: Xlib display used to create the window object.
            win_id: X11 window id, a falsy value yields None.
        """
        window_obj = None
        if win_id:
            try:
                window_obj = disp.create_resource_object('window', win_id)
            except Xlib.error.XError:
                pass
        yield window_obj

    @staticmethod
    def is_dialog_win(win) -> bool:
        """
        Check that window [win] is a dialog window.

        At first check typical window roles and classes, because it is
        faster, then use the python EWMH module to detect dialog window
        type or modal state of the window.

        Args:
            win : target window to check

        Returns:
            True if the window looks like a dialog, False otherwise.
        """
        dialog_roles = {
            "GtkFileChooserDialog", "confirmEx", "gimp-file-open"
        }
        if win.window_instance == "Places" \
                or win.window_role in dialog_roles \
                or win.window_class == "Dialog":
            return True
        with NegEWMH.window_obj(NegEWMH.disp, win.window) as win_obj:
            if win_obj is None:
                # No X11 window behind this container: nothing to ask EWMH.
                return False
            win_type = NegEWMH.ewmh.getWmWindowType(win_obj, str=True)
            if '_NET_WM_WINDOW_TYPE_DIALOG' in win_type:
                return True
            win_state = NegEWMH.ewmh.getWmState(win_obj, str=True)
            if '_NET_WM_STATE_MODAL' in win_state:
                return True
        return False

    @staticmethod
    def find_visible_windows(windows_on_ws: List) -> List:
        """
        Find windows visible on the screen now.

        Args:
            windows_on_ws: windows list which is going to be filtered with
                this function.

        Returns:
            List of windows without the _NET_WM_STATE_HIDDEN state. Windows
            without a valid X11 window object are skipped.
        """
        visible_windows = []
        for win in windows_on_ws:
            with NegEWMH.window_obj(NegEWMH.disp, win.window) as win_obj:
                if win_obj is None:
                    continue
                win_state = NegEWMH.ewmh.getWmState(win_obj, str=True)
                if '_NET_WM_STATE_HIDDEN' not in win_state:
                    visible_windows.append(win)
        return visible_windows


# ==> negi3mod.py <==
from typing import List


class negi3mod():
    """ Base class which maps IPC commands to the module methods. """

    def __init__(self):
        # command name -> callable
        self.bindings = {}

    def send_msg(self, args: List) -> None:
        """
        Creates bindings from socket IPC to current module public function
        calls.

        This function defines bindings to the module methods that can be
        used by external users as i3-bindings, sxhkd, etc. Need the [send]
        binary which can send commands to the appropriate socket.

        Args:
            args (List): command name followed by the argument list for the
                selected function. An empty list is ignored.

        Raises:
            KeyError: when the command is not in self.bindings.
        """
        if not args:
            return
        self.bindings[args[0]](*args[1:])


# ==> standalone_cfg.py <==
"""
Dynamic TOML-based config for basic negi3mods.

It is the simplified version of cfg for modules like polybar_vol, etc.
It does not require an i3 connection.
"""

import sys
import toml
import traceback
import asyncio
import inotipy
from misc import Misc


class modconfig():
    """ TOML config holder which reloads itself when the file changes. """

    def __init__(self, loop):
        # set asyncio loop
        self.loop = loop

        # detect current negi3mod
        self.mod = self.__class__.__name__

        # config dir path
        self.i3_cfg_path = Misc.i3path() + '/cfg/'

        # negi3mod config path
        self.mod_cfg_path = self.i3_cfg_path + self.mod + '.cfg'

        # load current config
        self.load_config()

        # run inotify watcher to update config on change.
        self.run_inotify_watchers()

    def reload_config(self):
        """
        Reload config.

        Call load_config and reinit all stuff. On any error the traceback
        is printed and the previous config is restored.
        """
        prev_conf = self.cfg
        try:
            self.load_config()
            # __init__ requires the loop, calling it without arguments
            # always failed with TypeError.
            self.__init__(self.loop)
            # NOTE(review): special_reload is not defined in this class,
            # presumably subclasses provide it -- confirm.
            self.special_reload()
        except Exception:
            traceback.print_exc(file=sys.stdout)
            self.cfg = prev_conf
            try:
                self.__init__(self.loop)
            except Exception:
                # __init__ reads the config file again, so it may fail
                # the same way; keep the old config and the watcher alive.
                traceback.print_exc(file=sys.stdout)
                self.cfg = prev_conf

    def conf(self, *conf_path):
        """
        Helper to extract config for current tag.

        Args:
            conf_path: path of config from where extract.

        Returns:
            Value found by walking the nested config along conf_path, None
            if some part of the path is missing, {} for an empty path.
        """
        if not conf_path:
            return {}
        node = self.cfg
        for part in conf_path:
            if not isinstance(node, dict):
                return None
            node = node.get(part)
        return node

    def load_config(self):
        """ Read the module TOML config into self.cfg. """
        with open(self.mod_cfg_path, "r") as fp:
            self.cfg = toml.load(fp)

    def dump_config(self):
        """
        Dump current config to the config file and read it back, can be
        used for debugging.
        """
        with open(self.mod_cfg_path, "r+") as fp:
            toml.dump(self.cfg, fp)
            # Drop the tail of the previous, longer content.
            fp.truncate()
            # Read the file from the start, not from the end of the dump.
            fp.seek(0)
            self.cfg = toml.load(fp)

    def cfg_watcher(self):
        """ cfg watcher to update modules config in realtime. """
        watcher = inotipy.Watcher.create()
        watcher.watch(self.i3_cfg_path, inotipy.IN.MODIFY)
        return watcher

    async def cfg_worker(self, watcher):
        """
        Reload target config

        Args:
            watcher: watcher for cfg.
        """
        while True:
            event = await watcher.get()
            if event.name == self.mod + '.cfg':
                self.reload_config()
                Misc.notify_msg(f'[Reloaded {self.mod}]')

    def run_inotify_watchers(self):
        """
        Start all watchers here via ensure_future to run it in background.

        reload_config() calls __init__ again, so the watcher is started
        only if the previous one is not running anymore.
        """
        task = getattr(self, '_cfg_watcher_task', None)
        if task is not None and not task.done():
            return
        self._cfg_watcher_task = asyncio.ensure_future(
            self.cfg_worker(self.cfg_watcher())
        )


# ==> vol.py <==
"""
Volume-manager daemon module.

This is a volume manager. Smart tool which allows you to control volume of
mpd, mpv or whatever, depending on the context. For example if mpd is
playing it sets up/down the mpd volume, if it is not then it handles mpv
volume via mpvc if mpv window is not focused or via sending 0, 9 keyboard
commands if it is.
"""

import subprocess
import socket
import asyncio
from cfg import cfg
from negi3mod import negi3mod


class vol(negi3mod, cfg):
    """ Contextual volume manager for mpd and mpv. """

    def __init__(self, i3, loop) -> None:
        """
        Init function

        Args:
            i3: i3ipc connection
            loop: asyncio loop. It needs to be given as parameter because
                you need to bypass asyncio-loop to the thread
        """
        # Initialize cfg.
        cfg.__init__(self, i3, loop=loop)

        # i3ipc connection, bypassed by negi3mods runner.
        self.i3ipc = i3

        # Bypass loop from negi3mods script here.
        self.loop = loop

        # Default increment step for mpd.
        self.inc = self.conf("mpd_inc")

        # Default mpd address.
        self.mpd_addr = self.conf("mpd_addr")

        # Default mpd port.
        self.mpd_port = self.conf("mpd_port")

        # Default mpd buffer size.
        self.mpd_buf_size = self.conf("mpd_buf_size")

        # Default mpv socket.
        self.mpv_socket = self.conf("mpv_socket")

        # Send 0, 9 keys to the mpv window or not.
        self.use_mpv09 = self.conf("use_mpv09")

        # Cache current window on focus.
        self.i3ipc.on("window::focus", self.set_curr_win)

        # Default mpd status is False
        self.mpd_playing = False

        # MPD idle command listens to the player events by default.
        self.idle_cmd_str = "idle player\n"

        # MPD status string, which we need send to extract most of
        # information.
        self.status_cmd_str = "status\n"

        self.bindings = {
            "u": self.volume_up,
            "d": self.volume_down,
            "reload": self.reload_config,
        }

        # Initial state for the current_win
        self.current_win = self.i3ipc.get_tree().find_focused()

        # Setup asyncio, because of it is used in another thread.
        asyncio.set_event_loop(self.loop)
        asyncio.ensure_future(self.update_mpd_status(self.loop))

    def set_curr_win(self, i3, event) -> None:
        """
        Cache the current window.

        Args:
            i3: i3ipc connection.
            event: i3ipc event. We can extract window from it using
                event.container.
        """
        self.current_win = event.container

    async def _mpd_is_playing(self, reader, writer) -> bool:
        """ Send the status command and tell if MPD reports "play". """
        writer.write(self.status_cmd_str.encode(encoding='utf-8'))
        stat_data = await reader.read(self.mpd_buf_size)
        return 'state: play' in stat_data.decode('UTF-8').split('\n')

    async def update_mpd_status(self, loop) -> None:
        """
        Asynchronous function to get current MPD status.

        Keeps self.mpd_playing up to date until the MPD connection is
        closed.

        Args:
            loop: asyncio.loop. Kept for interface compatibility; the
                connection is opened on the running loop, which is the
                loop this coroutine was scheduled on.
        """
        reader, writer = await asyncio.open_connection(
            host=self.mpd_addr, port=self.mpd_port
        )
        try:
            data = await reader.read(self.mpd_buf_size)
            if data.startswith(b'OK'):
                self.mpd_playing = await self._mpd_is_playing(
                    reader, writer
                )
            while True:
                writer.write(self.idle_cmd_str.encode(encoding='utf-8'))
                data = await reader.read(self.mpd_buf_size)
                if not data:
                    # EOF: read() would return immediately forever.
                    self.mpd_playing = False
                    break
                if 'player' in data.decode('UTF-8').split('\n')[0]:
                    self.mpd_playing = await self._mpd_is_playing(
                        reader, writer
                    )
                else:
                    self.mpd_playing = False
                if writer.transport.is_closing():
                    # TODO: add function to wait for MPD port here.
                    break
        finally:
            writer.close()

    def change_volume(self, val: int) -> None:
        """
        Change volume here.

        This function uses MPD state information, information about
        currently focused window from i3, etc to perform contextual volume
        changing.

        Args:
            val (int): volume step.

        Raises:
            OSError: when MPD is playing but cannot be connected.
        """
        val_str = str(val)
        mpv_key = '9'
        mpv_cmd = '--decrease'
        if val > 0:
            val_str = "+" + str(val)
            mpv_key = '0'
            mpv_cmd = '--increase'
        if self.mpd_playing:
            # create_connection picks the address family itself, so both
            # IPv4 and IPv6 MPD addresses work.
            with socket.create_connection(
                    (self.mpd_addr, int(self.mpd_port))) as mpd_socket:
                self.mpd_socket = mpd_socket
                mpd_socket.sendall(bytes(
                    f'volume {val_str}\nclose\n', 'UTF-8'
                ))
                mpd_socket.recv(self.mpd_buf_size)
        elif self.use_mpv09 and self.current_win.window_class == "mpv":
            subprocess.run([
                'xdotool', 'type', '--clearmodifiers',
                '--delay', '0', str(mpv_key) * abs(val)
            ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False
            )
        elif self.use_mpv09:
            subprocess.run([
                'mpvc', 'set', 'volume', mpv_cmd, str(abs(val))
            ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                check=False
            )
        else:
            return

    def volume_up(self, *args) -> None:
        """
        Increase target volume level.

        Args:
            args (*args): used as multiplexer for volume changing because
                of pipe-based nature of negi3mods IPC. The number of
                arguments is the step, one if there are no arguments.
        """
        self.change_volume(len(args) or 1)

    def volume_down(self, *args) -> None:
        """
        Decrease target volume level.

        Args:
            args (*args): used as multiplexer for volume changing because
                of pipe-based nature of negi3mods IPC. The number of
                arguments is the step, one if there are no arguments.
        """
        self.change_volume(-(len(args) or 1))


# ==> win_action.py <==
"""
2bwm-like features module.

There are a lot of various actions over the floating windows in this
module, which may remind you of 2bwm, subtle, or other similar window
managers.

You can change window geometry, move it to the half or quad size of the
screen space, etc.

Partially code is taken from https://github.com/miseran/i3-tools, thanks to
you, miseran(https://github.com/miseran)
"""

import collections
from typing import Mapping
from display import Display
from cfg import cfg
from negi3mod import negi3mod


class win_action(negi3mod, cfg):
    """
    Window actions class: geometry changes, resize and tab navigation.

    Parents:
        cfg: configuration manager to autosave/autoload TOML-configuration
            with inotify
    """

    def __init__(self, i3, loop=None) -> None:
        """
        Init function

        Attributes:
            i3: i3ipc connection
            loop: asyncio loop. It needs to be given as parameter because
                you need to bypass asyncio-loop to the thread
        """
        # Initialize cfg.
        cfg.__init__(self, i3)

        # i3ipc connection, bypassed by negi3mods runner.
        self.i3ipc = i3

        # cache list length
        maxlength = self.conf("cache_list_size")

        # create list with the finite number of elements by the [None] * N
        # hack
        self.geom_list = collections.deque(
            [None] * maxlength,
            maxlen=maxlength
        )

        # we need to know current resolution for almost all operations
        # here.
        self.current_resolution = Display.get_screen_resolution()

        # here we load information about useless gaps
        self.load_useless_gaps()

        # config about useless gaps for quad splitting, True by default
        self.quad_use_gaps = self.conf("quad_use_gaps")

        # config about useless gaps for half splitting, True by default
        self.x2_use_gaps = self.conf("x2_use_gaps")

        # coeff to grow window in all dimensions
        self.grow_coeff = self.conf("grow_coeff")

        # coeff to shrink window in all dimensions
        self.shrink_coeff = self.conf("shrink_coeff")

        self.bindings = {
            "reload": self.reload_config,
            "maximize": self.maximize,
            "maxhor": lambda: self.maximize(by='X'),
            "maxvert": lambda: self.maximize(by='Y'),
            "x2": self.x2,
            "x4": self.quad,
            "quad": self.quad,
            "grow": self.grow,
            "shrink": self.shrink,
            "center": self.move_center,
            "revert_maximize": self.revert_maximize,
            "resize": self.resize,
            "tab-focus": self.focus_tab,
            "tab-move": self.move_tab,
        }

    def load_useless_gaps(self) -> None:
        """
        Load useless gaps settings.

        Negative values are replaced with their absolute values. If the
        setting is malformed all gaps are set to zero.
        """
        try:
            self.useless_gaps = self.cfg.get("useless_gaps", {
                "w": 12, "a": 12, "s": 12, "d": 12
            })
            for field in ["w", "a", "s", "d"]:
                if self.useless_gaps[field] < 0:
                    self.useless_gaps[field] = abs(self.useless_gaps[field])
        except (KeyError, TypeError, AttributeError):
            self.useless_gaps = {"w": 0, "a": 0, "s": 0, "d": 0}

    def center_geom(self, win, change_geom: bool = False,
                    degrade_coeff: float = 0.82):
        """
        Move window to the center with geometry optional changing.

        Args:
            win: target window.
            change_geom (bool): predicate to change geom to the
                [degrade_coeff] of the screen space in both dimensions.
            degrade_coeff (float): coefficient which denotes change geom
                of the target window, values above 1.0 are clamped to 1.0.

        Returns:
            dict with the x, y, width and height keys.
        """
        geom = {}
        center = {}

        if degrade_coeff > 1.0:
            degrade_coeff = 1.0

        center['x'] = int(self.current_resolution['width'] / 2)
        center['y'] = int(self.current_resolution['height'] / 2)

        if not change_geom:
            geom['width'] = int(win.rect.width)
            geom['height'] = int(win.rect.height)
        else:
            geom['width'] = int(
                self.current_resolution['width'] * degrade_coeff
            )
            geom['height'] = int(
                self.current_resolution['height'] * degrade_coeff
            )

        geom['x'] = center['x'] - int(geom['width'] / 2)
        geom['y'] = center['y'] - int(geom['height'] / 2)

        return geom

    def move_center(self, resize: str) -> None:
        """
        Move window to center.

        Args:
            resize (str): predicate which shows resize target window or
                not. "default" / "none" keep the size, "resize" / "on" /
                "yes" change it, other values do nothing.
        """
        focused = self.i3ipc.get_tree().find_focused()
        if resize in {"default", "none"}:
            geom = self.center_geom(focused)
            win_action.set_geom(focused, geom)
        elif resize in {"resize", "on", "yes"}:
            geom = self.center_geom(focused, change_geom=True)
            win_action.set_geom(focused, geom)
        else:
            return

    def _last_saved(self):
        """ Return the newest geometry cache entry or None. """
        return self.geom_list[-1] if self.geom_list else None

    def _remember_geom(self) -> None:
        """
        Cache geometry of self.current_win unless the newest cache entry
        already belongs to this window.
        """
        last = self._last_saved()
        if last and last.get('id', {}) == self.current_win.id:
            return
        self.get_prev_geom()

    def _place_current_win(self, geom: dict) -> None:
        """ Remember geometry of self.current_win and apply [geom]. """
        if self.current_win is None:
            return
        self._remember_geom()
        win_action.set_geom(self.current_win, geom)

    def get_prev_geom(self):
        """
        Get previous window geometry.

        Stores the geometry of self.current_win in the cache and returns
        it.
        """
        geom = self.save_geom()
        self.geom_list.append(
            {
                "id": self.current_win.id,
                "geom": geom
            }
        )
        return geom

    @staticmethod
    def multiple_geom(win, coeff: float) -> Mapping[str, int]:
        """
        Generic function to shrink/grow floating window geometry.

        Args:
            win: target window.
            coeff (float): generic coefficient which denotes grow/shrink
                geom of the target window.

        Returns:
            Geometry with the same position and the scaled size.
        """
        return {
            'x': int(win.rect.x),
            'y': int(win.rect.y),
            'width': int(win.rect.width * coeff),
            'height': int(win.rect.height * coeff),
        }

    def grow(self) -> None:
        """ Grow floating window geometry by [self.grow_coeff]. """
        focused = self.i3ipc.get_tree().find_focused()
        geom = win_action.multiple_geom(focused, self.grow_coeff)
        win_action.set_geom(focused, geom)

    def shrink(self) -> None:
        """ Shrink floating window geometry by [self.shrink_coeff]. """
        focused = self.i3ipc.get_tree().find_focused()
        geom = win_action.multiple_geom(focused, self.shrink_coeff)
        win_action.set_geom(focused, geom)

    def x2(self, mode: str) -> None:
        """
        Move window to the 1st or 2nd half of the screen space with the
        given orientation.

        The previous geometry is cached, so revert_maximize can restore it.

        Args:
            mode (h1,h2,v1,v2): defines h1,h2,v1 or v2 half of screen
                space to move. hup, hdown, vleft and vright are aliases.
                Other values do nothing.
        """
        curr_scr = self.current_resolution
        self.current_win = self.i3ipc.get_tree().find_focused()

        if self.x2_use_gaps:
            gaps = self.useless_gaps
        else:
            gaps = {"w": 0, "a": 0, "s": 0, "d": 0}

        half_width = int(curr_scr['width'] / 2)
        half_height = int(curr_scr['height'] / 2)
        double_dgaps = int(gaps['d'] * 2)
        double_sgaps = int(gaps['s'] * 2)

        if mode in {'h1', 'hup'}:
            geom = {
                'x': gaps['a'],
                'y': gaps['w'],
                'width': curr_scr['width'] - double_dgaps,
                'height': half_height - double_sgaps,
            }
        elif mode in {'h2', 'hdown'}:
            geom = {
                'x': gaps['a'],
                'y': half_height + gaps['w'],
                'width': curr_scr['width'] - double_dgaps,
                'height': half_height - double_sgaps,
            }
        elif mode in {'v1', 'vleft'}:
            geom = {
                'x': gaps['a'],
                'y': gaps['w'],
                'width': half_width - double_dgaps,
                'height': curr_scr['height'] - double_sgaps,
            }
        elif mode in {'v2', 'vright'}:
            geom = {
                'x': gaps['a'] + half_width,
                'y': gaps['w'],
                'width': half_width - double_dgaps,
                'height': curr_scr['height'] - double_sgaps,
            }
        else:
            return

        # The cached geometry must not replace the computed one, otherwise
        # the window stays where it was.
        self._place_current_win(geom)

    def quad(self, mode: int) -> None:
        """
        Move window to the 1,2,3,4 quad of 2D screen space

        The previous geometry is cached, so revert_maximize can restore it.

        Args:
            mode (1,2,3,4): defines 1,2,3 or 4 quad of screen space to
                move. Other values do nothing.
        """
        try:
            mode = int(mode)
        except (TypeError, ValueError):
            print(f"cannot convert mode={mode} to int")
            return

        curr_scr = self.current_resolution
        self.current_win = self.i3ipc.get_tree().find_focused()

        if self.quad_use_gaps:
            gaps = self.useless_gaps
        else:
            gaps = {"w": 0, "a": 0, "s": 0, "d": 0}

        half_width = int(curr_scr['width'] / 2)
        half_height = int(curr_scr['height'] / 2)
        double_dgaps = int(gaps['d'] * 2)
        double_sgaps = int(gaps['s'] * 2)

        # quad number -> offset of its top left corner
        offsets = {
            1: (0, 0),
            2: (half_width, 0),
            3: (0, half_height),
            4: (half_width, half_height),
        }
        if mode not in offsets:
            return
        offset_x, offset_y = offsets[mode]
        geom = {
            'x': gaps['a'] + offset_x,
            'y': gaps['w'] + offset_y,
            'width': half_width - double_dgaps,
            'height': half_height - double_sgaps,
        }

        # The cached geometry must not replace the computed one, otherwise
        # the window stays where it was.
        self._place_current_win(geom)

    def maximize(self, by: str = 'XY') -> None:
        """
        Maximize window by attribute.

        Does nothing if the newest cached geometry already belongs to the
        focused window, if there is no focused window or if [by] is
        unknown.

        Args:
            by (str): maximize by X, Y or XY.
        """
        axes = {
            'XY': (True, True),
            'YX': (True, True),
            'X': (True, False),
            'Y': (False, True),
        }.get(by)
        if axes is None:
            return

        self.current_win = self.i3ipc.get_tree().find_focused()
        if self.current_win is None:
            return

        last = self._last_saved()
        if last and last.get('id', {}) == self.current_win.id:
            # do nothing
            return

        geom = self.get_prev_geom()
        by_x, by_y = axes
        max_geom = self.maximized_geom(
            geom.copy(), gaps={}, byX=by_x, byY=by_y
        )
        win_action.set_geom(self.current_win, max_geom)

    def revert_maximize(self) -> None:
        """
        Revert changed window state.

        Applies the newest cached geometry to the focused window and drops
        it from the cache. Does nothing if the cache has no entry.
        """
        try:
            focused = self.i3ipc.get_tree().find_focused()
            last = self._last_saved()
            if not last:
                return
            if last.get("geom", {}):
                win_action.set_geom(focused, last["geom"])
            self.geom_list.pop()
        except (KeyError, TypeError, AttributeError, IndexError):
            pass

    def maximized_geom(self, geom: dict, gaps: dict,
                       byX: bool = False, byY: bool = False) -> dict:
        """
        Return maximized geom.

        Args:
            geom (dict): var to return maximized geometry, it is changed
                in place.
            gaps (dict): dict to define useless gaps, self.useless_gaps is
                used if it is empty.
            byX (bool): maximize by X.
            byY (bool): maximize by Y.
        """
        if not gaps:
            gaps = self.useless_gaps
        if byX:
            geom['x'] = 0 + gaps['a']
            geom['width'] = self.current_resolution['width'] - gaps['d'] * 2
        if byY:
            geom['y'] = 0 + gaps['w']
            geom['height'] = \
                self.current_resolution['height'] - gaps['s'] * 2
        return geom

    @staticmethod
    def set_geom(win, geom: dict) -> None:
        """
        Generic function to set geometry.

        Args:
            win: target window to change geometry.
            geom (dict): geometry with the x, y, width and height keys.
        """
        win.command(f"move absolute position {geom['x']} {geom['y']}")
        win.command(f"resize set {geom['width']} {geom['height']} px")

    @staticmethod
    def set_resize_params_single(direction, amount):
        """
        Set resize parameters for the single window

        Args:
            direction: resize direction, "natural" and "orthogonal" are
                mapped to "horizontal" and "vertical".
            amount: resize amount, int or str convertible to int.

        Returns:
            Tuple (direction, mode, amount), where mode is "plus" for the
            negative amount and "minus" otherwise and amount is
            non-negative.
        """
        # The amount comes from IPC as str, so convert it before negation.
        amount = int(amount)
        if direction == "natural":
            direction = "horizontal"
        elif direction == "orthogonal":
            direction = "vertical"

        if amount < 0:
            mode = "plus"
            amount = -amount
        else:
            mode = "minus"

        return direction, mode, amount

    @staticmethod
    def set_resize_params_multiple(direction, amount, vertical):
        """
        Set resize parameters for the block of windows

        Args:
            direction: resize direction.
            amount: resize amount, int or str convertible to int.
            vertical: True if the container is in a vertical split.

        Returns:
            Tuple (direction, mode, amount), where mode is "shrink" for
            the negative amount and "grow" otherwise and amount is
            non-negative.
        """
        mode = ""
        if direction == "horizontal":
            direction = "width"
        elif direction == "vertical":
            direction = "height"
        elif direction == "natural":
            direction = "height" if vertical else "width"
        elif direction == "orthogonal":
            direction = "width" if vertical else "height"
        elif direction == "top":
            direction = "up"
        elif direction == "bottom":
            direction = "down"

        if int(amount) < 0:
            mode = "shrink"
            amount = -int(amount)
        else:
            mode = "grow"

        return direction, mode, int(amount)

    def resize(self, direction, amount):
        """
        Resize the current container along to the given direction. If
        there is only a single container, resize by adjusting gaps. If the
        direction is "natural", resize vertically in a splitv container,
        else horizontally. If it is "orthogonal", do the opposite.

        Args:
            direction: "natural", "orthogonal", "horizontal", "vertical",
                "top", "bottom", "left" or "right". Other values are
                passed to i3 as is.
            amount: resize amount, int or str convertible to int.
        """
        # Validate the amount for every direction, not only for the
        # unknown ones.
        try:
            amount = int(amount)
        except (TypeError, ValueError):
            print("Bad resize amount given.")
            return

        node = self.i3ipc.get_tree().find_focused()
        single, vertical = True, False

        # Check if there is only a single leaf.
        # If not, check if the current container is in a vertical split.
        while True:
            parent = node.parent
            if node.type == "workspace" or not parent:
                break
            elif parent.type == "floating_con":
                single = False
                break
            elif len(parent.nodes) > 1 and parent.layout == "splith":
                single = False
                break
            elif len(parent.nodes) > 1 and parent.layout == "splitv":
                single = False
                vertical = True
                break
            node = parent

        if single:
            direction, mode, amount = self.set_resize_params_single(
                direction, amount
            )
            self.i3ipc.command(f"gaps {direction} current {mode} {amount}")
        else:
            direction, mode, amount = self.set_resize_params_multiple(
                direction, amount, vertical
            )
            self.i3ipc.command(
                f"resize {mode} {direction} {amount} px or {amount//16} ppt"
            )

    @staticmethod
    def create_geom_from_rect(rect) -> dict:
        """
        Create geometry from the given rectangle.

        Args:
            rect: rect to extract geometry from.
        """
        geom = {}
        geom['x'] = rect.x
        geom['y'] = rect.y
        geom['height'] = rect.height
        geom['width'] = rect.width

        return geom

    def save_geom(self, target_win=None) -> dict:
        """
        Save geometry.

        Args:
            target_win: [optional] denotes target window,
                self.current_win is used by default.
        """
        if target_win is None:
            target_win = self.current_win
        return win_action.create_geom_from_rect(target_win.rect)

    @staticmethod
    def focused_order(node):
        """
        Iterate through the children of "node" in most recently focused
        order. Ids from node.focus which are not in node.nodes are
        skipped.
        """
        children = {child.id: child for child in node.nodes}
        for focus_id in node.focus:
            child = children.get(focus_id)
            if child is not None:
                yield child

    @staticmethod
    def focused_child(node):
        """Return the most recently focused child of "node"."""
        return next(win_action.focused_order(node))

    @staticmethod
    def is_in_line(old, new, direction):
        """
        Return true if container "new" can reasonably be considered to be
        in direction "direction" of container "old". Returns None for an
        unknown direction.
        """
        if direction in {"up", "down"}:
            return new.rect.x <= old.rect.x + old.rect.width*0.9 \
                and new.rect.x + new.rect.width >= \
                old.rect.x + old.rect.width * 0.1

        if direction in {"left", "right"}:
            return new.rect.y <= old.rect.y + old.rect.height*0.9 \
                and new.rect.y + new.rect.height >= \
                old.rect.y + old.rect.height * 0.1

        return None

    def output_in_direction(self, output, window, direction):
        """
        Return the output in direction "direction" of window "window" on
        output "output". Returns None if there is no such output.
        """
        # NOTE(review): iteration starts from the focused container, not
        # from the tree root -- confirm that this is intended.
        tree = self.i3ipc.get_tree().find_focused()
        for new in self.focused_order(tree):
            if new.name == "__i3":
                continue
            if not self.is_in_line(window, new, direction):
                continue

            orct = output.rect
            nrct = new.rect
            if (direction == "left" and nrct.x + nrct.width == orct.x) \
                or (direction == "right" and nrct.x == orct.x + orct.width) \
                or (direction == "up" and nrct.y + nrct.height == orct.y) \
                or (direction == "down" and nrct.y == orct.y + orct.height):
                return new

        return None

    def focus_tab(self, direction):
        """
        Cycle through the innermost stacked or tabbed ancestor container,
        or through floating containers.

        Args:
            direction: "next" or "prev", other values do nothing.
        """
        if direction == "next":
            delta = 1
        elif direction == "prev":
            delta = -1
        else:
            return

        tree = self.i3ipc.get_tree()
        node = tree.find_focused()

        # Find innermost tabbed or stacked container, or detect floating.
        while True:
            parent = node.parent
            if not parent or node.type != "con":
                return
            if parent.layout in {"tabbed", "stacked"} \
                    or parent.type == "floating_con":
                break
            node = parent

        if parent.type == "floating_con":
            node = parent
            parent = node.parent
            # The order of floating_nodes is not useful, sort it somehow.
            parent_nodes = sorted(parent.floating_nodes, key=lambda n: n.id)
        else:
            parent_nodes = parent.nodes

        index = parent_nodes.index(node)
        node = parent_nodes[(index + delta) % len(parent_nodes)]

        # Find most recently focused leaf in new tab.
        while node.nodes:
            node = self.focused_child(node)

        self.i3ipc.command(f'[con_id="{node.id}"] focus')

    def move_tab(self, direction):
        """
        Move the innermost stacked or tabbed ancestor container.

        Args:
            direction: "next" or "prev", other values do nothing.
        """
        if direction == "next":
            delta = 1
        elif direction == "prev":
            delta = -1
        else:
            return

        node = self.i3ipc.get_tree().find_focused()

        # Find innermost tabbed or stacked container.
        while True:
            parent = node.parent
            if not parent or node.type != "con":
                return
            if parent.layout in ["tabbed", "stacked"]:
                break
            node = parent

        index = parent.nodes.index(node)

        # There is no wrap around: the first tab is not moved backward and
        # the last one is not moved forward.
        if 0 <= index + delta < len(parent.nodes):
            other = parent.nodes[index + delta]
            self.i3ipc.command(
                f'[con_id="{node.id}"] swap container with con_id {other.id}'
            )


# ==> win_history.py <==
"""
Advanced alt-tab module.

This module allows you to focus previous window a-la "alt-tab" not by
workspace but by window itself. To achieve that I am using
self.window_history to store information about previous windows. We need
this because previously selected window may be closed, and then you cannot
focus it.
"""

from typing import Iterator
from itertools import cycle
from cfg import cfg
from negewmh import NegEWMH
from negi3mod import negi3mod


class win_history(negi3mod, cfg):
    """ Advanced alt-tab class. """

    def __init__(self, i3, loop=None) -> None:
        """
        Init function

        Args:
            i3: i3ipc connection
            loop: asyncio loop. It needs to be given as parameter because
                you need to bypass asyncio-loop to the thread
        """
        # Initialize cfg.
        cfg.__init__(self, i3)

        # i3ipc connection, bypassed by negi3mods runner
        self.i3ipc = i3

        # previous / current window list
        self.window_history = []

        # depth of history list
        self.max_win_history = 4

        # workspaces with auto alt-tab when close
        self.autoback = self.conf('autoback')

        self.bindings = {
            "switch": self.alt_tab,
            "reload": self.reload_config,
            "focus_next": self.focus_next,
            "focus_prev": self.focus_prev,
            "focus_next_visible": self.focus_next_visible,
            "focus_prev_visible": self.focus_prev_visible,
        }

        self.i3ipc.on('window::focus', self.on_window_focus)
        self.i3ipc.on('window::close', self.goto_nonempty_ws_on_close)

    def reload_config(self) -> None:
        """ Reloads config. Dummy. """
        # NOTE(review): __init__ subscribes to the i3 events again and
        # clears the window history -- confirm that this is intended.
        self.__init__(self.i3ipc)

    def alt_tab(self) -> None:
        """
        Focus previous window.

        Windows which do not exist anymore are dropped from the history.
        """
        wids = {w.id for w in self.i3ipc.get_tree().leaves()}
        # The slice is a copy, so removing from the history is safe here.
        for wid in self.window_history[1:]:
            if wid not in wids:
                self.window_history.remove(wid)
            else:
                self.i3ipc.command(f'[con_id={wid}] focus')
                return

    def on_window_focus(self, _, event) -> None:
        """
        Store information about current / previous windows.

        Args:
            _: i3ipc connection, not used.
            event: i3ipc event. We can extract window from it using
                event.container.
        """
        wid = event.container.id

        if wid in self.window_history:
            self.window_history.remove(wid)

        self.window_history.insert(0, wid)
        if len(self.window_history) > self.max_win_history:
            del self.window_history[self.max_win_history:]

    def get_windows_on_ws(self) -> Iterator:
        """ Get windows on the current workspace. """
        return filter(
            lambda x: x.window,
            self.i3ipc.get_tree().find_focused().workspace().leaves()
        )

    def goto_visible(self, reversed_order=False):
        """
        Focus next visible window.

        Args:
            reversed_order(bool) : [optional] predicate to change order.
        """
        wins = NegEWMH.find_visible_windows(self.get_windows_on_ws())
        self.goto_win(wins, reversed_order)

    def goto_win(self, wins, reversed_order=False):
        """
        Focus the window which follows the focused one in [wins].

        The list is cyclic: the first window follows the last one. Does
        nothing if there is no focused window in [wins].

        Args:
            wins: list of windows to choose from.
            reversed_order(bool) : [optional] predicate to change order.
        """
        ordered = list(reversed(wins)) if reversed_order else list(wins)
        # Look for the focused window once: walking over cycle() never
        # ends if none of the windows is focused.
        for index, window in enumerate(ordered):
            if window.focused:
                focus_to = ordered[(index + 1) % len(ordered)]
                self.i3ipc.command('[id="%d"] focus' % focus_to.window)
                break

    def goto_any(self, reversed_order: bool = False) -> None:
        """
        Focus any next window.

        Args:
            reversed_order(bool) : [optional] predicate to change order.
        """
        wins = self.i3ipc.get_tree().leaves()
        self.goto_win(wins, reversed_order)

    def focus_next(self) -> None:
        """ Focus the next window. """
        self.goto_any(reversed_order=False)

    def focus_prev(self) -> None:
        """ Focus the previous window. """
        self.goto_any(reversed_order=True)

    def focus_next_visible(self) -> None:
        """ Focus the next visible window of the current workspace. """
        self.goto_visible(reversed_order=False)

    def focus_prev_visible(self) -> None:
        """ Focus the previous visible window of the current workspace. """
        self.goto_visible(reversed_order=True)

    def goto_nonempty_ws_on_close(self, i3, _) -> None:
        """
        Go back for temporary tags like pictures or media.

        This function makes auto alt-tab for workspaces which should be
        temporary. This is good if you do not want to see empty workspace
        after switching to the media content workspace.

        Args:
            i3: i3ipc connection.
            _: i3ipc event, not used.
        """
        workspace = i3.get_tree().find_focused().workspace()
        focused_ws_name = workspace.name
        if not workspace.leaves():
            for ws_substr in self.autoback:
                if focused_ws_name.endswith(ws_substr):
                    self.alt_tab()
                    return