Module plibs.third

Expand source code
import base64
import json
from urllib.parse import quote

import alisms
import httpx
import requests
from aip import AipOcr
from cacheout import Cache
from httpx import QueryParams


class BaiduAip:
    """
    Client for Baidu AI Cloud ID-card OCR.

    Provides a synchronous path that goes through the ``AipOcr`` SDK client
    and an asynchronous path that calls the REST endpoints with ``httpx``.
    """
    __id_card_url = 'https://aip.baidubce.com/rest/2.0/ocr/v1/idcard'
    __auth_url = 'https://aip.baidubce.com/oauth/2.0/token'

    @staticmethod
    def get_result(key):
        """
        Translate a recognition status key into its user-facing message.

        NOTE(review): the keys look like the ``image_status`` values of the
        ID-card OCR response -- confirm against the API documentation.

        :param key: status key such as ``'normal'`` or ``'blurred'``.
        :return: the matching message, or the "unknown status" message when
            the key is not one of the known keys.
        """
        messages = {
            'normal': '识别正常',
            'reversed_side': '未摆正身份证',
            'non_idcard': '上传的图片中不包含身份证',
            'blurred': '身份证模糊',
            'over_exposure': '身份证关键字段反光或过曝',
            'unknown': '未知状态',
        }
        return messages.get(key) or '未知状态'

    def __init__(self, app_id, api_key, secret_key):
        """
        Store the credentials and build the SDK client.

        :param app_id: application id passed to ``AipOcr``.
        :param api_key: API key; also used as ``client_id`` for the token request.
        :param secret_key: secret key; also used as ``client_secret``.
        """
        self.app_id = app_id
        self.api_key = api_key
        self.secret_key = secret_key
        self.client = AipOcr(app_id, api_key, secret_key)
        # One cache per instance, created once: a cache built inside the
        # token property would be empty on every access and never hit.
        self._token_cache = Cache()

    def id_card(self, img_url, id_card_side='front'):
        """
        Recognise an ID-card image (synchronous).

        Downloads the image with ``requests`` and hands the raw bytes to the
        SDK client.

        :param img_url: URL of the image to download.
        :param id_card_side: side of the card, ``'front'`` by default.
        :return: whatever ``AipOcr.idcard`` returns.
        """
        image = requests.get(img_url).content
        options = dict(detect_risk=True)
        ret = self.client.idcard(image, id_card_side, options)
        return ret

    @property
    async def _access_token(self):
        """
        Return an OAuth access token, fetching a new one only when needed.

        This is an async property: use it as ``await self._access_token``.
        The token is kept in the per-instance cache with the ``expires_in``
        value of the token response as its time-to-live.

        :return: the access token, or ``None`` when the token response
            carried no ``access_token`` field.
        """
        cache = self._token_cache
        key = 'access_token'
        access_token = cache.get(key)
        if not access_token:
            data = dict(
                grant_type='client_credentials',
                client_id=self.api_key,
                client_secret=self.secret_key
            )
            # httpx's module-level helpers are synchronous; awaiting needs
            # an AsyncClient.
            async with httpx.AsyncClient() as http:
                ret = await http.post(url=self.__auth_url, data=data)
            ret = ret.json()
            access_token = ret.get('access_token')
            if access_token:
                # Only cache a real token so a failed request is retried.
                cache.set(key, access_token, ret.get('expires_in'))
        return access_token

    async def async_id_card(self, img_url, id_card_side='front'):
        """
        Recognise an ID-card image (asynchronous).

        Downloads the image, base64-encodes it and posts it to the ID-card
        REST endpoint together with the access token.

        :param img_url: URL of the image to download.
        :param id_card_side: side of the card, ``'front'`` by default.
        :return: the ``httpx`` response object of the OCR request. Unlike
            :meth:`id_card`, the body is not decoded here.
        """
        async with httpx.AsyncClient() as http:
            image = await http.get(img_url)
            content = image.content
            options = dict(detect_risk=True)
            data = {'image': base64.b64encode(content).decode(), 'id_card_side': id_card_side}
            data.update(options)
            access_token = await self._access_token
            ret = await http.post(url=self.__id_card_url, params=QueryParams(access_token=access_token),
                                  data=data)
        return ret


class AliSms:
    """
    Alibaba Cloud SMS sender.

    Wraps ``alisms.AliyunSMS`` for synchronous sending and reuses that
    client's request-signing helpers to send asynchronously with ``httpx``.
    """

    def __init__(self, access_key_id, access_key_secret, region_id='cn-hangzhou'):
        """
        Store the credentials and build the underlying ``AliyunSMS`` client.

        :param access_key_id: access key id passed to ``AliyunSMS``.
        :param access_key_secret: access key secret passed to ``AliyunSMS``.
        :param region_id: region id, ``'cn-hangzhou'`` by default.
        """
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.client = alisms.AliyunSMS(access_key_id=access_key_id,
                                       access_key_secret=access_key_secret,
                                       region_id=region_id)

    def send_verify_code(self, sign_name, template_code, phone, code):
        """
        Send an SMS verification code.

        :param sign_name: SMS signature name.
        :param template_code: SMS template code.
        :param phone: recipient phone number.
        :param code: verification code, sent as the template's ``code`` parameter.
        :return: the decoded JSON reply, as returned by :meth:`send`.
        """
        params = {
            'code': code
        }
        result = self.send(sign_name, template_code, phone, params)
        return result

    def send(self, sign_name, template_code, phone, params):
        """
        Send an SMS through the SDK client.

        :param sign_name: SMS signature name.
        :param template_code: SMS template code.
        :param phone: recipient phone number.
        :param params: template parameters.
        :return: the reply text of ``send_sms`` decoded from JSON.
        """
        result = self.client.send_sms(phone, sign_name, template_code, params)
        return json.loads(result.text)

    async def async_send(self, sign_name, template_code, phone, params):
        """
        Send an SMS asynchronously.

        Builds the signed request URL with the SDK client's helpers and
        issues the GET request with ``httpx``.

        NOTE(review): this relies on underscore-prefixed members of the
        ``alisms`` client and rewrites ``self.client._sms_params`` on every
        call -- confirm it still matches the installed ``alisms`` version.

        :param sign_name: SMS signature name.
        :param template_code: SMS template code.
        :param phone: recipient phone number.
        :param params: template parameters, serialised to compact JSON.
        :return: the decoded JSON body of the HTTP response.
        """
        template_params = json.dumps(params, separators=(',', ':'))  # Must not have whitespace
        self.client._form_bus_params(business='SendSms', PhoneNumbers=phone, SignName=sign_name,
                                     TemplateCode=template_code, TemplateParam=template_params)
        self.client._sms_params = self.client._sort_params(self.client._sms_params)
        signature = self.client.generate_signature()
        self.client._sms_params['Signature'] = signature
        self.client._sms_params.move_to_end('Signature', last=False)  # Move this param to the top
        # safe='' makes quote() percent-encode '/' as well.
        query = '&'.join(
            '{}={}'.format(key, quote(val, safe='')) for key, val in self.client._sms_params.items())
        final_url = self.client._config.get('host') + '?' + query
        # httpx.get is a synchronous helper and cannot be awaited; an
        # AsyncClient provides the awaitable request.
        async with httpx.AsyncClient() as http:
            ret = await http.get(final_url)
        return ret.json()

    async def async_send_verify_code(self, sign_name, template_code, phone, code):
        """
        Send an SMS verification code asynchronously.

        :param sign_name: SMS signature name.
        :param template_code: SMS template code.
        :param phone: recipient phone number.
        :param code: verification code, sent as the template's ``code`` parameter.
        :return: the decoded JSON reply, as returned by :meth:`async_send`.
        """
        params = {
            'code': code
        }
        result = await self.async_send(sign_name, template_code, phone, params)
        return result

Classes

class AliSms (access_key_id, access_key_secret, region_id='cn-hangzhou')

阿里云短信发送

Expand source code
class AliSms:
    """
    Alibaba Cloud SMS sender.

    Wraps ``alisms.AliyunSMS`` for synchronous sending and reuses that
    client's request-signing helpers to send asynchronously with ``httpx``.
    """

    def __init__(self, access_key_id, access_key_secret, region_id='cn-hangzhou'):
        """
        Store the credentials and build the underlying ``AliyunSMS`` client.

        :param access_key_id: access key id passed to ``AliyunSMS``.
        :param access_key_secret: access key secret passed to ``AliyunSMS``.
        :param region_id: region id, ``'cn-hangzhou'`` by default.
        """
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.client = alisms.AliyunSMS(access_key_id=access_key_id,
                                       access_key_secret=access_key_secret,
                                       region_id=region_id)

    def send_verify_code(self, sign_name, template_code, phone, code):
        """
        Send an SMS verification code.

        :param sign_name: SMS signature name.
        :param template_code: SMS template code.
        :param phone: recipient phone number.
        :param code: verification code, sent as the template's ``code`` parameter.
        :return: the decoded JSON reply, as returned by :meth:`send`.
        """
        params = {
            'code': code
        }
        result = self.send(sign_name, template_code, phone, params)
        return result

    def send(self, sign_name, template_code, phone, params):
        """
        Send an SMS through the SDK client.

        :param sign_name: SMS signature name.
        :param template_code: SMS template code.
        :param phone: recipient phone number.
        :param params: template parameters.
        :return: the reply text of ``send_sms`` decoded from JSON.
        """
        result = self.client.send_sms(phone, sign_name, template_code, params)
        return json.loads(result.text)

    async def async_send(self, sign_name, template_code, phone, params):
        """
        Send an SMS asynchronously.

        Builds the signed request URL with the SDK client's helpers and
        issues the GET request with ``httpx``.

        NOTE(review): this relies on underscore-prefixed members of the
        ``alisms`` client and rewrites ``self.client._sms_params`` on every
        call -- confirm it still matches the installed ``alisms`` version.

        :param sign_name: SMS signature name.
        :param template_code: SMS template code.
        :param phone: recipient phone number.
        :param params: template parameters, serialised to compact JSON.
        :return: the decoded JSON body of the HTTP response.
        """
        template_params = json.dumps(params, separators=(',', ':'))  # Must not have whitespace
        self.client._form_bus_params(business='SendSms', PhoneNumbers=phone, SignName=sign_name,
                                     TemplateCode=template_code, TemplateParam=template_params)
        self.client._sms_params = self.client._sort_params(self.client._sms_params)
        signature = self.client.generate_signature()
        self.client._sms_params['Signature'] = signature
        self.client._sms_params.move_to_end('Signature', last=False)  # Move this param to the top
        # safe='' makes quote() percent-encode '/' as well.
        query = '&'.join(
            '{}={}'.format(key, quote(val, safe='')) for key, val in self.client._sms_params.items())
        final_url = self.client._config.get('host') + '?' + query
        # httpx.get is a synchronous helper and cannot be awaited; an
        # AsyncClient provides the awaitable request.
        async with httpx.AsyncClient() as http:
            ret = await http.get(final_url)
        return ret.json()

    async def async_send_verify_code(self, sign_name, template_code, phone, code):
        """
        Send an SMS verification code asynchronously.

        :param sign_name: SMS signature name.
        :param template_code: SMS template code.
        :param phone: recipient phone number.
        :param code: verification code, sent as the template's ``code`` parameter.
        :return: the decoded JSON reply, as returned by :meth:`async_send`.
        """
        params = {
            'code': code
        }
        result = await self.async_send(sign_name, template_code, phone, params)
        return result

Methods

async def async_send(self, sign_name, template_code, phone, params)

异步发送短信

Expand source code
async def async_send(self, sign_name, template_code, phone, params):
    """
    Send an SMS asynchronously.

    Builds the signed request URL with the SDK client's helpers and issues
    the GET request with ``httpx``.

    NOTE(review): this relies on underscore-prefixed members of the
    ``alisms`` client and rewrites ``self.client._sms_params`` on every
    call -- confirm it still matches the installed ``alisms`` version.

    :param sign_name: SMS signature name.
    :param template_code: SMS template code.
    :param phone: recipient phone number.
    :param params: template parameters, serialised to compact JSON.
    :return: the decoded JSON body of the HTTP response.
    """
    template_params = json.dumps(params, separators=(',', ':'))  # Must not have whitespace
    self.client._form_bus_params(business='SendSms', PhoneNumbers=phone, SignName=sign_name,
                                 TemplateCode=template_code, TemplateParam=template_params)
    self.client._sms_params = self.client._sort_params(self.client._sms_params)
    signature = self.client.generate_signature()
    self.client._sms_params['Signature'] = signature
    self.client._sms_params.move_to_end('Signature', last=False)  # Move this param to the top
    # safe='' makes quote() percent-encode '/' as well.
    query = '&'.join(
        '{}={}'.format(key, quote(val, safe='')) for key, val in self.client._sms_params.items())
    final_url = self.client._config.get('host') + '?' + query
    # httpx.get is a synchronous helper and cannot be awaited; an
    # AsyncClient provides the awaitable request.
    async with httpx.AsyncClient() as http:
        ret = await http.get(final_url)
    return ret.json()
async def async_send_verify_code(self, sign_name, template_code, phone, code)

发送短信验证码

Expand source code
async def async_send_verify_code(self, sign_name, template_code, phone, code):
    """
    Send an SMS verification code asynchronously.

    Wraps ``code`` in the template-parameter dict and delegates to
    ``self.async_send``.

    :param sign_name: SMS signature name.
    :param template_code: SMS template code.
    :param phone: recipient phone number.
    :param code: verification code, sent as the template's ``code`` parameter.
    :return: whatever ``self.async_send`` resolves to.
    """
    template_values = {'code': code}
    return await self.async_send(sign_name, template_code, phone, template_values)
def send(self, sign_name, template_code, phone, params)

发送短信

Expand source code
def send(self, sign_name, template_code, phone, params):
    """
    Send an SMS through the SDK client.

    :param sign_name: SMS signature name.
    :param template_code: SMS template code.
    :param phone: recipient phone number.
    :param params: template parameters.
    :return: the reply text of ``send_sms`` decoded from JSON.
    """
    # The SDK client takes the phone number first, then signature and template.
    response = self.client.send_sms(phone, sign_name, template_code, params)
    raw_body = response.text
    return json.loads(raw_body)
def send_verify_code(self, sign_name, template_code, phone, code)

发送短信验证码

Expand source code
def send_verify_code(self, sign_name, template_code, phone, code):
    """
    Send an SMS verification code.

    Wraps ``code`` in the template-parameter dict and delegates to
    ``self.send``.

    :param sign_name: SMS signature name.
    :param template_code: SMS template code.
    :param phone: recipient phone number.
    :param code: verification code, sent as the template's ``code`` parameter.
    :return: whatever ``self.send`` returns.
    """
    template_values = {'code': code}
    return self.send(sign_name, template_code, phone, template_values)
class BaiduAip (app_id, api_key, secret_key)

百度智能云

Expand source code
class BaiduAip:
    """
    Client for Baidu AI Cloud ID-card OCR.

    Provides a synchronous path that goes through the ``AipOcr`` SDK client
    and an asynchronous path that calls the REST endpoints with ``httpx``.
    """
    __id_card_url = 'https://aip.baidubce.com/rest/2.0/ocr/v1/idcard'
    __auth_url = 'https://aip.baidubce.com/oauth/2.0/token'

    @staticmethod
    def get_result(key):
        """
        Translate a recognition status key into its user-facing message.

        NOTE(review): the keys look like the ``image_status`` values of the
        ID-card OCR response -- confirm against the API documentation.

        :param key: status key such as ``'normal'`` or ``'blurred'``.
        :return: the matching message, or the "unknown status" message when
            the key is not one of the known keys.
        """
        messages = {
            'normal': '识别正常',
            'reversed_side': '未摆正身份证',
            'non_idcard': '上传的图片中不包含身份证',
            'blurred': '身份证模糊',
            'over_exposure': '身份证关键字段反光或过曝',
            'unknown': '未知状态',
        }
        return messages.get(key) or '未知状态'

    def __init__(self, app_id, api_key, secret_key):
        """
        Store the credentials and build the SDK client.

        :param app_id: application id passed to ``AipOcr``.
        :param api_key: API key; also used as ``client_id`` for the token request.
        :param secret_key: secret key; also used as ``client_secret``.
        """
        self.app_id = app_id
        self.api_key = api_key
        self.secret_key = secret_key
        self.client = AipOcr(app_id, api_key, secret_key)
        # One cache per instance, created once: a cache built inside the
        # token property would be empty on every access and never hit.
        self._token_cache = Cache()

    def id_card(self, img_url, id_card_side='front'):
        """
        Recognise an ID-card image (synchronous).

        Downloads the image with ``requests`` and hands the raw bytes to the
        SDK client.

        :param img_url: URL of the image to download.
        :param id_card_side: side of the card, ``'front'`` by default.
        :return: whatever ``AipOcr.idcard`` returns.
        """
        image = requests.get(img_url).content
        options = dict(detect_risk=True)
        ret = self.client.idcard(image, id_card_side, options)
        return ret

    @property
    async def _access_token(self):
        """
        Return an OAuth access token, fetching a new one only when needed.

        This is an async property: use it as ``await self._access_token``.
        The token is kept in the per-instance cache with the ``expires_in``
        value of the token response as its time-to-live.

        :return: the access token, or ``None`` when the token response
            carried no ``access_token`` field.
        """
        cache = self._token_cache
        key = 'access_token'
        access_token = cache.get(key)
        if not access_token:
            data = dict(
                grant_type='client_credentials',
                client_id=self.api_key,
                client_secret=self.secret_key
            )
            # httpx's module-level helpers are synchronous; awaiting needs
            # an AsyncClient.
            async with httpx.AsyncClient() as http:
                ret = await http.post(url=self.__auth_url, data=data)
            ret = ret.json()
            access_token = ret.get('access_token')
            if access_token:
                # Only cache a real token so a failed request is retried.
                cache.set(key, access_token, ret.get('expires_in'))
        return access_token

    async def async_id_card(self, img_url, id_card_side='front'):
        """
        Recognise an ID-card image (asynchronous).

        Downloads the image, base64-encodes it and posts it to the ID-card
        REST endpoint together with the access token.

        :param img_url: URL of the image to download.
        :param id_card_side: side of the card, ``'front'`` by default.
        :return: the ``httpx`` response object of the OCR request. Unlike
            :meth:`id_card`, the body is not decoded here.
        """
        async with httpx.AsyncClient() as http:
            image = await http.get(img_url)
            content = image.content
            options = dict(detect_risk=True)
            data = {'image': base64.b64encode(content).decode(), 'id_card_side': id_card_side}
            data.update(options)
            access_token = await self._access_token
            ret = await http.post(url=self.__id_card_url, params=QueryParams(access_token=access_token),
                                  data=data)
        return ret

Static methods

def get_result(key)
Expand source code
@staticmethod
def get_result(key):
    """
    Translate a recognition status key into its user-facing message.

    :param key: status key such as ``'normal'`` or ``'blurred'``.
    :return: the matching message, or the "unknown status" message when the
        key is not one of the known keys.
    """
    fallback = '未知状态'
    messages = {
        'normal': '识别正常',
        'reversed_side': '未摆正身份证',
        'non_idcard': '上传的图片中不包含身份证',
        'blurred': '身份证模糊',
        'over_exposure': '身份证关键字段反光或过曝',
        'unknown': '未知状态',
    }
    message = messages.get(key)
    if message:
        return message
    return fallback

Methods

async def async_id_card(self, img_url, id_card_side='front')

身份证图片识别 异步方法

Expand source code
async def async_id_card(self, img_url, id_card_side='front'):
    """
    Recognise an ID-card image (asynchronous).

    Downloads the image, base64-encodes it and posts it to the ID-card REST
    endpoint together with the access token.

    :param img_url: URL of the image to download.
    :param id_card_side: side of the card, ``'front'`` by default.
    :return: the ``httpx`` response object of the OCR request; the body is
        not decoded here.
    """
    # httpx.get / httpx.post are synchronous helpers and cannot be awaited;
    # an AsyncClient provides the awaitable requests.
    async with httpx.AsyncClient() as http:
        image = await http.get(img_url)
        content = image.content
        options = dict(detect_risk=True)
        data = {'image': base64.b64encode(content).decode(), 'id_card_side': id_card_side}
        data.update(options)
        access_token = await self._access_token
        ret = await http.post(url=self.__id_card_url, params=QueryParams(access_token=access_token),
                              data=data)
    return ret
def id_card(self, img_url, id_card_side='front')

身份证图片识别

Expand source code
def id_card(self, img_url, id_card_side='front'):
    """
    Recognise an ID-card image (synchronous).

    Downloads the image with ``requests`` and hands the raw bytes to the SDK
    client with risk detection switched on.

    :param img_url: URL of the image to download.
    :param id_card_side: side of the card, ``'front'`` by default.
    :return: whatever ``self.client.idcard`` returns.
    """
    response = requests.get(img_url)
    image_bytes = response.content
    ocr_options = {'detect_risk': True}
    return self.client.idcard(image_bytes, id_card_side, ocr_options)