анализ исходного кода flask-login

задняя часть исходный код Безопасность Flask

В этой статье описывается, как flask-login реализует компонент аутентификации пользователя, не требующий базы данных.

Основное использование flask-login

Прежде чем представить, как работает flask-login, давайте кратко рассмотрим, как использовать flask-login.

Сначала создайтеLoginManagerэкземпляр и зарегистрирован вFlaskэкземпляр, а затем предоставитьuser_loaderФункция обратного вызова для загрузки объекта пользователя на основе идентификатора пользователя, хранящегося в сеансе.

login_manager = LoginManager()
login_manager.init_app(app)

@login_manager.user_loader
def load_user(user_id):
    return User.get(user_id)

flask-login также требует, чтобы вы внесли некоторые изменения в объект данных, добавив следующие свойства и методы:

@property
def is_active(self):
   return True

@property
def is_authenticated(self):
   return True

@property
def is_anonymous(self):
   return False

#: 这个方法返回一个能够识别唯一用户的 ID
def get_id(self):
   try:
       return text_type(self.id)
   except AttributeError:
       raise NotImplementedError('No `id` attribute - override `get_id`')

После того, как эти настройки выполнены, flask-login готов к использованию, некоторые типичные способы использования включают в себя вход и выход пользователей:login_user(user)иlogout_user()и использовать@login_requiredЗащитите некоторые функции просмотра и определите, есть ли у текущего пользователя права доступа (отличающиеся в зависимости от того, была ли аутентификация или нет):

@app.route("/settings")
@login_required
def settings():
    pass

и черезcurrent_userобъект для доступа к текущему пользователю.

анализ исходного кода flask-login

Мы анализируем его исходный код в том порядке, в котором вызывается flask-login во время использования.

Объект LoginManager

Первый взглядLoginManagerобъект, который используется для записи всей информации о конфигурации, которая__init__Информация о конфигурации инициализируется в методе. ОдинLoginManagerОбъекты регистрируются в экземпляре Flask с помощью метода init_app:

def init_app(self, app, add_context_processor=True):
   app.login_manager = self
   app.after_request(self._update_remember_cookie)

   self._login_disabled = app.config.get('LOGIN_DISABLED', False)

   if add_context_processor:
       app.context_processor(_user_context_processor)

Основная задача этого метода состоит в том, чтобы добавить функцию в хук after_request экземпляра Flask, который пользователь обновляет файл cookie Remember_me, и добавить обработчик пользовательского контекста в обработчик контекста Flask.

def _user_context_processor():
    return dict(current_user=_get_user())

Этот процессор контекста устанавливает глобально доступную переменнуюcurrent_user, чтобы мы могли получить доступ к этой переменной в функции просмотра или в файле шаблона.

декоратор user_loader

Затем мы переходим к этому методу, который является методом экземпляра LoginManager, и устанавливает user_callback в функцию, которую мы передали. В реальном использовании мы передаем эту функцию через декоратор, которыйload_user(user_id)функция.

def user_loader(self, callback):
   self.user_callback = callback
   return callback

Этот метод требует, чтобы ваша функция обратного вызова могла получать идентификатор в кодировке Unicode и возвращать объект пользователя или None, если пользователь не существует.

метод login_user

Пропустим модификацию класса User и сразу перейдем к этому методу.

def login_user(user, remember=False, force=False, fresh=True):
    if not force and not user.is_active:
        return False

    user_id = getattr(user, current_app.login_manager.id_attribute)()
    session['user_id'] = user_id
    session['_fresh'] = fresh
    session['_id'] = current_app.login_manager._session_identifier_generator()

    if remember:
        session['remember'] = 'set'

    _request_ctx_stack.top.user = user
    user_logged_in.send(current_app._get_current_object(), user=_get_user())
    return True

Если пользователь не активенnot.is_activeи не требует принудительного входаforce, он возвращает отказ. В противном случае, получить первыйuser_id, это черезgetattrдоступ к функциямuserизlogin_manager.id_attributeполученное имущество. вернуться к источникуgetattrдоступ естьuserизget_idметод, поэтому flask-login требует от насUserДобавьте этот метод в класс.

Затем добавьте следующие три сеанса к сеансу, предоставленному Flask:user_id _fresh _id_idчерезLoginManagerиз_session_identifier_generatorМетод получен, и этот метод привязан к этому методу по умолчанию:

def _create_identifier():
    user_agent = request.headers.get('User-Agent')
    if user_agent is not None:
        user_agent = user_agent.encode('utf-8')
    base = '{0}|{1}'.format(_get_remote_addr(), user_agent)
    if str is bytes:
        base = text_type(base, 'utf-8', errors='replace')  # pragma: no cover
    h = sha512()
    h.update(base.encode('utf8'))
    return h.hexdigest()

Не углубляйтесь слишком глубоко, знайте, что этот метод, наконец, генерирует соленый идентификатор на основе пользовательского агента и информации об IP, и его роль состоит в том, чтобы предотвратить подделку файлов cookie.

Затем добавьте добавление пользователя в зависимости от того, нужно ли вам помнитьrememberсессия. Наконец, в_request_ctx_stack.topДобавьте пользователя, отправьте сигнал входа пользователя и верните успех. В этом сигнале входа в систему, называемом_get_userметод,_get_userДетали метода заключаются в том, чтобы сначала обнаружить_request_ctx_stack.topЕсть ли информация о пользователе, если нет, то проходите_load_userМетод добавляет информацию о пользователе на вершину стека и возвращает объект пользователя, если он есть._load_userМетод важен, но вызывать его здесь, очевидно, не будет._request_ctx_stack.topдолжно бытьuserзначение, мы рассмотрим этот метод позже.

def _get_user():
    if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'):
        current_app.login_manager._load_user()

    return getattr(_request_ctx_stack.top, 'user', None)

декоратор login_required

Этот декоратор часто используется для защиты функций просмотра, к которым могут получить доступ только вошедшие в систему пользователи.LoginManager.unauthorizedЭто функция обратного вызова, которая также обеспечивает обработку исключений для некоторых HTTP-методов и тестовых случаев.

def login_required(func):
    @wraps(func)
    def decorated_view(*args, **kwargs):
        if request.method in EXEMPT_METHODS:
            return func(*args, **kwargs)
        elif current_app.login_manager._login_disabled:
            return func(*args, **kwargs)
        elif not current_user.is_authenticated:
            return current_app.login_manager.unauthorized()
        return func(*args, **kwargs)
    return decorated_view

объект current_user

В предыдущем анализе видно, что эта переменная часто появляется и очень полезна. Разработчики могут получить доступ к этой переменной для получения текущего пользователя. Если пользователь не вошел в систему, то получается анонимный пользователь. Его определение:

current_user = LocalProxy(lambda: _get_user())

_get_user()Метод был описан ранее, мы сразу переходим к_load_userметод. Очевидно, что если пользователь делает запрос после повторного входа в систему, нам необходимо получить информацию о пользователе из файла cookie или сеанса, инкапсулированного Flask поверх этого, чтобы последующая обработка могла быть выполнена правильно._load_userРоль метода такова, метод заключается в следующем:

def _load_user(self):
   user_accessed.send(current_app._get_current_object())

   config = current_app.config
   if config.get('SESSION_PROTECTION', self.session_protection):
       deleted = self._session_protection()
       if deleted:
           return self.reload_user()

   is_missing_user_id = 'user_id' not in session
   if is_missing_user_id:
       cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
       header_name = config.get('AUTH_HEADER_NAME', AUTH_HEADER_NAME)
       has_cookie = (cookie_name in request.cookies and
                     session.get('remember') != 'clear')
       if has_cookie:
           return self._load_from_cookie(request.cookies[cookie_name])
       elif self.request_callback:
           return self._load_from_request(request)
       elif header_name in request.headers:
           return self._load_from_header(request.headers[header_name])

   return self.reload_user()

def _session_protection(self):
   sess = session._get_current_object()
   ident = self._session_identifier_generator()

   app = current_app._get_current_object()
   mode = app.config.get('SESSION_PROTECTION', self.session_protection)

   if sess and ident != sess.get('_id', None):
       if mode == 'basic' or sess.permanent:
           sess['_fresh'] = False
           session_protected.send(app)
           return False
       elif mode == 'strong':
           for k in SESSION_KEYS:
               sess.pop(k, None)

           sess['remember'] = 'clear'
           session_protected.send(app)
           return True

   return False

Этот метод сначала обеспечивает безопасность сеанса. Если сеанс проходит проверку безопасности, он проходитreload_userпользователь перегрузки метода, в противном случае проверьте, нет ли сеанса в сеансеuser_idперезагрузить пользователя, если нет, перезагрузить пользователя тремя различными способами.

def reload_user(self, user=None):
    ctx = _request_ctx_stack.top

    if user is None:
        user_id = session.get('user_id')
        if user_id is None:
            ctx.user = self.anonymous_user()
        else:
            if self.user_callback is None:
                raise Exception(
                    "No user_loader has been installed for this "
                    "LoginManager. Add one with the "
                    "'LoginManager.user_loader' decorator.")
            user = self.user_callback(user_id)
            if user is None:
                ctx.user = self.anonymous_user()
            else:
                ctx.user = user
    else:
        ctx.user = user

В этом перегруженном методе, еслиuser_idне существует, загрузите анонимного пользователя в_request_ctx_stack.top, иначе согласноuser_idЗагрузите пользователя, если пользователь не существует, все равно загрузите анонимного пользователя.

после,current_userВы можете получить пользовательский объект или анонимный пользовательский объект.

current_user = LocalProxy(lambda: _get_user())

метод logout_user

Этот метод сначала получает текущего пользователя, а затем удаляет его.user_id _freshДождитесь сеанса, затем удалитеremember, и, наконец, перезагрузить текущего пользователя. Очевидно, что после перезагрузки это будет анонимный пользователь.

def logout_user():
    user = _get_user()

    if 'user_id' in session:
        session.pop('user_id')

    if '_fresh' in session:
        session.pop('_fresh')

    cookie_name = current_app.config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
    if cookie_name in request.cookies:
        session['remember'] = 'clear'

    user_logged_out.send(current_app._get_current_object(), user=user)

    current_app.login_manager.reload_user()
    return True

remember_me cookie

Помните, что мы упоминали ранее, что flask-login добавляет функцию к хуку after_request экземпляра Flask, который позволяет пользователю обновить файл cookie Remember_me, нам, очевидно, нужно обновить файл cookie Remember_me в конце запроса.rememberдля обработки.

def _update_remember_cookie(self, response):
   # Don't modify the session unless there's something to do.
   if 'remember' in session:
       operation = session.pop('remember', None)
       if operation == 'set' and 'user_id' in session:
           self._set_cookie(response)
       elif operation == 'clear':
           self._clear_cookie(response)
    return response

Эта функция зависит от того, следует ли установитьrememberвызывать разные функции

def _set_cookie(self, response):
    config = current_app.config
    cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
    duration = config.get('REMEMBER_COOKIE_DURATION', COOKIE_DURATION)
    domain = config.get('REMEMBER_COOKIE_DOMAIN')
    path = config.get('REMEMBER_COOKIE_PATH', '/')

    secure = config.get('REMEMBER_COOKIE_SECURE', COOKIE_SECURE)
    httponly = config.get('REMEMBER_COOKIE_HTTPONLY', COOKIE_HTTPONLY)

    data = encode_cookie(text_type(session['user_id']))

    try:
        expires = datetime.utcnow() + duration
    except TypeError:
        raise Exception('REMEMBER_COOKIE_DURATION must be a ' +
                        'datetime.timedelta, instead got: {0}'.format(
                            duration))

    response.set_cookie(cookie_name,
                        value=data,
                        expires=expires,
                        domain=domain,
                        path=path,
                        secure=secure,
                        httponly=httponly)

def _clear_cookie(self, response):
    config = current_app.config
    cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
    domain = config.get('REMEMBER_COOKIE_DOMAIN')
    path = config.get('REMEMBER_COOKIE_PATH', '/')
    response.delete_cookie(cookie_name, domain=domain, path=path)

Суммировать

  • flask-login использует сеанс, предоставленный Flask, для сохранения информации о пользователе черезuser_idдля записи идентификаторов пользователей,_idЧтобы злоумышленники не подделывали сеансы.
  • пройти через_request_ctx_stack.top.user, flask-login реализует потокобезопасность.
  • Функция запоминания реализована через файлы cookie.

Для других функций, таких как новый вход в систему, проверьте исходный код самостоятельно.

Имитация flask-login для написания модуля аутентификации на основе токенов

Хотя flask-login прост в использовании, поскольку он основан на сеансе, он ничего не может сделать для приложений RESTful API без сохранения состояния. Я сымитировал его интерфейс в недавнем проекте и реализовал простой, но удобный модуль аутентификации.

from functools import wraps
from flask import (_request_ctx_stack, has_request_context, request,
                   current_app)
from flask_restful import abort
from werkzeug.local import LocalProxy
from app.models.user import User

#: a proxy for the current user
#: it would be an anonymous user if no user is logged in
current_user = LocalProxy(lambda: _get_user())


class AnonymousUserMixin(object):
    @property
    def is_active(self):
        return False

    @property
    def is_authenticated(self):
        return False

    @property
    def is_anonymous(self):
        return True

    def __repr__(self):
        return '<AnonymousUser>'


class Manager(object):
    def __init__(self, app=None):
        if app:
            self.init_app(app)

    def init_app(self, app):
        app.login_manager = self
        app.context_processor(_user_context_processor)

        self._anonymous_user = AnonymousUserMixin
        self._login_disabled = app.config['LOGIN_DISABLED'] or False

    @staticmethod
    def _load_user():
        """Try to load user from request.json.token and set it to
        `_request_ctx_stack.top.user`. If None, set current user as an anonymous
        user.
        """
        ctx = _request_ctx_stack.top
        json = request.json
        user = AnonymousUserMixin()

        if json and json.get('token'):
            real_user = User.load_user_from_auth_token(json.get('token'))
            if real_user:
                user = real_user

        ctx.user = user


def _get_user():
    """Get current user from request context."""
    if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'):
        current_app.login_manager._load_user()

    return getattr(_request_ctx_stack.top, 'user', None)


def _user_context_processor():
    """A context processor to prepare current user."""
    return dict(current_user=_get_user())


def login_user(user):
    """Login a user and return a token."""
    _request_ctx_stack.top.user = user
    return user.generate_auth_token()


def logout_user(user):
    """For a restful API there shouldn't be a `logout` method because the
    server is stateless.
    """
    pass


def login_required(func):
    """Decorator to protect view functions that should only be accessed
    by authenticated users.
    """

    @wraps(func)
    def decorated_view(*args, **kwargs):
        if current_app.login_manager._login_disabled:
            return func(*args, **kwargs)
        elif not current_user.is_authenticated:
            abort(403, err='40300',
                  message='Please login before carrying out this action.')
        return func(*args, **kwargs)

    return decorated_view