barman.copy_controller module

Copy controller module

A copy controller handles the copy of a series of files and directories to their final destination.

class barman.copy_controller.RsyncCopyController(path=None, ssh_command=None, ssh_options=None, network_compression=False, reuse_backup=None, safe_horizon=None, exclude=None, retry_times=0, retry_sleep=0, workers=1, workers_start_batch_period=1, workers_start_batch_size=10)

Bases: object

Copy a list of files and directories to their final destination.

CONFIG_CLASS = 'config'
LIST_ONLY_RE = re.compile('\n        ^ # start of the line\n\n        # capture the mode (es. "-rw-------")\n        (?P<mode>[-\\w]+)\n        \\s+\n\n        # size is an integer\n        (?P<size>\\d+)\n        \\s+\n\n    , re.VERBOSE)
PGCONTROL_CLASS = 'pg_control'
PGDATA_CLASS = 'PGDATA'
TABLESPACE_CLASS = 'tablespace'
VANISHED_RE = re.compile('\n        ^ # start of the line\n        (\n        # files which vanished before rsync start\n        rsync:\\ link_stat\\ ".+"\\ failed:\\ No\\ such\\ file\\ or\\ directory\\ \\(2\\)\n        |\n  , re.IGNORECASE|re.VERBOSE)
__init__(path=None, ssh_command=None, ssh_options=None, network_compression=False, reuse_backup=None, safe_horizon=None, exclude=None, retry_times=0, retry_sleep=0, workers=1, workers_start_batch_period=1, workers_start_batch_size=10)
Parameters:
  • path (str|None) – the PATH where rsync executable will be searched

  • ssh_command (str|None) – the ssh executable to be used to access remote paths

  • ssh_options (list[str]|None) – list of ssh options to be used to access remote paths

  • network_compression (boolean) – whether to use the network compression

  • reuse_backup (str|None) – if “link” or “copy” enables the incremental copy feature

  • safe_horizon (datetime.datetime|None) – if set, assumes that every file older than it is safe to copy without checksum verification.

  • exclude (list[str]|None) – list of patterns to be excluded from the copy

  • retry_times (int) – The number of times to retry a failed operation

  • retry_sleep (int) – Sleep time between two retries

  • workers (int) – The number of parallel copy workers

  • workers_start_batch_period (int) – The time period in seconds over which a single batch of workers will be started

  • workers_start_batch_size (int) – The maximum number of parallel workers to start in a single batch

_analyze_directory(item)

Analyzes the status of source and destination directories identifying the files that are safe from the point of view of a PostgreSQL backup.

The safe_horizon value is the timestamp of the beginning of the older backup involved in the copy (as source or destination). Any file updated after that timestamp must be checked, as it could have been modified during the backup, and we do not replay WAL files to update it.

The destination directory must exist.

If the “safe_horizon” parameter is None, we cannot make any assumptions about what can be considered “safe”, so we must check everything with checksums enabled.

If the “ref” parameter is provided and is not None, it is looked up instead of the “dst” dir. This is useful when we are copying files using the ‘--link-dest’ and ‘--copy-dest’ rsync options. In this case, both the “dst” and “ref” dirs must exist and the “dst” dir must be empty.

If the source or destination path begins with a ‘:’ character, it is a remote path. Only local paths are supported in the “ref” argument.

Parameters:

item (_RsyncCopyItem) – information about a copy operation
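
The safe_horizon rule described above can be illustrated with a minimal, self-contained sketch. It is not Barman's implementation: FileEntry and split_by_safe_horizon are hypothetical names, and treating a file whose timestamp equals the horizon as "to be checked" is a conservative assumption.

```python
from datetime import datetime
from typing import List, NamedTuple, Optional, Tuple


class FileEntry(NamedTuple):
    # Hypothetical stand-in for a listed file; not Barman's _FileItem.
    path: str
    mtime: datetime


def split_by_safe_horizon(
    files: List[FileEntry], safe_horizon: Optional[datetime]
) -> Tuple[List[FileEntry], List[FileEntry]]:
    """Return (safe, check): files copyable without checksum, and the rest."""
    if safe_horizon is None:
        # No assumption can be made: everything is verified with checksums.
        return [], list(files)
    safe: List[FileEntry] = []
    check: List[FileEntry] = []
    for entry in files:
        # Only files strictly older than the horizon are considered safe.
        if entry.mtime < safe_horizon:
            safe.append(entry)
        else:
            check.append(entry)
    return safe, check
```

With a horizon of 2024-01-01, a file last modified on 2023-12-31 lands in the first list and a file modified on 2024-01-02 in the second.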

_apply_rate_limit(generation_history)

Apply the rate limit defined by self.workers_start_batch_size and self.workers_start_batch_period.

Historic start times in generation_history are checked to determine whether more than self.workers_start_batch_size jobs have been started within the length of time defined by self.workers_start_batch_period. If the maximum has been reached then this function will wait until the oldest start time within the last workers_start_batch_period seconds is no longer within the time period.

Once it has finished waiting, or simply determined it does not need to wait, it adds the current time to generation_history and returns it.

Parameters:

generation_history (list[int]) – A list of the generation times of previous jobs.

Return list[int]:

An updated list of generation times including the current time (after completing any necessary waiting) and not including any times which were not within self.workers_start_batch_period when the function was called.
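
A sliding-window limiter along the lines described above might look like the following sketch. It is a standalone illustration and not Barman's code: apply_rate_limit and its batch_size, batch_period, clock and sleep parameters are assumptions introduced here (the real method reads its limits from self), and the clock and sleep functions are injectable only to make the behaviour easy to test.

```python
import time
from typing import Callable, List


def apply_rate_limit(
    history: List[float],
    batch_size: int,
    batch_period: float,
    clock: Callable[[], float] = time.time,
    sleep: Callable[[float], None] = time.sleep,
) -> List[float]:
    """Wait if batch_size jobs already started within batch_period seconds."""
    now = clock()
    window_start = now - batch_period
    # Drop start times that were already outside the window at call time.
    recent = [t for t in history if t > window_start]
    if len(recent) >= batch_size:
        # Wait until the oldest start time in the window has left it.
        wait = min(recent) + batch_period - now
        if wait > 0:
            sleep(wait)
        now = clock()
    # Record the start time of the job that is about to be generated.
    recent.append(now)
    return recent
```

The caller is expected to pass the returned list back in on the next call, so the history never grows beyond the entries that were still relevant at the previous call.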

_copy(rsync, src, dst, file_list, checksum=False)

This method executes the call to rsync, using a list of files as the source and adding the checksum option if required by the caller.

Parameters:
  • rsync (Rsync) – the Rsync object used to retrieve the list of files inside the directories for copy purposes

  • src (str) – source directory

  • dst (str) – destination directory

  • file_list (str) – path to the file containing the sources for rsync

  • checksum (bool) – if checksum argument for rsync is required

_create_dir_and_purge(item)

Create destination directories and delete any unknown files

Parameters:

item (_RsyncCopyItem) – information about a copy operation

_execute_job(job)

Execute a _RsyncJob in a worker process

_fill_buckets(file_list)

Generate buckets for parallel copy

Parameters:

file_list (list[_FileItem]) – list of files to transfer

Return type:

iter[list[_FileItem]]
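
This page does not describe how the buckets are filled. As one plausible strategy, and not necessarily the one Barman uses, the sketch below balances buckets by total size: files are taken largest first and each is placed in the currently lightest bucket. FileItem here is a hypothetical stand-in with the documented field names, and the workers argument is an assumption of this example.

```python
from typing import Iterator, List, NamedTuple


class FileItem(NamedTuple):
    # Hypothetical stand-in mirroring the documented _FileItem fields.
    mode: str
    size: int
    date: str
    path: str


def fill_buckets(file_list: List[FileItem], workers: int) -> Iterator[List[FileItem]]:
    """Yield one non-empty list of files per bucket, balanced by size."""
    if workers < 1:
        raise ValueError("workers must be >= 1")
    buckets: List[List[FileItem]] = [[] for _ in range(workers)]
    totals = [0] * workers
    # Largest files first, each one into the bucket with the smallest total.
    for item in sorted(file_list, key=lambda f: f.size, reverse=True):
        idx = totals.index(min(totals))
        buckets[idx].append(item)
        totals[idx] += item.size
    for bucket in buckets:
        if bucket:
            yield bucket
```

Each yielded bucket could then become the file list of one job, so that parallel workers receive roughly the same amount of data.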

_job_generator(include_classes=None, exclude_classes=None)

Generate the jobs to be executed by the workers

Parameters:
  • include_classes (list[str]|None) – If not None, copy only the items which have one of the specified classes.

  • exclude_classes (list[str]|None) – If not None, skip all items which have one of the specified classes.

Return type:

iter[_RsyncJob]

_list_files(item, path)

This method recursively retrieves a list of files contained in a directory, either local or remote (if the path starts with ‘:’).

Parameters:
  • item (_RsyncCopyItem) – information about a copy operation

  • path (str) – the path we want to inspect

Raises:
_progress_init()

Init counters used by progress logging

_progress_message(msg)

Log a message containing the progress

Parameters:

msg (str) – the message

Return str:

message to log

_retry_handler(item, command, args, kwargs, attempt, exc)
Parameters:
  • item (_RsyncCopyItem) – The item that is being processed

  • command (RsyncPgData) – Command object being executed

  • args (list) – command args

  • kwargs (dict) – command kwargs

  • attempt (int) – attempt number (starting from 0)

  • exc (CommandFailedException) – the exception which caused the failure

_reuse_args(reuse_directory)

If reuse_backup is ‘copy’ or ‘link’, build the rsync options that enable the reuse; otherwise return an empty list.

Parameters:

reuse_directory (str) – the local path with data to be reused

Return type:

list[str]
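
As a rough illustration of the mapping, the sketch below turns a reuse mode into the corresponding rsync option. It assumes the ‘--link-dest’ and ‘--copy-dest’ rsync options mentioned for _analyze_directory; reuse_args is a hypothetical free function that takes reuse_backup as an argument, whereas the real method reads it from the controller.

```python
from typing import List, Optional


def reuse_args(reuse_backup: Optional[str], reuse_directory: Optional[str]) -> List[str]:
    """Build the rsync option enabling reuse, or an empty list."""
    if reuse_backup in ("copy", "link") and reuse_directory is not None:
        # "link" -> --link-dest=DIR, "copy" -> --copy-dest=DIR
        return ["--%s-dest=%s" % (reuse_backup, reuse_directory)]
    # Any other value (including None) disables the incremental copy.
    return []
```

For example, reuse_args("link", "/backups/prev/data") yields a single --link-dest option pointing at that directory, while reuse_args(None, "/backups/prev/data") yields no options.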

_rsync_factory(item)

Build the RsyncPgData object required for copying the provided item

Parameters:

item (_RsyncCopyItem) – information about a copy operation

Return type:

RsyncPgData

_rsync_ignore_vanished_files(rsync, *args, **kwargs)

Wrap an Rsync.get_output() call and ignore missing args

TODO: when rsync 3.1 is widespread, replace this with the --ignore-missing-args argument

Parameters:

rsync (Rsync) – the Rsync object used to execute the copy

_rsync_set_pre_31_mode()

Stop using --ignore-missing-args and restore rsync < 3.1 compatibility

add_directory(label, src, dst, exclude=None, exclude_and_protect=None, include=None, bwlimit=None, reuse=None, item_class=None)

Add a directory that we want to copy.

If the “src” or “dst” content begins with a ‘:’ character, it is a remote path. Only local paths are supported in the “reuse” argument.

If “reuse” parameter is provided and is not None, it is used to implement the incremental copy. This only works if “is_directory” is True

Parameters:
  • label (str) – symbolic name to be used for error messages and logging.

  • src (str) – source directory.

  • dst (str) – destination directory.

  • exclude (list[str]) – list of patterns to be excluded from the copy. The destination will be deleted if present.

  • exclude_and_protect (list[str]) – list of patterns to be excluded from the copy. The destination will be preserved if present.

  • include (list[str]) – list of patterns to be included in the copy even if excluded.

  • bwlimit – bandwidth limit to be enforced. (KiB)

  • reuse (str|None) – the reference path for incremental mode.

  • item_class (str) – If specified carries a meta information about what the object to be copied is.

add_file(label, src, dst, item_class=None, optional=False, bwlimit=None)

Add a file that we want to copy

Parameters:
  • label (str) – symbolic name to be used for error messages and logging.

  • src (str) – source file.

  • dst (str) – destination file.

  • item_class (str) – If specified carries a meta information about what the object to be copied is.

  • optional (bool) – Whether a failure copying this object should be tolerated rather than treated as a fatal failure.

  • bwlimit – bandwidth limit to be enforced. (KiB)

copy()

Execute the actual copy

copy_end_time

Copy end time

copy_start_time

Copy start time

current_step

Current step number

item_list

List of items to be copied

jobs_done

Already finished jobs list

rsync_cache

A cache of RsyncPgData objects

statistics()

Return statistics about the copy object.

Return type:

dict

temp_dir

Temp dir used to store the status during the copy

total_steps

Total number of steps

class barman.copy_controller._FileItem(mode, size, date, path)

Bases: _FileItem

This named tuple is used to store the content of each line of the output of an “rsync --list-only” call.
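
To make the four fields concrete, here is a small sketch that parses one listing line into a named tuple. The line layout (mode, plain integer size, YYYY/MM/DD HH:MM:SS timestamp, path) is an assumption about the rsync output, and LINE_RE is a simplified pattern written for this example; it is not the module's LIST_ONLY_RE.

```python
import re
from collections import namedtuple
from datetime import datetime

# Hypothetical stand-in with the documented field names.
FileItem = namedtuple("FileItem", ["mode", "size", "date", "path"])

# Simplified pattern for illustration only (assumed line layout).
LINE_RE = re.compile(
    r"""
    ^(?P<mode>[-\w]+)\s+                                   # e.g. -rw-------
    (?P<size>\d+)\s+                                       # size in bytes
    (?P<date>\d{4}/\d{2}/\d{2}\s+\d{2}:\d{2}:\d{2})\s      # timestamp
    (?P<path>.+)$                                          # rest is the path
    """,
    re.VERBOSE,
)


def parse_list_only_line(line):
    """Parse a single listing line into a FileItem, or raise ValueError."""
    match = LINE_RE.match(line)
    if match is None:
        raise ValueError("unparsable line: %r" % line)
    return FileItem(
        mode=match.group("mode"),
        size=int(match.group("size")),
        date=datetime.strptime(match.group("date"), "%Y/%m/%d %H:%M:%S"),
        path=match.group("path"),
    )
```

Converting size to an integer and date to a datetime at parse time keeps later comparisons, such as a check against safe_horizon, free of string handling.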

class barman.copy_controller._RsyncCopyItem(label, src, dst, exclude=None, exclude_and_protect=None, include=None, is_directory=False, bwlimit=None, reuse=None, item_class=None, optional=False)

Bases: object

Internal data object that contains the information about one of the items that have to be copied during a RsyncCopyController run.

__init__(label, src, dst, exclude=None, exclude_and_protect=None, include=None, is_directory=False, bwlimit=None, reuse=None, item_class=None, optional=False)

The “label” parameter is meant to be used for error messages and logging.

If the “src” or “dst” content begins with a ‘:’ character, it is a remote path. Only local paths are supported in the “reuse” argument.

If “reuse” parameter is provided and is not None, it is used to implement the incremental copy. This only works if “is_directory” is True

Parameters:
  • label (str) – a symbolic name for this item

  • src (str) – source directory.

  • dst (str) – destination directory.

  • exclude (list[str]) – list of patterns to be excluded from the copy. The destination will be deleted if present.

  • exclude_and_protect (list[str]) – list of patterns to be excluded from the copy. The destination will be preserved if present.

  • include (list[str]) – list of patterns to be included in the copy even if excluded.

  • is_directory (bool) – Whether the item points to a directory.

  • bwlimit – bandwidth limit to be enforced. (KiB)

  • reuse (str|None) – the reference path for incremental mode.

  • item_class (str|None) – If specified carries a meta information about what the object to be copied is.

  • optional (bool) – Whether a failure copying this object should be tolerated rather than treated as a fatal failure. This only works if “is_directory” is False

class barman.copy_controller._RsyncJob(item_idx, description, id=None, file_list=None, checksum=None)

Bases: object

A job to be executed by a worker Process

__init__(item_idx, description, id=None, file_list=None, checksum=None)
Parameters:
  • item_idx (int) – The index of copy item containing this job

  • description (str) – The description of the job, used for logging

  • id (int) – Job ID (as in bucket)

  • file_list (list[RsyncCopyController._FileItem]) – Path to the file containing the file list

  • checksum (bool) – Whether to force the checksum verification

barman.copy_controller._init_worker(func)

Store the callable used to execute jobs passed to _run_worker function

Parameters:

func (callable) – the callable to invoke for every job

barman.copy_controller._run_worker(job)

Execute a job using the callable set using _init_worker function

Parameters:

job (_RsyncJob) – the job to be executed

barman.copy_controller._worker_callable = None

Global variable containing a callable used to execute the jobs. Initialized by _init_worker and used by _run_worker function. This variable must be None outside a multiprocessing worker Process.