PostgreSQL Source Code  git master
walmethods.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walmethods.c - implementations of different ways to write received wal
4  *
5  * NOTE! The caller must ensure that only one method is instantiated in
6  * any given program, and that it's only instantiated once!
7  *
8  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/walmethods.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <time.h>
19 #include <unistd.h>
20 
21 #ifdef USE_LZ4
22 #include <lz4frame.h>
23 #endif
24 #ifdef HAVE_LIBZ
25 #include <zlib.h>
26 #endif
27 
28 #include "common/file_perm.h"
29 #include "common/file_utils.h"
30 #include "common/logging.h"
31 #include "pgtar.h"
32 #include "receivelog.h"
33 #include "streamutil.h"
34 
/*
 * Fixed buffer sizes used by the compressing write paths.
 *
 * ZLIB_OUT_SIZE: capacity of the deflate output buffer used for .tar.gz.
 * LZ4_IN_SIZE:   largest chunk of input handed to the LZ4 frame compressor
 *                in one call when writing .lz4 files.
 */
enum
{
	ZLIB_OUT_SIZE = 4096,
	LZ4_IN_SIZE = 4096
};
40 
41 /*-------------------------------------------------------------------------
42  * WalDirectoryMethod - write wal to a directory looking like pg_wal
43  *-------------------------------------------------------------------------
44  */
45 
46 /*
47  * Global static data for this method
48  */
/*
 * Fields visible here: basedir is the directory WAL files are written into;
 * sync requests fsync of files and directory entries; lasterrstring and
 * lasterrno hold the error reported by the getlasterror callback.
 *
 * NOTE(review): this copy is missing the compression_algorithm and
 * compression_level members (read elsewhere as dir_data->...), the closing
 * "} DirectoryMethodData;" and the dir_data pointer declaration; confirm
 * against upstream.
 */
49 typedef struct DirectoryMethodData
50 {
51  char *basedir;
54  bool sync;
55  const char *lasterrstring; /* if set, takes precedence over lasterrno */
56  int lasterrno;
59 
60 /*
61  * Local file handle
62  */
/*
 * One DirectoryMethodFile is allocated per file by dir_open_for_write() and
 * freed by dir_close().
 *
 * NOTE(review): the closing "} DirectoryMethodFile;" is missing from this
 * copy; confirm against upstream.
 */
63 typedef struct DirectoryMethodFile
64 {
/* descriptor kept even for compressed files so dir_sync() can fsync() it */
65  int fd;
/* bytes accepted by dir_write() so far (the uncompressed count for LZ4) */
66  off_t currpos;
/* name as passed by the caller, without any suffix */
67  char *pathname;
/* basedir-qualified path the file was opened under */
68  char *fullpath;
/* temporary suffix removed by renaming on CLOSE_NORMAL; NULL if none */
69  char *temp_suffix;
70 #ifdef HAVE_LIBZ
71  gzFile gzfp;
72 #endif
73 #ifdef USE_LZ4
/* LZ4 frame compression context plus its output buffer and that buffer's size */
74  LZ4F_compressionContext_t ctx;
75  size_t lz4bufsize;
76  void *lz4buf;
77 #endif
79 
/*
 * Error-state helpers for the directory method.  dir_clear_error() forgets
 * whatever a previous call recorded; dir_set_error() records a translatable
 * message, which takes precedence over lasterrno when the error is reported.
 */
#define dir_clear_error() \
	(dir_data->lasterrstring = NULL, \
	 dir_data->lasterrno = 0)
#define dir_set_error(errmsg) \
	(dir_data->lasterrstring = _(errmsg))
84 
/*
 * getlasterror callback for the directory method: report lasterrstring if
 * one was set, otherwise strerror(lasterrno).
 *
 * NOTE(review): the name line and the "if (dir_data->lasterrstring)" guard
 * appear to be missing from this copy; as shown, the strerror() return is
 * unreachable.  Compare tar_getlasterror() and confirm against upstream.
 */
85 static const char *
87 {
89  return dir_data->lasterrstring;
90  return strerror(dir_data->lasterrno);
91 }
92 
/*
 * Build the file name for "pathname" from three concatenated parts:
 * pathname, a middle component, and temp_suffix ("" when temp_suffix is
 * NULL).  Returns a pg_malloc0'd MAXPGPATH buffer; callers pg_free() it.
 *
 * NOTE(review): the argument line(s) for the middle %s are missing from
 * this copy, leaving three conversions with two visible arguments;
 * presumably they supply the compression extension -- confirm upstream.
 */
93 static char *
94 dir_get_file_name(const char *pathname, const char *temp_suffix)
95 {
96  char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
97
98  snprintf(filename, MAXPGPATH, "%s%s%s",
99  pathname,
102  temp_suffix ? temp_suffix : "");
103
104  return filename;
105 }
106 
/*
 * open_for_write callback for the directory method.
 *
 * Opens basedir/<name>, where <name> is built by
 * dir_get_file_name(pathname, temp_suffix).  The raw descriptor is kept even
 * when compressing so that dir_sync() can fsync() it.
 *
 * - The gzip setup (gzdopen()/gzsetparams()) and the LZ4 setup (context plus
 *   frame header) are compiled in under HAVE_LIBZ/USE_LZ4.  Their runtime
 *   conditions are among the missing lines.
 * - Uncompressed files are pre-padded with zeroes up to pad_to_size.
 * - In sync mode the file and its parent directory are fsync'd.
 *
 * Returns a new DirectoryMethodFile (freed by dir_close()), or NULL with
 * lasterrno or lasterrstring set.
 *
 * NOTE(review): this copy is missing several lines: the declaration of "f",
 * the argument line of the first snprintf(), and the compression conditions
 * that open the gzip/LZ4 branches; confirm against upstream.
 */
107 static Walfile
108 dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
109 {
110  char tmppath[MAXPGPATH];
111  char *filename;
112  int fd;
114 #ifdef HAVE_LIBZ
115  gzFile gzfp = NULL;
116 #endif
117 #ifdef USE_LZ4
118  LZ4F_compressionContext_t ctx = NULL;
119  size_t lz4bufsize = 0;
120  void *lz4buf = NULL;
121 #endif
122
123  dir_clear_error();
124
125  filename = dir_get_file_name(pathname, temp_suffix);
126  snprintf(tmppath, sizeof(tmppath), "%s/%s",
128  pg_free(filename);
129
130  /*
131  * Open a file for non-compressed as well as compressed files. Tracking
132  * the file descriptor is important for dir_sync() method as gzflush()
133  * does not do any system calls to fsync() to make changes permanent on
134  * disk.
135  */
136  fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
137  if (fd < 0)
138  {
139  dir_data->lasterrno = errno;
140  return NULL;
141  }
142
143 #ifdef HAVE_LIBZ
145  {
146  gzfp = gzdopen(fd, "wb");
147  if (gzfp == NULL)
148  {
149  dir_data->lasterrno = errno;
150  close(fd);
151  return NULL;
152  }
153
154  if (gzsetparams(gzfp, dir_data->compression_level,
155  Z_DEFAULT_STRATEGY) != Z_OK)
156  {
157  dir_data->lasterrno = errno;
158  gzclose(gzfp);
159  return NULL;
160  }
161  }
162 #endif
163 #ifdef USE_LZ4
165  {
166  size_t ctx_out;
167  size_t header_size;
168  LZ4F_preferences_t prefs;
169
170  ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
171  if (LZ4F_isError(ctx_out))
172  {
173  dir_data->lasterrstring = LZ4F_getErrorName(ctx_out);
174  close(fd);
175  return NULL;
176  }
177
/* Output buffer sized so that one LZ4_IN_SIZE chunk always fits (see dir_write) */
178  lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
179  lz4buf = pg_malloc0(lz4bufsize);
180
181  /* assign the compression level, default is 0 */
182  memset(&prefs, 0, sizeof(prefs));
183  prefs.compressionLevel = dir_data->compression_level;
184
185  /* add the header */
186  header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
187  if (LZ4F_isError(header_size))
188  {
189  dir_data->lasterrstring = LZ4F_getErrorName(header_size);
190  (void) LZ4F_freeCompressionContext(ctx);
191  pg_free(lz4buf);
192  close(fd);
193  return NULL;
194  }
195
196  errno = 0;
197  if (write(fd, lz4buf, header_size) != header_size)
198  {
199  /* If write didn't set errno, assume problem is no disk space */
200  dir_data->lasterrno = errno ? errno : ENOSPC;
201  (void) LZ4F_freeCompressionContext(ctx);
202  pg_free(lz4buf);
203  close(fd);
204  return NULL;
205  }
206  }
207 #endif
208
209  /* Do pre-padding on non-compressed files */
210  if (pad_to_size && dir_data->compression_algorithm == PG_COMPRESSION_NONE)
211  {
212  PGAlignedXLogBlock zerobuf;
213  int bytes;
214
215  memset(zerobuf.data, 0, XLOG_BLCKSZ);
/*
 * NOTE(review): padding is written in whole XLOG_BLCKSZ blocks while the
 * int counter is below pad_to_size, so a pad_to_size that is not a
 * multiple of XLOG_BLCKSZ is rounded up.
 */
216  for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
217  {
218  errno = 0;
219  if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
220  {
221  /* If write didn't set errno, assume problem is no disk space */
222  dir_data->lasterrno = errno ? errno : ENOSPC;
223  close(fd);
224  return NULL;
225  }
226  }
227
228  if (lseek(fd, 0, SEEK_SET) != 0)
229  {
230  dir_data->lasterrno = errno;
231  close(fd);
232  return NULL;
233  }
234  }
235
236  /*
237  * fsync WAL file and containing directory, to ensure the file is
238  * persistently created and zeroed (if padded). That's particularly
239  * important when using synchronous mode, where the file is modified and
240  * fsynced in-place, without a directory fsync.
241  */
242  if (dir_data->sync)
243  {
244  if (fsync_fname(tmppath, false) != 0 ||
245  fsync_parent_path(tmppath) != 0)
246  {
247  dir_data->lasterrno = errno;
/* Tear down whichever writer was set up; the gzip case uses gzclose() instead of close(fd) */
248 #ifdef HAVE_LIBZ
250  gzclose(gzfp);
251  else
252 #endif
253 #ifdef USE_LZ4
255  {
256  (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
257  (void) LZ4F_freeCompressionContext(ctx);
258  pg_free(lz4buf);
259  close(fd);
260  }
261  else
262 #endif
263  close(fd);
264  return NULL;
265  }
266  }
267
/* The new handle takes ownership of fd and any compression state */
268  f = pg_malloc0(sizeof(DirectoryMethodFile));
269 #ifdef HAVE_LIBZ
271  f->gzfp = gzfp;
272 #endif
273 #ifdef USE_LZ4
275  {
276  f->ctx = ctx;
277  f->lz4buf = lz4buf;
278  f->lz4bufsize = lz4bufsize;
279  }
280 #endif
281
282  f->fd = fd;
283  f->currpos = 0;
284  f->pathname = pg_strdup(pathname);
285  f->fullpath = pg_strdup(tmppath);
286  if (temp_suffix)
287  f->temp_suffix = pg_strdup(temp_suffix);
288
289  return f;
290 }
291 
/*
 * write callback for the directory method.
 *
 * Writes count bytes from buf through one of three paths:
 * - gzwrite();
 * - the LZ4 frame compressor, which writes each compressed chunk to fd;
 * - plain write().
 * The conditions selecting among these paths are among the lines missing
 * from this copy.
 *
 * Return value:
 * - LZ4 path: count on success.
 * - gzwrite()/write() paths: their raw result, which may be short or -1.
 * currpos advances only by a positive result.  On failure, lasterrno is set
 * (ENOSPC if write() left errno at 0), or lasterrstring is set for LZ4
 * errors.
 *
 * NOTE(review): the declaration of "df" (presumably the DirectoryMethodFile
 * view of f) is missing from this copy.
 */
292 static ssize_t
293 dir_write(Walfile f, const void *buf, size_t count)
294 {
295  ssize_t r;
297
298  Assert(f != NULL);
299  dir_clear_error();
300
301 #ifdef HAVE_LIBZ
303  {
304  errno = 0;
305  r = (ssize_t) gzwrite(df->gzfp, buf, count);
306  if (r != count)
307  {
308  /* If write didn't set errno, assume problem is no disk space */
309  dir_data->lasterrno = errno ? errno : ENOSPC;
310  }
311  }
312  else
313 #endif
314 #ifdef USE_LZ4
316  {
317  size_t chunk;
318  size_t remaining;
319  const void *inbuf = buf;
320
/*
 * Feed the input in chunks of at most LZ4_IN_SIZE: lz4buf was sized with
 * LZ4F_compressBound(LZ4_IN_SIZE) when the file was opened.
 */
321  remaining = count;
322  while (remaining > 0)
323  {
324  size_t compressed;
325
326  if (remaining > LZ4_IN_SIZE)
327  chunk = LZ4_IN_SIZE;
328  else
329  chunk = remaining;
330
331  remaining -= chunk;
332  compressed = LZ4F_compressUpdate(df->ctx,
333  df->lz4buf, df->lz4bufsize,
334  inbuf, chunk,
335  NULL);
336
337  if (LZ4F_isError(compressed))
338  {
339  dir_data->lasterrstring = LZ4F_getErrorName(compressed);
340  return -1;
341  }
342
343  errno = 0;
344  if (write(df->fd, df->lz4buf, compressed) != compressed)
345  {
346  /* If write didn't set errno, assume problem is no disk space */
347  dir_data->lasterrno = errno ? errno : ENOSPC;
348  return -1;
349  }
350
351  inbuf = ((char *) inbuf) + chunk;
352  }
353
354  /* Our caller keeps track of the uncompressed size. */
355  r = (ssize_t) count;
356  }
357  else
358 #endif
359  {
360  errno = 0;
361  r = write(df->fd, buf, count);
362  if (r != count)
363  {
364  /* If write didn't set errno, assume problem is no disk space */
365  dir_data->lasterrno = errno ? errno : ENOSPC;
366  }
367  }
368  if (r > 0)
369  df->currpos += r;
370  return r;
371 }
372 
/*
 * Return the write position of f from the cached currpos (bytes accepted by
 * dir_write()) rather than calling lseek().
 *
 * NOTE(review): the name/parameter line is missing from this copy;
 * presumably dir_get_current_pos(Walfile f), cf. tar_get_current_pos().
 */
373 static off_t
375 {
376  Assert(f != NULL);
377  dir_clear_error();
378
379  /* Use a cached value to prevent lots of reseeks */
380  return ((DirectoryMethodFile *) f)->currpos;
381 }
382 
/*
 * close callback for the directory method.
 *
 * Finishes any compressed stream: gzclose(), or LZ4F_compressEnd() plus a
 * final write().  It then closes the descriptor and, if that succeeded, acts
 * on "method":
 * - CLOSE_NORMAL with a temp suffix: rename to the permanent name, using
 *   durable_rename() in sync mode.
 * - CLOSE_UNLINK: unlink the file.
 * - Anything else: fsync the file and its parent directory in sync mode.
 *
 * Returns 0 on success, nonzero with lasterrno or lasterrstring set on
 * failure.
 *
 * NOTE(review): the two LZ4 error returns below leave df->fd open and df
 * unfreed.  This copy is also missing several lines: the parameter line,
 * the "df" declaration, the compression conditions, and the
 * dir_get_file_name() calls and snprintf() arguments that build "filename"
 * and tmppath.  Confirm against upstream.
 */
383 static int
385 {
386  int r;
388  char tmppath[MAXPGPATH];
389  char tmppath2[MAXPGPATH];
390
391  Assert(f != NULL);
392  dir_clear_error();
393
394 #ifdef HAVE_LIBZ
396  {
397  errno = 0; /* in case gzclose() doesn't set it */
398  r = gzclose(df->gzfp);
399  }
400  else
401 #endif
402 #ifdef USE_LZ4
404  {
405  size_t compressed;
406
407  compressed = LZ4F_compressEnd(df->ctx,
408  df->lz4buf, df->lz4bufsize,
409  NULL);
410
411  if (LZ4F_isError(compressed))
412  {
413  dir_data->lasterrstring = LZ4F_getErrorName(compressed);
414  return -1;
415  }
416
417  errno = 0;
418  if (write(df->fd, df->lz4buf, compressed) != compressed)
419  {
420  /* If write didn't set errno, assume problem is no disk space */
421  dir_data->lasterrno = errno ? errno : ENOSPC;
422  return -1;
423  }
424
425  r = close(df->fd);
426  }
427  else
428 #endif
429  r = close(df->fd);
430
431  if (r == 0)
432  {
433  /* Build path to the current version of the file */
434  if (method == CLOSE_NORMAL && df->temp_suffix)
435  {
436  char *filename;
437  char *filename2;
438
439  /*
440  * If we have a temp suffix, normal operation is to rename the
441  * file.
442  */
444  snprintf(tmppath, sizeof(tmppath), "%s/%s",
446  pg_free(filename);
447
448  /* permanent name, so no need for the suffix */
449  filename2 = dir_get_file_name(df->pathname, NULL);
450  snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
451  dir_data->basedir, filename2);
452  pg_free(filename2);
453  if (dir_data->sync)
454  r = durable_rename(tmppath, tmppath2);
455  else
456  {
/* NOTE(review): pg_log_error() runs before lasterrno = errno below, so errno may no longer be rename()'s */
457  if (rename(tmppath, tmppath2) != 0)
458  {
459  pg_log_error("could not rename file \"%s\" to \"%s\": %m",
460  tmppath, tmppath2);
461  r = -1;
462  }
463  }
464  }
465  else if (method == CLOSE_UNLINK)
466  {
467  char *filename;
468
469  /* Unlink the file once it's closed */
471  snprintf(tmppath, sizeof(tmppath), "%s/%s",
473  pg_free(filename);
474  r = unlink(tmppath);
475  }
476  else
477  {
478  /*
479  * Else either CLOSE_NORMAL and no temp suffix, or
480  * CLOSE_NO_RENAME. In this case, fsync the file and containing
481  * directory if sync mode is requested.
482  */
483  if (dir_data->sync)
484  {
485  r = fsync_fname(df->fullpath, false);
486  if (r == 0)
487  r = fsync_parent_path(df->fullpath);
488  }
489  }
490  }
491
492  if (r != 0)
493  dir_data->lasterrno = errno;
494
495 #ifdef USE_LZ4
496  pg_free(df->lz4buf);
497  /* supports free on NULL */
498  LZ4F_freeCompressionContext(df->ctx);
499 #endif
500
501  pg_free(df->pathname);
502  pg_free(df->fullpath);
503  if (df->temp_suffix)
504  pg_free(df->temp_suffix);
505  pg_free(df);
506
507  return r;
508 }
509 
/*
 * sync callback for the directory method.
 *
 * When sync mode is off this returns 0 immediately.  Otherwise it flushes
 * any buffered compressed data and then fsync()s the descriptor.  The flush
 * is gzflush(Z_SYNC_FLUSH), or LZ4F_flush() followed by write().  Returns 0
 * on success, or -1 with lasterrno or lasterrstring set.
 *
 * NOTE(review): gzflush() failure records errno without resetting it first,
 * unlike the write() paths, so the recorded errno may be stale.  This copy
 * is also missing the name line, the gzip/LZ4 conditions and the "df"
 * declaration; confirm against upstream.
 */
510 static int
512 {
513  int r;
514
515  Assert(f != NULL);
516  dir_clear_error();
517
518  if (!dir_data->sync)
519  return 0;
520
521 #ifdef HAVE_LIBZ
523  {
524  if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
525  {
526  dir_data->lasterrno = errno;
527  return -1;
528  }
529  }
530 #endif
531 #ifdef USE_LZ4
533  {
535  size_t compressed;
536
537  /* Flush any internal buffers */
538  compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
539  if (LZ4F_isError(compressed))
540  {
541  dir_data->lasterrstring = LZ4F_getErrorName(compressed);
542  return -1;
543  }
544
545  errno = 0;
546  if (write(df->fd, df->lz4buf, compressed) != compressed)
547  {
548  /* If write didn't set errno, assume problem is no disk space */
549  dir_data->lasterrno = errno ? errno : ENOSPC;
550  return -1;
551  }
552  }
553 #endif
554
555  r = fsync(((DirectoryMethodFile *) f)->fd);
556  if (r < 0)
557  dir_data->lasterrno = errno;
558  return r;
559 }
560 
561 static ssize_t
562 dir_get_file_size(const char *pathname)
563 {
564  struct stat statbuf;
565  char tmppath[MAXPGPATH];
566 
567  snprintf(tmppath, sizeof(tmppath), "%s/%s",
568  dir_data->basedir, pathname);
569 
570  if (stat(tmppath, &statbuf) != 0)
571  {
572  dir_data->lasterrno = errno;
573  return -1;
574  }
575 
576  return statbuf.st_size;
577 }
578 
/*
 * NOTE(review): only the braces of this definition survive in this copy;
 * its header and body are missing.  It sits where a compression-algorithm
 * accessor would be expected, cf. the pg_compress_algorithm function in the
 * tar section.  Confirm against upstream.
 */
581 {
583 }
584 
585 static bool
586 dir_existsfile(const char *pathname)
587 {
588  char tmppath[MAXPGPATH];
589  int fd;
590 
591  dir_clear_error();
592 
593  snprintf(tmppath, sizeof(tmppath), "%s/%s",
594  dir_data->basedir, pathname);
595 
596  fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
597  if (fd < 0)
598  return false;
599  close(fd);
600  return true;
601 }
602 
/*
 * finish callback for the directory method.
 *
 * In sync mode, fsyncs basedir itself so that directory entries are
 * durable.  Returns false with lasterrno set if that fails, true otherwise.
 *
 * NOTE(review): the name line, presumably dir_finish(void), is missing from
 * this copy.
 */
603 static bool
605 {
606  dir_clear_error();
607
608  if (dir_data->sync)
609  {
610  /*
611  * Files are fsynced when they are closed, but we need to fsync the
612  * directory entry here as well.
613  */
614  if (fsync_fname(dir_data->basedir, true) != 0)
615  {
616  dir_data->lasterrno = errno;
617  return false;
618  }
619  }
620  return true;
621 }
622 
623 
627  int compression_level, bool sync)
628 {
/*
 * Build a pg_malloc0'd WalWriteMethod callback table for writing WAL into a
 * directory, and copy the arguments into the method's global state.
 *
 * NOTE(review): this copy is missing the return type, the name and leading
 * parameters, several callback assignments, and the lines that presumably
 * allocate dir_data (dereferenced below) and set basedir and
 * compression_algorithm.  Confirm against upstream.
 */
629  WalWriteMethod *method;
630
631  method = pg_malloc0(sizeof(WalWriteMethod));
633  method->write = dir_write;
638  method->close = dir_close;
639  method->sync = dir_sync;
640  method->existsfile = dir_existsfile;
641  method->finish = dir_finish;
642  method->getlasterror = dir_getlasterror;
643
646  dir_data->compression_level = compression_level;
648  dir_data->sync = sync;
649
650  return method;
651 }
652 
/*
 * Release the directory method's global state and reset dir_data to NULL.
 *
 * NOTE(review): the name line (presumably FreeWalDirectoryMethod(void)) and
 * the statement before pg_free(dir_data) are missing from this copy.  If
 * basedir is heap-allocated, that missing statement presumably frees it.
 * Confirm against upstream.
 */
653 void
655 {
657  pg_free(dir_data);
658  dir_data = NULL;
659 }
660 
661 
662 /*-------------------------------------------------------------------------
663  * WalTarMethod - write wal to a tar file containing pg_wal contents
664  *-------------------------------------------------------------------------
665  */
666 
/*
 * State for the single member currently being written into the tarfile.
 *
 * NOTE(review): this copy is missing the "char header[TAR_BLOCK_SIZE];"
 * member (used as tf->header) and the closing "} TarMethodFile;"; confirm
 * against upstream.
 */
667 typedef struct TarMethodFile
668 {
669  off_t ofs_start; /* Where does the *header* for this file start */
/* bytes of member data written so far, advanced by tar_write() */
670  off_t currpos;
/* member name without temp suffix; copied into the header on CLOSE_NORMAL */
672  char *pathname;
/* requested padded size: applied at open (uncompressed) or at close (compressed) */
673  size_t pad_to_size;
675 
/*
 * Global static data for the tar method.
 *
 * NOTE(review): this copy is missing members used elsewhere
 * (compression_algorithm, compression_level, currentfile, lasterrno) and the
 * closing "} TarMethodData;"; confirm against upstream.
 */
676 typedef struct TarMethodData
677 {
/* name of the tarfile, including its .tar or .tar.gz suffix */
678  char *tarfilename;
/* tarfile descriptor; -1 until the first tar_open_for_write() opens it */
679  int fd;
682  bool sync;
684  const char *lasterrstring; /* if set, takes precedence over lasterrno */
686 #ifdef HAVE_LIBZ
/* deflate stream and its ZLIB_OUT_SIZE output buffer, used for .tar.gz */
687  z_streamp zp;
688  void *zlibOut;
689 #endif
691 static TarMethodData *tar_data = NULL;
692 
/*
 * Error-state helpers for the tar method.  tar_clear_error() forgets
 * whatever a previous call recorded; tar_set_error() records a translatable
 * message, which takes precedence over lasterrno when the error is reported.
 */
#define tar_clear_error() \
	(tar_data->lasterrstring = NULL, \
	 tar_data->lasterrno = 0)
#define tar_set_error(errmsg) \
	(tar_data->lasterrstring = _(errmsg))
697 
/*
 * getlasterror callback for the tar method: report lasterrstring if one was
 * set, otherwise strerror(lasterrno).
 *
 * NOTE(review): the name line, presumably tar_getlasterror(void), is missing
 * from this copy.
 */
698 static const char *
700 {
701  if (tar_data->lasterrstring)
702  return tar_data->lasterrstring;
703  return strerror(tar_data->lasterrno);
704 }
705 
#ifdef HAVE_LIBZ
/*
 * Push "count" bytes at "buf" through the tarfile's deflate stream.  Every
 * chunk of output deflate() produces is written to the tarfile, and the
 * output buffer is then recycled.
 *
 * When "flush" is true, deflate() is run with Z_FINISH until the stream
 * ends.  The stream is then reset with deflateReset() so that later data
 * starts a fresh compressed stream.
 *
 * Returns true on success.  On failure it returns false with lasterrstring
 * or lasterrno set; lasterrno becomes ENOSPC if write() left errno at 0.
 */
static bool
tar_write_compressed_data(void *buf, size_t count, bool flush)
{
	z_streamp	zs = tar_data->zp;
	int			mode = flush ? Z_FINISH : Z_NO_FLUSH;

	zs->next_in = buf;
	zs->avail_in = count;

	for (;;)
	{
		int			rc;

		/* Without a flush, stop as soon as all input has been consumed */
		if (zs->avail_in == 0 && !flush)
			break;

		rc = deflate(zs, mode);
		if (rc == Z_STREAM_ERROR)
		{
			tar_set_error("could not compress data");
			return false;
		}

		/* Drain whatever deflate() produced into the tarfile */
		if (zs->avail_out < ZLIB_OUT_SIZE)
		{
			size_t		produced = ZLIB_OUT_SIZE - zs->avail_out;

			errno = 0;
			if (write(tar_data->fd, tar_data->zlibOut, produced) != produced)
			{
				/* If write didn't set errno, assume problem is no disk space */
				tar_data->lasterrno = errno ? errno : ENOSPC;
				return false;
			}

			zs->next_out = tar_data->zlibOut;
			zs->avail_out = ZLIB_OUT_SIZE;
		}

		if (rc == Z_STREAM_END)
			break;
	}

	/* A finished stream must be reset before it can take more input */
	if (flush && deflateReset(zs) != Z_OK)
	{
		tar_set_error("could not reset compression stream");
		return false;
	}

	return true;
}
#endif
757 
/*
 * write callback for the tar method: append count bytes of member data.
 *
 * Uncompressed data goes straight to the tarfile with write().  Compressed
 * data goes through tar_write_compressed_data() without a flush.  On
 * success, currpos advances by count and count is returned; on failure, -1
 * is returned with the error recorded.
 *
 * NOTE(review): the conditions selecting the uncompressed and gzip branches
 * are missing from this copy; confirm against upstream.
 */
758 static ssize_t
759 tar_write(Walfile f, const void *buf, size_t count)
760 {
761  ssize_t r;
762
763  Assert(f != NULL);
764  tar_clear_error();
765
766  /* Tarfile will always be positioned at the end */
768  {
769  errno = 0;
770  r = write(tar_data->fd, buf, count);
771  if (r != count)
772  {
773  /* If write didn't set errno, assume problem is no disk space */
774  tar_data->lasterrno = errno ? errno : ENOSPC;
775  return -1;
776  }
777  ((TarMethodFile *) f)->currpos += r;
778  return r;
779  }
780 #ifdef HAVE_LIBZ
782  {
783  if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
784  return -1;
785  ((TarMethodFile *) f)->currpos += count;
786  return count;
787  }
788 #endif
789  else
790  {
791  /* Can't happen - compression enabled with no method set */
792  tar_data->lasterrno = ENOSYS;
793  return -1;
794  }
795 }
796 
/*
 * Write "bytes" zero bytes to member f via tar_write(), in chunks of at most
 * XLOG_BLCKSZ.  Returns false as soon as any write fails.
 *
 * NOTE(review): the name/parameter line is missing from this copy;
 * presumably tar_write_padding_data(TarMethodFile *f, size_t bytes).
 */
797 static bool
799 {
800  PGAlignedXLogBlock zerobuf;
801  size_t bytesleft = bytes;
802
803  memset(zerobuf.data, 0, XLOG_BLCKSZ);
804  while (bytesleft)
805  {
806  size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
807  ssize_t r = tar_write(f, zerobuf.data, bytestowrite);
808
809  if (r < 0)
810  return false;
811  bytesleft -= r;
812  }
813
814  return true;
815 }
816 
817 static char *
818 tar_get_file_name(const char *pathname, const char *temp_suffix)
819 {
820  char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
821 
822  snprintf(filename, MAXPGPATH, "%s%s",
823  pathname, temp_suffix ? temp_suffix : "");
824 
825  return filename;
826 }
827 
/*
 * open_for_write callback for the tar method.
 *
 * Steps:
 * 1. On first use, opens the tarfile and, under HAVE_LIBZ, sets up a gzip
 *    deflate stream (windowBits 15 + 16).
 * 2. Refuses to start a second member while one is open.
 * 3. Creates a tar header with size 0, which tar_close() fixes up later.
 * 4. Records the header's offset in ofs_start and writes the header.  For
 *    gzip, the pending data is flushed first and the header is written with
 *    compression level 0.
 * 5. If pad_to_size is set, uncompressed members are padded right away and
 *    the position is moved back to just after the header.
 *
 * Returns the new member handle, or NULL with the error recorded.
 *
 * NOTE(review): the compressed-path error returns leave currentfile set (and
 * allocated), unlike the other error paths, which reset it to NULL.  This
 * copy is also missing several lines: the open() mode argument, the
 * compression conditions, the allocation of currentfile, the header write
 * call, the lseek() offset argument and the statement after the seek back.
 * That last statement presumably resets currpos, which the padding
 * advanced.  Confirm against upstream.
 */
828 static Walfile
829 tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
830 {
831  char *tmppath;
832
833  tar_clear_error();
834
835  if (tar_data->fd < 0)
836  {
837  /*
838  * We open the tar file only when we first try to write to it.
839  */
840  tar_data->fd = open(tar_data->tarfilename,
841  O_WRONLY | O_CREAT | PG_BINARY,
843  if (tar_data->fd < 0)
844  {
845  tar_data->lasterrno = errno;
846  return NULL;
847  }
848
849 #ifdef HAVE_LIBZ
851  {
852  tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
853  tar_data->zp->zalloc = Z_NULL;
854  tar_data->zp->zfree = Z_NULL;
855  tar_data->zp->opaque = Z_NULL;
856  tar_data->zp->next_out = tar_data->zlibOut;
857  tar_data->zp->avail_out = ZLIB_OUT_SIZE;
858
859  /*
860  * Initialize deflation library. Adding the magic value 16 to the
861  * default 15 for the windowBits parameter makes the output be
862  * gzip instead of zlib.
863  */
864  if (deflateInit2(tar_data->zp, tar_data->compression_level,
865  Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
866  {
867  pg_free(tar_data->zp);
868  tar_data->zp = NULL;
869  tar_set_error("could not initialize compression library");
870  return NULL;
871  }
872  }
873 #endif
874
875  /* There's no tar header itself, the file starts with regular files */
876  }
877
878  if (tar_data->currentfile != NULL)
879  {
880  tar_set_error("implementation error: tar files can't have more than one open file");
881  return NULL;
882  }
883
885
886  tmppath = tar_get_file_name(pathname, temp_suffix);
887
888  /* Create a header with size set to 0 - we will fill out the size on close */
889  if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
890  {
892  pg_free(tmppath);
893  tar_data->currentfile = NULL;
894  tar_set_error("could not create tar header");
895  return NULL;
896  }
897
898  pg_free(tmppath);
899
900 #ifdef HAVE_LIBZ
902  {
903  /* Flush existing data */
904  if (!tar_write_compressed_data(NULL, 0, true))
905  return NULL;
906
907  /* Turn off compression for header */
908  if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
909  {
910  tar_set_error("could not change compression parameters");
911  return NULL;
912  }
913  }
914 #endif
915
/* For gzip this is the offset in the compressed file, taken after the flush above */
916  tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
917  if (tar_data->currentfile->ofs_start == -1)
918  {
919  tar_data->lasterrno = errno;
921  tar_data->currentfile = NULL;
922  return NULL;
923  }
925
927  {
928  errno = 0;
931  {
932  /* If write didn't set errno, assume problem is no disk space */
933  tar_data->lasterrno = errno ? errno : ENOSPC;
935  tar_data->currentfile = NULL;
936  return NULL;
937  }
938  }
939 #ifdef HAVE_LIBZ
941  {
942  /* Write header through the zlib APIs but with no compression */
943  if (!tar_write_compressed_data(tar_data->currentfile->header,
944  TAR_BLOCK_SIZE, true))
945  return NULL;
946
947  /* Re-enable compression for the rest of the file */
948  if (deflateParams(tar_data->zp, tar_data->compression_level, 0) != Z_OK)
949  {
950  tar_set_error("could not change compression parameters");
951  return NULL;
952  }
953  }
954 #endif
955  else
956  {
957  /* not reachable */
958  Assert(false);
959  }
960
961  tar_data->currentfile->pathname = pg_strdup(pathname);
962
963  /*
964  * Uncompressed files are padded on creation, but for compression we can't
965  * do that
966  */
967  if (pad_to_size)
968  {
969  tar_data->currentfile->pad_to_size = pad_to_size;
971  {
972  /* Uncompressed, so pad now */
973  if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
974  return NULL;
975  /* Seek back to start */
976  if (lseek(tar_data->fd,
978  SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
979  {
980  tar_data->lasterrno = errno;
981  return NULL;
982  }
983
985  }
986  }
987
988  return tar_data->currentfile;
989 }
990 
991 static ssize_t
992 tar_get_file_size(const char *pathname)
993 {
994  tar_clear_error();
995 
996  /* Currently not used, so not supported */
997  tar_data->lasterrno = ENOSYS;
998  return -1;
999 }
1000 
/*
 * Presumably the compression-algorithm accessor for the tar method.
 *
 * NOTE(review): the name line and the return statement are missing from
 * this copy, so as shown this function returns no value; confirm against
 * upstream.
 */
1001 static pg_compress_algorithm
1003 {
1005 }
1006 
/*
 * Return the number of member-data bytes written to f so far, i.e. its
 * cached currpos.
 *
 * NOTE(review): the name line, presumably tar_get_current_pos(Walfile f)
 * (tar_close() calls it under that name), is missing from this copy.
 */
1007 static off_t
1009 {
1010  Assert(f != NULL);
1011  tar_clear_error();
1012
1013  return ((TarMethodFile *) f)->currpos;
1014 }
1015 
/*
 * sync callback for the tar method.
 *
 * In sync mode this fsyncs the whole tarfile, which is the only granularity
 * available.  It returns 0 without syncing when sync mode is off or, per
 * the comment below, when the tarfile is compressed.  Otherwise it returns
 * fsync()'s result, with lasterrno set on failure.
 *
 * NOTE(review): the name line and the compression check before the second
 * "return 0" are missing from this copy; as shown, that return makes the
 * fsync unreachable.
 */
1016 static int
1018 {
1019  int r;
1020
1021  Assert(f != NULL);
1022  tar_clear_error();
1023
1024  if (!tar_data->sync)
1025  return 0;
1026
1027  /*
1028  * Always sync the whole tarfile, because that's all we can do. This makes
1029  * no sense on compressed files, so just ignore those.
1030  */
1032  return 0;
1033
1034  r = fsync(tar_data->fd);
1035  if (r < 0)
1036  tar_data->lasterrno = errno;
1037  return r;
1038 }
1039 
/*
 * close callback for the tar method.
 *
 * CLOSE_UNLINK (refused for compressed tarfiles) truncates the tarfile back
 * to the start of this member's header, which drops the member.
 *
 * In all other cases tar_close:
 * - applies any remaining pad_to_size padding;
 * - pads the data to a tar block boundary and flushes the compressor;
 * - rewrites the member header with the final size and, for CLOSE_NORMAL,
 *   the permanent name;
 * - seeks back to the end and fsyncs.
 *
 * Returns 0 on success or -1 with the error recorded.  An fsync failure is
 * fatal.
 *
 * NOTE(review): this copy is missing several lines: the parameter line, the
 * compression conditions and the header write() call.  Confirm against
 * upstream.
 */
1040 static int
1042 {
1043  ssize_t filesize;
1044  int padding;
1045  TarMethodFile *tf = (TarMethodFile *) f;
1046
1047  Assert(f != NULL);
1048  tar_clear_error();
1049
1050  if (method == CLOSE_UNLINK)
1051  {
1053  {
1054  tar_set_error("unlink not supported with compression");
1055  return -1;
1056  }
1057
1058  /*
1059  * Unlink the file that we just wrote to the tar. We do this by
1060  * truncating it to the start of the header. This is safe as we only
1061  * allow writing of the very last file.
1062  */
1063  if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
1064  {
1065  tar_data->lasterrno = errno;
1066  return -1;
1067  }
1068
1069  pg_free(tf->pathname);
1070  pg_free(tf);
1071  tar_data->currentfile = NULL;
1072
1073  return 0;
1074  }
1075
1076  /*
1077  * Pad the file itself with zeroes if necessary. Note that this is
1078  * different from the tar format padding -- this is the padding we asked
1079  * for when the file was opened.
1080  */
1081  if (tf->pad_to_size)
1082  {
1084  {
1085  /*
1086  * A compressed tarfile is padded on close since we cannot know
1087  * the size of the compressed output until the end.
1088  */
/* NOTE(review): computed in unsigned arithmetic; wraps to a huge value if currpos already exceeds pad_to_size */
1089  size_t sizeleft = tf->pad_to_size - tf->currpos;
1090
1091  if (sizeleft)
1092  {
1093  if (!tar_write_padding_data(tf, sizeleft))
1094  return -1;
1095  }
1096  }
1097  else
1098  {
1099  /*
1100  * An uncompressed tarfile was padded on creation, so just adjust
1101  * the current position as if we seeked to the end.
1102  */
1103  tf->currpos = tf->pad_to_size;
1104  }
1105  }
1106
1107  /*
1108  * Get the size of the file, and pad out to a multiple of the tar block
1109  * size.
1110  */
1111  filesize = tar_get_current_pos(f);
1112  padding = tarPaddingBytesRequired(filesize);
1113  if (padding)
1114  {
1115  char zerobuf[TAR_BLOCK_SIZE];
1116
1117  MemSet(zerobuf, 0, padding);
1118  if (tar_write(f, zerobuf, padding) != padding)
1119  return -1;
1120  }
1121
1122
1123 #ifdef HAVE_LIBZ
1125  {
1126  /* Flush the current buffer */
1127  if (!tar_write_compressed_data(NULL, 0, true))
1128  return -1;
1129  }
1130 #endif
1131
1132  /*
1133  * Now go back and update the header with the correct filesize and
1134  * possibly also renaming the file. We overwrite the entire current header
1135  * when done, including the checksum.
1136  */
1137  print_tar_number(&(tf->header[124]), 12, filesize);
1138
1139  if (method == CLOSE_NORMAL)
1140
1141  /*
1142  * We overwrite it with what it was before if we have no tempname,
1143  * since we're going to write the buffer anyway.
1144  */
1145  strlcpy(&(tf->header[0]), tf->pathname, 100);
1146
1147  print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
1148  if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
1149  {
1150  tar_data->lasterrno = errno;
1151  return -1;
1152  }
1154  {
1155  errno = 0;
1157  {
1158  /* If write didn't set errno, assume problem is no disk space */
1159  tar_data->lasterrno = errno ? errno : ENOSPC;
1160  return -1;
1161  }
1162  }
1163 #ifdef HAVE_LIBZ
1165  {
/*
 * Relies on the level-0 encoding of the header taking exactly as many bytes
 * as when tar_open_for_write() first wrote it at ofs_start.
 */
1166  /* Turn off compression */
1167  if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
1168  {
1169  tar_set_error("could not change compression parameters");
1170  return -1;
1171  }
1172
1173  /* Overwrite the header, assuming the size will be the same */
1174  if (!tar_write_compressed_data(tar_data->currentfile->header,
1175  TAR_BLOCK_SIZE, true))
1176  return -1;
1177
1178  /* Turn compression back on */
1179  if (deflateParams(tar_data->zp, tar_data->compression_level, 0) != Z_OK)
1180  {
1181  tar_set_error("could not change compression parameters");
1182  return -1;
1183  }
1184  }
1185 #endif
1186  else
1187  {
1188  /* not reachable */
1189  Assert(false);
1190  }
1191
1192  /* Move file pointer back down to end, so we can write the next file */
1193  if (lseek(tar_data->fd, 0, SEEK_END) < 0)
1194  {
1195  tar_data->lasterrno = errno;
1196  return -1;
1197  }
1198
1199  /* Always fsync on close, so the padding gets fsynced */
1200  if (tar_sync(f) < 0)
1201  {
1202  /* XXX this seems pretty bogus; why is only this case fatal? */
1203  pg_fatal("could not fsync file \"%s\": %s",
1204  tf->pathname, tar_getlasterror());
1205  }
1206
1207  /* Clean up and done */
1208  pg_free(tf->pathname);
1209  pg_free(tf);
1210  tar_data->currentfile = NULL;
1211
1212  return 0;
1213 }
1214 
/*
 * existsfile callback for the tar method.
 *
 * The tar method only ever writes into a tarfile it creates itself, so no
 * externally created member can exist.  It always reports "not found".
 */
static bool
tar_existsfile(const char *pathname)
{
	(void) pathname;			/* unused */

	tar_clear_error();
	return false;
}
1222 
/*
 * finish callback for the tar method.
 *
 * Fails if a member is still open.  Otherwise it:
 * - appends the 1024 zero bytes that end a tarfile;
 * - for gzip, finishes the deflate stream and calls deflateEnd();
 * - fsyncs in sync mode and closes the tarfile (fd becomes -1);
 * - in sync mode, fsyncs it again by name.
 *
 * Returns false with the error recorded on any failure.
 *
 * NOTE(review): the final Z_FINISH loop never resets next_out/avail_out
 * after writing.  That is only correct if the remaining output fits in one
 * ZLIB_OUT_SIZE buffer; otherwise the same buffer would be written again.
 * This copy is also missing the name line, the error-setting line for an
 * open member, the compression conditions and the second operand of the
 * final "||" (presumably a parent-directory fsync).  Confirm against
 * upstream.
 */
1223 static bool
1225 {
1226  char zerobuf[1024];
1227
1228  tar_clear_error();
1229
1230  if (tar_data->currentfile)
1231  {
1233  return false;
1234  }
1235
1236  /* A tarfile always ends with two empty blocks */
1237  MemSet(zerobuf, 0, sizeof(zerobuf));
1239  {
1240  errno = 0;
1241  if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
1242  {
1243  /* If write didn't set errno, assume problem is no disk space */
1244  tar_data->lasterrno = errno ? errno : ENOSPC;
1245  return false;
1246  }
1247  }
1248 #ifdef HAVE_LIBZ
1250  {
1251  if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
1252  return false;
1253
1254  /* Also flush all data to make sure the gzip stream is finished */
1255  tar_data->zp->next_in = NULL;
1256  tar_data->zp->avail_in = 0;
1257  while (true)
1258  {
1259  int r;
1260
1261  r = deflate(tar_data->zp, Z_FINISH);
1262
1263  if (r == Z_STREAM_ERROR)
1264  {
1265  tar_set_error("could not compress data");
1266  return false;
1267  }
1268  if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
1269  {
1270  size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
1271
1272  errno = 0;
1273  if (write(tar_data->fd, tar_data->zlibOut, len) != len)
1274  {
1275  /*
1276  * If write didn't set errno, assume problem is no disk
1277  * space.
1278  */
1279  tar_data->lasterrno = errno ? errno : ENOSPC;
1280  return false;
1281  }
1282  }
1283  if (r == Z_STREAM_END)
1284  break;
1285  }
1286
1287  if (deflateEnd(tar_data->zp) != Z_OK)
1288  {
1289  tar_set_error("could not close compression stream");
1290  return false;
1291  }
1292  }
1293 #endif
1294  else
1295  {
1296  /* not reachable */
1297  Assert(false);
1298  }
1299
1300  /* sync the empty blocks as well, since they're after the last file */
1301  if (tar_data->sync)
1302  {
1303  if (fsync(tar_data->fd) != 0)
1304  {
1305  tar_data->lasterrno = errno;
1306  return false;
1307  }
1308  }
1309
1310  if (close(tar_data->fd) != 0)
1311  {
1312  tar_data->lasterrno = errno;
1313  return false;
1314  }
1315
1316  tar_data->fd = -1;
1317
1318  if (tar_data->sync)
1319  {
1320  if (fsync_fname(tar_data->tarfilename, false) != 0 ||
1322  {
1323  tar_data->lasterrno = errno;
1324  return false;
1325  }
1326  }
1327
1328  return true;
1329 }
1330 
/*
 * The argument compression_algorithm selects the archive name suffix:
 * ".tar.gz" for PG_COMPRESSION_GZIP, ".tar" otherwise.  It mirrors the
 * CreateWalDirectoryMethod signature, where it distinguishes between the
 * different compression methods, but CreateWalTarMethod and its family of
 * functions handle only zlib (gzip) compression.
 */
/*
 * Build a pg_malloc0'd WalWriteMethod callback table for writing WAL into a
 * single tarfile, and allocate tar_data.  The tarfile is named tarbase plus
 * ".tar.gz" (for PG_COMPRESSION_GZIP) or ".tar".  It is not opened here
 * (fd = -1); tar_open_for_write() opens it on first use.
 *
 * NOTE(review): this copy is missing the return type, the
 * compression_algorithm parameter line, some callback assignments, the line
 * that presumably stores compression_algorithm, and the condition guarding
 * the zlibOut allocation.  Confirm against upstream.
 */
1338 CreateWalTarMethod(const char *tarbase,
1340  int compression_level, bool sync)
1341 {
1342  WalWriteMethod *method;
1343  const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
1344  ".tar.gz" : ".tar";
1345
1346  method = pg_malloc0(sizeof(WalWriteMethod));
1348  method->write = tar_write;
1350  method->get_file_size = tar_get_file_size;
1351  method->get_file_name = tar_get_file_name;
1353  method->close = tar_close;
1354  method->sync = tar_sync;
1355  method->existsfile = tar_existsfile;
1356  method->finish = tar_finish;
1357  method->getlasterror = tar_getlasterror;
1358
1359  tar_data = pg_malloc0(sizeof(TarMethodData));
/* exact size: base name + suffix + terminating NUL */
1360  tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
1361  sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
1362  tar_data->fd = -1;
1364  tar_data->compression_level = compression_level;
1365  tar_data->sync = sync;
1366 #ifdef HAVE_LIBZ
1368  tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
1369 #endif
1370
1371  return method;
1372 }
1373 
/*
 * Release the tar method's global state: the zlib output buffer (when built
 * with zlib) and tar_data, which is reset to NULL.
 *
 * NOTE(review): this copy is missing the name line (presumably
 * FreeWalTarMethod(void)) and two statements, one before the #ifdef and one
 * inside it.  tarfilename and zp are not visibly freed here.  Confirm
 * against upstream.
 */
1374 void
1376 {
1378 #ifdef HAVE_LIBZ
1380  pg_free(tar_data->zlibOut);
1381 #endif
1382  pg_free(tar_data);
1383  tar_data = NULL;
1384 }
#define unconstify(underlying_type, expr)
Definition: c.h:1240
#define Min(x, y)
Definition: c.h:986
#define PG_BINARY
Definition: c.h:1268
#define MemSet(start, val, len)
Definition: c.h:1008
pg_compress_algorithm
Definition: compression.h:18
@ PG_COMPRESSION_GZIP
Definition: compression.h:20
@ PG_COMPRESSION_LZ4
Definition: compression.h:21
@ PG_COMPRESSION_NONE
Definition: compression.h:19
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:699
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:673
static int fsync_parent_path(const char *fname, int elevel)
Definition: fd.c:3763
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
int pg_file_create_mode
Definition: file_perm.c:19
int remaining
Definition: informix.c:667
#define close(a)
Definition: win32.h:12
#define write(a, b, c)
Definition: win32.h:14
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
Definition: logging.h:106
z_stream * z_streamp
#define pg_fatal(...)
static char * basedir
#define MAXPGPATH
const void size_t len
static char * filename
Definition: pg_dumpall.c:94
static pg_compress_algorithm compression_algorithm
Definition: pg_receivewal.c:55
static void header(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:212
static char * buf
Definition: pg_test_fsync.c:67
static size_t tarPaddingBytesRequired(size_t len)
Definition: pgtar.h:40
int tarChecksum(char *header)
Definition: tar.c:90
enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime)
Definition: tar.c:114
@ TAR_OK
Definition: pgtar.h:21
#define TAR_BLOCK_SIZE
Definition: pgtar.h:17
void print_tar_number(char *s, int len, uint64 val)
Definition: tar.c:22
#define sprintf
Definition: port.h:227
#define strerror
Definition: port.h:238
#define snprintf
Definition: port.h:225
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static int fd(const char *x, int i)
Definition: preproc-init.c:105
const char * lasterrstring
Definition: walmethods.c:55
pg_compress_algorithm compression_algorithm
Definition: walmethods.c:52
int compression_level
Definition: walmethods.c:681
pg_compress_algorithm compression_algorithm
Definition: walmethods.c:680
TarMethodFile * currentfile
Definition: walmethods.c:683
const char * lasterrstring
Definition: walmethods.c:684
char * tarfilename
Definition: walmethods.c:678
char * pathname
Definition: walmethods.c:672
size_t pad_to_size
Definition: walmethods.c:673
char header[TAR_BLOCK_SIZE]
Definition: walmethods.c:671
off_t ofs_start
Definition: walmethods.c:669
const char *(* getlasterror)(void)
Definition: walmethods.h:88
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:72
ssize_t(* get_file_size)(const char *pathname)
Definition: walmethods.h:54
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:42
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:51
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:48
char *(* get_file_name)(const char *pathname, const char *temp_suffix)
Definition: walmethods.h:60
int(* sync)(Walfile f)
Definition: walmethods.h:77
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:69
pg_compress_algorithm(* compression_algorithm)(void)
Definition: walmethods.h:63
bool(* finish)(void)
Definition: walmethods.h:85
__int64 st_size
Definition: win32_port.h:273
char data[XLOG_BLCKSZ]
Definition: c.h:1146
static bool dir_existsfile(const char *pathname)
Definition: walmethods.c:586
static bool tar_finish(void)
Definition: walmethods.c:1224
static Walfile dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:108
WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
Definition: walmethods.c:625
static int tar_sync(Walfile f)
Definition: walmethods.c:1017
struct DirectoryMethodData DirectoryMethodData
#define dir_clear_error()
Definition: walmethods.c:80
static const char * dir_getlasterror(void)
Definition: walmethods.c:86
void FreeWalTarMethod(void)
Definition: walmethods.c:1375
#define tar_clear_error()
Definition: walmethods.c:693
static ssize_t dir_write(Walfile f, const void *buf, size_t count)
Definition: walmethods.c:293
static bool dir_finish(void)
Definition: walmethods.c:604
static pg_compress_algorithm tar_compression_algorithm(void)
Definition: walmethods.c:1002
static char * dir_get_file_name(const char *pathname, const char *temp_suffix)
Definition: walmethods.c:94
static Walfile tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:829
static const char * tar_getlasterror(void)
Definition: walmethods.c:699
static ssize_t dir_get_file_size(const char *pathname)
Definition: walmethods.c:562
static off_t tar_get_current_pos(Walfile f)
Definition: walmethods.c:1008
struct TarMethodData TarMethodData
#define ZLIB_OUT_SIZE
Definition: walmethods.c:36
WalWriteMethod * CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
Definition: walmethods.c:1338
static off_t dir_get_current_pos(Walfile f)
Definition: walmethods.c:374
static int tar_close(Walfile f, WalCloseMethod method)
Definition: walmethods.c:1041
static TarMethodData * tar_data
Definition: walmethods.c:691
static ssize_t tar_get_file_size(const char *pathname)
Definition: walmethods.c:992
static char * tar_get_file_name(const char *pathname, const char *temp_suffix)
Definition: walmethods.c:818
struct DirectoryMethodFile DirectoryMethodFile
static int dir_close(Walfile f, WalCloseMethod method)
Definition: walmethods.c:384
void FreeWalDirectoryMethod(void)
Definition: walmethods.c:654
static ssize_t tar_write(Walfile f, const void *buf, size_t count)
Definition: walmethods.c:759
#define tar_set_error(msg)
Definition: walmethods.c:695
static int dir_sync(Walfile f)
Definition: walmethods.c:511
struct TarMethodFile TarMethodFile
static DirectoryMethodData * dir_data
Definition: walmethods.c:58
static bool tar_existsfile(const char *pathname)
Definition: walmethods.c:1216
static pg_compress_algorithm dir_compression_algorithm(void)
Definition: walmethods.c:580
static bool tar_write_padding_data(TarMethodFile *f, size_t bytes)
Definition: walmethods.c:798
#define LZ4_IN_SIZE
Definition: walmethods.c:39
WalCloseMethod
Definition: walmethods.h:17
@ CLOSE_UNLINK
Definition: walmethods.h:19
@ CLOSE_NORMAL
Definition: walmethods.h:18
void * Walfile
Definition: walmethods.h:14
#define fsync(fd)
Definition: win32_port.h:76
#define stat
Definition: win32_port.h:283
#define S_IRUSR
Definition: win32_port.h:288
#define ftruncate(a, b)
Definition: win32_port.h:73
#define S_IWUSR
Definition: win32_port.h:291