PostgreSQL Source Code git master
Loading...
Searching...
No Matches
basebackup_throttle.c File Reference
#include "postgres.h"
#include "backup/basebackup_sink.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
Include dependency graph for basebackup_throttle.c:

Go to the source code of this file.

Data Structures

struct  bbsink_throttle
 

Macros

#define THROTTLING_FREQUENCY   8
 

Typedefs

typedef struct bbsink_throttle bbsink_throttle
 

Functions

static void bbsink_throttle_begin_backup (bbsink *sink)
 
static void bbsink_throttle_archive_contents (bbsink *sink, size_t len)
 
static void bbsink_throttle_manifest_contents (bbsink *sink, size_t len)
 
static void throttle (bbsink_throttle *sink, size_t increment)
 
bbsink * bbsink_throttle_new (bbsink *next, uint32 maxrate)
 

Variables

static const bbsink_ops bbsink_throttle_ops
 

Macro Definition Documentation

◆ THROTTLING_FREQUENCY

#define THROTTLING_FREQUENCY   8

Definition at line 62 of file basebackup_throttle.c.

Typedef Documentation

◆ bbsink_throttle

Function Documentation

◆ bbsink_throttle_archive_contents()

static void bbsink_throttle_archive_contents ( bbsink * sink,
size_t  len 
)
static

Definition at line 111 of file basebackup_throttle.c.

112{
114
116}
void bbsink_forward_archive_contents(bbsink *sink, size_t len)
static void throttle(bbsink_throttle *sink, size_t increment)
const void size_t len
static int fb(int x)

References bbsink_forward_archive_contents(), fb(), len, and throttle().

◆ bbsink_throttle_begin_backup()

static void bbsink_throttle_begin_backup ( bbsink * sink)
static

Definition at line 97 of file basebackup_throttle.c.

98{
100
102
103 /* The 'real data' starts now (header was ignored). */
104 mysink->throttled_last = GetCurrentTimestamp();
105}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1643
void bbsink_forward_begin_backup(bbsink *sink)

References bbsink_forward_begin_backup(), fb(), and GetCurrentTimestamp().

◆ bbsink_throttle_manifest_contents()

static void bbsink_throttle_manifest_contents ( bbsink * sink,
size_t  len 
)
static

Definition at line 122 of file basebackup_throttle.c.

123{
125
127}
void bbsink_forward_manifest_contents(bbsink *sink, size_t len)

References bbsink_forward_manifest_contents(), fb(), len, and throttle().

◆ bbsink_throttle_new()

bbsink * bbsink_throttle_new ( bbsink * next,
uint32  maxrate 
)

Definition at line 69 of file basebackup_throttle.c.

70{
72
73 Assert(next != NULL);
74 Assert(maxrate > 0);
75
77 *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
78 sink->base.bbs_next = next;
79
80 sink->throttling_sample =
82
83 /*
84 * The minimum amount of time for throttling_sample bytes to be
85 * transferred.
86 */
87 sink->elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
88
89 return &sink->base;
90}
#define THROTTLING_FREQUENCY
static const bbsink_ops bbsink_throttle_ops
static int32 next
Definition blutils.c:225
#define Assert(condition)
Definition c.h:927
int64_t int64
Definition c.h:597
#define USECS_PER_SEC
Definition timestamp.h:134
#define palloc0_object(type)
Definition fe_memutils.h:75
static int32 maxrate

References Assert, bbsink_throttle_ops, fb(), maxrate, next, palloc0_object, THROTTLING_FREQUENCY, and USECS_PER_SEC.

Referenced by SendBaseBackup().

◆ throttle()

static void throttle ( bbsink_throttle * sink,
size_t  increment 
)
static

Definition at line 135 of file basebackup_throttle.c.

136{
138
139 Assert(sink->throttling_counter >= 0);
140
141 sink->throttling_counter += increment;
142 if (sink->throttling_counter < sink->throttling_sample)
143 return;
144
145 /* How much time should have elapsed at minimum? */
146 elapsed_min = sink->elapsed_min_unit *
147 (sink->throttling_counter / sink->throttling_sample);
148
149 /*
150 * Since the latch could be set repeatedly because of concurrent WAL
151 * activity, sleep in a loop to ensure enough time has passed.
152 */
153 for (;;)
154 {
156 sleep;
157 int wait_result;
158
159 /* Time elapsed since the last measurement (and possible wake up). */
160 elapsed = GetCurrentTimestamp() - sink->throttled_last;
161
162 /* sleep if the transfer is faster than it should be */
164 if (sleep <= 0)
165 break;
166
168
169 /* We're eating a potentially set latch, so check for interrupts */
171
172 /*
173 * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
174 * the maximum time to sleep. Thus the cast to long is safe.
175 */
178 (long) (sleep / 1000),
180
183
184 /* Done waiting? */
186 break;
187 }
188
189 /*
190 * As we work with integers, only a whole multiple of throttling_sample was
191 * processed. The rest will be done during the next call of this function.
192 */
193 sink->throttling_counter %= sink->throttling_sample;
194
195 /*
196 * Time interval for the remaining amount and possible next increments
197 * starts now.
198 */
199 sink->throttled_last = GetCurrentTimestamp();
200}
int64 TimeOffset
Definition timestamp.h:40
struct Latch * MyLatch
Definition globals.c:63
void ResetLatch(Latch *latch)
Definition latch.c:374
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition latch.c:172
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
#define WL_TIMEOUT
#define WL_EXIT_ON_PM_DEATH
#define WL_LATCH_SET

References Assert, CHECK_FOR_INTERRUPTS, fb(), GetCurrentTimestamp(), MyLatch, ResetLatch(), WaitLatch(), WL_EXIT_ON_PM_DEATH, WL_LATCH_SET, and WL_TIMEOUT.

Referenced by bbsink_throttle_archive_contents(), and bbsink_throttle_manifest_contents().

Variable Documentation

◆ bbsink_throttle_ops

const bbsink_ops bbsink_throttle_ops
static
Initial value:
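= {
.begin_backup = bbsink_throttle_begin_backup,
.begin_archive = bbsink_forward_begin_archive,
.archive_contents = bbsink_throttle_archive_contents,
.end_archive = bbsink_forward_end_archive,
.begin_manifest = bbsink_forward_begin_manifest,
.manifest_contents = bbsink_throttle_manifest_contents,
.end_manifest = bbsink_forward_end_manifest,
.end_backup = bbsink_forward_end_backup,
.cleanup = bbsink_forward_cleanup
}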
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_cleanup(bbsink *sink)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_begin_archive(bbsink *sink, const char *archive_name)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_throttle_archive_contents(bbsink *sink, size_t len)
static void bbsink_throttle_begin_backup(bbsink *sink)
static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len)

Definition at line 47 of file basebackup_throttle.c.

47 {
48 .begin_backup = bbsink_throttle_begin_backup,
49 .begin_archive = bbsink_forward_begin_archive,
50 .archive_contents = bbsink_throttle_archive_contents,
51 .end_archive = bbsink_forward_end_archive,
52 .begin_manifest = bbsink_forward_begin_manifest,
53 .manifest_contents = bbsink_throttle_manifest_contents,
54 .end_manifest = bbsink_forward_end_manifest,
55 .end_backup = bbsink_forward_end_backup,
56 .cleanup = bbsink_forward_cleanup
57};

Referenced by bbsink_throttle_new().