PostgreSQL Source Code  git master
basic_archive.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * basic_archive.c
 *
 * This file demonstrates a basic archive library implementation that is
 * roughly equivalent to the following shell command:
 *
 *		test ! -f /path/to/dest && cp /path/to/src /path/to/dest
 *
 * One notable difference between this module and the shell command above
 * is that this module first copies the file to a temporary destination,
 * syncs it to disk, and then durably moves it to the final destination.
 *
 * Another notable difference is that if /path/to/dest already exists
 * but has contents identical to /path/to/src, archiving will succeed,
 * whereas the command shown above would fail. This prevents problems if
 * a file is successfully archived and then the system crashes before
 * a durable record of the success has been made.
 *
 * Copyright (c) 2022-2024, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  contrib/basic_archive/basic_archive.c
 *
 *-------------------------------------------------------------------------
 */
27 #include "postgres.h"
28 
29 #include <sys/stat.h>
30 #include <sys/time.h>
31 #include <unistd.h>
32 
33 #include "archive/archive_module.h"
34 #include "common/int.h"
35 #include "miscadmin.h"
36 #include "storage/copydir.h"
37 #include "storage/fd.h"
38 #include "utils/guc.h"
39 #include "utils/memutils.h"
40 
42 
43 typedef struct BasicArchiveData
44 {
47 
48 static char *archive_directory = NULL;
49 
52 static bool basic_archive_file(ArchiveModuleState *state, const char *file, const char *path);
53 static void basic_archive_file_internal(const char *file, const char *path);
54 static bool check_archive_directory(char **newval, void **extra, GucSource source);
55 static bool compare_files(const char *file1, const char *file2);
57 
60  .check_configured_cb = basic_archive_configured,
61  .archive_file_cb = basic_archive_file,
62  .shutdown_cb = basic_archive_shutdown
63 };
64 
65 /*
66  * _PG_init
67  *
68  * Defines the module's GUC.
69  */
70 void
71 _PG_init(void)
72 {
73  DefineCustomStringVariable("basic_archive.archive_directory",
74  gettext_noop("Archive file destination directory."),
75  NULL,
77  "",
78  PGC_SIGHUP,
79  0,
80  check_archive_directory, NULL, NULL);
81 
82  MarkGUCPrefixReserved("basic_archive");
83 }
84 
85 /*
86  * _PG_archive_module_init
87  *
88  * Returns the module's archiving callbacks.
89  */
92 {
94 }
95 
96 /*
97  * basic_archive_startup
98  *
99  * Creates the module's memory context.
100  */
101 void
103 {
105 
107  sizeof(BasicArchiveData));
109  "basic_archive",
111  state->private_data = (void *) data;
112 }
113 
114 /*
115  * check_archive_directory
116  *
117  * Checks that the provided archive directory exists.
118  */
119 static bool
121 {
122  struct stat st;
123 
124  /*
125  * The default value is an empty string, so we have to accept that value.
126  * Our check_configured callback also checks for this and prevents
127  * archiving from proceeding if it is still empty.
128  */
129  if (*newval == NULL || *newval[0] == '\0')
130  return true;
131 
132  /*
133  * Make sure the file paths won't be too long. The docs indicate that the
134  * file names to be archived can be up to 64 characters long.
135  */
136  if (strlen(*newval) + 64 + 2 >= MAXPGPATH)
137  {
138  GUC_check_errdetail("Archive directory too long.");
139  return false;
140  }
141 
142  /*
143  * Do a basic sanity check that the specified archive directory exists. It
144  * could be removed at some point in the future, so we still need to be
145  * prepared for it not to exist in the actual archiving logic.
146  */
147  if (stat(*newval, &st) != 0 || !S_ISDIR(st.st_mode))
148  {
149  GUC_check_errdetail("Specified archive directory does not exist.");
150  return false;
151  }
152 
153  return true;
154 }
155 
156 /*
157  * basic_archive_configured
158  *
159  * Checks that archive_directory is not blank.
160  */
161 static bool
163 {
164  if (archive_directory != NULL && archive_directory[0] != '\0')
165  return true;
166 
167  arch_module_check_errdetail("%s is not set.",
168  "basic_archive.archive_directory");
169  return false;
170 }
171 
172 /*
173  * basic_archive_file
174  *
175  * Archives one file.
176  */
177 static bool
178 basic_archive_file(ArchiveModuleState *state, const char *file, const char *path)
179 {
180  sigjmp_buf local_sigjmp_buf;
181  MemoryContext oldcontext;
182  BasicArchiveData *data = (BasicArchiveData *) state->private_data;
183  MemoryContext basic_archive_context = data->context;
184 
185  /*
186  * We run basic_archive_file_internal() in our own memory context so that
187  * we can easily reset it during error recovery (thus avoiding memory
188  * leaks).
189  */
190  oldcontext = MemoryContextSwitchTo(basic_archive_context);
191 
192  /*
193  * Since the archiver operates at the bottom of the exception stack,
194  * ERRORs turn into FATALs and cause the archiver process to restart.
195  * However, using ereport(ERROR, ...) when there are problems is easy to
196  * code and maintain. Therefore, we create our own exception handler to
197  * catch ERRORs and return false instead of restarting the archiver
198  * whenever there is a failure.
199  */
200  if (sigsetjmp(local_sigjmp_buf, 1) != 0)
201  {
202  /* Since not using PG_TRY, must reset error stack by hand */
203  error_context_stack = NULL;
204 
205  /* Prevent interrupts while cleaning up */
206  HOLD_INTERRUPTS();
207 
208  /* Report the error and clear ErrorContext for next time */
209  EmitErrorReport();
210  FlushErrorState();
211 
212  /* Close any files left open by copy_file() or compare_files() */
214 
215  /* Reset our memory context and switch back to the original one */
216  MemoryContextSwitchTo(oldcontext);
217  MemoryContextReset(basic_archive_context);
218 
219  /* Remove our exception handler */
220  PG_exception_stack = NULL;
221 
222  /* Now we can allow interrupts again */
224 
225  /* Report failure so that the archiver retries this file */
226  return false;
227  }
228 
229  /* Enable our exception handler */
230  PG_exception_stack = &local_sigjmp_buf;
231 
232  /* Archive the file! */
233  basic_archive_file_internal(file, path);
234 
235  /* Remove our exception handler */
236  PG_exception_stack = NULL;
237 
238  /* Reset our memory context and switch back to the original one */
239  MemoryContextSwitchTo(oldcontext);
240  MemoryContextReset(basic_archive_context);
241 
242  return true;
243 }
244 
245 static void
246 basic_archive_file_internal(const char *file, const char *path)
247 {
248  char destination[MAXPGPATH];
249  char temp[MAXPGPATH + 256];
250  struct stat st;
251  struct timeval tv;
252  uint64 epoch; /* milliseconds */
253 
254  ereport(DEBUG3,
255  (errmsg("archiving \"%s\" via basic_archive", file)));
256 
257  snprintf(destination, MAXPGPATH, "%s/%s", archive_directory, file);
258 
259  /*
260  * First, check if the file has already been archived. If it already
261  * exists and has the same contents as the file we're trying to archive,
262  * we can return success (after ensuring the file is persisted to disk).
263  * This scenario is possible if the server crashed after archiving the
264  * file but before renaming its .ready file to .done.
265  *
266  * If the archive file already exists but has different contents,
267  * something might be wrong, so we just fail.
268  */
269  if (stat(destination, &st) == 0)
270  {
271  if (compare_files(path, destination))
272  {
273  ereport(DEBUG3,
274  (errmsg("archive file \"%s\" already exists with identical contents",
275  destination)));
276 
277  fsync_fname(destination, false);
279 
280  return;
281  }
282 
283  ereport(ERROR,
284  (errmsg("archive file \"%s\" already exists", destination)));
285  }
286  else if (errno != ENOENT)
287  ereport(ERROR,
289  errmsg("could not stat file \"%s\": %m", destination)));
290 
291  /*
292  * Pick a sufficiently unique name for the temporary file so that a
293  * collision is unlikely. This helps avoid problems in case a temporary
294  * file was left around after a crash or another server happens to be
295  * archiving to the same directory.
296  */
297  gettimeofday(&tv, NULL);
298  if (pg_mul_u64_overflow((uint64) 1000, (uint64) tv.tv_sec, &epoch) ||
299  pg_add_u64_overflow(epoch, (uint64) (tv.tv_usec / 1000), &epoch))
300  elog(ERROR, "could not generate temporary file name for archiving");
301 
302  snprintf(temp, sizeof(temp), "%s/%s.%s.%d." UINT64_FORMAT,
303  archive_directory, "archtemp", file, MyProcPid, epoch);
304 
305  /*
306  * Copy the file to its temporary destination. Note that this will fail
307  * if temp already exists.
308  */
309  copy_file(path, temp);
310 
311  /*
312  * Sync the temporary file to disk and move it to its final destination.
313  * Note that this will overwrite any existing file, but this is only
314  * possible if someone else created the file since the stat() above.
315  */
316  (void) durable_rename(temp, destination, ERROR);
317 
318  ereport(DEBUG1,
319  (errmsg("archived \"%s\" via basic_archive", file)));
320 }
321 
322 /*
323  * compare_files
324  *
325  * Returns whether the contents of the files are the same.
326  */
327 static bool
328 compare_files(const char *file1, const char *file2)
329 {
330 #define CMP_BUF_SIZE (4096)
331  char buf1[CMP_BUF_SIZE];
332  char buf2[CMP_BUF_SIZE];
333  int fd1;
334  int fd2;
335  bool ret = true;
336 
337  fd1 = OpenTransientFile(file1, O_RDONLY | PG_BINARY);
338  if (fd1 < 0)
339  ereport(ERROR,
341  errmsg("could not open file \"%s\": %m", file1)));
342 
343  fd2 = OpenTransientFile(file2, O_RDONLY | PG_BINARY);
344  if (fd2 < 0)
345  ereport(ERROR,
347  errmsg("could not open file \"%s\": %m", file2)));
348 
349  for (;;)
350  {
351  int nbytes = 0;
352  int buf1_len = 0;
353  int buf2_len = 0;
354 
355  while (buf1_len < CMP_BUF_SIZE)
356  {
357  nbytes = read(fd1, buf1 + buf1_len, CMP_BUF_SIZE - buf1_len);
358  if (nbytes < 0)
359  ereport(ERROR,
361  errmsg("could not read file \"%s\": %m", file1)));
362  else if (nbytes == 0)
363  break;
364 
365  buf1_len += nbytes;
366  }
367 
368  while (buf2_len < CMP_BUF_SIZE)
369  {
370  nbytes = read(fd2, buf2 + buf2_len, CMP_BUF_SIZE - buf2_len);
371  if (nbytes < 0)
372  ereport(ERROR,
374  errmsg("could not read file \"%s\": %m", file2)));
375  else if (nbytes == 0)
376  break;
377 
378  buf2_len += nbytes;
379  }
380 
381  if (buf1_len != buf2_len || memcmp(buf1, buf2, buf1_len) != 0)
382  {
383  ret = false;
384  break;
385  }
386  else if (buf1_len == 0)
387  break;
388  }
389 
390  if (CloseTransientFile(fd1) != 0)
391  ereport(ERROR,
393  errmsg("could not close file \"%s\": %m", file1)));
394 
395  if (CloseTransientFile(fd2) != 0)
396  ereport(ERROR,
398  errmsg("could not close file \"%s\": %m", file2)));
399 
400  return ret;
401 }
402 
403 /*
404  * basic_archive_shutdown
405  *
406  * Frees our allocated state.
407  */
408 static void
410 {
411  BasicArchiveData *data = (BasicArchiveData *) state->private_data;
412  MemoryContext basic_archive_context;
413 
414  /*
415  * If we didn't get to storing the pointer to our allocated state, we
416  * don't have anything to clean up.
417  */
418  if (data == NULL)
419  return;
420 
421  basic_archive_context = data->context;
422  Assert(CurrentMemoryContext != basic_archive_context);
423 
424  if (MemoryContextIsValid(basic_archive_context))
425  MemoryContextDelete(basic_archive_context);
426  data->context = NULL;
427 
428  /*
429  * Finally, free the state.
430  */
431  pfree(data);
432  state->private_data = NULL;
433 }
#define arch_module_check_errdetail
struct BasicArchiveData BasicArchiveData
static void basic_archive_startup(ArchiveModuleState *state)
static char * archive_directory
Definition: basic_archive.c:48
void _PG_init(void)
Definition: basic_archive.c:71
PG_MODULE_MAGIC
Definition: basic_archive.c:41
static bool basic_archive_file(ArchiveModuleState *state, const char *file, const char *path)
#define CMP_BUF_SIZE
const ArchiveModuleCallbacks * _PG_archive_module_init(void)
Definition: basic_archive.c:91
static void basic_archive_file_internal(const char *file, const char *path)
static const ArchiveModuleCallbacks basic_archive_callbacks
Definition: basic_archive.c:58
static bool compare_files(const char *file1, const char *file2)
static bool basic_archive_configured(ArchiveModuleState *state)
static bool check_archive_directory(char **newval, void **extra, GucSource source)
static void basic_archive_shutdown(ArchiveModuleState *state)
#define gettext_noop(x)
Definition: c.h:1183
#define InvalidSubTransactionId
Definition: c.h:645
#define PG_BINARY
Definition: c.h:1260
#define UINT64_FORMAT
Definition: c.h:536
void copy_file(const char *fromfile, const char *tofile)
Definition: copydir.c:117
void EmitErrorReport(void)
Definition: elog.c:1672
int errcode_for_file_access(void)
Definition: elog.c:882
ErrorContextCallback * error_context_stack
Definition: elog.c:94
void FlushErrorState(void)
Definition: elog.c:1828
int errmsg(const char *fmt,...)
Definition: elog.c:1072
sigjmp_buf * PG_exception_stack
Definition: elog.c:96
#define DEBUG3
Definition: elog.h:28
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:782
int CloseTransientFile(int fd)
Definition: fd.c:2809
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:756
void AtEOSubXact_Files(bool isCommit, SubTransactionId mySubid, SubTransactionId parentSubid)
Definition: fd.c:3132
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2633
int MyProcPid
Definition: globals.c:45
void DefineCustomStringVariable(const char *name, const char *short_desc, const char *long_desc, char **valueAddr, const char *bootValue, GucContext context, int flags, GucStringCheckHook check_hook, GucStringAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:5156
#define newval
void MarkGUCPrefixReserved(const char *className)
Definition: guc.c:5217
#define GUC_check_errdetail
Definition: guc.h:447
GucSource
Definition: guc.h:108
@ PGC_SIGHUP
Definition: guc.h:71
static bool pg_add_u64_overflow(uint64 a, uint64 b, uint64 *result)
Definition: int.h:380
static bool pg_mul_u64_overflow(uint64 a, uint64 b, uint64 *result)
Definition: int.h:414
#define read(a, b, c)
Definition: win32.h:13
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
Assert(fmt[strlen(fmt) - 1] !='\n')
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:371
void pfree(void *pointer)
Definition: mcxt.c:1508
MemoryContext TopMemoryContext
Definition: mcxt.c:137
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1202
MemoryContext CurrentMemoryContext
Definition: mcxt.c:131
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:442
#define MemoryContextIsValid(context)
Definition: memnodes.h:145
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:153
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:135
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:133
#define MAXPGPATH
const void * data
static rewind_source * source
Definition: pg_rewind.c:89
#define snprintf
Definition: port.h:238
MemoryContextSwitchTo(old_ctx)
ArchiveStartupCB startup_cb
MemoryContext context
Definition: basic_archive.c:45
unsigned short st_mode
Definition: win32_port.h:268
Definition: regguts.h:323
#define stat
Definition: win32_port.h:284
#define S_ISDIR(m)
Definition: win32_port.h:325
static const unsigned __int64 epoch
int gettimeofday(struct timeval *tp, void *tzp)