PostgreSQL Source Code  git master
walmethods.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walmethods.c - implementations of different ways to write received wal
4  *
5  * NOTE! The caller must ensure that only one method is instantiated in
6  * any given program, and that it's only instantiated once!
7  *
8  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/walmethods.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <time.h>
19 #include <unistd.h>
20 #ifdef HAVE_LIBZ
21 #include <zlib.h>
22 #endif
23 
24 #include "pgtar.h"
25 #include "common/file_perm.h"
26 #include "common/file_utils.h"
27 
28 #include "receivelog.h"
29 #include "streamutil.h"
30 
31 /* Size of zlib buffer for .tar.gz */
32 #define ZLIB_OUT_SIZE 4096
33 
34 /*-------------------------------------------------------------------------
35  * WalDirectoryMethod - write wal to a directory looking like pg_wal
36  *-------------------------------------------------------------------------
37  */
38 
39 /*
40  * Global static data for this method
41  */
42 typedef struct DirectoryMethodData
43 {
44  char *basedir;
46  bool sync;
49 
50 /*
51  * Local file handle
52  */
/*
 * Per-file handle returned by dir_open_for_write().
 *
 * The closing "} DirectoryMethodFile;" was missing from the recovered
 * listing and is restored here.
 */
typedef struct DirectoryMethodFile
{
	int			fd;				/* descriptor, kept even for gzip files so
								 * that dir_sync() can fsync() it */
	off_t		currpos;		/* bytes written so far (cached position) */
	char	   *pathname;		/* name as passed in by the caller */
	char	   *fullpath;		/* path the file was actually opened under */
	char	   *temp_suffix;	/* copy of the temp suffix, or NULL */
#ifdef HAVE_LIBZ
	gzFile		gzfp;			/* zlib handle wrapping fd when compressing */
#endif
} DirectoryMethodFile;
64 
/*
 * Return a description of the most recent failure of the directory method.
 *
 * (The function-name line had been lost from the listing; restored.)
 */
static const char *
dir_getlasterror(void)
{
	/* Directory method always sets errno, so just use strerror */
	return strerror(errno);
}
71 
72 static Walfile
73 dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
74 {
75  static char tmppath[MAXPGPATH];
76  int fd;
78 #ifdef HAVE_LIBZ
79  gzFile gzfp = NULL;
80 #endif
81 
82  snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
83  dir_data->basedir, pathname,
84  dir_data->compression > 0 ? ".gz" : "",
85  temp_suffix ? temp_suffix : "");
86 
87  /*
88  * Open a file for non-compressed as well as compressed files. Tracking
89  * the file descriptor is important for dir_sync() method as gzflush()
90  * does not do any system calls to fsync() to make changes permanent on
91  * disk.
92  */
93  fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, pg_file_create_mode);
94  if (fd < 0)
95  return NULL;
96 
97 #ifdef HAVE_LIBZ
98  if (dir_data->compression > 0)
99  {
100  gzfp = gzdopen(fd, "wb");
101  if (gzfp == NULL)
102  {
103  close(fd);
104  return NULL;
105  }
106 
107  if (gzsetparams(gzfp, dir_data->compression,
108  Z_DEFAULT_STRATEGY) != Z_OK)
109  {
110  gzclose(gzfp);
111  return NULL;
112  }
113  }
114 #endif
115 
116  /* Do pre-padding on non-compressed files */
117  if (pad_to_size && dir_data->compression == 0)
118  {
119  char *zerobuf;
120  int bytes;
121 
122  zerobuf = pg_malloc0(XLOG_BLCKSZ);
123  for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
124  {
125  if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
126  {
127  int save_errno = errno;
128 
129  pg_free(zerobuf);
130  close(fd);
131  errno = save_errno;
132  return NULL;
133  }
134  }
135  pg_free(zerobuf);
136 
137  if (lseek(fd, 0, SEEK_SET) != 0)
138  {
139  int save_errno = errno;
140 
141  close(fd);
142  errno = save_errno;
143  return NULL;
144  }
145  }
146 
147  /*
148  * fsync WAL file and containing directory, to ensure the file is
149  * persistently created and zeroed (if padded). That's particularly
150  * important when using synchronous mode, where the file is modified and
151  * fsynced in-place, without a directory fsync.
152  */
153  if (dir_data->sync)
154  {
155  if (fsync_fname(tmppath, false, progname) != 0 ||
156  fsync_parent_path(tmppath, progname) != 0)
157  {
158 #ifdef HAVE_LIBZ
159  if (dir_data->compression > 0)
160  gzclose(gzfp);
161  else
162 #endif
163  close(fd);
164  return NULL;
165  }
166  }
167 
168  f = pg_malloc0(sizeof(DirectoryMethodFile));
169 #ifdef HAVE_LIBZ
170  if (dir_data->compression > 0)
171  f->gzfp = gzfp;
172 #endif
173  f->fd = fd;
174  f->currpos = 0;
175  f->pathname = pg_strdup(pathname);
176  f->fullpath = pg_strdup(tmppath);
177  if (temp_suffix)
178  f->temp_suffix = pg_strdup(temp_suffix);
179 
180  return f;
181 }
182 
183 static ssize_t
184 dir_write(Walfile f, const void *buf, size_t count)
185 {
186  ssize_t r;
188 
189  Assert(f != NULL);
190 
191 #ifdef HAVE_LIBZ
192  if (dir_data->compression > 0)
193  r = (ssize_t) gzwrite(df->gzfp, buf, count);
194  else
195 #endif
196  r = write(df->fd, buf, count);
197  if (r > 0)
198  df->currpos += r;
199  return r;
200 }
201 
202 static off_t
204 {
205  Assert(f != NULL);
206 
207  /* Use a cached value to prevent lots of reseeks */
208  return ((DirectoryMethodFile *) f)->currpos;
209 }
210 
211 static int
213 {
214  int r;
216  static char tmppath[MAXPGPATH];
217  static char tmppath2[MAXPGPATH];
218 
219  Assert(f != NULL);
220 
221 #ifdef HAVE_LIBZ
222  if (dir_data->compression > 0)
223  r = gzclose(df->gzfp);
224  else
225 #endif
226  r = close(df->fd);
227 
228  if (r == 0)
229  {
230  /* Build path to the current version of the file */
231  if (method == CLOSE_NORMAL && df->temp_suffix)
232  {
233  /*
234  * If we have a temp prefix, normal operation is to rename the
235  * file.
236  */
237  snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
238  dir_data->basedir, df->pathname,
239  dir_data->compression > 0 ? ".gz" : "",
240  df->temp_suffix);
241  snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
242  dir_data->basedir, df->pathname,
243  dir_data->compression > 0 ? ".gz" : "");
244  r = durable_rename(tmppath, tmppath2, progname);
245  }
246  else if (method == CLOSE_UNLINK)
247  {
248  /* Unlink the file once it's closed */
249  snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
250  dir_data->basedir, df->pathname,
251  dir_data->compression > 0 ? ".gz" : "",
252  df->temp_suffix ? df->temp_suffix : "");
253  r = unlink(tmppath);
254  }
255  else
256  {
257  /*
258  * Else either CLOSE_NORMAL and no temp suffix, or
259  * CLOSE_NO_RENAME. In this case, fsync the file and containing
260  * directory if sync mode is requested.
261  */
262  if (dir_data->sync)
263  {
264  r = fsync_fname(df->fullpath, false, progname);
265  if (r == 0)
267  }
268  }
269  }
270 
271  pg_free(df->pathname);
272  pg_free(df->fullpath);
273  if (df->temp_suffix)
274  pg_free(df->temp_suffix);
275  pg_free(df);
276 
277  return r;
278 }
279 
280 static int
282 {
283  Assert(f != NULL);
284 
285  if (!dir_data->sync)
286  return 0;
287 
288 #ifdef HAVE_LIBZ
289  if (dir_data->compression > 0)
290  {
291  if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
292  return -1;
293  }
294 #endif
295 
296  return fsync(((DirectoryMethodFile *) f)->fd);
297 }
298 
299 static ssize_t
300 dir_get_file_size(const char *pathname)
301 {
302  struct stat statbuf;
303  static char tmppath[MAXPGPATH];
304 
305  snprintf(tmppath, sizeof(tmppath), "%s/%s",
306  dir_data->basedir, pathname);
307 
308  if (stat(tmppath, &statbuf) != 0)
309  return -1;
310 
311  return statbuf.st_size;
312 }
313 
314 static bool
315 dir_existsfile(const char *pathname)
316 {
317  static char tmppath[MAXPGPATH];
318  int fd;
319 
320  snprintf(tmppath, sizeof(tmppath), "%s/%s",
321  dir_data->basedir, pathname);
322 
323  fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
324  if (fd < 0)
325  return false;
326  close(fd);
327  return true;
328 }
329 
330 static bool
332 {
333  if (dir_data->sync)
334  {
335  /*
336  * Files are fsynced when they are closed, but we need to fsync the
337  * directory entry here as well.
338  */
339  if (fsync_fname(dir_data->basedir, true, progname) != 0)
340  return false;
341  }
342  return true;
343 }
344 
345 
348 {
349  WalWriteMethod *method;
350 
351  method = pg_malloc0(sizeof(WalWriteMethod));
353  method->write = dir_write;
356  method->close = dir_close;
357  method->sync = dir_sync;
358  method->existsfile = dir_existsfile;
359  method->finish = dir_finish;
360  method->getlasterror = dir_getlasterror;
361 
362  dir_data = pg_malloc0(sizeof(DirectoryMethodData));
363  dir_data->compression = compression;
364  dir_data->basedir = pg_strdup(basedir);
365  dir_data->sync = sync;
366 
367  return method;
368 }
369 
370 void
372 {
373  pg_free(dir_data->basedir);
374  pg_free(dir_data);
375 }
376 
377 
378 /*-------------------------------------------------------------------------
379  * WalTarMethod - write wal to a tar file containing pg_wal contents
380  *-------------------------------------------------------------------------
381  */
382 
/*
 * Handle for the single member file currently being written to the tar
 * archive.
 */
typedef struct TarMethodFile
{
	off_t		ofs_start;		/* archive offset at which this member's
								 * *header* begins */
	off_t		currpos;		/* bytes of member data written so far */
	char		header[512];	/* tar header, rewritten on close */
	char	   *pathname;		/* member name without any temp suffix */
	size_t		pad_to_size;	/* requested zero-padded size, 0 if none */
} TarMethodFile;
391 
392 typedef struct TarMethodData
393 {
394  char *tarfilename;
395  int fd;
397  bool sync;
399  char lasterror[1024];
400 #ifdef HAVE_LIBZ
401  z_streamp zp;
402  void *zlibOut;
403 #endif
404 } TarMethodData;
405 static TarMethodData *tar_data = NULL;
406 
/* Forget any custom error text recorded by tar_set_error(). */
#define tar_clear_error() \
	(tar_data->lasterror[0] = '\0')
/* Record the (translated) message for tar_getlasterror() to return. */
#define tar_set_error(msg) \
	(strlcpy(tar_data->lasterror, _(msg), sizeof(tar_data->lasterror)))
409 
410 static const char *
412 {
413  /*
414  * If a custom error is set, return that one. Otherwise, assume errno is
415  * set and return that one.
416  */
417  if (tar_data->lasterror[0])
418  return tar_data->lasterror;
419  return strerror(errno);
420 }
421 
#ifdef HAVE_LIBZ
/*
 * Push count bytes from buf through the deflate stream and write whatever
 * compressed output has accumulated to the archive.
 *
 * With flush = true the stream is driven with Z_FINISH until it reports
 * Z_STREAM_END and is then reset so that writing can continue.
 *
 * Returns false on failure.  Compression errors are recorded with
 * tar_set_error(); write failures are reported through errno, which is set
 * to ENOSPC when write() came up short without setting it.
 */
static bool
tar_write_compressed_data(void *buf, size_t count, bool flush)
{
	tar_data->zp->next_in = buf;
	tar_data->zp->avail_in = count;

	while (tar_data->zp->avail_in || flush)
	{
		int			r;

		r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
		if (r == Z_STREAM_ERROR)
		{
			tar_set_error("could not compress data");
			return false;
		}

		if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
		{
			size_t		len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;

			errno = 0;
			if (write(tar_data->fd, tar_data->zlibOut, len) != (ssize_t) len)
			{
				/*
				 * If write didn't set errno, assume problem is no disk
				 * space; otherwise the caller would report a stale error.
				 */
				if (errno == 0)
					errno = ENOSPC;
				return false;
			}

			tar_data->zp->next_out = tar_data->zlibOut;
			tar_data->zp->avail_out = ZLIB_OUT_SIZE;
		}

		if (r == Z_STREAM_END)
			break;
	}

	if (flush)
	{
		/* Reset the stream for writing */
		if (deflateReset(tar_data->zp) != Z_OK)
		{
			tar_set_error("could not reset compression stream");
			return false;
		}
	}

	return true;
}
#endif
468 
469 static ssize_t
470 tar_write(Walfile f, const void *buf, size_t count)
471 {
472  ssize_t r;
473 
474  Assert(f != NULL);
475  tar_clear_error();
476 
477  /* Tarfile will always be positioned at the end */
478  if (!tar_data->compression)
479  {
480  r = write(tar_data->fd, buf, count);
481  if (r > 0)
482  ((TarMethodFile *) f)->currpos += r;
483  return r;
484  }
485 #ifdef HAVE_LIBZ
486  else
487  {
488  if (!tar_write_compressed_data((void *) buf, count, false))
489  return -1;
490  ((TarMethodFile *) f)->currpos += count;
491  return count;
492  }
493 #else
494  else
495  /* Can't happen - compression enabled with no libz */
496  return -1;
497 #endif
498 }
499 
500 static bool
502 {
503  char *zerobuf = pg_malloc0(XLOG_BLCKSZ);
504  size_t bytesleft = bytes;
505 
506  while (bytesleft)
507  {
508  size_t bytestowrite = bytesleft > XLOG_BLCKSZ ? XLOG_BLCKSZ : bytesleft;
509 
510  ssize_t r = tar_write(f, zerobuf, bytestowrite);
511 
512  if (r < 0)
513  {
514  pg_free(zerobuf);
515  return false;
516  }
517  bytesleft -= r;
518  }
519 
520  pg_free(zerobuf);
521  return true;
522 }
523 
524 static Walfile
525 tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
526 {
527  int save_errno;
528  static char tmppath[MAXPGPATH];
529 
530  tar_clear_error();
531 
532  if (tar_data->fd < 0)
533  {
534  /*
535  * We open the tar file only when we first try to write to it.
536  */
537  tar_data->fd = open(tar_data->tarfilename,
538  O_WRONLY | O_CREAT | PG_BINARY,
540  if (tar_data->fd < 0)
541  return NULL;
542 
543 #ifdef HAVE_LIBZ
544  if (tar_data->compression)
545  {
546  tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
547  tar_data->zp->zalloc = Z_NULL;
548  tar_data->zp->zfree = Z_NULL;
549  tar_data->zp->opaque = Z_NULL;
550  tar_data->zp->next_out = tar_data->zlibOut;
551  tar_data->zp->avail_out = ZLIB_OUT_SIZE;
552 
553  /*
554  * Initialize deflation library. Adding the magic value 16 to the
555  * default 15 for the windowBits parameter makes the output be
556  * gzip instead of zlib.
557  */
558  if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
559  {
560  pg_free(tar_data->zp);
561  tar_data->zp = NULL;
562  tar_set_error("could not initialize compression library");
563  return NULL;
564  }
565  }
566 #endif
567 
568  /* There's no tar header itself, the file starts with regular files */
569  }
570 
571  Assert(tar_data->currentfile == NULL);
572  if (tar_data->currentfile != NULL)
573  {
574  tar_set_error("implementation error: tar files can't have more than one open file");
575  return NULL;
576  }
577 
578  tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
579 
580  snprintf(tmppath, sizeof(tmppath), "%s%s",
581  pathname, temp_suffix ? temp_suffix : "");
582 
583  /* Create a header with size set to 0 - we will fill out the size on close */
584  if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
585  {
586  pg_free(tar_data->currentfile);
587  tar_data->currentfile = NULL;
588  tar_set_error("could not create tar header");
589  return NULL;
590  }
591 
592 #ifdef HAVE_LIBZ
593  if (tar_data->compression)
594  {
595  /* Flush existing data */
596  if (!tar_write_compressed_data(NULL, 0, true))
597  return NULL;
598 
599  /* Turn off compression for header */
600  if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
601  {
602  tar_set_error("could not change compression parameters");
603  return NULL;
604  }
605  }
606 #endif
607 
608  tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
609  if (tar_data->currentfile->ofs_start == -1)
610  {
611  save_errno = errno;
612  pg_free(tar_data->currentfile);
613  tar_data->currentfile = NULL;
614  errno = save_errno;
615  return NULL;
616  }
617  tar_data->currentfile->currpos = 0;
618 
619  if (!tar_data->compression)
620  {
621  if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
622  {
623  save_errno = errno;
624  pg_free(tar_data->currentfile);
625  tar_data->currentfile = NULL;
626  errno = save_errno;
627  return NULL;
628  }
629  }
630 #ifdef HAVE_LIBZ
631  else
632  {
633  /* Write header through the zlib APIs but with no compression */
634  if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
635  return NULL;
636 
637  /* Re-enable compression for the rest of the file */
638  if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
639  {
640  tar_set_error("could not change compression parameters");
641  return NULL;
642  }
643  }
644 #endif
645 
646  tar_data->currentfile->pathname = pg_strdup(pathname);
647 
648  /*
649  * Uncompressed files are padded on creation, but for compression we can't
650  * do that
651  */
652  if (pad_to_size)
653  {
654  tar_data->currentfile->pad_to_size = pad_to_size;
655  if (!tar_data->compression)
656  {
657  /* Uncompressed, so pad now */
658  tar_write_padding_data(tar_data->currentfile, pad_to_size);
659  /* Seek back to start */
660  if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512)
661  return NULL;
662 
663  tar_data->currentfile->currpos = 0;
664  }
665  }
666 
667  return tar_data->currentfile;
668 }
669 
/*
 * Size lookup is not implemented for tar archives: always fails with
 * errno = ENOSYS.
 */
static ssize_t
tar_get_file_size(const char *pathname)
{
	const ssize_t unsupported = -1;

	tar_clear_error();

	/* Currently not used, so not supported */
	errno = ENOSYS;

	return unsupported;
}
679 
680 static off_t
682 {
683  Assert(f != NULL);
684  tar_clear_error();
685 
686  return ((TarMethodFile *) f)->currpos;
687 }
688 
689 static int
691 {
692  Assert(f != NULL);
693  tar_clear_error();
694 
695  if (!tar_data->sync)
696  return 0;
697 
698  /*
699  * Always sync the whole tarfile, because that's all we can do. This makes
700  * no sense on compressed files, so just ignore those.
701  */
702  if (tar_data->compression)
703  return 0;
704 
705  return fsync(tar_data->fd);
706 }
707 
708 static int
710 {
711  ssize_t filesize;
712  int padding;
713  TarMethodFile *tf = (TarMethodFile *) f;
714 
715  Assert(f != NULL);
716  tar_clear_error();
717 
718  if (method == CLOSE_UNLINK)
719  {
720  if (tar_data->compression)
721  {
722  tar_set_error("unlink not supported with compression");
723  return -1;
724  }
725 
726  /*
727  * Unlink the file that we just wrote to the tar. We do this by
728  * truncating it to the start of the header. This is safe as we only
729  * allow writing of the very last file.
730  */
731  if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
732  return -1;
733 
734  pg_free(tf->pathname);
735  pg_free(tf);
736  tar_data->currentfile = NULL;
737 
738  return 0;
739  }
740 
741  /*
742  * Pad the file itself with zeroes if necessary. Note that this is
743  * different from the tar format padding -- this is the padding we asked
744  * for when the file was opened.
745  */
746  if (tf->pad_to_size)
747  {
748  if (tar_data->compression)
749  {
750  /*
751  * A compressed tarfile is padded on close since we cannot know
752  * the size of the compressed output until the end.
753  */
754  size_t sizeleft = tf->pad_to_size - tf->currpos;
755 
756  if (sizeleft)
757  {
758  if (!tar_write_padding_data(tf, sizeleft))
759  return -1;
760  }
761  }
762  else
763  {
764  /*
765  * An uncompressed tarfile was padded on creation, so just adjust
766  * the current position as if we seeked to the end.
767  */
768  tf->currpos = tf->pad_to_size;
769  }
770  }
771 
772  /*
773  * Get the size of the file, and pad the current data up to the nearest
774  * 512 byte boundary.
775  */
776  filesize = tar_get_current_pos(f);
777  padding = ((filesize + 511) & ~511) - filesize;
778  if (padding)
779  {
780  char zerobuf[512];
781 
782  MemSet(zerobuf, 0, padding);
783  if (tar_write(f, zerobuf, padding) != padding)
784  return -1;
785  }
786 
787 
788 #ifdef HAVE_LIBZ
789  if (tar_data->compression)
790  {
791  /* Flush the current buffer */
792  if (!tar_write_compressed_data(NULL, 0, true))
793  {
794  errno = EINVAL;
795  return -1;
796  }
797  }
798 #endif
799 
800  /*
801  * Now go back and update the header with the correct filesize and
802  * possibly also renaming the file. We overwrite the entire current header
803  * when done, including the checksum.
804  */
805  print_tar_number(&(tf->header[124]), 12, filesize);
806 
807  if (method == CLOSE_NORMAL)
808 
809  /*
810  * We overwrite it with what it was before if we have no tempname,
811  * since we're going to write the buffer anyway.
812  */
813  strlcpy(&(tf->header[0]), tf->pathname, 100);
814 
815  print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
816  if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
817  return -1;
818  if (!tar_data->compression)
819  {
820  if (write(tar_data->fd, tf->header, 512) != 512)
821  return -1;
822  }
823 #ifdef HAVE_LIBZ
824  else
825  {
826  /* Turn off compression */
827  if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
828  {
829  tar_set_error("could not change compression parameters");
830  return -1;
831  }
832 
833  /* Overwrite the header, assuming the size will be the same */
834  if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
835  return -1;
836 
837  /* Turn compression back on */
838  if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
839  {
840  tar_set_error("could not change compression parameters");
841  return -1;
842  }
843  }
844 #endif
845 
846  /* Move file pointer back down to end, so we can write the next file */
847  if (lseek(tar_data->fd, 0, SEEK_END) < 0)
848  return -1;
849 
850  /* Always fsync on close, so the padding gets fsynced */
851  tar_sync(f);
852 
853  /* Clean up and done */
854  pg_free(tf->pathname);
855  pg_free(tf);
856  tar_data->currentfile = NULL;
857 
858  return 0;
859 }
860 
/*
 * Existence check for the tar method: always answers "no".
 */
static bool
tar_existsfile(const char *pathname)
{
	const bool	found = false;

	tar_clear_error();

	/* We only deal with new tarfiles, so nothing externally created exists */
	return found;
}
868 
869 static bool
871 {
872  char zerobuf[1024];
873 
874  tar_clear_error();
875 
876  if (tar_data->currentfile)
877  {
878  if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
879  return false;
880  }
881 
882  /* A tarfile always ends with two empty blocks */
883  MemSet(zerobuf, 0, sizeof(zerobuf));
884  if (!tar_data->compression)
885  {
886  if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
887  return false;
888  }
889 #ifdef HAVE_LIBZ
890  else
891  {
892  if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
893  return false;
894 
895  /* Also flush all data to make sure the gzip stream is finished */
896  tar_data->zp->next_in = NULL;
897  tar_data->zp->avail_in = 0;
898  while (true)
899  {
900  int r;
901 
902  r = deflate(tar_data->zp, Z_FINISH);
903 
904  if (r == Z_STREAM_ERROR)
905  {
906  tar_set_error("could not compress data");
907  return false;
908  }
909  if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
910  {
911  size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
912 
913  if (write(tar_data->fd, tar_data->zlibOut, len) != len)
914  return false;
915  }
916  if (r == Z_STREAM_END)
917  break;
918  }
919 
920  if (deflateEnd(tar_data->zp) != Z_OK)
921  {
922  tar_set_error("could not close compression stream");
923  return false;
924  }
925  }
926 #endif
927 
928  /* sync the empty blocks as well, since they're after the last file */
929  if (tar_data->sync)
930  fsync(tar_data->fd);
931 
932  if (close(tar_data->fd) != 0)
933  return false;
934 
935  tar_data->fd = -1;
936 
937  if (tar_data->sync)
938  {
939  if (fsync_fname(tar_data->tarfilename, false, progname) != 0)
940  return false;
941  if (fsync_parent_path(tar_data->tarfilename, progname) != 0)
942  return false;
943  }
944 
945  return true;
946 }
947 
949 CreateWalTarMethod(const char *tarbase, int compression, bool sync)
950 {
951  WalWriteMethod *method;
952  const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
953 
954  method = pg_malloc0(sizeof(WalWriteMethod));
956  method->write = tar_write;
959  method->close = tar_close;
960  method->sync = tar_sync;
961  method->existsfile = tar_existsfile;
962  method->finish = tar_finish;
963  method->getlasterror = tar_getlasterror;
964 
965  tar_data = pg_malloc0(sizeof(TarMethodData));
966  tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
967  sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
968  tar_data->fd = -1;
969  tar_data->compression = compression;
970  tar_data->sync = sync;
971 #ifdef HAVE_LIBZ
972  if (compression)
973  tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
974 #endif
975 
976  return method;
977 }
978 
979 void
981 {
982  pg_free(tar_data->tarfilename);
983 #ifdef HAVE_LIBZ
984  if (tar_data->compression)
985  pg_free(tar_data->zlibOut);
986 #endif
987  pg_free(tar_data);
988 }
WalCloseMethod
Definition: walmethods.h:15
int pg_file_create_mode
Definition: file_perm.c:20
static off_t dir_get_current_pos(Walfile f)
Definition: walmethods.c:203
void * Walfile
Definition: walmethods.h:13
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:47
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define write(a, b, c)
Definition: win32.h:14
char * pathname
Definition: walmethods.c:388
Definition: pgtar.h:17
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:575
static bool tar_finish(void)
Definition: walmethods.c:870
#define MemSet(start, val, len)
Definition: c.h:908
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
struct DirectoryMethodData DirectoryMethodData
const char * progname
Definition: pg_standby.c:37
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1080
int(* sync)(Walfile f)
Definition: walmethods.h:67
WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
Definition: walmethods.c:347
static ssize_t tar_write(Walfile f, const void *buf, size_t count)
Definition: walmethods.c:470
const char *(* getlasterror)(void)
Definition: walmethods.h:78
struct DirectoryMethodFile DirectoryMethodFile
size_t pad_to_size
Definition: walmethods.c:389
static int tar_close(Walfile f, WalCloseMethod method)
Definition: walmethods.c:709
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:62
char header[512]
Definition: walmethods.c:387
static bool dir_finish(void)
Definition: walmethods.c:331
#define fsync(fd)
Definition: win32_port.h:63
#define S_IWUSR
Definition: win32_port.h:274
static const char * tar_getlasterror(void)
Definition: walmethods.c:411
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
#define MAXPGPATH
static char * buf
Definition: pg_test_fsync.c:67
static Walfile tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:525
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:59
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
z_stream * z_streamp
#define tar_set_error(msg)
Definition: walmethods.c:408
static ssize_t dir_write(Walfile f, const void *buf, size_t count)
Definition: walmethods.c:184
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:601
static bool tar_write_padding_data(TarMethodFile *f, size_t bytes)
Definition: walmethods.c:501
static ssize_t tar_get_file_size(const char *pathname)
Definition: walmethods.c:671
#define stat(a, b)
Definition: win32_port.h:266
enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime)
Definition: tar.c:112
void FreeWalTarMethod(void)
Definition: walmethods.c:980
static DirectoryMethodData * dir_data
Definition: walmethods.c:48
static bool tar_existsfile(const char *pathname)
Definition: walmethods.c:862
static int dir_close(Walfile f, WalCloseMethod method)
Definition: walmethods.c:212
static off_t tar_get_current_pos(Walfile f)
Definition: walmethods.c:681
int tarChecksum(char *header)
Definition: tar.c:88
char * tarfilename
Definition: walmethods.c:394
TarMethodFile * currentfile
Definition: walmethods.c:398
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define Assert(condition)
Definition: c.h:699
WalWriteMethod * CreateWalTarMethod(const char *tarbase, int compression, bool sync)
Definition: walmethods.c:949
struct TarMethodFile TarMethodFile
void print_tar_number(char *s, int len, uint64 val)
Definition: tar.c:20
char lasterror[1024]
Definition: walmethods.c:399
#define tar_clear_error()
Definition: walmethods.c:407
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define ZLIB_OUT_SIZE
Definition: walmethods.c:32
bool(* finish)(void)
Definition: walmethods.h:75
static void header(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:208
#define S_IRUSR
Definition: win32_port.h:271
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:50
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:41
static Walfile dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:73
const char * strerror(int errnum)
Definition: strerror.c:19
struct TarMethodData TarMethodData
ssize_t(* get_file_size)(const char *pathname)
Definition: walmethods.h:53
static const char * dir_getlasterror(void)
Definition: walmethods.c:66
static ssize_t dir_get_file_size(const char *pathname)
Definition: walmethods.c:300
#define close(a)
Definition: win32.h:12
static int fsync_parent_path(const char *fname, int elevel)
Definition: fd.c:3531
off_t ofs_start
Definition: walmethods.c:385
static TarMethodData * tar_data
Definition: walmethods.c:405
void FreeWalDirectoryMethod(void)
Definition: walmethods.c:371
static bool dir_existsfile(const char *pathname)
Definition: walmethods.c:315
static int tar_sync(Walfile f)
Definition: walmethods.c:690
static int dir_sync(Walfile f)
Definition: walmethods.c:281
#define ftruncate(a, b)
Definition: win32_port.h:60