# -*- coding: utf-8 -*-
import logging, time, json
from aliyunsdkcore import client
from aliyunsdkram.request.v20150501.CreateAccessKeyRequest import CreateAccessKeyRequest
from aliyunsdkram.request.v20150501.DeleteAccessKeyRequest import DeleteAccessKeyRequest
from aliyunsdkkms.request.v20160120.EncryptRequest import EncryptRequest
from aliyunsdkkms.request.v20160120.DecryptRequest import DecryptRequest
from aliyunsdkcore.auth.credentials import StsTokenCredential
# ak Encrypt content
AK_CiphertextBlob = "NmQyY2ZhODMtMTlhYS00MTNjLTlmZjAtZTQxYTFiYWVmMzZmM1B1NXhTZENCNXVWd1dhdTNMWVRvb3V6dU9QcVVlMXRBQUFBQUFBQUFBQ3gwZTkzeGhDdHVzMWhDUCtZeVVuMWlobzlCa3VxMlErOXFHWWdXXXHELLwL1NSZTFvUURYSW9lak5Hak1lMnF0R2I1TWUxMEJiYmkzVnBwZHlrWGYzc3kyK2tQbGlKb2lHQ3lrZUdieHN2eXZwSVYzN2Qyd1cydz09"
USER_NAME = "ls-test" # sub-account name
LOGGER = logging.getLogger()
def handler(event, context):creds = context.credentialssts_token_credential = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)# this demo ecs and function in same region, if not in same region, you need change region_id to your ecs instance's region_idclt = client.AcsClient(region_id=context.region, credential=sts_token_credential)request = DecryptRequest()request.set_CiphertextBlob(AK_CiphertextBlob)response = _send_request(clt, request)ak_info = json.loads(response.get("Plaintext","{}"))if not ak_info:return "KMS Decrypt ERROR"ak_id = ak_info["ak_id"]ak_secret = ak_info["ak_secret"]LOGGER.info("Decrypt sucessfully with key id: {}".format(response.get("KeyId","{}")))clt2 = client.AcsClient(ak_id, ak_secret, context.region)request = CreateAccessKeyRequest()request.set_UserName(USER_NAME) # 給子賬號ls-test創建AKresponse = _send_request(clt2, request)create_ak_id = response.get("AccessKey",{}).get("AccessKeyId")if not create_ak_id:returnLOGGER.info("create ak {} sucess!".format(create_ak_id))time.sleep(10)request = DeleteAccessKeyRequest()request.set_UserName(USER_NAME) request.set_UserAccessKeyId(create_ak_id)response = _send_request(clt2, request)LOGGER.info("delete ak {} sucess!".format(create_ak_id))return "OK"# send open api request
def _send_request(clt, request):request.set_accept_format('json')try:response_str = clt.do_action_with_exception(request)LOGGER.debug(response_str)response_detail = json.loads(response_str)return response_detailexcept Exception as e:LOGGER.error(e)
AK 存在環境變量版本
# -*- coding: utf-8 -*-
import os, logging, time, json
from aliyunsdkcore import client
from aliyunsdkram.request.v20150501.CreateAccessKeyRequest import CreateAccessKeyRequest
from aliyunsdkram.request.v20150501.DeleteAccessKeyRequest import DeleteAccessKeyRequest
USER_NAME = "ls-test" # sub-account name
LOGGER = logging.getLogger()
def handler(event, context):ak_id = os.environ['AK_ID']ak_secret = os.environ['AK_SECRET']clt = client.AcsClient(ak_id, ak_secret, context.region)request = CreateAccessKeyRequest()request.set_UserName(USER_NAME) # 給子賬號USER_NAME創建AKresponse = _send_request(clt, request)create_ak_id = response.get("AccessKey", "").get("AccessKeyId")if not create_ak_id:returnLOGGER.info("create ak {} sucess!".format(create_ak_id))time.sleep(5)request = DeleteAccessKeyRequest()request.set_UserName(USER_NAME) request.set_UserAccessKeyId(create_ak_id)response = _send_request(clt, request)LOGGER.info("delete ak {} sucess!".format(create_ak_id))return "OK"# send open api request
def _send_request(clt, request):request.set_accept_format('json')try:response_str = clt.do_action_with_exception(request)LOGGER.info(response_str)response_detail = json.loads(response_str)return response_detailexcept Exception as e:LOGGER.error(e)
# -*- coding: utf-8 -*-
import logging
import json, random, string, time
from aliyunsdkcore import client
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeSecurityGroupAttributeRequest import DescribeSecurityGroupAttributeRequest
from aliyunsdkcore.auth.credentials import StsTokenCredential
LOGGER = logging.getLogger()
clt = None
# 需要檢查的ecs列表, 修改成你的ecs id 列表
ECS_INST_IDS = ["i-uf6h07zdscdg9g55zkxx", "i-uf6bwkxfxh847a1e2xxx"]
def handler(event, context):creds = context.credentialsglobal cltsts_token_credential = StsTokenCredential(creds.access_key_id, creds.access_key_secret, creds.security_token)# this demo ecs and function in same region, if not in same region, you need change region_id to your ecs instance's region_idclt = client.AcsClient(region_id=context.region, credential=sts_token_credential)invalid_perssions = {}for ecs_id in ECS_INST_IDS:ret = check_and_modify_security_rule(ecs_id)if ret:invalid_perssions[ecs_id] = retreturn invalid_perssions
def check_and_modify_security_rule(instance_id):LOGGER.info("check_and_modify_security_rule, instance_id is %s ", instance_id)request = DescribeInstancesRequest()request.set_InstanceIds(json.dumps([instance_id]))response = _send_request(request)SecurityGroupIds = []if response is not None:instance_list = response.get('Instances', {}).get('Instance')for item in instance_list:SecurityGroupIds = item.get('SecurityGroupIds', {}).get("SecurityGroupId", [])breakif not SecurityGroupIds:LOGGER.error("ecs {} do not have SecurityGroupIds".format(instance_id))return invalid_perssions = []for sg_id in SecurityGroupIds:request = DescribeSecurityGroupAttributeRequest()request.set_SecurityGroupId(sg_id)response = _send_request(request)LOGGER.info("Find a securityGroup id {}".format(sg_id))permissions = response.get("Permissions", {}).get("Permission",[])if not permissions:continuefor permission in permissions:if permission["Direction"] == "ingress" and permission["SourceCidrIp"] == "0.0.0.0/0":LOGGER.error("ecs {0} , SecurityGroup id {1}, have a risk, need fix; permission = {2}".format(instance_id, sg_id, permission))invalid_perssions.append(permission)return invalid_perssions
# send open api request
def _send_request(request):request.set_accept_format('json')try:response_str = clt.do_action_with_exception(request)LOGGER.debug(response_str)response_detail = json.loads(response_str)return response_detailexcept Exception as e:LOGGER.error(e)