Skip to content

授权规则

功能介绍#

授权规则,作为系统已有认证方式的补充,它可以支持更细粒度的权限赋予。 可以支持针对用户的不同属性进行权限的赋予,例如用户名、昵称、性别、手机号等。 也可以针对分组的不同属性进行权限的赋予。 还可以根据开发者需要,通过扩展数据存储的结构,实现特定场景下权限的赋予。

实现思路#

arkid系统已经默认实现了一个针对用户属性的权限赋予,方便开发者进行参考

graph LR
  A[开始] --> B{设计schema} --> C{注册前端页面}--> D{注册schema}--> E{实现授权函数} --> F[结束];

下面对于实现思路做一下简单的介绍:

  1. 需要开发者想清楚自己开发插件的需要筛选的属性,是用户属性,还是分组属性,或者其它属性。以及需要筛选那些应用,以及那些权限。 要设计出schema,用于存储数据结构。如果schema中有用到一些应用列表,权限列表,用户列表等,需要把用到的前端页面,使用父类的register_front_pages单独进行注册。

  2. 不同的授权规则之间是通过不同的schema进行划分,所以当开发者设计完schema后,需要通过register_impowerrule_schema,进行注册。

  3. 注册完成后,创建授权规则的页面绿色Type字段就会多一种授权规则,如下图所示:

    jqhUld.md.jpg

    如果我们选择不同的授权规则,红色部分就会展示出不同的内容,展示内容由schema结构决定。

    授权规则的创建编辑和删除,都是由arkid提前处理好的,开发者只需要关注赋权方法就可以。

    下面对赋权方法的使用做下介绍:

    需要开发者实现get_auth_result(event, **kwargs)方法

    1. 参数: 这个方法的kwargsevent 两个参数,我们重点关注event参数,这个参数中包含了datatenant两个属性,其中event.tenant可以获取到当前的租户;event.data可以获取到传递过来的数据。我们获取data里面的值
      1. data.user可以获取到当前用户;
      2. data.app可以获取到当前应用,如果这个应用是一个None,就表示应用为arkid,反之则为其它应用;
      3. data.arr 可以获取到用户权限数组(0或1组成,0是没权限,1有权限,数组下标和数据库权限表里的sort_id是相同的);
      4. data.config 可以获取到当前授权规则
    2. 使用: 开发者需要根据授权规则event.config,结合data.user,根据自身的需求,做筛选后,返回有权限的sort_id数组

抽象方法#

基类定义#

arkid.core.extension.impower_rule.ImpowerRuleBaseExtension (Extension) #

Source code in arkid/core/extension/impower_rule.py
class ImpowerRuleBaseExtension(Extension):

    TYPE = 'impower_rule'

    composite_schema_map = {}
    created_composite_schema_list = []
    composite_key = 'type'
    composite_model = TenantExtensionConfig

    @property
    def type(self):
        return ImpowerRuleBaseExtension.TYPE

    def load(self):
        super().load()
        self.listen_event(core_event.GET_AUTH_RESULT, self.filter_auth_result)

    def get_extensions(self):
        '''
        获取当前类型所有的插件
        '''
        return Extension_Obj.active_objects.filter(
            package=self.package,
            type=self.TYPE
        ).all()

    def get_all_config(self, tenant_id):
        '''
        获取所有的配置
        '''
        return TenantExtensionConfig.active_objects.filter(
            tenant_id=tenant_id,
            extension__in=self.get_extensions()
        ).all()

    def register_impower_rule_schema(self, schema, impowerrule_type):
        """
        注册授权规则的schema
        Params:
            schema: schema
            impowerrule_type: 授权规则类型
        """
        self.register_config_schema(schema, self.package + '_' + impowerrule_type)
        self.register_composite_config_schema(schema, impowerrule_type, exclude=['extension'])

    def filter_auth_result(self, event, **kwargs):
        '''
        筛选抽象结果
        '''
        tenant = event.tenant
        configs = self.get_all_config(tenant.id)
        data = event.data
        arr = data.get('arr', [])
        copy_arr = [x for x in arr]
        result_sort_ids = []
        # 每一个授权规则配置单独验证
        for config in configs:
            data['config'] = config
            sort_ids = self.get_auth_result(event, **kwargs)
            if sort_ids:
                result_sort_ids.extend(sort_ids)
        # 对于授权结果进行合并
        for index, value in enumerate(copy_arr):
            if int(value) == 0 and index in result_sort_ids:
                copy_arr[index] = 1
        return copy_arr

    @abstractmethod
    def get_auth_result(self, event, **kwargs):
        """
        抽象方法,获取权限的鉴定结果
        Params:
            event: 事件参数
                data: 数据
                    user: 用户
                    app: 应用(如果app是None,就表示这个应用是arkid)
                    arr: 权限结果数组(这个是已经赋值过分组权限的数据,有权限是1,没权限是0)
                    config: 应用的授权规则
                tenant: 租户
            kwargs: 其它方法参数
        Return:
            arr: sort_id数组
        """
        pass

composite_model (BaseModel) django-model #

TenantExtensionConfig(id, is_del, is_active, updated, created, tenant, extension, config, name, type)

Source code in arkid/core/extension/impower_rule.py
class TenantExtensionConfig(BaseModel):

    class Meta(object):
        verbose_name = _("插件运行时配置")
        verbose_name_plural = _("插件运行时配置")

    tenant = models.ForeignKey('core.Tenant', blank=False, on_delete=models.PROTECT, verbose_name=_('租户'))
    extension = models.ForeignKey('Extension', blank=False, on_delete=models.PROTECT, verbose_name=_('插件'))
    config = models.JSONField(blank=True, default=dict, verbose_name=_('Runtime Config','运行时配置'))
    name = models.CharField(max_length=128, default='', verbose_name=_('名称'))
    type = models.CharField(max_length=128, default='', verbose_name=_('类型'))

config: JSONField blank django-field #

Runtime Config

created: DateTimeField blank django-field nullable #

创建时间

extension: ForeignKey django-field #

插件

id: UUIDField django-field #

ID

is_active: BooleanField django-field #

是否可用

is_del: BooleanField django-field #

是否删除

name: CharField django-field #

名称

tenant: ForeignKey django-field #

租户

type: CharField django-field #

类型

updated: DateTimeField blank django-field nullable #

更新时间

filter_auth_result(self, event, **kwargs) #

筛选抽象结果

Source code in arkid/core/extension/impower_rule.py
def filter_auth_result(self, event, **kwargs):
    '''
    筛选抽象结果
    '''
    tenant = event.tenant
    configs = self.get_all_config(tenant.id)
    data = event.data
    arr = data.get('arr', [])
    copy_arr = [x for x in arr]
    result_sort_ids = []
    # 每一个授权规则配置单独验证
    for config in configs:
        data['config'] = config
        sort_ids = self.get_auth_result(event, **kwargs)
        if sort_ids:
            result_sort_ids.extend(sort_ids)
    # 对于授权结果进行合并
    for index, value in enumerate(copy_arr):
        if int(value) == 0 and index in result_sort_ids:
            copy_arr[index] = 1
    return copy_arr

get_all_config(self, tenant_id) #

获取所有的配置

Source code in arkid/core/extension/impower_rule.py
def get_all_config(self, tenant_id):
    '''
    获取所有的配置
    '''
    return TenantExtensionConfig.active_objects.filter(
        tenant_id=tenant_id,
        extension__in=self.get_extensions()
    ).all()

get_auth_result(self, event, **kwargs) #

抽象方法,获取权限的鉴定结果

Parameters:

Name Type Description Default
event

事件参数 data: 数据 user: 用户 app: 应用(如果app是None,就表示这个应用是arkid) arr: 权限结果数组(这个是已经赋值过分组权限的数据,有权限是1,没权限是0) config: 应用的授权规则 tenant: 租户

required
kwargs

其它方法参数

{}

Returns:

Type Description
arr

sort_id数组

Source code in arkid/core/extension/impower_rule.py
@abstractmethod
def get_auth_result(self, event, **kwargs):
    """
    抽象方法,获取权限的鉴定结果
    Params:
        event: 事件参数
            data: 数据
                user: 用户
                app: 应用(如果app是None,就表示这个应用是arkid)
                arr: 权限结果数组(这个是已经赋值过分组权限的数据,有权限是1,没权限是0)
                config: 应用的授权规则
            tenant: 租户
        kwargs: 其它方法参数
    Return:
        arr: sort_id数组
    """
    pass

get_extensions(self) #

获取当前类型所有的插件

Source code in arkid/core/extension/impower_rule.py
def get_extensions(self):
    '''
    获取当前类型所有的插件
    '''
    return Extension_Obj.active_objects.filter(
        package=self.package,
        type=self.TYPE
    ).all()

load(self) #

抽象方法,插件加载的入口方法

Source code in arkid/core/extension/impower_rule.py
def load(self):
    super().load()
    self.listen_event(core_event.GET_AUTH_RESULT, self.filter_auth_result)

register_impower_rule_schema(self, schema, impowerrule_type) #

注册授权规则的schema

Parameters:

Name Type Description Default
schema

schema

required
impowerrule_type

授权规则类型

required
Source code in arkid/core/extension/impower_rule.py
def register_impower_rule_schema(self, schema, impowerrule_type):
    """
    注册授权规则的schema
    Params:
        schema: schema
        impowerrule_type: 授权规则类型
    """
    self.register_config_schema(schema, self.package + '_' + impowerrule_type)
    self.register_composite_config_schema(schema, impowerrule_type, exclude=['extension'])

示例#

extension_root.com_longgui_impower_rule.ImpowerRuleExtension (ImpowerRuleBaseExtension) #

Source code in extension_root/com_longgui_impower_rule/__init__.py
class ImpowerRuleExtension(ImpowerRuleBaseExtension):

    def load(self):
        super().load()
        # 注册前端页面
        self.load_front_page()
        # 注册schema
        self.register_impower_rule_schema(ImpowerRuleCreateIn, 'DefaultImpowerRule')

    def load_front_page(self):
        '''
        注册前端页面
        '''
        self.register_front_pages(user_field_page)
        self.register_front_pages(app_page)
        self.register_front_pages(app_permission_page)

    def get_auth_result(self, event, **kwargs):
        '''
        获取权限鉴定结果
        '''
        data = event.data
        tenant = event.tenant

        user = data.get('user')
        config = data.get('config')
        # 处理授权逻辑
        permission_info = {}
        # 取得了所有配置
        config_info = config.config
        config_matchfield = config_info.get('matchfield')
        config_matchsymbol = config_info.get('matchsymbol')
        config_matchvalue = config_info.get('matchvalue')
        config_app = config_info.get('app')
        config_app_id = config_app.get('id')
        config_permissions = config_info.get('permissions')
        # 用户扩展字段用于筛选
        user = User.expand_objects.filter(id=user.id).first()
        # 选择的字段值
        select_value = user.get(config_matchfield.get('id'))
        # 取得返回值
        sort_ids = []
        if config_matchsymbol == '等于' and config_matchvalue == select_value:
            for config_permission in config_permissions:
                sort_ids.append(config_permission.get('sort_id'))
        return sort_ids

get_auth_result(self, event, **kwargs) #

获取权限鉴定结果

Source code in extension_root/com_longgui_impower_rule/__init__.py
def get_auth_result(self, event, **kwargs):
    '''
    获取权限鉴定结果
    '''
    data = event.data
    tenant = event.tenant

    user = data.get('user')
    config = data.get('config')
    # 处理授权逻辑
    permission_info = {}
    # 取得了所有配置
    config_info = config.config
    config_matchfield = config_info.get('matchfield')
    config_matchsymbol = config_info.get('matchsymbol')
    config_matchvalue = config_info.get('matchvalue')
    config_app = config_info.get('app')
    config_app_id = config_app.get('id')
    config_permissions = config_info.get('permissions')
    # 用户扩展字段用于筛选
    user = User.expand_objects.filter(id=user.id).first()
    # 选择的字段值
    select_value = user.get(config_matchfield.get('id'))
    # 取得返回值
    sort_ids = []
    if config_matchsymbol == '等于' and config_matchvalue == select_value:
        for config_permission in config_permissions:
            sort_ids.append(config_permission.get('sort_id'))
    return sort_ids

load(self) #

抽象方法,插件加载的入口方法

Source code in extension_root/com_longgui_impower_rule/__init__.py
def load(self):
    super().load()
    # 注册前端页面
    self.load_front_page()
    # 注册schema
    self.register_impower_rule_schema(ImpowerRuleCreateIn, 'DefaultImpowerRule')

load_front_page(self) #

注册前端页面

Source code in extension_root/com_longgui_impower_rule/__init__.py
def load_front_page(self):
    '''
    注册前端页面
    '''
    self.register_front_pages(user_field_page)
    self.register_front_pages(app_page)
    self.register_front_pages(app_permission_page)

评论