#!/usr/bin/env python3
"""Selenium-driven UI tester for a DD instance.

The ``test`` sub-command repeatedly logs into Nextcloud through Keycloak and
opens a document in OnlyOffice, taking a timed screenshot at every stage.
The ``summary`` sub-command rebuilds the HTML report from a saved
``session.yml``.
"""
import argparse
import io
import pathlib
import random
import traceback
from datetime import datetime
from html import escape
from itertools import groupby
from operator import itemgetter
from typing import Any, Dict, Iterable, List

import yaml
from selenium import webdriver  # We need selenium 4
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait


class DDSession:
    """One browser session that records timed screenshots of each UI step.

    Every call to :meth:`screenshot` appends a sub-step entry to
    ``self.data["steps"]``; :meth:`persist` writes that data as YAML together
    with an HTML report into ``out_dir``.
    """

    driver: webdriver.Chrome

    def __init__(
        self,
        instance: str,
        users_file: str,
        out_dir: pathlib.Path,
        stepTimeoutSeconds: int = 20,
    ):
        """Parse the credentials, prepare the output directory, start Chrome.

        Args:
            instance: Domain suffix; Nextcloud is reached at
                ``https://nextcloud.<instance>``.
            users_file: *Contents* of the users file, one ``username,password``
                pair per line (split on the first comma only).
            out_dir: Directory receiving screenshots and reports.
            stepTimeoutSeconds: Timeout applied to each explicit wait.
        """
        self.instance = instance
        # Skip whitespace-only lines too: a stray "\r" (CRLF files) or blank
        # line would otherwise become a row that cannot be unpacked at login.
        self.users_file = [
            [field.strip() for field in line.split(sep=",", maxsplit=1)]
            for line in users_file.split("\n")
            if line.strip()
        ]
        self.out_dir = out_dir
        self.stepTimeoutSeconds = stepTimeoutSeconds
        # parents=True so nested output paths work; exist_ok avoids the
        # check-then-create race of a separate is_dir() test.
        self.out_dir.mkdir(parents=True, exist_ok=True)
        self.start_time = datetime.utcnow()
        self.last_time = self.start_time
        self.screenshot_counter = 0
        self.data: Dict[str, Any] = dict(
            instance=self.instance, start_time=self.start_time, steps=[]
        )
        self.reset()

    def persist(self) -> None:
        """Write ``session.yml`` and ``session.html`` into ``out_dir``."""
        with (self.out_dir / "session.yml").open("w") as yml_file:
            yaml.dump(self.data, yml_file)
        with (self.out_dir / "session.html").open("w") as html_file:
            html_file.write(self.summarise()["html"])

    def summarise(self) -> Dict[str, Any]:
        """Return :meth:`summarise_data` applied to this session's data."""
        return DDSession.summarise_data(self.data)

    @staticmethod
    def summarise_data(data) -> Dict[str, Any]:
        """Aggregate timings per step and per sub-step and render a report.

        Args:
            data: Mapping with ``instance``, ``start_time`` and ``steps``;
                each step has ``step``, ``t`` and a ``substeps`` list whose
                items carry ``substep``, ``t`` and ``step_t``.

        Returns:
            A dict with ``overview`` (stats per step name), ``stepbystep``
            (stats per "step: substep" name) and ``html`` (the report).
            Each stats entry holds ``max``, ``count`` and ``average``.
        """
        by_step = itemgetter("step")

        def summarise_item(group: List[Dict[str, Any]]) -> Dict[str, Any]:
            # Steps are measured as the sum of their sub-steps; sub-steps
            # carry their own duration directly.
            if "substeps" in group[0]:
                items = [
                    sum(st["step_t"] for st in entry["substeps"])
                    for entry in group
                ]
            else:
                items = [entry["step_t"] for entry in group]
            return {
                "max": max(items),
                "count": len(items),
                "average": sum(items) / len(items),
            }

        def summarise(entries: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
            # groupby only merges adjacent items, hence the sort by same key.
            return {
                name: summarise_item(list(group))
                for name, group in groupby(
                    sorted(entries, key=by_step), key=by_step
                )
            }

        summary: Dict[str, Any] = dict()
        summary["overview"] = summarise(data["steps"])
        substeps = [
            dict(st, step=f"{step['step']}: {st['substep']}")
            for step in data["steps"]
            for st in step["substeps"]
        ]
        summary["stepbystep"] = summarise(substeps)

        # NOTE(review): the markup below is a reconstruction - the template
        # this was rebuilt from had lost its tags. The visible text is
        # unchanged; confirm the layout (and the screenshot <img>, inferred
        # from the otherwise unused "png" field) against the intended report.
        # Values are escaped because sub-step labels may hold arbitrary
        # exception messages.
        parts: List[str] = [
            "<html>\n<body>\n",
            f"<h1>{escape(str(data['instance']))}</h1>\n",
            f"<h2>{escape(str(data['start_time']))}</h2>\n",
            "<h3>Summary</h3>\n",
            f"<pre>{escape(yaml.dump(summary))}</pre>\n",
        ]
        for step in data["steps"]:
            step_name = escape(str(step["step"]))
            step_total = sum(st["step_t"] for st in step["substeps"])
            parts.append(
                f"<h3>{step_name}: {step['t']} s (+ {step_total} s)</h3>\n"
                "<ul>\n"
            )
            for st in step["substeps"]:
                label = escape(str(st["substep"]))
                parts.append(
                    f"<li>{step_name} - {label}: "
                    f"{st['t']} s (+ {st['step_t']} s)"
                )
                png = st.get("png")
                if png:
                    parts.append(
                        f'<br><img src="{escape(str(png))}" alt="">'
                    )
                parts.append("</li>\n")
            parts.append("</ul>\n")
        parts.append("</body>\n</html>\n")

        summary["html"] = "".join(parts)
        return summary

    @property
    def executed_seconds(self) -> float:
        """Seconds elapsed since the session was created."""
        return (datetime.utcnow() - self.start_time).total_seconds()

    @property
    def nextcloud_url(self) -> str:
        """Base URL of the Nextcloud service of this instance."""
        return f"https://nextcloud.{self.instance}"

    def reset(self, new_driver: bool = False) -> None:
        """Clear cookies, or (re)start the browser, then take a screenshot.

        Args:
            new_driver: When true, quit any running browser and start a new
                one. A browser is always started if none exists yet.
        """
        # If needed:
        # https://stackoverflow.com/a/72922584
        current = getattr(self, "driver", None)
        if current is None or new_driver:
            if current is not None:
                current.quit()
            options = ChromeOptions()
            for arg in (
                "--headless",
                "--no-sandbox",
                "--disable-infobars",
                "--disable-extensions",
                "--disable-dev-shm-usage",
                "window-size=1400,600",
            ):
                options.add_argument(arg)
            self.driver = webdriver.Chrome(options=options)
            msg = "Restart"
        else:
            self.driver.delete_all_cookies()
            msg = "Cleaned cache"
        self.screenshot("Browser", msg)

    def close(self) -> None:
        """Quit the browser, if one is running. Safe to call repeatedly."""
        current = getattr(self, "driver", None)
        if current is not None:
            current.quit()
            # reset() treats a missing/None driver as "start a new one".
            self.driver = None  # type: ignore[assignment]

    def perform_login(self) -> None:
        """Fill in and submit the Keycloak login form with a random user."""
        # Wait until Keycloak is shown
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            lambda d: d.find_elements(By.ID, "kc-form-login")
        )
        self.screenshot("Login", "Request")
        # Fill in form data for login
        (username, password) = random.choice(self.users_file)
        self.driver.find_element(By.ID, "username").send_keys(username)
        self.driver.find_element(By.ID, "password").send_keys(password)
        login_button = self.driver.find_element(By.ID, "kc-login")
        self.screenshot("Login", "Form filled")
        login_button.click()
        self.screenshot("Login", "Form sent")
        # TODO: wait for redirection middle step and session.screenshot it

    def load_nextcloud_files(self) -> None:
        """Open Nextcloud, logging in if asked, and wait for the files app."""
        # Start loading
        self.driver.get(self.nextcloud_url)
        # Wait until Nextcloud with Megamenu is shown or a login is requested
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            lambda d: (
                d.find_elements(By.ID, "kc-form-login")
                + d.find_elements(By.ID, "menu-apps-icon")
            )
        )
        # Detect whether or not a login is needed
        if self.driver.find_elements(By.ID, "kc-form-login"):
            self.perform_login()
        self.screenshot("Nextcloud", "Loading")
        # Wait until Nextcloud files app has loaded
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            lambda d: d.find_elements(
                By.CSS_SELECTOR, "#app-content-files td.selection"
            )
        )
        self.screenshot("Nextcloud", "Loaded")

    @staticmethod
    def _onlyoffice_loaded(d) -> Any:
        """Return the save button once it is clickable, else ``False``.

        Uses ``find_elements`` so a missing button yields ``False`` instead
        of raising; callers can then combine this check with ``or``.
        """
        buttons = d.find_elements(By.ID, "id-toolbar-btn-save")
        if not buttons:
            return False
        return EC.element_to_be_clickable(buttons[0])(d)

    def open_file_in_onlyoffice(self, file_base_name: str) -> None:
        """Open a file in OnlyOffice, wait until it is loaded, then close it.

        Assumes the browser is on the Nextcloud files app and that a link
        whose text contains ``file_base_name`` is present.
        """
        # Remember where we are: window handle order is not guaranteed, so
        # handles are tracked by identity rather than by list position.
        main_window = self.driver.current_window_handle
        known_handles = list(self.driver.window_handles)

        # Get and click docx file
        self.driver.find_element(
            By.PARTIAL_LINK_TEXT, file_base_name
        ).click()

        # Switch to OnlyOffice window
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            EC.number_of_windows_to_be(len(known_handles) + 1)
        )
        child = [
            handle
            for handle in self.driver.window_handles
            if handle not in known_handles
        ][0]
        self.driver.switch_to.window(child)
        self.screenshot("OnlyOffice", "Opening")

        # Wait for OnlyOffice to start loading
        oof_css = "div.app-onlyoffice #app iframe"
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            lambda d: d.find_elements(By.CSS_SELECTOR, oof_css)
        )
        self.screenshot("OnlyOffice", "Loading 1")

        # Switch to its iframe
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            EC.frame_to_be_available_and_switch_to_it(
                (By.CSS_SELECTOR, oof_css)
            )
        )
        self.screenshot("OnlyOffice", "Loading 2")

        # Get the first loading screen (or skip ahead if already loaded)
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            lambda d: self._onlyoffice_loaded(d)
            or d.find_elements(By.CSS_SELECTOR, "#loading-mask div.loader-page")
        )
        self.screenshot("OnlyOffice", "Loading 3")

        # Wait for OnlyOffice second loading phase
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            lambda d: self._onlyoffice_loaded(d)
            or d.find_elements(By.CSS_SELECTOR, "div.asc-loadmask-title")
        )
        self.screenshot("OnlyOffice", "Loading 4")

        # Wait for final loading (save button clickable); this phase gets
        # three times the usual timeout.
        WebDriverWait(
            self.driver, timeout=self.stepTimeoutSeconds * 3
        ).until(self._onlyoffice_loaded)
        self.screenshot("OnlyOffice", "Loaded")

        # Close OnlyOffice window
        self.driver.close()
        # And change back to the main window
        self.driver.switch_to.window(main_window)
        self.screenshot("OnlyOffice", "Closed")

    def screenshot(self, step: str, substep_txt: str) -> None:
        """Save a numbered PNG and record its timing under ``step``.

        Consecutive screenshots with the same ``step`` are grouped as
        sub-steps of one step entry; a different ``step`` starts a new entry.

        Args:
            step: Name of the step the screenshot belongs to.
            substep_txt: Label of this sub-step.
        """
        # Get data
        scr_now = datetime.utcnow()
        scr_s = (scr_now - self.start_time).total_seconds()
        step_t = (scr_now - self.last_time).total_seconds()
        scr_id = self.screenshot_counter
        # Upgrade values
        self.last_time = scr_now
        self.screenshot_counter += 1
        # Constants
        scr_fn = f"{scr_id:04d}.png"
        # Get screenie and write it
        (self.out_dir / scr_fn).write_bytes(
            self.driver.get_screenshot_as_png()
        )
        # Add to report
        substep: Dict[str, Any] = dict(
            id=scr_id, substep=substep_txt, step_t=step_t, t=scr_s, png=scr_fn
        )
        steps = self.data["steps"]
        if steps and steps[-1]["step"] == step:
            steps[-1]["substeps"].append(substep)
        else:
            steps.append(dict(step=step, t=scr_s, substeps=[substep]))


def randomBool() -> bool:
    """Return ``True`` or ``False`` with equal probability."""
    return random.choice([True, False])


def main_test(
    instance: str,
    users_file: io.TextIOWrapper,
    out_dir: pathlib.Path,
    duration: int,
    stepTimeoutSeconds: int = 20,
) -> DDSession:
    """Run the load/open cycle for ``duration`` seconds and persist results.

    Args:
        instance: Domain suffix of the instance under test.
        users_file: Open text file with ``username,password`` lines.
        out_dir: Directory receiving screenshots and reports.
        duration: Minimum running time in seconds; a cycle in progress is
            finished before stopping.
        stepTimeoutSeconds: Timeout for each explicit wait.

    Returns:
        The session, with its browser still running; call
        :meth:`DDSession.close` when done with it.
    """
    session = DDSession(
        instance=instance,
        users_file=users_file.read(),
        out_dir=out_dir,
        stepTimeoutSeconds=stepTimeoutSeconds,
    )
    first_run = True
    try:
        while session.executed_seconds < duration:
            # 50% chance of requiring login reseting cookies
            # First run always requires login, skip reset
            if not first_run and randomBool():
                # Possibly driver too
                # 25% chance of cleaning all cache and reopening the browser
                session.reset(new_driver=randomBool())
            first_run = False

            try:
                # Load nextcloud files
                session.load_nextcloud_files()
                # Open file in OnlyOffice
                session.open_file_in_onlyoffice("template_1")
            except Exception as ex:
                # Top-level boundary: log the failure and keep testing.
                # Print first so the traceback survives a failing screenshot.
                print(traceback.format_exc())
                session.screenshot("Exception", str(ex))
    finally:
        # Keep whatever was measured even if the run is interrupted.
        session.persist()
    return session


def main_summary(filename: io.TextIOWrapper) -> None:
    """Rebuild the HTML report next to a saved session YAML file.

    Args:
        filename: Open text file holding the YAML written by
            :meth:`DDSession.persist`; the report is written to the same
            path with an ``.html`` suffix.
    """
    data = yaml.safe_load(filename)
    summary = DDSession.summarise_data(data)
    html_fn = pathlib.Path(filename.name).with_suffix(".html")
    with open(html_fn, "w") as html_file:
        html_file.write(summary["html"])


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="DD Selenium tester", description="Run basic UI tests on DD"
    )
    subparsers = parser.add_subparsers(required=True)

    test_parser = subparsers.add_parser("test")
    test_parser.add_argument("instance")
    test_parser.add_argument(
        "-u",
        "--users-file",
        default="dd-stress-test.users.csv",
        type=argparse.FileType("r"),
    )
    test_parser.add_argument("-d", "--duration", default=300, type=int)
    test_parser.add_argument(
        "-o", "--out-dir", default="results", type=pathlib.Path
    )

    summary_parser = subparsers.add_parser("summary")
    # nargs="?" is what lets the default apply when no filename is given.
    summary_parser.add_argument(
        "filename",
        nargs="?",
        default="session.yml",
        type=argparse.FileType("r"),
    )

    ns = parser.parse_args()
    # Only the "test" sub-command defines "instance".
    if "instance" in ns:
        main_test(**vars(ns)).close()
    else:
        main_summary(**vars(ns))