PostgreSQL Source Code  git master
basebackup_throttle.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * basebackup_throttle.c
 *	  Basebackup sink implementing throttling. Data is forwarded to the
 *	  next base backup sink in the chain at a rate no greater than the
 *	  configured maximum.
 *
 * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/replication/basebackup_throttle.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"
#include "replication/basebackup_sink.h"
#include "storage/latch.h"
#include "utils/timestamp.h"
22 
23 typedef struct bbsink_throttle
24 {
25  /* Common information for all types of sink. */
27 
28  /* The actual number of bytes, transfer of which may cause sleep. */
30 
31  /* Amount of data already transferred but not yet throttled. */
33 
34  /* The minimum time required to transfer throttling_sample bytes. */
36 
37  /* The last check of the transfer rate. */
40 
41 static void bbsink_throttle_begin_backup(bbsink *sink);
42 static void bbsink_throttle_archive_contents(bbsink *sink, size_t len);
43 static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len);
44 static void throttle(bbsink_throttle *sink, size_t increment);
45 
48  .begin_archive = bbsink_forward_begin_archive,
49  .archive_contents = bbsink_throttle_archive_contents,
50  .end_archive = bbsink_forward_end_archive,
51  .begin_manifest = bbsink_forward_begin_manifest,
52  .manifest_contents = bbsink_throttle_manifest_contents,
53  .end_manifest = bbsink_forward_end_manifest,
54  .end_backup = bbsink_forward_end_backup,
55  .cleanup = bbsink_forward_cleanup
56 };
57 
/*
 * How frequently to throttle, as a fraction of the specified rate-second.
 * With a value of 8, the transfer rate is checked once per eighth of the
 * per-second byte budget.
 */
#define THROTTLING_FREQUENCY	8
62 
63 /*
64  * Create a new basebackup sink that performs throttling and forwards data
65  * to a successor sink.
66  */
67 bbsink *
69 {
70  bbsink_throttle *sink;
71 
72  Assert(next != NULL);
73  Assert(maxrate > 0);
74 
75  sink = palloc0(sizeof(bbsink_throttle));
76  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_throttle_ops;
77  sink->base.bbs_next = next;
78 
79  sink->throttling_sample =
80  (int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
81 
82  /*
83  * The minimum amount of time for throttling_sample bytes to be
84  * transferred.
85  */
87 
88  return &sink->base;
89 }
90 
91 /*
92  * There's no real work to do here, but we need to record the current time so
93  * that it can be used for future calculations.
94  */
95 static void
97 {
98  bbsink_throttle *mysink = (bbsink_throttle *) sink;
99 
101 
102  /* The 'real data' starts now (header was ignored). */
104 }
105 
106 /*
107  * First throttle, and then pass archive contents to next sink.
108  */
109 static void
111 {
112  throttle((bbsink_throttle *) sink, len);
113 
115 }
116 
117 /*
118  * First throttle, and then pass manifest contents to next sink.
119  */
120 static void
122 {
123  throttle((bbsink_throttle *) sink, len);
124 
126 }
127 
128 /*
129  * Increment the network transfer counter by the given number of bytes,
130  * and sleep if necessary to comply with the requested network transfer
131  * rate.
132  */
133 static void
134 throttle(bbsink_throttle *sink, size_t increment)
135 {
136  TimeOffset elapsed_min;
137 
138  Assert(sink->throttling_counter >= 0);
139 
140  sink->throttling_counter += increment;
141  if (sink->throttling_counter < sink->throttling_sample)
142  return;
143 
144  /* How much time should have elapsed at minimum? */
145  elapsed_min = sink->elapsed_min_unit *
146  (sink->throttling_counter / sink->throttling_sample);
147 
148  /*
149  * Since the latch could be set repeatedly because of concurrently WAL
150  * activity, sleep in a loop to ensure enough time has passed.
151  */
152  for (;;)
153  {
154  TimeOffset elapsed,
155  sleep;
156  int wait_result;
157 
158  /* Time elapsed since the last measurement (and possible wake up). */
159  elapsed = GetCurrentTimestamp() - sink->throttled_last;
160 
161  /* sleep if the transfer is faster than it should be */
162  sleep = elapsed_min - elapsed;
163  if (sleep <= 0)
164  break;
165 
167 
168  /* We're eating a potentially set latch, so check for interrupts */
170 
171  /*
172  * (TAR_SEND_SIZE / throttling_sample * elapsed_min_unit) should be
173  * the maximum time to sleep. Thus the cast to long is safe.
174  */
175  wait_result = WaitLatch(MyLatch,
177  (long) (sleep / 1000),
179 
180  if (wait_result & WL_LATCH_SET)
182 
183  /* Done waiting? */
184  if (wait_result & WL_TIMEOUT)
185  break;
186  }
187 
188  /*
189  * As we work with integers, only whole multiple of throttling_sample was
190  * processed. The rest will be done during the next call of this function.
191  */
192  sink->throttling_counter %= sink->throttling_sample;
193 
194  /*
195  * Time interval for the remaining amount and possible next increments
196  * starts now.
197  */
199 }
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1574
void bbsink_forward_begin_backup(bbsink *sink)
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_cleanup(bbsink *sink)
void bbsink_forward_manifest_contents(bbsink *sink, size_t len)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_archive_contents(bbsink *sink, size_t len)
void bbsink_forward_begin_archive(bbsink *sink, const char *archive_name)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_throttle_archive_contents(bbsink *sink, size_t len)
static void throttle(bbsink_throttle *sink, size_t increment)
bbsink * bbsink_throttle_new(bbsink *next, uint32 maxrate)
static void bbsink_throttle_begin_backup(bbsink *sink)
struct bbsink_throttle bbsink_throttle
#define THROTTLING_FREQUENCY
static void bbsink_throttle_manifest_contents(bbsink *sink, size_t len)
static const bbsink_ops bbsink_throttle_ops
static int32 next
Definition: blutils.c:219
unsigned int uint32
Definition: c.h:441
int64 TimestampTz
Definition: timestamp.h:39
#define USECS_PER_SEC
Definition: timestamp.h:133
int64 TimeOffset
Definition: timestamp.h:40
struct Latch * MyLatch
Definition: globals.c:58
void ResetLatch(Latch *latch)
Definition: latch.c:658
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
Definition: latch.c:451
#define WL_TIMEOUT
Definition: latch.h:128
#define WL_EXIT_ON_PM_DEATH
Definition: latch.h:130
#define WL_LATCH_SET
Definition: latch.h:125
Assert(fmt[strlen(fmt) - 1] !='\n')
void * palloc0(Size size)
Definition: mcxt.c:1099
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:121
static int32 maxrate
const void size_t len
void(* begin_backup)(bbsink *sink)
TimestampTz throttled_last
TimeOffset elapsed_min_unit
bbsink * bbs_next
const bbsink_ops * bbs_ops
@ WAIT_EVENT_BASE_BACKUP_THROTTLE
Definition: wait_event.h:142