PostgreSQL Source Code  git master
basebackup_throttle.c File Reference
#include "postgres.h"
#include "miscadmin.h"
#include "replication/basebackup_sink.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "utils/timestamp.h"
Include dependency graph for basebackup_throttle.c:

Go to the source code of this file.

Data Structures

struct  bbsink_throttle
 

Macros

#define THROTTLING_FREQUENCY   8
 

Typedefs

typedef struct bbsink_throttle bbsink_throttle
 

Functions

static void bbsink_throttle_begin_backup (bbsink *sink)
 
static void bbsink_throttle_archive_contents (bbsink *sink, size_t len)
 
static void bbsink_throttle_manifest_contents (bbsink *sink, size_t len)
 
static void throttle (bbsink_throttle *sink, size_t increment)
 
bbsinkbbsink_throttle_new (bbsink *next, uint32 maxrate)
 

Variables

static const bbsink_ops bbsink_throttle_ops
 

Macro Definition Documentation

◆ THROTTLING_FREQUENCY

#define THROTTLING_FREQUENCY   8

Definition at line 61 of file basebackup_throttle.c.

Typedef Documentation

◆ bbsink_throttle

Function Documentation

◆ bbsink_throttle_archive_contents()

static void bbsink_throttle_archive_contents ( bbsink sink,
size_t  len 
)
static

Definition at line 110 of file basebackup_throttle.c.

111 {
112  throttle((bbsink_throttle *) sink, len);
113 
115 }
void bbsink_forward_archive_contents(bbsink *sink, size_t len)
static void throttle(bbsink_throttle *sink, size_t increment)
const void size_t len

References bbsink_forward_archive_contents(), len, and throttle().

◆ bbsink_throttle_begin_backup()

static void bbsink_throttle_begin_backup ( bbsink sink)
static

Definition at line 96 of file basebackup_throttle.c.

97 {
98  bbsink_throttle *mysink = (bbsink_throttle *) sink;
99 
101 
102  /* The 'real data' starts now (header was ignored). */
104 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
void bbsink_forward_begin_backup(bbsink *sink)
TimestampTz throttled_last

References bbsink_forward_begin_backup(), GetCurrentTimestamp(), and bbsink_throttle::throttled_last.

◆ bbsink_throttle_manifest_contents()

static void bbsink_throttle_manifest_contents ( bbsink sink,
size_t  len 
)
static

Definition at line 121 of file basebackup_throttle.c.

122 {
123  throttle((bbsink_throttle *) sink, len);
124 
126 }
void bbsink_forward_manifest_contents(bbsink *sink, size_t len)

References bbsink_forward_manifest_contents(), len, and throttle().

◆ bbsink_throttle_new()

bbsink* bbsink_throttle_new ( bbsink next,
uint32  maxrate 
)

Definition at line 68 of file basebackup_throttle.c.

69 {
70  bbsink_throttle *sink;
71 
72  Assert(next != NULL);
73  Assert(maxrate > 0);
74 
75  sink = palloc0(sizeof(bbsink_throttle));
76  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
77  sink->base.bbs_next = next;
78 
79  sink->throttling_sample =
80  (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
81 
82  /*
83  * The minimum amount of time for throttling_sample bytes to be
84  * transferred.
85  */
87 
88  return &sink->base;
89 }
#define THROTTLING_FREQUENCY
static const bbsink_ops bbsink_throttle_ops
static int32 next
Definition: blutils.c:219
#define USECS_PER_SEC
Definition: timestamp.h:133
Assert(fmt[strlen(fmt) - 1] !='\n')
void * palloc0(Size size)
Definition: mcxt.c:1099
static int32 maxrate
TimeOffset elapsed_min_unit
bbsink * bbs_next
const bbsink_ops * bbs_ops

References Assert(), bbsink_throttle::base, bbsink::bbs_next, bbsink::bbs_ops, bbsink_throttle_ops, bbsink_throttle::elapsed_min_unit, maxrate, next, palloc0(), THROTTLING_FREQUENCY, bbsink_throttle::throttling_sample, and USECS_PER_SEC.

Referenced by SendBaseBackup().

◆ throttle()

static void throttle ( bbsink_throttle sink,
size_t  increment 
)
static

Definition at line 134 of file basebackup_throttle.c.

135 {
136  TimeOffset elapsed_min;
137 
138  Assert(sink->throttling_counter >= 0);
139 
140  sink->throttling_counter += increment;
141  if (sink->throttling_counter < sink->throttling_sample)
142  return;
143 
144  /* How much time should have elapsed at minimum? */
145  elapsed_min = sink->elapsed_min_unit *
146  (sink->throttling_counter / sink->throttling_sample);
147 
148  /*
149  * Since the latch could be set repeatedly because of concurrently WAL
150  * activity, sleep in a loop to ensure enough time has passed.
151  */
152  for (;;)
153  {
154  TimeOffset elapsed,
155  sleep;
156  int wait_result;
157 
158  /* Time elapsed since the last measurement (and possible wake up). */
159  elapsed = GetCurrentTimestamp() - sink->throttled_last;
160 
161  /* sleep if the transfer is faster than it should be */
162  sleep = elapsed_min - elapsed;
163  if (sleep <= 0)
164  break;
165 
167 
168  /* We're eating a potentially set latch, so check for interrupts */
170 
171  /*
172  * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
173  * the maximum time to sleep. Thus the cast to long is safe.
174  */
175  wait_result = WaitLatch(MyLatch,
177  (long) (sleep / 1000),
179 
180  if (wait_result & WL_LATCH_SET)
182 
183  /* Done waiting? */
184  if (wait_result & WL_TIMEOUT)
185  break;
186  }
187 
188  /*
189  * As we work with integers, only whole multiple of throttling_sample was
190  * processed. The rest will be done during the next call of this function.
191  */
192  sink->throttling_counter %= sink->throttling_sample;
193 
194  /*
195  * Time interval for the remaining amount and possible next increments
196  * starts now.
197  */
199 }
int64 TimeOffset
Definition: timestamp.h:40
struct Latch * MyLatch
Definition: globals.c:58
void ResetLatch(Latch *latch)
Definition: latch.c:658
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:451
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
@ WAIT_EVENT_BASE_BACKUP_THROTTLE
Definition: wait_event.h:142

References Assert(), CHECK_FOR_INTERRUPTS, bbsink_throttle::elapsed_min_unit, GetCurrentTimestamp(), MyLatch, ResetLatch(), bbsink_throttle::throttled_last, bbsink_throttle::throttling_counter, bbsink_throttle::throttling_sample, WAIT_EVENT_BASE_BACKUP_THROTTLE, WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by bbsink_throttle_archive_contents(), and bbsink_throttle_manifest_contents().

Variable Documentation

◆ bbsink_throttle_ops

const bbsink_ops bbsink_throttle_ops
static
Initial value:
= {
.begin_archive = bbsink_forward_begin_archive,
.archive_contents = bbsink_throttle_archive_contents,
.begin_manifest = bbsink_forward_begin_manifest,
.manifest_contents = bbsink_throttle_manifest_contents,
.end_manifest = bbsink_forward_end_manifest,
}
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_cleanup(bbsink *sink)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_begin_archive(bbsink *sink, const char *archive_name)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_throttle_archive_contents(bbsink *sink, size_t len)
static void bbsink_throttle_begin_backup(bbsink *sink)
static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len)

Definition at line 46 of file basebackup_throttle.c.

Referenced by bbsink_throttle_new().