Skip to content

Certification rules: Limited number of certification failure#

Features#

After the number of failures of the user exceeds the limit of authentication,Expand the user certification voucher form,Insert secondary authentication factors,And when the user initiates the certification request again, the subordinate authentication factors are verified

precondition#

The number of certification failure limit rules plug -in require,The main authentication factor is to have login/register/Authentic factors for reset passwords and other main functions,The primary authentication factors are mainly authentication factors to supplement the authentication process through authentication rules,Taking the user name and password authentication factors and graphic verification code authentication factors as an example。

Configuration guide#

Enter through the menu bar on the left【Tenant management】->【Plug -in management】,Find the number of authentication times in the plug -in lease page to limit the rules of the plug -in card,Click to rent
vEbUde.png

Enter through the menu bar on the left【Certification management】-> 【Certification rules】,Click to create button,Type selection"retry_times",Choose the default password authentication factor of the main authentication factors,Select the default graphic verification code authentication factors for sub -authentication factors,The configuration is completed
vEb7LT.md.png

After the configuration is complete,When the user enters the login interface and repeats three times,The page will refresh and enable the graphics verification code
vEqeSI.png

Implementation#

  • Certification rules: Limited number of certification failure:
sequenceDiagram
    participant D as user
    participant C as Platform core
    participant A as Certification failure number limit rules plugin

    C->>A: Loading plug -in
    A->>C: Register and monitor the incident Create_LOGIN_PAGE_RULES,AUTH_FAIL,BEFORE_AUTH
    D->>C: Visit Registration/Log in/Reset password page
    C->>A: Send Create_LOGIN_PAGE_Rules event
    A->>C: Response event,Determine whether to meet the rules,If the rules are satisfied, it will trigger Authrule_FIX_LOGIN_Page event
    C->>D: Rendering/Log in/Reset password page
    D->>C: Enter certification voucher,Initiate certification requests
    C->>A: Triggering before_Auth event
    A->>C: Response event,Determine whether to meet the rules,If the rules are satisfied, it will trigger Authrule_CHECK_AUTH_Data event,Check and return the result
    C->>A: test result,If the certification is not completed,Trigger amh_Fail event
    A->>C: Response event,Record the number of failures and determine whether to refresh the page
    C->>D: By rendering or refresh the page according to the return result

Abstract method implementation#

Code#

extension_root.com_longgui_auth_rule_retry_times.AuthRuleRetryTimesExtension (AuthRuleExtension) #

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
class AuthRuleRetryTimesExtension(AuthRuleExtension):

    def load(self):
        super().load()
        self.create_extension_config_schema()
        self.listen_event(AUTH_FAIL,self.auth_fail)
        self.listen_event(BEFORE_AUTH,self.before_auth)
        self.listen_event(AUTH_SUCCESS,self.auth_success)

        # 配置初始数据
        tenant = Tenant.platform_tenant()
        if not self.get_tenant_configs(tenant):
            main_auth_factor = TenantExtensionConfig.active_objects.filter(
                tenant=tenant,
                extension=Extension.active_objects.filter(
                    package="com.longgui.auth.factor.password"
                ).first(),
                type="password"
            ).first()

            second_auth_factor = TenantExtensionConfig.active_objects.filter(
                tenant=tenant,
                extension=Extension.active_objects.filter(
                    package="com.longgui.auth.factor.authcode"
                ).first(),
                type="authcode"
            ).first()

            if main_auth_factor and second_auth_factor:
                # 如主认证因素和此认证因素都存在的情况下 创建认证规则

                config = {
                    "main_auth_factor": {
                        "id": main_auth_factor.id.hex, 
                        "name": main_auth_factor.name, 
                        "package": main_auth_factor.extension.package
                    }, 
                    "second_auth_factor": {
                        "id": second_auth_factor.id.hex, 
                        "name": second_auth_factor.name, 
                        "package": second_auth_factor.extension.package
                    }, 
                    "try_times": 3
                }
                self.create_tenant_config(tenant, config, "认证规则:登录失败三次启用图形验证码", "retry_times")

    def before_auth(self,event,**kwargs):
        """ 响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果

        Args:
            event: 事件

        Returns:
            tuple(bool,dict): 次级认证因素校验结果
        """
        for config in self.get_tenant_configs(event.tenant):
            if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
                host = get_remote_addr(event.request)
                if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)):
                    # 判定需要验证
                    responses = dispatch_event(
                        Event(
                            core_event.AUTHRULE_CHECK_AUTH_DATA,
                            tenant=event.tenant,
                            request=event.request,
                            packages=[
                                config.config["second_auth_factor"]["package"]
                            ]
                        )
                    )

                    for useless,(response,useless) in responses:
                        if not response:
                            continue
                        result,data = response
                        if not result:
                            return response
        return True,None

    def auth_success(self,event,**kwargs):
        # 检查是否存在满足条件的配置
        for config in self.get_tenant_configs(event.tenant):
            if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"].id.hex:
                host = get_remote_addr(event.request)
                key = self.gen_key(host,config.id.hex)
                try_times  = 1
                cache.set(event.tenant,key,try_times,expired=config.config.get("expired",30)*60)
                self.clear_refresh_status(event.tenant,host,config.id.hex)

    def auth_fail(self, event, **kwargs):
        """响应事件:认证失败,记录对应IP认证失败次数

        Args:
            event : 事件
        """
        data = event.data["data"]
        # 检查是否存在满足条件的配置
        for config in self.get_tenant_configs(event.tenant):
            if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
                host = get_remote_addr(event.request)
                key = self.gen_key(host,config.id.hex)
                try_times  = int(cache.get(event.tenant,key) or 1)
                cache.set(event.tenant,key,try_times+1,expired=config.config.get("expired",30)*60)
                if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)) and not self.check_refresh_status(event.tenant,host,config.id.hex):
                    data.update(self.error(ErrorCode.AUTH_FAIL_TIMES_OVER_LIMITED))
                    self.set_refresh_status(event.tenant,host,config.id.hex)
                    data["refresh"] = True


    def check_rule(self, event, config):
        login_pages = event.data

        if self.check_retry_times(event.tenant,get_remote_addr(event.request),config.id.hex,config.config.get("try_times",0)): 
            dispatch_event(
                Event(
                    core_event.AUTHRULE_FIX_LOGIN_PAGE,
                    tenant=event.tenant,
                    request=event.request,
                    packages=[
                        config.config["second_auth_factor"]["package"]
                    ],
                    data={
                        "login_pages": login_pages,
                        "main_auth_factor_id": config.config["main_auth_factor"]["id"],
                        "config_id":config.config["second_auth_factor"]["id"]
                    }
                )
            )

    def check_retry_times(self,tenant,host,config_id,limited=3):
        """校验认证失败次数是否超出限制

        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID
            limited (int, optional): 认证失败次数限制. Defaults to 3.

        Returns:
            bool: 校验结果
        """
        key=self.gen_key(host,config_id)
        retry_times = int(cache.get(tenant,key) or limited)
        return retry_times > limited

    def set_refresh_status(self,tenant,host,config_id):
        """设置登陆页面刷新tag

        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID
        """
        cache.set(tenant,self.gen_refresh_key(host,config_id),1)

    def clear_refresh_status(self,tenant,host,config_id):
        cache.set(tenant,self.gen_refresh_key(host,config_id),0)

    def check_refresh_status(self,tenant,host,config_id):
        """校验是否需要刷新页面

        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID

        Returns:
            bool: 是否需要刷新页面
        """
        return bool(cache.get(tenant,self.gen_refresh_key(host,config_id)))

    def gen_refresh_key(self,host:str,config_id:str):
        """页面刷新标识KEY

        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID

        Returns:
            str: 页面刷新标识KEY
        """
        return f"{self.package}_cache_auth_refresh_{host}_{config_id}"

    def gen_key(self,host:str,config_id:str):
        """生成记录失败次数的KEY

        Args:
            host (str): 客户一端IP地址
            config_id (str): 插件运行时ID

        Returns:
            str: 记录失败次数的KEY
        """
        return f"{self.package}_cache_auth_retry_times_{host}_{config_id}"

    def create_extension_config_schema(self):
        """创建插件运行时schema
        """
        main_auth_factor_page = pages.TablePage(select=True,name=_("选择主认证因素"))

        self.register_front_pages(main_auth_factor_page)

        main_auth_factor_page.create_actions(
            init_action=actions.DirectAction(
                path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
                method=actions.FrontActionMethod.GET

            )
        )

        second_auth_factor_page = pages.TablePage(select=True,name=_("选择次认证因素"))

        self.register_front_pages(second_auth_factor_page)

        second_auth_factor_page.create_actions(
            init_action=actions.DirectAction(
                path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
                method=actions.FrontActionMethod.GET
            )
        )

        AuthRuleRetryTimesConfigSchema = create_extension_schema(
            'AuthRuleRetryTimesConfigSchema',
            __file__,
            [
                (
                    'try_times', 
                    int, 
                    Field(
                        title=_('try_times', '限制重试次数'),
                        default=3
                    )
                ),
                (
                    'main_auth_factor',
                    MainAuthRuleSchema, 
                    Field(
                        title=_('main_auth_factor', '主认证因素'),
                        page=main_auth_factor_page.tag
                    )
                ),
                (
                    'second_auth_factor',
                    SecondAuthFactorConfigSchema,
                    Field(
                        title=_('second_auth_factor', '次认证因素'),
                        page=second_auth_factor_page.tag
                    )
                ),
                (
                    'expired', 
                    Optional[int],
                    Field(
                        title=_('expired', '有效期/分钟'),
                        default=30,
                    )
                ),
            ],
            base_schema=BaseAuthRuleSchema
        )
        self.register_auth_rule_schema(
            AuthRuleRetryTimesConfigSchema,
            "retry_times"
        )

auth_fail(self, event, **kwargs) #

响应事件:认证失败,记录对应IP认证失败次数

Parameters:

Name Type Description Default
event

事件

required
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def auth_fail(self, event, **kwargs):
    """响应事件:认证失败,记录对应IP认证失败次数

    Args:
        event : 事件
    """
    data = event.data["data"]
    # 检查是否存在满足条件的配置
    for config in self.get_tenant_configs(event.tenant):
        if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
            host = get_remote_addr(event.request)
            key = self.gen_key(host,config.id.hex)
            try_times  = int(cache.get(event.tenant,key) or 1)
            cache.set(event.tenant,key,try_times+1,expired=config.config.get("expired",30)*60)
            if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)) and not self.check_refresh_status(event.tenant,host,config.id.hex):
                data.update(self.error(ErrorCode.AUTH_FAIL_TIMES_OVER_LIMITED))
                self.set_refresh_status(event.tenant,host,config.id.hex)
                data["refresh"] = True

before_auth(self, event, **kwargs) #

响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果

Parameters:

Name Type Description Default
event

事件

required

Returns:

Type Description
tuple(bool,dict)

次级认证因素校验结果

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def before_auth(self,event,**kwargs):
    """ 响应事件:认证之前, 判断是否满足次级认证因素校验条件,如满足则触发事件并检查次级认证因素校验结果

    Args:
        event: 事件

    Returns:
        tuple(bool,dict): 次级认证因素校验结果
    """
    for config in self.get_tenant_configs(event.tenant):
        if uuid.UUID(config.config["main_auth_factor"]["id"]).hex == event.data["auth_factor_config_id"]:
            host = get_remote_addr(event.request)
            if self.check_retry_times(event.tenant,host,config.id.hex,config.config.get("try_times",0)):
                # 判定需要验证
                responses = dispatch_event(
                    Event(
                        core_event.AUTHRULE_CHECK_AUTH_DATA,
                        tenant=event.tenant,
                        request=event.request,
                        packages=[
                            config.config["second_auth_factor"]["package"]
                        ]
                    )
                )

                for useless,(response,useless) in responses:
                    if not response:
                        continue
                    result,data = response
                    if not result:
                        return response
    return True,None

check_refresh_status(self, tenant, host, config_id) #

校验是否需要刷新页面

Parameters:

Name Type Description Default
host str

客户一端IP地址

required
config_id str

插件运行时ID

required

Returns:

Type Description
bool

是否需要刷新页面

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def check_refresh_status(self,tenant,host,config_id):
    """校验是否需要刷新页面

    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID

    Returns:
        bool: 是否需要刷新页面
    """
    return bool(cache.get(tenant,self.gen_refresh_key(host,config_id)))

check_retry_times(self, tenant, host, config_id, limited=3) #

校验认证失败次数是否超出限制

Parameters:

Name Type Description Default
host str

客户一端IP地址

required
config_id str

插件运行时ID

required
limited int

认证失败次数限制. Defaults to 3.

3

Returns:

Type Description
bool

校验结果

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def check_retry_times(self,tenant,host,config_id,limited=3):
    """校验认证失败次数是否超出限制

    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID
        limited (int, optional): 认证失败次数限制. Defaults to 3.

    Returns:
        bool: 校验结果
    """
    key=self.gen_key(host,config_id)
    retry_times = int(cache.get(tenant,key) or limited)
    return retry_times > limited

check_rule(self, event, config) #

抽象方法:校验规则

Parameters:

Name Type Description Default
event Event

CREATE_LOGIN_PAGE_RULES事件

required
config TenantExtensionConfig

运行时配置

required
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def check_rule(self, event, config):
    login_pages = event.data

    if self.check_retry_times(event.tenant,get_remote_addr(event.request),config.id.hex,config.config.get("try_times",0)): 
        dispatch_event(
            Event(
                core_event.AUTHRULE_FIX_LOGIN_PAGE,
                tenant=event.tenant,
                request=event.request,
                packages=[
                    config.config["second_auth_factor"]["package"]
                ],
                data={
                    "login_pages": login_pages,
                    "main_auth_factor_id": config.config["main_auth_factor"]["id"],
                    "config_id":config.config["second_auth_factor"]["id"]
                }
            )
        )

create_extension_config_schema(self) #

创建插件运行时schema

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def create_extension_config_schema(self):
    """创建插件运行时schema
    """
    main_auth_factor_page = pages.TablePage(select=True,name=_("选择主认证因素"))

    self.register_front_pages(main_auth_factor_page)

    main_auth_factor_page.create_actions(
        init_action=actions.DirectAction(
            path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
            method=actions.FrontActionMethod.GET

        )
    )

    second_auth_factor_page = pages.TablePage(select=True,name=_("选择次认证因素"))

    self.register_front_pages(second_auth_factor_page)

    second_auth_factor_page.create_actions(
        init_action=actions.DirectAction(
            path='/api/v1/tenants/{tenant_id}/config_select/?extension__type=auth_factor',
            method=actions.FrontActionMethod.GET
        )
    )

    AuthRuleRetryTimesConfigSchema = create_extension_schema(
        'AuthRuleRetryTimesConfigSchema',
        __file__,
        [
            (
                'try_times', 
                int, 
                Field(
                    title=_('try_times', '限制重试次数'),
                    default=3
                )
            ),
            (
                'main_auth_factor',
                MainAuthRuleSchema, 
                Field(
                    title=_('main_auth_factor', '主认证因素'),
                    page=main_auth_factor_page.tag
                )
            ),
            (
                'second_auth_factor',
                SecondAuthFactorConfigSchema,
                Field(
                    title=_('second_auth_factor', '次认证因素'),
                    page=second_auth_factor_page.tag
                )
            ),
            (
                'expired', 
                Optional[int],
                Field(
                    title=_('expired', '有效期/分钟'),
                    default=30,
                )
            ),
        ],
        base_schema=BaseAuthRuleSchema
    )
    self.register_auth_rule_schema(
        AuthRuleRetryTimesConfigSchema,
        "retry_times"
    )

gen_key(self, host, config_id) #

生成记录失败次数的KEY

Parameters:

Name Type Description Default
host str

客户一端IP地址

required
config_id str

插件运行时ID

required

Returns:

Type Description
str

记录失败次数的KEY

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def gen_key(self,host:str,config_id:str):
    """生成记录失败次数的KEY

    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID

    Returns:
        str: 记录失败次数的KEY
    """
    return f"{self.package}_cache_auth_retry_times_{host}_{config_id}"

gen_refresh_key(self, host, config_id) #

页面刷新标识KEY

Parameters:

Name Type Description Default
host str

客户一端IP地址

required
config_id str

插件运行时ID

required

Returns:

Type Description
str

页面刷新标识KEY

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def gen_refresh_key(self,host:str,config_id:str):
    """页面刷新标识KEY

    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID

    Returns:
        str: 页面刷新标识KEY
    """
    return f"{self.package}_cache_auth_refresh_{host}_{config_id}"

load(self) #

抽象方法,插件加载的入口方法

Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def load(self):
    super().load()
    self.create_extension_config_schema()
    self.listen_event(AUTH_FAIL,self.auth_fail)
    self.listen_event(BEFORE_AUTH,self.before_auth)
    self.listen_event(AUTH_SUCCESS,self.auth_success)

    # 配置初始数据
    tenant = Tenant.platform_tenant()
    if not self.get_tenant_configs(tenant):
        main_auth_factor = TenantExtensionConfig.active_objects.filter(
            tenant=tenant,
            extension=Extension.active_objects.filter(
                package="com.longgui.auth.factor.password"
            ).first(),
            type="password"
        ).first()

        second_auth_factor = TenantExtensionConfig.active_objects.filter(
            tenant=tenant,
            extension=Extension.active_objects.filter(
                package="com.longgui.auth.factor.authcode"
            ).first(),
            type="authcode"
        ).first()

        if main_auth_factor and second_auth_factor:
            # 如主认证因素和此认证因素都存在的情况下 创建认证规则

            config = {
                "main_auth_factor": {
                    "id": main_auth_factor.id.hex, 
                    "name": main_auth_factor.name, 
                    "package": main_auth_factor.extension.package
                }, 
                "second_auth_factor": {
                    "id": second_auth_factor.id.hex, 
                    "name": second_auth_factor.name, 
                    "package": second_auth_factor.extension.package
                }, 
                "try_times": 3
            }
            self.create_tenant_config(tenant, config, "认证规则:登录失败三次启用图形验证码", "retry_times")

set_refresh_status(self, tenant, host, config_id) #

设置登陆页面刷新tag

Parameters:

Name Type Description Default
host str

客户一端IP地址

required
config_id str

插件运行时ID

required
Source code in extension_root/com_longgui_auth_rule_retry_times/__init__.py
def set_refresh_status(self,tenant,host,config_id):
    """设置登陆页面刷新tag

    Args:
        host (str): 客户一端IP地址
        config_id (str): 插件运行时ID
    """
    cache.set(tenant,self.gen_refresh_key(host,config_id),1)

评论