PostgreSQL Source Code  git master
walmethods.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walmethods.c - implementations of different ways to write received wal
4  *
5  * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
6  *
7  * IDENTIFICATION
8  * src/bin/pg_basebackup/walmethods.c
9  *-------------------------------------------------------------------------
10  */
11 
12 #include "postgres_fe.h"
13 
14 #include <sys/stat.h>
15 #include <time.h>
16 #include <unistd.h>
17 
18 #ifdef USE_LZ4
19 #include <lz4frame.h>
20 #endif
21 #ifdef HAVE_LIBZ
22 #include <zlib.h>
23 #endif
24 
25 #include "common/file_perm.h"
26 #include "common/file_utils.h"
27 #include "common/logging.h"
28 #include "pgtar.h"
29 #include "receivelog.h"
30 #include "streamutil.h"
31 
32 /* Size of zlib buffer for .tar.gz */
33 #define ZLIB_OUT_SIZE 4096
34 
35 /* Size of LZ4 input chunk for .lz4 */
36 #define LZ4_IN_SIZE 4096
37 
38 /*-------------------------------------------------------------------------
39  * WalDirectoryMethod - write wal to a directory looking like pg_wal
40  *-------------------------------------------------------------------------
41  */
42 
43 static Walfile *dir_open_for_write(WalWriteMethod *wwmethod,
44  const char *pathname,
45  const char *temp_suffix,
46  size_t pad_to_size);
47 static int dir_close(Walfile *f, WalCloseMethod method);
48 static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname);
49 static ssize_t dir_get_file_size(WalWriteMethod *wwmethod,
50  const char *pathname);
51 static char *dir_get_file_name(WalWriteMethod *wwmethod,
52  const char *pathname, const char *temp_suffix);
53 static ssize_t dir_write(Walfile *f, const void *buf, size_t count);
54 static int dir_sync(Walfile *f);
55 static bool dir_finish(WalWriteMethod *wwmethod);
56 static void dir_free(WalWriteMethod *wwmethod);
57 
60  .close = dir_close,
61  .existsfile = dir_existsfile,
62  .get_file_size = dir_get_file_size,
63  .get_file_name = dir_get_file_name,
64  .write = dir_write,
65  .sync = dir_sync,
66  .finish = dir_finish,
67  .free = dir_free
68 };
69 
70 /*
71  * Global static data for this method
72  */
73 typedef struct DirectoryMethodData
74 {
76  char *basedir;
78 
79 /*
80  * Local file handle
81  */
82 typedef struct DirectoryMethodFile
83 {
85  int fd;
86  char *fullpath;
87  char *temp_suffix;
88 #ifdef HAVE_LIBZ
89  gzFile gzfp;
90 #endif
91 #ifdef USE_LZ4
92  LZ4F_compressionContext_t ctx;
93  size_t lz4bufsize;
94  void *lz4buf;
95 #endif
97 
98 #define clear_error(wwmethod) \
99  ((wwmethod)->lasterrstring = NULL, (wwmethod)->lasterrno = 0)
100 
101 static char *
103  const char *pathname, const char *temp_suffix)
104 {
105  char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
106 
107  snprintf(filename, MAXPGPATH, "%s%s%s",
108  pathname,
109  wwmethod->compression_algorithm == PG_COMPRESSION_GZIP ? ".gz" :
110  wwmethod->compression_algorithm == PG_COMPRESSION_LZ4 ? ".lz4" : "",
111  temp_suffix ? temp_suffix : "");
112 
113  return filename;
114 }
115 
116 static Walfile *
117 dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
118  const char *temp_suffix, size_t pad_to_size)
119 {
120  DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
121  char tmppath[MAXPGPATH];
122  char *filename;
123  int fd;
125 #ifdef HAVE_LIBZ
126  gzFile gzfp = NULL;
127 #endif
128 #ifdef USE_LZ4
129  LZ4F_compressionContext_t ctx = NULL;
130  size_t lz4bufsize = 0;
131  void *lz4buf = NULL;
132 #endif
133 
134  clear_error(wwmethod);
135 
136  filename = dir_get_file_name(wwmethod, pathname, temp_suffix);
137  snprintf(tmppath, sizeof(tmppath), "%s/%s",
138  dir_data->basedir, filename);
139  pg_free(filename);
140 
141  /*
142  * Open a file for non-compressed as well as compressed files. Tracking
143  * the file descriptor is important for dir_sync() method as gzflush()
144  * does not do any system calls to fsync() to make changes permanent on
145  * disk.
146  */
147  fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
148  if (fd < 0)
149  {
150  wwmethod->lasterrno = errno;
151  return NULL;
152  }
153 
154 #ifdef HAVE_LIBZ
156  {
157  gzfp = gzdopen(fd, "wb");
158  if (gzfp == NULL)
159  {
160  wwmethod->lasterrno = errno;
161  close(fd);
162  return NULL;
163  }
164 
165  if (gzsetparams(gzfp, wwmethod->compression_level,
166  Z_DEFAULT_STRATEGY) != Z_OK)
167  {
168  wwmethod->lasterrno = errno;
169  gzclose(gzfp);
170  return NULL;
171  }
172  }
173 #endif
174 #ifdef USE_LZ4
175  if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
176  {
177  size_t ctx_out;
178  size_t header_size;
179  LZ4F_preferences_t prefs;
180 
181  ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
182  if (LZ4F_isError(ctx_out))
183  {
184  wwmethod->lasterrstring = LZ4F_getErrorName(ctx_out);
185  close(fd);
186  return NULL;
187  }
188 
189  lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
190  lz4buf = pg_malloc0(lz4bufsize);
191 
192  /* assign the compression level, default is 0 */
193  memset(&prefs, 0, sizeof(prefs));
194  prefs.compressionLevel = wwmethod->compression_level;
195 
196  /* add the header */
197  header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &prefs);
198  if (LZ4F_isError(header_size))
199  {
200  wwmethod->lasterrstring = LZ4F_getErrorName(header_size);
201  (void) LZ4F_freeCompressionContext(ctx);
202  pg_free(lz4buf);
203  close(fd);
204  return NULL;
205  }
206 
207  errno = 0;
208  if (write(fd, lz4buf, header_size) != header_size)
209  {
210  /* If write didn't set errno, assume problem is no disk space */
211  wwmethod->lasterrno = errno ? errno : ENOSPC;
212  (void) LZ4F_freeCompressionContext(ctx);
213  pg_free(lz4buf);
214  close(fd);
215  return NULL;
216  }
217  }
218 #endif
219 
220  /* Do pre-padding on non-compressed files */
221  if (pad_to_size && wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
222  {
223  ssize_t rc;
224 
225  rc = pg_pwrite_zeros(fd, pad_to_size);
226 
227  if (rc < 0)
228  {
229  wwmethod->lasterrno = errno;
230  close(fd);
231  return NULL;
232  }
233 
234  /*
235  * pg_pwrite() (called via pg_pwrite_zeros()) may have moved the file
236  * position, so reset it (see win32pwrite.c).
237  */
238  if (lseek(fd, 0, SEEK_SET) != 0)
239  {
240  wwmethod->lasterrno = errno;
241  close(fd);
242  return NULL;
243  }
244  }
245 
246  /*
247  * fsync WAL file and containing directory, to ensure the file is
248  * persistently created and zeroed (if padded). That's particularly
249  * important when using synchronous mode, where the file is modified and
250  * fsynced in-place, without a directory fsync.
251  */
252  if (wwmethod->sync)
253  {
254  if (fsync_fname(tmppath, false) != 0 ||
255  fsync_parent_path(tmppath) != 0)
256  {
257  wwmethod->lasterrno = errno;
258 #ifdef HAVE_LIBZ
260  gzclose(gzfp);
261  else
262 #endif
263 #ifdef USE_LZ4
264  if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
265  {
266  (void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
267  (void) LZ4F_freeCompressionContext(ctx);
268  pg_free(lz4buf);
269  close(fd);
270  }
271  else
272 #endif
273  close(fd);
274  return NULL;
275  }
276  }
277 
278  f = pg_malloc0(sizeof(DirectoryMethodFile));
279 #ifdef HAVE_LIBZ
281  f->gzfp = gzfp;
282 #endif
283 #ifdef USE_LZ4
284  if (wwmethod->compression_algorithm == PG_COMPRESSION_LZ4)
285  {
286  f->ctx = ctx;
287  f->lz4buf = lz4buf;
288  f->lz4bufsize = lz4bufsize;
289  }
290 #endif
291 
292  f->base.wwmethod = wwmethod;
293  f->base.currpos = 0;
294  f->base.pathname = pg_strdup(pathname);
295  f->fd = fd;
296  f->fullpath = pg_strdup(tmppath);
297  if (temp_suffix)
298  f->temp_suffix = pg_strdup(temp_suffix);
299 
300  return &f->base;
301 }
302 
303 static ssize_t
304 dir_write(Walfile *f, const void *buf, size_t count)
305 {
306  ssize_t r;
308 
309  Assert(f != NULL);
310  clear_error(f->wwmethod);
311 
312 #ifdef HAVE_LIBZ
314  {
315  errno = 0;
316  r = (ssize_t) gzwrite(df->gzfp, buf, count);
317  if (r != count)
318  {
319  /* If write didn't set errno, assume problem is no disk space */
320  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
321  }
322  }
323  else
324 #endif
325 #ifdef USE_LZ4
327  {
328  size_t chunk;
329  size_t remaining;
330  const void *inbuf = buf;
331 
332  remaining = count;
333  while (remaining > 0)
334  {
335  size_t compressed;
336 
337  if (remaining > LZ4_IN_SIZE)
338  chunk = LZ4_IN_SIZE;
339  else
340  chunk = remaining;
341 
342  remaining -= chunk;
343  compressed = LZ4F_compressUpdate(df->ctx,
344  df->lz4buf, df->lz4bufsize,
345  inbuf, chunk,
346  NULL);
347 
348  if (LZ4F_isError(compressed))
349  {
350  f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
351  return -1;
352  }
353 
354  errno = 0;
355  if (write(df->fd, df->lz4buf, compressed) != compressed)
356  {
357  /* If write didn't set errno, assume problem is no disk space */
358  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
359  return -1;
360  }
361 
362  inbuf = ((char *) inbuf) + chunk;
363  }
364 
365  /* Our caller keeps track of the uncompressed size. */
366  r = (ssize_t) count;
367  }
368  else
369 #endif
370  {
371  errno = 0;
372  r = write(df->fd, buf, count);
373  if (r != count)
374  {
375  /* If write didn't set errno, assume problem is no disk space */
376  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
377  }
378  }
379  if (r > 0)
380  df->base.currpos += r;
381  return r;
382 }
383 
384 static int
386 {
387  int r;
390  char tmppath[MAXPGPATH];
391  char tmppath2[MAXPGPATH];
392 
393  Assert(f != NULL);
394  clear_error(f->wwmethod);
395 
396 #ifdef HAVE_LIBZ
398  {
399  errno = 0; /* in case gzclose() doesn't set it */
400  r = gzclose(df->gzfp);
401  }
402  else
403 #endif
404 #ifdef USE_LZ4
406  {
407  size_t compressed;
408 
409  compressed = LZ4F_compressEnd(df->ctx,
410  df->lz4buf, df->lz4bufsize,
411  NULL);
412 
413  if (LZ4F_isError(compressed))
414  {
415  f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
416  return -1;
417  }
418 
419  errno = 0;
420  if (write(df->fd, df->lz4buf, compressed) != compressed)
421  {
422  /* If write didn't set errno, assume problem is no disk space */
423  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
424  return -1;
425  }
426 
427  r = close(df->fd);
428  }
429  else
430 #endif
431  r = close(df->fd);
432 
433  if (r == 0)
434  {
435  /* Build path to the current version of the file */
436  if (method == CLOSE_NORMAL && df->temp_suffix)
437  {
438  char *filename;
439  char *filename2;
440 
441  /*
442  * If we have a temp prefix, normal operation is to rename the
443  * file.
444  */
446  df->temp_suffix);
447  snprintf(tmppath, sizeof(tmppath), "%s/%s",
448  dir_data->basedir, filename);
449  pg_free(filename);
450 
451  /* permanent name, so no need for the prefix */
452  filename2 = dir_get_file_name(f->wwmethod, df->base.pathname, NULL);
453  snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
454  dir_data->basedir, filename2);
455  pg_free(filename2);
456  if (f->wwmethod->sync)
457  r = durable_rename(tmppath, tmppath2);
458  else
459  {
460  if (rename(tmppath, tmppath2) != 0)
461  {
462  pg_log_error("could not rename file \"%s\" to \"%s\": %m",
463  tmppath, tmppath2);
464  r = -1;
465  }
466  }
467  }
468  else if (method == CLOSE_UNLINK)
469  {
470  char *filename;
471 
472  /* Unlink the file once it's closed */
474  df->temp_suffix);
475  snprintf(tmppath, sizeof(tmppath), "%s/%s",
476  dir_data->basedir, filename);
477  pg_free(filename);
478  r = unlink(tmppath);
479  }
480  else
481  {
482  /*
483  * Else either CLOSE_NORMAL and no temp suffix, or
484  * CLOSE_NO_RENAME. In this case, fsync the file and containing
485  * directory if sync mode is requested.
486  */
487  if (f->wwmethod->sync)
488  {
489  r = fsync_fname(df->fullpath, false);
490  if (r == 0)
491  r = fsync_parent_path(df->fullpath);
492  }
493  }
494  }
495 
496  if (r != 0)
497  f->wwmethod->lasterrno = errno;
498 
499 #ifdef USE_LZ4
500  pg_free(df->lz4buf);
501  /* supports free on NULL */
502  LZ4F_freeCompressionContext(df->ctx);
503 #endif
504 
505  pg_free(df->base.pathname);
506  pg_free(df->fullpath);
507  pg_free(df->temp_suffix);
508  pg_free(df);
509 
510  return r;
511 }
512 
513 static int
515 {
516  int r;
517 
518  Assert(f != NULL);
519  clear_error(f->wwmethod);
520 
521  if (!f->wwmethod->sync)
522  return 0;
523 
524 #ifdef HAVE_LIBZ
526  {
527  if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
528  {
529  f->wwmethod->lasterrno = errno;
530  return -1;
531  }
532  }
533 #endif
534 #ifdef USE_LZ4
536  {
538  size_t compressed;
539 
540  /* Flush any internal buffers */
541  compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
542  if (LZ4F_isError(compressed))
543  {
544  f->wwmethod->lasterrstring = LZ4F_getErrorName(compressed);
545  return -1;
546  }
547 
548  errno = 0;
549  if (write(df->fd, df->lz4buf, compressed) != compressed)
550  {
551  /* If write didn't set errno, assume problem is no disk space */
552  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
553  return -1;
554  }
555  }
556 #endif
557 
558  r = fsync(((DirectoryMethodFile *) f)->fd);
559  if (r < 0)
560  f->wwmethod->lasterrno = errno;
561  return r;
562 }
563 
564 static ssize_t
565 dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
566 {
567  DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
568  struct stat statbuf;
569  char tmppath[MAXPGPATH];
570 
571  snprintf(tmppath, sizeof(tmppath), "%s/%s",
572  dir_data->basedir, pathname);
573 
574  if (stat(tmppath, &statbuf) != 0)
575  {
576  wwmethod->lasterrno = errno;
577  return -1;
578  }
579 
580  return statbuf.st_size;
581 }
582 
583 static bool
584 dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
585 {
586  DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
587  char tmppath[MAXPGPATH];
588  int fd;
589 
590  clear_error(wwmethod);
591 
592  snprintf(tmppath, sizeof(tmppath), "%s/%s",
593  dir_data->basedir, pathname);
594 
595  fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
596  if (fd < 0)
597  return false;
598  close(fd);
599  return true;
600 }
601 
602 static bool
604 {
605  clear_error(wwmethod);
606 
607  if (wwmethod->sync)
608  {
609  DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
610 
611  /*
612  * Files are fsynced when they are closed, but we need to fsync the
613  * directory entry here as well.
614  */
615  if (fsync_fname(dir_data->basedir, true) != 0)
616  {
617  wwmethod->lasterrno = errno;
618  return false;
619  }
620  }
621  return true;
622 }
623 
624 static void
626 {
627  DirectoryMethodData *dir_data = (DirectoryMethodData *) wwmethod;
628 
629  pg_free(dir_data->basedir);
630  pg_free(wwmethod);
631 }
632 
633 
637  int compression_level, bool sync)
638 {
639  DirectoryMethodData *wwmethod;
640 
641  wwmethod = pg_malloc0(sizeof(DirectoryMethodData));
642  *((const WalWriteMethodOps **) &wwmethod->base.ops) =
645  wwmethod->base.compression_level = compression_level;
646  wwmethod->base.sync = sync;
647  clear_error(&wwmethod->base);
648  wwmethod->basedir = pg_strdup(basedir);
649 
650  return &wwmethod->base;
651 }
652 
653 
654 /*-------------------------------------------------------------------------
655  * WalTarMethod - write wal to a tar file containing pg_wal contents
656  *-------------------------------------------------------------------------
657  */
658 
659 static Walfile *tar_open_for_write(WalWriteMethod *wwmethod,
660  const char *pathname,
661  const char *temp_suffix,
662  size_t pad_to_size);
663 static int tar_close(Walfile *f, WalCloseMethod method);
664 static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname);
665 static ssize_t tar_get_file_size(WalWriteMethod *wwmethod,
666  const char *pathname);
667 static char *tar_get_file_name(WalWriteMethod *wwmethod,
668  const char *pathname, const char *temp_suffix);
669 static ssize_t tar_write(Walfile *f, const void *buf, size_t count);
670 static int tar_sync(Walfile *f);
671 static bool tar_finish(WalWriteMethod *wwmethod);
672 static void tar_free(WalWriteMethod *wwmethod);
673 
676  .close = tar_close,
677  .existsfile = tar_existsfile,
678  .get_file_size = tar_get_file_size,
679  .get_file_name = tar_get_file_name,
680  .write = tar_write,
681  .sync = tar_sync,
682  .finish = tar_finish,
683  .free = tar_free
684 };
685 
686 typedef struct TarMethodFile
687 {
689  off_t ofs_start; /* Where does the *header* for this file start */
691  size_t pad_to_size;
693 
694 typedef struct TarMethodData
695 {
697  char *tarfilename;
698  int fd;
700 #ifdef HAVE_LIBZ
701  z_streamp zp;
702  void *zlibOut;
703 #endif
705 
706 #ifdef HAVE_LIBZ
707 static bool
708 tar_write_compressed_data(TarMethodData *tar_data, void *buf, size_t count,
709  bool flush)
710 {
711  tar_data->zp->next_in = buf;
712  tar_data->zp->avail_in = count;
713 
714  while (tar_data->zp->avail_in || flush)
715  {
716  int r;
717 
718  r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
719  if (r == Z_STREAM_ERROR)
720  {
721  tar_data->base.lasterrstring = "could not compress data";
722  return false;
723  }
724 
725  if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
726  {
727  size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
728 
729  errno = 0;
730  if (write(tar_data->fd, tar_data->zlibOut, len) != len)
731  {
732  /* If write didn't set errno, assume problem is no disk space */
733  tar_data->base.lasterrno = errno ? errno : ENOSPC;
734  return false;
735  }
736 
737  tar_data->zp->next_out = tar_data->zlibOut;
738  tar_data->zp->avail_out = ZLIB_OUT_SIZE;
739  }
740 
741  if (r == Z_STREAM_END)
742  break;
743  }
744 
745  if (flush)
746  {
747  /* Reset the stream for writing */
748  if (deflateReset(tar_data->zp) != Z_OK)
749  {
750  tar_data->base.lasterrstring = "could not reset compression stream";
751  return false;
752  }
753  }
754 
755  return true;
756 }
757 #endif
758 
759 static ssize_t
760 tar_write(Walfile *f, const void *buf, size_t count)
761 {
762  TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
763  ssize_t r;
764 
765  Assert(f != NULL);
766  clear_error(f->wwmethod);
767 
768  /* Tarfile will always be positioned at the end */
770  {
771  errno = 0;
772  r = write(tar_data->fd, buf, count);
773  if (r != count)
774  {
775  /* If write didn't set errno, assume problem is no disk space */
776  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
777  return -1;
778  }
779  f->currpos += r;
780  return r;
781  }
782 #ifdef HAVE_LIBZ
784  {
785  if (!tar_write_compressed_data(tar_data, unconstify(void *, buf),
786  count, false))
787  return -1;
788  f->currpos += count;
789  return count;
790  }
791 #endif
792  else
793  {
794  /* Can't happen - compression enabled with no method set */
795  f->wwmethod->lasterrno = ENOSYS;
796  return -1;
797  }
798 }
799 
800 static bool
802 {
803  PGAlignedXLogBlock zerobuf;
804  size_t bytesleft = bytes;
805 
806  memset(zerobuf.data, 0, XLOG_BLCKSZ);
807  while (bytesleft)
808  {
809  size_t bytestowrite = Min(bytesleft, XLOG_BLCKSZ);
810  ssize_t r = tar_write(&f->base, zerobuf.data, bytestowrite);
811 
812  if (r < 0)
813  return false;
814  bytesleft -= r;
815  }
816 
817  return true;
818 }
819 
820 static char *
821 tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname,
822  const char *temp_suffix)
823 {
824  char *filename = pg_malloc0(MAXPGPATH * sizeof(char));
825 
826  snprintf(filename, MAXPGPATH, "%s%s",
827  pathname, temp_suffix ? temp_suffix : "");
828 
829  return filename;
830 }
831 
832 static Walfile *
833 tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname,
834  const char *temp_suffix, size_t pad_to_size)
835 {
836  TarMethodData *tar_data = (TarMethodData *) wwmethod;
837  char *tmppath;
838 
839  clear_error(wwmethod);
840 
841  if (tar_data->fd < 0)
842  {
843  /*
844  * We open the tar file only when we first try to write to it.
845  */
846  tar_data->fd = open(tar_data->tarfilename,
847  O_WRONLY | O_CREAT | PG_BINARY,
849  if (tar_data->fd < 0)
850  {
851  wwmethod->lasterrno = errno;
852  return NULL;
853  }
854 
855 #ifdef HAVE_LIBZ
857  {
858  tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
859  tar_data->zp->zalloc = Z_NULL;
860  tar_data->zp->zfree = Z_NULL;
861  tar_data->zp->opaque = Z_NULL;
862  tar_data->zp->next_out = tar_data->zlibOut;
863  tar_data->zp->avail_out = ZLIB_OUT_SIZE;
864 
865  /*
866  * Initialize deflation library. Adding the magic value 16 to the
867  * default 15 for the windowBits parameter makes the output be
868  * gzip instead of zlib.
869  */
870  if (deflateInit2(tar_data->zp, wwmethod->compression_level,
871  Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
872  {
873  pg_free(tar_data->zp);
874  tar_data->zp = NULL;
875  wwmethod->lasterrstring =
876  "could not initialize compression library";
877  return NULL;
878  }
879  }
880 #endif
881 
882  /* There's no tar header itself, the file starts with regular files */
883  }
884 
885  if (tar_data->currentfile != NULL)
886  {
887  wwmethod->lasterrstring =
888  "implementation error: tar files can't have more than one open file";
889  return NULL;
890  }
891 
892  tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
893  tar_data->currentfile->base.wwmethod = wwmethod;
894 
895  tmppath = tar_get_file_name(wwmethod, pathname, temp_suffix);
896 
897  /* Create a header with size set to 0 - we will fill out the size on close */
898  if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
899  {
900  pg_free(tar_data->currentfile);
901  pg_free(tmppath);
902  tar_data->currentfile = NULL;
903  wwmethod->lasterrstring = "could not create tar header";
904  return NULL;
905  }
906 
907  pg_free(tmppath);
908 
909 #ifdef HAVE_LIBZ
911  {
912  /* Flush existing data */
913  if (!tar_write_compressed_data(tar_data, NULL, 0, true))
914  return NULL;
915 
916  /* Turn off compression for header */
917  if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
918  {
919  wwmethod->lasterrstring =
920  "could not change compression parameters";
921  return NULL;
922  }
923  }
924 #endif
925 
926  tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
927  if (tar_data->currentfile->ofs_start == -1)
928  {
929  wwmethod->lasterrno = errno;
930  pg_free(tar_data->currentfile);
931  tar_data->currentfile = NULL;
932  return NULL;
933  }
934  tar_data->currentfile->base.currpos = 0;
935 
937  {
938  errno = 0;
939  if (write(tar_data->fd, tar_data->currentfile->header,
941  {
942  /* If write didn't set errno, assume problem is no disk space */
943  wwmethod->lasterrno = errno ? errno : ENOSPC;
944  pg_free(tar_data->currentfile);
945  tar_data->currentfile = NULL;
946  return NULL;
947  }
948  }
949 #ifdef HAVE_LIBZ
950  else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
951  {
952  /* Write header through the zlib APIs but with no compression */
953  if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
954  TAR_BLOCK_SIZE, true))
955  return NULL;
956 
957  /* Re-enable compression for the rest of the file */
958  if (deflateParams(tar_data->zp, wwmethod->compression_level,
959  Z_DEFAULT_STRATEGY) != Z_OK)
960  {
961  wwmethod->lasterrstring = "could not change compression parameters";
962  return NULL;
963  }
964  }
965 #endif
966  else
967  {
968  /* not reachable */
969  Assert(false);
970  }
971 
972  tar_data->currentfile->base.pathname = pg_strdup(pathname);
973 
974  /*
975  * Uncompressed files are padded on creation, but for compression we can't
976  * do that
977  */
978  if (pad_to_size)
979  {
980  tar_data->currentfile->pad_to_size = pad_to_size;
982  {
983  /* Uncompressed, so pad now */
984  if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
985  return NULL;
986  /* Seek back to start */
987  if (lseek(tar_data->fd,
988  tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE,
989  SEEK_SET) != tar_data->currentfile->ofs_start + TAR_BLOCK_SIZE)
990  {
991  wwmethod->lasterrno = errno;
992  return NULL;
993  }
994 
995  tar_data->currentfile->base.currpos = 0;
996  }
997  }
998 
999  return &tar_data->currentfile->base;
1000 }
1001 
1002 static ssize_t
1003 tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
1004 {
1005  clear_error(wwmethod);
1006 
1007  /* Currently not used, so not supported */
1008  wwmethod->lasterrno = ENOSYS;
1009  return -1;
1010 }
1011 
1012 static int
1014 {
1015  TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
1016  int r;
1017 
1018  Assert(f != NULL);
1019  clear_error(f->wwmethod);
1020 
1021  if (!f->wwmethod->sync)
1022  return 0;
1023 
1024  /*
1025  * Always sync the whole tarfile, because that's all we can do. This makes
1026  * no sense on compressed files, so just ignore those.
1027  */
1029  return 0;
1030 
1031  r = fsync(tar_data->fd);
1032  if (r < 0)
1033  f->wwmethod->lasterrno = errno;
1034  return r;
1035 }
1036 
1037 static int
1039 {
1040  ssize_t filesize;
1041  int padding;
1042  TarMethodData *tar_data = (TarMethodData *) f->wwmethod;
1043  TarMethodFile *tf = (TarMethodFile *) f;
1044 
1045  Assert(f != NULL);
1046  clear_error(f->wwmethod);
1047 
1048  if (method == CLOSE_UNLINK)
1049  {
1051  {
1052  f->wwmethod->lasterrstring = "unlink not supported with compression";
1053  return -1;
1054  }
1055 
1056  /*
1057  * Unlink the file that we just wrote to the tar. We do this by
1058  * truncating it to the start of the header. This is safe as we only
1059  * allow writing of the very last file.
1060  */
1061  if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
1062  {
1063  f->wwmethod->lasterrno = errno;
1064  return -1;
1065  }
1066 
1067  pg_free(tf->base.pathname);
1068  pg_free(tf);
1069  tar_data->currentfile = NULL;
1070 
1071  return 0;
1072  }
1073 
1074  /*
1075  * Pad the file itself with zeroes if necessary. Note that this is
1076  * different from the tar format padding -- this is the padding we asked
1077  * for when the file was opened.
1078  */
1079  if (tf->pad_to_size)
1080  {
1082  {
1083  /*
1084  * A compressed tarfile is padded on close since we cannot know
1085  * the size of the compressed output until the end.
1086  */
1087  size_t sizeleft = tf->pad_to_size - tf->base.currpos;
1088 
1089  if (sizeleft)
1090  {
1091  if (!tar_write_padding_data(tf, sizeleft))
1092  return -1;
1093  }
1094  }
1095  else
1096  {
1097  /*
1098  * An uncompressed tarfile was padded on creation, so just adjust
1099  * the current position as if we seeked to the end.
1100  */
1101  tf->base.currpos = tf->pad_to_size;
1102  }
1103  }
1104 
1105  /*
1106  * Get the size of the file, and pad out to a multiple of the tar block
1107  * size.
1108  */
1109  filesize = f->currpos;
1110  padding = tarPaddingBytesRequired(filesize);
1111  if (padding)
1112  {
1113  char zerobuf[TAR_BLOCK_SIZE] = {0};
1114 
1115  if (tar_write(f, zerobuf, padding) != padding)
1116  return -1;
1117  }
1118 
1119 
1120 #ifdef HAVE_LIBZ
1122  {
1123  /* Flush the current buffer */
1124  if (!tar_write_compressed_data(tar_data, NULL, 0, true))
1125  return -1;
1126  }
1127 #endif
1128 
1129  /*
1130  * Now go back and update the header with the correct filesize and
1131  * possibly also renaming the file. We overwrite the entire current header
1132  * when done, including the checksum.
1133  */
1134  print_tar_number(&(tf->header[124]), 12, filesize);
1135 
1136  if (method == CLOSE_NORMAL)
1137 
1138  /*
1139  * We overwrite it with what it was before if we have no tempname,
1140  * since we're going to write the buffer anyway.
1141  */
1142  strlcpy(&(tf->header[0]), tf->base.pathname, 100);
1143 
1144  print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
1145  if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
1146  {
1147  f->wwmethod->lasterrno = errno;
1148  return -1;
1149  }
1151  {
1152  errno = 0;
1153  if (write(tar_data->fd, tf->header, TAR_BLOCK_SIZE) != TAR_BLOCK_SIZE)
1154  {
1155  /* If write didn't set errno, assume problem is no disk space */
1156  f->wwmethod->lasterrno = errno ? errno : ENOSPC;
1157  return -1;
1158  }
1159  }
1160 #ifdef HAVE_LIBZ
1162  {
1163  /* Turn off compression */
1164  if (deflateParams(tar_data->zp, 0, Z_DEFAULT_STRATEGY) != Z_OK)
1165  {
1166  f->wwmethod->lasterrstring = "could not change compression parameters";
1167  return -1;
1168  }
1169 
1170  /* Overwrite the header, assuming the size will be the same */
1171  if (!tar_write_compressed_data(tar_data, tar_data->currentfile->header,
1172  TAR_BLOCK_SIZE, true))
1173  return -1;
1174 
1175  /* Turn compression back on */
1176  if (deflateParams(tar_data->zp, f->wwmethod->compression_level,
1177  Z_DEFAULT_STRATEGY) != Z_OK)
1178  {
1179  f->wwmethod->lasterrstring = "could not change compression parameters";
1180  return -1;
1181  }
1182  }
1183 #endif
1184  else
1185  {
1186  /* not reachable */
1187  Assert(false);
1188  }
1189 
1190  /* Move file pointer back down to end, so we can write the next file */
1191  if (lseek(tar_data->fd, 0, SEEK_END) < 0)
1192  {
1193  f->wwmethod->lasterrno = errno;
1194  return -1;
1195  }
1196 
1197  /* Always fsync on close, so the padding gets fsynced */
1198  if (tar_sync(f) < 0)
1199  {
1200  /* XXX this seems pretty bogus; why is only this case fatal? */
1201  pg_fatal("could not fsync file \"%s\": %s",
1202  tf->base.pathname, GetLastWalMethodError(f->wwmethod));
1203  }
1204 
1205  /* Clean up and done */
1206  pg_free(tf->base.pathname);
1207  pg_free(tf);
1208  tar_data->currentfile = NULL;
1209 
1210  return 0;
1211 }
1212 
1213 static bool
1214 tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
1215 {
1216  clear_error(wwmethod);
1217  /* We only deal with new tarfiles, so nothing externally created exists */
1218  return false;
1219 }
1220 
1221 static bool
1223 {
1224  TarMethodData *tar_data = (TarMethodData *) wwmethod;
1225  char zerobuf[1024] = {0};
1226 
1227  clear_error(wwmethod);
1228 
1229  if (tar_data->currentfile)
1230  {
1231  if (tar_close(&tar_data->currentfile->base, CLOSE_NORMAL) != 0)
1232  return false;
1233  }
1234 
1235  /* A tarfile always ends with two empty blocks */
1236  if (wwmethod->compression_algorithm == PG_COMPRESSION_NONE)
1237  {
1238  errno = 0;
1239  if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
1240  {
1241  /* If write didn't set errno, assume problem is no disk space */
1242  wwmethod->lasterrno = errno ? errno : ENOSPC;
1243  return false;
1244  }
1245  }
1246 #ifdef HAVE_LIBZ
1247  else if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1248  {
1249  if (!tar_write_compressed_data(tar_data, zerobuf, sizeof(zerobuf),
1250  false))
1251  return false;
1252 
1253  /* Also flush all data to make sure the gzip stream is finished */
1254  tar_data->zp->next_in = NULL;
1255  tar_data->zp->avail_in = 0;
1256  while (true)
1257  {
1258  int r;
1259 
1260  r = deflate(tar_data->zp, Z_FINISH);
1261 
1262  if (r == Z_STREAM_ERROR)
1263  {
1264  wwmethod->lasterrstring = "could not compress data";
1265  return false;
1266  }
1267  if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
1268  {
1269  size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
1270 
1271  errno = 0;
1272  if (write(tar_data->fd, tar_data->zlibOut, len) != len)
1273  {
1274  /*
1275  * If write didn't set errno, assume problem is no disk
1276  * space.
1277  */
1278  wwmethod->lasterrno = errno ? errno : ENOSPC;
1279  return false;
1280  }
1281  }
1282  if (r == Z_STREAM_END)
1283  break;
1284  }
1285 
1286  if (deflateEnd(tar_data->zp) != Z_OK)
1287  {
1288  wwmethod->lasterrstring = "could not close compression stream";
1289  return false;
1290  }
1291  }
1292 #endif
1293  else
1294  {
1295  /* not reachable */
1296  Assert(false);
1297  }
1298 
1299  /* sync the empty blocks as well, since they're after the last file */
1300  if (wwmethod->sync)
1301  {
1302  if (fsync(tar_data->fd) != 0)
1303  {
1304  wwmethod->lasterrno = errno;
1305  return false;
1306  }
1307  }
1308 
1309  if (close(tar_data->fd) != 0)
1310  {
1311  wwmethod->lasterrno = errno;
1312  return false;
1313  }
1314 
1315  tar_data->fd = -1;
1316 
1317  if (wwmethod->sync)
1318  {
1319  if (fsync_fname(tar_data->tarfilename, false) != 0 ||
1320  fsync_parent_path(tar_data->tarfilename) != 0)
1321  {
1322  wwmethod->lasterrno = errno;
1323  return false;
1324  }
1325  }
1326 
1327  return true;
1328 }
1329 
1330 static void
1332 {
1333  TarMethodData *tar_data = (TarMethodData *) wwmethod;
1334 
1335  pg_free(tar_data->tarfilename);
1336 #ifdef HAVE_LIBZ
1337  if (wwmethod->compression_algorithm == PG_COMPRESSION_GZIP)
1338  pg_free(tar_data->zlibOut);
1339 #endif
1340  pg_free(wwmethod);
1341 }
1342 
1343 /*
1344  * The argument compression_algorithm is currently ignored. It is in place for
1345  * symmetry with CreateWalDirectoryMethod which uses it for distinguishing
1346  * between the different compression methods. CreateWalTarMethod and its family
1347  * of functions handle only zlib compression.
1348  */
1350 CreateWalTarMethod(const char *tarbase,
1352  int compression_level, bool sync)
1353 {
1354  TarMethodData *wwmethod;
1355  const char *suffix = (compression_algorithm == PG_COMPRESSION_GZIP) ?
1356  ".tar.gz" : ".tar";
1357 
1358  wwmethod = pg_malloc0(sizeof(TarMethodData));
1359  *((const WalWriteMethodOps **) &wwmethod->base.ops) =
1360  &WalTarMethodOps;
1362  wwmethod->base.compression_level = compression_level;
1363  wwmethod->base.sync = sync;
1364  clear_error(&wwmethod->base);
1365 
1366  wwmethod->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
1367  sprintf(wwmethod->tarfilename, "%s%s", tarbase, suffix);
1368  wwmethod->fd = -1;
1369 #ifdef HAVE_LIBZ
1371  wwmethod->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
1372 #endif
1373 
1374  return &wwmethod->base;
1375 }
1376 
1377 const char *
1379 {
1380  if (wwmethod->lasterrstring)
1381  return wwmethod->lasterrstring;
1382  return strerror(wwmethod->lasterrno);
1383 }
#define unconstify(underlying_type, expr)
Definition: c.h:1181
#define Min(x, y)
Definition: c.h:937
#define PG_BINARY
Definition: c.h:1209
pg_compress_algorithm
Definition: compression.h:18
@ PG_COMPRESSION_GZIP
Definition: compression.h:20
@ PG_COMPRESSION_LZ4
Definition: compression.h:21
@ PG_COMPRESSION_NONE
Definition: compression.h:19
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:688
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:662
static int fsync_parent_path(const char *fname, int elevel)
Definition: fd.c:3673
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
void pg_free(void *ptr)
Definition: fe_memutils.c:105
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
int pg_file_create_mode
Definition: file_perm.c:19
ssize_t pg_pwrite_zeros(int fd, size_t size)
Definition: file_utils.c:540
int remaining
Definition: informix.c:667
#define close(a)
Definition: win32.h:12
#define write(a, b, c)
Definition: win32.h:14
Assert(fmt[strlen(fmt) - 1] !='\n')
#define pg_log_error(...)
Definition: logging.h:106
z_stream * z_streamp
#define pg_fatal(...)
static char * basedir
#define MAXPGPATH
const void size_t len
static char * filename
Definition: pg_dumpall.c:119
static pg_compress_algorithm compression_algorithm
Definition: pg_receivewal.c:55
static void header(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:208
static char * buf
Definition: pg_test_fsync.c:67
static size_t tarPaddingBytesRequired(size_t len)
Definition: pgtar.h:40
int tarChecksum(char *header)
Definition: tar.c:90
enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime)
Definition: tar.c:114
@ TAR_OK
Definition: pgtar.h:21
#define TAR_BLOCK_SIZE
Definition: pgtar.h:17
void print_tar_number(char *s, int len, uint64 val)
Definition: tar.c:22
#define sprintf
Definition: port.h:240
#define strerror
Definition: port.h:251
#define snprintf
Definition: port.h:238
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
static int fd(const char *x, int i)
Definition: preproc-init.c:105
WalWriteMethod base
Definition: walmethods.c:75
TarMethodFile * currentfile
Definition: walmethods.c:699
WalWriteMethod base
Definition: walmethods.c:696
char * tarfilename
Definition: walmethods.c:697
size_t pad_to_size
Definition: walmethods.c:691
char header[TAR_BLOCK_SIZE]
Definition: walmethods.c:690
off_t ofs_start
Definition: walmethods.c:689
Walfile base
Definition: walmethods.c:688
Walfile *(* open_for_write)(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:48
const char * lasterrstring
Definition: walmethods.h:108
int compression_level
Definition: walmethods.h:106
const WalWriteMethodOps * ops
Definition: walmethods.h:104
pg_compress_algorithm compression_algorithm
Definition: walmethods.h:105
WalWriteMethod * wwmethod
Definition: walmethods.h:19
char * pathname
Definition: walmethods.h:21
off_t currpos
Definition: walmethods.h:20
__int64 st_size
Definition: win32_port.h:275
char data[XLOG_BLCKSZ]
Definition: c.h:1087
static char * dir_get_file_name(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Definition: walmethods.c:102
static Walfile * dir_open_for_write(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:117
WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
Definition: walmethods.c:635
struct DirectoryMethodData DirectoryMethodData
static Walfile * tar_open_for_write(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:833
static ssize_t tar_write(Walfile *f, const void *buf, size_t count)
Definition: walmethods.c:760
static void tar_free(WalWriteMethod *wwmethod)
Definition: walmethods.c:1331
const WalWriteMethodOps WalTarMethodOps
Definition: walmethods.c:674
struct TarMethodData TarMethodData
#define ZLIB_OUT_SIZE
Definition: walmethods.c:33
WalWriteMethod * CreateWalTarMethod(const char *tarbase, pg_compress_algorithm compression_algorithm, int compression_level, bool sync)
Definition: walmethods.c:1350
static bool dir_finish(WalWriteMethod *wwmethod)
Definition: walmethods.c:603
#define clear_error(wwmethod)
Definition: walmethods.c:98
const WalWriteMethodOps WalDirectoryMethodOps
Definition: walmethods.c:58
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
Definition: walmethods.c:1378
struct DirectoryMethodFile DirectoryMethodFile
static void dir_free(WalWriteMethod *wwmethod)
Definition: walmethods.c:625
static int dir_sync(Walfile *f)
Definition: walmethods.c:514
static int tar_sync(Walfile *f)
Definition: walmethods.c:1013
static bool tar_finish(WalWriteMethod *wwmethod)
Definition: walmethods.c:1222
static int dir_close(Walfile *f, WalCloseMethod method)
Definition: walmethods.c:385
static ssize_t dir_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.c:565
static bool tar_existsfile(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.c:1214
static ssize_t tar_get_file_size(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.c:1003
struct TarMethodFile TarMethodFile
static bool dir_existsfile(WalWriteMethod *wwmethod, const char *pathname)
Definition: walmethods.c:584
static int tar_close(Walfile *f, WalCloseMethod method)
Definition: walmethods.c:1038
static ssize_t dir_write(Walfile *f, const void *buf, size_t count)
Definition: walmethods.c:304
static bool tar_write_padding_data(TarMethodFile *f, size_t bytes)
Definition: walmethods.c:801
static char * tar_get_file_name(WalWriteMethod *wwmethod, const char *pathname, const char *temp_suffix)
Definition: walmethods.c:821
#define LZ4_IN_SIZE
Definition: walmethods.c:36
WalCloseMethod
Definition: walmethods.h:31
@ CLOSE_UNLINK
Definition: walmethods.h:33
@ CLOSE_NORMAL
Definition: walmethods.h:32
#define fsync(fd)
Definition: win32_port.h:85
#define stat
Definition: win32_port.h:286
#define S_IRUSR
Definition: win32_port.h:291
#define ftruncate(a, b)
Definition: win32_port.h:82
#define S_IWUSR
Definition: win32_port.h:294