chore!: Relocate commercial code downstream

This commit is contained in:
Nick Bolton
2024-10-01 14:30:24 +01:00
parent 2d732a4b9d
commit 6bb1bcad8c
174 changed files with 1334 additions and 4238 deletions

View File

@ -19,25 +19,17 @@ import lib.env as env
env.ensure_in_venv(__file__)
import os, sys, time, subprocess, argparse
import os, sys, argparse
import lib.windows as windows
import psutil # type: ignore
import lib.colors as colors
import lib.file_utils as file_utils
DEFAULT_BIN_NAME = "deskflowd"
DEFAULT_SERVICE_ID = "deskflow"
DEFAULT_BIN_NAME = "deskflow-daemon"
DEFAULT_SOURCE_DIR = os.path.join("build", "temp", "bin")
DEFAULT_TARGET_DIR = os.path.join("build", "bin")
SERVICE_NOT_RUNNING_ERROR = 2
ERROR_ACCESS_VIOLATION = 0xC0000005
IGNORE_PROCESSES = ["deskflow.exe"]
class Context:
def __init__(self, verbose):
self.verbose = verbose
def main():
"""Entry point for the script."""
@ -49,6 +41,8 @@ def main():
parser.add_argument("--source-dir", default=DEFAULT_SOURCE_DIR)
parser.add_argument("--target-dir", default=DEFAULT_TARGET_DIR)
parser.add_argument("--bin-name", default=DEFAULT_BIN_NAME)
parser.add_argument("--ignore-processes", nargs="+", default=IGNORE_PROCESSES)
parser.add_argument("--service-id", default=DEFAULT_SERVICE_ID)
parser.add_argument("--verbose", action="store_true")
args = parser.parse_args()
@ -59,15 +53,15 @@ def main():
)
sys.exit(1)
context = Context(args.verbose)
service = windows.WindowsService(__file__, args)
try:
if args.reinstall:
reinstall(context, args.source_dir, args.target_dir, args.bin_name)
service.reinstall()
elif args.stop:
stop(context, args.target_dir)
service.stop()
elif args.restart:
restart(context, args.source_dir, args.target_dir)
service.restart()
else:
print("No action specified", file=sys.stderr)
exit(1)
@ -78,135 +72,5 @@ def main():
input("Press enter to continue...")
def print_verbose(context, message):
if context.verbose:
print(message)
def ensure_admin():
if not windows.is_admin():
windows.run_elevated(__file__)
sys.exit()
def restart(context, source_dir, target_dir):
"""Stops the daemon service, copies files, and restarts the daemon service."""
ensure_admin()
stop(context, target_dir)
copy_files(source_dir, target_dir)
start()
def reinstall(context, source_dir, target_dir, bin_name):
"""Stops and uninstalls daemon service, copies files, and reinstalls the daemon service."""
ensure_admin()
stop(context, target_dir)
source_bin_path = f"{os.path.join(source_dir, bin_name)}.exe"
copy_files(source_dir, target_dir)
print("Removing old daemon service")
try:
subprocess.run([source_bin_path, "/uninstall"], shell=True, check=True)
except subprocess.CalledProcessError as e:
check_access_violation(e.returncode, source_bin_path)
if e.returncode != 0:
print(
f"{colors.WARNING_TEXT} Uninstall failed, return code: {e.returncode}",
file=sys.stderr,
)
target_bin_path = os.path.join(target_dir, bin_name + ".exe")
try:
print("Installing daemon service")
subprocess.run([target_bin_path, "/install"], shell=True, check=True)
except subprocess.CalledProcessError as e:
check_access_violation(e.returncode, target_bin_path)
if e.returncode != 0:
print(f"{colors.WARNING_TEXT} Install failed, return code: {e.returncode}")
def copy_files(source_dir, target_dir):
options = file_utils.CopyOptions(ignore_errors=True, verbose=False)
print(f"Copying files from {source_dir} to {target_dir}")
file_utils.copy(f"{source_dir}/*", target_dir, options)
def stop(context, target_dir):
ensure_admin()
print("Stopping daemon service")
try:
subprocess.run(["net", "stop", "deskflow"], shell=True, check=True)
except subprocess.CalledProcessError as e:
if e.returncode == SERVICE_NOT_RUNNING_ERROR:
print_verbose(context, "Daemon service not running")
else:
raise e
# Wait for Windows to release the file handles after process termination.
wait_for_stop(context, target_dir)
def start():
ensure_admin()
print("Starting daemon service")
subprocess.run(["net", "start", "deskflow"], shell=True, check=True)
def wait_for_stop(context, target_dir):
if is_any_process_running(context, target_dir):
print("Waiting for file handles to release...", end="", flush=True)
while is_any_process_running(context, target_dir):
if not context.verbose:
print(".", end="", flush=True)
time.sleep(1)
if not context.verbose:
print()
def check_access_violation(return_code, bin_path):
if return_code == ERROR_ACCESS_VIOLATION:
print(
f"{colors.WARNING_TEXT} Process crashed with memory access violation: {bin_path}",
file=sys.stderr,
)
def is_ignored_process(exe):
for ignore_process in IGNORE_PROCESSES:
if exe.endswith(ignore_process):
return True
return False
def is_any_process_running(context, dir):
"""Check if there is any running process that contains the given directory."""
print_verbose(context, f"Checking if any process is running in: {dir}")
for proc in psutil.process_iter(attrs=["name", "exe"]):
exe = proc.info["exe"]
if not exe:
print_verbose(context, f"Skipping process with no exe: {proc}")
continue
if is_ignored_process(exe):
print_verbose(context, f"Ignoring process: {exe}")
continue
try:
if dir.lower() in exe.lower():
print_verbose(context, f"Process found: {exe}")
return True
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return False
main()
if __name__ == "__main__":
main()

View File

@ -131,7 +131,7 @@ def parse_args(is_ci):
def run(args):
env.ensure_dependencies()
env.ensure_in_venv(__file__, auto_create=True)
env.ensure_in_venv(__file__, create_venv=True)
if not args.skip_python:
env.install_requirements()
@ -280,7 +280,7 @@ class Dependencies:
cmd_utils.run(command, shell=True, print_cmd=True)
if env_vars_set:
print(f"To load env vars, run: source {mac.shell_rc}")
print(f"To load env vars, run: source {mac.SHELL_RC}")
def linux(self):
"""Installs dependencies on Linux."""

View File

@ -107,7 +107,7 @@ def get_env_bool(name, default=False):
return value.lower() in ["true", "1", "yes"]
def get_python_executable(binary="python"):
def get_venv_executable(binary="python"):
if sys.platform == "win32":
return os.path.join(VENV_DIR, "Scripts", binary)
else:
@ -119,7 +119,7 @@ def in_venv():
return sys.prefix != sys.base_prefix
def ensure_in_venv(script_file, auto_create=False):
def ensure_in_venv(script_file, create_venv=False):
"""
Ensures the script is running in a Python virtual environment (venv).
If the script is not running in a venv, it will create one and re-run the script in the venv.
@ -128,24 +128,27 @@ def ensure_in_venv(script_file, auto_create=False):
check_dependencies(raise_error=True)
import venv
if not in_venv():
if not os.path.exists(VENV_DIR):
if not auto_create:
print(
"The Python virtual environment (.venv) needs to be created before you can "
"run this script.\n"
"Please run: scripts/setup_venv.py"
)
sys.exit(1)
if in_venv():
print(f"Running in venv, executable: {sys.executable}", flush=True)
return
print(f"Creating virtual environment at {VENV_DIR}")
venv.create(VENV_DIR, with_pip=True)
if create_venv and not os.path.exists(VENV_DIR):
print(f"Creating virtual environment at {VENV_DIR}")
venv.create(VENV_DIR, with_pip=True)
if os.path.exists(VENV_DIR):
script_file_abs = os.path.abspath(script_file)
print(f"Using virtual environment for: {script_file_abs}", flush=True)
python_executable = get_python_executable()
python_executable = get_venv_executable()
result = subprocess.run([python_executable, script_file_abs] + sys.argv[1:])
sys.exit(result.returncode)
else:
print(
"The Python virtual environment (.venv) needs to be created before you can "
"run this script.\n"
"Please run: scripts/setup_venv.py"
)
sys.exit(1)
def install_requirements():
@ -240,18 +243,6 @@ def ensure_dependencies():
cmd_utils.run(f"{sudo} {install_cmd}".strip(), shell=True, print_cmd=True)
def get_app_version():
"""
Returns the version either from the env var, or from the version file.
"""
version = get_env("DESKFLOW_VERSION", required=False)
if version:
return version
with open("VERSION", "r") as f:
return f.read().strip()
def import_colors():
import lib.colors as colors

View File

@ -18,18 +18,14 @@ import lib.cmd_utils as cmd_utils
import lib.env as env
from enum import Enum, auto
BUILD_ROOT_DIR = "build"
class PackageType(Enum):
DISTRO = auto()
TGZ = auto()
dist_dir = "dist"
build_dir = "build"
package_name = "deskflow"
test_cmd = "deskflows --version"
def run_command(command, check=True):
has_sudo = cmd_utils.has_command("sudo")
@ -44,19 +40,31 @@ def run_command(command, check=True):
cmd_utils.run(command, check, shell=True, print_cmd=True)
def package(filename_base, package_type: PackageType, leave_test_installed=False):
def package(
filename_base,
dist_dir,
test_cmd,
package_name,
package_type: PackageType,
leave_test_installed=False,
):
working_dir = BUILD_ROOT_DIR
extension, cmd = get_package_build_info(
package_type,
)
run_package_cmd(cmd, working_dir)
extension, cmd = get_package_info(package_type)
run_package_cmd(cmd)
package_filename = get_package_filename(extension)
package_filename = get_package_filename(extension, working_dir)
target_file = f"{filename_base}.{extension}"
target_path = copy_to_dist_dir(package_filename, target_file)
target_path = copy_to_dist_dir(package_filename, dist_dir, target_file)
if package_type == PackageType.DISTRO:
test_install(target_path, remove_test=not leave_test_installed)
test_install(
target_path, package_name, test_cmd, remove_test=not leave_test_installed
)
def get_package_info(package_type: PackageType):
def get_package_build_info(package_type: PackageType):
command = None
cpack_generator = None
@ -93,32 +101,36 @@ def get_package_info(package_type: PackageType):
return file_extension, command
def run_package_cmd(command):
def run_package_cmd(command, working_dir):
package_user = env.get_env("LINUX_PACKAGE_USER", required=False)
if package_user:
cmd_utils.run(
["sudo", "chown", "-R", package_user, "build"], check=True, print_cmd=True
["sudo", "chown", "-R", package_user, working_dir],
check=True,
print_cmd=True,
)
command = ["sudo", "-u", package_user] + command
cwd = os.getcwd()
try:
os.chdir("build")
os.chdir(working_dir)
cmd_utils.run(command, check=True, print_cmd=True)
finally:
os.chdir(cwd)
def get_package_filename(extension):
files = glob.glob(f"build/*.{extension}")
def get_package_filename(extension, working_dir):
files = glob.glob(f"{working_dir}/*.{extension}")
if not files:
raise ValueError(f"No .{extension} file found in build directory")
raise ValueError(
f"No .{extension} file found in build directory: {working_dir}"
)
return files[0]
def copy_to_dist_dir(source_file, target_file):
def copy_to_dist_dir(source_file, dist_dir, target_file):
os.makedirs(dist_dir, exist_ok=True)
target_path = f"{dist_dir}/{target_file}"
@ -128,7 +140,7 @@ def copy_to_dist_dir(source_file, target_file):
return target_path
def test_install(package_file, remove_test=True):
def test_install(package_file, package_name, test_cmd, remove_test=True):
distro, distro_like, _distro_version = env.get_linux_distro()
if not distro_like:
@ -140,19 +152,19 @@ def test_install(package_file, remove_test=True):
if "debian" in distro_like:
install_base = ["apt", "install", "-f", "-y"]
remove_base = ["apt", "remove", "-y"]
list_cmd = ["dpkg", "-L", "deskflow"]
list_cmd = ["dpkg", "-L", package_name]
elif "fedora" in distro_like:
install_base = ["dnf", "install", "-y"]
remove_base = ["dnf", "remove", "-y"]
list_cmd = ["rpm", "-ql", "deskflow"]
list_cmd = ["rpm", "-ql", package_name]
elif "opensuse" in distro_like:
install_base = ["zypper", "--no-gpg-checks", "install", "-y"]
remove_base = ["zypper", "remove", "-y"]
list_cmd = ["rpm", "-ql", "deskflow"]
list_cmd = ["rpm", "-ql", package_name]
elif "arch" in distro_like:
install_base = ["pacman", "-U", "--noconfirm"]
remove_base = ["pacman", "-R", "--noconfirm"]
list_cmd = ["pacman", "-Ql", "deskflow"]
list_cmd = ["pacman", "-Ql", package_name]
else:
raise RuntimeError(f"Linux distro not yet supported: {distro}")

View File

@ -19,20 +19,17 @@ import lib.cmd_utils as cmd_utils
import lib.env as env
from lib.certificate import Certificate
cert_p12_env = "APPLE_P12_CERTIFICATE"
notary_user_env = "APPLE_NOTARY_USER"
codesign_env = "APPLE_CODESIGN_ID"
shell_rc = "~/.zshrc"
dist_dir = "dist"
product_name = "Deskflow"
settings_file = "res/dist/macos/dmgbuild/settings.py"
app_path = "build/bundle/Deskflow.app"
security_path = "/usr/bin/security"
sudo_path = "/usr/bin/sudo"
notarytool_path = "/usr/bin/notarytool"
codesign_path = "/usr/bin/codesign"
xcode_select_path = "/usr/bin/xcode-select"
keychain_path = "/Library/Keychains/System.keychain"
CERT_P12_ENV = "APPLE_P12_CERTIFICATE"
NOTARY_USER_ENV = "APPLE_NOTARY_USER"
CODESIGN_ENV = "APPLE_CODESIGN_ID"
SHELL_RC = "~/.zshrc"
SETTINGS_FILE = "res/dist/mac/dmgbuild/settings.py"
SECURITY_PATH = "/usr/bin/security"
SUDO_PATH = "/usr/bin/sudo"
NOTARYTOOL_PATH = "/usr/bin/notarytool"
CODESIGN_PATH = "/usr/bin/codesign"
XCODE_SELECT_PATH = "/usr/bin/xcode-select"
KEYCHAIN_PATH = "/Library/Keychains/System.keychain"
def set_env_var(name, value):
@ -42,7 +39,7 @@ def set_env_var(name, value):
Returns True if the variable was added, False if it already exists.
"""
text = f'export {name}="{value}:${name}"'
file = os.path.expanduser(shell_rc)
file = os.path.expanduser(SHELL_RC)
if os.path.exists(file):
with open(file, "r") as f:
if text in f.read():
@ -51,11 +48,11 @@ def set_env_var(name, value):
print(f"Setting environment variable: {name}={name}")
with open(file, "a") as f:
f.write(f"\n{text}\n")
print(f"Appended to {shell_rc}: {text}")
print(f"Appended to {SHELL_RC}: {text}")
return True
def package(filename_base):
def package(filename_base, source_dir, build_dir, dist_dir, product_name):
"""
Package the application for macOS.
The app bundle must be signed, or an error will occur:
@ -76,35 +73,40 @@ def package(filename_base):
install_certificate(cert_base64, cert_password)
else:
print(
f"Warning: Skipped certificate installation, env var {cert_p12_env} not set",
f"Warning: Skipped certificate installation, env var {CERT_P12_ENV} not set",
file=sys.stderr,
)
build_bundle()
bundle_source_dir = os.path.join(
build_dir, os.path.join("bundle", product_name + ".app")
)
build_bundle(bundle_source_dir)
if codesign_id:
sign_bundle(codesign_id)
sign_bundle(bundle_source_dir, codesign_id)
else:
print(
f"Warning: Skipped code signing, env var {codesign_env} not set",
f"Warning: Skipped code signing, env var {CODESIGN_ENV} not set",
file=sys.stderr,
)
dmg_path = build_dmg(filename_base)
dmg_path = build_dmg(
bundle_source_dir, filename_base, source_dir, dist_dir, product_name
)
if notary_user:
notarize_package(dmg_path, notary_user, notary_password, notary_team_id)
else:
print(
f"Warning: Skipped notarization, env var {notary_user_env} not set",
f"Warning: Skipped notarization, env var {NOTARY_USER_ENV} not set",
file=sys.stderr,
)
def package_env_vars():
codesign_id = env.get_env(codesign_env, required=False)
cert_base64 = env.get_env(cert_p12_env, required=False)
notary_user = env.get_env(notary_user_env, required=False)
codesign_id = env.get_env(CODESIGN_ENV, required=False)
cert_base64 = env.get_env(CERT_P12_ENV, required=False)
notary_user = env.get_env(NOTARY_USER_ENV, required=False)
if notary_user:
notary_password = env.get_env("APPLE_NOTARY_PASSWORD")
@ -128,11 +130,12 @@ def package_env_vars():
)
def build_bundle():
def build_bundle(bundle_source_dir):
# it's important to build a new bundle every time, so that we catch bugs with fresh builds.
if os.path.exists(app_path):
print(f"Bundle already exists, deleting: {app_path}")
shutil.rmtree(app_path)
if os.path.exists(bundle_source_dir):
print(f"Bundle already exists, deleting: {bundle_source_dir}")
shutil.rmtree(bundle_source_dir)
print("Building bundle...")
@ -140,20 +143,20 @@ def build_bundle():
cmd_utils.run("cmake --build build --target install", shell=True, print_cmd=True)
def sign_bundle(codesign_id):
print(f"Signing bundle {app_path}...")
def sign_bundle(bundle_source_dir, codesign_id):
print(f"Signing bundle {bundle_source_dir}...")
assert_certificate_installed(codesign_id)
cmd_utils.run(
[
codesign_path,
CODESIGN_PATH,
"-f",
"--options",
"runtime",
"--deep",
"-s",
codesign_id,
app_path,
bundle_source_dir,
]
)
@ -172,9 +175,12 @@ def assert_certificate_installed(codesign_id):
raise RuntimeError("Code signing certificate not installed or has expired")
def build_dmg(filename_base):
settings_file_abs = os.path.abspath(settings_file)
app_path_abs = os.path.abspath(app_path)
def build_dmg(bundle_source_dir, filename_base, source_dir, dist_dir, product_name):
settings_path = (
SETTINGS_FILE if source_dir is None else os.path.join(source_dir, SETTINGS_FILE)
)
settings_path_abs = os.path.abspath(settings_path)
app_path_abs = os.path.abspath(bundle_source_dir)
# cwd for dmgbuild, since setting the dmg filename to a path (include the dist dir) seems to
# make the dmg disappear and never writes to the specified path. the dmgbuild module also
@ -191,7 +197,7 @@ def build_dmg(filename_base):
dmgbuild.build_dmg(
dmg_filename,
product_name,
settings_file=settings_file_abs,
settings_file=settings_path_abs,
defines={
"app": app_path_abs,
},
@ -216,18 +222,18 @@ def install_certificate(cert_base64, cert_password):
# WARNING: contains private key password, never print this command
cmd_utils.run(
[
sudo_path,
security_path,
SUDO_PATH,
SECURITY_PATH,
"import",
cert_path,
"-k",
keychain_path,
KEYCHAIN_PATH,
"-P",
cert_password,
"-T",
codesign_path,
CODESIGN_PATH,
"-T",
security_path,
SECURITY_PATH,
],
)
@ -241,7 +247,7 @@ def notarize_package(dmg_path, user, password, team_id):
def get_xcode_path():
result = cmd_utils.run(
[xcode_select_path, "-p"], get_output=True, shell=False, print_cmd=True
[XCODE_SELECT_PATH, "-p"], get_output=True, shell=False, print_cmd=True
)
return result.stdout.strip()
@ -255,7 +261,7 @@ class NotaryTool:
self.xcode_path = get_xcode_path()
def get_path(self):
return f"{self.xcode_path}{notarytool_path}"
return f"{self.xcode_path}{NOTARYTOOL_PATH}"
def store_credentials(self, user, password, team_id):
print("Storing credentials for notary tool...")

View File

@ -13,16 +13,22 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, sys
import lib.cmd_utils as cmd_utils
import lib.env as env
import os
build_dir = "build/meson"
meson_bin = env.get_python_executable("meson")
def meson_venv_bin():
if not env.in_venv():
raise RuntimeError("Not in a virtual environment")
return os.path.join(os.path.dirname(sys.executable), "meson")
def setup(no_system_list, static_list):
cmd = [meson_bin, "setup", build_dir]
cmd = [meson_venv_bin(), "setup", build_dir]
# TODO: These special Windows exceptions should probably be in Meson
# or somewhere other than this script, as it's a bit hacky.
@ -72,11 +78,11 @@ def static_subproject(subproject):
def compile():
cmd_utils.run([meson_bin, "compile", "-C", build_dir], print_cmd=True)
cmd_utils.run([meson_venv_bin(), "compile", "-C", build_dir], print_cmd=True)
def install():
cmd = [meson_bin, "install", "-C", build_dir]
cmd = [meson_venv_bin(), "install", "-C", build_dir]
has_sudo = cmd_utils.has_command("sudo")
if has_sudo:

View File

@ -13,9 +13,8 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import os, sys
import lib.cmd_utils as cmd_utils
import lib.env as env
import glob
@ -43,7 +42,7 @@ class Qt:
print(f"Skipping Qt, already installed at: {self.dir_pattern}")
return
args = [env.get_python_executable(), "-m", "aqt", "install-qt"]
args = [sys.executable, "-m", "aqt", "install-qt"]
args.extend(["--outputdir", self.base_dir])
args.extend(["--base", self.mirror_url])
args.extend([self.os_name, "desktop", str(self.version), self.compiler])

View File

@ -13,21 +13,22 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import ctypes, sys, os, shutil
import ctypes, sys, os, shutil, time, subprocess
import xml.etree.ElementTree as ET
import lib.cmd_utils as cmd_utils
import lib.env as env
import psutil # type: ignore
from lib.certificate import Certificate
import lib.colors as colors
import lib.file_utils as file_utils
LOCK_FILE = "tmp/elevated.lock"
MSBUILD_CMD = "msbuild"
SIGNTOOL_CMD = "signtool"
CERTUTIL_CMD = "certutil"
RUNNER_TEMP_ENV = "RUNNER_TEMP"
DIST_DIR = "dist"
BUILD_DIR = "build"
WIX_FILE = f"{BUILD_DIR}/installer/Deskflow.sln"
MSI_FILE = f"{BUILD_DIR}/installer/bin/Release/Deskflow.msi"
SERVICE_NOT_RUNNING_ERROR = 2
ERROR_ACCESS_VIOLATION = 0xC0000005
def run_elevated(script, args=None, use_sys_argv=True, wait_for_exit=False):
@ -100,17 +101,19 @@ def set_env_var(name, value):
cmd_utils.run(["setx", name, new_value], check=True, shell=True, print_cmd=True)
def package(filename_base):
def package(filename_base, build_dir, dist_dir):
cert_env_key = "WINDOWS_PFX_CERTIFICATE"
cert_base64 = env.get_env(cert_env_key, required=False)
packager = WindowsPackager(filename_base, build_dir, dist_dir)
if cert_base64:
cert_password = env.get_env("WINDOWS_PFX_PASSWORD")
sign_binaries(cert_base64, cert_password)
packager.sign_binaries(cert_base64, cert_password)
build_msi(filename_base)
packager.build_msi()
if cert_base64:
sign_msi(filename_base, cert_base64, cert_password)
packager.sign_msi(cert_base64, cert_password)
else:
print(f"Skipped code signing, env var not set: {cert_env_key}")
@ -124,43 +127,6 @@ def assert_vs_cmd(cmd):
)
def build_msi(filename_base):
print("Building MSI installer...")
configuration = "Release"
platform = "x64"
assert_vs_cmd(MSBUILD_CMD)
cmd_utils.run(
[
MSBUILD_CMD,
WIX_FILE,
f"/p:Configuration={configuration}",
f"/p:Platform={platform}",
],
shell=True,
print_cmd=True,
)
path = get_package_path(filename_base)
print(f"Copying MSI installer to {DIST_DIR}")
os.makedirs(DIST_DIR, exist_ok=True)
shutil.copy(MSI_FILE, path)
def get_package_path(filename_base):
return f"{DIST_DIR}/{filename_base}.msi"
def sign_binaries(cert_base64, cert_password):
exe_pattern = f"{BUILD_DIR}/bin/*.exe"
run_codesign(exe_pattern, cert_base64, cert_password)
def sign_msi(filename_base, cert_base64, cert_password):
path = get_package_path(filename_base)
run_codesign(path, cert_base64, cert_password)
def run_codesign(path, cert_base64, cert_password):
time_server = "http://timestamp.digicert.com"
hashing_algorithm = "SHA256"
@ -187,6 +153,49 @@ def run_codesign(path, cert_base64, cert_password):
)
class WindowsPackager:
def __init__(self, filename_base, build_dir, dist_dir):
self.filename_base = filename_base
self.build_dir = build_dir
self.dist_dir = dist_dir
self.wix_file = f"{build_dir}/installer/Installer.sln"
self.msi_file = f"{build_dir}/installer/bin/Release/Installer.msi"
def build_msi(self):
print("Building MSI installer...")
configuration = "Release"
platform = "x64"
assert_vs_cmd(MSBUILD_CMD)
cmd_utils.run(
[
MSBUILD_CMD,
self.wix_file,
f"/p:Configuration={configuration}",
f"/p:Platform={platform}",
],
shell=True,
print_cmd=True,
)
path = self.get_package_path()
print(f"Copying MSI installer to {self.dist_dir}")
os.makedirs(self.dist_dir, exist_ok=True)
shutil.copy(self.msi_file, path)
def get_package_path(self):
return f"{self.dist_dir}/{self.filename_base}.msi"
def sign_binaries(self, cert_base64, cert_password):
exe_pattern = f"{self.build_dir}/bin/*.exe"
run_codesign(exe_pattern, cert_base64, cert_password)
def sign_msi(self, cert_base64, cert_password):
path = self.get_package_path()
run_codesign(path, cert_base64, cert_password)
class WindowsChoco:
"""Chocolatey for Windows."""
@ -215,3 +224,136 @@ class WindowsChoco:
file=sys.stderr,
)
sys.exit(1)
class WindowsService:
def __init__(self, script, args):
self.script = script
self.verbose = args.verbose
self.bin_name = args.bin_name
self.source_dir = os.path.abspath(args.source_dir)
self.target_dir = os.path.abspath(args.target_dir)
self.service_id = args.service_id
self.ignore_processes = args.ignore_processes
def print_verbose(self, message):
if self.verbose:
print(message)
def ensure_admin(self):
if not is_admin():
run_elevated(self.script)
sys.exit()
def restart(self):
"""Stops the daemon service, copies files, and restarts the daemon service."""
self.ensure_admin()
self.stop()
self.copy_files()
self.start()
def reinstall(self):
"""Stops and uninstalls daemon service, copies files, and reinstalls the daemon service."""
self.ensure_admin()
self.stop()
source_bin_path = f"{os.path.join(self.source_dir, self.bin_name)}.exe"
self.copy_files()
print("Removing old daemon service")
try:
subprocess.run([source_bin_path, "/uninstall"], shell=True, check=True)
except subprocess.CalledProcessError as e:
self.check_access_violation(e.returncode, source_bin_path)
if e.returncode != 0:
print(
f"{colors.WARNING_TEXT} Uninstall failed, return code: {e.returncode}",
file=sys.stderr,
)
target_bin_path = os.path.join(self.target_dir, self.bin_name + ".exe")
try:
print("Installing daemon service")
subprocess.run([target_bin_path, "/install"], shell=True, check=True)
except subprocess.CalledProcessError as e:
self.check_access_violation(e.returncode, target_bin_path)
if e.returncode != 0:
print(
f"{colors.WARNING_TEXT} Install failed, return code: {e.returncode}"
)
def copy_files(self):
options = file_utils.CopyOptions(ignore_errors=True, verbose=False)
print(f"Copying files from {self.source_dir} to {self.target_dir}")
file_utils.copy(f"{self.source_dir}/*", self.target_dir, options)
def stop(self):
self.ensure_admin()
print("Stopping daemon service")
try:
subprocess.run(["net", "stop", self.service_id], shell=True, check=True)
except subprocess.CalledProcessError as e:
if e.returncode == SERVICE_NOT_RUNNING_ERROR:
self.print_verbose("Daemon service not running")
else:
raise e
# Wait for Windows to release the file handles after process termination.
self.wait_for_stop()
def start(self):
self.ensure_admin()
print("Starting daemon service")
subprocess.run(["net", "start", self.service_id], shell=True, check=True)
def wait_for_stop(self):
if self.is_any_process_running(self.target_dir):
print("Waiting for file handles to release...", end="", flush=True)
while self.is_any_process_running(self.target_dir):
if not self.verbose:
print(".", end="", flush=True)
time.sleep(1)
if not self.verbose:
print()
def check_access_violation(self, return_code, bin_path):
if return_code == ERROR_ACCESS_VIOLATION:
print(
f"{colors.WARNING_TEXT} Process crashed with memory access violation: {bin_path}",
file=sys.stderr,
)
def is_ignored_process(self, exe):
for ignore_process in self.ignore_processes:
if exe.endswith(ignore_process):
return True
return False
def is_any_process_running(self, dir):
"""Check if there is any running process that contains the given directory."""
self.print_verbose(f"Checking if any process is running in: {dir}")
for proc in psutil.process_iter(attrs=["name", "exe"]):
exe = proc.info["exe"]
if not exe:
self.print_verbose(f"Skipping process with no exe: {proc}")
continue
if self.is_ignored_process(exe):
self.print_verbose(f"Ignoring process: {exe}")
continue
try:
if dir.lower() in exe.lower():
self.print_verbose(f"Process found: {exe}")
return True
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass
return False

View File

@ -24,8 +24,14 @@ import platform
from lib.linux import PackageType
from dotenv import load_dotenv # type: ignore
env_file = ".env"
default_package_prefix = "deskflow"
ENV_FILE = ".env"
DEFAULT_PRODUCT_NAME = "Deskflow"
DEFAULT_FILENAME_BASE = "deskflow"
DEFAULT_PROJECT_BUILD_DIR = "build"
DEFAULT_DIST_DIR = "dist"
DEFAULT_TEST_CMD = "deskflow-server --version"
DEFAULT_PACKAGE_NAME = "deskflow"
VERSION_FILE = "VERSION"
def main():
@ -37,28 +43,69 @@ def main():
)
args = parser.parse_args()
load_dotenv(dotenv_path=env_file)
load_dotenv(dotenv_path=ENV_FILE)
version = env.get_app_version()
filename_base = get_filename_base(version)
package(
DEFAULT_FILENAME_BASE,
get_app_version(VERSION_FILE),
DEFAULT_PROJECT_BUILD_DIR,
DEFAULT_DIST_DIR,
DEFAULT_TEST_CMD,
DEFAULT_PRODUCT_NAME,
DEFAULT_PACKAGE_NAME,
leave_test_installed=args.leave_test_installed,
)
def get_app_version(filename):
"""
Returns the version either from the env var, or from the version file.
"""
version = env.get_env("DESKFLOW_VERSION", required=False)
if version:
return version
with open(filename, "r") as f:
return f.read().strip()
def package(
filename_prefix,
version,
project_build_dir,
dist_dir,
test_cmd,
product_name,
package_name,
source_dir=None,
leave_test_installed=False,
):
filename_base = get_filename_base(version, filename_prefix)
print(f"Package filename base: {filename_base}")
if env.is_windows():
windows_package(filename_base)
windows_package(filename_base, project_build_dir, dist_dir)
elif env.is_mac():
mac_package(filename_base)
mac_package(
filename_base, source_dir, project_build_dir, dist_dir, product_name
)
elif env.is_linux():
linux_package(filename_base, version, args.leave_test_installed)
linux_package(
filename_base,
filename_prefix,
dist_dir,
test_cmd,
package_name,
version,
leave_test_installed,
)
else:
raise RuntimeError(f"Unsupported platform: {env.get_os()}")
def get_filename_base(version, use_linux_distro=True):
def get_filename_base(version, prefix, use_linux_distro=True):
os = env.get_os()
machine = platform.machine().lower()
package_base = env.get_env(
"DESKFLOW_PACKAGE_PREFIX", default=default_package_prefix
)
os_part = os
if os == "linux" and use_linux_distro:
@ -85,31 +132,54 @@ def get_filename_base(version, use_linux_distro=True):
# Underscore is used to delimit different parts of the filename (e.g. version, OS, etc).
# Dashes are used to delimit spaces, e.g. "debian-trixie" for "Debian Trixie".
return f"{package_base}_{version}_{os_part}_{machine}"
return f"{prefix}_{version}_{os_part}_{machine}"
def windows_package(filename_base):
def windows_package(filename_base, project_build_dir, dist_dir):
import lib.windows as windows
windows.package(filename_base)
windows.package(filename_base, project_build_dir, dist_dir)
def mac_package(filename_base):
def mac_package(filename_base, source_dir, project_build_dir, dist_dir, product_name):
import lib.mac as mac
mac.package(filename_base)
mac.package(filename_base, source_dir, project_build_dir, dist_dir, product_name)
def linux_package(filename_base, version, leave_test_installed):
def linux_package(
filename_base,
filename_prefix,
dist_dir,
test_cmd,
package_name,
version,
leave_test_installed,
):
import lib.linux as linux
extra_packages = env.get_env_bool("LINUX_EXTRA_PACKAGES", False)
linux.package(filename_base, PackageType.DISTRO, leave_test_installed)
linux.package(
filename_base,
dist_dir,
test_cmd,
package_name,
PackageType.DISTRO,
leave_test_installed,
)
if extra_packages:
filename_base = get_filename_base(version, use_linux_distro=False)
linux.package(filename_base, PackageType.TGZ)
filename_base = get_filename_base(
version, filename_base, use_linux_distro=False
)
linux.package(
filename_prefix,
dist_dir,
test_cmd,
package_name,
PackageType.TGZ,
)
if __name__ == "__main__":

View File

@ -17,7 +17,7 @@
import lib.env as env
env.ensure_in_venv(__file__, auto_create=True)
env.ensure_in_venv(__file__, create_venv=True)
env.install_requirements()
import lib.colors as colors