如何在 AWS 使用 SASL/SCRAM 建立 MSK
👨💻簡介
Amazon Managed Streaming for Apache Kafka(Amazon MSK)是 Amazon 推出的完全代管、具有高可用以及安全的 Apache Kafka 服務。在這篇文章中,會使用 AWS 建立 MSK,並使用 SASL/SCRAM 的驗證方式來完成整個設定。
🎯setup
建立叢集
進入到 MSK 頁面,然後創建叢集
- 叢集設定
創建時選自訂建立,叢集類型我這邊使用的是已佈建,規格可以選最小的,版本使用 AWS 建議的,區域數量為3,達到高可用,最後叢集組態先使用預設即可
- 聯網
聯網的部分可以使用 AWS 幫你建立好的預設 VPC,我在前一步選擇三個可用區,因此會需要有三條不同區域的子網,安全群組也選擇預設的即可
- 安全
安全這邊選擇 SASL/SCRAM 身分驗證,加密靜態資料選擇使用 AWS 受管金鑰,待會會提到
SASL/SCRAM(Simple Authentication andSecurity Layer/ Salted Challenge Response Mechanism),是一種使用帳號和密碼來完成驗證的方式,但它的前提是 Client 端與 Proxy 之間必須使用 TLS 加密。
- 監控和標籤
這裡監控只要選基本的即可
接著等到叢集建立完成後開始設定密鑰,建立叢集大概需要15分鐘
建立KMS Key
AmazonMSK 使用 Amazon Secret Manager 來儲存 MSK 使用的帳號與密碼。在設定 Secret Manager 之前,必須先在 Amazon KMS(Key Management Service) 創建一個使用者託管的 Key。
KMS是 Amazon 託管的密鑰管理服務
- 設定金鑰
建立過程都先選默認的,如果後續要調整可以再改。
- 新增標籤
別名叫kafka-kms
- 定義金鑰管理許可
選擇自己的 IAM user
- 定義金鑰使用許可
一樣選擇自己的 IAM user 即可
回到主頁就可以看到自己的金鑰建立完成了
建立 SecretsManager
Amazon Secrets Manager 是密碼管理服務,可以把敏感資料儲存在這,不需要放在程式裡,可以透過 API 調用取得,避免在程式外洩。 AmazonMSK 通過 Secrets Manager 來儲存帳號與密碼。
這裡要注意一點,使用預設 AWS KMS 金鑰建立的密碼無法與 Amazon MSK 叢集搭配使用。
- 選擇機密類型
在 Secrets Manager 中選擇其他類型的密碼,這邊要打上我們需要驗證的 username 以及 password,加密的金鑰要選擇前面建立的 KMS 密鑰。
- 設定機密
在建立密碼時,密碼名稱必須包含前綴為 AmazonMSK_
。
- 設定輪換
選擇預設即可
關聯密鑰到 MSK Cluster
回到主頁面的屬性,往下滑找到關聯機密,選擇剛剛建立的機密即可
設定權限
建立 topic 之前,我們需要先透過 Broker 去修改帳號的權限,接著開放公開存取。 在相同子網路上建立 ec2,透過私有連線下去修改權限,先去 Client 端主機下載相對應版本的 Kafka,可以透過 [此連結](Apache Kafka)查看。
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
下載完成後解壓縮。
tar -xzf kafka_2.13-3.8.0.tgz
如果機器上沒有安裝 java 也一併安裝
sudo yum -y install java-11
接著要先取得 BootstrapConnectString,將使用者權限設定完成。 在 MSK 主頁面點擊檢視用戶端資訊可以取得 BootstrapConnectString。
再來到 Client 端機器上的 kafka 目錄下,因為在執行權限設定時,需要進行身份驗證,所以需要建立一份驗證檔,取名為 client_sasl.properties
,裡面會有 Secrets Manager 設定的帳號密碼,將這檔案建立在 kafka 目錄下
client_sasl.properties
內容如下
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alan" \
password="alan-secret";
執行以下命令來修改權限,替換 BootStrapConnectString 為剛剛複製的。如果 Cluster 的節點數小於3個,replication-factor要設定小於3。
export TOPIC="*"
export USER="alan"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="b-2.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-1.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096,b-3.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9096"
## 允許使用者所有權限。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation ALL \
--topic $TOPIC \
--command-config client_sasl.properties
以下是單獨權限設定
## 授予指定的使用者建立指定的主題。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Create \
--topic $TOPIC \
--command-config client_sasl.properties
## 授予指定使用者對所有群組和指定主題的讀取權限。
## 該使用者可以從指定的主題讀取消息,並且適用於所有消費者群組。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Read --group="*" \
--topic $TOPIC \
--command-config client_sasl.properties
## 授予指定的使用者對指定的主題的寫入權限。
## 該使用者可以向指定的主題發送消息。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Write \
--topic $TOPIC \
--command-config client_sasl.properties
## 授予指定的使用者對所有群組和指定的主題的描述權限。
## 該使用者可以查看主題和群組的元數據信息,例如消費者的偏移量和配置詳情。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation Describe --group="*" \
--topic $TOPIC \
--command-config client_sasl.properties
## 授予指定的使用者對指定的主題的刪除權限。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation DELETE \
--topic $TOPIC \
--command-config client_sasl.properties
## 允許使用者修改資源(如 topic 的配置)。這個權限對於更改 topic partition 數或設定非常重要。
./bin/kafka-acls.sh --bootstrap-server $BootStrapConnectString \
--add --allow-principal "User:$USER" \
--operation ALTER \
--topic $TOPIC \
--command-config client_sasl.properties
設定完權限後,即可開始透過外網建立 topic
開放外網存取
使用外網存取 msk,必須先開啟公開存取,因此須先設定叢集組態檔,將 allow.everyone.if.no.acl.found=false
加上去,讓叢集使用這份組態檔
點擊左側菜單欄的叢集組態 -> 建立叢集組態 -> 加上 allow.everyone.if.no.acl.found=false
接著回到叢集裡面的屬性,替換剛剛新增的設定檔
好了之後回到叢集裡面的屬性,編輯聯網設定,開啟公開存取
使用帳號與密碼連接到MSK Cluster
可以存取外網後,就可以開始測試外部 Client 端是否可以操作了。先到一台需要透過外網存取的機器,先取得外網 ip,將這 ip 加到MSK 的 SG 裡,開放可以連到 9196 port
安裝kafka以及java,並到 kafka 目錄下設定 client_sasl.properties
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="alan" \
password="alan-secret";
首先先建立 topic,一樣使用 Broker 的連線資訊,但是是使用外網,先到叢集那邊複製公有端點
到 kafka 目錄下執行指令建立 topic
export TOPIC="mytopic"
export KAFKA_OPTS="-Xmx256m -Xms256m"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"
./bin/kafka-topics.sh --create --bootstrap-server $BootStrapConnectString \
--replication-factor 3 \
--partitions 1 \
--topic $TOPIC \
--command-config client_sasl.properties
接著開啟兩個終端視窗,使用 kafka-console,讓第一個視窗成為producer
export TOPIC="mytopic"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"
./bin/kafka-console-producer.sh --broker-list $BootStrapConnectString \
--topic $TOPIC \
--producer.config client_sasl.properties
接著讓第二個視窗成為consumer
export TOPIC="mytopic"
export BootStrapConnectString="b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196"
./bin/kafka-console-consumer.sh --bootstrap-server $BootStrapConnectString \
--topic $TOPIC \
--consumer.config client_sasl.properties
執行後可以看到 Producer 會將訊息傳送出去,並且 Consumer 可以成功接收到訊息
如果要用 python 來測試,須先安裝 boto3 以及 kafka-python package,並且設定好 aws-cli,因為需要取得 secret
pip install boto3
pip install kafka-python
建立一份 producer.py,成為producer
import boto3
import base64
import json
from kafka import KafkaProducer
from kafka import TopicPartition
def get_secret():
secret_name = "AmazonMSK_kfk01"
region_name = "ap-northeast-1"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
# For a list of exceptions thrown, see
# https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
raise e
secret = get_secret_value_response['SecretString']
return json.loads(secret)
topic = 'mytopic'
secret = get_secret()
BootstrapBroker_String = 'b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196'
producer = KafkaProducer(bootstrap_servers=BootstrapBroker_String,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=secret['username'],
sasl_plain_password=secret['password'],
)
try:
while True:
message = input("請輸入您要發送的消息 (輸入 'exit' 退出): ")
if message.lower() == 'exit':
print("結束輸入,關閉生產者...")
break
# 將用戶輸入的消息編碼為 bytes
producer.send(topic, message.encode('utf-8'))
producer.flush() # 確保消息已經被發送
print("消息發送成功")
except Exception as e:
print(f"發送消息失敗: {e}")
finally:
producer.close()
print("Kafka 生產者已關閉。")
建立一份consumer.py,成為 consumer
import boto3
import base64
import json
from kafka import KafkaConsumer
from kafka import TopicPartition
def get_secret():
secret_name = "AmazonMSK_kfk01"
region_name = "ap-northeast-1"
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
get_secret_value_response = client.get_secret_value(SecretId=secret_name)
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
else:
secret = base64.b64decode(get_secret_value_response['SecretBinary'])
return json.loads(secret)
TOPIC = "mytopic"
secret = get_secret()
BootstrapBroker_String = 'b-2-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-3-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196,b-1-public.awskafka.ii6ahb.c4.kafka.ap-northeast-1.amazonaws.com:9196'
consumer = KafkaConsumer(TOPIC, bootstrap_servers=BootstrapBroker_String,
security_protocol='SASL_SSL',
sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username=secret['username'],
sasl_plain_password=secret['password']
)
print("開始接收消息...")
try:
for message in consumer:
print(f"接收到消息: {message.value.decode('utf-8')}")
except Exception as e:
print(f"接收消息失敗: {e}")
finally:
consumer.close()
📚Reference
- 開始使用 Amazon MSK - Amazon Managed Streaming for Apache Kafka
- 使用 AWS Secrets Manager 登入認證 - Amazon Managed Streaming for Apache Kafka
- 在AWS MSK中启用用户名密码认证及ACL设置 - 简书 (jianshu.com)
- apache kafka - TOPIC_AUTHORIZATION_FAILED Topic authorization failed error from MSK - Stack Overflow
- aws-msk-托管kafka集群的简单使用(VPC内部访问:无验证和SASL认证)_aws msk如何使用-CSDN博客
- aws msk加密方式和问控制连接方式-CSDN博客
- 公用存取 - Amazon Managed Streaming for Apache Kafka
- 自訂 MSK 組態 - Amazon Managed Streaming for Apache Kafka