cert_expiration plugin now parses cert manually with OpenSSL

so that the cert can still be parsed even it doesn't validate
This commit is contained in:
Justus Grunow 2025-05-08 16:14:20 -04:00
parent 3a0567ef0c
commit 3c0e669596
6 changed files with 31 additions and 23 deletions

View File

@ -8,6 +8,7 @@ hosts:
endpoint: https://webassets.confederationcollege.ca endpoint: https://webassets.confederationcollege.ca
- cert_expiration: - cert_expiration:
endpoint: webassets.confederationcollege.ca endpoint: webassets.confederationcollege.ca
expiration_warning_days: 30
secureassets: secureassets:
hostname: secureassets.confederationcollege.ca hostname: secureassets.confederationcollege.ca
actions: actions:
@ -18,6 +19,7 @@ hosts:
- ping - ping
- cert_expiration: - cert_expiration:
endpoint: cc-cognotest-01.confederationc.on.ca:9300 endpoint: cc-cognotest-01.confederationc.on.ca:9300
expiration_warning_days: 30
Solarwinds Service Desk: Solarwinds Service Desk:
hostname: itsupport.confederationcollege.ca hostname: itsupport.confederationcollege.ca
actions: actions:
@ -48,4 +50,9 @@ hosts:
server: cc-apps-01 server: cc-apps-01
actions: actions:
- ping - ping
Gitlab:
hostname: cc-gitlab.confederationcollege.ca
actions:
- cert_expiration:
endpoint: cc-gitlab.confederationcollege.ca
expiration_warning_days: 30

View File

@ -9,7 +9,7 @@ import importlib
plugins = {} plugins = {}
for finder, name, ispkg in pkgutil.iter_modules(path=['plugins/plugins']): for finder, name, ispkg in pkgutil.iter_modules(path=['plugins/plugins']):
plugins[name] = importlib.import_module(name) plugins[name] = importlib.import_module(name)
row_format = "{:<30}{:<10}{:<6}{:<20}" row_format = "{:<30} {:<10} {:<6}"
with open('hosts.yaml', 'r') as file: with open('hosts.yaml', 'r') as file:
hosts = yaml.safe_load(file) hosts = yaml.safe_load(file)
@ -30,8 +30,8 @@ for host, details in hosts['hosts'].items():
for argument, value in action[action_name].items(): for argument, value in action[action_name].items():
arguments[argument] = value arguments[argument] = value
#print(arguments) #print(arguments)
result = getattr(plugins[action_name], action_name)(arguments) result = getattr(plugins[action_name], action_name)(arguments)
#print(f"{host}\t{action_name}\t{result[0]}\t{result[1]}") #print(f"{host}\t{action_name}\t{result[0]}\t{result[1]}")
print(row_format.format(host, action_name, result[0], result[1])) print(row_format.format(host, action_name, result[0]))
print(f"{result[1]}")

View File

@ -1,9 +1,8 @@
from enum import Enum from enum import Enum
class Plugin: class Plugin:
class Status(Enum): class Status(Enum):
SUCCESS = "Succ SUCCESS = "Succ"
i = 12345 i = 12345
def f(self): def f(self):
return 'hello world' return 'hello world'

View File

@ -1,5 +1,6 @@
import socket
import ssl import ssl
import OpenSSL
from datetime import datetime,timedelta
def cert_expiration(args): def cert_expiration(args):
hostname = args['endpoint'].split(":")[0] hostname = args['endpoint'].split(":")[0]
@ -9,15 +10,17 @@ def cert_expiration(args):
except IndexError: except IndexError:
port = 443 port = 443
context = ssl.create_default_context() cert = ssl.get_server_certificate((hostname, port))
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
expiration_date = datetime.strptime(f"{x509.get_notAfter().decode("UTF-8")[0:-1]}UTC", "%Y%m%d%H%M%S%Z")
with context.wrap_socket(socket.socket(socket.AF_INET), status = "Success"
server_hostname = hostname) as conn: if (expiration_date - timedelta(days = args['expiration_warning_days'])) < datetime.now():
conn.connect((hostname, int(port))) status = "Warning"
else:
cert = conn.getpeercert() status = "Success"
return [True, f"Expiration: {cert['notAfter']}"]
return [status, f"Expiration: {expiration_date}"]

View File

@ -3,6 +3,6 @@ import requests
def httpcheck(arguments): def httpcheck(arguments):
try: try:
r = requests.head(f"{arguments['endpoint']}") r = requests.head(f"{arguments['endpoint']}")
return [True, r.status_code] return ["Success", f"Status code: {r.status_code}"]
except requests.ConnectionError: except requests.ConnectionError:
return [False, r.status_code] return ["Failure", f"Status code: {r.status_code}"]

View File

@ -3,13 +3,12 @@ import subprocess
from plugins import plugin from plugins import plugin
def ping(arguments): def ping(arguments):
print(plugin.Plugin.i)
response = subprocess.run(["ping","-c","1",arguments['hostname']], response = subprocess.run(["ping","-c","1",arguments['hostname']],
stdout = subprocess.DEVNULL, stderr = subprocess.PIPE) stdout = subprocess.DEVNULL, stderr = subprocess.PIPE)
if response.returncode == 0: if response.returncode == 0:
return [True, ""] return ["Success", ""]
elif response.stderr: elif response.stderr:
return [False, response.stderr.decode("UTF-8")] return ["Success", response.stderr.decode("UTF-8")]
else: else:
return [False, ""] return ["Failure", ""]