import copy
import functools
import json
import logging
from json import JSONDecodeError
from logging import Logger
from typing import Any, Optional
import backoff
import requests
from requests.auth import AuthBase
from annoworkapi.generated_api import AbstractAnnoworkApi
logger = logging.getLogger(__name__)
DEFAULT_ENDPOINT_URL = "https://annowork.com"
def _raise_for_status(response: requests.Response) -> None:
"""
HTTP Status CodeがErrorの場合、``requests.exceptions.HTTPError`` を発生させる。
そのとき ``response.text`` もHTTPErrorに加えて、HTTPError発生時にエラーの原因が分かるようにする。
Args:
response: Response
Raises:
requests.exceptions.HTTPError:
"""
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
http_error_msg = f"{e.args[0]} , {response.text}"
e.args = (http_error_msg,)
raise e
def _log_error_response(arg_logger: logging.Logger, response: requests.Response) -> None:
"""
HTTP Statusが400以上ならば、loggerにresponse/request情報を出力する
Args:
arg_logger: logger
response: Response
"""
def mask_key(d, key: str): # noqa: ANN001
if key in d:
d[key] = "***"
if 400 <= response.status_code < 600:
headers = copy.deepcopy(response.request.headers)
# logにAuthorizationを出力しないようにマスクする
mask_key(headers, "Authorization")
# request_bodyのpassword関係をマスクして、logに出力する
request_body = response.request.body
request_body_for_logger: Optional[Any]
if request_body is not None and request_body != "":
try:
dict_request_body = json.loads(request_body)
except JSONDecodeError:
request_body_for_logger = request_body
else:
request_body_for_logger = _create_request_body_for_logger(dict_request_body)
else:
request_body_for_logger = request_body
arg_logger.error(
"HTTP error occurred :: %s",
{
"response": {
"status_code": response.status_code,
"text": response.text,
},
"request": {
"http_method": response.request.method,
"url": response.request.url,
"body": request_body_for_logger,
"headers": headers,
},
},
)
def ignore_http_error(func=None, /, *, status_code_list: list[int], logger: Optional[Logger] = None): # pylint: disable=redefined-outer-name, # noqa: ANN001
"""
HTTPErrorが発生したとき、特定のstatus codeを無視して、処理する。
無視した場合、Noneを返す。
Args:
status_code_list: 無視するステータスコードのリスト
logger:
"""
new_logger = logging.getLogger(__name__) if logger is None else logger
def decorator(function): # noqa: ANN001
@functools.wraps(function)
def wrapped(*args, **kwargs):
try:
return function(*args, **kwargs)
except requests.exceptions.HTTPError as e:
if e.response.status_code in status_code_list:
return None
else:
_log_error_response(new_logger, e.response)
raise e
return wrapped
if func is None:
# We're called with parens.
return decorator
# We're called as @dataclass without parens.
return decorator(func)
def allow_404_error(func=None, /, *, logger: Optional[Logger] = None): # pylint: disable=redefined-outer-name, # noqa: ANN001
"""
Not Found Error(404)を無視(許容)して、処理するデコレータ。Not Found Errorが発生したときはNoneを返す。
リソースの存在確認などに利用する。
"""
def wrap(func): # noqa: ANN001
return ignore_http_error(func, status_code_list=[requests.codes.not_found], logger=logger)
if func is None:
return wrap
return wrap(func)
def my_backoff(function): # noqa: ANN001
"""
リトライが必要な場合はリトライする
"""
@functools.wraps(function)
def wrapped(*args, **kwargs):
def fatal_code(e: Exception):
"""
リトライするかどうか
status codeが5xxのときまたはToo many Requests(429)のときはリトライする。
ただし500はリトライしない
https://requests.kennethreitz.org/en/master/user/quickstart/#errors-and-exceptions
Args:
e: exception
Returns:
True: give up(リトライしない), False: リトライする
"""
if isinstance(e, requests.exceptions.HTTPError):
if e.response is None:
return True
# status_codeの範囲は4XX ~ 5XX
status_code = e.response.status_code
if status_code == requests.codes.internal_server_error:
return True
elif status_code == requests.codes.too_many_requests:
return False
elif 400 <= status_code < 500:
return True
elif 500 <= status_code < 600:
return False
return False
return backoff.on_exception(
backoff.expo,
(requests.exceptions.RequestException, ConnectionError),
jitter=backoff.full_jitter,
max_time=300,
giveup=fatal_code,
logger=logger,
# giveup時のレベルがデフォルトのERRORだと、`wrapper.get_job_or_none` などを実行したときに不要なログが出力されるため、ログレベルをDEBUG以下に下げておく # noqa: E501
giveup_log_level=logging.NOTSET,
)(function)(*args, **kwargs)
return wrapped
def _create_request_body_for_logger(data: Any) -> Any: # noqa: ANN401
"""
ログに出力するためのrequest_bodyを生成する。
* パスワードやトークンなどの機密情報をマスクする
* bytes型の場合は `(bytes)`と記載する。
Args:
data: request_body
Returns:
ログ出力用のrequest_body
"""
def mask_key(d, key: str): # noqa: ANN001
if key in d:
d[key] = "***"
if not isinstance(data, dict):
return data
elif isinstance(data, bytes):
# bytes型のときは値を出力しても意味がないので、bytesであることが分かるようにする
return "(bytes)"
MASKED_KEYS = {
"password",
"confirmation_code",
"new_password",
}
diff = MASKED_KEYS - set(data.keys())
if len(diff) == len(MASKED_KEYS):
# マスク対象のキーがない
return data
copied_data = copy.deepcopy(data)
for key in MASKED_KEYS:
mask_key(copied_data, key)
return copied_data
[ドキュメント]
class AnnoworkApi(AbstractAnnoworkApi):
"""
Web APIに対応したメソッドが存在するクラス。
Args:
login_user_id: AnnoworkにログインするときのユーザID
login_password: Annoworkにログインするときのパスワード
endpoint_url: WebAPI URLのbase部分
"""
def __init__(self, login_user_id: str, login_password: str, *, endpoint_url: str = DEFAULT_ENDPOINT_URL) -> None:
if not login_user_id or not login_password:
raise ValueError("login_user_id or login_password is empty.")
self.login_user_id = login_user_id
self.login_password = login_password
self.base_url = f"{endpoint_url}/api/v1"
self.session = requests.Session()
self.token_dict: Optional[dict[str, Any]] = None
class __MyToken(AuthBase):
"""
requestsモジュールのauthに渡す情報。
http://docs.python-requests.org/en/master/user/advanced/#custom-authentication
"""
def __init__(self, id_token: str) -> None:
self.id_token = id_token
def __call__(self, req): # noqa: ANN204,ANN001
req.headers["Authorization"] = self.id_token
return req
#########################################
# Private Method
#########################################
def _create_kwargs(
self,
params: Optional[dict[str, Any]] = None,
headers: Optional[dict[str, Any]] = None,
request_body: Optional[Any] = None, # noqa: ANN401
) -> dict[str, Any]:
"""
requestsモジュールのget,...メソッドに渡すkwargsを生成する。
Args:
params: クエリパラメタに設定する情報
headers: リクエストヘッダに設定する情報
Returns:
kwargs情報
"""
# query_param
new_params = {}
if params is not None:
for key, value in params.items():
if isinstance(value, (list, dict)):
new_params[key] = json.dumps(value)
else:
new_params[key] = value
kwargs: dict[str, Any] = {
"params": new_params,
"headers": headers,
}
if self.token_dict is not None:
kwargs.update({"auth": self.__MyToken(self.token_dict["id_token"])})
if request_body is not None:
if isinstance(request_body, (dict, list)):
kwargs.update({"json": request_body})
elif isinstance(request_body, str):
kwargs.update({"data": request_body.encode("utf-8")})
else:
kwargs.update({"data": request_body})
return kwargs
@staticmethod
def _response_to_content(response: requests.Response) -> Any: # noqa: ANN401
"""
Responseのcontentを、Content-Typeに対応した型に変換する。
Args:
response:
Returns:
JSONの場合はdict, textの場合はstringのcontent
"""
content_type = response.headers["Content-Type"]
# `Content-Type: application/json;charset=utf-8`などcharsetが含まれている場合にも対応できるようにする。
tokens = content_type.split(";")
media_type = tokens[0].strip()
if media_type == "application/json":
content = response.json() if len(response.content) != 0 else {}
elif media_type.find("text/") >= 0:
content = response.text
else:
content = response.content
return content
@my_backoff
def _execute_http_request(
self,
http_method: str,
url: str,
*,
params: Optional[dict[str, Any]] = None,
data: Optional[Any] = None, # noqa: ANN401
json: Optional[Any] = None, # pylint: disable=redefined-outer-name, # noqa: ANN401
headers: Optional[dict[str, Any]] = None,
**kwargs,
) -> requests.Response:
"""Session情報を使って、HTTP Requestを投げる。
引数は ``requests.Session.request`` にそのまま渡す。
Args:
raise_for_status: Trueの場合HTTP Status Codeが4XX,5XXのときはHTTPErrorをスローします
Returns:
requests.Response: [description]
Raises:
requests.exceptions.HTTPError: http status codeが4XXX,5XXXのとき
"""
response = self.session.request(method=http_method, url=url, params=params, data=data, headers=headers, json=json, **kwargs)
# response.requestよりメソッド引数のrequest情報の方が分かりやすいので、メソッド引数のrequest情報を出力する。
logger.debug(
"Sent a request :: %s",
{
"requests": {
"http_method": http_method,
"url": url,
"query_params": params,
"request_body_json": _create_request_body_for_logger(json) if json is not None else None,
"request_body_data": _create_request_body_for_logger(data) if data is not None else None,
"header_params": headers,
},
"response": {
"status_code": response.status_code,
"content_length": len(response.content),
},
},
)
_log_error_response(logger, response)
_raise_for_status(response)
return response
@my_backoff
def _request_wrapper(
self,
http_method: str,
url_path: str,
*,
query_params: Optional[dict[str, Any]] = None,
header_params: Optional[dict[str, Any]] = None,
request_body: Optional[Any] = None, # noqa: ANN401
log_response_with_error: bool = True,
) -> Any: # noqa: ANN401
"""
HTTP Requestを投げて、Responseを返す。
Args:
http_method:
url_path:
query_params:
header_params:
request_body:
log_response_with_error: HTTP Errorが発生したときにレスポンスの中身をログに出力するか否か
Returns:
responseの中身。content_typeにより型が変わる。
application/jsonならdict型, text/*ならばstr型, それ以外ならばbite型。
"""
url = f"{self.base_url}{url_path}"
kwargs = self._create_kwargs(query_params, header_params, request_body)
response = getattr(self.session, http_method.lower())(url, **kwargs)
logger.debug(
"Sent a request :: %s",
{
"request": {
"http_method": http_method.lower(),
"url": url,
"query_params": query_params,
"header_params": header_params,
"request_body": _create_request_body_for_logger(request_body) if request_body is not None else None,
},
"response": {
"status_code": response.status_code,
"content_length": len(response.content),
},
},
)
# Unauthorized Errorならば、ログイン後に再度実行する
if response.status_code == requests.codes.unauthorized:
self.login()
return self._request_wrapper(
http_method,
url_path,
query_params=query_params,
header_params=header_params,
request_body=request_body,
log_response_with_error=log_response_with_error,
)
response.encoding = "utf-8"
content = self._response_to_content(response)
if log_response_with_error:
_log_error_response(logger, response)
_raise_for_status(response)
return content
#########################################
# Public Method : Login
#########################################
[ドキュメント]
@my_backoff
def login(self) -> dict[str, Any]:
"""
ログイン
Returns:
Token情報
"""
request_body = {"user_id": self.login_user_id, "password": self.login_password}
url = f"{self.base_url}/login"
response = self._execute_http_request(http_method="post", url=url, json=request_body)
json_obj = response.json()
self.token_dict = json_obj
return json_obj