Edit file
File name : agent.py
Content :
from agent_util import execute_command, which
from datetime import datetime, timedelta
from inspector import Inspector, get_fqdn, get_server_name
from ipc_client import DEMClient, IPCClient
from agent_exceptions import NoAgentSectionHeaderException, NoManifestFileException
from forticlient_helper import ForticlientHelper, calculate_customer_key
from process_manager import ProcessManager
from os.path import basename, exists, isdir, isfile, join
from pickle_database import PickleDatabase
from plugin_manager import PluginManager
from pprint import pprint
from progress_printer import ProgressPrinter
from result_queue import ResultQueue
from schedule import Schedule
from sys import exit

# In case of Python 3
try:
    import StringIO
except:
    import io as StringIO

import aggregator
import calendar
import container_discovery
import csv
import difflib
import display
import fcntl
import p_importlib
import logging
import logging.handlers
import optparse
import os
import random
import re
import subprocess
import sys
import tempfile
import time
import traceback
import types

from blacklister import PluginBlacklister

try:
    import six
except:
    # Legacy support for Python 2.4
    class Six:
        PY2 = True

    six = Six()

# In case of Python 3
try:
    import ConfigParser as configparser
except:
    import configparser

try:
    import json
except ImportError:
    try:
        import simplejson as json
    # It's possible that we may not need json for the action that we're taking.
    # For example, for the rpm post-install script, on a Python version that
    # doesn't have json, we'll get this far in the code. But the post-install
    # doesn't use json, so we're fine.
    except ImportError:
        json = None

# Import a SHA function, either from hashlib for newer Pythons or sha for older
try:
    import hashlib

    sha_func = hashlib.sha1
except:
    import sha

    sha_func = sha.new

# Backport subprocess.check_output for Python versions < 2.7
# Adapted from http://stackoverflow.com/questions/4814970/subprocess-check-output-doesnt-seem-to-exist-python-2-6-5
if "check_output" not in dir(subprocess):
    # duck punch it in!
    def f(*popenargs, **kwargs):
        if "stdout" in kwargs:
            raise ValueError("stdout argument not allowed, it will be overridden.")
        process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
        output, unused_err = process.communicate()
        retcode = process.poll()
        return output

    subprocess.check_output = f

try:
    # Python 2.x
    import urlparse
except:
    import urllib.parse as urlparse
    import urllib.request as urlrequest

import ssl

try:
    # Python 2.x
    import httplib
except ImportError:
    import http.client as httplib

DEFAULT_MEDIATOR_URL = "https://global.fortimonitor.com"
AGENT_INSTALL_BLOCK = "/usr/local/FortiMonitor/agent/data/fm_agent_install_block"


class ExceptionMediator(Exception):
    pass


def defer_installation(timeout):
    """
    Return whether the installation should be delayed or not.

    An installation is delayed if there is a timestamp and our current time is
    less than that value.
    """
    if timeout and timeout > time.time():
        return True
    return False


def get_next_wait_period(timestamp, waited):
    """
    Calculate the next wait period until the installation can be attempted
    again: the last timestamp plus twice the seconds we last waited, capped
    at a maximum of 12 hours.
""" if not waited: waited = 30 if not timestamp: timestamp = time.time() next_wait_secs = min(waited * 2, 43200) return timestamp + next_wait_secs, next_wait_secs def load_failed_tries_file(): """ Load the file that contains the time to wait until trying the next install, and the amount of seconds we have waited so far. """ if not os.path.exists(AGENT_INSTALL_BLOCK): return None, None timestamp, seconds = 0.0, 0 with open(AGENT_INSTALL_BLOCK) as opened: data = opened.read() timestamp, seconds = data.split(";") timestamp = float(timestamp) seconds = int(seconds) return timestamp, seconds def save_failed_tries_file(timestamp, seconds): """ Save a file containing the next time the install is allowed to proceed and the seconds we are waiting for that timestamp. """ with open(AGENT_INSTALL_BLOCK, "w+") as opened: opened.write(f"{timestamp};{seconds}") def get_regional_agg_url(customer_key): """ Pull out the URL for the customer from the global mediator api to use instead of the default. Args: customer_key: identifier for the customer to pull a single regional CP url. Returns: regional_agg_url: URL for the aggregator that the customer should use. """ if os.path.exists("/etc/fm_mediator_url"): with open("/etc/fm_mediator_url", "rb") as opened: mediator_url = opened.read().decode() mediator_url = mediator_url.strip("\n") else: mediator_url = DEFAULT_MEDIATOR_URL uri = "/aggregator_url/{}".format(customer_key) if mediator_url.startswith("https://"): base_url = mediator_url.split("https://")[-1] elif mediator_url.startswith("http://"): base_url = mediator_url.split("http://")[-1] else: base_url = mediator_url aggregator_url = None connection = httplib.HTTPSConnection( host=base_url, timeout=10, context=ssl._create_unverified_context() ) connection.request("GET", uri) resp = connection.getresponse() if resp.status == 200: aggregator_url = resp.read().decode("utf-8") else: raise ExceptionMediator( f"Failed to grab agg url using customer key {customer_key}. {resp.status}" ) return aggregator_url class Agent(object): CUSTOM = "custom" DEFAULT_LOG_LEVEL = "INFO" MAX_IMPORT_FILES = 20 def safe_to_start_agent(self, timeout=2, sleep_time=10, counter=3): "Check to see if it's safe to start up the agent" # Safe if there are no other instances running if not os.path.exists(self.pid_file): self.log.debug("No existing PID file found, proceeding to run") return True # There's an existing PID file, so let's see if it's still active try: pid, timestamp = open(self.pid_file).read().strip().split(":") pid = int(pid) timestamp = int(timestamp) except: # If we couldn't read it, assume that the other instance just exited - should be safe to continue self.log.critical( "Error reading existing PID file: %s" % traceback.format_exc() ) return True # See if the process is still running try: os.getpgid(pid) except OSError: # It's exited, safe to proceed return True try: import pwd username = pwd.getpwuid(os.stat("/proc/%d" % pid).st_uid)[0] psout = execute_command("ps -o cmd= %d" % pid)[1].lower() if ( username != self.user or "python" not in psout or ("%s_agent" % self.brand) not in psout ): self.remove_pid_file() return True except: pass # Process is running, see how old it is if timeout and (time.time() - timestamp) / 60.0 > timeout: self.log.critical("Found stale agent process %s - killing" % pid) # Other process is too old, kill it off and start a new one os.kill(pid, 9) return True # Checking if the process is to uninstall, in which case, kill the running process. 
        parser = optparse.OptionParser()
        options, args = self.parse_arguments(parser)
        if options.uninstall and self.user != "root":
            self.log.critical(
                "Uninstalling. Killing all processes from the username %s " % self.user
            )
            try:
                # We could get an exception if an uninstall is happening, and the agent user is removed.
                manager = ProcessManager()
                pids = manager.filter_non_pid_process(os.listdir("/proc"))
                pids = manager.get_process_from_user(pids, self.user)
                self.log.critical("Found pids %s " % " ".join(pids))
                for pid in pids:
                    os.kill(int(pid), 9)
            except:
                pass
            return True

        # Other process should still be running, we bail for now
        if counter != 0:
            self.current_agent_delay += 10
            counter -= 1
            self.log.critical(
                "Found existing agent process %s, sleeping for %s and checking %s more times if safe to start."
                % (pid, sleep_time, counter)
            )
            time.sleep(sleep_time)
            return self.safe_to_start_agent(timeout, counter=counter)
        else:
            self.log.critical(
                "Found existing agent process %s, exiting to wait for it to finish"
                % pid
            )
            return False

    def write_pid_file(self):
        "Create a new PID file to track our instance"
        pid = os.getpid()
        now = int(time.time())
        f = open(self.pid_file, "w")
        f.write("%s:%s" % (pid, now))

    def remove_pid_file(self):
        "Remove an old PID file to clean up on the way out"
        # Need to check to see if it exists to avoid a problem on package uninstall
        if os.path.exists(self.pid_file):
            os.remove(self.pid_file)

    # Removes the agent from the system
    def uninstall(self, aggregator_client, remove_instance=False):
        indent = 1
        pp = ProgressPrinter("Notifying %s of uninstall" % self.brand, indent=indent)
        success = aggregator_client.notify_of_uninstall(remove_instance)
        if success:
            pp.finish()
        else:
            pp.finish("ERROR CONNECTING")

        # Remove logging and DB directories. We'll leave CUSTOM_PLUGIN_DIR intact in case they're
        # uninstalling and reinstalling.
        pp = ProgressPrinter("Removing %r directory" % self.log_dir, indent=indent)
        os.system("rm -rf %s %s %s" % (self.db_dir, self.log_dir, self.config_dir))
        pp.finish()

        indent = 1
        ProgressPrinter("\n\nUninstalling %s\n" % self.pkg_dir, section=True)
        pp.finish()
        print(("\nUninstall of %s complete\n" % self.pkg_dir))

    def get_manifest(self):
        """
        Get the manifest configuration if it exists. Also, throw a deprecation
        warning if the manifest does not conform to the new-style format
        (it must have an [agent] section heading).
        """
        manifest = configparser.ConfigParser()
        try:
            manifest_file = manifest.read(self.manifest_file)
            if not manifest_file:
                raise NoManifestFileException("No manifest file found")
            if not manifest.has_section("agent"):
                raise NoAgentSectionHeaderException(
                    "Using a manifest file without the section heading "
                    '"[agent]" is deprecated; please add this heading to '
                    "the file. Example:"
                    """
                    [agent]
                    customer_key = customerkey
                    server_group = 123
                    """
                )
        except (configparser.MissingSectionHeaderError, NoAgentSectionHeaderException):
            self.log.warn(str(traceback.format_exc()))
            if sys.version_info[0] == 3:
                amended_manifest_file = StringIO.StringIO(
                    "[agent]\n" + open(self.manifest_file, "r").read()
                )
            else:
                amended_manifest_file = StringIO.StringIO(
                    "[agent]\n" + open(self.manifest_file, "r").read().decode("utf-8")
                )
            manifest.readfp(amended_manifest_file)
        except NoManifestFileException:
            self.log.info(str(traceback.format_exc()))
        return manifest

    def write_config(self, manifest):
        """
        Create/update the config file with the settings from the manifest.
        Return the config.
        """
        new_config = configparser.ConfigParser()

        # Get the existing config file (if it exists) for creating a diff. See
        # below.
        old_config_lines = None
        if os.path.exists(self.config_file):
            self.log.info("Existing config file found")
            old_config_file = open(self.config_file, "rb")
            old_config_lines = old_config_file.readlines()
            old_config_file.close()

            # Copy old config settings into the new config
            old_config = configparser.ConfigParser()
            old_config.read(self.config_file)
            new_config = self.copy_config_settings(old_config, new_config)

        # Copy the manifest settings into the new config
        new_config = self.copy_config_settings(manifest, new_config)

        # Ensure the required settings are set.
        if not new_config.has_section("agent"):
            new_config.add_section("agent")
        if not new_config.has_option("agent", "aggregator_url"):
            new_config.set("agent", "aggregator_url", self.agg_url)
        new_config.set("agent", "version", self.version)

        if "plugin_blacklist" in new_config.options("agent"):
            original_plugins = new_config.get("agent", "plugin_blacklist")
        else:
            original_plugins = []
        updated_plugins = self._blacklister.update_list(original_plugins)
        if updated_plugins:
            new_config.set("agent", "plugin_blacklist", updated_plugins)

        proxies = urlrequest.getproxies()
        if not new_config.has_section("agent_proxy") and proxies:
            agg_url = new_config.get("agent", "aggregator_url")
            try:
                agg_url_option = urlparse.urlparse(agg_url)
                if agg_url_option.scheme:
                    agg_hostname = agg_url_option.hostname
                else:
                    agg_hostname = agg_url_option.path
                if not urlrequest.proxy_bypass(agg_hostname):
                    new_config.add_section("agent_proxy")
                    for key in ["https", "http"]:
                        p_url = proxies.get(key, None)
                        if p_url is not None:
                            new_config.set("agent_proxy", key, p_url.strip("/"))
            except:
                err = sys.exc_info()[1]
                error = str(err)
                self.log.error("Install proxy error: {}".format(error))

        new_config_file = open(self.config_file, "w")
        new_config.write(new_config_file)
        new_config_file.close()
        os.system("chmod 640 %s" % self.config_file)

        if old_config_lines is not None:
            # Create a diff of the old config vs new config.
            differ = difflib.Differ()
            diff_lines = differ.compare(
                old_config_lines, open(self.config_file, "r").readlines()
            )
            diff_lines = list(diff_lines)
            changes = [
                line
                for line in diff_lines
                if line.startswith("+ ") or line.startswith("- ")
            ]
            if len(changes):
                self.log.info("Config file overwritten")
                self.log.debug("Config diff:\n%s", "".join(diff_lines))
            else:
                self.log.info("No change to config file")
        else:
            self.log.info("Created new config file: %s", self.config_file)

        return new_config

    def copy_config_settings(self, original, destination):
        """
        Copy settings from the original to the destination, overwriting
        destination's settings if they already exist.
        """
        for section in original.sections():
            if not destination.has_section(section):
                destination.add_section(section)
            for option, value in original.items(section):
                destination.set(section, option, value)
        return destination

    def install(self, agg_url, version, server_key, customer_key, force=False):
        self.log.info("Beginning installation")

        block_found, waited_for = load_failed_tries_file()
        if defer_installation(block_found):
            until = datetime.fromtimestamp(block_found or time.time())
            self.log.error(
                f"Agent installation block found at {AGENT_INSTALL_BLOCK}. Preventing install until {until}."
            )
            sys.exit(-3)

        if self.is_installed and force is False:
            print("Agent already installed")
            self.log.info("Agent already installed")
            return

        # Make dirs for logging, DB, config, and plugins.
dirs = (self.log_dir, self.db_dir, self.config_dir, self.custom_plugin_dir) os.system("mkdir -p %s %s %s %s" % dirs) self.log.info("Created directories: %s %s %s %s" % dirs) # Create a new config from the manifest (if it exists). manifest = self.get_manifest() config = self.write_config(manifest) proxy_config = {} if config.has_section("agent_proxy"): proxy_config = config["agent_proxy"] aggregator_client = aggregator.Client( agg_url, version, server_key, customer_key, proxy_config=proxy_config ) agent_settings = dict( (option, value.strip("'\"")) for option, value in config.items("agent") ) if config.has_section("agent_proxy"): aggregator_client.proxy_config = config["agent_proxy"] pp = ProgressPrinter("\nHandshaking with %s servers" % self.brand, indent=1) # Check for a custom aggregator URL, and set it in the client if present handshake_type = agent_settings.get("handshake_type", "standard").lower() if handshake_type != "forticlient": try: agg_url = config.get("agent", "aggregator_url") print( ( "Using manifest file aggregator for initial handshake: %s" % agg_url ) ) self.log.info( "Using manifest file aggregator for initial handshake: %s" % agg_url ) aggregator_client.agg_url = agg_url except: pass elif handshake_type == "forticlient": # If we are a FortiSase install, pull the regional aggregator url using the calculated customer key, # and overwrite the config of the agent with it. try: handshake_data = self.get_fortisase_attributes() agent_settings["forticlient_metadata"] = handshake_data # Calculate the expected customer key from ems_serial and environment ems_serial = handshake_data["ems_serial"] environment = handshake_data["forticlient_environment"] expected_customer_key = calculate_customer_key(ems_serial, environment) agent_settings["customer_key"] = expected_customer_key agg_url = get_regional_agg_url(expected_customer_key) logging.info( f"Overwriting agg url with {agg_url} for customer key {expected_customer_key}" ) aggregator_client.agg_url = agg_url agent_settings["aggregator_url"] = agg_url # Save the agg url on the config. config.set("agent", "aggregator_url", agg_url) config.write(open(self.config_file, "w")) if os.path.exists(AGENT_INSTALL_BLOCK): # Remove the installation caching block file. os.remove(AGENT_INSTALL_BLOCK) except Exception as err: block_until, seconds = get_next_wait_period(block_found, waited_for) save_failed_tries_file(block_until, seconds) self.log.exception( f"Mediator error grabbing agg url {err}. Blocking agent installation for {seconds}." ) sys.exit(-3) else: raise ValueError( f"Unexpected handshake type {handshake_type}. 
Aborting handshake" ) if config.has_section("attributes"): server_attributes = dict( (option, value) for option, value in config.items("attributes") ) else: server_attributes = {} try: success, server_key, found_server, error, log_level = ( aggregator_client.handshake( self.get_all_ips(), agent_settings, server_attributes ) ) except: print( "\n\nThere was an error in the initial handshake with the aggregator, please" ) print( "check your aggregator URL, and ensure you have connectivity to retrieve:\n" ) for url in agg_url.split(","): print((" %s\n" % os.path.join(url, "v2/hello"))) self.log.error("Error in initial handshake: %s" % traceback.format_exc()) sys.exit() if not server_key or not found_server: print("Handshake failed: %s" % error) self.log.error("Handshake failed: %s" % error) sys.exit() self.log.debug( "%s, %s, %s, %s, %s" % (success, server_key, found_server, error, log_level) ) if log_level: self.db["log_level"] = log_level # Install remote countermeasures plugins, if specfied if ( "enable_countermeasures" in config.options("agent") and config.get("agent", "enable_countermeasures").lower() == "true" and "countermeasures_remote_plugins" in config.options("agent") ): for url in config.get("agent", "countermeasures_remote_plugins").split(","): cmd = "%s %s/countermeasure.py install_plugins --url %s" % ( sys.executable, self.bin_dir, url.strip(), ) os.system(cmd) if success: pp.finish() else: self.log.critical("Installation failed:\n%s", error) pp.finish("ERROR CONNECTING: %s" % error) if success and server_key: config.set("agent", "server_key", server_key) config.write(open(self.config_file, "w")) if found_server: print( ( """Installation of %s complete. Your server will now sync automatically with the %s ControlPanel. """ % (self.pkg_dir, self.brand) ) ) self.log.info("Agent will automatically sync with aggregator") else: padding = int(80 / 2 - (len(server_key) / 2)) server_key = (" " * padding) + server_key print( ( """ Installation of %s complete. Please copy and paste the following server key into the %s ControlPanel for your server: %s""" % (self.pkg_dir, self.brand, server_key) ) ) self.log.warn( "The server key must be manually entered into the " "Control Panel before agent will begin syncing" ) else: print( ( """ Installation of %s had an error (%s). The %s is installed but it cannot sync correctly. Please contact %s and send them the log file at %s """ % (self.pkg_dir, error, self.pkg_dir, self.brand, self.log_file) ) ) self.log.critical("Aggregator sync failed:\n%s", error) self.migrate_config() # This is used for collecting all the IPs associated with the machine, to be # passed to the aggregator through aggregator.Client.handshake(). The # aggregator will then check all of these IPs in sequence to try to find a # matching server. def get_all_ips(self): ips = [] ifconfig_path = which("ifconfig") ifconfig_cmd = ifconfig_path # special logig for solaris b/c if the way ifconfig syntax is changed if "sunos" in sys.platform or "aix" in sys.platform: ifconfig_cmd = ifconfig_path + " -a" if "hp-ux" in sys.platform: netstat = which("netstat") if netstat: code, output = execute_command("%s -in" % netstat) if code == 0: for l in output.split("\n"): if l.lower().startswith("name") or not l or l == "": continue line = l.split() ips.append(line[3]) elif ifconfig_path and "hp-ux" not in sys.platform: code, output = execute_command(ifconfig_cmd) if code == 0: if sys.platform in ("freebsd", "darwin"): ips = re.findall(r"inet6? 
(.+?)\s", output) ips = [ip.strip().split("%")[0] for ip in ips] else: ips = [ x.strip("addr:") for x in re.findall(r"inet6? (.+?)\s", output) ] else: ip_addr_path = which("ip") code, output = execute_command("%s addr show" % ip_addr_path) ips = [x for x in re.findall(r"inet6? (.+?)\s", output)] ips = [x for x in ips if x] # Remove any stray whitespace and CIDR notation from IPv6 addresses # AIX reports an inet6 address like '::1%1/64' - account for that. ips = [ip.strip().split("/")[0].split("%")[0] for ip in ips] if "1" in ips: ips[ips.index("1")] = "::1" # If that didn't work, get the IP address by making an outbound # connection with the aggregator. if not ips: self.log.warning( "Unable to retrieve IP address(es) locally, contacting aggregator" ) aggregator_client = aggregator.Client( self.agg_url, self.version, proxy_config=self.proxy_config ) try: ips = [aggregator_client.get_local_ip()] except Exception as e: self.log.error("IP address lookup failure: {}".format(e)) ips = [] if not ips: self.log.error("Unable to determine IP address(es)") else: self.log.debug("IP addresses: %s", ips) return ips def get_old_style_config_properties(self, manfile): # Return with no error if the manifest file doesn't exist if not os.path.exists(manfile): return {} try: mf = open(manfile).read().strip().split("\n") return dict([list(map(str.strip, line.split("="))) for line in mf]) except: print("Error reading manifest file") return {} def _open_file(self, fname, mode="r+"): ofile = open(fname, mode) # Acquire lock locked = True for i in range(10): try: fcntl.flock(ofile, fcntl.LOCK_EX | fcntl.LOCK_NB) locked = False break except: time.sleep(1.0) if locked: self.log.exception("Could not acquire lock on %s" % fname) ofile.close() return None return ofile def get_metric_values(self): if exists(self.report_file): # Read current metrics csvfile = self._open_file(self.report_file) if not csvfile: return {} unique_values = {} try: csv_reader = csv.reader(csvfile) for textkey, value, timestamp in csv_reader: timestamp = datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S") value = float(value) unique_values[ "%s:%s" % (textkey, timestamp.strftime("%Y%m%d%H%M")) ] = [textkey, value, time.mktime(timestamp.timetuple())] except: self.log.error("Unable to parse custom metric file") unique_values = {} unique_values = list(unique_values.values()) unique_values.sort(key=lambda v: v[2]) custom_values = {} for textkey, value, timestamp in unique_values: if textkey not in custom_values: custom_values[textkey] = [[value, timestamp]] else: custom_values[textkey].append([value, timestamp]) # Remove all synced metrics csvfile.seek(0) csvfile.truncate() # Release lock fcntl.flock(csvfile, fcntl.LOCK_UN) csvfile.close() return custom_values else: return {} def get_registered_metrics(self): if exists(self.register_file): # Read current metrics csvfile = self._open_file(self.register_file) if not csvfile: return {} csvreader = csv.reader(csvfile) try: metrics = dict([(row[0], row[1]) for row in csvreader]) except Exception: self.log.exception("Error reading custom metric register file") metrics = {} # Remove all synced metrics csvfile.seek(0) csvfile.truncate() # Release lock fcntl.flock(csvfile, fcntl.LOCK_UN) csvfile.close() return metrics else: return {} def get_existing_metrics(self): existing_tkeys = [] for sr_id, schedule in list(self.db["schedules"].items()): tkey = "%s.%s" % (schedule.plugin_textkey, schedule.resource_textkey) if tkey not in existing_tkeys: existing_tkeys.append(tkey) return existing_tkeys def 
ignore_metric(self, plugin_textkey, resource_textkey): if plugin_textkey == "com.pnp-hcl.dominostats": if resource_textkey.startswith("Mem.PID."): return True return False def process_imports(self, config): req_top_keys = ["plugin_textkey", "plugin_category_name"] req_metric_keys = ["textkey", "value", "unit", "timestamp"] req_incident_keys = ["textkey", "description", "action", "timestamp"] existing_metrics = self.get_existing_metrics() self.log.info("Processing incoming import files") new_metrics = {} new_values = {} custom_incidents = [] import_dirs = [self.custom_import_dir] additional_dirs = ( config.has_option("agent", "metric_incoming_directory") and config.get("agent", "metric_incoming_directory") or None ) if additional_dirs: import_dirs.extend(additional_dirs.split(",")) max_files = self.MAX_IMPORT_FILES max_override = ( config.has_option("agent", "max_incoming_files_override") and config.get("agent", "max_incoming_files_override") or None ) if max_override: max_files = int(max_override) files = [] for directory in import_dirs: if not isdir(directory): continue if len(files) >= max_files: break self.log.info("Looking in %s", directory) for f in os.listdir(directory): if len(files) >= max_files: break if isfile(join(directory, f)): files.append(join(directory, f)) for full_path in files: fname = basename(full_path) # Check if we can delete this file when we're done if not os.access(full_path, os.W_OK): self.log.error("Can not delete %s so will not process.", full_path) continue f = open(full_path, "r+") try: self.log.info("Processing %s", full_path) j = json.loads(f.read()) f.close() for req in req_top_keys: if req not in list(j.keys()): logging.error( "Can not process file %s! Missing required key: %s", fname, req, ) # TODO: Log full file here? continue metrics = j.get("metrics", []) for m in metrics: for req in req_metric_keys: if req not in list(m.keys()): logging.error( "Can not process metric! Missing required key: %s", req ) pprint(m) continue if self.ignore_metric(j["plugin_textkey"], m["textkey"]): continue try: try: unix_timestamp = int(m["timestamp"]) except Exception: timestamp = datetime.strptime( m["timestamp"], "%Y-%m-%d %H:%M:%S" ) unix_timestamp = calendar.timegm(timestamp.timetuple()) except Exception: self.log.error( "Could not process timestamp %s for metric %s", m["timestamp"], m["textkey"], ) continue new_value = (m["value"], unix_timestamp) tkey = "%s.%s" % (j["plugin_textkey"], m["textkey"]) if tkey not in existing_metrics: if tkey in new_metrics: new_metrics[tkey].setdefault("first_values", []).append( new_value ) else: label = m.get("label", None) if label is None: label = m["textkey"] new_metrics[tkey] = { "plugin_textkey": j["plugin_textkey"], "plugin_name": j["plugin_category_name"], "resource_textkey": m["textkey"], "label": label, "unit": m.get("unit", None), "first_values": [new_value], } else: new_values.setdefault(tkey, []).append(new_value) incidents = j.get("incidents", []) for incident in incidents: for req in req_incident_keys: if req not in list(incident.keys()): logging.error( "Can not process incident! 
Missing required key: %s", req, ) pprint(incident) continue try: try: unix_timestamp = int(incident["timestamp"]) except Exception: timestamp = datetime.strptime( incident["timestamp"], "%Y-%m-%d %H:%M:%S" ) unix_timestamp = calendar.timegm(timestamp.timetuple()) except Exception: self.log.error( "Could not process timestamp %s for incident %s", incident["timestamp"], incident["textkey"], ) continue obj = { "plugin_textkey": j["plugin_textkey"], "resource_textkey": incident["textkey"], "timestamp": unix_timestamp, "description": incident["description"], "action": incident["action"], } if "match_key" in incident: obj["match_key"] = incident["match_key"] if "metadata" in incident: obj["metadata"] = incident["metadata"] custom_incidents.append(obj) # All done with this file, delete it os.remove(full_path) except Exception: if f.closed: f = open(full_path, f.mode) self.log.error("Error processing %s:", fname) # TODO: Can this be debug instead? f.seek(0) self.log.info(f.read()) self.log.error(traceback.format_exc()) self.log.error("Deleting file") f.close() os.remove(full_path) continue return new_metrics, new_values, custom_incidents def get_update_config(self): config = {"fqdn": get_fqdn()} if os.path.exists(self.update_config_file): manfile = self._open_file(self.update_config_file) if not manfile: return config # Read current properties properties = self.get_old_style_config_properties(self.update_config_file) # Release lock and remove manfile.seek(0) manfile.truncate() fcntl.flock(manfile, fcntl.LOCK_UN) manfile.close() try: os.remove(self.update_config_file) except: pass return properties else: if self.is_fortisase_install: server_name = get_server_name() if server_name: config["server_name"] = server_name return config def __init__( self, brand, agg_url, version, user, bin_dir, lib_dir, pkg_dir, timeout, base_config_dir, base_custom_plugin_dir, base_data_dir, base_log_dir, acceptable_sync_delay, ): self.brand = brand self.agg_url = agg_url self.version = version self.user = user self.lib_dir = lib_dir self.bin_dir = bin_dir self.pkg_dir = pkg_dir self.tmp_dir = tempfile.gettempdir() self.metadata_rebuild_freq = ( 3600 # How often do we want to rebuild metadata (seconds) ) self.is_root = os.getuid() == 0 or os.geteuid() == 0 self.acceptable_sync_delay = acceptable_sync_delay # XXX I think these dir settings might need to be moved back into the # configs. # These dirs and files are managed by the script, not the package. # Need to be created by the script by --install, and removed by --uninstall. 
self.db_dir = os.path.join(base_data_dir, self.pkg_dir) self.db_file = join(self.db_dir, "%s.db" % self.pkg_dir) self.log_dir = os.path.join(base_log_dir, self.pkg_dir) self.log_file = join(self.log_dir, "%s.log" % self.pkg_dir) if len(sys.argv) > 0 and sys.argv[1] == "--power-status": self.log_file = os.path.join(self.log_dir, "power_status.log") self.config_dir = os.path.join(base_config_dir, self.pkg_dir) self.config_file = join(self.config_dir, "%s_agent.cfg" % self.brand) self.custom_plugin_dir = os.path.join(base_custom_plugin_dir, self.pkg_dir) self.countermeasures_custom_plugin_dir = os.path.join( self.custom_plugin_dir, "countermeasures" ) self.custom_import_dir = os.path.join(self.custom_plugin_dir, "incoming") self.manifest_file = os.path.join( base_config_dir, "%s-agent-manifest" % self.brand ) data_dir = os.path.join(base_data_dir, self.pkg_dir) self.pid_file = os.path.join(data_dir, "agent.pid") self.update_config_file = os.path.join( base_data_dir, self.pkg_dir, "update-config" ) # Plugin textkey for custom metrics specified by the user as well as register and report files if "freebsd" in sys.platform.lower(): self.register_file = os.path.join(lib_dir, "register") self.report_file = os.path.join(lib_dir, "report") elif "darwin" == sys.platform.lower(): self.register_file = os.path.join(self.custom_plugin_dir, "register") self.report_file = os.path.join(self.custom_plugin_dir, "report") else: self.register_file = os.path.join(base_data_dir, self.pkg_dir, "register") self.report_file = os.path.join(base_data_dir, self.pkg_dir, "report") # See if we've been installed - the BIN_DIR directory neeeds to exist, and then we need to # make sure there is a server_key in the config file self.is_installed = True self.has_dem = False self.dem_port = "demservice" self.update_service_port = "updateservice" self.ipcPath = "/tmp/com.fortinet.fortimonitor" self.auto_update = False self.scheduled_update = None self.is_fortisase_install = False self.proxy_config = None try: if not exists(self.bin_dir): raise Exception("No bin directory") if not os.path.exists(self.config_file): raise Exception("No config file {}".format(self.config_file)) config_file = configparser.ConfigParser() config_file.read(self.config_file) if config_file.has_section("agent_proxy"): self.proxy_config = config_file["agent_proxy"] if sys.platform in ("darwin", "linux"): if config_file.has_option("dem", "enabled"): self.has_dem = config_file.get("dem", "enabled").lower() == "true" if config_file.has_option("dem", "server_port"): self.dem_port = config_file.get("dem", "server_port") if config_file.has_option("agent", "updateservice.port"): self.update_service_port = config_file.get( "agent", "updateservice.port" ) if config_file.has_option("agent", "ipc_path"): self.ipcPath = config_file.get("agent", "ipc_path") if "darwin" == sys.platform: if config_file.has_option("agent", "auto_update"): self.auto_update = ( config_file.get("agent", "auto_update").lower() == "true" ) if config_file.has_option("agent", "scheduled_update"): self.scheduled_update = config_file.get( "agent", "scheduled_update" ) if config_file.has_option("agent", "handshake_type"): if ( "forticlient" == config_file.get("agent", "handshake_type").lower() ): self.is_fortisase_install = True server_key = config_file.get("agent", "server_key") if not server_key: raise Exception("Missing server key") except Exception as e: logging.exception(f"Initialization failure: {e}") sys.stderr.write("Initialize exception: {}".format(e)) self.is_installed = False # Custom OS 
block # Here we'll update sys.platform for all plugins to be able to use if "VMkernel" in os.uname(): sys.platform = "vmware" # Actual run of the agent delay. self.current_agent_delay = 0 self.set_up_logging() self.log = logging.getLogger(self.__class__.__name__) try: self.timeout = float(config_file.get("agent", "startup_timeout")) except Exception: self.timeout = timeout self.db = None self._blacklister = PluginBlacklister() def migrate_config(self): """ Update agent configs to use "[agent]" instead of "[AgentConfig]" as the main heading and "aggregator_url" instead of "agg_url" (in order to match the option in the manifest file). """ if self.db["config_migrated"]: self.log.info("Config is in the correct format") return config = configparser.ConfigParser() if config.read(self.config_file): config_has_changed = False if not config.has_section("agent"): config.add_section("agent") config_has_changed = True self.log.info("Added [agent] section to config") if config.has_section("AgentConfig"): for option, value in config.items("AgentConfig"): if option == "agg_url": option = "aggregator_url" config.set("agent", option, value) config.remove_section("AgentConfig") config_has_changed = True self.log.info( "Copied deprecated [AgentConfig] section to [agent] and removed it from config" ) if config_has_changed: config_file = open(self.config_file, "w") config.write(config_file) config_file.close() self.db["config_migrated"] = True def __del__(self): self.remove_pid_file() def set_up_logging(self): root_logger = logging.getLogger() if not os.path.isdir(self.log_dir): os.system("mkdir -p {}".format(self.log_dir)) try: log_file = open(self.log_file, "a") except IOError: print( ( 'Cannot open log file %s: "%s"' % (self.log_file, str(traceback.format_exc())) ) ) print("Logging to stderr instead") handler = logging.StreamHandler() else: log_file.close() handler = logging.handlers.RotatingFileHandler( self.log_file, "a", maxBytes=5 * 1024**2, backupCount=5 ) handler.setFormatter( logging.Formatter( "%(process)d) %(asctime)s - %(name)s - %(levelname)s - %(message)s" ) ) root_logger.addHandler(handler) # We initialize the level to NOTSET here to make sure that all # logging inside PickleDatabase is captured because the root # logger's default log level is WARNING. See # https://docs.python.org/2/library/logging.html#logging.Logger.setLevel # for details). root_logger.setLevel(logging.NOTSET) db = self.open_db() try: log_level = getattr(logging, db["log_level"].upper()) except: log_level = getattr(logging, self.DEFAULT_LOG_LEVEL) root_logger.setLevel(log_level) def parse_arguments(self, parser): """ Return the options and arguments parsed from the parser. 
""" if self.is_installed: parser.add_option("--server-key", dest="server_key", action="store") parser.add_option( "--rebuild-metadata", action="store_true", dest="rebuild_metadata", default=False, ) parser.add_option("--status", action="store_true", dest="status", default=False) parser.add_option("--stats", action="store_true", dest="stats", default=False) parser.add_option( "--from-cron", action="store_true", dest="from_cron", default=False ) parser.add_option("--aggregator", action="store", dest="aggregator") parser.add_option( "--install", action="store_true", default=False, dest="install" ) parser.add_option( "--uninstall", action="store_true", default=False, dest="uninstall" ) parser.add_option( "--remove-instance", action="store_true", default=False, dest="remove_instance", ) parser.add_option( "--customer-key", default=None, action="store", dest="customer_key" ) parser.add_option( "--unpause", default=None, action="store_true", dest="unpause" ) # Docker parser.add_option( "--list-containers", default=None, action="store_true", dest="list_containers", ) parser.add_option( "--rebuild-container-metadata", default=None, action="store_true", dest="rebuild_container_metadata", ) parser.add_option( "--power-status", dest="system_power_status", action="store", default=None ) options, args = parser.parse_args() return options, args def main(self): activityStart = datetime.now() server_key = None config = configparser.RawConfigParser() config.read(self.config_file) try: safe_counter = int(config.get("agent", "safe_counter")) except: safe_counter = 3 if self.is_installed and config != []: try: server_key = config.get("agent", "server_key") except: server_key = None try: self.agg_url = config.get("agent", "aggregator_url") or self.agg_url except: self.agg_url = None # installed? just print out the server key usage = """%%prog [options] %s, server key: %s, aggregator endpoint: %s """ % (self.pkg_dir, server_key, self.agg_url) # not installed? print out the install usage else: usage = """sudo python %%prog --install [--customer-key=YOUR_CUSTOMER_KEY] %s""" % (self.pkg_dir,) parser = optparse.OptionParser(usage=usage) options, args = self.parse_arguments(parser) if options.system_power_status: try: self.log.info( "Power status -> {} UID {} EUID {}".format( options.system_power_status, os.getuid(), os.geteuid() ) ) aggregator_client = aggregator.Client( self.agg_url, self.version, server_key, proxy_config=self.proxy_config, ) data = {"reason": options.system_power_status} aggregator_client.call("agent_power_change", data) except: pass exit(0) if not self.safe_to_start_agent(self.timeout, counter=safe_counter): # Need to overwrite delete to avoid removing a pid self.__del__ = lambda: self.log.warning("Preventing pid file removal") self.log.warning( "Exiting without running - other agent process already running" ) sys.exit(1) self.write_pid_file() db = False self.db = self.open_db() if self.is_installed: db = self.db # XXX This may be removed at a later date, when all agents' configs have # been migrated. 
self.migrate_config() self.log.info("Activity started") if options.status: plugins = PluginManager( db, self.config_file, join(self.lib_dir, "plugins"), self.custom_plugin_dir, ) display.status(self, server_key, db["schedules"], plugins) if options.stats: display.stats(db["schedules"], db["num_syncs"], db["last_sync"]) if options.uninstall: aggregator_client = aggregator.Client( self.agg_url, self.version, server_key, proxy_config=self.proxy_config ) self.uninstall(aggregator_client, options.remove_instance) exit() if not self.is_installed or options.install: if options.aggregator: self.agg_url = options.aggregator customer_key = options.customer_key or None if "darwin" == sys.platform.lower(): dirs_to_create = [ (self.db_dir, None), (self.config_dir, None), (self.custom_plugin_dir, 0o777), (self.countermeasures_custom_plugin_dir, 0o777), (self.custom_import_dir, 0o777), ] for dir, perms in dirs_to_create: os.system("mkdir -p {}".format(dir)) if perms: os.chmod(dir, perms) for rfile in [self.register_file, self.report_file]: with open(rfile, "a+") as rf: pass if not os.path.isfile(rfile): self.log.warning("Installer did not create {}".format(rfile)) self.install(self.agg_url, self.version, server_key, customer_key) return # Require at least one of these options valid_options = [ "from_cron", "aggregator", "rebuild_metadata", "server_key", "unpause", "list_containers", "rebuild_container_metadata", ] option_given = False for valid_option in valid_options: if getattr(options, valid_option, None): option_given = True break if not option_given: msg = "%s Agent v%s, server key: %s, aggregator endpoint: %s" % ( self.brand, self.version, server_key, self.agg_url, ) print(msg) self.log.info(msg) return # Support unpausing from the command line if options.unpause: print("Unpausing agent, will run as usual on next run") db["pause"] = None db.save() return # Docker cli commands if options.list_containers: if "docker_containers" not in db or db["docker_containers"] == {}: print("No monitored containers") return containers = db["docker_containers"] print("Monitored Containers:\n") print("CONTAINER ID\tIMAGE\t\tCOMMAND\t\t\tSTATUS") for short_id, metadata in containers.items(): cont_image = metadata.get("Image", "?") cont_command = metadata.get("Command", "?") cont_status = metadata.get("Status", "?") print( '%s\t%s\t"%s"\t%s' % (short_id, cont_image, cont_command[:20], cont_status) ) return if options.rebuild_container_metadata: db["rebuild_container_metadata"] = True print("Metadata queued for rebuild") self.log.info("Container metadata rebuild queued") return requested_auto_update = False try: just_set_option_and_quit = False if options.server_key: just_set_option_and_quit = True key = options.server_key print(("Setting server key to %s" % key)) config.set("agent", "server_key", key) if options.aggregator: just_set_option_and_quit = True agg = options.aggregator print(("Setting aggregator endpoint to %s" % agg)) config.set("agent", "aggregator_url", agg) if just_set_option_and_quit: config.write(open(self.config_file, "wb")) exit(0) # Linux agent should not run if executed as root if self.is_root and not options.rebuild_metadata: self.log.error("Linux agent should not run if executed as root") print("Linux agent should not run if executed as root") return server_key = config.get("agent", "server_key") aggregator_client = aggregator.Client( self.agg_url, self.version, server_key, proxy_config=self.proxy_config ) # should never be here if not server_key: print("No server key found, please re-install 
the agent.") exit(1) if self.has_dem: try: needs_schedules = False dem_client = DEMClient(self.dem_port, self.ipcPath) schedules_received = dem_client.send_receive("schedules-init") if schedules_received is None or "no" == schedules_received: needs_schedules = True if needs_schedules or self._agent_version_updated(db): self._init_dem_schedules(aggregator_client) except: pass plugins = PluginManager( db, self.config_file, join(self.lib_dir, "plugins"), self.custom_plugin_dir, ) dem_client = DEMClient(self.dem_port) wifi_info = dem_client.get_dem_wifi_info() if wifi_info: plugins.add_dem_wifi_results(wifi_info) # Check on Countermeasures remote plugins update if ( "enable_countermeasures" in config.options("agent") and config.get("agent", "enable_countermeasures").lower() == "true" and "countermeasures_remote_plugins" in config.options("agent") and "countermeasures_refresh_plugins" in config.options("agent") ): # See if we need to refresh refresh_cycle = ( int(config.get("agent", "countermeasures_refresh_plugins")) * 3600 ) if ( "countermeasures_last_refresh" not in db or time.time() - db["countermeasures_last_refresh"] > refresh_cycle ): for url in config.get( "agent", "countermeasures_remote_plugins" ).split(","): self.log.info( "Refreshing CounterMeasures plugins from %s" % url ) cmd = "%s %s/countermeasure.py install_plugins --url %s &" % ( sys.executable, self.bin_dir, url.strip(), ) os.system(cmd) db["countermeasures_last_refresh"] = time.time() elif "countermeasures_last_refresh" in db: self.log.info( "Waiting to refresh CM plugins in %d minutes" % ( ( db["countermeasures_last_refresh"] + refresh_cycle - time.time() ) / 60 ) ) # run all_plugins_start_time = datetime.now() results_to_send = [] custom_metrics = self.get_metric_values() new_import_metrics, new_import_values, custom_incidents = ( self.process_imports(config) ) # Create an anomalies container if it isn't already there if "anomalies" not in db or db["anomalies"] == None: db["anomalies"] = {} for sr_id, schedule in list(db["schedules"].items()): schedule_tkey = "%s.%s" % ( schedule.plugin_textkey, schedule.resource_textkey, ) # FIXME I gave the next check time a five-second leeway. There must be a better way! # Ignore schedule freuqency for custom metrics from JSON files and report.py calls leeway_time = 5 if ( schedule.plugin_textkey != self.CUSTOM and schedule.resource_textkey not in custom_metrics and schedule_tkey not in new_import_values and schedule.next_check_time > (all_plugins_start_time + timedelta(seconds=leeway_time)) ): self.log.info("%r too early to check", schedule) continue frequency = timedelta(seconds=schedule.frequency) current_agent_delay = timedelta(seconds=self.current_agent_delay) # Gave more leeway time to compensate the time sleeping if any. There must be a better way! 
schedule.next_check_time = ( all_plugins_start_time + frequency - current_agent_delay ) if schedule_tkey in new_import_values: scale = plugins.config.get(schedule_tkey, {}).get("scale", 1.0) for value, timestamp in new_import_values[schedule_tkey]: if value is not None: value *= scale if value is None: continue results_to_send.append((sr_id, timestamp, value)) anomalies = {} elif schedule.plugin_textkey == self.CUSTOM: if schedule.resource_textkey not in custom_metrics: continue scale = plugins.config.get(schedule.plugin_textkey, {}).get( "scale", 1.0 ) for value, timestamp in custom_metrics[schedule.resource_textkey]: if value is not None: value *= scale if value is None: continue results_to_send.append((sr_id, timestamp, value)) anomalies = {} elif schedule.plugin_textkey not in plugins.plugins: # Likely a custom metric that didn't report in this period # TODO: Better way to do this? self.log.info("No custom value or plugin for %s", schedule_tkey) continue else: plugin_start_time = datetime.now() t0 = time.time() value, anomalies = schedule.check( plugins, db["anomalies"].get(schedule.id, {}) ) t1 = time.time() self.log.debug( "%r returned %s in %.2f seconds" % (schedule, value, t1 - t0) ) if value is None: continue results_to_send.append( ( sr_id, time.mktime(plugin_start_time.timetuple()), value, ) ) self.log.info( "Running all plugins took %s", datetime.now() - all_plugins_start_time ) # Add data to our queue db["result_queue"].update(results_to_send) # sync # If we're paused, we don't want to sync and will just exit here. if db["pause"]: if time.time() < db["pause"]: db.save() time_left = (db["pause"] - time.time()) / 60.0 self.log.info( "Pause command received. Processing stopped. Process will resume in %.2f minutes." % time_left ) return else: # We've paused as long as instructed, now set pause to None and resume with the sync db["pause"] = None db.save() self.log.info( "Pause duration exceeded, unpausing the agent for the next run" ) return start_time = time.time() # do we need to resend and re-cache metadata? 
metadata = None fortisase_attributes = {} countermeasures_metadata = [] facts = None # let's just ensure that once a day they push, just in case something # gets out of sync lucky_day = random.randint(0, 1440) == 0 # See if we need to rebuild the metadata - performed every hour, or if specified by the --rebuild-metadata # command line option, or if the agent config file has changed since the last time we saw it rebuild_metadata = False if "last_metadata_time" not in db: db["last_metadata_time"] = 0 if time.time() - db["last_metadata_time"] > self.metadata_rebuild_freq: rebuild_metadata = True if options.rebuild_metadata: rebuild_metadata = True if "last_config_file_time" not in db: db["last_config_file_time"] = time.time() last_config_file_time = os.path.getmtime(self.config_file) if last_config_file_time > db["last_config_file_time"]: rebuild_metadata = True db["last_config_file_time"] = last_config_file_time if rebuild_metadata: self.log.info("Rebuilding plugin metadata") db["last_metadata_time"] = time.time() if "custom_plugin_url" in config.options("agent"): plugins.install_remote_plugins( config.get("agent", "custom_plugin_url") ) stale = plugins.is_metadata_stale() if ( stale or options.rebuild_metadata or lucky_day or not db["has_connected_with_aggregator"] ): metadata = plugins.metadata if stale: self.log.info("metadata changed") elif options.rebuild_metadata: self.log.info("rebuilding metadata") elif lucky_day: self.log.info("randomly forcing metadata rebuild") elif not db["has_connected_with_aggregator"]: self.log.info("we've never pushed up metadata before") # If we're rebuilding metadata, also get the server facts facts = Inspector(self).get_all_facts(wifi_info) fortisase_attributes = self.get_fortisase_attributes() # If Countermeasures is enabled, rebuild Countermeasure plugin metadata # Dynamically load all available plugins, both in the default install directory # and the customer's custom directory countermeasures_metadata = [] if ( "enable_countermeasures" in config.options("agent") and config.get("agent", "enable_countermeasures").lower() == "true" ): for directory in ( os.path.join(self.lib_dir, "countermeasures", "plugins"), self.countermeasures_custom_plugin_dir, ): if not os.path.exists(directory): continue sys.path.append(directory) for mod_name in os.listdir(directory): if mod_name.endswith(".py") and not mod_name.startswith( "__" ): try: mod = p_importlib.import_module(mod_name[:-3]) except: self.log.exception( "Unable to import module %s" % mod_name ) continue # Compute the hash of the plugin, being if sha_func: if six.PY2: hash = sha_func( open( os.path.join(directory, mod_name) ).read() ).hexdigest() else: hash = sha_func( open(os.path.join(directory, mod_name)) .read() .encode("utf-8") ).hexdigest() else: hash = "" for name, obj in list(mod.__dict__.items()): if ( (sys.version_info[0] == 3 and type(obj) == type) or ( sys.version_info[0] == 2 and type(obj) == types.ClassType ) ) and name.endswith("Countermeasure"): try: plugin = obj() countermeasures_metadata.append( { "textkey": plugin.textkey, "name": plugin.name, "author": plugin.author, "hash": hash, "description": plugin.description, } ) except: pass if mod_name.endswith(".json"): try: json_counter = open( os.path.join(directory, mod_name) ) except: self.log.error( "Unable to open %s" % os.path.join(directory, mod_name) ) self.log.error(traceback.format_exc()) continue file_content = json_counter.read() if sha_func: hash = sha_func( file_content.encode("utf-8") ).hexdigest() else: hash = "" 
json_counter.close() try: counter_data = json.loads(file_content) except Exception: self.log.error( "%s file is not a valid json file to be read" % mod_name ) self.log.error(traceback.format_exc()) continue required_fields = [ "name", "textkey", "command", "author", ] existing_keys = counter_data.keys() success = True for key in required_fields: if key not in existing_keys or not counter_data.get( key ): self.log.error( "%s is missing from the countermeasure declaration in %s" % (key, mod_name) ) success = False break if not success: continue textkey = counter_data.get("textkey") countermeasures_metadata.append( { "textkey": textkey, "name": counter_data.get("name"), "author": counter_data.get("author"), "hash": hash, "description": counter_data.get("description"), } ) # Check if we can access Docker if "docker_supported" not in db or not db["docker_supported"]: can_access_docker = container_discovery.check_access() if can_access_docker == "success": db["docker_supported"] = True self.log.info("Docker supported") elif can_access_docker == "no-permission": self.log.info("Missing permission to access Docker socket") if "docker_supported" in db and db["docker_supported"]: db["rebuild_container_metadata"] = True # actually sync response = {} command_results = {} # Send results of a log request back to the agent if "log_request" in db and db["log_request"]: command_results["log_request"] = str(db["log_request"]) if "diagnostics" in db and db["diagnostics"]: command_results["diagnostics"] = str(db["diagnostics"]) if "socket_stats" in db and db["socket_stats"]: command_results["socket_stats"] = str(db["socket_stats"]) if "mtr" in db and db["mtr"]: command_results["mtr"] = str(db["mtr"]) auto_topo_scans = [] if "auto_topo_scans" in db: auto_topo_scans = db["auto_topo_scans"] try: anomalies_to_report = [] self.log.info( "Syncing with aggregator: %d results, %d anomalies", len(results_to_send), len(anomalies_to_report), ) if metadata: metadata_summary = dict( (plugin_key, len(list(plugin_metadata.keys()))) for plugin_key, (_, plugin_metadata) in list(metadata.items()) ) self.log.debug("Metadata summary: %r", metadata_summary) force_send_schedules = False if db["num_syncs"] == 0 or db["schedules"] == {}: force_send_schedules = True if rebuild_metadata or db["sync_schedules"]: force_send_schedules = True # We have a lot of results coming into the aggregator all at once from # various agents (every minute usually). We put an artificial random delay here before syncing # to stagger the results that come in. delay = random.randint(1, self.acceptable_sync_delay or 1) time.sleep(delay) # Pull results out of our queue to send # If single result time is set, we only want to send the latest result, and not anything else # in the queue. 
if db["single_result"]: if time.time() < db["single_result"]: result_data = db["result_queue"].pop_results( len(db["schedules"]) ) else: db["single_result"] = None result_data = db["result_queue"].pop_results() else: result_data = db["result_queue"].pop_results() # See if we have any queued discovered containers to send up discovered_containers = [] deleted_containers = [] MAX_CONTAINERS_SYNC = 20 if "discovered_containers" in db: container_queue = db["discovered_containers"] for i in range(min(len(container_queue), MAX_CONTAINERS_SYNC)): discovered_containers.append(container_queue.pop(0)) if "deleted_containers" in db: deleted_container_queue = db["deleted_containers"] for i in range(len(deleted_container_queue)): deleted_containers.append(deleted_container_queue.pop(0)) dem_results = self._getDemResults(db) try: # Set traceback limit 0 to include only the error message w/o the traceback sys.tracebacklimit = 0 new_import_metrics = list(new_import_metrics.values()) if server_key: response = aggregator_client.sync( result_data, anomalies_to_report, metadata, countermeasures_metadata, facts, discovered_containers, deleted_containers, self.get_registered_metrics(), new_import_metrics, custom_incidents, self.get_update_config(), self.get_all_ips(), auto_topo_scans, force_send_schedules, command_results, dem_enabled=self.has_dem, dem_service_results=dem_results, fortisase_attributes=fortisase_attributes, ) db["log_request"] = None db["diagnostics"] = None db["socket_stats"] = None db["mtr"] = None db["auto_topo_scans"] = [] db["sync_schedules"] = None dem_updates = { "icmp_server_resources": response.get( "icmp_server_resources", {} ), "monitor_schedules": response.get("monitor_schedules", {}), "traceroutes": response.get("traceroutes", []), "traceroute_checks": response.get("traceroute_checks", {}), } self._updateDEMServiceSchedules(dem_updates) else: self.log.info("No server_key found, skipping sync") except aggregator.ReinstallResponse as err: self.log.warning( f"Received a request tor reinstall the agent due to a existing conflict {err}" ) # Reinstall response is for FortiSase agents so calculate the customer key dynamically handshake_data = self.get_fortisase_attributes() if handshake_data: ems_serial = handshake_data["ems_serial"] environment = handshake_data["forticlient_environment"] customer_key = calculate_customer_key(ems_serial, environment) self.install( self.agg_url, self.version, server_key, customer_key, force=True, ) else: logging.warning( f"Reinstall request received, forticlient data not available. Please verify. {handshake_data}" ) return except aggregator.UnauthorizedResponse as err: self.log.warning( f"Received an unauthorized response from the agg. Pausing execution of agent by {err} seconds" ) self.db["pause"] = time.time() + int(str(err)) return except: # Failed to hit aggregator, so we'll put those results back into the queue db["result_queue"].update(result_data) for demKey in dem_results.keys(): q = db[demKey] q.update(dem_results[demKey]) self.log.exception("Could not sync with aggregator") self.log.debug("Saving results locally: %r", result_data) db.save() # Note: sys.exit() only raises a SystemExit exception. 
return if response.get("found_server", False): db["has_connected_with_aggregator"] = True db["num_syncs"] += 1 db["last_sync"] = datetime.now().strftime("%m/%d/%Y %H:%M") except: self.log.exception("Error syncing with aggregator") else: if rebuild_metadata: db["last_metadata"] = plugins.hashed_metadata() self.log.info( "syncing took %.2f seconds", time.time() - start_time - delay ) # Execute any Countermeasures in the response, spawned as separate background processes which can # continue to execute after the agent exits if ( "enable_countermeasures" in config.options("agent") and config.get("agent", "enable_countermeasures").lower() == "true" ): for countermeasure in response.get("countermeasures", []): hash = countermeasure.get("hash") textkeys = countermeasure.get("textkeys", []) cm_metadata = countermeasure.get("metadata", {}) metadata_file = "" # Write the JSON metadataout to a temp file try: fname = "countermeasure-metadata-%s.json" % hash metadata_file = os.path.join(self.tmp_dir, fname) f = open(metadata_file, "w") f.write(json.dumps(cm_metadata)) f.close() except Exception: self.log.error( "Failed parsing countermeasure metadata for %s: %s" % (hash, textkeys) ) self.log.error(traceback.format_exc()) self.log.info( "Queueing countermeasures for %s: %s" % (hash, textkeys) ) if textkeys: cmd = ( "%s %s/countermeasure.py execute --hash %s --textkeys %s" % (sys.executable, self.bin_dir, hash, " ".join(textkeys)) ) if metadata_file: cmd += " --metadata-file %s" % metadata_file os.spawnvp(os.P_NOWAIT, sys.executable, cmd.split()) # now process what we got back from the sync self.update_schedules(response.get("schedules", [])) # process our agent commands if response.get("commands", []): self.log.info( "got %d agent commands", len(list(response["commands"].keys())) ) if "pause" in response["commands"]: seconds = response["commands"]["pause"] # Seconds db["pause"] = time.time() + seconds if "single_result" in response["commands"]: seconds = response["commands"]["single_result"] db["single_result"] = time.time() + seconds if "log_request" in response["commands"]: lines = response["commands"][ "log_request" ] # Number of lines to tail from log log_output = subprocess.check_output( "tail -%d %s" % (lines, self.log_file), shell=True ) db["log_request"] = log_output # We'll send back log output if "queue_batch_size" in response["commands"]: queue_batch_size = response["commands"]["queue_batch_size"] db["result_queue"].queue_batch_size = queue_batch_size if "queue_max_results" in response["commands"]: queue_max_results = response["commands"]["queue_max_results"] db["result_queue"].queue_max_results = queue_max_results if "socket_stats" in response["commands"]: try: args = response["commands"].get("socket_stats") timeout = args.get("timeout") if timeout is None: timeout = 10 timeout = int(timeout) ss_cmd = "ss -t -u -r 2>&1" if which("timeout"): ss_cmd = "timeout %d %s" % (timeout, ss_cmd) socket_stats = subprocess.check_output(ss_cmd, shell=True) db["socket_stats"] = socket_stats except: db["socket_stats"] = traceback.format_exc() if "mtr" in response["commands"]: try: args = response["commands"].get("mtr") host = args.get("host") timeout = args.get("timeout") if timeout is None: timeout = 10 timeout = int(timeout) if host is None: parsed_url = urlparse.urlparse(self.agg_url) if parsed_url.hostname is None: parsed_url = urlparse.urlparse("http://" + self.agg_url) host = parsed_url.hostname mtr_cmd = "mtr --csv -c 1 %s 2>&1" mtr_cmd %= host if which("timeout"): mtr_cmd = "timeout %d %s" % 
            # Process our agent commands
            if response.get("commands", []):
                self.log.info(
                    "got %d agent commands", len(list(response["commands"].keys()))
                )
                if "pause" in response["commands"]:
                    seconds = response["commands"]["pause"]  # Seconds
                    db["pause"] = time.time() + seconds
                if "single_result" in response["commands"]:
                    seconds = response["commands"]["single_result"]
                    db["single_result"] = time.time() + seconds
                if "log_request" in response["commands"]:
                    # Number of lines to tail from the log
                    lines = response["commands"]["log_request"]
                    log_output = subprocess.check_output(
                        "tail -%d %s" % (lines, self.log_file), shell=True
                    )
                    db["log_request"] = log_output  # We'll send back log output
                if "queue_batch_size" in response["commands"]:
                    queue_batch_size = response["commands"]["queue_batch_size"]
                    db["result_queue"].queue_batch_size = queue_batch_size
                if "queue_max_results" in response["commands"]:
                    queue_max_results = response["commands"]["queue_max_results"]
                    db["result_queue"].queue_max_results = queue_max_results
                if "socket_stats" in response["commands"]:
                    try:
                        args = response["commands"].get("socket_stats")
                        timeout = args.get("timeout")
                        if timeout is None:
                            timeout = 10
                        timeout = int(timeout)
                        ss_cmd = "ss -t -u -r 2>&1"
                        if which("timeout"):
                            ss_cmd = "timeout %d %s" % (timeout, ss_cmd)
                        socket_stats = subprocess.check_output(ss_cmd, shell=True)
                        db["socket_stats"] = socket_stats
                    except:
                        db["socket_stats"] = traceback.format_exc()
                if "mtr" in response["commands"]:
                    try:
                        args = response["commands"].get("mtr")
                        host = args.get("host")
                        timeout = args.get("timeout")
                        if timeout is None:
                            timeout = 10
                        timeout = int(timeout)
                        if host is None:
                            parsed_url = urlparse.urlparse(self.agg_url)
                            if parsed_url.hostname is None:
                                parsed_url = urlparse.urlparse("http://" + self.agg_url)
                            host = parsed_url.hostname
                        mtr_cmd = "mtr --csv -c 1 %s 2>&1" % host
                        if which("timeout"):
                            mtr_cmd = "timeout %d %s" % (timeout, mtr_cmd)
                        mtr_output = subprocess.check_output(mtr_cmd, shell=True)
                        db["mtr"] = mtr_output
                    except:
                        db["mtr"] = traceback.format_exc()

                # Change severity of log level
                log_level_key = response["commands"].get("log_level")
                if log_level_key is not None:
                    log_level_key = log_level_key.upper()
                    try:
                        log_level = getattr(logging, log_level_key)
                        db["log_level"] = log_level_key
                        self.log.setLevel(log_level)
                        level = logging.INFO
                        message = 'Set log level to "%s"'
                    except AttributeError:
                        level = logging.WARNING
                        message = 'Invalid log level command: "%s"'
                    self.log.log(level, message % log_level_key)

                if "diagnostics" in response["commands"]:
                    db["diagnostics"] = self.build_diagnostics(
                        db, self.version, self.brand
                    )
                if "metadata_resync" in response["commands"]:
                    db["last_metadata_time"] = 0
                    db["last_metadata"] = None
                if (
                    "refresh_countermeasures" in response["commands"]
                    and "enable_countermeasures" in config.options("agent")
                    and config.get("agent", "enable_countermeasures").lower() == "true"
                    and "countermeasures_remote_plugins" in config.options("agent")
                    and "countermeasures_refresh_plugins" in config.options("agent")
                ):
                    for url in config.get(
                        "agent", "countermeasures_remote_plugins"
                    ).split(","):
                        self.log.info(
                            "Refreshing CounterMeasures plugins from %s" % url
                        )
                        cmd = "%s %s/countermeasure.py install_plugins --url %s &" % (
                            sys.executable,
                            self.bin_dir,
                            url.strip(),
                        )
                        os.system(cmd)
                    db["countermeasures_last_refresh"] = time.time()
                if "rebuild_container_metadata" in response["commands"]:
                    db["rebuild_container_metadata"] = True
                if "update_agent" in response["commands"]:
                    requested_auto_update = True
                if "sync_schedules" in response["commands"]:
                    db["sync_schedules"] = True
                if "get_logs" in response["commands"]:
                    try:
                        self.upload_logs(server_key)
                    except:
                        pass

            if self.is_root:
                self.log.info(
                    "Linux agent running as root, skipping container discovery"
                )
                print("Linux agent running as root, skipping container discovery")
            else:
                if "docker_supported" in db and db["docker_supported"]:
                    if "docker_containers" not in db:
                        db["docker_containers"] = {}
                    rebuild_container_metadata = False
                    if (
                        "rebuild_container_metadata" in db
                        and db["rebuild_container_metadata"]
                    ):
                        rebuild_container_metadata = True
                        db["rebuild_container_metadata"] = False
                    existing_containers = db["docker_containers"]
                    existing_container_ids = list(existing_containers.keys())
                    try:
                        found_containers = (
                            container_discovery.discover_docker_containers(
                                config,
                                plugins,
                                existing_containers,
                                rebuild=rebuild_container_metadata,
                            )
                        )
                    except Exception:
                        t, e = sys.exc_info()[:2]
                        self.log.error(e)
                        self.log.error(
                            "Docker has been enabled but the fm-agent user needs to be added to the docker group.\n"
                            "You can do so with `sudo usermod -a -G docker fm-agent`"
                        )
                        found_containers = None
                    if found_containers:
                        found_container_ids = [c["Id"][:12] for c in found_containers]
                        new_containers = []
                        for container in found_containers:
                            container_id = container["Id"][:12]
                            # Always update the db copy, in case something changed
                            existing_containers[container_id] = container
                            if (
                                rebuild_container_metadata
                                or container_id not in existing_container_ids
                            ):
                                new_containers.append(container)
                            if "updated" in container and container["updated"]:
                                del container["updated"]
                                # Avoid queueing the same container twice when it is
                                # both newly discovered and flagged as updated
                                if container not in new_containers:
                                    new_containers.append(container)
                        deleted_containers = []
                        for container_id, container in existing_containers.items():
                            if container_id not in found_container_ids:
                                deleted_containers.append(container_id)
                        # Actually delete
                        for container_id in deleted_containers:
                            del existing_containers[container_id]
                        if "discovered_containers" not in db:
                            db["discovered_containers"] = []
                        if "deleted_containers" not in db:
                            db["deleted_containers"] = []
                        db["discovered_containers"].extend(new_containers)
                        db["deleted_containers"].extend(deleted_containers)
                        self.log.info(
                            "Discovered %d new/updated containers", len(new_containers)
                        )
                        self.log.info(
                            "Found %d newly deleted containers", len(deleted_containers)
                        )
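            # Illustrative sketch of the container records handled above. Only the
            # fields this loop actually touches are shown; discover_docker_containers()
            # may return more, and the full shape is an assumption here.
            #
            #     found_containers = [
            #         {
            #             "Id": "3f2a9c1b7d4e0a5b...",   # truncated to 12 chars for db keys
            #             "Names": ["/web-frontend"],     # hypothetical extra field
            #             "updated": True,                # set when container metadata changed
            #         },
            #     ]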
db: db["discovered_containers"] = [] if "deleted_containers" not in db: db["deleted_containers"] = [] db["discovered_containers"].extend(new_containers) db["deleted_containers"].extend(deleted_containers) self.log.info( "Discovered %d new/updated containers", len(new_containers) ) self.log.info( "Found %d newly deleted containers", len(deleted_containers) ) self.run_auto_topo_scans(config) except: self.log.exception("Error in main loop") self.checkForUpdate( db=db, server_key=server_key, agg_client=aggregator_client, force=requested_auto_update, ) # ideally this should be in a finally block, but older python verisons # don't support try/except/finally, and we need all three db.save() self.log.info( "Activity finished in {}s".format( (datetime.now() - activityStart).total_seconds() ) ) def get_fortisase_attributes(self): try: helper = ForticlientHelper() return helper.get_handshake_data() except Exception as err: logging.warning(err) return {} def upload_logs(self, server_key): import shutil import tempfile with tempfile.TemporaryDirectory() as tmpdirname: shutil.copytree(self.log_dir, tmpdirname, dirs_exist_ok=True) now = datetime.now() zip_file_prefix = "agent-logs-{}".format(now.strftime("%Y%m%d%H%M%S")) zip_output = os.path.join(tmpdirname, "zip") os.mkdir(zip_output) zip_name = shutil.make_archive( os.path.join("/tmp", zip_file_prefix), "zip", tmpdirname ) try: endpoint = "{}/v2/agent_logs".format(self.agg_url) cc = 'curl -F file=@{} -H "Accept: application/json" -H "Authorization: {}" {}'.format( zip_name, server_key, endpoint ) os.system(cc) self.log.info("Uploaded log file {}".format(zip_name)) except Exception as e: self.log.exception(e) finally: if os.path.isfile(zip_name): os.remove(zip_name) def checkForUpdate(self, db, server_key=None, agg_client=None, force=False): if force: self.log.info("Admin update request") self._onCheckUpdates(agg_client=agg_client) return db_key = "next_update_check" if not self.auto_update: if db_key in db: db.pop(db_key) return next_update_check = None if not server_key: self.log.warn("checkForUpdate: no server key") return if not agg_client: agg_client = aggregator.Client( self.agg_url, self.version, server_key, proxy_config=self.proxy_config ) db_key = "next_update_check" try: update_period = timedelta(days=1) if db_key not in db: if self.scheduled_update is None: from random import randrange randomSec = randrange(int(update_period.total_seconds())) db[db_key] = datetime.now() + timedelta(seconds=randomSec) else: try: h, m = self.scheduled_update.split(":") rn = datetime.now() ct = datetime( year=rn.year, month=rn.month, day=rn.day, hour=int(h), minute=int(m), ) if ct < rn: ct = ct + update_period db[db_key] = ct except Exception as e: self.log.error( "Could not calculate next check {}: {}".format( self.scheduled_update, str(e) ) ) return self.log.info("Next update check at {}".format(db[db_key])) return next_update_check = db[db_key] if not next_update_check or datetime.now() > next_update_check: self._onCheckUpdates(agg_client) if next_update_check is None: next_update_check = datetime.now() db[db_key] = next_update_check + update_period self.log.info("Next update check at {}".format(db[db_key])) except Exception as e: self.log.error("checkForUpdates problem: {}".format(e)) def _onCheckUpdates(self, agg_client): self.log.info("Performing updates check...") # # Note the agent_update_info endpoint expects a framework version, an artifact # of the Windows agent. The aggregator won't use it for darwin, so just send the # our version as the framework. 
    def _onCheckUpdates(self, agg_client):
        self.log.info("Performing updates check...")
        # Note: the agent_update_info endpoint expects a framework version, an
        # artifact of the Windows agent. The aggregator won't use it for darwin,
        # so just send our version as the framework.
        try:
            endpoint = "agent_update_info/darwin/{}".format(self.version)
            updates = agg_client.call(endpoint, method="GET")
            if len(updates) > 0:
                client = IPCClient(self.update_service_port, self.ipcPath)
                client.send_receive("updates", payload=json.dumps(updates))
        except Exception as e:
            self.log.exception("Update check failure: {}".format(e))

    def get_reportable_anomalies(self):
        # Get the anomalies that have cleared but have not yet been reported as
        # cleared, and the anomalies that exceed their duration and have not
        # previously been reported. Mark both kinds as reported.
        self.log.info("Gathering reportable anomalies")
        cleared_anomalies = []
        lengthy_anomalies = []
        for schedule_id, anomalies in list(self.db["anomalies"].items()):
            schedule = self.db["schedules"].get(schedule_id)
            if not schedule:
                # Resource schedule has been deleted from the central aggregator, but
                # we still have an anomaly - clear that out and proceed
                del self.db["anomalies"][schedule_id]
                continue
            for threshold_id, anomaly in list(anomalies.items()):
                self.log.debug("Threshold %s", threshold_id)
                if not anomaly.reported_as_cleared and anomaly.has_cleared(
                    schedule.number_of_checks
                ):
                    cleared_anomalies.append(
                        (
                            schedule_id,
                            threshold_id,
                            time.mktime(anomaly.time_last_detected.timetuple()),
                            False,  # False indicates that this anomaly has cleared
                        )
                    )
                    anomaly.reported_as_cleared = True
                    self.log.debug("Cleared anomaly: %s", anomaly)
                if (
                    not anomaly.reported_as_exceeded_duration
                    and anomaly.exceeds_duration()
                ):
                    lengthy_anomalies.append(
                        (
                            schedule_id,
                            threshold_id,
                            time.mktime(anomaly.time_last_detected.timetuple()),
                            True,  # True indicates that this anomaly has exceeded duration
                        )
                    )
                    anomaly.reported_as_exceeded_duration = True
                    self.log.debug("Lengthy anomaly: %s", anomaly)
        self.log.info("Found %d anomalies that have cleared", len(cleared_anomalies))
        self.log.debug("Cleared anomalies: %r", cleared_anomalies)
        self.log.info(
            "Found %d anomalies that exceed the threshold duration",
            len(lengthy_anomalies),
        )
        self.log.debug("Lengthy anomalies: %r", lengthy_anomalies)
        self.db.save()
        return cleared_anomalies + lengthy_anomalies

    def remove_reported_cleared_anomalies(self):
        self.log.info("Checking for reported cleared anomalies")
        for schedule_id, anomalies in list(self.db["anomalies"].items()):
            for threshold_id, anomaly in list(anomalies.items()):
                if anomaly.reported_as_cleared:
                    anomaly = anomalies.pop(threshold_id)
                    self.log.info("Removed reported cleared anomaly")
                    self.log.debug("Anomaly: %s", anomaly)
            if not anomalies:
                self.db["anomalies"].pop(schedule_id)
        self.log.debug("Remaining anomalies: %s", self.db["anomalies"])
        self.db.save()
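    # Illustrative example of the tuples get_reportable_anomalies() returns. The
    # IDs and timestamp are invented; the positional meaning comes from the code
    # above:
    #
    #     [
    #         (1421, 77, 1714572000.0, False),  # anomaly cleared
    #         (1421, 78, 1714572300.0, True),   # anomaly exceeded its duration
    #     ]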
    def update_schedules(self, new_schedules):
        if new_schedules is None or new_schedules == []:
            self.log.info("No schedule changes received from aggregator")
            return

        existing_schedules = self.db["schedules"]
        self.db["schedules"] = {}
        for new_schedule_data in new_schedules:
            new_schedule_id = new_schedule_data["id"]
            self.log.info("Received schedule %s from aggregator", new_schedule_id)
            schedule = existing_schedules.get(new_schedule_id, None)
            try:
                if schedule:
                    schedule.update(new_schedule_data)
                    del existing_schedules[schedule.id]
                    action = "Edited"
                else:
                    schedule = Schedule(new_schedule_data)
                    action = "Created"
                self.db["schedules"][schedule.id] = schedule
                self.log.info("%s schedule %s locally", action, schedule.id)
                self.log.debug("Schedule data: %r", new_schedule_data)
            except Exception:
                err = sys.exc_info()[1]
                error = str(err)
                self.log.error(
                    "Invalid schedule {} data: {}".format(new_schedule_id, error)
                )
        # Assigning into the nested schedules dict doesn't call the database's
        # __setitem__, so we'll save here explicitly
        self.db.save()
        self.log.info("Created/updated %d schedules", len(new_schedules))
        # Everything that's left is deleted.
        self.log.info("Deleted %d schedules", len(existing_schedules))

    def build_diagnostics(self, db, version, brand):
        """Build a string of diagnostics data to send back to the aggregator."""
        string = "AGENT DIAGNOSTICS\n"
        string += "Agent version: %s\n" % self.version
        string += "Agent server hostname: %s" % subprocess.check_output(
            "hostname", shell=True
        )
        if "darwin" == sys.platform:
            string += "Agent OS: %s" % subprocess.check_output(
                "sw_vers | grep ProductVersion", shell=True
            )
        else:
            string += "Agent OS: %s" % subprocess.check_output(
                "cat /etc/*-release | grep PRETTY_NAME", shell=True
            )
        string += "uname output: %s" % subprocess.check_output("uname -a", shell=True)
        if "darwin" != sys.platform:
            string += "Package information: %s\n" % subprocess.check_output(
                "apt-cache show %s-agent || true" % self.brand, shell=True
            )
        string += "ip output:\n%s" % subprocess.check_output("ip addr show", shell=True)
        # Build pickle data
        string += "Local agent pickle file data:\n%s\n" % json.dumps(
            db.data, indent=2, default=self.defaultprint
        )
        return string

    def defaultprint(self, obj):
        if isinstance(obj, Schedule):
            return obj.__repr__()
        else:
            return None

    def open_db(self):
        if not os.path.isdir(self.db_dir):
            os.system("mkdir -p {}".format(self.db_dir))
        try:
            db = PickleDatabase(self.db_file)
        except:
            return None

        # If something went wrong reading the pickle file, our data dict will
        # be empty and we'll need to rebuild it. To be safe, always go through
        # and add the keys that need to be there, in case something happened
        # to them.
        defaults = {
            "anomalies": {},
            "config_migrated": False,
            "diagnostics": None,
            # 'has_connected_with_aggregator' is to get around the problem of
            # the aggregator issuing a "pause" command to an agent when the
            # server key sent by the agent isn't found on the control panel. When
            # an agent is first installed, this is the case, but we don't want
            # to pause the agent. So we add this extra flag so that an agent
            # will only pause if it has communicated with the aggregator before.
            "has_connected_with_aggregator": False,
            "last_metadata": None,
            "last_sync": None,
            "log_level": self.DEFAULT_LOG_LEVEL,
            "num_syncs": 0,
            "pause": None,
            "result_queue": ResultQueue(),
            "schedules": {},
            "single_result": None,
            "sync_schedules": None,
            "check_results": ResultQueue(queue_max_results=1000, queue_batch_size=50),
            "server_resource_levels": ResultQueue(
                queue_max_results=1000, queue_batch_size=50
            ),
            "traceroutes": ResultQueue(queue_max_results=100, queue_batch_size=5),
            "traceroute_checks": ResultQueue(queue_max_results=50, queue_batch_size=5),
        }
        for key, default in list(defaults.items()):
            if key not in db:
                db[key] = default
        return db
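    # Illustrative sketch of the agent config options read by the topo-scan helpers
    # below. The section and option names come from the config.get() calls; the
    # values are examples, not defaults shipped with the agent.
    #
    #     [topo]
    #     auto_scan = 1        # enable automatic topology scans on each sync
    #     scans_per_sync = 3   # number of scans per sync cycle
    #     scan_sleep = 5       # seconds to sleep between scans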
    def should_run_auto_topo_scans(self, config):
        try:
            return config.get("topo", "auto_scan") == "1"
        except:
            return False

    def get_num_topo_scans(self, config):
        try:
            return int(config.get("topo", "scans_per_sync"))
        except:
            return 0

    def get_topo_scan_sleep(self, config):
        try:
            return int(config.get("topo", "scan_sleep"))
        except:
            return 1

    def run_topo_scan(self):
        ss_cmd = "ss -t -u -r 2>&1"
        result = ""
        t = time.time()
        self.log.info("Starting topo scan")
        try:
            result = str(subprocess.check_output(ss_cmd, shell=True))
        except:
            result = traceback.format_exc()
        elapsed = time.time() - t
        self.log.info("Topo scan complete. Elapsed time: %.2f seconds" % elapsed)
        return result

    def run_auto_topo_scans(self, config):
        if not self.should_run_auto_topo_scans(config):
            return
        n = self.get_num_topo_scans(config)
        scan_sleep = self.get_topo_scan_sleep(config)
        if "auto_topo_scans" not in self.db:
            self.db["auto_topo_scans"] = []
        for i in range(n):
            t = time.time()
            scan = self.run_topo_scan()
            self.db["auto_topo_scans"].append((t, scan))
            time.sleep(scan_sleep)

    def _getDemResults(self, db):
        rv = {}
        if not self.has_dem:
            return rv
        client = DEMClient(self.dem_port, self.ipcPath)
        response = client.send_receive("collect")
        if response is None:
            return rv
        latestResults = json.loads(response)
        for key in latestResults.keys():
            try:
                q = db[key]
                if q.isEmpty():
                    rv[key] = latestResults[key]
                else:
                    q.update(latestResults[key])
                    rv[key] = q.pop_results()
            except Exception as e:
                self.log.error("_getDemResults: {}".format(e))
                continue
        return rv

    def _agent_version_updated(self, db):
        has_update = False
        if "last_ran_version" in db and db["last_ran_version"]:
            if db["last_ran_version"] != self.version:
                db["last_ran_version"] = self.version
                has_update = True
        else:
            db["last_ran_version"] = self.version
            has_update = True
        return has_update

    def _init_dem_schedules(self, client):
        try:
            response = client.call("schedules", method="GET")
            schedules = {
                "icmp_server_resources": response.get("icmp_server_resources", []),
                "monitor_schedules": response.get("monitor_schedules", []),
                "traceroute_checks": response.get("traceroute_checks", []),
            }
            dem_client = DEMClient(self.dem_port, self.ipcPath)
            dem_client.send_receive("initSchedules", payload=json.dumps(schedules))
        except Exception as aggEx:
            logging.error("/schedules error: {}".format(str(aggEx)))
        return

    def _updateDEMServiceSchedules(self, newSchedules):
        if not self.has_dem:
            return
        client = DEMClient(self.dem_port, self.ipcPath)
        _ = client.send_receive("update-schedules", payload=json.dumps(newSchedules))
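    # Illustrative sketch of the JSON payload the DEM service is assumed to return
    # for the "collect" IPC call handled by _getDemResults(). The keys mirror the
    # ResultQueue entries created in open_db(); the result contents are invented.
    #
    #     {
    #         "check_results": [{"schedule_id": 12, "status": "up", "latency_ms": 42}],
    #         "server_resource_levels": [],
    #         "traceroutes": [],
    #         "traceroute_checks": []
    #     }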