# -*- coding: utf-8 -*-
"""
Python-SecureHTTP
~~~~~~~~~~~~~~~~~
关于通信过程加密算法的说明:
1. AES加解密::
模式:CBC
密钥长度:128,192,256bit
密钥key:16,24,32bytes(建议使用ASCII编码密钥),初始偏移向量iv固定为key前16个字节
补码方式:PKCS7Padding(在AES中理论上同PKCS5Padding)
加密结果编码方式:十六进制或base64编码
2. RSA加解密::
算法:RSA
填充:RSA_PKCS5_PADDING
密钥格式:符合PKCS#1规范,密钥对采用PEM形式,公钥要求pkcs1或pkcs8格式,私钥要求pkcs1格式
3. 签名::
可选对请求参数或数据添加公共参数后排序再使用摘要算法签名(MD5、SHA1等)
关于参数中字符串说明,若无指定,则:
- py2: str, unicode
- py3: str, bytes
:copyright: (c) 2019 by staugur.
:license: BSD 3-Clause, see LICENSE for more details.
"""
__version__ = "0.5.0"
__author__ = "staugur <staugur@saintic.com>"
__all__ = ["RSAEncrypt", "RSADecrypt", "AESEncrypt", "AESDecrypt",
"EncryptedCommunicationClient", "EncryptedCommunicationServer", "generate_rsa_keys"]
import re
import sys
import json
import time
import copy
import base64
import hashlib
from binascii import b2a_hex, a2b_hex
from Cryptodome.PublicKey import RSA
from Cryptodome.Cipher import AES, PKCS1_v1_5
from Cryptodome.Random import get_random_bytes
from Cryptodome.Util.Padding import pad, unpad
PY2 = sys.version_info[0] == 2
if PY2:
from urllib import quote
else:
from urllib.request import quote
basestring = (str, bytes)
class SecureHTTPException(Exception):
"""异常基类"""
pass
[文档]class SignError(SecureHTTPException):
"""签名异常:加签异常、验签不匹配等"""
pass
[文档]class AESError(SecureHTTPException):
"""AES异常:加密、解密时参数错误"""
pass
[文档]class RSAError(SecureHTTPException):
"""RSA异常:密钥错误、加密解密错误"""
pass
def required_string(string, dst_type=None):
"""Automatically adapts and returns the required string.
:param: string: str,unicode,bytes: Any string for py2, py3+
:param: dst_type: basestring: Required string type, but other string type. Default: unicode for py2, bytes for py3+
:returns: source string(with dst_type)
.. versionadded:: 0.5.0
"""
if isinstance(string, basestring):
if PY2:
if not dst_type or dst_type == "unicode":
# py2 default string type: unicode
if isinstance(string, unicode):
return string
else:
return string.decode('utf-8')
elif dst_type == "str" or dst_type == "bytes":
if isinstance(string, str):
return string
else:
return string.encode('utf-8')
else:
raise ValueError("The param dst_type error, require unicode or str in py2")
else:
if not dst_type or dst_type == "bytes":
# py3 default string type: bytes
if isinstance(string, bytes):
return string
else:
return string.encode("utf-8")
elif dst_type == "str":
if isinstance(string, str):
return string
else:
return string.decode("utf-8")
else:
raise ValueError("The param dst_type error, require str or bytes in py3")
else:
raise TypeError("The param string type error, require %s" % basestring)
[文档]def generate_rsa_keys(incall=False, length=2048, passphrase=None):
"""生成RSA所需的公钥和私钥,公钥格式pkcs8,私钥格式pkcs1。
:param incall: bool: 是否内部调用,默认False表示提供给脚本调用直接打印密钥,True不打印密钥改为return返回
:param length: int: 指定密钥长度,默认1024,需要更强加密可设置为2048
:param passphrase: str: 私钥保护的密码短语
:returns: tuple(public_key, private_key)
"""
if incall is False:
# 命令行调用
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-v", "--version", help="Print the SecureHTTP Version", default=False, action='store_true')
parser.add_argument("-l", "--length", default=2048, type=int, choices=[1024, 2048, 3072, 4096], help="Key length, default is 2048.")
parser.add_argument("-p", "--passphrase", default=None, type=str, help="The pass phrase used for protecting the private key.")
parser.add_argument("-w", "--write", help="Write a key pair file in PEM format", default=False, action='store_true')
args = parser.parse_args()
version = args.version
length = args.length
passphrase = args.passphrase
write = args.write
if version:
print("v"+__version__)
exit(0)
print("\033[1;33mGenerating RSA private key, %s bit long modulus.\n\033[0m" % length)
startTime = time.time()
# 开始生成
key = RSA.generate(length)
public_key = key.publickey().exportKey(format="PEM", pkcs=8)
private_key = key.exportKey(format="PEM", passphrase=passphrase, pkcs=1)
# 生成完毕
if incall is False:
print("\033[1;32mSuccessfully generated, with %0.2f seconds.\nPlease save the key pair and don't reveal the private key!\n\033[0m" % float(time.time() - startTime))
if write is True:
public_key_file = "SecureHTTP_pkcs8_public.pem"
private_key_file = "SecureHTTP_pkcs1_private.pem"
with open(public_key_file, "w") as fp:
fp.write(public_key.decode('utf-8'))
with open(private_key_file, "w") as fp:
fp.write(private_key.decode('utf-8'))
print("\033[1;31mRSA PublicKey File: %s, and PrivateKey File: %s\033[0m" % (public_key_file, private_key_file))
else:
print("\033[1;31mRSA PublicKey for PKCS#8:\033[0m\n%s" % public_key.decode('utf-8'))
print("\n\033[1;31mRSA PrivateKey for PKCS#1:\033[0m\n%s" % private_key.decode('utf-8'))
else:
return (public_key, private_key)
[文档]def RSAEncrypt(pubkey, plaintext, output="base64"):
"""RSA公钥加密
:param pubkey: str,bytes: pkcs1或pkcs8格式公钥
:param plaintext: str,bytes: 准备加密的文本消息
:param output: str: Output format: base64 (default), hex (hexadecimal)
:returns: str,unicode: base64编码的字符串
"""
if (not PY2 and isinstance(plaintext, str)) or (PY2 and isinstance(plaintext, unicode)):
plaintext = plaintext.encode("utf-8")
pubkey = RSA.importKey(pubkey)
ciphertext = PKCS1_v1_5.new(pubkey).encrypt(plaintext)
crypted_str = b2a_hex(ciphertext) if output == "hex" else base64.b64encode(ciphertext)
return crypted_str
[文档]def RSADecrypt(privkey, ciphertext, passphrase=None, sentinel="ERROR", input="base64"):
"""RSA私钥解密
:param privkey: str,bytes: pkcs1格式私钥
:param ciphertext: str,bytes: 已加密的消息
:param passphrase: str,bytes: 私钥保护的密码短语
:param sentinel: any type: 检测到错误时返回的标记,默认返回ERROR字符串
:param input: str: Input format: base64 (default) or hex (hexadecimal), refer to the output parameter of :func:`RSAEncrypt`
:returns: str,unicode: 消息原文
"""
privkey = RSA.importKey(privkey, passphrase=passphrase)
ciphertext = a2b_hex(ciphertext) if input == "hex" else base64.b64decode(ciphertext)
plaintext = PKCS1_v1_5.new(privkey).decrypt(ciphertext, sentinel)
return plaintext
[文档]def AESEncrypt(key, plaintext, output="base64", output_type=None):
"""AES Encryption Function.
:param key: basestring: 16 24 32 bytes with ASCII.
:param plaintext: basestring: Plaintext message to be encrypted
:param output: str: Output format: base64 (default), hex (hexadecimal)
:param output_type: basestring: The type of encrypted string to be output, refer to the dst_type parameter of :func:`required_string`
:raises: AESError,ValueError
:returns: Encrypted ciphertext
"""
if key and plaintext:
key = required_string(key, "bytes")
if len(key) not in AES.key_size:
raise AESError("The key type error, resulting in length illegality")
#: Pad fill requires bytes type
padding = pad(required_string(plaintext, "bytes"), AES.block_size)
#: Encrypted in CBC mode, iv is fixed to the first 16 characters of the key
aes = AES.new(key, AES.MODE_CBC, key[:16])
ciphertext = aes.encrypt(padding)
crypted_str = b2a_hex(ciphertext) if output == "hex" else base64.b64encode(ciphertext)
return required_string(crypted_str, output_type)
else:
raise AESError("The key or plaintext is not valid")
[文档]def AESDecrypt(key, ciphertext, input="base64", output_type=None):
"""AES Decryption Function.
:param key: basestring: Refer to the key parameter of :func:`AESEncrypt`
:param ciphertext: basestring: Ciphertext message to be decrypted
:param input: str: Input format: base64 (default) or hex (hexadecimal), refer to the output parameter of :func:`AESEncrypt`
:param output_type: basestring: The type of decrypted string to be output, refer to the dst_type parameter of :func:`required_string`
:raises: AESError,binascii.Error,ValueError,TypeError
:returns: Decrypted plaintext
"""
if key and ciphertext:
key = required_string(key, "bytes")
if len(key) not in AES.key_size:
raise AESError("The key type error, resulting in length illegality")
#: Encrypted in CBC mode, iv is fixed to the first 16 characters of the key
aes = AES.new(key, AES.MODE_CBC, key[:16])
ciphertext = a2b_hex(ciphertext) if input == "hex" else base64.b64decode(ciphertext)
#: Remove fill
plaintext = unpad(aes.decrypt(ciphertext), AES.block_size)
return required_string(plaintext, output_type)
else:
raise AESError("The key or plaintext is not valid")
class EncryptedCommunicationMix(object):
"""加密传输通信基类。
此类封装加密通信过程中所需函数,包括RSA、AES、MD5等,加密传输整个流程是::
客户端上传数据加密 ==> 服务端获取数据解密 ==> 服务端返回数据加密 ==> 客户端获取数据解密
NO.1 客户端上传数据加密流程::
1. 客户端随机产生一个16位的字符串,用以之后AES加密的秘钥,AESKey。
2. 使用RSA对AESKey进行公钥加密,RSAKey。
3. 参数加签,规则是:对所有请求或提交的字典参数按key做升序排列并用"参数名=参数值&"形式连接。
4. 将明文的要上传的数据包(字典/Map)转为Json字符串,使用AESKey加密,得到JsonAESEncryptedData。
5. 封装为{key : RSAKey, value : JsonAESEncryptedData}的字典上传服务器,服务器只需要通过key和value,然后解析,获取数据即可。
NO.2 服务端获取数据解密流程::
1. 获取到RSAKey后用服务器私钥解密,获取到AESKey
2. 获取到JsonAESEncriptedData,使用AESKey解密,得到明文的客户端上传上来的数据。
3. 验签
4. 返回明文数据
NO.3 服务端返回数据加密流程::
1. 将要返回给客户端的数据(字典/Map)进行加签并将签名附属到数据中
2. 上一步得到的数据转成Json字符串,用AESKey加密处理,记为AESEncryptedResponseData
3. 封装数据{data : AESEncryptedResponseData}的形式返回给客户端
NO.4 客户端获取数据解密流程::
1. 客户端获取到数据后通过key为data得到服务器返回的已经加密的数据AESEncryptedResponseData
2. 对AESEncryptedResponseData使用AESKey进行解密,得到明文服务器返回的数据。
"""
#: Set the length of the byte to generate the AESKey
#:
#: .. versionadded:: 0.5.0
BS = AES.block_size
def get_current_timestamp(self):
""" UTC时间 """
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def md5(self, message):
"""MD5签名
:params message: str,unicode,bytes:
:returns: str: Signed message
"""
if not PY2 and isinstance(message, str):
message = message.encode("utf-8")
return hashlib.md5(message).hexdigest()
def sha1(self, message):
"""SHA1签名
:params message: str,unicode,bytes:
:returns: str: Signed message
"""
if not PY2 and isinstance(message, str):
message = message.encode("utf-8")
return hashlib.sha1(message).hexdigest()
def sha256(self, message):
"""SHA256签名
:params message: str,unicode,bytes:
:returns: str: Signed message
"""
if not PY2 and isinstance(message, str):
message = message.encode("utf-8")
return hashlib.sha256(message).hexdigest()
def abstract_algorithm_mapping(self, algorithm="md5"):
"""摘要算法映射表
:param algorithm: str: 算法名称,默认md5,可选md5、sha1、sha256
:returns: method of calculating summary
"""
mapping = dict(md5=self.md5, sha1=self.sha1, sha256=self.sha256)
return mapping.get(algorithm.lower(), self.md5)
def genAesKey(self):
"""生成AES密钥,32字节
:returns: str
"""
return get_random_bytes(self.BS)
def conversionComma(self, comma_str):
"""将字符串comma_str使用正则以逗号分隔
:param comma_str: str: 要分隔的字符串,以英文逗号分隔
:return: list
"""
if comma_str and isinstance(comma_str, basestring):
comma_pat = re.compile(r"\s*,\s*")
comma_str = required_string(comma_str, "str")
return [i for i in comma_pat.split(comma_str) if i]
else:
return []
def sign(self, parameters, meta={}):
""" 参数签名,目前版本请勿嵌套无序数据类型(如嵌套dict、嵌套list中嵌套dict),否则可能造成签名失败!
:param parameters: dict: 请求参数或提交的数据
:param meta: dict: 公共元数据,参与排序加签
:raises: TypeError
:returns: sign message(str) or None
"""
if isinstance(parameters, dict) and isinstance(meta, dict):
signIndex = meta.get("SignatureIndex", None)
SignMethod = meta.get("SignatureMethod", "md5")
# 重新定义要加签的dict
if signIndex is False:
return
elif signIndex and isinstance(signIndex, basestring):
signIndex = self.conversionComma(signIndex)
data = dict()
for k in signIndex:
data[k] = parameters[k]
else:
data = copy.deepcopy(parameters)
# 追加公共参数
for k, v in meta.items():
data[k] = v
# NO.1 参数排序
_my_sorted = sorted(data.items(), key=lambda data: data[0])
# NO.2 排序后拼接字符串
canonicalizedQueryString = ''
for (k, v) in _my_sorted:
canonicalizedQueryString += '%s=%s&' % (self._percent_encode(k), self._percent_encode(v))
# NO.3 加密返回签名: Signature
return self.abstract_algorithm_mapping(SignMethod)(canonicalizedQueryString)
else:
raise TypeError("Invalid sign parameters or meta")
def _percent_encode(self, encodeStr):
try:
encodeStr = json.dumps(encodeStr, sort_keys=True)
except:
raise
if isinstance(encodeStr, bytes):
encodeStr = encodeStr.decode(sys.stdin.encoding or 'utf-8')
res = quote(encodeStr.encode('utf-8'), '')
res = res.replace('+', '%20')
res = res.replace('*', '%2A')
res = res.replace('%7E', '~')
return res
[文档]class EncryptedCommunicationClient(EncryptedCommunicationMix):
"""客户端:主要是公钥加密"""
def __init__(self, PublicKey):
"""初始化客户端请求类
:param PublicKey: str: RSA的pkcs1或pkcs8格式公钥
"""
self.AESKey = self.genAesKey()
self.PublicKey = PublicKey
[文档] def clientEncrypt(self, post, **signargs):
"""客户端发起加密请求通信 for NO.1
:param post: dict: 请求的数据
:param signIndex: str: 参与排序加签的键名,False表示不签名,None时表示加签post中所有数据,非空时请用逗号分隔键名(字符串)
:param signMethod: str: 签名算法,可选md5、sha1、sha256
:returns: dict: {key=RSAKey, value=加密数据}
"""
if not (post and isinstance(post, dict)):
raise TypeError("Invalid post data")
# 深拷贝post
postData = copy.deepcopy(post)
# 使用RSA公钥加密AES密钥获取RSA密文作为密钥
RSAKey = RSAEncrypt(self.PublicKey, self.AESKey)
# 定义元数据
metaData = dict(Timestamp=self.get_current_timestamp(), SignatureVersion="v1", SignatureMethod=signargs.get("signMethod", "md5"), SignatureIndex=signargs.get("signIndex", None))
# 对请求数据签名
metaData.update(Signature=self.sign(postData, metaData))
# 对请求数据填充元信息
postData.update(__meta__=metaData)
# 使用AES加密请求数据
JsonAESEncryptedData = AESEncrypt(self.AESKey, json.dumps(postData, separators=(',', ':')), output_type="str")
return dict(key=RSAKey, value=JsonAESEncryptedData)
[文档] def clientDecrypt(self, encryptedRespData):
"""客户端获取服务端返回的加密数据并解密 for NO.4
:param encryptedRespData: dict: 服务端返回的加密数据,其格式应该是 {data: AES加密数据}
:raises: TypeError,SignError
:returns: 解密验签成功后,返回服务端的消息原文
"""
if encryptedRespData and isinstance(encryptedRespData, dict) and \
"data" in encryptedRespData:
JsonAESEncryptedData = encryptedRespData["data"]
respData = json.loads(AESDecrypt(self.AESKey, JsonAESEncryptedData, output_type="str"))
metaData = respData.pop("__meta__")
Signature = metaData.pop("Signature")
SignData = self.sign(respData, metaData)
if Signature == SignData:
return respData
else:
raise SignError("Signature verification failed")
else:
raise TypeError("Invalid encrypted resp data")
[文档]class EncryptedCommunicationServer(EncryptedCommunicationMix):
"""服务端:主要是私钥解密"""
def __init__(self, PrivateKey):
"""初始化服务端响应类
:param PrivateKey: str: pkcs1格式私钥
"""
self.PrivateKey = PrivateKey
# AESKey是服务端解密时解码的AESKey,即客户端加密时自主生成的AES密钥
self.AESKey = None
[文档] def serverDecrypt(self, encryptedPostData):
"""服务端获取请求数据并解密 for NO.2
:param encryptedPostData: dict: 请求的加密数据
:raises: TypeError,SignError
:returns: 解密后的请求数据原文
"""
if encryptedPostData and isinstance(encryptedPostData, dict) and \
"key" in encryptedPostData and \
"value" in encryptedPostData:
RSAKey = encryptedPostData["key"]
self.AESKey = RSADecrypt(self.PrivateKey, RSAKey)
JsonAESEncryptedData = encryptedPostData["value"]
postData = json.loads(AESDecrypt(self.AESKey, JsonAESEncryptedData, output_type="str"))
metaData = postData.pop("__meta__")
Signature = metaData.pop("Signature")
SignData = self.sign(postData, metaData)
if Signature == SignData:
return postData
else:
raise SignError("Signature verification failed")
else:
raise TypeError("Invalid encrypted post data")
[文档] def serverEncrypt(self, resp, **signargs):
"""服务端返回加密数据 for NO.3
:param resp: dict: 服务端返回的数据,目前仅支持dict
:param signIndex: tuple,list: 参与排序加签的键名,False表示不签名,None时表示加签resp中所有数据,非空时请用逗号分隔键名(字符串)
:param signMethod: str: 签名算法,可选md5、sha1、sha256
:raises: TypeError,ValueError
:returns: dict: 返回dict,格式是 {data: AES加密数据}
"""
if self.AESKey:
if resp and isinstance(resp, dict):
respData = copy.deepcopy(resp)
metaData = dict(Timestamp=self.get_current_timestamp(), SignatureVersion="v1", SignatureMethod=signargs.get("signMethod", "md5"), SignatureIndex=signargs.get("signIndex", None))
metaData.update(Signature=self.sign(respData, metaData))
respData.update(__meta__=metaData)
JsonAESEncryptedData = AESEncrypt(self.AESKey, json.dumps(respData, separators=(',', ':')), output_type="str")
return dict(data=JsonAESEncryptedData)
else:
raise TypeError("Invalid resp data")
else:
raise ValueError("Invalid AESKey")