PostgreSQL Source Code  git master
walmethods.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walmethods.c - implementations of different ways to write received wal
4  *
5  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/bin/pg_basebackup/walmethods.c
9  *-------------------------------------------------------------------------
10  */
11 
12 #include "postgres_fe.h"
13 
14 #include <sys/stat.h>
15 #include <time.h>
16 #include <unistd.h>
17 
18 #ifdef USE_LZ4
19 #include <lz4frame.h>
20 #endif
21 #ifdef HAVE_LIBZ
22 #include <zlib.h>
23 #endif
24 
25 #include "common/file_perm.h"
26 #include "common/file_utils.h"
27 #include "common/logging.h"
28 #include "pgtar.h"
29 #include "receivelog.h"
30 #include "streamutil.h"
31 
32 /* Size of zlib buffer for .tar.gz */
33 #define ZLIB_OUT_SIZE 4096
34 
35 /* Size of LZ4 input chunk for .lz4 */
36 #define LZ4_IN_SIZE 4096
37 
38 /*-------------------------------------------------------------------------
39  * WalDirectoryMethod - write wal to a directory looking like pg_wal
40  *-------------------------------------------------------------------------
41  */
42 
43 static Walfile *dir_open_for_write(WalWriteMethod *wwmethod,
44  const char *pathname,
45  const char *temp_suffix,
46  size_t pad_to_size);
47 static int dir_close(Walfile *f, WalCloseMethod method);
48 static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname);
49 static ssize_t dir_get_file_size(WalWriteMethod *wwmethod,
50  const char *pathname);
51 static char *dir_get_file_name(WalWriteMethod *wwmethod,
52  const char *pathname, const char *temp_suffix);
53 static ssize_t dir_write(Walfile *f, const void *buf, size_t count);
54 static int dir_sync(Walfile *f);
55 static bool dir_finish(WalWriteMethod *wwmethod);
56 static void dir_free(WalWriteMethod *wwmethod);
57 
60  .close = dir_close,
61  .existsfile = dir_existsfile,
62  .get_file_size = dir_get_file_size,
63  .get_file_name = dir_get_file_name,
64  .write = dir_write,
65  .sync = dir_sync,
66  .finish = dir_finish,
67  .free = dir_free
68 };
69 
70 /*
71  * Global static data for this method
72  */
73 typedef struct DirectoryMethodData
74 {
76  char *basedir;
78 
79 /*
80  * Local file handle
81  */
82 typedef struct DirectoryMethodFile
83 {
85  int fd;
86  char *fullpath;
87  char *temp_suffix;
88 #ifdef HAVE_LIBZ
89  gzFile gzfp;
90 #endif
91 #ifdef USE_LZ4
92  LZ4F_compressionContext_t ctx;
93  size_t lz4bufsize;
94  void *lz4buf;
95 #endif
97 
98 #define clear_error(wwmethod) \
99  ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
100 
101 static char *
103  const char *pathname, const char *temp_suffix)
104 {
105  char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
106 
107  snprintf(filename, MAXPGPATH, "%s%s%s",
108  pathname,
109  wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
110  wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
111  temp_suffix ? temp_suffix : "");
112 
113  return filename;
114 }
115 
116 static Walfile *
117 dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
118  const char *temp_suffix, size_t pad_to_size)
119 {
120  DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
121  char tmppath[MAXPGPATH];
122  char *filename;
123  int fd;
125 #ifdef HAVE_LIBZ
126  gzFile gzfp = NULL;
127 #endif
128 #ifdef USE_LZ4
129  LZ4F_compressionContext_t ctx = NULL;
130  size_t lz4bufsize = 0;
131  void *lz4buf = NULL;
132 #endif
133 
134  clear_error(wwmethod);
135 
136  filename = dir_get_file_name(wwmethod, pathname, temp_suffix);
137  snprintf(tmppath, sizeof(tmppath), "%s/%s",
138  dir_data->basedir, filename);
139  pg_free(filename);
140 
141  /*
142  * Open a file for non-compressed as well as compressed files. Tracking
143  * the file descriptor is important for dir_sync() method as gzflush()
144  * does not do any system calls to fsync() to make changes permanent on
145  * disk.
146  */
147  fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
148  if (fd < 0)
149  {
150  wwmethod->lasterrno = errno;
151  return NULL;
152  }
153 
154 #ifdef HAVE_LIBZ
156  {
157  gzfp = gzdopen(fd, "wb");
158  if (gzfp == NULL)
159  {
160  wwmethod->lasterrno = errno;
161  close(fd);
162  return NULL;
163  }
164 
165  if (gzsetparams(gzfp, wwmethod->compression_level,
166  Z_DEFAULT_STRATEGY) != Z_OK)
167  {
168  wwmethod->lasterrno = errno;
169  gzclose(gzfp);
170  return NULL;
171  }
172  }
173 #endif
174 #ifdef USE_LZ4
175  if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
176  {
177  size_t ctx_out;
178  size_t header_size;
179  LZ4F_preferences_t prefs;
180 
181  ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
182  if (LZ4F_isError(ctx_out))
183  {
184  wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);
185  close(fd);
186  return NULL;
187  }
188 
189  lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
190  lz4buf = pg_malloc0(lz4bufsize);
191 
192  /* assign the compression level, default is 0 */
193  memset(&prefs, 0, sizeof(prefs));
194  prefs.compressionLevel = wwmethod->compression_level;
195 
196  /* add the header */
197  header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
198  if (LZ4F_isError(header_size))
199  {
200  wwmethod->lasterrstring = LZ4F_getErrorName(header_size);
201  (void) LZ4F_freeCompressionContext(ctx);
202  pg_free(lz4buf);
203  close(fd);
204  return NULL;
205  }
206 
207  errno = 0;
208  if (write(fd, lz4buf, header_size) != header_size)
209  {
210  /* If write didn't set errno, assume problem is no disk space */
211  wwmethod->lasterrno = errno ? errno : ENOSPC;
212  (void) LZ4F_freeCompressionContext(ctx);
213  pg_free(lz4buf);
214  close(fd);
215  return NULL;
216  }
217  }
218 #endif
219 
220  /* Do pre-padding on non-compressed files */
221  if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
222  {
223  ssize_t rc;
224 
225  rc = pg_pwrite_zeros(fd, pad_to_size, 0);
226 
227  if (rc < 0)
228  {
229  wwmethod->lasterrno = errno;
230  close(fd);
231  return NULL;
232  }
233 
234  /*
235  * pg_pwrite() (called via pg_pwrite_zeros()) may have moved the file
236  * position, so reset it (see win32pwrite.c).
237  */
238  if (lseek(fd, 0, SEEK_SET) != 0)
239  {
240  wwmethod->lasterrno = errno;
241  close(fd);
242  return NULL;
243  }
244  }
245 
246  /*
247  * fsync WAL file and containing directory, to ensure the file is
248  * persistently created and zeroed (if padded). That's particularly
249  * important when using synchronous mode, where the file is modified and
250  * fsynced in-place, without a directory fsync.
251  */
252  if (wwmethod->sync)
253  {
254  if (fsync_fname(tmppath, false) != 0 ||
255  fsync_parent_path(tmppath) != 0)
256  {
257  wwmethod->lasterrno = errno;
258 #ifdef HAVE_LIBZ
260  gzclose(gzfp);
261  else
262 #endif
263 #ifdef USE_LZ4
264  if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
265  {
266  (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
267  (void) LZ4F_freeCompressionContext(ctx);
268  pg_free(lz4buf);
269  close(fd);
270  }
271  else
272 #endif
273  close(fd);
274  return NULL;
275  }
276  }
277 
278  f = pg_malloc0(sizeof(DirectoryMethodFile));
279 #ifdef HAVE_LIBZ
281  f->gzfp = gzfp;
282 #endif
283 #ifdef USE_LZ4
284  if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
285  {
286  f->ctx = ctx;
287  f->lz4buf = lz4buf;
288  f->lz4bufsize = lz4bufsize;
289  }
290 #endif
291 
292  f->base.wwmethod = wwmethod;
293  f->base.currpos = 0;
294  f->base.pathname = pg_strdup(pathname);
295  f->fd = fd;
296  f->fullpath = pg_strdup(tmppath);
297  if (temp_suffix)
298  f->temp_suffix = pg_strdup(temp_suffix);
299 
300  return &f->base;
301 }
302 
303 static ssize_t
304 dir_write(Walfile *f, const void *buf, size_t count)
305 {
306  ssize_t r;
308 
309  Assert(f != NULL);
310  clear_error(f->wwmethod);
311 
312 #ifdef HAVE_LIBZ
314  {
315  errno = 0;
316  r = (ssize_t) gzwrite(df->gzfp, buf, count);
317  if (r != count)
318  {
319  /* If write didn't set errno, assume problem is no disk space */
320  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
321  }
322  }
323  else
324 #endif
325 #ifdef USE_LZ4
327  {
328  size_t chunk;
329  size_t remaining;
330  const void *inbuf = buf;
331 
332  remaining = count;
333  while (remaining > 0)
334  {
335  size_t compressed;
336 
337  if (remaining > LZ4_IN_SIZE)
338  chunk = LZ4_IN_SIZE;
339  else
340  chunk = remaining;
341 
342  remaining -= chunk;
343  compressed = LZ4F_compressUpdate(df->ctx,
344  df->lz4buf, df->lz4bufsize,
345  inbuf, chunk,
346  NULL);
347 
348  if (LZ4F_isError(compressed))
349  {
350  f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
351  return -1;
352  }
353 
354  errno = 0;
355  if (write(df->fd, df->lz4buf, compressed) != compressed)
356  {
357  /* If write didn't set errno, assume problem is no disk space */
358  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
359  return -1;
360  }
361 
362  inbuf = ((char *) inbuf) + chunk;
363  }
364 
365  /* Our caller keeps track of the uncompressed size. */
366  r = (ssize_t) count;
367  }
368  else
369 #endif
370  {
371  errno = 0;
372  r = write(df->fd, buf, count);
373  if (r != count)
374  {
375  /* If write didn't set errno, assume problem is no disk space */
376  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
377  }
378  }
379  if (r > 0)
380  df->base.currpos += r;
381  return r;
382 }
383 
384 static int
386 {
387  int r;
390  char tmppath[MAXPGPATH];
391  char tmppath2[MAXPGPATH];
392 
393  Assert(f != NULL);
394  clear_error(f->wwmethod);
395 
396 #ifdef HAVE_LIBZ
398  {
399  errno = 0; /* in case gzclose() doesn't set it */
400  r = gzclose(df->gzfp);
401  }
402  else
403 #endif
404 #ifdef USE_LZ4
406  {
407  size_t compressed;
408 
409  compressed = LZ4F_compressEnd(df->ctx,
410  df->lz4buf, df->lz4bufsize,
411  NULL);
412 
413  if (LZ4F_isError(compressed))
414  {
415  f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
416  return -1;
417  }
418 
419  errno = 0;
420  if (write(df->fd, df->lz4buf, compressed) != compressed)
421  {
422  /* If write didn't set errno, assume problem is no disk space */
423  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
424  return -1;
425  }
426 
427  r = close(df->fd);
428  }
429  else
430 #endif
431  r = close(df->fd);
432 
433  if (r == 0)
434  {
435  /* Build path to the current version of the file */
436  if (method == CLOSE_NORMAL && df->temp_suffix)
437  {
438  char *filename;
439  char *filename2;
440 
441  /*
442  * If we have a temp prefix, normal operation is to rename the
443  * file.
444  */
446  df->temp_suffix);
447  snprintf(tmppath, sizeof(tmppath), "%s/%s",
448  dir_data->basedir, filename);
449  pg_free(filename);
450 
451  /* permanent name, so no need for the prefix */
452  filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL);
453  snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
454  dir_data->basedir, filename2);
455  pg_free(filename2);
456  if (f->wwmethod->sync)
457  r = durable_rename(tmppath, tmppath2);
458  else
459  {
460  if (rename(tmppath, tmppath2) != 0)
461  {
462  pg_log_error("could not rename file \"%s\" to \"%s\": %m",
463  tmppath, tmppath2);
464  r = -1;
465  }
466  }
467  }
468  else if (method == CLOSE_UNLINK)
469  {
470  char *filename;
471 
472  /* Unlink the file once it's closed */
474  df->temp_suffix);
475  snprintf(tmppath, sizeof(tmppath), "%s/%s",
476  dir_data->basedir, filename);
477  pg_free(filename);
478  r = unlink(tmppath);
479  }
480  else
481  {
482  /*
483  * Else either CLOSE_NORMAL and no temp suffix, or
484  * CLOSE_NO_RENAME. In this case, fsync the file and containing
485  * directory if sync mode is requested.
486  */
487  if (f->wwmethod->sync)
488  {
489  r = fsync_fname(df->fullpath, false);
490  if (r == 0)
491  r = fsync_parent_path(df->fullpath);
492  }
493  }
494  }
495 
496  if (r != 0)
497  f->wwmethod->lasterrno = errno;
498 
499 #ifdef USE_LZ4
500  pg_free(df->lz4buf);
501  /* supports free on NULL */
502  LZ4F_freeCompressionContext(df->ctx);
503 #endif
504 
505  pg_free(df->base.pathname);
506  pg_free(df->fullpath);
507  pg_free(df->temp_suffix);
508  pg_free(df);
509 
510  return r;
511 }
512 
513 static int
515 {
516  int r;
517 
518  Assert(f != NULL);
519  clear_error(f->wwmethod);
520 
521  if (!f->wwmethod->sync)
522  return 0;
523 
524 #ifdef HAVE_LIBZ
526  {
527  if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
528  {
529  f->wwmethod->lasterrno = errno;
530  return -1;
531  }
532  }
533 #endif
534 #ifdef USE_LZ4
536  {
538  size_t compressed;
539 
540  /* Flush any internal buffers */
541  compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
542  if (LZ4F_isError(compressed))
543  {
544  f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
545  return -1;
546  }
547 
548  errno = 0;
549  if (write(df->fd, df->lz4buf, compressed) != compressed)
550  {
551  /* If write didn't set errno, assume problem is no disk space */
552  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
553  return -1;
554  }
555  }
556 #endif
557 
558  r = fsync(((DirectoryMethodFile *) f)->fd);
559  if (r < 0)
560  f->wwmethod->lasterrno = errno;
561  return r;
562 }
563 
564 static ssize_t
565 dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
566 {
567  DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
568  struct stat statbuf;
569  char tmppath[MAXPGPATH];
570 
571  snprintf(tmppath, sizeof(tmppath), "%s/%s",
572  dir_data->basedir, pathname);
573 
574  if (stat(tmppath, &statbuf) != 0)
575  {
576  wwmethod->lasterrno = errno;
577  return -1;
578  }
579 
580  return statbuf.st_size;
581 }
582 
583 static bool
584 dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
585 {
586  DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
587  char tmppath[MAXPGPATH];
588  int fd;
589 
590  clear_error(wwmethod);
591 
592  snprintf(tmppath, sizeof(tmppath), "%s/%s",
593  dir_data->basedir, pathname);
594 
595  fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
596  if (fd < 0)
597 
598  /*
599  * Skip setting dir_data->lasterrno here because we are only checking
600  * for existence.
601  */
602  return false;
603  close(fd);
604  return true;
605 }
606 
607 static bool
609 {
610  clear_error(wwmethod);
611 
612  if (wwmethod->sync)
613  {
614  DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
615 
616  /*
617  * Files are fsynced when they are closed, but we need to fsync the
618  * directory entry here as well.
619  */
620  if (fsync_fname(dir_data->basedir, true) != 0)
621  {
622  wwmethod->lasterrno = errno;
623  return false;
624  }
625  }
626  return true;
627 }
628 
629 static void
631 {
632  DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
633 
634  pg_free(dir_data->basedir);
635  pg_free(wwmethod);
636 }
637 
638 
642  int compression_level, bool sync)
643 {
644  DirectoryMethodData *wwmethod;
645 
646  wwmethod = pg_malloc0(sizeof(DirectoryMethodData));
647  *((const WalWriteMethodOps **) &wwmethod->base.ops) =
650  wwmethod->base.compression_level = compression_level;
651  wwmethod->base.sync = sync;
652  clear_error(&wwmethod->base);
653  wwmethod->basedir = pg_strdup(basedir);
654 
655  return &wwmethod->base;
656 }
657 
658 
659 /*-------------------------------------------------------------------------
660  * WalTarMethod - write wal to a tar file containing pg_wal contents
661  *-------------------------------------------------------------------------
662  */
663 
664 static Walfile *tar_open_for_write(WalWriteMethod *wwmethod,
665  const char *pathname,
666  const char *temp_suffix,
667  size_t pad_to_size);
668 static int tar_close(Walfile *f, WalCloseMethod method);
669 static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname);
670 static ssize_t tar_get_file_size(WalWriteMethod *wwmethod,
671  const char *pathname);
672 static char *tar_get_file_name(WalWriteMethod *wwmethod,
673  const char *pathname, const char *temp_suffix);
674 static ssize_t tar_write(Walfile *f, const void *buf, size_t count);
675 static int tar_sync(Walfile *f);
676 static bool tar_finish(WalWriteMethod *wwmethod);
677 static void tar_free(WalWriteMethod *wwmethod);
678 
681  .close = tar_close,
682  .existsfile = tar_existsfile,
683  .get_file_size = tar_get_file_size,
684  .get_file_name = tar_get_file_name,
685  .write = tar_write,
686  .sync = tar_sync,
687  .finish = tar_finish,
688  .free = tar_free
689 };
690 
691 typedef struct TarMethodFile
692 {
694  off_t ofs_start; /* Where does the *header* for this file start */
696  size_t pad_to_size;
698 
699 typedef struct TarMethodData
700 {
702  char *tarfilename;
703  int fd;
705 #ifdef HAVE_LIBZ
706  z_streamp zp;
707  void *zlibOut;
708 #endif
710 
711 #ifdef HAVE_LIBZ
712 static bool
713 tar_write_compressed_data(TarMethodData *tar_data, const void *buf, size_t count,
714  bool flush)
715 {
716  tar_data->zp->next_in = buf;
717  tar_data->zp->avail_in = count;
718 
719  while (tar_data->zp->avail_in || flush)
720  {
721  int r;
722 
723  r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
724  if (r == Z_STREAM_ERROR)
725  {
726  tar_data->base.lasterrstring = _("could not compress data");
727  return false;
728  }
729 
730  if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
731  {
732  size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
733 
734  errno = 0;
735  if (write(tar_data->fd, tar_data->zlibOut, len) != len)
736  {
737  /* If write didn't set errno, assume problem is no disk space */
738  tar_data->base.lasterrno = errno ? errno : ENOSPC;
739  return false;
740  }
741 
742  tar_data->zp->next_out = tar_data->zlibOut;
743  tar_data->zp->avail_out = ZLIB_OUT_SIZE;
744  }
745 
746  if (r == Z_STREAM_END)
747  break;
748  }
749 
750  if (flush)
751  {
752  /* Reset the stream for writing */
753  if (deflateReset(tar_data->zp) != Z_OK)
754  {
755  tar_data->base.lasterrstring = _("could not reset compression stream");
756  return false;
757  }
758  }
759 
760  return true;
761 }
762 #endif
763 
764 static ssize_t
765 tar_write(Walfile *f, const void *buf, size_t count)
766 {
767  TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
768  ssize_t r;
769 
770  Assert(f != NULL);
771  clear_error(f->wwmethod);
772 
773  /* Tarfile will always be positioned at the end */
775  {
776  errno = 0;
777  r = write(tar_data->fd, buf, count);
778  if (r != count)
779  {
780  /* If write didn't set errno, assume problem is no disk space */
781  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
782  return -1;
783  }
784  f->currpos += r;
785  return r;
786  }
787 #ifdef HAVE_LIBZ
789  {
790  if (!tar_write_compressed_data(tar_data, buf, count, false))
791  return -1;
792  f->currpos += count;
793  return count;
794  }
795 #endif
796  else
797  {
798  /* Can't happen - compression enabled with no method set */
799  f->wwmethod->lasterrno = ENOSYS;
800  return -1;
801  }
802 }
803 
804 static bool
806 {
807  PGAlignedXLogBlock zerobuf;
808  size_t bytesleft = bytes;
809 
810  memset(zerobuf.data, 0, XLOG_BLCKSZ);
811  while (bytesleft)
812  {
813  size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
814  ssize_t r = tar_write(&f->base, zerobuf.data, bytestowrite);
815 
816  if (r < 0)
817  return false;
818  bytesleft -= r;
819  }
820 
821  return true;
822 }
823 
824 static char *
825 tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname,
826  const char *temp_suffix)
827 {
828  char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
829 
830  snprintf(filename, MAXPGPATH, "%s%s",
831  pathname, temp_suffix ? temp_suffix : "");
832 
833  return filename;
834 }
835 
836 static Walfile *
837 tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
838  const char *temp_suffix, size_t pad_to_size)
839 {
840  TarMethodData *tar_data = (TarMethodData *) wwmethod;
841  char *tmppath;
842 
843  clear_error(wwmethod);
844 
845  if (tar_data->fd < 0)
846  {
847  /*
848  * We open the tar file only when we first try to write to it.
849  */
850  tar_data->fd = open(tar_data->tarfilename,
851  O_WRONLY | O_CREAT | PG_BINARY,
853  if (tar_data->fd < 0)
854  {
855  wwmethod->lasterrno = errno;
856  return NULL;
857  }
858 
859 #ifdef HAVE_LIBZ
861  {
862  tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
863  tar_data->zp->zalloc = Z_NULL;
864  tar_data->zp->zfree = Z_NULL;
865  tar_data->zp->opaque = Z_NULL;
866  tar_data->zp->next_out = tar_data->zlibOut;
867  tar_data->zp->avail_out = ZLIB_OUT_SIZE;
868 
869  /*
870  * Initialize deflation library. Adding the magic value 16 to the
871  * default 15 for the windowBits parameter makes the output be
872  * gzip instead of zlib.
873  */
874  if (deflateInit2(tar_data->zp, wwmethod->compression_level,
875  Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
876  {
877  pg_free(tar_data->zp);
878  tar_data->zp = NULL;
879  wwmethod->lasterrstring =
880  _("could not initialize compression library");
881  return NULL;
882  }
883  }
884 #endif
885 
886  /* There's no tar header itself, the file starts with regular files */
887  }
888 
889  if (tar_data->currentfile != NULL)
890  {
891  wwmethod->lasterrstring =
892  _("implementation error: tar files can't have more than one open file");
893  return NULL;
894  }
895 
896  tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
897  tar_data->currentfile->base.wwmethod = wwmethod;
898 
899  tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix);
900 
901  /* Create a header with size set to 0 - we will fill out the size on close */
902  if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
903  {
904  pg_free(tar_data->currentfile);
905  pg_free(tmppath);
906  tar_data->currentfile = NULL;
907  wwmethod->lasterrstring = _("could not create tar header");
908  return NULL;
909  }
910 
911  pg_free(tmppath);
912 
913 #ifdef HAVE_LIBZ
915  {
916  /* Flush existing data */
917  if (!tar_write_compressed_data(tar_data, NULL, 0, true))
918  return NULL;
919 
920  /* Turn off compression for header */
921  if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
922  {
923  wwmethod->lasterrstring =
924  _("could not change compression parameters");
925  return NULL;
926  }
927  }
928 #endif
929 
930  tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
931  if (tar_data->currentfile->ofs_start == -1)
932  {
933  wwmethod->lasterrno = errno;
934  pg_free(tar_data->currentfile);
935  tar_data->currentfile = NULL;
936  return NULL;
937  }
938  tar_data->currentfile->base.currpos = 0;
939 
941  {
942  errno = 0;
943  if (write(tar_data->fd, tar_data->currentfile->header,
945  {
946  /* If write didn't set errno, assume problem is no disk space */
947  wwmethod->lasterrno = errno ? errno : ENOSPC;
948  pg_free(tar_data->currentfile);
949  tar_data->currentfile = NULL;
950  return NULL;
951  }
952  }
953 #ifdef HAVE_LIBZ
954  else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
955  {
956  /* Write header through the zlib APIs but with no compression */
957  if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
958  TAR_BLOCK_SIZE, true))
959  return NULL;
960 
961  /* Re-enable compression for the rest of the file */
962  if (deflateParams(tar_data->zp, wwmethod->compression_level,
963  Z_DEFAULT_STRATEGY) != Z_OK)
964  {
965  wwmethod->lasterrstring = _("could not change compression parameters");
966  return NULL;
967  }
968  }
969 #endif
970  else
971  {
972  /* not reachable */
973  Assert(false);
974  }
975 
976  tar_data->currentfile->base.pathname = pg_strdup(pathname);
977 
978  /*
979  * Uncompressed files are padded on creation, but for compression we can't
980  * do that
981  */
982  if (pad_to_size)
983  {
984  tar_data->currentfile->pad_to_size = pad_to_size;
986  {
987  /* Uncompressed, so pad now */
988  if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
989  return NULL;
990  /* Seek back to start */
991  if (lseek(tar_data->fd,
992  tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
993  SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
994  {
995  wwmethod->lasterrno = errno;
996  return NULL;
997  }
998 
999  tar_data->currentfile->base.currpos = 0;
1000  }
1001  }
1002 
1003  return &tar_data->currentfile->base;
1004 }
1005 
1006 static ssize_t
1007 tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
1008 {
1009  clear_error(wwmethod);
1010 
1011  /* Currently not used, so not supported */
1012  wwmethod->lasterrno = ENOSYS;
1013  return -1;
1014 }
1015 
1016 static int
1018 {
1019  TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
1020  int r;
1021 
1022  Assert(f != NULL);
1023  clear_error(f->wwmethod);
1024 
1025  if (!f->wwmethod->sync)
1026  return 0;
1027 
1028  /*
1029  * Always sync the whole tarfile, because that's all we can do. This makes
1030  * no sense on compressed files, so just ignore those.
1031  */
1033  return 0;
1034 
1035  r = fsync(tar_data->fd);
1036  if (r < 0)
1037  f->wwmethod->lasterrno = errno;
1038  return r;
1039 }
1040 
1041 static int
1043 {
1044  ssize_t filesize;
1045  int padding;
1046  TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
1047  TarMethodFile *tf = (TarMethodFile *) f;
1048 
1049  Assert(f != NULL);
1050  clear_error(f->wwmethod);
1051 
1052  if (method == CLOSE_UNLINK)
1053  {
1055  {
1056  f->wwmethod->lasterrstring = _("unlink not supported with compression");
1057  return -1;
1058  }
1059 
1060  /*
1061  * Unlink the file that we just wrote to the tar. We do this by
1062  * truncating it to the start of the header. This is safe as we only
1063  * allow writing of the very last file.
1064  */
1065  if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
1066  {
1067  f->wwmethod->lasterrno = errno;
1068  return -1;
1069  }
1070 
1071  pg_free(tf->base.pathname);
1072  pg_free(tf);
1073  tar_data->currentfile = NULL;
1074 
1075  return 0;
1076  }
1077 
1078  /*
1079  * Pad the file itself with zeroes if necessary. Note that this is
1080  * different from the tar format padding -- this is the padding we asked
1081  * for when the file was opened.
1082  */
1083  if (tf->pad_to_size)
1084  {
1086  {
1087  /*
1088  * A compressed tarfile is padded on close since we cannot know
1089  * the size of the compressed output until the end.
1090  */
1091  size_t sizeleft = tf->pad_to_size - tf->base.currpos;
1092 
1093  if (sizeleft)
1094  {
1095  if (!tar_write_padding_data(tf, sizeleft))
1096  return -1;
1097  }
1098  }
1099  else
1100  {
1101  /*
1102  * An uncompressed tarfile was padded on creation, so just adjust
1103  * the current position as if we seeked to the end.
1104  */
1105  tf->base.currpos = tf->pad_to_size;
1106  }
1107  }
1108 
1109  /*
1110  * Get the size of the file, and pad out to a multiple of the tar block
1111  * size.
1112  */
1113  filesize = f->currpos;
1114  padding = tarPaddingBytesRequired(filesize);
1115  if (padding)
1116  {
1117  char zerobuf[TAR_BLOCK_SIZE] = {0};
1118 
1119  if (tar_write(f, zerobuf, padding) != padding)
1120  return -1;
1121  }
1122 
1123 
1124 #ifdef HAVE_LIBZ
1126  {
1127  /* Flush the current buffer */
1128  if (!tar_write_compressed_data(tar_data, NULL, 0, true))
1129  return -1;
1130  }
1131 #endif
1132 
1133  /*
1134  * Now go back and update the header with the correct filesize and
1135  * possibly also renaming the file. We overwrite the entire current header
1136  * when done, including the checksum.
1137  */
1138  print_tar_number(&(tf->header[TAR_OFFSET_SIZE]), 12, filesize);
1139 
1140  if (method == CLOSE_NORMAL)
1141 
1142  /*
1143  * We overwrite it with what it was before if we have no tempname,
1144  * since we're going to write the buffer anyway.
1145  */
1146  strlcpy(&(tf->header[TAR_OFFSET_NAME]), tf->base.pathname, 100);
1147 
1148  print_tar_number(&(tf->header[TAR_OFFSET_CHECKSUM]), 8,
1149  tarChecksum(((TarMethodFile *) f)->header));
1150  if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
1151  {
1152  f->wwmethod->lasterrno = errno;
1153  return -1;
1154  }
1156  {
1157  errno = 0;
1158  if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
1159  {
1160  /* If write didn't set errno, assume problem is no disk space */
1161  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
1162  return -1;
1163  }
1164  }
1165 #ifdef HAVE_LIBZ
1167  {
1168  /* Turn off compression */
1169  if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
1170  {
1171  f->wwmethod->lasterrstring = _("could not change compression parameters");
1172  return -1;
1173  }
1174 
1175  /* Overwrite the header, assuming the size will be the same */
1176  if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
1177  TAR_BLOCK_SIZE, true))
1178  return -1;
1179 
1180  /* Turn compression back on */
1181  if (deflateParams(tar_data->zp, f->wwmethod->compression_level,
1182  Z_DEFAULT_STRATEGY) != Z_OK)
1183  {
1184  f->wwmethod->lasterrstring = _("could not change compression parameters");
1185  return -1;
1186  }
1187  }
1188 #endif
1189  else
1190  {
1191  /* not reachable */
1192  Assert(false);
1193  }
1194 
1195  /* Move file pointer back down to end, so we can write the next file */
1196  if (lseek(tar_data->fd, 0, SEEK_END) < 0)
1197  {
1198  f->wwmethod->lasterrno = errno;
1199  return -1;
1200  }
1201 
1202  /* Always fsync on close, so the padding gets fsynced */
1203  if (tar_sync(f) < 0)
1204  {
1205  /* XXX this seems pretty bogus; why is only this case fatal? */
1206  pg_fatal("could not fsync file \"%s\": %s",
1207  tf->base.pathname, GetLastWalMethodError(f->wwmethod));
1208  }
1209 
1210  /* Clean up and done */
1211  pg_free(tf->base.pathname);
1212  pg_free(tf);
1213  tar_data->currentfile = NULL;
1214 
1215  return 0;
1216 }
1217 
1218 static bool
1219 tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
1220 {
1221  clear_error(wwmethod);
1222  /* We only deal with new tarfiles, so nothing externally created exists */
1223  return false;
1224 }
1225 
1226 static bool
1228 {
1229  TarMethodData *tar_data = (TarMethodData *) wwmethod;
1230  char zerobuf[1024] = {0};
1231 
1232  clear_error(wwmethod);
1233 
1234  if (tar_data->currentfile)
1235  {
1236  if (tar_close(&tar_data->currentfile->base, CLOSE_NORMAL) != 0)
1237  return false;
1238  }
1239 
1240  /* A tarfile always ends with two empty blocks */
1241  if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
1242  {
1243  errno = 0;
1244  if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
1245  {
1246  /* If write didn't set errno, assume problem is no disk space */
1247  wwmethod->lasterrno = errno ? errno : ENOSPC;
1248  return false;
1249  }
1250  }
1251 #ifdef HAVE_LIBZ
1252  else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1253  {
1254  if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),
1255  false))
1256  return false;
1257 
1258  /* Also flush all data to make sure the gzip stream is finished */
1259  tar_data->zp->next_in = NULL;
1260  tar_data->zp->avail_in = 0;
1261  while (true)
1262  {
1263  int r;
1264 
1265  r = deflate(tar_data->zp, Z_FINISH);
1266 
1267  if (r == Z_STREAM_ERROR)
1268  {
1269  wwmethod->lasterrstring = _("could not compress data");
1270  return false;
1271  }
1272  if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
1273  {
1274  size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
1275 
1276  errno = 0;
1277  if (write(tar_data->fd, tar_data->zlibOut, len) != len)
1278  {
1279  /*
1280  * If write didn't set errno, assume problem is no disk
1281  * space.
1282  */
1283  wwmethod->lasterrno = errno ? errno : ENOSPC;
1284  return false;
1285  }
1286  }
1287  if (r == Z_STREAM_END)
1288  break;
1289  }
1290 
1291  if (deflateEnd(tar_data->zp) != Z_OK)
1292  {
1293  wwmethod->lasterrstring = _("could not close compression stream");
1294  return false;
1295  }
1296  }
1297 #endif
1298  else
1299  {
1300  /* not reachable */
1301  Assert(false);
1302  }
1303 
1304  /* sync the empty blocks as well, since they're after the last file */
1305  if (wwmethod->sync)
1306  {
1307  if (fsync(tar_data->fd) != 0)
1308  {
1309  wwmethod->lasterrno = errno;
1310  return false;
1311  }
1312  }
1313 
1314  if (close(tar_data->fd) != 0)
1315  {
1316  wwmethod->lasterrno = errno;
1317  return false;
1318  }
1319 
1320  tar_data->fd = -1;
1321 
1322  if (wwmethod->sync)
1323  {
1324  if (fsync_fname(tar_data->tarfilename, false) != 0 ||
1325  fsync_parent_path(tar_data->tarfilename) != 0)
1326  {
1327  wwmethod->lasterrno = errno;
1328  return false;
1329  }
1330  }
1331 
1332  return true;
1333 }
1334 
1335 static void
1337 {
1338  TarMethodData *tar_data = (TarMethodData *) wwmethod;
1339 
1340  pg_free(tar_data->tarfilename);
1341 #ifdef HAVE_LIBZ
1342  if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1343  pg_free(tar_data->zlibOut);
1344 #endif
1345  pg_free(wwmethod);
1346 }
1347 
1348 /*
1349  * The argument compression_algorithm is currently ignored. It is in place for
1350  * symmetry with CreateWalDirectoryMethod which uses it for distinguishing
1351  * between the different compression methods. CreateWalTarMethod and its family
1352  * of functions handle only zlib compression.
1353  */
1355 CreateWalTarMethod(const char *tarbase,
1357  int compression_level, bool sync)
1358 {
1359  TarMethodData *wwmethod;
1360  const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
1361  ".tar.gz" : ".tar";
1362 
1363  wwmethod = pg_malloc0(sizeof(TarMethodData));
1364  *((const WalWriteMethodOps **) &wwmethod->base.ops) =
1365  &WalTarMethodOps;
1367  wwmethod->base.compression_level = compression_level;
1368  wwmethod->base.sync = sync;
1369  clear_error(&wwmethod->base);
1370 
1371  wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
1372  sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix);
1373  wwmethod->fd = -1;
1374 #ifdef HAVE_LIBZ
1376  wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
1377 #endif
1378 
1379  return &wwmethod->base;
1380 }
1381 
1382 const char *
1384 {
1385  if (wwmethod->lasterrstring)
1386  return wwmethod->lasterrstring;
1387  return strerror(wwmethod->lasterrno);
1388 }
#define Min(x, y)
Definition: c.h:1004
#define Assert(condition)
Definition: c.h:858
#define PG_BINARY
Definition: c.h:1273
pg_compress_algorithm
Definition: compression.h:22
@ PG_COMPRESSION_GZIP
Definition: compression.h:24
@ PG_COMPRESSION_LZ4
Definition: compression.h:25
@ PG_COMPRESSION_NONE
Definition: compression.h:23
#define _(x)
Definition: elog.c:90
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:782
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:756
static int fsync_parent_path(const char *fname, int elevel)
Definition: fd.c:3897
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
int pg_file_create_mode
Definition: file_perm.c:19
ssize_t pg_pwrite_zeros(int fd, size_t size, off_t offset)
Definition: file_utils.c:688
uint64 chunk
int remaining
Definition: informix.c:692
#define close(a)
Definition: win32.h:12
#define write(a, b, c)
Definition: win32.h:14
#define pg_log_error(...)
Definition: logging.h:106
#define pg_fatal(...)
static char * basedir
#define MAXPGPATH
const void size_t len
static char * filename
Definition: pg_dumpall.c:119
static pg_compress_algorithm compression_algorithm
Definition: pg_receivewal.c:55
static char * buf
Definition: pg_test_fsync.c:73
static size_t tarPaddingBytesRequired(size_t len)
Definition: pgtar.h:79
int tarChecksum(char *header)
Definition: tar.c:90
@ TAR_OFFSET_NAME
Definition: pgtar.h:39
@ TAR_OFFSET_SIZE
Definition: pgtar.h:43
@ TAR_OFFSET_CHECKSUM
Definition: pgtar.h:45
enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime)
Definition: tar.c:114
@ TAR_OK
Definition: pgtar.h:21
#define TAR_BLOCK_SIZE
Definition: pgtar.h:17
void print_tar_number(char *s, int len, uint64 val)
Definition: tar.c:22
#define sprintf
Definition: port.h:240
#define strerror
Definition: port.h:251
#define snprintf
Definition: port.h:238
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static int fd(const char *x, int i)
Definition: preproc-init.c:105
WalWriteMethod base
Definition: walmethods.c:75
TarMethodFile * currentfile
Definition: walmethods.c:704
WalWriteMethod base
Definition: walmethods.c:701
char * tarfilename
Definition: walmethods.c:702
size_t pad_to_size
Definition: walmethods.c:696
char header[TAR_BLOCK_SIZE]
Definition: walmethods.c:695
off_t ofs_start
Definition: walmethods.c:694
Walfile base
Definition: walmethods.c:693
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:49
const char * lasterrstring
Definition: walmethods.h:109
int compression_level
Definition: walmethods.h:107
const WalWriteMethodOps * ops
Definition: walmethods.h:105
pg_compress_algorithm compression_algorithm
Definition: walmethods.h:106
WalWriteMethod * wwmethod
Definition: walmethods.h:19
char * pathname
Definition: walmethods.h:21
off_t currpos
Definition: walmethods.h:20
__int64 st_size
Definition: win32_port.h:273
char data[XLOG_BLCKSZ]
Definition: c.h:1148
static char * dir_get_file_name(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Definition: walmethods.c:102
static Walfile * dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:117
WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
Definition: walmethods.c:640
struct DirectoryMethodData DirectoryMethodData
static Walfile * tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:837
static ssize_t tar_write(Walfile *f, const void *buf, size_t count)
Definition: walmethods.c:765
static void tar_free(WalWriteMethod *wwmethod)
Definition: walmethods.c:1336
static const WalWriteMethodOps WalTarMethodOps
Definition: walmethods.c:679
struct TarMethodData TarMethodData
#define ZLIB_OUT_SIZE
Definition: walmethods.c:33
WalWriteMethod * CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
Definition: walmethods.c:1355
static bool dir_finish(WalWriteMethod *wwmethod)
Definition: walmethods.c:608
#define clear_error(wwmethod)
Definition: walmethods.c:98
static const WalWriteMethodOps WalDirectoryMethodOps
Definition: walmethods.c:58
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
Definition: walmethods.c:1383
struct DirectoryMethodFile DirectoryMethodFile
static void dir_free(WalWriteMethod *wwmethod)
Definition: walmethods.c:630
static int dir_sync(Walfile *f)
Definition: walmethods.c:514
static int tar_sync(Walfile *f)
Definition: walmethods.c:1017
static bool tar_finish(WalWriteMethod *wwmethod)
Definition: walmethods.c:1227
static int dir_close(Walfile *f, WalCloseMethod method)
Definition: walmethods.c:385
static ssize_t dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.c:565
static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.c:1219
static ssize_t tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.c:1007
struct TarMethodFile TarMethodFile
static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.c:584
static int tar_close(Walfile *f, WalCloseMethod method)
Definition: walmethods.c:1042
static ssize_t dir_write(Walfile *f, const void *buf, size_t count)
Definition: walmethods.c:304
static bool tar_write_padding_data(TarMethodFile *f, size_t bytes)
Definition: walmethods.c:805
static char * tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Definition: walmethods.c:825
#define LZ4_IN_SIZE
Definition: walmethods.c:36
WalCloseMethod
Definition: walmethods.h:32
@ CLOSE_UNLINK
Definition: walmethods.h:34
@ CLOSE_NORMAL
Definition: walmethods.h:33
#define fsync(fd)
Definition: win32_port.h:85
#define stat
Definition: win32_port.h:284
#define S_IRUSR
Definition: win32_port.h:289
#define ftruncate(a, b)
Definition: win32_port.h:82
#define S_IWUSR
Definition: win32_port.h:292