"""Client session for the Webarchive REST API (https://webarchiv.onb.ac.at/api)."""

import hashlib
import json
import sys
import time
from urllib.parse import urlencode
from uuid import getnode

import requests
from requests import HTTPError

_datetime_format_string = '%Y%m%d%H%M%S'

# Extractor types, sent as "extractortype" in fragment checksum requests
EXTRACTOR_TEXT = 1
EXTRACTOR_HTML = 2
EXTRACTOR_BINARY = 3

# Modes for TextExtractor
POSITIONLEN_MODE = 1
POSITION_MODE = 2
REGEX_MODE = 3

# Search modes, sent as "mode" in wayback searches
SEARCHMODE_WEBARCHIV = 1
SEARCHMODE_INTERNETARCHIVE = 2
SEARCHMODE_WEBARCHIV_INTERNETARCHIVE = 3


class SessionTimeoutError(Exception):
    """Raised by ``WebarchivSession.wait_for_response`` when polling exceeds its timeout."""


class WebarchivSession:
    """
    A session against the Webarchive API.

    Holds the API key and the token obtained by :meth:`connect`, adds both to
    every request and re-authenticates once when a request is answered with
    HTTP 403.
    """

    # Messages printed to stderr for the error status codes the client knows.
    _STATUS_MESSAGES = {
        403: 'Forbidden. Invalid Token or ApiKey transmitted',
        400: 'Bad request',
        410: 'The requested API Version (via X-API-VERSION Header) is not available',
    }

    # Polling endpoint for each request "type" reported by the server.
    _STATUS_OPS = {
        1: '/search/status/fulltext',
        2: '/search/status/wayback',
        5: '/fragment/checksum/status',
    }

    @property
    def version(self):
        """
        Current protocol version
        """
        return '0.1.0'

    @property
    def api_path(self):
        """
        Protocol, domain and path prefix for the Webarchive API, without a
        trailing slash and without any placeholder.
        """
        return 'https://webarchiv.onb.ac.at/api'

    @property
    def base_url(self):
        """
        Protocol, domain and path prefix for the Webarchive API, with a single
        positional format string placeholder for the REST operation and
        parameters.
        """
        return self.api_path + '{}'

    @property
    def _error_template(self):
        """
        A format string for displaying HTTP Errors.
        Must contain one placeholder 'status_code' for the HTTP status code.
        Must contain one placeholder 'response_text' for the body of the response.
        """
        return 'HTTP ERROR - status code {status_code}\n----\n{response_text}\n----\n\n'

    def __init__(self, api_key, allow_tracking=False):
        """
        :param api_key: API key that is sent with every request.
        :param allow_tracking: If true, a SHA-256 hash of this machine's
            hardware address (``uuid.getnode()``) is sent as fingerprint
            during authentication; otherwise an empty fingerprint is sent.
        """
        self.api_key = api_key
        self.allow_tracking = allow_tracking
        self.token = None

    def connect(self):
        """
        Connect to the Webarchive API, request and save a token.

        An HTTP error during authentication is printed to stderr and not
        re-raised; ``self.token`` is left unchanged in that case.
        """
        try:
            self.token = self._authenticate()
        except HTTPError as e:
            self._display_http_error(e)

    def _authenticate(self):
        """
        Request a new token from the '/authentication' endpoint.

        :return: The token (value of 't' in the JSON response).
        :raises HTTPError: If the server does not answer with status 201.
        """
        if self.allow_tracking:
            fingerprint = hashlib.sha256(str(getnode()).encode('utf-8')).hexdigest()
        else:
            fingerprint = ''

        # json.dumps escapes quotes/backslashes in the values, which plain
        # string formatting of a JSON template would not.
        payload = json.dumps({
            'apikey': self.api_key,
            'fingerprint': fingerprint,
            'version': self.version,
        })
        r = requests.post(
            self.base_url.format('/authentication'),
            data=payload,
            headers={
                'content-type': 'application/json',
                'accept': 'application/ld+json',
            },
        )
        if r.status_code == 201:
            return r.json()['t']
        raise HTTPError(response=r)

    def _add_api_key_and_token(self, params_dict: dict):
        """
        Add the saved api key and token to a given dictionary.

        :param params_dict: A dictionary that's probably used as a 'params'
            keyword parameter for calling requests.get().
        :return: The same dictionary extended by 'apikey' and 't' keys.
        """
        params_dict['apikey'] = self.api_key
        params_dict['t'] = self.token
        return params_dict

    def _display_http_error(self, e: HTTPError):
        """Print status code and body of the response attached to ``e`` to stderr."""
        print(
            self._error_template.format(
                status_code=e.response.status_code,
                response_text=e.response.text,
            ),
            file=sys.stderr,
        )

    @staticmethod
    def _handle_response_errors(r):
        """
        Print a message for a failed response to stderr.

        :param r: A response whose status indicates an error.
        :return: Always ``r`` itself, so callers never receive ``None``.
        """
        message = WebarchivSession._STATUS_MESSAGES.get(r.status_code)
        if message is None:
            message = 'Unexpected HTTP status code {}'.format(r.status_code)
        print(message, file=sys.stderr)
        return r

    def _get(self, op, auto_connect=True, **kwargs):
        """
        Send a GET request for operation ``op`` with api key and token added.

        :param op: Path of the REST operation, starting with '/'.
        :param auto_connect: If true, a 403 answer triggers one call to
            :meth:`connect` and a single retry.
        :param kwargs: Passed through to ``requests.get``.
        :return: The HTTP response object.
        """
        # Work on a copy so the caller's dict does not receive key and token.
        params = dict(kwargs.pop('params', None) or {})
        kwargs['params'] = self._add_api_key_and_token(params)

        r = requests.get(self.base_url.format(op), **kwargs)
        if r.ok:
            return r
        if r.status_code == 403 and auto_connect:
            self.connect()
            # The retry re-adds the token, picking up the freshly saved one.
            return self._get(op=op, auto_connect=False, **kwargs)
        return self._handle_response_errors(r)

    def _post(self, op, auto_connect=True, json: dict = None, **kwargs):
        """
        Send a POST request for operation ``op`` with a JSON body that has
        api key and token added.

        :param op: Path of the REST operation, starting with '/'.
        :param auto_connect: If true, a 403 answer triggers one call to
            :meth:`connect` and a single retry.
        :param json: Dictionary sent as JSON body (not modified).
        :param kwargs: Passed through to ``requests.post``; a 'headers' entry
            is replaced by the JSON content-type/accept headers.
        :return: The HTTP response object.
        """
        # Work on a copy so the caller's dict does not receive key and token.
        body = dict(json) if json else {}
        kwargs['json'] = self._add_api_key_and_token(body)
        kwargs['headers'] = {
            'content-type': 'application/json',
            'accept': 'application/ld+json',
        }

        r = requests.post(self.base_url.format(op), **kwargs)
        if r.ok:
            return r
        if r.status_code == 403 and auto_connect:
            self.connect()
            # The retry re-adds the token, picking up the freshly saved one.
            return self._post(op=op, auto_connect=False, **kwargs)
        return self._handle_response_errors(r)

    @staticmethod
    def _with_date_bounds(params, from_, to_):
        """Add 'from'/'to' to ``params`` when the bounds are truthy; return ``params``."""
        if from_:
            params['from'] = from_
        if to_:
            params['to'] = to_
        return params

    def fulltext_search(self, query_string, from_=None, to_=None):
        """
        Start a fulltext search query in the Webarchive.

        :param query_string: String to search for
        :param from_: Optional earliest date bound for the search
                      in the format YYYYMM.
        :param to_: Optional latest date bound for the search
                    in the format YYYYMM.
        :return: HTTP Response object
        """
        params = self._with_date_bounds({'q': query_string}, from_, to_)
        try:
            response = self._get(op='/search/fulltext', params=params)
            return self.wait_for_response(response)
        except HTTPError as e:
            self._display_http_error(e)
            print('Query for "{}" not added'.format(query_string))

    def fulltext_search_within_domain(self, query_string, domain, from_=None, to_=None):
        """
        Start a fulltext seed search query in the Webarchive.

        :param query_string: String to search for
        :param domain: Search only within this domain name
        :param from_: Optional earliest date bound for the search
                      in the format YYYYMM.
        :param to_: Optional latest date bound for the search
                    in the format YYYYMM.
        :return: HTTP Response object
        """
        params = self._with_date_bounds({'q': query_string, 'g': domain}, from_, to_)
        try:
            response = self._get(op='/search/fulltext/seed', params=params)
            return self.wait_for_response(response)
        except HTTPError as e:
            self._display_http_error(e)

    def fulltext_search_within_url(self, query_string, url, pagesize=10, from_=None, to_=None):
        """
        Start a fulltext capture search query in the Webarchive.

        :param query_string: String to search for
        :param url: Search only captures starting at this exact web address
        :param pagesize: Value sent as the 'pagesize' parameter. Defaults to 10.
        :param from_: Optional earliest date bound for the search
                      in the format YYYYMM.
        :param to_: Optional latest date bound for the search
                    in the format YYYYMM.
        :return: HTTP Response object
        """
        params = self._with_date_bounds(
            {'q': query_string, 'g': url, 'pagesize': pagesize}, from_, to_)
        try:
            response = self._get(op='/search/fulltext/capture', params=params)
            return self.wait_for_response(response)
        except HTTPError as e:
            self._display_http_error(e)

    def wayback_search(self, query_string, from_=None, to_=None, mode_=SEARCHMODE_WEBARCHIV):
        """
        Start a wayback search query in the Webarchive.

        :param query_string: String to search for
        :param from_: Optional earliest date bound for the search
                      in the format YYYYMM.
        :param to_: Optional latest date bound for the search
                    in the format YYYYMM.
        :param mode_: One of the SEARCHMODE_* constants, sent as 'mode'.
        :return: HTTP Response object
        """
        params = self._with_date_bounds({'q': query_string}, from_, to_)
        params['mode'] = mode_
        try:
            response = self._get(op='/search/wayback', params=params)
            return self.wait_for_response(response)
        except HTTPError as e:
            self._display_http_error(e)
            print('Error: {}'.format(query_string))

    def wait_for_response(self, response, timeout=None, poll_interval=0.5):
        """
        Polls until the server responds with a result.

        :param response: The response of the initial request or of a
            previous status query.
        :param timeout: Optional maximum number of seconds to keep polling.
            ``None`` (the default) polls without limit.
        :param poll_interval: Seconds to sleep between two status queries.
        :return: The first response with status 200, or the first response
            with an error status (>= 400), which ends polling because it
            carries no request to poll for.
        :raises SessionTimeoutError: If ``timeout`` is given and exceeded.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while response.status_code != 200:
            if response.status_code >= 400:
                return response
            if deadline is not None and time.monotonic() >= deadline:
                raise SessionTimeoutError(
                    'No result after {} seconds'.format(timeout))
            time.sleep(poll_interval)
            response = self.status_query(response)
        return response

    def status_query(self, resp):
        """
        this is the polling request for the given type of request

        :param resp: Response whose JSON body contains 'requestid' and 'type'.
        :return: HTTP Response object of the status request.
        :raises NotImplementedError: If the 'type' is not known to this client.
        """
        body = resp.json()
        requestid = body['requestid']
        type_ = body['type']

        op = self._STATUS_OPS.get(type_)
        if op is None:
            raise NotImplementedError(
                f'Unknown status query type {type_} - Please update client.')
        return self._get(op=op, params={'requestid': requestid})

    def domain_name_search(self, query_string, page_=1, pagesize_=100):
        """
        Start a domain name search in the Webarchive.

        :param query_string: String to search for
        :param page_: The page number parameter works with the page size
            parameter to control the offset of the records returned in the
            results. Default value is 1
        :param pagesize_: The page size parameter works with the page number
            parameter to control the offset of the records returned in the
            results. It also controls how many results are returned with each
            request. Default value is 100
        :return: HTTP Response object
        """
        params = {'q': query_string}
        if page_:
            params['page'] = page_
        if pagesize_:
            params['pagesize'] = pagesize_
        try:
            response = self._get(op='/search/domainname', params=params)
            return self.wait_for_response(response)
        except HTTPError as e:
            self._display_http_error(e)
            print('Error: {}'.format(query_string))

    def histogram_search(self, query_string, interval_=3, from_=None, to_=None):
        """
        Start a fulltext histogram search in the Webarchive.

        :param query_string: String to search for
        :param interval_: Value sent as the 'interval' parameter. Defaults to 3.
        :param from_: Optional earliest date bound, sent as 'from'.
        :param to_: Optional latest date bound, sent as 'to'.
        :return: HTTP Response object
        """
        params = {'q': query_string}
        if interval_:
            params['interval'] = interval_
        self._with_date_bounds(params, from_, to_)
        try:
            response = self._get(op='/search/fulltext/histogram', params=params)
            return self.wait_for_response(response)
        except HTTPError as e:
            self._display_http_error(e)
            print('Error: {}'.format(query_string))

    def get_snapshot_url(self, seed, capture, onlysvg):
        """
        Build the URL of the snapshot endpoint for a capture.

        No request is sent. The saved token is embedded, so :meth:`connect`
        should have been called before.

        :param seed: Value of the 'seed' query parameter.
        :param capture: Value of the 'capture' query parameter.
        :param onlysvg: Value of the 'onlysvg' query parameter.
        :return: The URL as a string with URL-encoded query values.
        """
        query = urlencode([
            ('capture', capture),
            ('t', self.token),
            ('apikey', self.api_key),
            ('onlysvg', onlysvg),
            ('seed', seed),
        ])
        return self.api_path + '/snapshot?' + query

    @staticmethod
    def _has_subtype(response, subtype):
        """Return True if the JSON body of ``response`` has the given 'subtype'."""
        try:
            return response.json()['subtype'] == subtype
        except (AttributeError, KeyError, TypeError, ValueError):
            # Not a response, not JSON, or JSON without a 'subtype' entry.
            return False

    @staticmethod
    def result_contains_seeds(response):
        """Return True if the JSON body of ``response`` has 'subtype' 2, else False."""
        return WebarchivSession._has_subtype(response, 2)

    @staticmethod
    def result_contains_captures(response):
        """Return True if the JSON body of ``response`` has 'subtype' 3, else False."""
        return WebarchivSession._has_subtype(response, 3)

    def save_page(self, url):
        """
        POST ``url`` to the '/savepage' endpoint.

        :return: HTTP Response object
        """
        try:
            return self._post(op='/savepage', json={"url": url})
        except HTTPError as e:
            self._display_http_error(e)

    def _fragment_checksum(self, op, payload):
        """
        Submit a fragment checksum request and poll for its result.

        :return: The result response, or the error response of the initial
            POST if that already failed.
        """
        try:
            response = self._post(op=op, json=payload)
            if not response.ok:
                # A failed submission carries no request to poll for.
                return response
            response = self.status_query(response)
            return self.wait_for_response(response)
        except HTTPError as e:
            self._display_http_error(e)

    def fragment_checksum_html(self, seed, capture, selector, occurrence):
        """
        Request a checksum via '/fragment/checksum/html' for the given
        selector and occurrence, and wait for the result.

        :return: HTTP Response object
        """
        return self._fragment_checksum('/fragment/checksum/html', {
            "seed": seed,
            "capture": capture,
            "selector": selector,
            "occurrence": occurrence,
            "extractortype": EXTRACTOR_HTML,
        })

    def fragment_checksum_binary(self, seed, capture):
        """
        Request a checksum via '/fragment/checksum/binary' and wait for the
        result.

        :return: HTTP Response object
        """
        return self._fragment_checksum('/fragment/checksum/binary', {
            "seed": seed,
            "capture": capture,
            "extractortype": EXTRACTOR_BINARY,
        })

    def fragment_checksum_text_positionlen(self, seed, capture, pos, len):
        """
        Request a checksum via '/fragment/checksum/text' in POSITIONLEN_MODE
        with the given 'pos' and 'len', and wait for the result.

        :return: HTTP Response object
        """
        return self._fragment_checksum('/fragment/checksum/text', {
            "seed": seed,
            "capture": capture,
            "mode": POSITIONLEN_MODE,
            "pos": pos,
            "len": len,
            "extractortype": EXTRACTOR_TEXT,
        })

    def fragment_checksum_text_position(self, seed, capture, pos):
        """
        Request a checksum via '/fragment/checksum/text' in POSITION_MODE
        with the given 'pos', and wait for the result.

        :return: HTTP Response object
        """
        return self._fragment_checksum('/fragment/checksum/text', {
            "seed": seed,
            "capture": capture,
            "mode": POSITION_MODE,
            "pos": pos,
            "extractortype": EXTRACTOR_TEXT,
        })

    def fragment_checksum_text_regex(self, seed, capture, regexpattern, occurrence):
        """
        Request a checksum via '/fragment/checksum/text' in REGEX_MODE with
        the given pattern and occurrence, and wait for the result.

        :return: HTTP Response object
        """
        return self._fragment_checksum('/fragment/checksum/text', {
            "seed": seed,
            "capture": capture,
            "mode": REGEX_MODE,
            "regexpattern": regexpattern,
            "occurrence": occurrence,
            "extractortype": EXTRACTOR_TEXT,
        })

    def create_watchlist(self, urls):
        """
        POST ``urls`` to the '/watchlist' endpoint.

        :return: HTTP Response object
        """
        try:
            return self._post(op='/watchlist', json={"urls": urls})
        except HTTPError as e:
            self._display_http_error(e)