Newer
Older
from requests import HTTPError
# strftime/strptime pattern for 14-digit timestamps (YYYYMMDDhhmmss).
_datetime_format_string = '%Y%m%d%H%M%S'
# Extractor type codes, sent as the "extractortype" field of the
# /fragment/checksum/* requests.
# NOTE(review): fragment_checksum_html references EXTRACTOR_HTML, which is not
# defined in this section - confirm it is declared elsewhere or restore it.
EXTRACTOR_TEXT = 1
EXTRACTOR_BINARY = 3
# Modes for TextExtractor (sent as the "mode" field of /fragment/checksum/text requests)
POSITIONLEN_MODE = 1
POSITION_MODE = 2
REGEX_MODE = 3
# Search modes; wayback_search takes one of these as its mode_ parameter
# (default SEARCHMODE_WEBARCHIV).
SEARCHMODE_WEBARCHIV = 1
SEARCHMODE_INTERNETARCHIVE = 2
SEARCHMODE_WEBARCHIV_INTERNETARCHIVE = 3
class SessionTimeoutError(Exception):
    """
    Plain Exception subclass with no added behaviour.

    NOTE(review): nothing in this section raises it; the name suggests it
    signals an expired session - confirm against the callers.
    """
class WebarchivSession:
@property
def version(self):
    """
    Protocol version string of this client.

    :return: The fixed string '0.1.0'; _authenticate() passes it on as 'version'.
    """
    protocol_version = '0.1.0'
    return protocol_version
@property
def api_path(self):
"""
Protocol, domain and path prefix for the Webarchive API,
with a single positional format string placeholder
for the REST operation and parameters.

get_snapshot_url() appends 'snapshot?...' to this value by plain string
concatenation.

NOTE(review): the placeholder wording above is identical to base_url's
docstring, while the only visible user concatenates instead of calling
.format() - confirm which is intended.
NOTE(review): no return statement is visible after this docstring, so as
written the property evaluates to None; the URL literal appears to be
missing and must be restored.
"""
@property
def base_url(self):
"""
Protocol, domain and path prefix for the Webarchive API,
with a single positional format string placeholder
for the REST operation and parameters.

_get() and _post() build their request URL with self.base_url.format(op).

NOTE(review): no return statement is visible after this docstring, so as
written the property evaluates to None and .format(op) would fail; the URL
literal appears to be missing and must be restored.
"""
@property
def _error_template(self):
"""
A format string for displaying HTTP Errors.
Must contain one placeholder 'status_code' for the HTTP status code.
Must contain one placeholder 'response_text' for the body of the response.
"""
return 'HTTP ERROR - status code {status_code}\n----\n{response_text}\n----\n\n'
def __init__(self, api_key, allow_tracking=False):
self.token = None
def connect(self):
    """
    Authenticate against the Webarchive API and remember the token.

    On success the token returned by _authenticate() is stored in self.token.
    An HTTPError is reported through _display_http_error() and leaves
    self.token untouched.
    """
    try:
        fresh_token = self._authenticate()
    except HTTPError as error:
        self._display_http_error(error)
    else:
        self.token = fresh_token
def _authenticate(self):
"""
Request a session token from the API.

When self.allow_tracking is true, the SHA-256 hex digest of the value
returned by uuid.getnode() is sent as 'fingerprint'; otherwise an empty
string is sent.

:return: The value stored under key 't' in the JSON response body
(only when the response status is 201).
:raises HTTPError: For every response status other than 201.
"""
if self.allow_tracking:
from uuid import getnode as get_mac
mac = get_mac()
# Hash the node id so the raw value itself is never transmitted.
sha256 = hashlib.sha256()
sha256.update(str(mac).encode('utf-8'))
fingerprint = sha256.hexdigest()
else:
fingerprint = ''
# NOTE(review): the start of the request call is missing above the next line
# (presumably `r = requests.post(...)` with a triple-quoted JSON template);
# only the tail of the template's .format() call and the headers survive.
}}'''.format(api_key=self.api_key, version=self.version, fingerprint=fingerprint),
headers={
'content-type': 'application/json',
'accept': 'application/ld+json'
}
)
# Status 201 is the only success case; the token is the 't' field of the body.
if r.status_code == 201:
return r.json()['t']
else:
raise HTTPError(response=r)
def _add_api_key_and_token(self, params_dict: dict):
"""
Add the saved api key and token to a given dictionary.
:param params_dict: A dictionary that's probably used
as a 'params' keyword parameter for calling requests.get().
:return: The same dictionary extended by 'apikey' and 't' keys.
"""
params_dict['apikey'] = self.api_key
params_dict['t'] = self.token
return params_dict
def _display_http_error(self, e: HTTPError):
    """
    Print a description of an HTTP error to stderr.

    :param e: The error to report; its ``response`` attribute supplies the
        status code and body that are filled into self._error_template.
    """
    failed = e.response
    message = self._error_template.format(status_code=failed.status_code,
                                          response_text=failed.text)
    print(message, file=sys.stderr)
@staticmethod
def _handle_response_errors(r):
if r.status_code == 403:
print('Forbidden. Invalid Token or ApiKey transmitted', file=sys.stderr)
return r
elif r.status_code == 400:
print('Bad request', file=sys.stderr)
return r
elif r.status_code == 410:
print('The requested API Version (via X-API-VERSION Header) is not available', file=sys.stderr)
return r
def _get(self, op, auto_connect=True, **kwargs):
    """
    Send a GET request for one API operation.

    The api key and token are merged into the 'params' keyword argument
    before the request is sent.

    :param op: Operation path that is filled into self.base_url.
    :param auto_connect: When true, a 403 answer triggers one connect()
        followed by a single retry of the same request.
    :param kwargs: Passed on to requests.get().
    :return: The response when it is ok; otherwise whatever
        _handle_response_errors() returns for it.
    """
    kwargs['params'] = self._add_api_key_and_token(kwargs.pop('params', {}))
    response = requests.get(self.base_url.format(op), **kwargs)
    if response.ok:
        return response
    should_retry = response.status_code == 403 and auto_connect
    if not should_retry:
        return self._handle_response_errors(response)
    # Re-authenticate once, then repeat the request with retrying disabled.
    self.connect()
    return self._get(op=op, auto_connect=False, **kwargs)
def _post(self, op, auto_connect=True, json: dict = None, **kwargs):
    """
    Send a POST request with a JSON body for one API operation.

    The api key and token are added to the JSON payload, and the
    'content-type' / 'accept' headers are always set by this method
    (replacing any 'headers' keyword argument).

    :param op: Operation path that is filled into self.base_url.
    :param auto_connect: When true, a 403 answer triggers one connect()
        followed by a single retry of the same request.
    :param json: Payload dictionary; treated as empty when falsy.
    :param kwargs: Passed on to requests.post().
    :return: The response when it is ok; otherwise whatever
        _handle_response_errors() returns for it.
    """
    # Work on a copy so the credentials are not written into the caller's dict.
    payload = dict(json) if json else {}
    kwargs['json'] = self._add_api_key_and_token(payload)
    kwargs['headers'] = {
        'content-type': 'application/json',
        'accept': 'application/ld+json'
    }
    r = requests.post(self.base_url.format(op), **kwargs)
    if r.ok:
        # This branch had no body at all (a syntax error); a successful
        # response is returned, exactly as _get() does.
        return r
    elif r.status_code == 403 and auto_connect:
        # Re-authenticate once, then repeat the request with retrying disabled.
        # kwargs already carries 'json'; the token in it is refreshed on re-entry.
        self.connect()
        return self._post(op=op, auto_connect=False, **kwargs)
    else:
        return self._handle_response_errors(r)
# NOTE(review): the `def` line that should precede this docstring is missing
# from this section. The body reads query_string, from_ and to_, so those are
# presumably its parameters (compare fulltext_search_within_domain below).
"""
Start a fulltext search query in the Webarchive.
:param query_string: String to search for
:param from_: Optional earliest date bound for the search
in the format YYYYMM.
:param to_: Optional latest date bound for the search
in the format YYYYMM.
"""
params = {'q': query_string}
# The date bounds are only sent when a truthy value was given.
if from_:
params['from'] = from_
if to_:
params['to'] = to_
try:
# NOTE(review): the body of this try block (the actual request and any return
# statement) is missing; restore it before use.
except HTTPError as e:
self._display_http_error(e)
print('Query for "{}" not added'.format(query_string))
def fulltext_search_within_domain(self, query_string, domain, from_=None, to_=None):
    """
    Start a fulltext seed search query in the Webarchive.

    :param query_string: String to search for
    :param domain: Search only within this domain name
    :param from_: Optional earliest date bound for the search
        in the format YYYYMM; only sent when truthy.
    :param to_: Optional latest date bound for the search
        in the format YYYYMM; only sent when truthy.
    :return: HTTP Response object as returned by _get(), or None when an
        HTTPError was raised (the error is reported on stderr).
    """
    params = {'q': query_string, 'g': domain}
    if from_:
        params['from'] = from_
    if to_:
        params['to'] = to_
    try:
        # The response used to be assigned and then dropped, so callers always
        # received None although a return value is documented.
        # NOTE(review): the fragment_checksum_* methods additionally pass their
        # response through status_query()/wait_for_response(); confirm whether
        # this search needs the same polling step.
        return self._get(op='/search/fulltext/seed', params=params)
    except HTTPError as e:
        self._display_http_error(e)
        return None
def fulltext_search_within_url(self, query_string, url, pagesize=10, from_=None, to_=None):
    """
    Start a fulltext capture search query in the Webarchive.

    :param query_string: String to search for
    :param url: Search only captures starting at this exact web address
    :param pagesize: Sent as the 'pagesize' request parameter (default 10)
    :param from_: Optional earliest date bound for the search
        in the format YYYYMM; only sent when truthy.
    :param to_: Optional latest date bound for the search
        in the format YYYYMM; only sent when truthy.
    :return: HTTP Response object as returned by _get(), or None when an
        HTTPError was raised (the error is reported on stderr).
    """
    params = {'q': query_string, 'g': url, 'pagesize': pagesize}
    if from_:
        params['from'] = from_
    if to_:
        params['to'] = to_
    try:
        # The response used to be assigned and then dropped, so callers always
        # received None although a return value is documented.
        # NOTE(review): the fragment_checksum_* methods additionally pass their
        # response through status_query()/wait_for_response(); confirm whether
        # this search needs the same polling step.
        return self._get(op='/search/fulltext/capture', params=params)
    except HTTPError as e:
        self._display_http_error(e)
        return None
def wayback_search(self, query_string, from_=None, to_=None, mode_=SEARCHMODE_WEBARCHIV):
"""
Start a wayback search query in the Webarchive.
:param query_string: String to search for
:param from_: Optional earliest date bound for the search
in the format YYYYMM.
:param to_: Optional latest date bound for the search
in the format YYYYMM.
:param mode_: One of the SEARCHMODE_* constants (default SEARCHMODE_WEBARCHIV).
NOTE(review): mode_ is not used anywhere in the visible body.
"""
params = {'q': query_string}
# The date bounds are only sent when a truthy value was given.
if from_:
params['from'] = from_
if to_:
params['to'] = to_
# NOTE(review): the `try:` line and the request it guarded are missing here;
# the except clause below has nothing to attach to as written.
except HTTPError as e:
self._display_http_error(e)
# NOTE(review): the format string below has no placeholder, so query_string
# is never shown in the message.
print('Error:'.format(query_string))
# NOTE(review): fragment of a helper whose `def` line is missing - presumably
# wait_for_response(response), which the fragment_checksum_* methods call.
# A 400 response is handed back immediately.
if response.status_code == 400:
return response
# Loops for as long as the status is not 200.
# NOTE(review): the loop body and whatever follows it are missing here.
while response.status_code != 200:
# NOTE(review): fragment of a helper whose `def` line is missing - presumably
# status_query(resp), which the fragment_checksum_* methods call. The next
# line looks like docstring text that lost its surrounding quotes.
this is the polling request for the given type of request
j = resp.json()
# NOTE(review): `context` is assigned but not used in the visible lines.
context = j['context']
requestid = j['requestid']
type_ = resp.json()['type']
# Pick the status endpoint that matches the request type reported in the body.
if type_ == 1:
r = self._get(op='/search/status/fulltext', params={'requestid': requestid})
# NOTE(review): a branch header (an `elif type_ == ...:` line) appears to be
# missing here; as written the fulltext result is immediately overwritten by
# the wayback status request.
r = self._get(op='/search/status/wayback', params={'requestid': requestid})
elif type_ == 5:
r = self._get(op='/fragment/checksum/status', params={'requestid': requestid})
else:
raise NotImplementedError(f'Unknown status query type {type_} - Please update client.')
# NOTE(review): no `return r` is visible. The two lines below are not Python;
# they look like residue from the page this file was copied from.
onbpre
committed
def domain_name_search(self, query_string, page_=1, pagesize_=100):
    """
    Start a domain name search in the Webarchive.

    :param query_string: String to search for
    :param page_: The page number parameter works with the page size parameter
        to control the offset of the records returned in the results.
        Default value is 1; only sent when truthy.
    :param pagesize_: The page size parameter works with the page number
        parameter to control the offset of the records returned in the results.
        It also controls how many results are returned with each request.
        Default value is 100; only sent when truthy.
    :return: HTTP Response object as returned by _get() (call .json() on it
        for the decoded result), or None when an HTTPError was raised.
    """
    params = {'q': query_string}
    if page_:
        params['page'] = page_
    if pagesize_:
        params['pagesize'] = pagesize_
    try:
        # The response used to be assigned and then dropped, so callers always
        # received None although a result is documented.
        # NOTE(review): confirm whether the response must additionally go
        # through status_query()/wait_for_response() as in fragment_checksum_*.
        return self._get(op='/search/domainname', params=params)
    except HTTPError as e:
        self._display_http_error(e)
        # The old format string had no placeholder, so the query was never shown.
        print('Error: {}'.format(query_string))
        return None
def histogram_search(self, query_string, interval_=3, from_=None, to_=None):
    """
    Start a fulltext histogram search in the Webarchive.

    :param query_string: String to search for
    :param interval_: Sent as the 'interval' request parameter when truthy
        (default 3). NOTE(review): meaning/units are defined by the API -
        confirm against its documentation.
    :param from_: Optional earliest bound, sent as 'from' when truthy.
    :param to_: Optional latest bound, sent as 'to' when truthy.
    :return: HTTP Response object as returned by _get() (call .json() on it
        for the decoded result), or None when an HTTPError was raised.
    """
    params = {'q': query_string}
    if interval_:
        params['interval'] = interval_
    if from_:
        params['from'] = from_
    if to_:
        params['to'] = to_
    try:
        # The response used to be assigned and then dropped, so callers always
        # received None although a result is documented.
        # NOTE(review): confirm whether the response must additionally go
        # through status_query()/wait_for_response() as in fragment_checksum_*.
        return self._get(op='/search/fulltext/histogram', params=params)
    except HTTPError as e:
        self._display_http_error(e)
        # The old format string had no placeholder, so the query was never shown.
        print('Error: {}'.format(query_string))
        return None
def get_snapshot_url(self, seed, capture, onlysvg):
    """
    Build the URL of the 'snapshot' operation for one capture.

    The query string carries, in this order: capture, t (self.token),
    apikey (self.api_key), onlysvg and seed. All values are URL-encoded, so a
    seed that is itself a URL (containing '&', '?', '=' ...) can no longer
    break the query string; plain values yield the same URL as before.

    :param seed: Sent as the 'seed' query parameter.
    :param capture: Sent as the 'capture' query parameter.
    :param onlysvg: Sent as the 'onlysvg' query parameter; non-string values
        are converted with str().
    :return: self.api_path followed by 'snapshot?' and the encoded query.
    """
    # Imported locally so this method does not depend on the module's import block.
    from urllib.parse import urlencode

    query = urlencode({
        'capture': capture,
        't': self.token,
        'apikey': self.api_key,
        'onlysvg': onlysvg,
        'seed': seed,
    })
    return self.api_path + 'snapshot?' + query
# NOTE(review): the `def` line of this static helper is missing; the body
# expects a `response` object with a .json() method.
# Evaluates to True only when the decoded body has 'subtype' equal to 2.
@staticmethod
try:
return response.json()['subtype'] == 2
# The bare except turns every failure (undecodable body, missing key, ...)
# into False.
except:
return False
# NOTE(review): the `def` line of this static helper is missing; the body
# expects a `response` object with a .json() method.
# Evaluates to True only when the decoded body has 'subtype' equal to 3.
@staticmethod
try:
return response.json()['subtype'] == 3
# The bare except turns every failure (undecodable body, missing key, ...)
# into False.
except:
return False
# NOTE(review): the `def` line of this method is missing; the body reads a
# `url` variable, presumably its parameter.
# Posts {"url": url} to the '/savepage' operation and returns the response
# unchanged; an HTTPError is reported via _display_http_error(), after which
# nothing is returned.
try:
response = self._post(op='/savepage', json={
"url": url
})
return response
except HTTPError as e:
self._display_http_error(e)
def fragment_checksum_html(self, seed, capture, selector, occurrence):
    """
    Request a checksum for an HTML fragment of a capture.

    Posts to '/fragment/checksum/html', then passes the response through
    status_query() and wait_for_response(), like the other
    fragment_checksum_* methods.

    :param seed: Sent as "seed".
    :param capture: Sent as "capture".
    :param selector: Sent as "selector".
    :param occurrence: Sent as "occurrence".
    :return: Whatever wait_for_response() returns, or None when an HTTPError
        was raised (the error is reported via _display_http_error()).
    """
    # The `try:` line and the two follow-up calls were missing, leaving a
    # dangling `except`; they are restored to match the sibling methods.
    # NOTE(review): EXTRACTOR_HTML is not among the constants visible at the
    # top of this section - confirm it is defined.
    try:
        response = self._post(op='/fragment/checksum/html', json={
            "seed": seed,
            "capture": capture,
            "selector": selector,
            "occurrence": occurrence,
            "extractortype": EXTRACTOR_HTML
        })
        response = self.status_query(response)
        return self.wait_for_response(response)
    except HTTPError as e:
        self._display_http_error(e)
def fragment_checksum_binary(self, seed, capture):
    """
    Request a checksum for a capture using the binary extractor.

    Posts to '/fragment/checksum/binary', then passes the response through
    status_query() and wait_for_response().

    :param seed: Sent as "seed".
    :param capture: Sent as "capture".
    :return: Whatever wait_for_response() returns, or None when an HTTPError
        was raised (the error is reported via _display_http_error()).
    """
    payload = {
        "seed": seed,
        "capture": capture,
        "extractortype": EXTRACTOR_BINARY,
    }
    try:
        pending = self._post(op='/fragment/checksum/binary', json=payload)
        return self.wait_for_response(self.status_query(pending))
    except HTTPError as error:
        self._display_http_error(error)
def fragment_checksum_text_positionlen(self, seed, capture, pos, len):
    """
    Request a text checksum in POSITIONLEN_MODE (position plus length).

    Posts to '/fragment/checksum/text', then passes the response through
    status_query() and wait_for_response().

    :param seed: Sent as "seed".
    :param capture: Sent as "capture".
    :param pos: Sent as "pos".
    :param len: Sent as "len" (the name shadows the builtin inside this method).
    :return: Whatever wait_for_response() returns, or None when an HTTPError
        was raised (the error is reported via _display_http_error()).
    """
    payload = {
        "seed": seed,
        "capture": capture,
        "mode": POSITIONLEN_MODE,
        "pos": pos,
        "len": len,
        "extractortype": EXTRACTOR_TEXT,
    }
    try:
        pending = self._post(op='/fragment/checksum/text', json=payload)
        return self.wait_for_response(self.status_query(pending))
    except HTTPError as error:
        self._display_http_error(error)
def fragment_checksum_text_position(self, seed, capture, pos):
    """
    Request a text checksum in POSITION_MODE (position only).

    Posts to '/fragment/checksum/text', then passes the response through
    status_query() and wait_for_response().

    :param seed: Sent as "seed".
    :param capture: Sent as "capture".
    :param pos: Sent as "pos".
    :return: Whatever wait_for_response() returns, or None when an HTTPError
        was raised (the error is reported via _display_http_error()).
    """
    payload = {
        "seed": seed,
        "capture": capture,
        "mode": POSITION_MODE,
        "pos": pos,
        "extractortype": EXTRACTOR_TEXT,
    }
    try:
        pending = self._post(op='/fragment/checksum/text', json=payload)
        return self.wait_for_response(self.status_query(pending))
    except HTTPError as error:
        self._display_http_error(error)
def fragment_checksum_text_regex(self, seed, capture, regexpattern, occurrence):
    """
    Request a text checksum in REGEX_MODE.

    Posts to '/fragment/checksum/text', then passes the response through
    status_query() and wait_for_response().

    :param seed: Sent as "seed".
    :param capture: Sent as "capture".
    :param regexpattern: Sent as "regexpattern".
    :param occurrence: Sent as "occurrence".
    :return: Whatever wait_for_response() returns, or None when an HTTPError
        was raised (the error is reported via _display_http_error()).
    """
    payload = {
        "seed": seed,
        "capture": capture,
        "mode": REGEX_MODE,
        "regexpattern": regexpattern,
        "occurrence": occurrence,
        "extractortype": EXTRACTOR_TEXT,
    }
    try:
        pending = self._post(op='/fragment/checksum/text', json=payload)
        return self.wait_for_response(self.status_query(pending))
    except HTTPError as error:
        self._display_http_error(error)
def create_watchlist(self, urls):
    """
    Post a list of URLs to the '/watchlist' operation.

    :param urls: Sent as "urls" in the JSON payload.
    :return: The response returned by _post(), or None when an HTTPError was
        raised (the error is reported via _display_http_error()).
    """
    payload = {"urls": urls}
    try:
        return self._post(op='/watchlist', json=payload)
    except HTTPError as error:
        self._display_http_error(error)