自動化SSL憑證監控與到期通知系統
👨💻簡介
最近因為憑證越來越多,需要監控什麼時候到期,當到期時發送到期通知,因此撰寫一個簡單的小程式來完成。
這次使用Python和Telegram Bot來監控SSL證書的到期時間並發送通知。並使用GCP工具,如CloudFunction和CloudScheduler做部署平台。
🛠️使用工具
- Python 3.9
- Telegram Bot(Webhook)
- CloudFunction
- CloudScheduler
📝功能需求
- 取得憑證到期時間
- 到期後發送通知
- 透過 Telegram Bot 發送訊息
- 讀取 yaml domain list
- 設定環境變數
- CloudFunction 設定
- CloudScheduler 排程設定
🎯Setup
1. 取得憑證到期時間
def get_ssl_cert_expiry_date(domain):
"""
取得 SSL 證書的過期日期。
參數:
domain (str): 需要檢查SSL證書過期時間的域名。
返回:
datetime: SSL證書的過期日期,如果獲取失敗則返回None。
"""
# 建立SSL上下文,建立一個安全的“環境”來管理SSL設定和操作
ssl_context = ssl.create_default_context()
# 包裝socket對象,將基礎的socket通訊轉變為加密通訊
conn = ssl_context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=domain)
# 設定連接的超時時間為 3 秒,防止長時間等待
conn.settimeout(3.0)
try:
# 通過加密的連線嘗試連接到服務器的443端口(HTTPS)
conn.connect((domain, 443))
# 取得服務器的SSL證書訊息
ssl_info = conn.getpeercert()
# 解析證書中的過期時間
expire_date = datetime.strptime(ssl_info['notAfter'], '%b %d %H:%M:%S %Y %Z')
return expire_date
except Exception as e:
# 處理連接或取得證書訊息過程中的異常
print(f"無法獲取 {domain} 的 SSL 證書過期日期,錯誤:{e}")
return None
finally:
# 確保無論成功與否,都關閉與服務器的連接
conn.close()
這部分使用了 python 的 ssl 以及 socket library,他們幫助我能夠建立安全的加密環境,以及使用 socket 進行通訊,使我能夠取得憑證相關資訊。
補充一下 datetime.strptime
,因為從 ssl_info
dict 解析後的 notAfter
會回傳 'Mar 4 06:35:50 2024 GMT
,因此需要透過 datetime.strptime
,將字串解析成 datetime 對象。
字符串格式如下:
%b
:月份的縮寫名稱,如 Jan, Feb, Mar 等。%d
:月份中的天數,為 01 到 31。%H
:小時(24 小時制),從 00 到 23。%M
:分鐘,從 00 到 59。%S
:秒,從 00 到 59。%Y
:4 位數的年份,如 2024。%Z
:時區名稱,如 UTC 或 EST 等。
對應欄位:
Mar 4 06:35:50 2024 GMT
月 日 時 分 秒 年 時區
%b %d %H:%M:%S %Y %Z
至此,取得憑證過期時間完成,接下來需要建立一個 function 幫助我判斷當小於 30 天時,需要發送通知,我們往第二部分進行。
2. 到期後發送通知
邏輯判斷會需要讓返回的時間能夠與當前時間做比較,當小於等於 30 天時,則會發送通知。
def check_ssl_expiration(domain):
"""
檢查給定域名的SSL證書過期時間,並在證書即將過期時印出。
參數:
domain (str): 需要檢查SSL證書的域名。
"""
# 調用先前定義的函數get_ssl_cert_expiry_date來獲取SSL證書的過期日期。
expire_date = get_ssl_cert_expiry_date(domain)
# 如果成功取得到過期日期
if expire_date:
# 計算證書的剩餘有效天數
remaining_days = (expire_date - datetime.utcnow()).days
# 如果剩餘天數不超過30天
if remaining_days <= 30:
# 印出一條log訊息,說明證書將在指定天數內過期
print(f"{domain} 的 SSL 證書將在 {remaining_days} 天內過期。")
else:
# 如果證書的剩餘有效期超過30天,則打印證書的過期日
print(f"{domain} 的 SSL 證書過期日期是 {expire_date.strftime('%Y-%m-%d')}。")
補充說明一下 datetime
以及 strftime
的用法:
datetime.utcnow()
這個 library 在這裡主要取得 UTC 的當前日期和時間
>>> from datetime import datetime
>>> datetime.utcnow()
## 返回一個`datetime.datetime`對象,這個對象包含了當前UTC時區的年、月、日、小時、分、秒,以及微秒。
datetime.datetime(2024, 4, 1, 15, 34, 21, 211636)
代表執行 datetime.utcnow()
時,被執行的時間是 2024 年 4 月 1 日,下午 3 點 34 分 21 秒,以及 211636 微秒。
strftime()
這個方法主要用來將時間對象轉為字串,方便組成要傳送的訊息。
"%Y-%m-%d"
是 strftime
方法的格式化字串參數,其中 %Y
表示 4 位數的年份,%m
表示月份(01至12),%d
表示月份中的天數(01 至 31)。
整段 function 功能描述為:首先,使用 datetime.utcnow()
取得當前 UTC 時間,然後與 expire_date
進行相減,計算出證書的剩餘有效天數。
如果這個剩餘天數不超過 30 天,就印出一條 log 訊息,說明證書將在指定天數內過期;否則印出證書的過期日期。
3. 發送訊息到 telegram group
def send_notification(message, domain, telegram_bot_token, telegram_group_id):
"""
通過指定的Webhook URL發送通知。
參數:
message (dict): 要發送的消息內容,格式為字典。
domain (str): 域名,用於發送消息中表示哪個域名的SSL證書。
telegram_token (str): 要使用的telegram bot token。
telegram_group_id (str): 指定發送消息的群組。
"""
telegram_send_message_url = f"https://api.telegram.org/bot{telegram_bot_token}/sendMessage"
# 向telegram api發送HTTP POST請求。
response = requests.post(telegram_send_message_url, data={
"chat_id": telegram_group_id,
"text": message
})
# 檢查響應的狀態碼。如果狀態碼為200,表示通知發送成功。
if response.status_code == 200:
print(f"已為 {domain} 發送通知")
else:
# 如果狀態碼不是200,表示發送失敗。
print(f"為 {domain} 發送失敗")
將 send_notification
添加到 check_ssl_expiration
,這裡應該有更好的作法。
def check_ssl_expiration(domain, env, platform, telegram_bot_token, telegram_group_id):
"""
檢查給定域名的SSL證書過期時間,並在證書即將過期時通過webhook發送通知。
參數:
domain (str): 需要檢查SSL證書的域名。
env (str): 環境標籤(例如:開發、測試、正式),用於消息中以區分不同環境。
platform (str): 平台標籤(例如:AWS、GCP、Azure),用於消息中以標明證書部署的平台。
telegram_token (str): 要使用的telegram bot token。
telegram_group_id (str): 指定發送消息的群組。
"""
expire_date = get_ssl_cert_expiry_date(domain)
if expire_date:
remaining_days = (expire_date - datetime.utcnow()).days
if remaining_days <= 30:
# 建立發送訊息,包含了證書到期的相關信息
message = "\n".join([
"來源: Gitlab-Runner",
"標題: 憑證到期",
f"域名: {domain}",
f"到期日: {expire_date.strftime('%Y-%m-%d')}",
f"平台: {platform}",
f"環境: {env}",
])
print(f"{domain} 的 SSL 證書將在 {remaining_days} 天內過期。")
send_notification(message, domain, telegram_bot_token, telegram_group_id)
else:
print(f"{domain} 的 SSL 證書過期日期是 {expire_date.strftime('%Y-%m-%d')}。")
這裡我使用的是簡單的 telegram bot,發送後會呈現如下資訊。
4. 讀取 yaml domain list
假設 domain.yaml 如下
domain_envs:
live:
- google.com
- en.wikipedia.org
def load_data_from_yaml(yaml_file_path, key):
"""
從YAML檔案加載指定鍵的配置。
參數:
yaml_file_path (str): YAML檔案的路徑。
key (str): 要從YAML檔案中讀取的鍵名。
返回:
dict or None: 返回從YAML檔案中讀取的配置字典。如果指定的鍵不存在,則返回空字典。
"""
try:
# 嘗試打開指定的YAML檔案。'r'表示以讀取模式打開,'encoding='utf-8''確保文件正確讀取UTF-8編碼的內容。
with open(yaml_file_path, 'r', encoding='utf-8') as file:
# 使用yaml.safe_load(file)安全地加載YAML檔案的內容。
# 此函數將YAML檔案的結構轉換為Python數據類型(通常是字典)。
data = yaml.safe_load(file)
# 嘗試從加載的數據中獲取特定鍵(key)的值。
# 如果鍵不存在,則默認返回一個空字典{}。
return data.get(key, {})
except FileNotFoundError as e:
# 如果嘗試打開的YAML檔案不存在,則捕獲FileNotFoundError異常。
logging.error(f"YAML檔案未找到: {e}")
# 錯誤日誌記錄後,返回一個空字典{},表示沒有加載到任何數據。
return {}
except Exception as e:
# 如果在加載或處理YAML檔案時發生了其他任何異常,則捕獲通用異常。
logging.error(f"讀取YAML檔案時發生錯誤: {e}")
# 同樣記錄錯誤日誌並返回一個空字典{}。
return {}
yaml_file_path = 'config.yaml'
domain_envs = load_data_from_yaml(yaml_file_path, 'domain_envs')
這裡主要使用 yaml
進行一些讀取操作,使我可以獲取到 domain_envs
。
印出的格式如下:
{'live': ['google.com', 'en.wikipedia.org']}
而要取得到 env 以及各個 domain,則使用 for 迴圈進行迭代,將每個 item 都取得;而 domain 因為有多個所以會再使用一個 for 迴圈進行迭代,並一一進行確認證書狀態。
for env, domains in domain_envs.items():
for domain in domains:
check_ssl_expiration(domain, env, platform, telegram_bot_token, telegram_group_id)
5. 設定環境變數
這部分是使用 os.environ.get
方法,取得環境變數的值
def get_env_variable(name, default_value="未設定"):
"""
從系統環境變數中取得一個值。
參數:
name (str): 環境變數的鍵名。
default_value (str): 如果找不到鍵,則返回的預設值。預設為"未設定"。
返回:
str: 環境變數的值,或者在找不到鍵時返回預設值。
"""
return os.environ.get(name, default_value)
接著在入口函數設定,取得相關環境變數
platform = get_env_variable("PLATFORM")
telegram_bot_token = get_env_variable("TELEGRAM_BOT_TOKEN")
telegram_group_id = get_env_variable("TELEGRAM_GROUP_ID")
6. cloudfunction設定
- 觸發條件選擇 HTTPS,並且須通過驗證
- 執行環境變數,需要將原本從系統取得的環境變數在這裡設定
- 入口函數需要將原先寫在 main function 的部分移到這裡
設定好後可以測試看看是否正常,可在紀錄查看執行結果
7. cloudscheduler 排程設定
要先設定 IAM,允許可呼叫 cloud function,先建立好 service account 後,加入兩個角色,Cloud Functions 管理員以及 Cloud Run 叫用者。
接著就能到 cloudscheduler 進行相關設定。
這樣就完成了監控憑證的小程式了。
🔗專案repo –> ssl-certificate-checker