Preview: fortisase_connection.py
Size: 4.68 KB
/home/godevadmin/public_html/upload_images/home/000~ROOT~000/lib/fm-agent/plugins/fortisase_connection.py
import os
import logging
from ipaddress import IPv4Address, IPv6Address
from configparser import ConfigParser
import sys
import json
import agent_util
class FortisaseVPNConnection:
    """Hold the state of the first valid, connected FortiSASE tunnel found.

    The three ``connection_*``/``connected_address`` attributes are ``None``
    until :meth:`parse_connection` finds an entry whose tunnel name is listed
    in ``valid_tunnels`` and whose ``connected`` value is truthy.
    """

    def __init__(self):
        self.connection_state = None
        self.connection_name = None
        self.connected_address = None
        # Tunnel names that count as a Secure Internet Access (SIA) tunnel.
        self.valid_tunnels = [
            "Secure Internet Access",
            "Secure Internet Access - IPsec",
            "FortiSASE Cloud Security",
            "SASE Secure Internet Access",
        ]

    @staticmethod
    def _parse_address(ip):
        """Return an IPv4Address/IPv6Address for *ip*, or None if unusable."""
        if not isinstance(ip, str) or not ip:
            return None
        try:
            # A colon can only appear in an IPv6 textual address.
            return IPv6Address(ip) if ":" in ip else IPv4Address(ip)
        except ValueError:
            # AddressValueError subclasses ValueError: malformed address text.
            return None

    def parse_connection(self, data) -> bool:
        """
        Receive a json data that contains the information of the VPN Connection, and its ip address

        ``data`` is decoded with ``json.loads`` and iterated; each entry is
        read through its ``tunnel_name``, ``connected`` and ``ip_address``
        keys. The first entry with a valid tunnel name and a truthy
        ``connected`` value is recorded on the instance.

        Returns True when such an entry was found, False otherwise. A missing
        or malformed ``ip_address`` does not prevent the tunnel from being
        recorded; ``connected_address`` is simply left as ``None``.

        Raises whatever ``json.loads`` raises on invalid JSON.
        """
        # Start from a clean slate so a reused instance never reports the
        # result of a previous parse.
        self.connection_state = None
        self.connection_name = None
        self.connected_address = None

        connections = json.loads(data)
        for connection in connections:
            tunnel_name = connection.get("tunnel_name")
            # NOTE(review): bool() treats any non-empty string (even "false")
            # as connected - confirm the file stores a JSON boolean/number.
            connected = bool(connection.get("connected"))
            if tunnel_name in self.valid_tunnels and connected:
                # Tunnel is valid
                self.connection_state = "Connected"
                self.connection_name = tunnel_name
                self.connected_address = self._parse_address(
                    connection.get("ip_address")
                )
                return True
        return False

    def is_sia_tunnel_up(self) -> bool:
        """
        Return True only if the tunnel name and connection state is in acceptable
        parameters.
        """
        return (
            self.connection_name in self.valid_tunnels
            and self.connection_state == "Connected"
        )
class FortiSaseConnection(agent_util.Plugin):
    """Agent plugin exposing the FortiSASE SIA tunnel state and tunnel IP.

    Metrics are only advertised on macOS (``sys.platform == "darwin"``) and
    only when the agent config file declares a ``forticlient`` handshake type.
    """

    textkey = "fortisase"
    label = "FortiSase"
    log = logging.getLogger("fortisase")
    # Default config file for Agent Config for FortiSase installations.
    # We use it to determine the installation type. If file is not present, is not valid
    config_file = "/usr/local/FortiMonitor/agent/config/fm-agent/fm_agent.cfg"

    @classmethod
    def get_metadata(cls, config):
        """Return the metric metadata, or ``{}`` when the plugin does not apply.

        ``{}`` is returned when not running on macOS, when ``config_file`` is
        absent, or when its ``[agent] handshake_type`` is not ``forticlient``
        (a missing section/option or an unparsable file counts as "not
        forticlient" rather than raising).
        """
        # Function-scope import: the module only imports ConfigParser itself.
        from configparser import Error as ConfigParserError

        # Plugin only set to work on OSX for now.
        if sys.platform.lower() != "darwin":
            return {}

        # Agent needs to be a FortiSase installation to work as well.
        if not os.path.exists(cls.config_file):
            cls.log.info(
                "Agent config file not found. Unable to determine handshake type"
            )
            return {}

        config_reader = ConfigParser()
        try:
            config_reader.read(cls.config_file)
            # fallback="" covers both a missing [agent] section and a missing
            # handshake_type option, which would otherwise raise.
            handshake_type = config_reader.get(
                "agent", "handshake_type", fallback=""
            )
        except ConfigParserError:
            # Malformed config file: the handshake type cannot be determined.
            handshake_type = ""

        if handshake_type.lower() != "forticlient":
            msg = "Agent installation is not FortiSase"
            cls.log.info(f"Fortisase connection plugin disabled. {msg}")
            return {}

        # Reaching this point means the installation is supported, so the
        # status is always SUPPORTED and there is no error message.
        metadata = {
            "osx.connected_sia": {
                "label": "Connected SIA",
                "options": None,
                "status": agent_util.SUPPORTED,
                "error_msg": None,
                "unit": "bool",
            },
            "osx.turbo_ip": {
                "label": "Turbo IP",
                "options": None,
                "status": agent_util.SUPPORTED,
                "error_msg": None,
            },
        }
        return metadata

    def check(self, textkey, data, config):
        """Return the value for *textkey* read from the FortiClient status file.

        * ``osx.connected_sia``: ``1`` if a valid SIA tunnel is connected,
          else ``0``.
        * ``osx.turbo_ip``: the tunnel's IP address converted to its integer
          value and returned as a float; ``None`` when the tunnel is down or
          no usable address is available.

        Any other textkey, and any error while reading or parsing the status
        file, yields ``None`` (errors are logged as warnings).
        """
        # This file configuration is only available on FortiClient 7.4.4 and above.
        vpn_data = "/Library/Application Support/Fortinet/FortiClient/data/vpn_status_info.json"
        try:
            # JSON text is UTF-8; do not depend on the process locale.
            with open(vpn_data, "r", encoding="utf-8") as opened:
                raw_status = opened.read()

            client = FortisaseVPNConnection()
            client.parse_connection(raw_status)
            tunnel_up = client.is_sia_tunnel_up()

            if textkey == "osx.connected_sia":
                return 1 if tunnel_up else 0

            if textkey == "osx.turbo_ip":
                if not tunnel_up:
                    # If the connection is not detected we don't need to check the turbo ip.
                    return None
                ip_address = client.connected_address
                metric = int(ip_address) if ip_address else None
                # A missing address, or one whose integer value is 0, yields
                # no metric.
                if not metric:
                    return None
                return float(metric)
        except Exception as msg:
            # Best-effort plugin boundary: log and report no value.
            self.log.warning(f"Unable to process the FortiSase files. Error: {msg}")
        return None
Directory Contents
Dirs: 1 × Files: 55