"""
The :py:class:`~pgdumplib.dump.Dump` class exposes methods to
:py:meth:`load <pgdumplib.dump.Dump.load>` an existing dump,
to :py:meth:`add an entry <pgdumplib.dump.Dump.add_entry>` to a dump,
to :py:meth:`add table data <pgdumplib.dump.Dump.table_data_writer>` to a dump,
to iterate over :py:meth:`table data <pgdumplib.dump.Dump.table_data>` and
:py:meth:`blobs <pgdumplib.dump.Dump.blobs>`,
and to :py:meth:`save <pgdumplib.dump.Dump.save>` a new dump.
There are :doc:`converters` that are available to format the data that is
returned by :py:meth:`~pgdumplib.dump.Dump.table_data`. The converter
is passed in during construction of a new :py:class:`~pgdumplib.dump.Dump`,
and is also available as an argument to :py:func:`pgdumplib.load`.
The default converter, :py:class:`~pgdumplib.converters.DataConverter`, will
return all fields as strings, only replacing ``NULL`` with
:py:const:`None`. The :py:class:`~pgdumplib.converters.SmartDataConverter`
will attempt to convert all columns to native Python data types.
When loading or creating a dump, the table and blob data are stored in
gzip compressed data files in a temporary directory that is automatically
cleaned up when the :py:class:`~pgdumplib.dump.Dump` instance is released.
"""
from __future__ import annotations
import contextlib
import dataclasses
import datetime
import gzip
import io
import logging
import os
import pathlib
import re
import struct
import tempfile
import typing
import zlib
import toposort
from pgdumplib import constants, converters, exceptions, version
# Template for the dump version string: "<appear_as version> (pgdumplib <version>)"
VERSION_INFO = '{} (pgdumplib {})'
# Captures the quoted value of a "SET client_encoding = '...';" definition
ENCODING_PATTERN = re.compile(r"^.*=\s+'(.*)'")
# Module-level logger shared by every class in this module
LOGGER = logging.getLogger(__name__)
class Dump:
    """Create a new instance of the :py:class:`~pgdumplib.dump.Dump` class

    Once created, the instance of :py:class:`~pgdumplib.dump.Dump` can
    be used to read existing dumps or to create new ones.

    :param str dbname: The database name for the dump (Default: ``pgdumplib``)
    :param str encoding: The data encoding (Default: ``UTF8``)
    :param converter: The data converter class to use
        (Default: :py:class:`pgdumplib.converters.DataConverter`)
    :param str appear_as: The PostgreSQL server version the dump should
        appear to come from. It selects the archive format version and is
        embedded in the dump version string (Default: ``12.0``)

    """
    def __init__(
            self, dbname: str = 'pgdumplib', encoding: str = 'UTF8',
            converter: typing.Optional[typing.Union[
                typing.Type[converters.DataConverter],
                typing.Type[converters.NoOpConverter],
                typing.Type[converters.SmartDataConverter]]] = None,
            appear_as: str = '12.0'):
        self.compression = False
        self.dbname = dbname
        self.dump_version = VERSION_INFO.format(appear_as, version)
        self.encoding = encoding
        # A new dump starts with the three session-setup entries that
        # _write_entries always writes first
        self.entries = [
            Entry(
                dump_id=1, tag=constants.ENCODING, desc=constants.ENCODING,
                defn="SET client_encoding = '{}';\n".format(self.encoding)),
            Entry(
                dump_id=2, tag='STDSTRINGS', desc='STDSTRINGS',
                defn="SET standard_conforming_strings = 'on';\n"),
            Entry(
                dump_id=3, tag='SEARCHPATH', desc='SEARCHPATH',
                defn='SELECT pg_catalog.set_config('
                     "'search_path', '', false);\n")
        ]
        self.server_version = self.dump_version
        self.timestamp = datetime.datetime.now()
        converter = converter or converters.DataConverter
        self._converter: converters.DataConverter = converter()
        self._format: str = 'Custom'
        self._handle: typing.Optional[typing.BinaryIO] = None
        self._intsize: int = 4
        self._offsize: int = 8
        self._temp_dir = tempfile.TemporaryDirectory()
        k_version = self._get_k_version(
            tuple(int(v) for v in appear_as.split('.')))
        self._vmaj: int = k_version[0]
        self._vmin: int = k_version[1]
        self._vrev: int = k_version[2]
        self._writers: typing.Dict[int, TableData] = {}

    def __repr__(self) -> str:
        return '<Dump format={!r} timestamp={!r} entry_count={!r}>'.format(
            self._format, self.timestamp.isoformat(), len(self.entries))

    def add_entry(
            self,
            desc: str,
            namespace: typing.Optional[str] = None,
            tag: typing.Optional[str] = None,
            owner: typing.Optional[str] = None,
            defn: typing.Optional[str] = None,
            drop_stmt: typing.Optional[str] = None,
            copy_stmt: typing.Optional[str] = None,
            dependencies: typing.Optional[typing.List[int]] = None,
            tablespace: typing.Optional[str] = None,
            tableam: typing.Optional[str] = None,
            dump_id: typing.Optional[int] = None) -> Entry:
        """Add an entry to the dump

        The ``namespace`` and ``tag`` should be provided for most object
        types. A :py:exc:`ValueError` will be raised if ``desc`` is not a
        value that is known in :py:mod:`pgdumplib.constants`.

        When adding data, use :py:meth:`~Dump.table_data_writer` instead of
        invoking :py:meth:`~Dump.add_entry` directly.

        If ``dependencies`` are specified, each one must be the ``dump_id``
        of an existing entry, otherwise a :py:exc:`ValueError` will be raised.
        A :py:exc:`ValueError` is also raised if ``dump_id`` is less than
        ``1`` or is already assigned to another entry.

        Omitted string values are stored as empty strings.

        The ``dump_id`` will be auto-calculated based upon the existing entries
        if it is not specified.

        .. note:: The creation of ad-hoc blobs is not supported.

        :param str desc: The entry description
        :param str namespace: The namespace of the entry
        :param str tag: The name/table/relation/etc of the entry
        :param str owner: The owner of the object in Postgres
        :param str defn: The DDL definition for the entry
        :param drop_stmt: A drop statement used to drop the entry before
            it is recreated
        :param copy_stmt: A copy statement used when there is a corresponding
            data section.
        :param list dependencies: A list of dump_ids of objects that the entry
            is dependent upon.
        :param str tablespace: The tablespace to use
        :param str tableam: The table access method
        :param int dump_id: The dump id, will be auto-calculated if left empty
        :raises: :py:exc:`ValueError`
        :rtype: pgdumplib.dump.Entry

        """
        if desc not in constants.SECTION_MAPPING:
            raise ValueError('Invalid desc: {}'.format(desc))
        if dump_id is not None and dump_id < 1:
            raise ValueError('dump_id must be greater than 0')
        dump_ids = {e.dump_id for e in self.entries}
        if dump_id and dump_id in dump_ids:
            raise ValueError(
                'dump_id {!r} is already assigned'.format(dump_id))
        for dependency in dependencies or []:
            if dependency not in dump_ids:
                raise ValueError(
                    'Dependency dump_id {!r} not found'.format(dependency))
        self.entries.append(Entry(
            dump_id=dump_id or self._next_dump_id(), had_dumper=False,
            table_oid='', oid='', tag=tag or '', desc=desc,
            defn=defn or '', drop_stmt=drop_stmt or '',
            copy_stmt=copy_stmt or '', namespace=namespace or '',
            tablespace=tablespace or '', tableam=tableam or '',
            owner=owner or '', with_oids=False,
            # Copy so later changes to the caller's list don't leak in
            dependencies=list(dependencies or [])))
        return self.entries[-1]

    def blobs(self) -> typing.Generator[typing.Tuple[int, bytes], None, None]:
        """Iterator that returns each blob in the dump

        Blob entries that have no cached data are skipped.

        :rtype: tuple(int, bytes)

        """
        def read_oid(fd: typing.IO[bytes]) -> typing.Optional[int]:
            """Read the next blob OID, returning None at end of file"""
            try:
                return struct.unpack('I', fd.read(4))[0]
            except struct.error:
                return None

        for entry in self._data_entries:
            if entry.desc != constants.BLOBS:
                continue
            try:
                with self._tempfile(entry.dump_id, 'rb') as handle:
                    oid: typing.Optional[int] = read_oid(handle)
                    while oid:
                        length: int = struct.unpack('I', handle.read(4))[0]
                        yield oid, handle.read(length)
                        oid = read_oid(handle)
            except exceptions.NoDataError:
                continue  # Nothing was cached for this entry

    def get_entry(self, dump_id: int) -> typing.Optional[Entry]:
        """Return the entry for the given `dump_id`

        :param int dump_id: The dump ID of the entry to return.
        :rtype: pgdumplib.dump.Entry or None

        """
        for entry in self.entries:
            if entry.dump_id == dump_id:
                return entry
        return None

    def load(self, path: os.PathLike) -> Dump:
        """Load the Dumpfile, including extracting all data into a temporary
        directory

        The dump file is closed once loading finishes, whether it succeeds
        or fails.

        :param os.PathLike path: The path of the dump to load
        :raises: :py:exc:`RuntimeError`
        :raises: :py:exc:`ValueError`

        """
        if not pathlib.Path(path).exists():
            raise ValueError('Path {!r} does not exist'.format(path))
        LOGGER.debug('Loading dump file from %s', path)
        self.entries = []  # Wipe out pre-existing entries
        self._handle = open(path, 'rb')
        try:
            self._read_header()
            if not constants.MIN_VER <= self.version <= constants.MAX_VER:
                raise ValueError(
                    'Unsupported backup version: {}.{}.{}'.format(
                        *self.version))
            self.compression = self._read_int() != 0
            self.timestamp = self._read_timestamp()
            self.dbname = self._read_bytes().decode(self.encoding)
            self.server_version = self._read_bytes().decode(self.encoding)
            self.dump_version = self._read_bytes().decode(self.encoding)
            self._read_entries()
            self._set_encoding()

            # Cache table data and blobs
            for entry in self._data_entries:
                if entry.data_state == constants.K_OFFSET_NO_DATA:
                    continue
                elif entry.data_state != constants.K_OFFSET_POS_SET:
                    raise RuntimeError('Unsupported data format')
                self._handle.seek(entry.offset, io.SEEK_SET)
                block_type, dump_id = self._read_block_header()
                if not dump_id or dump_id != entry.dump_id:
                    raise RuntimeError(
                        'Dump IDs do not match ({} != {})'.format(
                            dump_id, entry.dump_id))
                if block_type == constants.BLK_DATA:
                    self._cache_table_data(dump_id)
                elif block_type == constants.BLK_BLOBS:
                    self._cache_blobs(dump_id)
                else:
                    raise RuntimeError(
                        'Unknown block type: {}'.format(block_type))
        finally:
            # All data now lives in the temp directory, so the source file
            # is no longer needed
            self._handle.close()
        return self

    def lookup_entry(self, desc: str, namespace: str, tag: str) \
            -> typing.Optional[Entry]:
        """Return the entry for the given namespace and tag

        :param str desc: The desc / object type of the entry
        :param str namespace: The namespace of the entry
        :param str tag: The tag/relation/table name
        :raises: :py:exc:`ValueError`
        :rtype: pgdumplib.dump.Entry or None

        """
        if desc not in constants.SECTION_MAPPING:
            raise ValueError('Invalid desc: {}'.format(desc))
        for entry in self.entries:
            if (entry.desc == desc and entry.namespace == namespace
                    and entry.tag == tag):
                return entry
        return None

    def save(self, path: os.PathLike) -> None:
        """Save the Dump file to the specified path

        Data is always written uncompressed. The output file is closed even
        if writing fails.

        :param os.PathLike path: The path to save the dump to

        """
        handle = getattr(self, '_handle', None)
        if handle is not None and not handle.closed:
            handle.close()
        self.compression = False
        self._handle = open(path, 'wb')
        try:
            self._save()
        finally:
            self._handle.close()

    def table_data(self, namespace: str, table: str) \
            -> typing.Generator[
                typing.Union[str, typing.Tuple[typing.Any, ...]], None, None]:
        """Iterator that returns data for the given namespace and table

        Each row is passed through the configured converter before it is
        yielded.

        :param str namespace: The namespace/schema for the table
        :param str table: The table name
        :raises: :py:exc:`pgdumplib.exceptions.EntityNotFoundError`

        """
        for entry in self._data_entries:
            if entry.namespace == namespace and entry.tag == table:
                for row in self._read_table_data(entry.dump_id):
                    yield self._converter.convert(row)
                return
        raise exceptions.EntityNotFoundError(namespace=namespace, table=table)

    @contextlib.contextmanager
    def table_data_writer(self, entry: Entry, columns: typing.Sequence) \
            -> typing.Generator[TableData, None, None]:
        """A context manager that is used to return a
        :py:class:`~pgdumplib.dump.TableData` instance, which can be used
        to add table data to the dump.

        The first time it is invoked for a given entry containing the table
        definition, a ``TABLE DATA`` entry that depends on it is added to the
        dump. Later invocations for the same entry return the same
        :py:class:`~pgdumplib.dump.TableData` instance.

        :param Entry entry: The entry for the table to add data for
        :param columns: The ordered list of table columns
        :type columns: list or tuple
        :rtype: TableData

        """
        if entry.dump_id not in self._writers:
            dump_id = self._next_dump_id()
            self.entries.append(Entry(
                dump_id=dump_id, had_dumper=True, tag=entry.tag,
                desc=constants.TABLE_DATA,
                copy_stmt='COPY {}.{} ({}) FROM stdin;'.format(
                    entry.namespace, entry.tag, ', '.join(columns)),
                namespace=entry.namespace, owner=entry.owner,
                dependencies=[entry.dump_id],
                data_state=constants.K_OFFSET_POS_NOT_SET))
            self._writers[entry.dump_id] = TableData(
                dump_id, self._temp_dir.name, self.encoding)
        yield self._writers[entry.dump_id]

    @property
    def version(self) -> typing.Tuple[int, int, int]:
        """Return the version as a tuple to make version comparisons easier.

        :rtype: tuple

        """
        return self._vmaj, self._vmin, self._vrev

    def _cache_blobs(self, dump_id: int) -> None:
        """Create a temp cache file for blob data

        Each blob is stored as a native ``I`` oid, a native ``I`` length and
        the raw blob bytes.

        :param int dump_id: The dump ID for the filename

        """
        with self._tempfile(dump_id, 'wb') as handle:
            for oid, blob in self._read_blobs():
                handle.write(struct.pack('I', oid))
                handle.write(struct.pack('I', len(blob)))
                handle.write(blob)

    def _cache_table_data(self, dump_id: int) -> None:
        """Create a temp cache file for the table data

        :param int dump_id: The dump ID for the filename

        """
        with self._tempfile(dump_id, 'wb') as handle:
            handle.write(self._read_data())

    @property
    def _data_entries(self) -> typing.List[Entry]:
        """Return the list of entries that are in the data section

        :rtype: list

        """
        return [e for e in self.entries if e.section == constants.SECTION_DATA]

    @staticmethod
    def _get_k_version(appear_as: typing.Tuple[int, ...]) \
            -> typing.Tuple[int, int, int]:
        """Map a PostgreSQL version tuple to the archive format version

        :param tuple appear_as: The PostgreSQL version, e.g. ``(12, 0)``
        :raises: :py:exc:`RuntimeError` if the version is not supported
        :rtype: tuple

        """
        for (min_ver, max_ver), value in constants.K_VERSION_MAP.items():
            if min_ver <= appear_as <= max_ver:
                return value
        raise RuntimeError(
            'Unsupported PostgreSQL version: {}'.format(appear_as))

    def _next_dump_id(self) -> int:
        """Get the next ``dump_id`` that is available for adding an entry

        Returns ``1`` when there are no entries yet.

        :rtype: int

        """
        return max((e.dump_id for e in self.entries), default=0) + 1

    def _read_blobs(self) -> typing.Generator[
            typing.Tuple[int, bytes], None, None]:
        """Read blobs, returning a tuple of the blob ID and the blob data

        :rtype: (int, bytes)
        :raises: :exc:`RuntimeError`

        """
        oid = self._read_int()
        while oid is not None and oid > 0:
            data = self._read_data()
            yield oid, data
            oid = self._read_int()
            # _read_data_compressed may stop before the zero-length
            # end-of-data marker, so skip that marker when it is seen
            if oid == 0:
                oid = self._read_int()

    def _read_block_header(self) -> typing.Tuple[bytes, typing.Optional[int]]:
        """Read the block header in

        :rtype: bytes, int

        """
        return self._handle.read(1), self._read_int()

    def _read_byte(self) -> typing.Optional[int]:
        """Read in an individual byte, returning None at end of file

        :rtype: int

        """
        try:
            return struct.unpack('B', self._handle.read(1))[0]
        except struct.error:
            return None

    def _read_bytes(self) -> bytes:
        """Read in a length-prefixed byte stream

        :rtype: bytes

        """
        length = self._read_int()
        if length and length > 0:
            return self._handle.read(length)
        return b''

    def _read_data(self) -> bytes:
        """Read a data block, returning the bytes.

        :rtype: bytes

        """
        if self.compression:
            return self._read_data_compressed()
        return self._read_data_uncompressed()

    def _read_data_compressed(self) -> bytes:
        """Read a compressed data block

        :rtype: bytes

        """
        buffer = io.BytesIO()
        chunk = b''
        decompress = zlib.decompressobj()
        while True:
            chunk_size = self._read_int()
            if not chunk_size:  # pragma: nocover
                break
            chunk += self._handle.read(chunk_size)
            buffer.write(decompress.decompress(chunk))
            chunk = decompress.unconsumed_tail
            # A short chunk is treated as the last one; the end-of-data
            # marker that follows it is left unread (see _read_blobs)
            if chunk_size < constants.ZLIB_IN_SIZE:
                break
        return buffer.getvalue()

    def _read_data_uncompressed(self) -> bytes:
        """Read an uncompressed data block, consuming its end marker

        :rtype: bytes

        """
        buffer = io.BytesIO()
        while True:
            block_length = self._read_int()
            if not block_length or block_length <= 0:
                break
            buffer.write(self._handle.read(block_length))
        return buffer.getvalue()

    def _read_dependencies(self) -> list:
        """Read in the dependencies for an entry.

        :rtype: list

        """
        values = set()
        while True:
            value = self._read_bytes()
            if not value:
                break
            values.add(int(value))
        return sorted(values)

    def _read_entries(self) -> None:
        """Read in all of the entries"""
        for _ in range(self._read_int() or 0):
            self._read_entry()

    def _read_entry(self) -> None:
        """Read in an individual entry and append it to the entries stack"""
        dump_id = self._read_int()
        had_dumper = bool(self._read_int())
        table_oid = self._read_bytes().decode(self.encoding)
        oid = self._read_bytes().decode(self.encoding)
        tag = self._read_bytes().decode(self.encoding)
        desc = self._read_bytes().decode(self.encoding)
        self._read_int()  # Section is mapped, no need to assign
        defn = self._read_bytes().decode(self.encoding)
        drop_stmt = self._read_bytes().decode(self.encoding)
        copy_stmt = self._read_bytes().decode(self.encoding)
        namespace = self._read_bytes().decode(self.encoding)
        tablespace = self._read_bytes().decode(self.encoding)
        if self.version >= (1, 14, 0):
            tableam = self._read_bytes().decode(self.encoding)
        else:
            tableam = ''
        owner = self._read_bytes().decode(self.encoding)
        with_oids = self._read_bytes() == b'true'
        dependencies = self._read_dependencies()
        data_state, offset = self._read_offset()
        self.entries.append(Entry(
            dump_id=dump_id, had_dumper=had_dumper, table_oid=table_oid,
            oid=oid, tag=tag, desc=desc, defn=defn, drop_stmt=drop_stmt,
            copy_stmt=copy_stmt, namespace=namespace, tablespace=tablespace,
            tableam=tableam, owner=owner, with_oids=with_oids,
            dependencies=dependencies, data_state=data_state or 0,
            offset=offset or 0))

    def _read_header(self) -> None:
        """Read in the dump header

        :raises: ValueError

        """
        if self._handle.read(5) != constants.MAGIC:
            raise ValueError('Invalid archive header')
        self._vmaj = struct.unpack('B', self._handle.read(1))[0]
        self._vmin = struct.unpack('B', self._handle.read(1))[0]
        self._vrev = struct.unpack('B', self._handle.read(1))[0]
        self._intsize = struct.unpack('B', self._handle.read(1))[0]
        self._offsize = struct.unpack('B', self._handle.read(1))[0]
        self._format = constants.FORMATS[struct.unpack(
            'B', self._handle.read(1))[0]]
        LOGGER.debug('Archive version %i.%i.%i',
                     self._vmaj, self._vmin, self._vrev)

    def _read_int(self) -> typing.Optional[int]:
        """Read in a signed integer: a sign byte followed by ``_intsize``
        little-endian magnitude bytes

        :rtype: int or None

        """
        sign = self._read_byte()
        if sign is None:
            return None
        bs, value = 0, 0
        for _ in range(self._intsize):
            bv = (self._read_byte() or 0) & 0xFF
            if bv != 0:
                value += (bv << bs)
            bs += 8
        return -value if sign else value

    def _read_offset(self) -> typing.Tuple[int, int]:
        """Read in the data state flag and the data offset

        :rtype: int, int

        """
        data_state = self._read_byte() or 0
        value = 0
        for offset in range(self._offsize):
            bv = self._read_byte() or 0
            value |= bv << (offset * 8)
        return data_state, value

    def _read_table_data(self, dump_id: int) \
            -> typing.Generator[str, None, None]:
        """Iterate through the data returning one row at a time

        Only the line terminator is removed: leading or trailing tabs and
        spaces are column data, e.g. an empty first or last column. Reading
        stops at the ``\\.`` end-of-data marker.

        :rtype: str

        """
        try:
            with self._tempfile(dump_id, 'rb') as handle:
                for raw in handle:
                    line = raw.decode(self.encoding).rstrip('\r\n')
                    if line.startswith('\\.'):
                        break
                    yield line
        except exceptions.NoDataError:
            pass

    def _read_timestamp(self) -> datetime.datetime:
        """Read in the timestamp from handle.

        :rtype: datetime.datetime

        """
        second, minute, hour, day, month, year = (
            self._read_int(), self._read_int(), self._read_int(),
            self._read_int(), (self._read_int() or 0) + 1,
            (self._read_int() or 0) + 1900)
        self._read_int()  # DST flag
        return datetime.datetime(year, month, day, hour, minute, second, 0)

    def _save(self) -> None:
        """Save the dump file to disk"""
        self._write_toc()
        self._write_entries()
        if self._write_data():
            # Offsets are now known: overwrite the ToC and entries in place
            # (each field has a fixed size, so the layout does not shift)
            self._write_toc()
            self._write_entries()

    def _set_encoding(self) -> None:
        """If the encoding is found in the dump entries, set the encoding
        to `self.encoding`.

        """
        for entry in self.entries:
            if entry.desc == constants.ENCODING:
                match = ENCODING_PATTERN.match(entry.defn)
                if match:
                    self.encoding = match.group(1)
                    return

    @contextlib.contextmanager
    def _tempfile(self, dump_id: int, mode: str) \
            -> typing.Generator[typing.IO[bytes], None, None]:
        """Open the gzip temp file for the specified dump_id in the
        specified mode

        Exceptions raised inside the ``with`` body propagate to the caller.

        :param int dump_id: The dump_id for the temp file
        :param str mode: The mode (rb, wb)
        :raises: :py:exc:`pgdumplib.exceptions.NoDataError` when reading a
            file that does not exist

        """
        path = pathlib.Path(self._temp_dir.name) / '{}.gz'.format(dump_id)
        if mode.startswith('r') and not path.exists():
            raise exceptions.NoDataError()
        with gzip.open(path, mode) as handle:
            yield handle

    def _write_blobs(self, dump_id: int) -> int:
        """Write the blobs block for the entry.

        :param int dump_id: The entry dump ID for the blobs
        :return: The total number of blob bytes written (0 when empty)
        :rtype: int

        """
        total = 0
        with self._tempfile(dump_id, 'rb') as handle:
            self._handle.write(constants.BLK_BLOBS)
            self._write_int(dump_id)
            while True:
                try:
                    oid = struct.unpack('I', handle.read(4))[0]
                except struct.error:
                    break
                length = struct.unpack('I', handle.read(4))[0]
                self._write_int(oid)
                self._write_int(length)
                self._handle.write(handle.read(length))
                self._write_int(0)  # End of this blob's data
                total += length
            self._write_int(0)  # End of blobs
        return total

    def _write_byte(self, value: int) -> None:
        """Write a byte to the handle

        :param int value: The byte value

        """
        self._handle.write(struct.pack('B', value))

    def _write_data(self) -> typing.Set[int]:
        """Write the data blocks, returning a set of IDs that were written

        Every entry whose block is written, including empty blocks, is
        marked with ``K_OFFSET_POS_SET`` so the block can be found on reload.

        """
        saved: typing.Set[int] = set()
        for entry in self.entries:
            if entry.section != constants.SECTION_DATA:
                continue
            entry.offset = self._handle.tell()
            if entry.desc == constants.TABLE_DATA:
                self._write_table_data(entry.dump_id)
            elif entry.desc == constants.BLOBS:
                self._write_blobs(entry.dump_id)
            else:
                continue
            saved.add(entry.dump_id)
            entry.data_state = constants.K_OFFSET_POS_SET
        return saved

    def _write_entries(self) -> None:
        """Write the entry count followed by every entry, grouped by section"""
        self._write_int(len(self.entries))
        saved: typing.Set[int] = set()

        # Always add these entries first
        for entry in self.entries[0:3]:
            self._write_entry(entry)
            saved.add(entry.dump_id)

        saved = self._write_section(
            constants.SECTION_PRE_DATA, [
                constants.GROUP,
                constants.ROLE,
                constants.USER,
                constants.SCHEMA,
                constants.EXTENSION,
                constants.AGGREGATE,
                constants.OPERATOR,
                constants.OPERATOR_CLASS,
                constants.CAST,
                constants.COLLATION,
                constants.CONVERSION,
                constants.PROCEDURAL_LANGUAGE,
                constants.FOREIGN_DATA_WRAPPER,
                constants.FOREIGN_SERVER,
                constants.SERVER,
                constants.DOMAIN,
                constants.TYPE,
                constants.SHELL_TYPE], saved)

        saved = self._write_section(constants.SECTION_DATA, [], saved)

        saved = self._write_section(
            constants.SECTION_POST_DATA, [
                constants.CHECK_CONSTRAINT,
                constants.CONSTRAINT,
                constants.INDEX], saved)

        saved = self._write_section(constants.SECTION_NONE, [], saved)
        LOGGER.debug('Wrote %i of %i entries', len(saved), len(self.entries))

    def _write_entry(self, entry: Entry) -> None:
        """Write the entry

        :param pgdumplib.dump.Entry entry: The entry to write

        """
        LOGGER.debug('Writing %r', entry)
        self._write_int(entry.dump_id)
        self._write_int(int(entry.had_dumper))
        self._write_str(entry.table_oid or '0')
        self._write_str(entry.oid or '0')
        self._write_str(entry.tag)
        self._write_str(entry.desc)
        self._write_int(constants.SECTIONS.index(entry.section) + 1)
        self._write_str(entry.defn)
        self._write_str(entry.drop_stmt)
        self._write_str(entry.copy_stmt)
        self._write_str(entry.namespace)
        self._write_str(entry.tablespace)
        if self.version >= (1, 14, 0):
            LOGGER.debug('Adding tableam')
            self._write_str(entry.tableam)
        self._write_str(entry.owner)
        self._write_str('true' if entry.with_oids else 'false')
        for dependency in entry.dependencies or []:
            self._write_str(str(dependency))
        self._write_int(-1)  # End of dependencies
        self._write_offset(entry.offset, entry.data_state)

    def _write_header(self) -> None:
        """Write the file header"""
        LOGGER.debug('Writing archive version %i.%i.%i',
                     self._vmaj, self._vmin, self._vrev)
        self._handle.write(constants.MAGIC)
        self._write_byte(self._vmaj)
        self._write_byte(self._vmin)
        self._write_byte(self._vrev)
        self._write_byte(self._intsize)
        self._write_byte(self._offsize)
        self._write_byte(constants.FORMATS.index(self._format))

    def _write_int(self, value: int) -> None:
        """Write an integer value as a sign byte followed by ``_intsize``
        little-endian magnitude bytes

        :param int value: The value to write

        """
        self._write_byte(1 if value < 0 else 0)
        value = abs(value)
        for _ in range(self._intsize):
            self._write_byte(value & 0xFF)
            value >>= 8

    def _write_offset(self, value: int, data_state: int) -> None:
        """Write the offset value.

        :param int value: The value to write
        :param int data_state: The data state flag

        """
        self._write_byte(data_state)
        for _ in range(self._offsize):
            self._write_byte(value & 0xFF)
            value >>= 8

    def _write_section(self, section: str, obj_types: list, saved: set) -> set:
        """Write the entries of a section that have not been written yet

        Entries whose ``desc`` is in ``obj_types`` are written first, in that
        order. The rest of the section follows in dependency order.
        Dependencies that do not refer to an existing entry are skipped.

        :param str section: The section to write
        :param list obj_types: Entry descs to write before all others
        :param set saved: The dump IDs already written; updated in place
        :rtype: set

        """
        for obj_type in obj_types:
            for entry in [e for e in self.entries if e.desc == obj_type]:
                if entry.dump_id in saved:
                    continue
                self._write_entry(entry)
                saved.add(entry.dump_id)
        for dump_id in toposort.toposort_flatten(
                {e.dump_id: set(e.dependencies) for e in self.entries
                 if e.section == section}, True):
            if dump_id in saved:
                continue
            entry = self.get_entry(dump_id)
            if entry is None:  # Dangling dependency, nothing to write
                continue
            self._write_entry(entry)
            saved.add(dump_id)
        return saved

    def _write_str(self, value: str) -> None:
        """Write a length-prefixed string

        :param str value: The string to write

        """
        out = value.encode(self.encoding) if value else b''
        self._write_int(len(out))
        if out:
            LOGGER.debug('Writing %r', out)
            self._handle.write(out)

    def _write_table_data(self, dump_id: int) -> int:
        """Write the table data block for the entry, returning the number of
        row-data bytes written

        The data comes from the ad-hoc :py:class:`TableData` writer when one
        exists. Otherwise it comes from the data cached on load.

        :param int dump_id: The entry dump ID for the table data
        :raises: :py:exc:`pgdumplib.exceptions.NoDataError` when no data is
            available for the entry
        :rtype: int

        """
        writer = next(
            (w for w in self._writers.values() if w.dump_id == dump_id), None)
        if writer is not None:  # Data was added ad-hoc
            writer.finish()
            data = writer.read()
        else:  # Data was cached on load
            with self._tempfile(dump_id, 'rb') as handle:
                data = handle.read()
        self._handle.write(constants.BLK_DATA)
        self._write_int(dump_id)
        self._write_int(len(data))
        if data:
            self._handle.write(data)
        self._write_int(0)  # End of data indicator
        return len(data)

    def _write_timestamp(self, value: datetime.datetime) -> None:
        """Write a datetime.datetime value

        :param datetime.datetime value: The value to write

        """
        self._write_int(value.second)
        self._write_int(value.minute)
        self._write_int(value.hour)
        self._write_int(value.day)
        self._write_int(value.month - 1)
        self._write_int(value.year - 1900)
        self._write_int(1 if value.dst() else 0)

    def _write_toc(self) -> None:
        """Write the header and archive metadata at the start of the file"""
        self._handle.seek(0)
        self._write_header()
        self._write_int(int(self.compression))
        self._write_timestamp(self.timestamp)
        self._write_str(self.dbname)
        self._write_str(self.server_version)
        self._write_str(self.dump_version)
@dataclasses.dataclass(eq=True)
class Entry:
    """The entry model represents a single entry in the dump

    Custom formatted dump files are primarily comprised of entries, which
    contain all of the metadata and DDL required to construct the database.

    For table data and blobs, there are entries that contain offset locations
    in the dump file that instruct the reader as to where the data lives
    in the file.

    :var int dump_id: The id of the entry, unique within the dump
    :var bool had_dumper: Whether the entry has an associated data dumper;
        set to ``True`` for table data entries created by
        :py:meth:`~pgdumplib.dump.Dump.table_data_writer`
    :var str table_oid: The table OID string recorded for the entry
    :var str oid: The OID of the object the entry represents
    :var str tag: The name/table/relation/etc of the entry
    :var str desc: The entry description
    :var str defn: The DDL definition for the entry
    :var str drop_stmt: A drop statement used to drop the entry before
        it is recreated
    :var str copy_stmt: A copy statement used when there is a corresponding
        data section.
    :var str namespace: The namespace of the entry
    :var str tablespace: The tablespace to use
    :var str tableam: The table access method
    :var str owner: The owner of the object in Postgres
    :var bool with_oids: Flag stored in the dump as ``'true'`` or
        ``'false'``; presumably whether the table was created ``WITH OIDS``
    :var list dependencies: A list of dump_ids of objects that the entry
        is dependent upon.
    :var int data_state: Indicates if the entry has data and how it is stored
    :var int offset: If the entry has data, the offset to the data in the file
    :var str section: The section of the dump file the entry belongs to,
        derived from ``desc`` (read-only)

    """
    dump_id: int
    had_dumper: bool = False
    table_oid: str = '0'
    oid: str = '0'
    tag: typing.Optional[str] = None
    desc: typing.Optional[str] = None
    defn: typing.Optional[str] = None
    drop_stmt: typing.Optional[str] = None
    copy_stmt: typing.Optional[str] = None
    namespace: typing.Optional[str] = None
    tablespace: typing.Optional[str] = None
    tableam: typing.Optional[str] = None
    owner: typing.Optional[str] = None
    with_oids: bool = False
    dependencies: typing.List[int] = dataclasses.field(default_factory=list)
    data_state: int = constants.K_OFFSET_NO_DATA
    offset: int = 0

    @property
    def section(self) -> str:
        """Return the section the entry belongs to

        :raises: :py:exc:`KeyError` if ``desc`` is not a known description

        """
        return constants.SECTION_MAPPING[self.desc]
class TableData:
    """Used to encapsulate table data using temporary file and allowing
    for an API that allows for the appending of data one row at a time.

    Rows are written gzip-compressed to ``<tempdir>/<dump_id>.gz`` in
    PostgreSQL ``COPY`` text format.

    Do not create this class directly, instead invoke
    :py:meth:`~pgdumplib.dump.Dump.table_data_writer`.

    """
    # Characters that COPY text format requires to be backslash-escaped
    # inside a column value (backslash, newline, carriage return, delimiter)
    _COPY_ESCAPES = str.maketrans({
        '\\': '\\\\', '\n': '\\n', '\r': '\\r', '\t': '\\t'})

    def __init__(self, dump_id: int, tempdir: str, encoding: str):
        self.dump_id = dump_id
        self._encoding = encoding
        self._path = pathlib.Path(tempdir) / '{}.gz'.format(dump_id)
        self._handle = gzip.open(self._path, 'wb')

    def append(self, *args) -> None:
        """Append a row to the table data, passing columns in as args

        Column order must match the order specified when
        :py:meth:`~pgdumplib.dump.Dump.table_data_writer` was invoked.

        All columns will be coerced to a string with special attention
        paid to ``None``, converting it to the null marker (``\\N``) and
        :py:class:`datetime.datetime` objects, which will have the proper
        pg_dump timestamp format applied to them. Backslashes, tabs,
        newlines and carriage returns in other values are escaped as
        ``COPY`` text format requires.

        """
        row = '\t'.join(self._convert(column) for column in args)
        self._handle.write('{}\n'.format(row).encode(self._encoding))

    def finish(self) -> None:
        """Invoked prior to saving a dump to close the temporary data
        handle and switch the class into read-only mode.

        For use by :py:class:`pgdumplib.dump.Dump` only.

        """
        if not self._handle.closed:
            self._handle.close()
        self._handle = gzip.open(self._path, 'rb')

    def read(self) -> bytes:
        """Read the data from disk for writing to the dump

        For use by :py:class:`pgdumplib.dump.Dump` only, after
        :py:meth:`finish` has been called.

        :rtype: bytes

        """
        self._handle.seek(0)
        return self._handle.read()

    @property
    def size(self) -> int:
        """Return the uncompressed size of the data on disk

        Only valid after :py:meth:`finish`; seeking from the end is not
        supported while the gzip handle is open for writing.

        :rtype: int

        """
        self._handle.seek(0, io.SEEK_END)  # Seek to end to figure out size
        size = self._handle.tell()
        self._handle.seek(0)
        return size

    @staticmethod
    def _convert(column: typing.Any) -> str:
        """Convert the column to a ``COPY`` text format value

        :param any column: The column to convert

        """
        if isinstance(column, datetime.datetime):
            return column.strftime(constants.PGDUMP_STRFTIME_FMT)
        elif column is None:
            return '\\N'
        return str(column).translate(TableData._COPY_ESCAPES)