# encoding=utf-8
'''WARC format.
For the WARC file specification, see
http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf.
For the CDX specifications, see
https://archive.org/web/researcher/cdx_file_format.php and
https://github.com/internetarchive/CDX-Writer.
'''
import base64
import codecs
import hashlib
import re
import uuid
from typing import Optional
from wpull.protocol.http.request import Response
from wpull.namevalue import NameValueRecord
import wpull.util
[docs]class WARCRecord(object):
'''A record in a WARC file.
Attributes:
fields: An instance of :class:`.namevalue.NameValueRecord`.
block_file: A file object. May be None.
'''
VERSION = 'WARC/1.0'
WARC_TYPE = 'WARC-Type'
CONTENT_TYPE = 'Content-Type'
WARC_DATE = 'WARC-Date'
WARC_RECORD_ID = 'WARC-Record-ID'
WARCINFO = 'warcinfo'
WARC_FIELDS = 'application/warc-fields'
REQUEST = 'request'
RESPONSE = 'response'
REVISIT = 'revisit'
TYPE_REQUEST = 'application/http;msgtype=request'
TYPE_RESPONSE = 'application/http;msgtype=response'
SAME_PAYLOAD_DIGEST_URI = \
'http://netpreserve.org/warc/1.0/revisit/identical-payload-digest'
NAME_OVERRIDES = frozenset([
'WARC-Date',
'WARC-Type',
'WARC-Record-ID',
'WARC-Concurrent-To',
'WARC-Refers-To',
'Content-Length',
'Content-Type',
'WARC-Target-URI',
'WARC-Block-Digest',
'WARC-IP-Address',
'WARC-Filename',
'WARC-Warcinfo-ID',
'WARC-Payload-Digest',
'WARC-Truncated',
'WARC-Filename',
'WARC-Profile',
'WARC-Identified-Payload-Type',
'WARC-Segment-Origin-ID',
'WARC-Segment-Number',
'WARC-Segment-Total-Length',
])
'''Field name case normalization overrides because hanzo's warc-tools do
not adequately conform to specifications.'''
def __init__(self):
self.fields = NameValueRecord(normalize_overrides=self.NAME_OVERRIDES)
self.block_file = None
[docs] def set_common_fields(self, warc_type: str, content_type: str):
'''Set the required fields for the record.'''
self.fields[self.WARC_TYPE] = warc_type
self.fields[self.CONTENT_TYPE] = content_type
self.fields[self.WARC_DATE] = wpull.util.datetime_str()
self.fields[self.WARC_RECORD_ID] = '<{0}>'.format(uuid.uuid4().urn)
[docs] def set_content_length(self):
'''Find and set the content length.
.. seealso:: :meth:`compute_checksum`.
'''
if not self.block_file:
self.fields['Content-Length'] = '0'
return
with wpull.util.reset_file_offset(self.block_file):
wpull.util.seek_file_end(self.block_file)
self.fields['Content-Length'] = str(self.block_file.tell())
[docs] def compute_checksum(self, payload_offset: Optional[int]=None):
'''Compute and add the checksum data to the record fields.
This function also sets the content length.
'''
if not self.block_file:
self.fields['Content-Length'] = '0'
return
block_hasher = hashlib.sha1()
payload_hasher = hashlib.sha1()
with wpull.util.reset_file_offset(self.block_file):
if payload_offset is not None:
data = self.block_file.read(payload_offset)
block_hasher.update(data)
while True:
data = self.block_file.read(4096)
if data == b'':
break
block_hasher.update(data)
payload_hasher.update(data)
content_length = self.block_file.tell()
content_hash = block_hasher.digest()
self.fields['WARC-Block-Digest'] = 'sha1:{0}'.format(
base64.b32encode(content_hash).decode()
)
if payload_offset is not None:
payload_hash = payload_hasher.digest()
self.fields['WARC-Payload-Digest'] = 'sha1:{0}'.format(
base64.b32encode(payload_hash).decode()
)
self.fields['Content-Length'] = str(content_length)
def __iter__(self):
'''Iterate the record as bytes.'''
yield self.VERSION.encode()
yield b'\r\n'
yield bytes(self.fields)
yield b'\r\n'
with wpull.util.reset_file_offset(self.block_file):
while True:
data = self.block_file.read(4096)
if data == b'':
break
yield data
yield b'\r\n\r\n'
def __bytes__(self):
'''Return the record as bytes.'''
return b''.join(iter(self))
[docs]def read_cdx(file, encoding='utf8'):
'''Iterate CDX file.
Args:
file (str): A file object.
encoding (str): The encoding of the file.
Returns:
iterator: Each item is a dict that maps from field key to value.
'''
with codecs.getreader(encoding)(file) as stream:
header_line = stream.readline()
separator = header_line[0]
field_keys = header_line.strip().split(separator)
if field_keys.pop(0) != 'CDX':
raise ValueError('CDX header not found.')
for line in stream:
yield dict(zip(field_keys, line.strip().split(separator)))