PostgreSQL Source Code  git master
basic_archive.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * basic_archive.c
4  *
5  * This file demonstrates a basic archive library implementation that is
6  * roughly equivalent to the following shell command:
7  *
8  * test ! -f /path/to/dest && cp /path/to/src /path/to/dest
9  *
10  * One notable difference between this module and the shell command above
11  * is that this module first copies the file to a temporary destination,
12  * syncs it to disk, and then durably moves it to the final destination.
13  *
14  * Another notable difference is that if /path/to/dest already exists
15  * but has contents identical to /path/to/src, archiving will succeed,
16  * whereas the command shown above would fail. This prevents problems if
17  * a file is successfully archived and then the system crashes before
18  * a durable record of the success has been made.
19  *
20  * Copyright (c) 2022, PostgreSQL Global Development Group
21  *
22  * IDENTIFICATION
23  * contrib/basic_archive/basic_archive.c
24  *
25  *-------------------------------------------------------------------------
26  */
27 #include "postgres.h"
28 
29 #include <sys/stat.h>
30 #include <sys/time.h>
31 #include <unistd.h>
32 
33 #include "common/int.h"
34 #include "miscadmin.h"
35 #include "postmaster/pgarch.h"
36 #include "storage/copydir.h"
37 #include "storage/fd.h"
38 #include "utils/guc.h"
39 #include "utils/memutils.h"
40 
42 
43 void _PG_init(void);
45 
46 static char *archive_directory = NULL;
48 
49 static bool basic_archive_configured(void);
50 static bool basic_archive_file(const char *file, const char *path);
51 static void basic_archive_file_internal(const char *file, const char *path);
52 static bool check_archive_directory(char **newval, void **extra, GucSource source);
53 static bool compare_files(const char *file1, const char *file2);
54 
55 /*
56  * _PG_init
57  *
58  * Defines the module's GUC.
59  */
60 void
61 _PG_init(void)
62 {
63  DefineCustomStringVariable("basic_archive.archive_directory",
64  gettext_noop("Archive file destination directory."),
65  NULL,
67  "",
68  PGC_SIGHUP,
69  0,
70  check_archive_directory, NULL, NULL);
71 
72  MarkGUCPrefixReserved("basic_archive");
73 
75  "basic_archive",
77 }
78 
79 /*
80  * _PG_archive_module_init
81  *
82  * Returns the module's archiving callbacks.
83  */
84 void
86 {
88 
91 }
92 
93 /*
94  * check_archive_directory
95  *
96  * Checks that the provided archive directory exists.
97  */
98 static bool
100 {
101  struct stat st;
102 
103  /*
104  * The default value is an empty string, so we have to accept that value.
105  * Our check_configured callback also checks for this and prevents
106  * archiving from proceeding if it is still empty.
107  */
108  if (*newval == NULL || *newval[0] == '\0')
109  return true;
110 
111  /*
112  * Make sure the file paths won't be too long. The docs indicate that the
113  * file names to be archived can be up to 64 characters long.
114  */
115  if (strlen(*newval) + 64 + 2 >= MAXPGPATH)
116  {
117  GUC_check_errdetail("archive directory too long");
118  return false;
119  }
120 
121  /*
122  * Do a basic sanity check that the specified archive directory exists. It
123  * could be removed at some point in the future, so we still need to be
124  * prepared for it not to exist in the actual archiving logic.
125  */
126  if (stat(*newval, &st) != 0 || !S_ISDIR(st.st_mode))
127  {
128  GUC_check_errdetail("specified archive directory does not exist");
129  return false;
130  }
131 
132  return true;
133 }
134 
135 /*
136  * basic_archive_configured
137  *
138  * Checks that archive_directory is not blank.
139  */
140 static bool
142 {
143  return archive_directory != NULL && archive_directory[0] != '\0';
144 }
145 
146 /*
147  * basic_archive_file
148  *
149  * Archives one file.
150  */
151 static bool
152 basic_archive_file(const char *file, const char *path)
153 {
154  sigjmp_buf local_sigjmp_buf;
155  MemoryContext oldcontext;
156 
157  /*
158  * We run basic_archive_file_internal() in our own memory context so that
159  * we can easily reset it during error recovery (thus avoiding memory
160  * leaks).
161  */
163 
164  /*
165  * Since the archiver operates at the bottom of the exception stack,
166  * ERRORs turn into FATALs and cause the archiver process to restart.
167  * However, using ereport(ERROR, ...) when there are problems is easy to
168  * code and maintain. Therefore, we create our own exception handler to
169  * catch ERRORs and return false instead of restarting the archiver
170  * whenever there is a failure.
171  */
172  if (sigsetjmp(local_sigjmp_buf, 1) != 0)
173  {
174  /* Since not using PG_TRY, must reset error stack by hand */
175  error_context_stack = NULL;
176 
177  /* Prevent interrupts while cleaning up */
178  HOLD_INTERRUPTS();
179 
180  /* Report the error and clear ErrorContext for next time */
181  EmitErrorReport();
182  FlushErrorState();
183 
184  /* Close any files left open by copy_file() or compare_files() */
186 
187  /* Reset our memory context and switch back to the original one */
188  MemoryContextSwitchTo(oldcontext);
190 
191  /* Remove our exception handler */
192  PG_exception_stack = NULL;
193 
194  /* Now we can allow interrupts again */
196 
197  /* Report failure so that the archiver retries this file */
198  return false;
199  }
200 
201  /* Enable our exception handler */
202  PG_exception_stack = &local_sigjmp_buf;
203 
204  /* Archive the file! */
205  basic_archive_file_internal(file, path);
206 
207  /* Remove our exception handler */
208  PG_exception_stack = NULL;
209 
210  /* Reset our memory context and switch back to the original one */
211  MemoryContextSwitchTo(oldcontext);
213 
214  return true;
215 }
216 
217 static void
218 basic_archive_file_internal(const char *file, const char *path)
219 {
220  char destination[MAXPGPATH];
221  char temp[MAXPGPATH + 256];
222  struct stat st;
223  struct timeval tv;
224  uint64 epoch;
225 
226  ereport(DEBUG3,
227  (errmsg("archiving \"%s\" via basic_archive", file)));
228 
229  snprintf(destination, MAXPGPATH, "%s/%s", archive_directory, file);
230 
231  /*
232  * First, check if the file has already been archived. If it already
233  * exists and has the same contents as the file we're trying to archive,
234  * we can return success (after ensuring the file is persisted to disk).
235  * This scenario is possible if the server crashed after archiving the
236  * file but before renaming its .ready file to .done.
237  *
238  * If the archive file already exists but has different contents,
239  * something might be wrong, so we just fail.
240  */
241  if (stat(destination, &st) == 0)
242  {
243  if (compare_files(path, destination))
244  {
245  ereport(DEBUG3,
246  (errmsg("archive file \"%s\" already exists with identical contents",
247  destination)));
248 
249  fsync_fname(destination, false);
251 
252  return;
253  }
254 
255  ereport(ERROR,
256  (errmsg("archive file \"%s\" already exists", destination)));
257  }
258  else if (errno != ENOENT)
259  ereport(ERROR,
261  errmsg("could not stat file \"%s\": %m", destination)));
262 
263  /*
264  * Pick a sufficiently unique name for the temporary file so that a
265  * collision is unlikely. This helps avoid problems in case a temporary
266  * file was left around after a crash or another server happens to be
267  * archiving to the same directory.
268  */
269  gettimeofday(&tv, NULL);
270  if (pg_mul_u64_overflow((uint64) 1000, (uint64) tv.tv_sec, &epoch) ||
271  pg_add_u64_overflow(epoch, (uint64) tv.tv_usec, &epoch))
272  elog(ERROR, "could not generate temporary file name for archiving");
273 
274  snprintf(temp, sizeof(temp), "%s/%s.%s.%d." UINT64_FORMAT,
275  archive_directory, "archtemp", file, MyProcPid, epoch);
276 
277  /*
278  * Copy the file to its temporary destination. Note that this will fail
279  * if temp already exists.
280  */
281  copy_file(unconstify(char *, path), temp);
282 
283  /*
284  * Sync the temporary file to disk and move it to its final destination.
285  * This will fail if destination already exists.
286  */
287  (void) durable_rename_excl(temp, destination, ERROR);
288 
289  ereport(DEBUG1,
290  (errmsg("archived \"%s\" via basic_archive", file)));
291 }
292 
293 /*
294  * compare_files
295  *
296  * Returns whether the contents of the files are the same.
297  */
298 static bool
299 compare_files(const char *file1, const char *file2)
300 {
301 #define CMP_BUF_SIZE (4096)
302  char buf1[CMP_BUF_SIZE];
303  char buf2[CMP_BUF_SIZE];
304  int fd1;
305  int fd2;
306  bool ret = true;
307 
308  fd1 = OpenTransientFile(file1, O_RDONLY | PG_BINARY);
309  if (fd1 < 0)
310  ereport(ERROR,
312  errmsg("could not open file \"%s\": %m", file1)));
313 
314  fd2 = OpenTransientFile(file2, O_RDONLY | PG_BINARY);
315  if (fd2 < 0)
316  ereport(ERROR,
318  errmsg("could not open file \"%s\": %m", file2)));
319 
320  for (;;)
321  {
322  int nbytes = 0;
323  int buf1_len = 0;
324  int buf2_len = 0;
325 
326  while (buf1_len < CMP_BUF_SIZE)
327  {
328  nbytes = read(fd1, buf1 + buf1_len, CMP_BUF_SIZE - buf1_len);
329  if (nbytes < 0)
330  ereport(ERROR,
332  errmsg("could not read file \"%s\": %m", file1)));
333  else if (nbytes == 0)
334  break;
335 
336  buf1_len += nbytes;
337  }
338 
339  while (buf2_len < CMP_BUF_SIZE)
340  {
341  nbytes = read(fd2, buf2 + buf2_len, CMP_BUF_SIZE - buf2_len);
342  if (nbytes < 0)
343  ereport(ERROR,
345  errmsg("could not read file \"%s\": %m", file2)));
346  else if (nbytes == 0)
347  break;
348 
349  buf2_len += nbytes;
350  }
351 
352  if (buf1_len != buf2_len || memcmp(buf1, buf2, buf1_len) != 0)
353  {
354  ret = false;
355  break;
356  }
357  else if (buf1_len == 0)
358  break;
359  }
360 
361  if (CloseTransientFile(fd1) != 0)
362  ereport(ERROR,
364  errmsg("could not close file \"%s\": %m", file1)));
365 
366  if (CloseTransientFile(fd2) != 0)
367  ereport(ERROR,
369  errmsg("could not close file \"%s\": %m", file2)));
370 
371  return ret;
372 }
static MemoryContext basic_archive_context
Definition: basic_archive.c:47
void _PG_archive_module_init(ArchiveModuleCallbacks *cb)
Definition: basic_archive.c:85
static char * archive_directory
Definition: basic_archive.c:46
void _PG_init(void)
Definition: basic_archive.c:61
PG_MODULE_MAGIC
Definition: basic_archive.c:41
static bool basic_archive_file(const char *file, const char *path)
#define CMP_BUF_SIZE
static bool basic_archive_configured(void)
static void basic_archive_file_internal(const char *file, const char *path)
static bool compare_files(const char *file1, const char *file2)
static bool check_archive_directory(char **newval, void **extra, GucSource source)
Definition: basic_archive.c:99
#define unconstify(underlying_type, expr)
Definition: c.h:1240
#define gettext_noop(x)
Definition: c.h:1194
#define AssertVariableIsOfType(varname, typename)
Definition: c.h:963
#define InvalidSubTransactionId
Definition: c.h:593
#define PG_BINARY
Definition: c.h:1268
#define UINT64_FORMAT
Definition: c.h:484
void copy_file(char *fromfile, char *tofile)
Definition: copydir.c:127
void EmitErrorReport(void)
Definition: elog.c:1504
int errcode_for_file_access(void)
Definition: elog.c:716
ErrorContextCallback * error_context_stack
Definition: elog.c:93
void FlushErrorState(void)
Definition: elog.c:1649
int errmsg(const char *fmt,...)
Definition: elog.c:904
sigjmp_buf * PG_exception_stack
Definition: elog.c:95
#define DEBUG3
Definition: elog.h:22
#define DEBUG1
Definition: elog.h:24
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
int durable_rename_excl(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:829
int CloseTransientFile(int fd)
Definition: fd.c:2688
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:673
void AtEOSubXact_Files(bool isCommit, SubTransactionId mySubid, SubTransactionId parentSubid)
Definition: fd.c:3011
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2511
static const unsigned __int64 epoch
Definition: gettimeofday.c:34
int gettimeofday(struct timeval *tp, struct timezone *tzp)
Definition: gettimeofday.c:104
int MyProcPid
Definition: globals.c:44
void DefineCustomStringVariable(const char *name, const char *short_desc, const char *long_desc, char **valueAddr, const char *bootValue, GucContext context, int flags, GucStringCheckHook check_hook, GucStringAssignHook assign_hook, GucShowHook show_hook)
Definition: guc.c:9568
#define newval
void MarkGUCPrefixReserved(const char *className)
Definition: guc.c:9629
#define GUC_check_errdetail
Definition: guc.h:431
GucSource
Definition: guc.h:109
@ PGC_SIGHUP
Definition: guc.h:72
static bool pg_add_u64_overflow(uint64 a, uint64 b, uint64 *result)
Definition: int.h:376
static bool pg_mul_u64_overflow(uint64 a, uint64 b, uint64 *result)
Definition: int.h:410
#define read(a, b, c)
Definition: win32.h:13
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:143
MemoryContext TopMemoryContext
Definition: mcxt.c:48
#define AllocSetContextCreate
Definition: memutils.h:173
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:197
#define RESUME_INTERRUPTS()
Definition: miscadmin.h:134
#define HOLD_INTERRUPTS()
Definition: miscadmin.h:132
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
#define MAXPGPATH
static rewind_source * source
Definition: pg_rewind.c:81
void(* ArchiveModuleInit)(ArchiveModuleCallbacks *cb)
Definition: pgarch.h:64
#define snprintf
Definition: port.h:225
ArchiveFileCB archive_file_cb
Definition: pgarch.h:56
ArchiveCheckConfiguredCB check_configured_cb
Definition: pgarch.h:55
unsigned short st_mode
Definition: win32_port.h:268
#define stat
Definition: win32_port.h:283
#define S_ISDIR(m)
Definition: win32_port.h:324