import logging import requests from typing import Dict, Union class Instance: def __init__(self, instance_dict : Dict): """If obfuscate, reject_media or reject_reports are not specified default to False""" self.severity: str = "suspend" self.obfuscate: bool = False self.reject_media: bool = False self.reject_reports: bool = False self.id: Union[int, None] = None self.domain: str = "" self.private_comment: str = "" self.public_comment: str = "" self.parse_block(instance_dict) def __str__(self): return f"{self.domain}: {self.severity}" def __eq__(self, other): return self.domain == other.domain and self.severity == other.severity and self.reject_media == other.reject_media and self.reject_reports == other.reject_reports and self.obfuscate == other.obfuscate def status_str(self): return f"{self.severity}\nReject reports: {self.reject_reports}\nReject media: {self.reject_media}\nObfuscate: {self.obfuscate}" def as_dict(self, private=False): keys = ["domain", "severity", "public_comment", "obfuscate", "reject_media", "reject_reports"] if private: keys.append("private_comment") exportable = {} for key in keys: exportable[key] = getattr(self, key) return exportable def parse_block(self, instance_dict): # this specifies possible properties and default values if not found on the remote source. If a default is None # the value is required and the parse will fail properties_and_defaults = [("domain", None), ("severity", "suspend"), ("public_comment", ""), ("private_comment", ""), ("obfuscate", False), ("reject_media", False), ("reject_reports", False)] for key, default in properties_and_defaults: try: setattr(self, key, instance_dict[key]) except KeyError: if default is not None: setattr(self, key, default) else: raise KeyError(f"The key {key} was not in the instance_dict response.") def apply(self, server, token, block_id=None): """Applies instance block on the remote server""" headers = { f'Authorization': f'Bearer {token}', } # As long as we generate this enside of apply we cannot properly test for the correct format data = {"domain": self.domain, "severity": self.severity, "reject_media": str(self.reject_media).lower(), "reject_reports": str(self.reject_reports).lower(), "private_comment": str(self.private_comment).lower(), "public_comment": self.public_comment, "obfuscate": str(self.obfuscate).lower()} """If no id is given add a new block, else update the existing block""" if block_id is None: response = requests.post(f'https://{server}/api/v1/admin/domain_blocks', data=data, headers=headers) else: response = requests.put(f'https://{server}/api/v1/admin/domain_blocks/{block_id}', data=data, headers=headers) if response.status_code != 200: raise ConnectionError(f"Could not apply block for {self.domain} ({response.status_code}: {response.reason})") def delete(self, server: str, token: str): """Deletes the instance from the blocklist on the remote server""" headers = { f'Authorization': f'Bearer {token}', } response = requests.delete(f'https://{server}/api/v1/admin/domain_blocks/{self.id}', headers=headers) if response.status_code != 200: raise ConnectionError(f"Could not apply block ({response.status_code}: {response.reason})") @staticmethod def list_diffs(local_blocklist, remote_blocklist): """Compares the local and remote blocklist and returns a list of differences""" diffs = [] for local_instance in local_blocklist: instance_found = False for idx, remote_instance in enumerate(remote_blocklist): if local_instance.domain == remote_instance.domain: instance_found = True if local_instance == remote_instance: pass else: """If the local block is different from the remote block, add it to the diff""" diffs.append({"local": local_instance, "remote": remote_instance}) """Remove the remote instance from the list so we later have a list of remote instances we don't have locally""" del remote_blocklist[idx] """If the local instance is not in the remote blocklist, add it to the diff""" if not instance_found: diffs.append({"local": local_instance, "remote": None}) for remote_instance in remote_blocklist: diffs.append({"local": None, "remote": remote_instance}) return diffs @staticmethod def apply_blocks_from_diff(diffs, server, token, no_delete: bool): """Uses a diff (list of difference in local an remote instance) to apply instance blocks""" for diff in diffs: if diff["local"] is None: if not no_delete: """Delete the block on the remote server""" diff['remote'].delete(server, token) logging.info(f"Deleted {diff['remote'].domain} from blocklist") elif diff["remote"] is None: """Add the block on the remote server""" diff["local"].apply(server, token) logging.info(f"Added {diff['local'].domain} to blocklist") else: """Update the block on the remote server""" diff["local"].apply(server, token, block_id=diff["remote"].id) logging.info(f"Updated {diff['local'].domain} in blocklist") @staticmethod def show_diffs(local_blocklist, remote_blocklist): """Shows a table in the CLI comparing the local and remote blocklist""" from rich.table import Table from rich.console import Console table = Table(title="Differences", expand=True, show_lines=True) table.add_column("Domain", style="cyan") table.add_column("Local status", style="green") table.add_column("Current remote status", style="magenta") diffs = Instance.list_diffs(local_blocklist, remote_blocklist) for diff in diffs: if diff["local"] is None: table.add_row(diff["remote"].domain, None, diff["remote"].status_str()) elif diff["remote"] is None: table.add_row(diff["local"].domain, diff["local"].status_str(), None) else: table.add_row(diff["local"].domain, diff["local"].status_str(), diff["remote"].status_str()) console = Console() console.print(table)