# NOTE(review): "Newer" / "Older" were page-navigation artifacts from the
# source extraction, not part of the program.
import hashlib
import sys
import time

import requests
from requests import HTTPError
# Timestamp layout used by the Webarchive for capture dates (YYYYMMDDhhmmss).
_datetime_format_string = '%Y%m%d%H%M%S'
# Extractor type codes accepted by the fragment-checksum API
# (fragmentChecksumHtml sends the literal value 2, i.e. EXTRACTOR_CSS).
EXTRACTOR_TEXT = 1
EXTRACTOR_CSS = 2
EXTRACTOR_BINARY = 3
class SessionTimeoutError(Exception):
    """Raised when a Webarchive session/token is no longer valid."""
class WebarchivSession:
    """
    Client session for the ONB Webarchive REST API: authenticates with an
    API key, keeps the resulting token, and wraps the search, snapshot and
    fragment-checksum operations.
    """
@property
def version(self):
"""
Current protocol version
"""
return '0.1.0'
    @property
    def api_path(self):
        """
        Protocol, domain and path prefix of the Webarchive API.
        Plain prefix with no placeholders; operation names are appended
        to it by callers (see base_url and snapshot URL construction).
        """
        return 'https://webarchiv.onb.ac.at/api/'
@property
def base_url(self):
"""
Protocol, domain and path prefix for the Webarchive API,
with a single positional format string placeholder
for the REST operation and parameters.
"""
@property
def _error_template(self):
"""
A format string for displaying HTTP Errors.
Must contain one placeholder 'status_code' for the HTTP status code.
Must contain one placeholder 'response_text' for the body of the response.
"""
return 'HTTP ERROR - status code {status_code}\n----\n{response_text}\n----\n\n'
def __init__(self, api_key, allow_tracking=False):
self.token = None
def connect(self):
"""
Connect to the Webarchive API, request and save a token.
"""
try:
self.token = self._authenticate()
except HTTPError as e:
self._display_http_error(e)
def _authenticate(self):
if self.allow_tracking:
from uuid import getnode as get_mac
mac = get_mac()
sha256 = hashlib.sha256()
sha256.update(str(mac).encode('utf-8'))
fingerprint = sha256.hexdigest()
else:
fingerprint = ''
r = requests.post(self.base_url.format('authentication'),
data='''{{
"apikey": "{api_key}",
}}'''.format(api_key=self.api_key, version=self.version, fingerprint=fingerprint),
headers={
'content-type': 'application/json',
'accept': 'application/ld+json'
}
)
if r.status_code == 201:
return r.json()['t']
else:
raise HTTPError(response=r)
def _add_api_key_and_token(self, params_dict: dict):
"""
Add the saved api key and token to a given dictionary.
:param params_dict: A dictionary that's probably used
as a 'params' keyword parameter for calling requests.get().
:return: The same dictionary extended by 'apikey' and 't' keys.
"""
params_dict['apikey'] = self.api_key
params_dict['t'] = self.token
return params_dict
def _display_http_error(self, e: HTTPError):
print(self._error_template.format(status_code=e.response.status_code,
response_text=e.response.text),
file=sys.stderr)
def _handle_response_errors(self, r):
if r.status_code == 403:
print('Forbidden. Invalid Token or ApiKey transmitted', file=sys.stderr)
return r
elif r.status_code == 400:
print('Bad request', file=sys.stderr)
return r
elif r.status_code == 410:
print('The requested API Version (via X-API-VERSION Header) is not available', file=sys.stderr)
return r
def _get(self, op, auto_connect=True, **kwargs, ):
kwargs['params'] = self._add_api_key_and_token(kwargs.pop('params', {}))
r = requests.get(self.base_url.format(op), **kwargs)
if r.ok:
return r
elif r.status_code == 403 and auto_connect:
self.connect()
return self._get(op=op, auto_connect=False, **kwargs)
return self._handle_response_errors(r)
def _post(self, op, auto_connect=True, **kwargs):
kwargs['json'] = self._add_api_key_and_token(kwargs.pop('json', {}))
r = requests.post(self.base_url.format(op), **kwargs)
if r.ok:
elif r.status_code == 403 and auto_connect:
self.connect()
return self._post(op=op, auto_connect=False, **kwargs)
else:
return self._handle_response_errors(r)
def _post(self, op, jsondata, auto_connect=True):
r = requests.post(self.base_url.format(op), jsondata,
headers={
'content-type': 'application/json',
'accept': 'application/ld+json'
})
if r.ok:
return r
else:
if r.status_code == 403:
if auto_connect:
self.connect()
return self._post(op=op, jsondata=jsondata, auto_connect=False)
else:
print('Forbidden. Invalid Token or ApiKey transmitted', file=sys.stderr)
return r
elif r.status_code == 400:
print('Bad request', file=sys.stderr)
return r
elif r.status_code == 410:
print('The requested API Version (via X-API-VERSION Header) is not available', file=sys.stderr)
return r
"""
Start a fulltext search query in the Webarchive.
:param query_string: String to search for
:param from_: Optional earliest date bound for the search
in the format YYYYMM.
:param to_: Optional latest date bound for the search
in the format YYYYMM.
"""
params = {'q': query_string}
if from_:
params['from'] = from_
if to_:
params['to'] = to_
try:
response = self._get(op='/search/fulltext', params=params)
return self.waitForResponse(response)
except HTTPError as e:
self._display_http_error(e)
print('Query for "{}" not added'.format(query_string))
# NOTE(review): lines 192-241 of the original file were lost in extraction;
# only the line-number gutter remained here.
def fulltext_search_within_domain(self, query_string, domain, from_=None, to_=None):
"""
Start a fulltext seed search query in the Webarchive.
:param query_string: String to search for
:param domain: Search only within this domain name
:param from_: Optional earliest date bound for the search
in the format YYYYMM.
:param to_: Optional latest date bound for the search
in the format YYYYMM.
:return: HTTP Response object
"""
params = {'q': query_string, 'g': domain}
if from_:
params['from'] = from_
if to_:
params['to'] = to_
try:
response = self._get(op='/search/fulltext/seed', params=params)
return self.waitForResponse(response)
except HTTPError as e:
self._display_http_error(e)
def fulltext_search_within_url(self, query_string, url, pagesize=10, from_=None, to_=None):
"""
Start a fulltext capture search query in the Webarchive.
:param query_string: String to search for
:param url: Search only captures starting at this exact web address
:param from_: Optional earliest date bound for the search
in the format YYYYMM.
:param to_: Optional latest date bound for the search
in the format YYYYMM.
:return: HTTP Response object
"""
params = {'q': query_string, 'g': url, 'pagesize': pagesize}
if from_:
params['from'] = from_
if to_:
params['to'] = to_
try:
response = self._get(op='/search/fulltext/capture', params=params)
return self.waitForResponse(response)
except HTTPError as e:
self._display_http_error(e)
"""
Start a wayback search query in the Webarchive.
:param query_string: String to search for
:param from_: Optional earliest date bound for the search
in the format YYYYMM.
:param to_: Optional latest date bound for the search
in the format YYYYMM.
"""
params = {'q': query_string}
if from_:
params['from'] = from_
if to_:
params['to'] = to_
try:
response = self._get(op='/search/wayback', params=params)
return self.waitForResponse(response)
except HTTPError as e:
self._display_http_error(e)
print('Error:'.format(query_string))
if response.status_code == 400:
return response
while response.status_code != 200:
this is the polling request for the given type of request
j = resp.json()
context = j['context']
requestid = j['requestid']
type_ = resp.json()['type']
if type_ == 1:
r = self._get(op='/search/status/fulltext', params={'requestid': requestid})
r = self._get(op='/search/status/wayback', params={'requestid': requestid})
elif type_ == 5:
r = self._get(op='/fragment/checksum/status', params={'requestid': requestid})
else:
raise NotImplementedError(f'Unknown status query type {type_} - Please update client.')
# NOTE(review): "onbpre committed" was commit-metadata residue from the
# source extraction, not code.
def domain_name_search(self, query_string, page_=1, pagesize_=100):
"""
Start a domain name search in the Webarchive.
:param query_string: String to search for
:param page_: The page number parameter works with the page size parameter to control the offset of the records returned in the results. Default value is 1
:param pagesize_: The page size parameter works with the page number parameter to control the offset of the records returned in the results. It also controls how many results are returned with each request. Default value is 10
:return: result as json
"""
params = {'q': query_string}
if page_:
params['page'] = page_
if pagesize_:
params['pagesize'] = pagesize_
try:
response = self._get(op='/search/domainname', params=params)
return self.waitForResponse(response)
except HTTPError as e:
self._display_http_error(e)
print('Error:'.format(query_string))
def histogram_search(self, query_string, interval_=3, from_=None, to_=None):
"""
Start a domain name search in the Webarchive.
:param query_string: String to search for
:return: result as json
"""
params = {'q': query_string}
if interval_:
params['interval'] = interval_
if from_:
params['from'] = from_
if to_:
params['to'] = to_
try:
response = self._get(op='/search/fulltext/histogram', params=params)
return self.waitForResponse(response)
except HTTPError as e:
self._display_http_error(e)
print('Error:'.format(query_string))
return self.api_path + 'snapshot?capture=' + capture + '&t=' + self.token + '&apikey=' + self.api_key + '&onlysvg=' + onlysvg + '&seed=' + seed
@staticmethod
def resultContainsSeeds(response):
try:
return response.json()['subtype'] == 2
except:
return False
@staticmethod
def resultContainsCaptures(response):
try:
return response.json()['subtype'] == 3
except:
return False
def savePage(self, url):
self.connect()
r = requests.post(self.base_url.format('savepage'),
data='''{{
"apikey": "{api_key}",
"t": "{token}",
"url": "{url}"
}}'''.format(api_key=self.api_key, token=self.token, url=url),
headers={
'content-type': 'application/json',
'accept': 'application/ld+json'
}
)
# NOTE(review): lines 376-431 of the original file were lost in extraction;
# only the line-number gutter remained here (the `status_query` method used
# by fragmentChecksumHtml presumably lived in this region).
def fragmentChecksumHtml(self, seed, capture, selector, occurrence):
try:
response = self._post(op='/fragment/checksum/html', jsondata='''{{
"apikey": "{api_key}",
"t": "{token}",
"seed": "{seed}",
"capture": "{capture}",
"selector": "{selector}",
"occurrence": "{occurrence}",
"extractortype": 2
}}'''.format(api_key=self.api_key, token=self.token, seed=seed, capture=capture,
selector=selector, occurrence=occurrence))
response = self.status_query(response)
return self.waitForResponse(response)
except HTTPError as e:
self._display_http_error(e)
if __name__ == '__main__':
    # SECURITY(review): the API key is hard-coded in the source; move it to
    # an environment variable or a config file before publishing this code.
    # noinspection SpellCheckingInspection
    w = WebarchivSession('2pm8i0hnmpcTK4Oj4CUeBoZd7vywrm4c')
    # response = w.wayback_search("http://www.onb.ac.at")
    # response = w.wayback_search("http://frauenhetz.jetzt")
    url = "http://sport.orf.at/l/stories/2003717/"
    # BUG FIX (maintainability): reuse `url` instead of repeating the same
    # literal, so the searched and printed addresses cannot drift apart.
    response = w.wayback_search(url, "20110101000000", "20120401000000")
    # response = w.wayback_search("x")
    if response.status_code != 200:
        print("Error ", response.status_code)
        exit(1)
    print(response.json()['total'])
    print(url)
    lastchecksum = ''
    for capture in response.json()['hits']:
        capturedate = capture['c']
        resp = w.fragmentChecksumHtml(url, capturedate, ".odd td", 3)
        checksum = resp.json()['checksum']
        returncode = resp.json()['returncode']
        if returncode == 2:
            # returncode 2 is treated as "skip this capture" - the exact
            # meaning is not documented in this file, TODO confirm.
            continue
        if checksum != lastchecksum:
            # Only report captures whose fragment content actually changed.
            print(resp.json())
            print("http://wayback/web/" + capturedate + "/" + url)
            print(capturedate + " " + checksum)
            lastchecksum = checksum
    print("end")