В этой статье описывается, как flask-login реализует компонент аутентификации пользователя, не требующий базы данных.
Основное использование flask-login
Прежде чем представить, как работает flask-login, давайте кратко рассмотрим, как использовать flask-login.
Сначала создайтеLoginManager
экземпляр и зарегистрирован вFlask
экземпляр, а затем предоставитьuser_loader
Функция обратного вызова для загрузки объекта пользователя на основе идентификатора пользователя, хранящегося в сеансе.
login_manager = LoginManager()
login_manager.init_app(app)
@login_manager.user_loader
def load_user(user_id):
return User.get(user_id)
flask-login также требует, чтобы вы внесли некоторые изменения в объект данных, добавив следующие свойства и методы:
@property
def is_active(self):
return True
@property
def is_authenticated(self):
return True
@property
def is_anonymous(self):
return False
#: 这个方法返回一个能够识别唯一用户的 ID
def get_id(self):
try:
return text_type(self.id)
except AttributeError:
raise NotImplementedError('No `id` attribute - override `get_id`')
После того, как эти настройки выполнены, flask-login готов к использованию, некоторые типичные способы использования включают в себя вход и выход пользователей:login_user(user)
иlogout_user()
и использовать@login_required
Защитите некоторые функции просмотра и определите, есть ли у текущего пользователя права доступа (отличающиеся в зависимости от того, была ли аутентификация или нет):
@app.route("/settings")
@login_required
def settings():
pass
и черезcurrent_user
объект для доступа к текущему пользователю.
анализ исходного кода flask-login
Мы анализируем его исходный код в том порядке, в котором вызывается flask-login во время использования.
Объект LoginManager
Первый взглядLoginManager
объект, который используется для записи всей информации о конфигурации, которая__init__
Информация о конфигурации инициализируется в методе. ОдинLoginManager
Объекты регистрируются в экземпляре Flask с помощью метода init_app:
def init_app(self, app, add_context_processor=True):
app.login_manager = self
app.after_request(self._update_remember_cookie)
self._login_disabled = app.config.get('LOGIN_DISABLED', False)
if add_context_processor:
app.context_processor(_user_context_processor)
Основная задача этого метода состоит в том, чтобы добавить функцию в хук after_request экземпляра Flask, который пользователь обновляет файл cookie Remember_me, и добавить обработчик пользовательского контекста в обработчик контекста Flask.
def _user_context_processor():
return dict(current_user=_get_user())
Этот процессор контекста устанавливает глобально доступную переменнуюcurrent_user
, чтобы мы могли получить доступ к этой переменной в функции просмотра или в файле шаблона.
декоратор user_loader
Затем мы переходим к этому методу, который является методом экземпляра LoginManager, и устанавливает user_callback в функцию, которую мы передали. В реальном использовании мы передаем эту функцию через декоратор, которыйload_user(user_id)
функция.
def user_loader(self, callback):
self.user_callback = callback
return callback
Этот метод требует, чтобы ваша функция обратного вызова могла получать идентификатор в кодировке Unicode и возвращать объект пользователя или None, если пользователь не существует.
метод login_user
Пропустим модификацию класса User и сразу перейдем к этому методу.
def login_user(user, remember=False, force=False, fresh=True):
if not force and not user.is_active:
return False
user_id = getattr(user, current_app.login_manager.id_attribute)()
session['user_id'] = user_id
session['_fresh'] = fresh
session['_id'] = current_app.login_manager._session_identifier_generator()
if remember:
session['remember'] = 'set'
_request_ctx_stack.top.user = user
user_logged_in.send(current_app._get_current_object(), user=_get_user())
return True
Если пользователь не активенnot.is_active
и не требует принудительного входаforce
, он возвращает отказ. В противном случае, получить первыйuser_id
, это черезgetattr
доступ к функциямuser
изlogin_manager.id_attribute
полученное имущество. вернуться к источникуgetattr
доступ естьuser
изget_id
метод, поэтому flask-login требует от насUser
Добавьте этот метод в класс.
Затем добавьте следующие три сеанса к сеансу, предоставленному Flask:user_id
_fresh
_id
,в_id
черезLoginManager
из_session_identifier_generator
Метод получен, и этот метод привязан к этому методу по умолчанию:
def _create_identifier():
user_agent = request.headers.get('User-Agent')
if user_agent is not None:
user_agent = user_agent.encode('utf-8')
base = '{0}|{1}'.format(_get_remote_addr(), user_agent)
if str is bytes:
base = text_type(base, 'utf-8', errors='replace') # pragma: no cover
h = sha512()
h.update(base.encode('utf8'))
return h.hexdigest()
Не углубляйтесь слишком глубоко, знайте, что этот метод, наконец, генерирует соленый идентификатор на основе пользовательского агента и информации об IP, и его роль состоит в том, чтобы предотвратить подделку файлов cookie.
Затем добавьте добавление пользователя в зависимости от того, нужно ли вам помнитьremember
сессия. Наконец, в_request_ctx_stack.top
Добавьте пользователя, отправьте сигнал входа пользователя и верните успех. В этом сигнале входа в систему, называемом_get_user
метод,_get_user
Детали метода заключаются в том, чтобы сначала обнаружить_request_ctx_stack.top
Есть ли информация о пользователе, если нет, то проходите_load_user
Метод добавляет информацию о пользователе на вершину стека и возвращает объект пользователя, если он есть._load_user
Метод важен, но вызывать его здесь, очевидно, не будет._request_ctx_stack.top
должно бытьuser
значение, мы рассмотрим этот метод позже.
def _get_user():
if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'):
current_app.login_manager._load_user()
return getattr(_request_ctx_stack.top, 'user', None)
декоратор login_required
Этот декоратор часто используется для защиты функций просмотра, к которым могут получить доступ только вошедшие в систему пользователи.LoginManager.unauthorized
Это функция обратного вызова, которая также обеспечивает обработку исключений для некоторых HTTP-методов и тестовых случаев.
def login_required(func):
@wraps(func)
def decorated_view(*args, **kwargs):
if request.method in EXEMPT_METHODS:
return func(*args, **kwargs)
elif current_app.login_manager._login_disabled:
return func(*args, **kwargs)
elif not current_user.is_authenticated:
return current_app.login_manager.unauthorized()
return func(*args, **kwargs)
return decorated_view
объект current_user
В предыдущем анализе видно, что эта переменная часто появляется и очень полезна. Разработчики могут получить доступ к этой переменной для получения текущего пользователя. Если пользователь не вошел в систему, то получается анонимный пользователь. Его определение:
current_user = LocalProxy(lambda: _get_user())
_get_user()
Метод был описан ранее, мы сразу переходим к_load_user
метод. Очевидно, что если пользователь делает запрос после повторного входа в систему, нам необходимо получить информацию о пользователе из файла cookie или сеанса, инкапсулированного Flask поверх этого, чтобы последующая обработка могла быть выполнена правильно._load_user
Роль метода такова, метод заключается в следующем:
def _load_user(self):
user_accessed.send(current_app._get_current_object())
config = current_app.config
if config.get('SESSION_PROTECTION', self.session_protection):
deleted = self._session_protection()
if deleted:
return self.reload_user()
is_missing_user_id = 'user_id' not in session
if is_missing_user_id:
cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
header_name = config.get('AUTH_HEADER_NAME', AUTH_HEADER_NAME)
has_cookie = (cookie_name in request.cookies and
session.get('remember') != 'clear')
if has_cookie:
return self._load_from_cookie(request.cookies[cookie_name])
elif self.request_callback:
return self._load_from_request(request)
elif header_name in request.headers:
return self._load_from_header(request.headers[header_name])
return self.reload_user()
def _session_protection(self):
sess = session._get_current_object()
ident = self._session_identifier_generator()
app = current_app._get_current_object()
mode = app.config.get('SESSION_PROTECTION', self.session_protection)
if sess and ident != sess.get('_id', None):
if mode == 'basic' or sess.permanent:
sess['_fresh'] = False
session_protected.send(app)
return False
elif mode == 'strong':
for k in SESSION_KEYS:
sess.pop(k, None)
sess['remember'] = 'clear'
session_protected.send(app)
return True
return False
Этот метод сначала обеспечивает безопасность сеанса. Если сеанс проходит проверку безопасности, он проходитreload_user
пользователь перегрузки метода, в противном случае проверьте, нет ли сеанса в сеансеuser_id
перезагрузить пользователя, если нет, перезагрузить пользователя тремя различными способами.
def reload_user(self, user=None):
ctx = _request_ctx_stack.top
if user is None:
user_id = session.get('user_id')
if user_id is None:
ctx.user = self.anonymous_user()
else:
if self.user_callback is None:
raise Exception(
"No user_loader has been installed for this "
"LoginManager. Add one with the "
"'LoginManager.user_loader' decorator.")
user = self.user_callback(user_id)
if user is None:
ctx.user = self.anonymous_user()
else:
ctx.user = user
else:
ctx.user = user
В этом перегруженном методе, еслиuser_id
не существует, загрузите анонимного пользователя в_request_ctx_stack.top
, иначе согласноuser_id
Загрузите пользователя, если пользователь не существует, все равно загрузите анонимного пользователя.
после,current_user
Вы можете получить пользовательский объект или анонимный пользовательский объект.
current_user = LocalProxy(lambda: _get_user())
метод logout_user
Этот метод сначала получает текущего пользователя, а затем удаляет его.user_id
_fresh
Дождитесь сеанса, затем удалитеremember
, и, наконец, перезагрузить текущего пользователя. Очевидно, что после перезагрузки это будет анонимный пользователь.
def logout_user():
user = _get_user()
if 'user_id' in session:
session.pop('user_id')
if '_fresh' in session:
session.pop('_fresh')
cookie_name = current_app.config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
if cookie_name in request.cookies:
session['remember'] = 'clear'
user_logged_out.send(current_app._get_current_object(), user=user)
current_app.login_manager.reload_user()
return True
remember_me cookie
Помните, что мы упоминали ранее, что flask-login добавляет функцию к хуку after_request экземпляра Flask, который позволяет пользователю обновить файл cookie Remember_me, нам, очевидно, нужно обновить файл cookie Remember_me в конце запроса.remember
для обработки.
def _update_remember_cookie(self, response):
# Don't modify the session unless there's something to do.
if 'remember' in session:
operation = session.pop('remember', None)
if operation == 'set' and 'user_id' in session:
self._set_cookie(response)
elif operation == 'clear':
self._clear_cookie(response)
return response
Эта функция зависит от того, следует ли установитьremember
вызывать разные функции
def _set_cookie(self, response):
config = current_app.config
cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
duration = config.get('REMEMBER_COOKIE_DURATION', COOKIE_DURATION)
domain = config.get('REMEMBER_COOKIE_DOMAIN')
path = config.get('REMEMBER_COOKIE_PATH', '/')
secure = config.get('REMEMBER_COOKIE_SECURE', COOKIE_SECURE)
httponly = config.get('REMEMBER_COOKIE_HTTPONLY', COOKIE_HTTPONLY)
data = encode_cookie(text_type(session['user_id']))
try:
expires = datetime.utcnow() + duration
except TypeError:
raise Exception('REMEMBER_COOKIE_DURATION must be a ' +
'datetime.timedelta, instead got: {0}'.format(
duration))
response.set_cookie(cookie_name,
value=data,
expires=expires,
domain=domain,
path=path,
secure=secure,
httponly=httponly)
def _clear_cookie(self, response):
config = current_app.config
cookie_name = config.get('REMEMBER_COOKIE_NAME', COOKIE_NAME)
domain = config.get('REMEMBER_COOKIE_DOMAIN')
path = config.get('REMEMBER_COOKIE_PATH', '/')
response.delete_cookie(cookie_name, domain=domain, path=path)
Суммировать
- flask-login использует сеанс, предоставленный Flask, для сохранения информации о пользователе через
user_id
для записи идентификаторов пользователей,_id
Чтобы злоумышленники не подделывали сеансы. - пройти через
_request_ctx_stack.top.user
, flask-login реализует потокобезопасность. - Функция запоминания реализована через файлы cookie.
Для других функций, таких как новый вход в систему, проверьте исходный код самостоятельно.
Имитация flask-login для написания модуля аутентификации на основе токенов
Хотя flask-login прост в использовании, поскольку он основан на сеансе, он ничего не может сделать для приложений RESTful API без сохранения состояния. Я сымитировал его интерфейс в недавнем проекте и реализовал простой, но удобный модуль аутентификации.
from functools import wraps
from flask import (_request_ctx_stack, has_request_context, request,
current_app)
from flask_restful import abort
from werkzeug.local import LocalProxy
from app.models.user import User
#: a proxy for the current user
#: it would be an anonymous user if no user is logged in
current_user = LocalProxy(lambda: _get_user())
class AnonymousUserMixin(object):
@property
def is_active(self):
return False
@property
def is_authenticated(self):
return False
@property
def is_anonymous(self):
return True
def __repr__(self):
return '<AnonymousUser>'
class Manager(object):
def __init__(self, app=None):
if app:
self.init_app(app)
def init_app(self, app):
app.login_manager = self
app.context_processor(_user_context_processor)
self._anonymous_user = AnonymousUserMixin
self._login_disabled = app.config['LOGIN_DISABLED'] or False
@staticmethod
def _load_user():
"""Try to load user from request.json.token and set it to
`_request_ctx_stack.top.user`. If None, set current user as an anonymous
user.
"""
ctx = _request_ctx_stack.top
json = request.json
user = AnonymousUserMixin()
if json and json.get('token'):
real_user = User.load_user_from_auth_token(json.get('token'))
if real_user:
user = real_user
ctx.user = user
def _get_user():
"""Get current user from request context."""
if has_request_context() and not hasattr(_request_ctx_stack.top, 'user'):
current_app.login_manager._load_user()
return getattr(_request_ctx_stack.top, 'user', None)
def _user_context_processor():
"""A context processor to prepare current user."""
return dict(current_user=_get_user())
def login_user(user):
"""Login a user and return a token."""
_request_ctx_stack.top.user = user
return user.generate_auth_token()
def logout_user(user):
"""For a restful API there shouldn't be a `logout` method because the
server is stateless.
"""
pass
def login_required(func):
"""Decorator to protect view functions that should only be accessed
by authenticated users.
"""
@wraps(func)
def decorated_view(*args, **kwargs):
if current_app.login_manager._login_disabled:
return func(*args, **kwargs)
elif not current_user.is_authenticated:
abort(403, err='40300',
message='Please login before carrying out this action.')
return func(*args, **kwargs)
return decorated_view