digitaldemocratic/stress-tests/dd-tests-selenium.py

366 lines
12 KiB
Python

#!/usr/bin/env python3
import argparse
import io
import pathlib
import random
import traceback
from datetime import datetime
import yaml
from selenium import webdriver
# We need selenium 4
from selenium.webdriver.common.by import By
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options as ChromeOptions
from typing import Any, Dict, Iterable, List
STEP_TIMEOUT_SECONDS = 40
class DDSession:
    """Drive a headless Chrome against a DD instance and log timed steps.

    Every call to :meth:`screenshot` stores a PNG in ``out_dir`` and appends a
    timing record to ``self.data["steps"]``.  :meth:`persist` writes that log
    as ``session.yml`` plus an HTML report (``session.html``).
    """

    driver: webdriver.Chrome

    def __init__(
        self,
        instance: str,
        users_file: str,
        out_dir: pathlib.Path,
        stepTimeoutSeconds: int = STEP_TIMEOUT_SECONDS,
    ):
        """Parse the credentials, prepare ``out_dir`` and start the browser.

        Args:
            instance: Domain of the instance; Nextcloud is reached at
                ``https://nextcloud.<instance>``.
            users_file: *Contents* of the users file, one ``username,password``
                pair per line (only the first comma separates the fields).
            out_dir: Directory receiving screenshots and reports; created
                (including parents) when missing.
            stepTimeoutSeconds: Timeout in seconds used for each explicit wait.

        Raises:
            ValueError: A non-blank line of ``users_file`` has no comma.
        """
        self.instance = instance
        self.users_file = self._parse_users(users_file)
        self.out_dir = out_dir
        self.stepTimeoutSeconds = stepTimeoutSeconds
        # parents/exist_ok: nested result paths work and an existing directory
        # is not an error.
        self.out_dir.mkdir(parents=True, exist_ok=True)
        self.start_time = self._utcnow()
        self.last_time = self.start_time
        self.screenshot_counter = 0
        self.data: Dict[str, Any] = dict(
            instance=self.instance, start_time=self.start_time, steps=[]
        )
        self.reset()

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Same values as the deprecated ``datetime.utcnow()``; kept naive so the
        timestamps stored in the session data keep their format.
        """
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_users(text: str) -> List[List[str]]:
        """Split ``username,password`` lines into ``[username, password]`` lists.

        Blank and whitespace-only lines (e.g. a stray ``\\r``) are ignored so
        they cannot later break the credential unpacking in ``perform_login``.
        """
        users: List[List[str]] = []
        for number, line in enumerate(text.split("\n"), start=1):
            if not line.strip():
                continue
            fields = [field.strip() for field in line.split(sep=",", maxsplit=1)]
            if len(fields) != 2:
                raise ValueError(
                    f"users file line {number}: expected 'username,password'"
                )
            users.append(fields)
        return users

    def persist(self) -> None:
        """Write ``session.yml`` and ``session.html`` into ``out_dir``."""
        with (self.out_dir / "session.yml").open("w") as yml_file:
            yaml.dump(self.data, yml_file)
        (self.out_dir / "session.html").write_text(self.summarise()["html"])

    def summarise(self) -> Dict[str, Any]:
        """Return :meth:`summarise_data` applied to this session's data."""
        return DDSession.summarise_data(self.data)

    @staticmethod
    def summarise_data(data) -> Dict[str, Any]:
        """Aggregate the timings of a session log and render the HTML report.

        Args:
            data: Mapping with ``instance``, ``start_time`` and ``steps``; each
                step has ``step``, ``t`` and a list of ``substeps`` carrying
                ``substep``, ``step_t``, ``t`` and ``png``.

        Returns:
            Dict with ``overview`` (max/count/average per step name),
            ``stepbystep`` (the same per ``"<step>: <substep>"``) and ``html``
            (the full report as a string).
        """
        from html import escape
        from itertools import groupby
        from operator import itemgetter

        def summarise_item(group: List[Dict[str, Any]]) -> Dict[str, Any]:
            # A step's duration is the sum of its substeps; a substep carries
            # its own duration directly.
            if "substeps" in group[0]:
                durations = [
                    sum(st["step_t"] for st in s["substeps"]) for s in group
                ]
            else:
                durations = [st["step_t"] for st in group]
            return {
                "max": max(durations),
                "count": len(durations),
                "average": sum(durations) / len(durations),
            }

        def summarise(entries: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
            by_step = itemgetter("step")
            # groupby only merges adjacent items, hence the sort on the same key.
            return {
                name: summarise_item(list(group))
                for name, group in groupby(sorted(entries, key=by_step), key=by_step)
            }

        summary: Dict[str, Any] = dict()
        summary["overview"] = summarise(data["steps"])
        substeps = [
            dict(st, step=f"{step['step']}: {st['substep']}")
            for step in data["steps"]
            for st in step["substeps"]
        ]
        summary["stepbystep"] = summarise(substeps)

        # Text is HTML-escaped: substep labels may hold exception messages
        # containing markup characters.
        instance = escape(str(data["instance"]), quote=False)
        start_time = escape(str(data["start_time"]), quote=False)
        summary_yaml = escape(yaml.dump(summary), quote=False)
        parts = [
            "<html>\n"
            "<head><style>img { margin-bottom: 60px; }</style></head>\n"
            "<body>\n",
            f"\n<h1>{instance}</h1>\n"
            f"<h2>{start_time}</h2>\n"
            "<details><summary>Summary</summary>\n"
            f"<pre>{summary_yaml}\n"
            "</pre>\n"
            "</details>\n",
        ]
        for step in data["steps"]:
            step_name = escape(str(step["step"]), quote=False)
            step_total = sum(st["step_t"] for st in step["substeps"])
            parts.append(f"<h3>{step_name}: {step['t']} s (+ {step_total} s)</h3>")
            for st in step["substeps"]:
                substep_name = escape(str(st["substep"]), quote=False)
                parts.append(
                    f"<h4>{step_name} - {substep_name}: "
                    f"{st['t']} s (+ {st['step_t']} s)</h4>"
                )
                parts.append(f'<img src="{escape(str(st["png"]))}"/>')
        parts.append("</body></html>")
        summary["html"] = "".join(parts)
        return summary

    @property
    def executed_seconds(self) -> float:
        """Seconds elapsed since the session was created."""
        return (self._utcnow() - self.start_time).total_seconds()

    @property
    def nextcloud_url(self) -> str:
        """Base URL of the instance's Nextcloud."""
        return f"https://nextcloud.{self.instance}"

    def reset(self, new_driver: bool = False) -> None:
        """Start a new browser or clear the cookies of the current one.

        A new headless Chrome is created when there is no driver yet or when
        ``new_driver`` is true (an existing driver is quit first); otherwise
        only the cookies are deleted.  A "Browser" screenshot is recorded in
        both cases.
        """
        # If needed:
        # https://stackoverflow.com/a/72922584
        current = getattr(self, "driver", None)
        if current is None or new_driver:
            if current is not None:
                current.quit()
            options = ChromeOptions()
            for arg in [
                "--headless",
                "--no-sandbox",
                "--disable-infobars",
                "--disable-extensions",
                "--disable-dev-shm-usage",
                "window-size=1400,600",
            ]:
                options.add_argument(arg)
            self.driver = webdriver.Chrome(options=options)
            msg = "Restart"
        else:
            self.driver.delete_all_cookies()
            msg = "Cleaned cache"
        self.screenshot("Browser", msg)

    def perform_login(self) -> None:
        """Fill in and submit the Keycloak login form with a random user."""
        # Wait until Keycloak is shown
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            lambda d: d.find_elements(By.ID, "kc-form-login")
        )
        self.screenshot("Login", "Request")
        # Fill in form data for login
        username, password = random.choice(self.users_file)
        self.driver.find_element(By.ID, "username").send_keys(username)
        self.driver.find_element(By.ID, "password").send_keys(password)
        login_button = self.driver.find_element(By.ID, "kc-login")
        self.screenshot("Login", "Form filled")
        login_button.click()
        self.screenshot("Login", "Form sent")
        # TODO: wait for redirection middle step and session.screenshot it

    def load_nextcloud_files(self) -> None:
        """Open Nextcloud, logging in when asked, and wait for the files app."""
        # Start loading
        self.driver.get(self.nextcloud_url)
        # Wait until Nextcloud with Megamenu is shown or a login is requested
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            lambda d: (
                d.find_elements(By.ID, "kc-form-login")
                + d.find_elements(By.ID, "menu-apps-icon")
                + d.find_elements(By.ID, "menu-apps-btn")
            )
        )
        # Detect whether or not a login is needed
        if self.driver.find_elements(By.ID, "kc-form-login"):
            self.perform_login()
        self.screenshot("Nextcloud", "Loading")
        # Wait until Nextcloud files app has loaded
        WebDriverWait(self.driver, timeout=self.stepTimeoutSeconds).until(
            lambda d: d.find_elements(
                By.CSS_SELECTOR, "#app-content-files td.selection"
            )
        )
        self.screenshot("Nextcloud", "Loaded")

    def open_file_in_onlyoffice(self, file_base_name: str) -> None:
        """Open a file from the files app in OnlyOffice and wait until loaded.

        Assumes the browser shows the Nextcloud files app and that a link whose
        text contains ``file_base_name`` is present.  The editor opens in a new
        window, which is closed again once the save button is clickable.
        """
        driver = self.driver
        timeout = self.stepTimeoutSeconds
        # Remember where we are so we can come back regardless of handle order.
        parent = driver.current_window_handle
        handles_before = driver.window_handles
        window_count = len(handles_before)
        driver.find_element(By.PARTIAL_LINK_TEXT, file_base_name).click()
        # Switch to OnlyOffice window
        WebDriverWait(driver, timeout=timeout).until(
            EC.number_of_windows_to_be(window_count + 1)
        )
        child = next(h for h in driver.window_handles if h not in handles_before)
        driver.switch_to.window(child)
        self.screenshot("OnlyOffice", "Opening")
        # Wait for OnlyOffice to start loading
        frame_locator = (By.CSS_SELECTOR, ".app-onlyoffice #app iframe")
        WebDriverWait(driver, timeout=timeout).until(
            lambda d: d.find_elements(*frame_locator)
        )
        self.screenshot("OnlyOffice", "Loading 1")
        # Switch to its iframe
        WebDriverWait(driver, timeout=timeout).until(
            EC.frame_to_be_available_and_switch_to_it(frame_locator)
        )
        self.screenshot("OnlyOffice", "Loading 2")

        def editor_ready(d):
            # find_elements (plural) returns [] instead of raising, so the
            # "or <loading indicator>" alternatives below are really evaluated
            # while the save button does not exist yet.
            buttons = d.find_elements(By.ID, "id-toolbar-btn-save")
            return buttons and EC.element_to_be_clickable(buttons[0])(d)

        # Get the first loading screen
        WebDriverWait(driver, timeout=timeout).until(
            lambda d: editor_ready(d)
            or d.find_elements(By.CSS_SELECTOR, "#loading-mask div.loader-page")
        )
        self.screenshot("OnlyOffice", "Loading 3")
        # Wait for OnlyOffice second loading phase
        WebDriverWait(driver, timeout=timeout).until(
            lambda d: editor_ready(d)
            or d.find_elements(By.CSS_SELECTOR, "div.asc-loadmask-title")
        )
        self.screenshot("OnlyOffice", "Loading 4")
        # Wait for final loading (save button clickable)
        WebDriverWait(driver, timeout=timeout * 3).until(editor_ready)
        self.screenshot("OnlyOffice", "Loaded")
        # Close OnlyOffice window
        driver.close()
        # And change back to the main window
        driver.switch_to.window(parent)
        self.screenshot("OnlyOffice", "Closed")

    def screenshot(self, step: str, substep_txt: str) -> None:
        """Save a screenshot and record it as a substep of ``step``.

        The record holds the seconds since session start (``t``) and since the
        previous screenshot (``step_t``).  Consecutive calls with the same
        ``step`` are grouped under a single step entry.
        """
        # Get data
        scr_now = self._utcnow()
        scr_s = (scr_now - self.start_time).total_seconds()
        step_t = (scr_now - self.last_time).total_seconds()
        scr_id = self.screenshot_counter
        scr_fn = f"{scr_id:04d}.png"
        # Get screenie and write it (write_bytes closes the file)
        scr_img = self.driver.get_screenshot_as_png()
        (self.out_dir / scr_fn).write_bytes(scr_img)
        # Upgrade values
        self.last_time = scr_now
        self.screenshot_counter += 1
        # Add to report
        substep: Dict[str, Any] = dict(
            id=scr_id, substep=substep_txt, step_t=step_t, t=scr_s, png=scr_fn
        )
        steps = self.data["steps"]
        if steps and steps[-1]["step"] == step:
            steps[-1]["substeps"].append(substep)
        else:
            steps.append(dict(step=step, t=scr_s, substeps=[substep]))
def randomBool() -> bool:
    """Return ``True`` or ``False``, picked by ``random.choice``."""
    outcomes = [True, False]
    return random.choice(outcomes)
def main_test(
    instance: str,
    users_file: io.TextIOWrapper,
    out_dir: pathlib.Path,
    duration: int,
    stepTimeoutSeconds: int = STEP_TIMEOUT_SECONDS,
) -> DDSession:
    """Repeat the Nextcloud + OnlyOffice scenario for ``duration`` seconds.

    Args:
        instance: Domain of the instance under test.
        users_file: Open text file with ``username,password`` lines; it is read
            completely but not closed here.
        out_dir: Directory for screenshots and reports.
        duration: Iterations are started while fewer than this many seconds
            have elapsed since the session was created.
        stepTimeoutSeconds: Timeout in seconds for each explicit wait.

    Returns:
        The session, with its data persisted after every iteration.
    """
    session = DDSession(
        instance=instance,
        users_file=users_file.read(),
        out_dir=out_dir,
        stepTimeoutSeconds=stepTimeoutSeconds,
    )
    first_run = True
    while session.executed_seconds < duration:
        try:
            # 50% chance of requiring login by resetting cookies
            # First run always requires login, skip reset
            if not first_run and randomBool():
                # Possibly driver too
                # 25% chance of cleaning all cache and reopening the browser
                session.reset(new_driver=randomBool())
            first_run = False
            try:
                # Load nextcloud files
                session.load_nextcloud_files()
                # Open file in OnlyOffice
                session.open_file_in_onlyoffice("template_1")
            except Exception as ex:
                # Report first: taking the failure screenshot may itself raise.
                print(traceback.format_exc())
                session.screenshot("Exception", str(ex))
        finally:
            # Always save what was recorded, also when the iteration is
            # interrupted or the failure screenshot could not be taken.
            session.persist()
    return session
def main_summary(filename: io.TextIOWrapper) -> None:
    """Rebuild the HTML report from a saved session YAML file.

    Args:
        filename: Open text file holding the session data.  The report is
            written next to it, with the suffix replaced by ``.html``.
    """
    data = yaml.safe_load(filename)
    summary = DDSession.summarise_data(data)
    html_fn = pathlib.Path(filename.name).with_suffix(".html")
    # write_text closes the file, so the report is flushed deterministically.
    html_fn.write_text(summary["html"])
if __name__ == "__main__":
    # Command line: "test <instance> [-u FILE] [-d SECONDS] [-o DIR]" runs the
    # stress test; "summary [FILE]" rebuilds the HTML report from a session file.
    parser = argparse.ArgumentParser(
        prog="DD Selenium tester", description="Run basic UI tests on DD"
    )
    # dest gives the required sub-command a name for argparse's error message.
    subparsers = parser.add_subparsers(required=True, dest="command")
    test_parser = subparsers.add_parser("test")
    test_parser.add_argument("instance")
    test_parser.add_argument(
        "-u",
        "--users-file",
        default="dd-stress-test.users.csv",
        type=argparse.FileType("r"),
    )
    test_parser.add_argument("-d", "--duration", default=300, type=int)
    test_parser.add_argument("-o", "--out-dir", default="results", type=pathlib.Path)
    test_parser.set_defaults(func=main_test)
    summary_parser = subparsers.add_parser("summary")
    # nargs="?" makes the positional optional so that its default applies.
    summary_parser.add_argument(
        "filename", nargs="?", default="session.yml", type=argparse.FileType("r")
    )
    summary_parser.set_defaults(func=main_summary)
    kwargs = vars(parser.parse_args())
    # Strip the dispatch-only entries; the rest are the handler's parameters.
    kwargs.pop("command", None)
    handler = kwargs.pop("func")
    handler(**kwargs)