Telegram 憑證監控機器人實作 EP1 - 讀取 MongoDB Domain Info
👨💻 簡介
上次做的憑證監控已經可以正常運作了,但這次希望能夠不從 yaml 讀取 domain info,而是從 MongoDB 進行讀取,方便未來的擴充性。
這次的重點是要透過 Python 連接 MongoDB,並且透過 Python 讀取 MongoDB,最後透過 Python 寫入 MongoDB。
🛠️ 使用工具
- Python 3.9.6
- MongoDB
- Mongoshell
- Docker
- Docker-Compose
📝 功能需求
- 建立 MongoDB docker-compose
- 透過 Python 連接 MongoDB
- 透過 Python 讀取 yaml 並寫入 MongoDB
- 透過 Python 傳入 env 以及 domain 寫入 MongoDB
- 透過 Python 讀取 MongoDB
- 透過 Python 修改 MongoDB
- 透過 Python 刪除 MongoDB
🎯Setup
- 建立 MongoDB docker-compose 要簡單使用 MongoDB 可以用 docker-compose 快速拉起:
version: "3.1"
services:
mongodb:
image: mongo:latest
container_name: mongodb_container
environment:
MONGO_INITDB_ROOT_USERNAME: rootuser
MONGO_INITDB_ROOT_PASSWORD: rootpass
ports:
- "27017:27017"
volumes:
- mongodb_data_container:/data/db
volumes:
mongodb_data_container:
docker-compose up -d
- 透過 Python 連接 MongoDB 在 Python Library 中,可以使用 PyMongo操作 MongoDB。 要使用這個 Library 要先透過安裝:
python3 -m pip install pymongo
接著可以簡單操作一筆資料看是否正常:
# 導入 pymongo 模塊中的 MongoClient 類,用於與 MongoDB 資料庫進行連接
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
# 定義 MongoDB 連接 URI,包含用戶名、密碼、服務器地址、端口和資料庫名
mongodb_uri = "mongodb://rootuser:rootpass@localhost:27017/mydatabase?authSource=admin"
try:
# 嘗試連接 MongoDB
client = MongoClient(mongodb_uri)
# 嘗試獲取服務器訊息,以確認連接
info = client.server_info() # 會在連接失敗時拋出 ConnectionFailure 異常
print("MongoDB 連接成功。Server Info:", info)
except ConnectionFailure:
print("MongoDB 連接失敗。請檢查您的連接設置和Server狀態。")
輸出如下:
MongoDB 連接成功。Server Info: {'version': '7.0.8', 'gitVersion':
'c5d33e55ba38d98e2f48765ec4e55338d67a4a64', 'modules': [], 'allocator': 'tcmalloc',
'javascriptEngine': 'mozjs', 'sysInfo': 'deprecated', 'versionArray': [7, 0, 8, 0], 'openssl':
{'running': 'OpenSSL 3.0.2 15 Mar 2022', 'compiled': 'OpenSSL 3.0.2 15 Mar 2022'},
'buildEnvironment': {'distmod': 'ubuntu2204', 'distarch': 'aarch64', 'cc': '/opt/mongodbtoolchain/v4/
bin/gcc: gcc (GCC) 11.3.0', 'ccflags': '-Werror -include mongo/platform/basic.h -ffp-contract=off
...
稍微包裝一下:
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
mongodb_uri = "mongodb://rootuser:rootpass@localhost:27017/mydatabase?authSource=admin"
def init_mongo_client(mongodb_uri):
try:
# 嘗試連接 MongoDB
client = MongoClient(mongodb_uri)
# 嘗試獲取服務器訊息,以確認連接
info = client.server_info() # 會在連接失敗時拋出 ConnectionFailure 異常
mongodb_version = info['version']
print("MongoDB 連接成功。Mongo 版本為", mongodb_version)
return client
except ConnectionFailure:
print("MongoDB 連接失敗。請檢查您的連接設置和Server狀態。")
client = init_mongo_client(mongodb_uri)
這樣就代表我們成功連線到 MongoDB 了。
- 透過 Python 讀取 yaml 並寫入 MongoDB
接下來我想要將讀取的 domains.yaml 寫入 M ongo ,會用到之前的
load_data_from_yaml
:
client = init_mongo_client(mongodb_uri)
db = client.get_default_database()
# 定義要操作的集合名稱
collection_name = "domains"
collection = db[collection_name]
yaml_file_path = "domains.yaml"
domain_data = load_data_from_yaml(yaml_file_path, "domain_envs")
# 每次執行迴圈都會取得一個鍵值對,env 是 key,格式為 string,
# 而 value 則會是 domains,格式是 list
for env, domains in domain_data.items():
print(f"Writing domains for env {env}: {domains}")
# 這裡組成一個 document,方便儲存到 Mongo 裡
document = {
"env": env,
"domains": domains
}
# 更新條件,匹配那些 env 字段等於當前 env 值的 document
query = {"env": env}
# 將匹配的 document 的內容設為 `document` 字典中的內容
# 使用 upsert=True,如果不存在則插入,存在則更新
collection.update_one(query, {"$set": document}, upsert=True)
print("資料已成功寫入 MongoDB。")
在 Mongo 中,如果指定的集合 (collection) 不存在,當你進行第一次寫入操作(如插入文檔)時,MongoDB 會自動創建這個集合。
集合的創建是懶惰的(lazy),意味著直到你對集合進行了第一次寫入操作(例如使用 `insert_one`、`insert_many`、`update_one` 等方法),集合才會被實際創建。
這種設計使得在 MongoDB 中處理集合非常靈活,你不需要事先創建集合就可以開始開發和測試你的應用程序。MongoDB 會根據需要自動管理集合的創建。
儲存要注意的地方是格式,這裡因為使用的是 Mongo,所以我們這邊將讀取的格式組成一個 document。
在 Mongo 中,每個 document 都是以 BSON(Binary JSON) 格式儲存,這是一種類似於 JSON 的格式。
每個文檔都是由鍵值對(key-value pairs)組成的資料結構,其中每個鍵(key)是一個字符串,而值(value)可以是不同類型的資料類型,包括但不限於字符串、數字、布爾值、列表(在 BSON 中稱為)、甚至是嵌套的文檔。
可參考官方文檔[官方文檔](https://www.mongodb.com/docs/manual/reference/bson-types/#bson-types)。
update_one
基本語法如下:
collection.update_one(filter, update, upsert=False)
filter
:一個字典,用於指定查詢條件,以匹配需要更新的文檔。update
:一個字典,用於指定如何更新匹配的文檔。這通常涉及到 MongoDB 的更新操作符,如$set
、$unset
等。upsert
:一個可選的 boolean 值,默認為False
。如果設置為True
,當沒有文檔匹配filter
查詢條件時,update
操作將會作為一個新的 doc 被插入到集合中。
寫入成功後可以透過 MongoShell 或是 Mongo Compass,這邊使用 MongoShell,在 docker-compose.yaml 添加 MongoShell 的 service:
services:
mongoshell:
image: mongo:latest
container_name: mongodb_mongoshell
depends_on:
- mongodb
entrypoint:
[
"mongosh",
"--host",
"mongodb",
"--username",
"rootuser",
"--password",
"rootpass",
"--authenticationDatabase",
"admin",
]
stdin_open: true
tty: true
接著透過以下指令進入 mongoshell:
docker-compose run mongoshell
要查看 collection 需執行以下 query:
use mydatabase
db.domains.find({})
這段代碼用意如下:
- 選擇 mydatabase db
- 查詢 domains collection 中所有的 document
輸出類似以下訊息:
[
{
_id: ObjectId('66150a22e1a8ac17b898a2f0'),
env: 'live',
domains: [ 'google.com', 'en.wikipedia.org' ]
}
]
讓我們稍微優化一下程式碼:
def write_domain_data_to_mongodb(mongo_client, collection_name, domain_data):
db = mongo_client.get_default_database()
collection = db[collection_name]
for env, domains in domain_data.items():
document = {
"env": env,
"domains": domains
}
# 更新條件,這裡假設 env 是唯一的
query = {"env": env}
# 使用 upsert=True,如果不存在則插入,存在則更新
collection.update_one(query, {"$set": document}, upsert=True)
print("數據已成功寫入 MongoDB。")
client = init_mongo_client(mongodb_uri)
collection_name = "domains"
yaml_file_path = "domains.yaml"
domain_data = load_data_from_yaml(yaml_file_path, "domain_envs")
write_domain_data_to_mongodb(client, collection_name, domain_data)
這樣就完成了從 yaml 進行寫入,接著需要撰寫另一套,透過傳入 env 以及 domain 資訊來進行寫入。
- 透過 Python 傳入 env 以及 domain 寫入 MongoDB 會需要這個功能是因為之後要透過 TG Bot 傳入 env 以及 domain 進行寫入的操作:
def add_domain_to_mongodb(collection, env, domain):
# 嘗試添加或更新該 env 的域名
result = collection.update_one(
{"env": env}, {"$addToSet": {"domains": domain}}, upsert=True
)
if result.matched_count > 0 or result.upserted_id is not None:
print("域名已成功添加或更新。")
return True
else:
print("域名添加或更新失敗。")
return False
client = init_mongo_client(mongodb_uri)
db = client.get_default_database()
collection_name = "domains"
collection = db[collection_name]
add_env = "dev"
add_domain = "test.com"
add_domain_to_mongodb(collection, add_env, add_domain)
使用 addToSet
是為了確保新增時,如果已存在不會重複新增,確保每個 domain 的唯一性。
新增的部分告一段落,接著來進行讀取的部分。
- 透過 Python 讀取 MongoDB
讀取可以透過
collection.find({})
進行查詢:
db = client.get_default_database()
collection = db[collection_name]
try:
domain_envs = {}
data = collection.find({})
for item in data:
env = item.get("env")
domains = item.get("domains", [])
if env and domains:
domain_envs[env] = domains
print(domain_envs)
except Exception as e:
print(f"從 MongoDB 讀取數據失敗: {e}")
finally:
client.close()
接著優化一下代碼:
def load_domain_envs_from_mongodb(mongo_client, collection_name):
db = mongo_client.get_default_database()
collection = db[collection_name]
try:
domain_envs = {}
data = collection.find({})
for item in data:
env = item.get("env")
domains = item.get("domains", [])
if env and domains:
domain_envs[env] = domains
return domain_envs
except Exception as e:
print(f"從 MongoDB 讀取數據失敗: {e}")
return {}
finally:
client.close()
這樣就算是完成了基本的讀取,接下來要做取得單一 domain 資訊:
def get_domain_from_mongodb(collection, env, domain):
# 構造查詢條件
query = {"env": env, "domains": domain}
# 執行查詢操作
result = collection.find_one(query)
print("get result",result)
if result:
# 找到了相應的文檔,返回域名訊息
print(f"在環境 '{env}' 下找到域名 '{domain}' 的訊息。")
return result
else:
# 沒有找到相應的文檔
print(f"在環境 '{env}' 下未找到域名 '{domain}' 的訊息。")
return None
client = init_mongo_client(mongodb_uri)
db = client.get_default_database()
collection_name = "domains"
collection = db[collection_name]
get_env = "live"
get_domain = "google.com"
get_domain_from_mongodb(collection, get_env, get_domain)
讀取的部分告一段落,接著來進行修改的部分。
- 透過 Python 修改 MongoDB 假設我們目前的 MongoDB 資料如下:
[
{
_id: ObjectId('66150a22e1a8ac17b898a2f0'),
env: 'live',
domains: [ 'google.com', 'en.wikipedia.org' ]
}
]
我打算將 google.com
改成 github.com
,只需要將原本用來新增的 update_one
裡的 query
多一個 domains
欄位:
def update_domain_in_mongodb(collection, env_value, old_domain, new_domain):
# 建立查詢條件和更新動作
query = {"env": env_value, "domains": old_domain}
update_action = {"$set": {"domains.$": new_domain}}
# 執行更新操作
update_result = collection.update_many(query, update_action)
if update_result.matched_count > 0:
print(
f"成功更新文檔。匹配數量: {update_result.matched_count}, 修改數量: {update_result.modified_count}."
)
else:
print("未找到匹配的文檔或域名,更新未執行。")
db = client.get_default_database()
collection = db[collection_name]
env_value = "live"
origin_domain = "google.com"
new_domain = "github.com"
update_domain_in_mongodb(collection, env_value, origin_domain, new_domain)
這裡有用到 $
佔位符,主要是將查詢語句的第一個值來做 update,
會先查找 env
等於 env_value
並且 domains
等於 old_domain
, 接著將 domains
list 中第一個匹配的 old_domain
元素更新為 new_domain
。
可參考官網的操作符文檔。
- 透過 Python 刪除 MongoDB
最後一步要做的是刪除,能夠在指定的
env
刪除domain
:
def delete_domain_in_mongodb(collection, env_value, domain_to_delete):
query = {"env": env_value}
delete_action = {"$pull": {"domains": domain_to_delete}}
collection.update_one(query, delete_action)
db = client.get_default_database()
collection = db[collection_name]
env_value = "live"
delete_domain = "github.com"
delete_domain_in_mongodb(collection, env_value, delete_domain)
刪除一樣用 update
方法,然後使用 $pull
操作符,刪除指定的項目,官方文檔在這。
MongoDB 操作的部分就先告一段落,下篇會教如何打造屬於自己的 Telegram Bot,讓我的機器人能夠接收指令。
如果想看完整程式碼可以參考這裡 🔗 專案 repo –> ep1-mongo-setup