PostgreSQL Source Code  git master
walmethods.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walmethods.c - implementations of different ways to write received wal
4  *
5  * NOTE! The caller must ensure that only one method is instantiated in
6  * any given program, and that it's only instantiated once!
7  *
8  * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/walmethods.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <time.h>
19 #include <unistd.h>
20 
21 #ifdef HAVE_LIBLZ4
22 #include <lz4frame.h>
23 #endif
24 #ifdef HAVE_LIBZ
25 #include <zlib.h>
26 #endif
27 
28 #include "common/file_perm.h"
29 #include "common/file_utils.h"
30 #include "common/logging.h"
31 #include "pgtar.h"
32 #include "receivelog.h"
33 #include "streamutil.h"
34 
35 /* Size of zlib buffer for .tar.gz */
36 #define ZLIB_OUT_SIZE 4096
37 
38 /* Size of LZ4 input chunk for .lz4 */
39 #define LZ4_IN_SIZE 4096
40 
41 /*-------------------------------------------------------------------------
42  * WalDirectoryMethod - write wal to a directory looking like pg_wal
43  *-------------------------------------------------------------------------
44  */
45 
46 /*
47  * Global static data for this method
48  */
49 typedef struct DirectoryMethodData
50 {
51  char *basedir;
54  bool sync;
55  const char *lasterrstring; /* if set, takes precedence over lasterrno */
56  int lasterrno;
59 
60 /*
61  * Local file handle
62  */
63 typedef struct DirectoryMethodFile
64 {
65  int fd;
66  off_t currpos;
67  char *pathname;
68  char *fullpath;
69  char *temp_suffix;
70 #ifdef HAVE_LIBZ
71  gzFile gzfp;
72 #endif
73 #ifdef HAVE_LIBLZ4
74  LZ4F_compressionContext_t ctx;
75  size_t lz4bufsize;
76  void *lz4buf;
77 #endif
79 
80 #define dir_clear_error() \
81  (dir_data->lasterrstring = NULL, dir_data->lasterrno = 0)
82 #define dir_set_error(msg) \
83  (dir_data->lasterrstring = _(msg))
84 
85 static const char *
87 {
89  return dir_data->lasterrstring;
90  return strerror(dir_data->lasterrno);
91 }
92 
93 static char *
94 dir_get_file_name(const char *pathname, const char *temp_suffix)
95 {
96  char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
97 
98  snprintf(filename, MAXPGPATH, "%s%s%s",
99  pathname,
101  dir_data->compression_method == COMPRESSION_LZ4 ? ".lz4" : "",
102  temp_suffix ? temp_suffix : "");
103 
104  return filename;
105 }
106 
107 static Walfile
108 dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
109 {
110  char tmppath[MAXPGPATH];
111  char *filename;
112  int fd;
114 #ifdef HAVE_LIBZ
115  gzFile gzfp = NULL;
116 #endif
117 #ifdef HAVE_LIBLZ4
118  LZ4F_compressionContext_t ctx = NULL;
119  size_t lz4bufsize = 0;
120  void *lz4buf = NULL;
121 #endif
122 
123  dir_clear_error();
124 
125  filename = dir_get_file_name(pathname, temp_suffix);
126  snprintf(tmppath, sizeof(tmppath), "%s/%s",
128  pg_free(filename);
129 
130  /*
131  * Open a file for non-compressed as well as compressed files. Tracking
132  * the file descriptor is important for dir_sync() method as gzflush()
133  * does not do any system calls to fsync() to make changes permanent on
134  * disk.
135  */
136  fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
137  if (fd < 0)
138  {
139  dir_data->lasterrno = errno;
140  return NULL;
141  }
142 
143 #ifdef HAVE_LIBZ
145  {
146  gzfp = gzdopen(fd, "wb");
147  if (gzfp == NULL)
148  {
149  dir_data->lasterrno = errno;
150  close(fd);
151  return NULL;
152  }
153 
154  if (gzsetparams(gzfp, dir_data->compression_level,
155  Z_DEFAULT_STRATEGY) != Z_OK)
156  {
157  dir_data->lasterrno = errno;
158  gzclose(gzfp);
159  return NULL;
160  }
161  }
162 #endif
163 #ifdef HAVE_LIBLZ4
165  {
166  size_t ctx_out;
167  size_t header_size;
168 
169  ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
170  if (LZ4F_isError(ctx_out))
171  {
172  dir_data->lasterrstring = LZ4F_getErrorName(ctx_out);
173  close(fd);
174  return NULL;
175  }
176 
177  lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
178  lz4buf = pg_malloc0(lz4bufsize);
179 
180  /* add the header */
181  header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, NULL);
182  if (LZ4F_isError(header_size))
183  {
184  dir_data->lasterrstring = LZ4F_getErrorName(header_size);
185  (void) LZ4F_freeCompressionContext(ctx);
186  pg_free(lz4buf);
187  close(fd);
188  return NULL;
189  }
190 
191  errno = 0;
192  if (write(fd, lz4buf, header_size) != header_size)
193  {
194  /* If write didn't set errno, assume problem is no disk space */
195  dir_data->lasterrno = errno ? errno : ENOSPC;
196  (void) LZ4F_freeCompressionContext(ctx);
197  pg_free(lz4buf);
198  close(fd);
199  return NULL;
200  }
201  }
202 #endif
203 
204  /* Do pre-padding on non-compressed files */
205  if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE)
206  {
207  PGAlignedXLogBlock zerobuf;
208  int bytes;
209 
210  memset(zerobuf.data, 0, XLOG_BLCKSZ);
211  for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
212  {
213  errno = 0;
214  if (write(fd, zerobuf.data, XLOG_BLCKSZ) != XLOG_BLCKSZ)
215  {
216  /* If write didn't set errno, assume problem is no disk space */
217  dir_data->lasterrno = errno ? errno : ENOSPC;
218  close(fd);
219  return NULL;
220  }
221  }
222 
223  if (lseek(fd, 0, SEEK_SET) != 0)
224  {
225  dir_data->lasterrno = errno;
226  close(fd);
227  return NULL;
228  }
229  }
230 
231  /*
232  * fsync WAL file and containing directory, to ensure the file is
233  * persistently created and zeroed (if padded). That's particularly
234  * important when using synchronous mode, where the file is modified and
235  * fsynced in-place, without a directory fsync.
236  */
237  if (dir_data->sync)
238  {
239  if (fsync_fname(tmppath, false) != 0 ||
240  fsync_parent_path(tmppath) != 0)
241  {
242  dir_data->lasterrno = errno;
243 #ifdef HAVE_LIBZ
245  gzclose(gzfp);
246  else
247 #endif
248 #ifdef HAVE_LIBLZ4
250  {
251  (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
252  (void) LZ4F_freeCompressionContext(ctx);
253  pg_free(lz4buf);
254  close(fd);
255  }
256  else
257 #endif
258  close(fd);
259  return NULL;
260  }
261  }
262 
263  f = pg_malloc0(sizeof(DirectoryMethodFile));
264 #ifdef HAVE_LIBZ
266  f->gzfp = gzfp;
267 #endif
268 #ifdef HAVE_LIBLZ4
270  {
271  f->ctx = ctx;
272  f->lz4buf = lz4buf;
273  f->lz4bufsize = lz4bufsize;
274  }
275 #endif
276 
277  f->fd = fd;
278  f->currpos = 0;
279  f->pathname = pg_strdup(pathname);
280  f->fullpath = pg_strdup(tmppath);
281  if (temp_suffix)
282  f->temp_suffix = pg_strdup(temp_suffix);
283 
284  return f;
285 }
286 
287 static ssize_t
288 dir_write(Walfile f, const void *buf, size_t count)
289 {
290  ssize_t r;
292 
293  Assert(f != NULL);
294  dir_clear_error();
295 
296 #ifdef HAVE_LIBZ
298  {
299  errno = 0;
300  r = (ssize_t) gzwrite(df->gzfp, buf, count);
301  if (r != count)
302  {
303  /* If write didn't set errno, assume problem is no disk space */
304  dir_data->lasterrno = errno ? errno : ENOSPC;
305  }
306  }
307  else
308 #endif
309 #ifdef HAVE_LIBLZ4
311  {
312  size_t chunk;
313  size_t remaining;
314  const void *inbuf = buf;
315 
316  remaining = count;
317  while (remaining > 0)
318  {
319  size_t compressed;
320 
321  if (remaining > LZ4_IN_SIZE)
322  chunk = LZ4_IN_SIZE;
323  else
324  chunk = remaining;
325 
326  remaining -= chunk;
327  compressed = LZ4F_compressUpdate(df->ctx,
328  df->lz4buf, df->lz4bufsize,
329  inbuf, chunk,
330  NULL);
331 
332  if (LZ4F_isError(compressed))
333  {
334  dir_data->lasterrstring = LZ4F_getErrorName(compressed);
335  return -1;
336  }
337 
338  errno = 0;
339  if (write(df->fd, df->lz4buf, compressed) != compressed)
340  {
341  /* If write didn't set errno, assume problem is no disk space */
342  dir_data->lasterrno = errno ? errno : ENOSPC;
343  return -1;
344  }
345 
346  inbuf = ((char *) inbuf) + chunk;
347  }
348 
349  /* Our caller keeps track of the uncompressed size. */
350  r = (ssize_t) count;
351  }
352  else
353 #endif
354  {
355  errno = 0;
356  r = write(df->fd, buf, count);
357  if (r != count)
358  {
359  /* If write didn't set errno, assume problem is no disk space */
360  dir_data->lasterrno = errno ? errno : ENOSPC;
361  }
362  }
363  if (r > 0)
364  df->currpos += r;
365  return r;
366 }
367 
368 static off_t
370 {
371  Assert(f != NULL);
372  dir_clear_error();
373 
374  /* Use a cached value to prevent lots of reseeks */
375  return ((DirectoryMethodFile *) f)->currpos;
376 }
377 
378 static int
380 {
381  int r;
383  char tmppath[MAXPGPATH];
384  char tmppath2[MAXPGPATH];
385 
386  Assert(f != NULL);
387  dir_clear_error();
388 
389 #ifdef HAVE_LIBZ
391  {
392  errno = 0; /* in case gzclose() doesn't set it */
393  r = gzclose(df->gzfp);
394  }
395  else
396 #endif
397 #ifdef HAVE_LIBLZ4
399  {
400  size_t compressed;
401 
402  compressed = LZ4F_compressEnd(df->ctx,
403  df->lz4buf, df->lz4bufsize,
404  NULL);
405 
406  if (LZ4F_isError(compressed))
407  {
408  dir_data->lasterrstring = LZ4F_getErrorName(compressed);
409  return -1;
410  }
411 
412  errno = 0;
413  if (write(df->fd, df->lz4buf, compressed) != compressed)
414  {
415  /* If write didn't set errno, assume problem is no disk space */
416  dir_data->lasterrno = errno ? errno : ENOSPC;
417  return -1;
418  }
419 
420  r = close(df->fd);
421  }
422  else
423 #endif
424  r = close(df->fd);
425 
426  if (r == 0)
427  {
428  /* Build path to the current version of the file */
429  if (method == CLOSE_NORMAL && df->temp_suffix)
430  {
431  char *filename;
432  char *filename2;
433 
434  /*
435  * If we have a temp prefix, normal operation is to rename the
436  * file.
437  */
439  snprintf(tmppath, sizeof(tmppath), "%s/%s",
441  pg_free(filename);
442 
443  /* permanent name, so no need for the prefix */
444  filename2 = dir_get_file_name(df->pathname, NULL);
445  snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
446  dir_data->basedir, filename2);
447  pg_free(filename2);
448  r = durable_rename(tmppath, tmppath2);
449  }
450  else if (method == CLOSE_UNLINK)
451  {
452  char *filename;
453 
454  /* Unlink the file once it's closed */
456  snprintf(tmppath, sizeof(tmppath), "%s/%s",
458  pg_free(filename);
459  r = unlink(tmppath);
460  }
461  else
462  {
463  /*
464  * Else either CLOSE_NORMAL and no temp suffix, or
465  * CLOSE_NO_RENAME. In this case, fsync the file and containing
466  * directory if sync mode is requested.
467  */
468  if (dir_data->sync)
469  {
470  r = fsync_fname(df->fullpath, false);
471  if (r == 0)
472  r = fsync_parent_path(df->fullpath);
473  }
474  }
475  }
476 
477  if (r != 0)
478  dir_data->lasterrno = errno;
479 
480 #ifdef HAVE_LIBLZ4
481  pg_free(df->lz4buf);
482  /* supports free on NULL */
483  LZ4F_freeCompressionContext(df->ctx);
484 #endif
485 
486  pg_free(df->pathname);
487  pg_free(df->fullpath);
488  if (df->temp_suffix)
489  pg_free(df->temp_suffix);
490  pg_free(df);
491 
492  return r;
493 }
494 
495 static int
497 {
498  int r;
499 
500  Assert(f != NULL);
501  dir_clear_error();
502 
503  if (!dir_data->sync)
504  return 0;
505 
506 #ifdef HAVE_LIBZ
508  {
509  if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
510  {
511  dir_data->lasterrno = errno;
512  return -1;
513  }
514  }
515 #endif
516 #ifdef HAVE_LIBLZ4
518  {
520  size_t compressed;
521 
522  /* Flush any internal buffers */
523  compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
524  if (LZ4F_isError(compressed))
525  {
526  dir_data->lasterrstring = LZ4F_getErrorName(compressed);
527  return -1;
528  }
529 
530  errno = 0;
531  if (write(df->fd, df->lz4buf, compressed) != compressed)
532  {
533  /* If write didn't set errno, assume problem is no disk space */
534  dir_data->lasterrno = errno ? errno : ENOSPC;
535  return -1;
536  }
537  }
538 #endif
539 
540  r = fsync(((DirectoryMethodFile *) f)->fd);
541  if (r < 0)
542  dir_data->lasterrno = errno;
543  return r;
544 }
545 
546 static ssize_t
547 dir_get_file_size(const char *pathname)
548 {
549  struct stat statbuf;
550  char tmppath[MAXPGPATH];
551 
552  snprintf(tmppath, sizeof(tmppath), "%s/%s",
553  dir_data->basedir, pathname);
554 
555  if (stat(tmppath, &statbuf) != 0)
556  {
557  dir_data->lasterrno = errno;
558  return -1;
559  }
560 
561  return statbuf.st_size;
562 }
563 
566 {
568 }
569 
570 static bool
571 dir_existsfile(const char *pathname)
572 {
573  char tmppath[MAXPGPATH];
574  int fd;
575 
576  dir_clear_error();
577 
578  snprintf(tmppath, sizeof(tmppath), "%s/%s",
579  dir_data->basedir, pathname);
580 
581  fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
582  if (fd < 0)
583  return false;
584  close(fd);
585  return true;
586 }
587 
588 static bool
590 {
591  dir_clear_error();
592 
593  if (dir_data->sync)
594  {
595  /*
596  * Files are fsynced when they are closed, but we need to fsync the
597  * directory entry here as well.
598  */
599  if (fsync_fname(dir_data->basedir, true) != 0)
600  {
601  dir_data->lasterrno = errno;
602  return false;
603  }
604  }
605  return true;
606 }
607 
608 
612  int compression_level, bool sync)
613 {
614  WalWriteMethod *method;
615 
616  method = pg_malloc0(sizeof(WalWriteMethod));
618  method->write = dir_write;
623  method->close = dir_close;
624  method->sync = dir_sync;
625  method->existsfile = dir_existsfile;
626  method->finish = dir_finish;
627  method->getlasterror = dir_getlasterror;
628 
631  dir_data->compression_level = compression_level;
633  dir_data->sync = sync;
634 
635  return method;
636 }
637 
638 void
640 {
642  pg_free(dir_data);
643  dir_data = NULL;
644 }
645 
646 
647 /*-------------------------------------------------------------------------
648  * WalTarMethod - write wal to a tar file containing pg_wal contents
649  *-------------------------------------------------------------------------
650  */
651 
652 typedef struct TarMethodFile
653 {
654  off_t ofs_start; /* Where does the *header* for this file start */
655  off_t currpos;
657  char *pathname;
658  size_t pad_to_size;
660 
661 typedef struct TarMethodData
662 {
663  char *tarfilename;
664  int fd;
667  bool sync;
669  const char *lasterrstring; /* if set, takes precedence over lasterrno */
671 #ifdef HAVE_LIBZ
672  z_streamp zp;
673  void *zlibOut;
674 #endif
676 static TarMethodData *tar_data = NULL;
677 
678 #define tar_clear_error() \
679  (tar_data->lasterrstring = NULL, tar_data->lasterrno = 0)
680 #define tar_set_error(msg) \
681  (tar_data->lasterrstring = _(msg))
682 
683 static const char *
685 {
686  if (tar_data->lasterrstring)
687  return tar_data->lasterrstring;
688  return strerror(tar_data->lasterrno);
689 }
690 
691 #ifdef HAVE_LIBZ
692 static bool
693 tar_write_compressed_data(void *buf, size_t count, bool flush)
694 {
695  tar_data->zp->next_in = buf;
696  tar_data->zp->avail_in = count;
697 
698  while (tar_data->zp->avail_in || flush)
699  {
700  int r;
701 
702  r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
703  if (r == Z_STREAM_ERROR)
704  {
705  tar_set_error("could not compress data");
706  return false;
707  }
708 
709  if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
710  {
711  size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
712 
713  errno = 0;
714  if (write(tar_data->fd, tar_data->zlibOut, len) != len)
715  {
716  /* If write didn't set errno, assume problem is no disk space */
717  tar_data->lasterrno = errno ? errno : ENOSPC;
718  return false;
719  }
720 
721  tar_data->zp->next_out = tar_data->zlibOut;
722  tar_data->zp->avail_out = ZLIB_OUT_SIZE;
723  }
724 
725  if (r == Z_STREAM_END)
726  break;
727  }
728 
729  if (flush)
730  {
731  /* Reset the stream for writing */
732  if (deflateReset(tar_data->zp) != Z_OK)
733  {
734  tar_set_error("could not reset compression stream");
735  return false;
736  }
737  }
738 
739  return true;
740 }
741 #endif
742 
743 static ssize_t
744 tar_write(Walfile f, const void *buf, size_t count)
745 {
746  ssize_t r;
747 
748  Assert(f != NULL);
749  tar_clear_error();
750 
751  /* Tarfile will always be positioned at the end */
753  {
754  errno = 0;
755  r = write(tar_data->fd, buf, count);
756  if (r != count)
757  {
758  /* If write didn't set errno, assume problem is no disk space */
759  tar_data->lasterrno = errno ? errno : ENOSPC;
760  return -1;
761  }
762  ((TarMethodFile *) f)->currpos += r;
763  return r;
764  }
765 #ifdef HAVE_LIBZ
766  else
767  {
768  if (!tar_write_compressed_data(unconstify(void *, buf), count, false))
769  return -1;
770  ((TarMethodFile *) f)->currpos += count;
771  return count;
772  }
773 #else
774  else
775  {
776  /* Can't happen - compression enabled with no libz */
777  tar_data->lasterrno = ENOSYS;
778  return -1;
779  }
780 #endif
781 }
782 
783 static bool
785 {
786  PGAlignedXLogBlock zerobuf;
787  size_t bytesleft = bytes;
788 
789  memset(zerobuf.data, 0, XLOG_BLCKSZ);
790  while (bytesleft)
791  {
792  size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
793  ssize_t r = tar_write(f, zerobuf.data, bytestowrite);
794 
795  if (r < 0)
796  return false;
797  bytesleft -= r;
798  }
799 
800  return true;
801 }
802 
803 static char *
804 tar_get_file_name(const char *pathname, const char *temp_suffix)
805 {
806  char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
807 
808  snprintf(filename, MAXPGPATH, "%s%s",
809  pathname, temp_suffix ? temp_suffix : "");
810 
811  return filename;
812 }
813 
814 static Walfile
815 tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
816 {
817  char *tmppath;
818 
819  tar_clear_error();
820 
821  if (tar_data->fd < 0)
822  {
823  /*
824  * We open the tar file only when we first try to write to it.
825  */
826  tar_data->fd = open(tar_data->tarfilename,
827  O_WRONLY | O_CREAT | PG_BINARY,
829  if (tar_data->fd < 0)
830  {
831  tar_data->lasterrno = errno;
832  return NULL;
833  }
834 
835 #ifdef HAVE_LIBZ
837  {
838  tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
839  tar_data->zp->zalloc = Z_NULL;
840  tar_data->zp->zfree = Z_NULL;
841  tar_data->zp->opaque = Z_NULL;
842  tar_data->zp->next_out = tar_data->zlibOut;
843  tar_data->zp->avail_out = ZLIB_OUT_SIZE;
844 
845  /*
846  * Initialize deflation library. Adding the magic value 16 to the
847  * default 15 for the windowBits parameter makes the output be
848  * gzip instead of zlib.
849  */
850  if (deflateInit2(tar_data->zp, tar_data->compression_level,
851  Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
852  {
853  pg_free(tar_data->zp);
854  tar_data->zp = NULL;
855  tar_set_error("could not initialize compression library");
856  return NULL;
857  }
858  }
859 #endif
860 
861  /* There's no tar header itself, the file starts with regular files */
862  }
863 
864  if (tar_data->currentfile != NULL)
865  {
866  tar_set_error("implementation error: tar files can't have more than one open file");
867  return NULL;
868  }
869 
871 
872  tmppath = tar_get_file_name(pathname, temp_suffix);
873 
874  /* Create a header with size set to 0 - we will fill out the size on close */
875  if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
876  {
878  pg_free(tmppath);
879  tar_data->currentfile = NULL;
880  tar_set_error("could not create tar header");
881  return NULL;
882  }
883 
884  pg_free(tmppath);
885 
886 #ifdef HAVE_LIBZ
888  {
889  /* Flush existing data */
890  if (!tar_write_compressed_data(NULL, 0, true))
891  return NULL;
892 
893  /* Turn off compression for header */
894  if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
895  {
896  tar_set_error("could not change compression parameters");
897  return NULL;
898  }
899  }
900 #endif
901 
902  tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
903  if (tar_data->currentfile->ofs_start == -1)
904  {
905  tar_data->lasterrno = errno;
907  tar_data->currentfile = NULL;
908  return NULL;
909  }
911 
913  {
914  errno = 0;
917  {
918  /* If write didn't set errno, assume problem is no disk space */
919  tar_data->lasterrno = errno ? errno : ENOSPC;
921  tar_data->currentfile = NULL;
922  return NULL;
923  }
924  }
925 #ifdef HAVE_LIBZ
926  else
927  {
928  /* Write header through the zlib APIs but with no compression */
929  if (!tar_write_compressed_data(tar_data->currentfile->header,
930  TAR_BLOCK_SIZE, true))
931  return NULL;
932 
933  /* Re-enable compression for the rest of the file */
934  if (deflateParams(tar_data->zp, tar_data->compression_level, 0) != Z_OK)
935  {
936  tar_set_error("could not change compression parameters");
937  return NULL;
938  }
939  }
940 #endif
941 
942  tar_data->currentfile->pathname = pg_strdup(pathname);
943 
944  /*
945  * Uncompressed files are padded on creation, but for compression we can't
946  * do that
947  */
948  if (pad_to_size)
949  {
950  tar_data->currentfile->pad_to_size = pad_to_size;
952  {
953  /* Uncompressed, so pad now */
954  if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
955  return NULL;
956  /* Seek back to start */
957  if (lseek(tar_data->fd,
959  SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
960  {
961  tar_data->lasterrno = errno;
962  return NULL;
963  }
964 
966  }
967  }
968 
969  return tar_data->currentfile;
970 }
971 
972 static ssize_t
973 tar_get_file_size(const char *pathname)
974 {
975  tar_clear_error();
976 
977  /* Currently not used, so not supported */
978  tar_data->lasterrno = ENOSYS;
979  return -1;
980 }
981 
984 {
986 }
987 
988 static off_t
990 {
991  Assert(f != NULL);
992  tar_clear_error();
993 
994  return ((TarMethodFile *) f)->currpos;
995 }
996 
997 static int
999 {
1000  int r;
1001 
1002  Assert(f != NULL);
1003  tar_clear_error();
1004 
1005  if (!tar_data->sync)
1006  return 0;
1007 
1008  /*
1009  * Always sync the whole tarfile, because that's all we can do. This makes
1010  * no sense on compressed files, so just ignore those.
1011  */
1013  return 0;
1014 
1015  r = fsync(tar_data->fd);
1016  if (r < 0)
1017  tar_data->lasterrno = errno;
1018  return r;
1019 }
1020 
1021 static int
1023 {
1024  ssize_t filesize;
1025  int padding;
1026  TarMethodFile *tf = (TarMethodFile *) f;
1027 
1028  Assert(f != NULL);
1029  tar_clear_error();
1030 
1031  if (method == CLOSE_UNLINK)
1032  {
1034  {
1035  tar_set_error("unlink not supported with compression");
1036  return -1;
1037  }
1038 
1039  /*
1040  * Unlink the file that we just wrote to the tar. We do this by
1041  * truncating it to the start of the header. This is safe as we only
1042  * allow writing of the very last file.
1043  */
1044  if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
1045  {
1046  tar_data->lasterrno = errno;
1047  return -1;
1048  }
1049 
1050  pg_free(tf->pathname);
1051  pg_free(tf);
1052  tar_data->currentfile = NULL;
1053 
1054  return 0;
1055  }
1056 
1057  /*
1058  * Pad the file itself with zeroes if necessary. Note that this is
1059  * different from the tar format padding -- this is the padding we asked
1060  * for when the file was opened.
1061  */
1062  if (tf->pad_to_size)
1063  {
1065  {
1066  /*
1067  * A compressed tarfile is padded on close since we cannot know
1068  * the size of the compressed output until the end.
1069  */
1070  size_t sizeleft = tf->pad_to_size - tf->currpos;
1071 
1072  if (sizeleft)
1073  {
1074  if (!tar_write_padding_data(tf, sizeleft))
1075  return -1;
1076  }
1077  }
1078  else
1079  {
1080  /*
1081  * An uncompressed tarfile was padded on creation, so just adjust
1082  * the current position as if we seeked to the end.
1083  */
1084  tf->currpos = tf->pad_to_size;
1085  }
1086  }
1087 
1088  /*
1089  * Get the size of the file, and pad out to a multiple of the tar block
1090  * size.
1091  */
1092  filesize = tar_get_current_pos(f);
1093  padding = tarPaddingBytesRequired(filesize);
1094  if (padding)
1095  {
1096  char zerobuf[TAR_BLOCK_SIZE];
1097 
1098  MemSet(zerobuf, 0, padding);
1099  if (tar_write(f, zerobuf, padding) != padding)
1100  return -1;
1101  }
1102 
1103 
1104 #ifdef HAVE_LIBZ
1106  {
1107  /* Flush the current buffer */
1108  if (!tar_write_compressed_data(NULL, 0, true))
1109  return -1;
1110  }
1111 #endif
1112 
1113  /*
1114  * Now go back and update the header with the correct filesize and
1115  * possibly also renaming the file. We overwrite the entire current header
1116  * when done, including the checksum.
1117  */
1118  print_tar_number(&(tf->header[124]), 12, filesize);
1119 
1120  if (method == CLOSE_NORMAL)
1121 
1122  /*
1123  * We overwrite it with what it was before if we have no tempname,
1124  * since we're going to write the buffer anyway.
1125  */
1126  strlcpy(&(tf->header[0]), tf->pathname, 100);
1127 
1128  print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
1129  if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
1130  {
1131  tar_data->lasterrno = errno;
1132  return -1;
1133  }
1135  {
1136  errno = 0;
1138  {
1139  /* If write didn't set errno, assume problem is no disk space */
1140  tar_data->lasterrno = errno ? errno : ENOSPC;
1141  return -1;
1142  }
1143  }
1144 #ifdef HAVE_LIBZ
1145  else
1146  {
1147  /* Turn off compression */
1148  if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
1149  {
1150  tar_set_error("could not change compression parameters");
1151  return -1;
1152  }
1153 
1154  /* Overwrite the header, assuming the size will be the same */
1155  if (!tar_write_compressed_data(tar_data->currentfile->header,
1156  TAR_BLOCK_SIZE, true))
1157  return -1;
1158 
1159  /* Turn compression back on */
1160  if (deflateParams(tar_data->zp, tar_data->compression_level, 0) != Z_OK)
1161  {
1162  tar_set_error("could not change compression parameters");
1163  return -1;
1164  }
1165  }
1166 #endif
1167 
1168  /* Move file pointer back down to end, so we can write the next file */
1169  if (lseek(tar_data->fd, 0, SEEK_END) < 0)
1170  {
1171  tar_data->lasterrno = errno;
1172  return -1;
1173  }
1174 
1175  /* Always fsync on close, so the padding gets fsynced */
1176  if (tar_sync(f) < 0)
1177  {
1178  /* XXX this seems pretty bogus; why is only this case fatal? */
1179  pg_log_fatal("could not fsync file \"%s\": %s",
1180  tf->pathname, tar_getlasterror());
1181  exit(1);
1182  }
1183 
1184  /* Clean up and done */
1185  pg_free(tf->pathname);
1186  pg_free(tf);
1187  tar_data->currentfile = NULL;
1188 
1189  return 0;
1190 }
1191 
1192 static bool
1193 tar_existsfile(const char *pathname)
1194 {
1195  tar_clear_error();
1196  /* We only deal with new tarfiles, so nothing externally created exists */
1197  return false;
1198 }
1199 
1200 static bool
1202 {
1203  char zerobuf[1024];
1204 
1205  tar_clear_error();
1206 
1207  if (tar_data->currentfile)
1208  {
1210  return false;
1211  }
1212 
1213  /* A tarfile always ends with two empty blocks */
1214  MemSet(zerobuf, 0, sizeof(zerobuf));
1216  {
1217  errno = 0;
1218  if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
1219  {
1220  /* If write didn't set errno, assume problem is no disk space */
1221  tar_data->lasterrno = errno ? errno : ENOSPC;
1222  return false;
1223  }
1224  }
1225 #ifdef HAVE_LIBZ
1226  else
1227  {
1228  if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
1229  return false;
1230 
1231  /* Also flush all data to make sure the gzip stream is finished */
1232  tar_data->zp->next_in = NULL;
1233  tar_data->zp->avail_in = 0;
1234  while (true)
1235  {
1236  int r;
1237 
1238  r = deflate(tar_data->zp, Z_FINISH);
1239 
1240  if (r == Z_STREAM_ERROR)
1241  {
1242  tar_set_error("could not compress data");
1243  return false;
1244  }
1245  if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
1246  {
1247  size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
1248 
1249  errno = 0;
1250  if (write(tar_data->fd, tar_data->zlibOut, len) != len)
1251  {
1252  /*
1253  * If write didn't set errno, assume problem is no disk
1254  * space.
1255  */
1256  tar_data->lasterrno = errno ? errno : ENOSPC;
1257  return false;
1258  }
1259  }
1260  if (r == Z_STREAM_END)
1261  break;
1262  }
1263 
1264  if (deflateEnd(tar_data->zp) != Z_OK)
1265  {
1266  tar_set_error("could not close compression stream");
1267  return false;
1268  }
1269  }
1270 #endif
1271 
1272  /* sync the empty blocks as well, since they're after the last file */
1273  if (tar_data->sync)
1274  {
1275  if (fsync(tar_data->fd) != 0)
1276  {
1277  tar_data->lasterrno = errno;
1278  return false;
1279  }
1280  }
1281 
1282  if (close(tar_data->fd) != 0)
1283  {
1284  tar_data->lasterrno = errno;
1285  return false;
1286  }
1287 
1288  tar_data->fd = -1;
1289 
1290  if (tar_data->sync)
1291  {
1292  if (fsync_fname(tar_data->tarfilename, false) != 0 ||
1294  {
1295  tar_data->lasterrno = errno;
1296  return false;
1297  }
1298  }
1299 
1300  return true;
1301 }
1302 
1303 /*
1304  * The argument compression_method is currently ignored. It is in place for
1305  * symmetry with CreateWalDirectoryMethod which uses it for distinguishing
1306  * between the different compression methods. CreateWalTarMethod and its family
1307  * of functions handle only zlib compression.
1308  */
1310 CreateWalTarMethod(const char *tarbase,
1312  int compression_level, bool sync)
1313 {
1314  WalWriteMethod *method;
1315  const char *suffix = (compression_level != 0) ? ".tar.gz" : ".tar";
1316 
1317  method = pg_malloc0(sizeof(WalWriteMethod));
1319  method->write = tar_write;
1321  method->get_file_size = tar_get_file_size;
1322  method->get_file_name = tar_get_file_name;
1324  method->close = tar_close;
1325  method->sync = tar_sync;
1326  method->existsfile = tar_existsfile;
1327  method->finish = tar_finish;
1328  method->getlasterror = tar_getlasterror;
1329 
1330  tar_data = pg_malloc0(sizeof(TarMethodData));
1331  tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
1332  sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
1333  tar_data->fd = -1;
1335  tar_data->compression_level = compression_level;
1336  tar_data->sync = sync;
1337 #ifdef HAVE_LIBZ
1338  if (compression_level)
1339  tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
1340 #endif
1341 
1342  return method;
1343 }
1344 
1345 void
1347 {
1349 #ifdef HAVE_LIBZ
1351  pg_free(tar_data->zlibOut);
1352 #endif
1353  pg_free(tar_data);
1354  tar_data = NULL;
1355 }
#define unconstify(underlying_type, expr)
Definition: c.h:1240
#define Min(x, y)
Definition: c.h:986
#define PG_BINARY
Definition: c.h:1268
#define MemSet(start, val, len)
Definition: c.h:1008
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:699
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:673
static int fsync_parent_path(const char *fname, int elevel)
Definition: fd.c:3763
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
int pg_file_create_mode
Definition: file_perm.c:19
int remaining
Definition: informix.c:667
#define close(a)
Definition: win32.h:12
#define write(a, b, c)
Definition: win32.h:14
Assert(fmt[strlen(fmt) - 1] !='\n')
exit(1)
#define pg_log_fatal(...)
Definition: logging.h:76
def bytes(source, encoding='ascii', errors='strict')
z_stream * z_streamp
static char * basedir
#define MAXPGPATH
const void size_t len
static char * filename
Definition: pg_dumpall.c:92
static WalCompressionMethod compression_method
Definition: pg_receivewal.c:55
static void header(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:212
static char * buf
Definition: pg_test_fsync.c:70
static size_t tarPaddingBytesRequired(size_t len)
Definition: pgtar.h:40
int tarChecksum(char *header)
Definition: tar.c:90
enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime)
Definition: tar.c:114
@ TAR_OK
Definition: pgtar.h:21
#define TAR_BLOCK_SIZE
Definition: pgtar.h:17
void print_tar_number(char *s, int len, uint64 val)
Definition: tar.c:22
#define sprintf
Definition: port.h:224
#define strerror
Definition: port.h:235
#define snprintf
Definition: port.h:222
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static int fd(const char *x, int i)
Definition: preproc-init.c:105
const char * lasterrstring
Definition: walmethods.c:55
WalCompressionMethod compression_method
Definition: walmethods.c:52
int compression_level
Definition: walmethods.c:666
TarMethodFile * currentfile
Definition: walmethods.c:668
const char * lasterrstring
Definition: walmethods.c:669
char * tarfilename
Definition: walmethods.c:663
WalCompressionMethod compression_method
Definition: walmethods.c:665
char * pathname
Definition: walmethods.c:657
size_t pad_to_size
Definition: walmethods.c:658
char header[TAR_BLOCK_SIZE]
Definition: walmethods.c:656
off_t ofs_start
Definition: walmethods.c:654
const char *(* getlasterror)(void)
Definition: walmethods.h:95
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:79
ssize_t(* get_file_size)(const char *pathname)
Definition: walmethods.h:61
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:49
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:58
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:55
char *(* get_file_name)(const char *pathname, const char *temp_suffix)
Definition: walmethods.h:67
int(* sync)(Walfile f)
Definition: walmethods.h:84
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:76
WalCompressionMethod(* compression_method)(void)
Definition: walmethods.h:70
bool(* finish)(void)
Definition: walmethods.h:92
__int64 st_size
Definition: win32_port.h:273
char data[XLOG_BLCKSZ]
Definition: c.h:1146
static bool dir_existsfile(const char *pathname)
Definition: walmethods.c:571
static bool tar_finish(void)
Definition: walmethods.c:1201
static Walfile dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:108
static int tar_sync(Walfile f)
Definition: walmethods.c:998
struct DirectoryMethodData DirectoryMethodData
#define dir_clear_error()
Definition: walmethods.c:80
static WalCompressionMethod tar_compression_method(void)
Definition: walmethods.c:983
static const char * dir_getlasterror(void)
Definition: walmethods.c:86
void FreeWalTarMethod(void)
Definition: walmethods.c:1346
#define tar_clear_error()
Definition: walmethods.c:678
static ssize_t dir_write(Walfile f, const void *buf, size_t count)
Definition: walmethods.c:288
static bool dir_finish(void)
Definition: walmethods.c:589
static char * dir_get_file_name(const char *pathname, const char *temp_suffix)
Definition: walmethods.c:94
WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, WalCompressionMethod compression_method, int compression_level, bool sync)
Definition: walmethods.c:610
static Walfile tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:815
static const char * tar_getlasterror(void)
Definition: walmethods.c:684
WalWriteMethod * CreateWalTarMethod(const char *tarbase, WalCompressionMethod compression_method, int compression_level, bool sync)
Definition: walmethods.c:1310
static ssize_t dir_get_file_size(const char *pathname)
Definition: walmethods.c:547
static off_t tar_get_current_pos(Walfile f)
Definition: walmethods.c:989
struct TarMethodData TarMethodData
#define ZLIB_OUT_SIZE
Definition: walmethods.c:36
static off_t dir_get_current_pos(Walfile f)
Definition: walmethods.c:369
static int tar_close(Walfile f, WalCloseMethod method)
Definition: walmethods.c:1022
static TarMethodData * tar_data
Definition: walmethods.c:676
static ssize_t tar_get_file_size(const char *pathname)
Definition: walmethods.c:973
static char * tar_get_file_name(const char *pathname, const char *temp_suffix)
Definition: walmethods.c:804
struct DirectoryMethodFile DirectoryMethodFile
static int dir_close(Walfile f, WalCloseMethod method)
Definition: walmethods.c:379
void FreeWalDirectoryMethod(void)
Definition: walmethods.c:639
static ssize_t tar_write(Walfile f, const void *buf, size_t count)
Definition: walmethods.c:744
static WalCompressionMethod dir_compression_method(void)
Definition: walmethods.c:565
#define tar_set_error(msg)
Definition: walmethods.c:680
static int dir_sync(Walfile f)
Definition: walmethods.c:496
struct TarMethodFile TarMethodFile
static DirectoryMethodData * dir_data
Definition: walmethods.c:58
static bool tar_existsfile(const char *pathname)
Definition: walmethods.c:1193
static bool tar_write_padding_data(TarMethodFile *f, size_t bytes)
Definition: walmethods.c:784
#define LZ4_IN_SIZE
Definition: walmethods.c:39
WalCompressionMethod
Definition: walmethods.h:24
@ COMPRESSION_LZ4
Definition: walmethods.h:26
@ COMPRESSION_NONE
Definition: walmethods.h:27
@ COMPRESSION_GZIP
Definition: walmethods.h:25
WalCloseMethod
Definition: walmethods.h:16
@ CLOSE_UNLINK
Definition: walmethods.h:18
@ CLOSE_NORMAL
Definition: walmethods.h:17
void * Walfile
Definition: walmethods.h:13
#define fsync(fd)
Definition: win32_port.h:76
#define stat
Definition: win32_port.h:283
#define S_IRUSR
Definition: win32_port.h:288
#define ftruncate(a, b)
Definition: win32_port.h:73
#define S_IWUSR
Definition: win32_port.h:291