Skip to content

应用协议

功能介绍#

应用协议插件:这类插件通过继承应用协议基类 AppProtocolExtension 获得基类提供的方法,从而简化插件的加载与注册。

实现思路#

第一步,创建一个新的类,继承AppProtocolExtension这个基类

第二步,重写(override)基类的 load 方法,并在其中调用 super().load(),以便基类注册应用的创建、修改、删除事件监听

第三步,实现基类中规定的抽象方法

抽象方法#

基类定义#

arkid.core.extension.app_protocol.AppProtocolExtension (Extension) #

Source code in arkid/core/extension/app_protocol.py
class AppProtocolExtension(Extension):
    """
    Base class for application-protocol extensions.

    Subclasses provide ``create_app``, ``update_app`` and ``delete_app``;
    ``load`` subscribes those handlers to the matching core events.
    """

    TYPE = "app_protocol"

    composite_schema_map = {}
    created_composite_schema_list = []
    composite_key = 'app_type'
    composite_model = App

    @property
    def type(self):
        """Return the extension type identifier stored in ``AppProtocolExtension.TYPE``."""
        return AppProtocolExtension.TYPE

    def load(self):
        """
        Run the parent ``load`` and then subscribe the app lifecycle handlers.

        The create/update/delete handlers are attached, in that order, to
        ``CREATE_APP_CONFIG``, ``UPDATE_APP_CONFIG`` and ``DELETE_APP``.
        """
        super().load()
        subscriptions = (
            (core_event.CREATE_APP_CONFIG, self.create_app),
            (core_event.UPDATE_APP_CONFIG, self.update_app),
            (core_event.DELETE_APP, self.delete_app),
        )
        for event_tag, handler in subscriptions:
            self.listen_event(event_tag, handler)

    def register_app_protocol_schema(self, schema, app_type):
        """
        Register the configuration schema of one application type.

        Params:
            schema: the schema to register
            app_type: application type name
        """
        # The plain config schema is keyed by "<package>_<app_type>".
        config_key = '_'.join((self.package, app_type))
        self.register_config_schema(schema, config_key)
        # The composite schema is keyed by the bare app type and leaves out 'secret'.
        self.register_composite_config_schema(schema, app_type, exclude=['secret'])

    @abstractmethod
    def create_app(self, event, **kwargs):
        """
        Abstract hook: create an application.

        Params:
            event: the event payload
            kwargs: additional arguments
        Return:
            bool: whether the operation succeeded
        """

    @abstractmethod
    def update_app(self, event, **kwargs):
        """
        Abstract hook: update an application.

        Params:
            event: the event payload
            kwargs: additional arguments
        Return:
            bool: whether the operation succeeded
        """

    @abstractmethod
    def delete_app(self, event, **kwargs):
        """
        Abstract hook: delete an application.

        Params:
            event: the event payload
            kwargs: additional arguments
        Return:
            bool: whether the operation succeeded
        """

    def register_enter_view(self, view:View, path:str, url_name:str, type:list, tenant_urls: bool=True):
        """
        Register one guarded entry route in front of ``view``.

        The generated route calls ``check_app_entry_permission`` first; on success
        the request is handed to ``view``, otherwise the client is redirected to a
        login URL built by ``get_login_url``.

        Params:
            view: callable returned by the target view's ``as_view()``,
                e.g. ``AuthorizationView.as_view()``
            path: regular-expression route handed to ``re_path``
            url_name: name of the registered route, e.g. ``authorize``
            type: list of protocol types served by this plugin, e.g. ``['OIDC', 'OAuth2']``
            tenant_urls: whether the route is registered as a tenant URL
        Return:
            None
        """
        # Aliases for the closure below; ``type`` would otherwise read like the builtin.
        entry_types = type
        target_view = view

        class EnterView(View):
            """Permission gate placed in front of the wrapped protocol view."""

            def _enter(self, request, url_kwargs):
                # Shared by GET and POST: check entry permission, then forward or redirect.
                from arkid.core.perm.permission_data import PermissionData
                allowed, alert = PermissionData().check_app_entry_permission(
                    request, entry_types, url_kwargs
                )
                if not allowed:
                    return HttpResponseRedirect(self.get_login_url(request, alert))
                return target_view(request)

            def get(self, request, **kwargs):
                return self._enter(request, kwargs)

            def post(self, request, **kwargs):
                return self._enter(request, kwargs)

            def get_login_url(self, request, alert):
                """Build the login URL that leads back to the current request path."""
                from arkid.config import get_app_config
                full_path = request.get_full_path()
                next_uri = urllib.parse.quote(full_path)
                host = get_app_config().get_frontend_host()
                tenant = request.tenant

                # No tenant, or a platform tenant whose id (hex or dashed form) is not
                # part of the path: front-end login with an empty tenant_id.
                if not tenant or (
                    tenant.is_platform_tenant
                    and not (tenant.id.hex in full_path or str(tenant.id) in full_path)
                ):
                    return f'{host}{LOGIN_URL}?tenant_id=&next={next_uri}'

                if not request.GET.get('token', ''):
                    # Without a token: use the tenant's own login_url when it has one,
                    # otherwise fall back to the backend login endpoint.
                    tenant_expand = Tenant.expand_objects.get(id=tenant.id)
                    custom_login = tenant_expand.get('login_url')
                    if custom_login:
                        return f"{custom_login}?tenant_id={tenant.id}&next={next_uri}"
                    backend_host = get_app_config().get_host()
                    return f"{backend_host}/api/v1/login?tenant_id={tenant.id}&next={next_uri}"

                # A token is present: send the alert to the front-end login page.
                if tenant.slug:
                    slug_host = get_app_config().get_slug_frontend_host(tenant.slug)
                    return f'{slug_host}{LOGIN_URL}?alert={alert}&next={next_uri}'
                return f'{host}{LOGIN_URL}?tenant_id={tenant.id}&alert={alert}&next={next_uri}'

        # Build the entry route and register it.
        self.register_routers(
            [re_path(path, EnterView.as_view(), name=url_name)],
            tenant_urls,
        )

composite_model (BaseModel, ExpandModel) django-model #

App(id, is_del, is_active, updated, created, tenant, name, url, logo, description, type, secret, config, package, entry_permission, arkstore_category_id, arkstore_app_id)

Source code in arkid/core/extension/app_protocol.py
class App(BaseModel, ExpandModel):
    """
    Django model describing an application that belongs to a tenant.

    Only the columns declared in this class body are documented here; any
    further columns come from the base classes, which are not shown.
    """

    class Meta(object):
        # Singular and plural display names are the same pair.
        verbose_name = _("APP", "应用")
        verbose_name_plural = _("APP", "应用")

    # Owning tenant; required. PROTECT blocks deletion of a tenant that still has apps.
    tenant = models.ForeignKey('Tenant', blank=False, on_delete=models.PROTECT)
    # Display name of the application.
    name = models.CharField(max_length=128, verbose_name=_('name', '名称'))
    # Optional address of the application.
    url = models.CharField(max_length=1024,null=True,blank=True, verbose_name=_('url', '地址'))
    # Optional icon; defaults to the empty string.
    logo = models.CharField(
        max_length=1024, blank=True, null=True, default='', verbose_name=_('logo', '图标')
    )
    # Optional free-text description.
    description = models.TextField(
        blank=True, null=True, verbose_name=_('description', '描述')
    )
    # Application type; defaults to the empty string.
    # NOTE(review): presumably the protocol type such as 'OIDC' or 'OAuth2' - confirm.
    type = models.CharField(max_length=128, default='', verbose_name=_('type', '类型'))
    # Optional secret; defaults to the empty string.
    secret = models.CharField(
        max_length=255,
        blank=True,
        null=True,
        default='',
        verbose_name=_('secret', '密钥'),
    )
    # Optional one-to-one link to a TenantExtensionConfig row; PROTECT blocks
    # deletion of a config row that is still referenced by an app.
    config = models.OneToOneField(
        TenantExtensionConfig,
        blank=True,
        null=True,
        default=None,
        on_delete=models.PROTECT,
    )
    # Optional package name; defaults to the empty string.
    # NOTE(review): presumably the package of the extension serving this app - confirm.
    package = models.CharField(
        max_length=128,
        blank=True,
        null=True,
        default='',
        verbose_name=_('package', '包名'),
    )
    # Optional link to a SystemPermission row; PROTECT blocks deletion of a
    # permission that is still referenced by an app.
    entry_permission = models.ForeignKey(
        'SystemPermission',
        blank=True,
        null=True,
        default=None,
        on_delete=models.PROTECT
    )
    # Optional ArkStore category id.
    # NOTE(review): `_` is called with one argument here but with two everywhere
    # else in this class - confirm that the helper accepts this form.
    arkstore_category_id = models.IntegerField(default=None, null=True, verbose_name=_('ArkStore分类ID'))
    # Optional ArkStore application identifier.
    arkstore_app_id = models.CharField(
        max_length=1024, blank=True, null=True, default=None, verbose_name=_('Arkstore app id', '方舟商店应用标识')
    )

    def __str__(self) -> str:
        # Human-readable label combining the tenant name and the app name.
        return f'Tenant: {self.tenant.name}, App: {self.name}'

arkstore_app_id: CharField blank django-field nullable #

Arkstore app id

arkstore_category_id: IntegerField django-field nullable #

ArkStore分类ID

config: OneToOneField blank django-field nullable #

config

created: DateTimeField blank django-field nullable #

创建时间

description: TextField blank django-field nullable #

description

entry_permission: ForeignKey blank django-field nullable #

entry permission

id: UUIDField django-field #

ID

is_active: BooleanField django-field #

是否可用

is_del: BooleanField django-field #

是否删除

logo: CharField blank django-field nullable #

logo

name: CharField django-field #

name

package: CharField blank django-field nullable #

package

secret: CharField blank django-field nullable #

secret

tenant: ForeignKey django-field #

tenant

type: CharField django-field #

type

updated: DateTimeField blank django-field nullable #

更新时间

url: CharField blank django-field nullable #

url

create_app(self, event, **kwargs) #

抽象方法,创建应用

Parameters:

Name Type Description Default
event

事件参数

required
kwargs

其它方法参数

{}

Returns:

Type Description
bool

是否成功执行

Source code in arkid/core/extension/app_protocol.py
@abstractmethod
def create_app(self, event, **kwargs):
    """
    Abstract hook: create an application.

    Params:
        event: the event payload
        kwargs: additional arguments
    Return:
        bool: whether the operation succeeded
    """

delete_app(self, event, **kwargs) #

抽象方法,删除应用

Parameters:

Name Type Description Default
event

事件参数

required
kwargs

其它方法参数

{}

Returns:

Type Description
bool

是否成功执行

Source code in arkid/core/extension/app_protocol.py
@abstractmethod
def delete_app(self, event, **kwargs):
    """
    Abstract hook: delete an application.

    Params:
        event: the event payload
        kwargs: additional arguments
    Return:
        bool: whether the operation succeeded
    """

load(self) #

插件加载的入口方法:先调用父类的 load,再监听应用的创建、修改、删除事件(该方法在本基类中已有实现,并非抽象方法)

Source code in arkid/core/extension/app_protocol.py
def load(self):
    """
    Run the parent ``load`` and then subscribe the app lifecycle handlers.

    The create/update/delete handlers are attached, in that order, to
    ``CREATE_APP_CONFIG``, ``UPDATE_APP_CONFIG`` and ``DELETE_APP``.
    """
    super().load()
    subscriptions = (
        (core_event.CREATE_APP_CONFIG, self.create_app),
        (core_event.UPDATE_APP_CONFIG, self.update_app),
        (core_event.DELETE_APP, self.delete_app),
    )
    for event_tag, handler in subscriptions:
        self.listen_event(event_tag, handler)

register_app_protocol_schema(self, schema, app_type) #

注册应用的schema

Parameters:

Name Type Description Default
schema

schema

required
app_type

应用类型

required
Source code in arkid/core/extension/app_protocol.py
def register_app_protocol_schema(self, schema, app_type):
    """
    Register the configuration schema of one application type.

    Params:
        schema: the schema to register
        app_type: application type name
    """
    # The plain config schema is keyed by "<package>_<app_type>".
    config_key = '_'.join((self.package, app_type))
    self.register_config_schema(schema, config_key)
    # The composite schema is keyed by the bare app type and leaves out 'secret'.
    self.register_composite_config_schema(schema, app_type, exclude=['secret'])

register_enter_view(self, view, path, url_name, type, tenant_urls=True) #

注册统一的入口函数,方便检测

Parameters:

Name Type Description Default
view View

目标View的as_view()返回值(可调用对象),例如:AuthorizationView.as_view()

required
path str

str 需要跳转的路径,例如:r"app/(?P<app_id>[\w-]+)/oauth/authorize/$"

required
url_name str

str 注册的路径名称, 例如:authorize

required
type list

list 一个当前插件的类型list, 例如:['OIDC', 'OAuth2']

required
tenant_urls bool

bool 是否注册为租户url

True

Returns:

Type Description
response

函数执行结果

Source code in arkid/core/extension/app_protocol.py
def register_enter_view(self, view:View, path:str, url_name:str, type:list, tenant_urls: bool=True):
    """
    Register one guarded entry route in front of ``view``.

    The generated route calls ``check_app_entry_permission`` first; on success
    the request is handed to ``view``, otherwise the client is redirected to a
    login URL built by ``get_login_url``.

    Params:
        view: callable returned by the target view's ``as_view()``,
            e.g. ``AuthorizationView.as_view()``
        path: regular-expression route handed to ``re_path``
        url_name: name of the registered route, e.g. ``authorize``
        type: list of protocol types served by this plugin, e.g. ``['OIDC', 'OAuth2']``
        tenant_urls: whether the route is registered as a tenant URL
    Return:
        None
    """
    # Aliases for the closure below; ``type`` would otherwise read like the builtin.
    entry_types = type
    target_view = view

    class EnterView(View):
        """Permission gate placed in front of the wrapped protocol view."""

        def _enter(self, request, url_kwargs):
            # Shared by GET and POST: check entry permission, then forward or redirect.
            from arkid.core.perm.permission_data import PermissionData
            allowed, alert = PermissionData().check_app_entry_permission(
                request, entry_types, url_kwargs
            )
            if not allowed:
                return HttpResponseRedirect(self.get_login_url(request, alert))
            return target_view(request)

        def get(self, request, **kwargs):
            return self._enter(request, kwargs)

        def post(self, request, **kwargs):
            return self._enter(request, kwargs)

        def get_login_url(self, request, alert):
            """Build the login URL that leads back to the current request path."""
            from arkid.config import get_app_config
            full_path = request.get_full_path()
            next_uri = urllib.parse.quote(full_path)
            host = get_app_config().get_frontend_host()
            tenant = request.tenant

            # No tenant, or a platform tenant whose id (hex or dashed form) is not
            # part of the path: front-end login with an empty tenant_id.
            if not tenant or (
                tenant.is_platform_tenant
                and not (tenant.id.hex in full_path or str(tenant.id) in full_path)
            ):
                return f'{host}{LOGIN_URL}?tenant_id=&next={next_uri}'

            if not request.GET.get('token', ''):
                # Without a token: use the tenant's own login_url when it has one,
                # otherwise fall back to the backend login endpoint.
                tenant_expand = Tenant.expand_objects.get(id=tenant.id)
                custom_login = tenant_expand.get('login_url')
                if custom_login:
                    return f"{custom_login}?tenant_id={tenant.id}&next={next_uri}"
                backend_host = get_app_config().get_host()
                return f"{backend_host}/api/v1/login?tenant_id={tenant.id}&next={next_uri}"

            # A token is present: send the alert to the front-end login page.
            if tenant.slug:
                slug_host = get_app_config().get_slug_frontend_host(tenant.slug)
                return f'{slug_host}{LOGIN_URL}?alert={alert}&next={next_uri}'
            return f'{host}{LOGIN_URL}?tenant_id={tenant.id}&alert={alert}&next={next_uri}'

    # Build the entry route and register it.
    self.register_routers(
        [re_path(path, EnterView.as_view(), name=url_name)],
        tenant_urls,
    )

update_app(self, event, **kwargs) #

抽象方法,修改应用

Parameters:

Name Type Description Default
event

事件参数

required
kwargs

其它方法参数

{}

Returns:

Type Description
bool

是否成功执行

Source code in arkid/core/extension/app_protocol.py
@abstractmethod
def update_app(self, event, **kwargs):
    """
    Abstract hook: update an application.

    Params:
        event: the event payload
        kwargs: additional arguments
    Return:
        bool: whether the operation succeeded
    """

示例#

extension_root.com_longgui_app_protocol_oidc.OAuth2ServerExtension (AppProtocolExtension) #

Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
class OAuth2ServerExtension(AppProtocolExtension):
    """Application-protocol extension that registers the 'OIDC' and 'OAuth2' app types."""

    def load(self):
        """Register routes, the guarded authorize view and the schemas, then run the parent ``load``."""
        self.load_urls()
        self.load_auth_view()
        # Schemas are registered only when settings.IS_CENTRAL_ARKID is falsy.
        if not settings.IS_CENTRAL_ARKID:
            for schema, app_type in ((OIDCConfigSchema, 'OIDC'), (Oauth2ConfigSchema, 'OAuth2')):
                self.register_app_protocol_schema(schema, app_type)
        super().load()

    def load_urls(self):
        """Register this extension's ``urls`` as tenant routes."""
        self.register_routers(urls, True)

    def load_auth_view(self):
        """Put the authorization view behind the shared entry-permission gate."""
        self.register_enter_view(
            AuthorizationView.as_view(),
            r"app/(?P<app_id>[\w-]+)/oauth/authorize/$",
            "authorize",
            ['OIDC', 'OAuth2'],
        )

    def create_app(self, event, **kwargs):
        """Handle app creation by delegating to ``update_app_data`` with ``is_create=True``."""
        return self.update_app_data(event, event.data["config"], True)

    def update_app(self, event, **kwargs):
        """Handle app update by delegating to ``update_app_data`` with ``is_create=False``."""
        return self.update_app_data(event, event.data["config"], False)

    def delete_app(self, event, **kwargs):
        """Delete the ``Application`` rows whose uuid equals ``event.data.id``; always returns True."""
        stale = Application.objects.filter(uuid=event.data.id)
        stale.delete()
        return True

    def update_app_data(self, event, config, is_create):
        """
        Copy the app and its config onto the matching ``Application`` row.

        The row is looked up (or created) by ``uuid=app.id``, saved, and the
        config is then filled with endpoint URLs and client credentials.
        ``is_create`` is accepted but not read.

        Return:
            bool: always True
        """
        app = event.data["app"]
        tenant = event.tenant

        # Required config keys are read up front, before the database is touched.
        client_type = config["client_type"]
        redirect_uris = config["redirect_uris"]
        grant_type = config["grant_type"]
        skip_authorization = config["skip_authorization"]
        app_type = event.data.get("app_type")
        algorithm = config.get("algorithm", None)

        application, _ = Application.objects.get_or_create(uuid=app.id)
        application.name = app.name
        application.client_type = client_type
        application.redirect_uris = redirect_uris
        application.skip_authorization = skip_authorization
        application.authorization_grant_type = grant_type
        # The signing algorithm is applied to OIDC apps only, and only when set.
        if app_type == 'OIDC' and algorithm:
            application.algorithm = algorithm
        application.save()

        # Refresh the URL information stored in the config.
        self.update_url_data(tenant.id, config, application)
        return True

    def update_url_data(self, tenant_id, config, obj):
        """Write endpoint URLs and client credentials of ``obj`` into ``config``."""
        host = get_app_config().get_frontend_host()
        namespace = f'api:{self.pname}_tenant'

        def absolute(route_name, *route_args):
            # Front-end host + reversed path of a route in this extension's namespace.
            return host + reverse(f"{namespace}:{route_name}", args=list(route_args))

        config["userinfo"] = absolute("oauth-user-info", tenant_id)
        config["authorize"] = absolute("authorize", tenant_id, obj.uuid)
        config["token"] = absolute("token", tenant_id)
        config["logout"] = absolute("oauth-user-logout", tenant_id)
        config["issuer_url"] = f"{host}/api/v1/tenant/{tenant_id}/app/{obj.uuid}"
        config["client_id"] = obj.client_id
        config["client_secret"] = obj.client_secret
        config["skip_authorization"] = obj.skip_authorization

create_app(self, event, **kwargs) #

抽象方法,创建应用

Parameters:

Name Type Description Default
event

事件参数

required
kwargs

其它方法参数

{}

Returns:

Type Description
bool

是否成功执行

Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
def create_app(self, event, **kwargs):
    """Handle app creation by delegating to ``update_app_data`` with ``is_create=True``."""
    return self.update_app_data(event, event.data["config"], True)

delete_app(self, event, **kwargs) #

抽象方法,删除应用

Parameters:

Name Type Description Default
event

事件参数

required
kwargs

其它方法参数

{}

Returns:

Type Description
bool

是否成功执行

Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
def delete_app(self, event, **kwargs):
    """Delete the ``Application`` rows whose uuid equals ``event.data.id``; always returns True."""
    stale = Application.objects.filter(uuid=event.data.id)
    stale.delete()
    return True

load(self) #

插件加载的入口方法:依次注册路由、入口视图和配置 schema,最后调用父类的 load

Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
def load(self):
    """Register routes, the guarded authorize view and the schemas, then run the parent ``load``."""
    self.load_urls()
    self.load_auth_view()
    # Schemas are registered only when settings.IS_CENTRAL_ARKID is falsy.
    if not settings.IS_CENTRAL_ARKID:
        for schema, app_type in ((OIDCConfigSchema, 'OIDC'), (Oauth2ConfigSchema, 'OAuth2')):
            self.register_app_protocol_schema(schema, app_type)
    super().load()

update_app(self, event, **kwargs) #

抽象方法,修改应用

Parameters:

Name Type Description Default
event

事件参数

required
kwargs

其它方法参数

{}

Returns:

Type Description
bool

是否成功执行

Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
def update_app(self, event, **kwargs):
    """Handle app update by delegating to ``update_app_data`` with ``is_create=False``."""
    return self.update_app_data(event, event.data["config"], False)

update_app_data(self, event, config, is_create) #

修改应用程序

Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
def update_app_data(self, event, config, is_create):
    """
    Copy the app and its config onto the matching ``Application`` row.

    The row is looked up (or created) by ``uuid=app.id``, saved, and the
    config is then filled with endpoint URLs and client credentials.
    ``is_create`` is accepted but not read.

    Return:
        bool: always True
    """
    app = event.data["app"]
    tenant = event.tenant

    # Required config keys are read up front, before the database is touched.
    client_type = config["client_type"]
    redirect_uris = config["redirect_uris"]
    grant_type = config["grant_type"]
    skip_authorization = config["skip_authorization"]
    app_type = event.data.get("app_type")
    algorithm = config.get("algorithm", None)

    application, _ = Application.objects.get_or_create(uuid=app.id)
    application.name = app.name
    application.client_type = client_type
    application.redirect_uris = redirect_uris
    application.skip_authorization = skip_authorization
    application.authorization_grant_type = grant_type
    # The signing algorithm is applied to OIDC apps only, and only when set.
    if app_type == 'OIDC' and algorithm:
        application.algorithm = algorithm
    application.save()

    # Refresh the URL information stored in the config.
    self.update_url_data(tenant.id, config, application)
    return True

update_url_data(self, tenant_id, config, obj) #

更新配置中的url信息

Source code in extension_root/com_longgui_app_protocol_oidc/__init__.py
def update_url_data(self, tenant_id, config, obj):
    """Write endpoint URLs and client credentials of ``obj`` into ``config``."""
    host = get_app_config().get_frontend_host()
    namespace = f'api:{self.pname}_tenant'

    def absolute(route_name, *route_args):
        # Front-end host + reversed path of a route in this extension's namespace.
        return host + reverse(f"{namespace}:{route_name}", args=list(route_args))

    config["userinfo"] = absolute("oauth-user-info", tenant_id)
    config["authorize"] = absolute("authorize", tenant_id, obj.uuid)
    config["token"] = absolute("token", tenant_id)
    config["logout"] = absolute("oauth-user-logout", tenant_id)
    config["issuer_url"] = f"{host}/api/v1/tenant/{tenant_id}/app/{obj.uuid}"
    config["client_id"] = obj.client_id
    config["client_secret"] = obj.client_secret
    config["skip_authorization"] = obj.skip_authorization

评论