PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
walmethods.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * walmethods.c - implementations of different ways to write received wal
4  *
5  * NOTE! The caller must ensure that only one method is instantiated in
6  * any given program, and that it's only instantiated once!
7  *
8  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
9  *
10  * IDENTIFICATION
11  * src/bin/pg_basebackup/walmethods.c
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres_fe.h"
16 
17 #include <sys/stat.h>
18 #include <time.h>
19 #include <unistd.h>
20 #ifdef HAVE_LIBZ
21 #include <zlib.h>
22 #endif
23 
24 #include "pgtar.h"
25 #include "common/file_utils.h"
26 
27 #include "receivelog.h"
28 #include "streamutil.h"
29 
30 /* Size of zlib buffer for .tar.gz */
31 #define ZLIB_OUT_SIZE 4096
32 
33 /*-------------------------------------------------------------------------
34  * WalDirectoryMethod - write wal to a directory looking like pg_xlog
35  *-------------------------------------------------------------------------
36  */
37 
38 /*
39  * Global static data for this method
40  */
41 typedef struct DirectoryMethodData
42 {
43  char *basedir;
45  bool sync;
48 
49 /*
50  * Local file handle
51  */
52 typedef struct DirectoryMethodFile
53 {
54  int fd;
55  off_t currpos;
56  char *pathname;
57  char *fullpath;
58  char *temp_suffix;
59 #ifdef HAVE_LIBZ
60  gzFile gzfp;
61 #endif
63 
64 static char *
66 {
67  /* Directory method always sets errno, so just use strerror */
68  return strerror(errno);
69 }
70 
71 static Walfile
72 dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
73 {
74  static char tmppath[MAXPGPATH];
75  int fd;
77 #ifdef HAVE_LIBZ
78  gzFile gzfp = NULL;
79 #endif
80 
81  snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
82  dir_data->basedir, pathname,
83  dir_data->compression > 0 ? ".gz" : "",
84  temp_suffix ? temp_suffix : "");
85 
86  /*
87  * Open a file for non-compressed as well as compressed files. Tracking
88  * the file descriptor is important for dir_sync() method as gzflush()
89  * does not do any system calls to fsync() to make changes permanent on
90  * disk.
91  */
92  fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
93  if (fd < 0)
94  return NULL;
95 
96 #ifdef HAVE_LIBZ
97  if (dir_data->compression > 0)
98  {
99  gzfp = gzdopen(fd, "wb");
100  if (gzfp == NULL)
101  {
102  close(fd);
103  return NULL;
104  }
105 
106  if (gzsetparams(gzfp, dir_data->compression,
107  Z_DEFAULT_STRATEGY) != Z_OK)
108  {
109  gzclose(gzfp);
110  return NULL;
111  }
112  }
113 #endif
114 
115  /* Do pre-padding on non-compressed files */
116  if (pad_to_size && dir_data->compression == 0)
117  {
118  char *zerobuf;
119  int bytes;
120 
121  zerobuf = pg_malloc0(XLOG_BLCKSZ);
122  for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
123  {
124  if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
125  {
126  int save_errno = errno;
127 
128  pg_free(zerobuf);
129  close(fd);
130  errno = save_errno;
131  return NULL;
132  }
133  }
134  pg_free(zerobuf);
135 
136  if (lseek(fd, 0, SEEK_SET) != 0)
137  {
138  int save_errno = errno;
139 
140  close(fd);
141  errno = save_errno;
142  return NULL;
143  }
144  }
145 
146  /*
147  * fsync WAL file and containing directory, to ensure the file is
148  * persistently created and zeroed (if padded). That's particularly
149  * important when using synchronous mode, where the file is modified and
150  * fsynced in-place, without a directory fsync.
151  */
152  if (dir_data->sync)
153  {
154  if (fsync_fname(tmppath, false, progname) != 0 ||
155  fsync_parent_path(tmppath, progname) != 0)
156  {
157 #ifdef HAVE_LIBZ
158  if (dir_data->compression > 0)
159  gzclose(gzfp);
160  else
161 #endif
162  close(fd);
163  return NULL;
164  }
165  }
166 
167  f = pg_malloc0(sizeof(DirectoryMethodFile));
168 #ifdef HAVE_LIBZ
169  if (dir_data->compression > 0)
170  f->gzfp = gzfp;
171 #endif
172  f->fd = fd;
173  f->currpos = 0;
174  f->pathname = pg_strdup(pathname);
175  f->fullpath = pg_strdup(tmppath);
176  if (temp_suffix)
177  f->temp_suffix = pg_strdup(temp_suffix);
178 
179  return f;
180 }
181 
182 static ssize_t
183 dir_write(Walfile f, const void *buf, size_t count)
184 {
185  ssize_t r;
187 
188  Assert(f != NULL);
189 
190 #ifdef HAVE_LIBZ
191  if (dir_data->compression > 0)
192  r = (ssize_t) gzwrite(df->gzfp, buf, count);
193  else
194 #endif
195  r = write(df->fd, buf, count);
196  if (r > 0)
197  df->currpos += r;
198  return r;
199 }
200 
201 static off_t
203 {
204  Assert(f != NULL);
205 
206  /* Use a cached value to prevent lots of reseeks */
207  return ((DirectoryMethodFile *) f)->currpos;
208 }
209 
210 static int
212 {
213  int r;
215  static char tmppath[MAXPGPATH];
216  static char tmppath2[MAXPGPATH];
217 
218  Assert(f != NULL);
219 
220 #ifdef HAVE_LIBZ
221  if (dir_data->compression > 0)
222  r = gzclose(df->gzfp);
223  else
224 #endif
225  r = close(df->fd);
226 
227  if (r == 0)
228  {
229  /* Build path to the current version of the file */
230  if (method == CLOSE_NORMAL && df->temp_suffix)
231  {
232  /*
233  * If we have a temp prefix, normal operation is to rename the
234  * file.
235  */
236  snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
237  dir_data->basedir, df->pathname,
238  dir_data->compression > 0 ? ".gz" : "",
239  df->temp_suffix);
240  snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
241  dir_data->basedir, df->pathname,
242  dir_data->compression > 0 ? ".gz" : "");
243  r = durable_rename(tmppath, tmppath2, progname);
244  }
245  else if (method == CLOSE_UNLINK)
246  {
247  /* Unlink the file once it's closed */
248  snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
249  dir_data->basedir, df->pathname,
250  dir_data->compression > 0 ? ".gz" : "",
251  df->temp_suffix ? df->temp_suffix : "");
252  r = unlink(tmppath);
253  }
254  else
255  {
256  /*
257  * Else either CLOSE_NORMAL and no temp suffix, or
258  * CLOSE_NO_RENAME. In this case, fsync the file and containing
259  * directory if sync mode is requested.
260  */
261  if (dir_data->sync)
262  {
263  r = fsync_fname(df->fullpath, false, progname);
264  if (r == 0)
266  }
267  }
268  }
269 
270  pg_free(df->pathname);
271  pg_free(df->fullpath);
272  if (df->temp_suffix)
273  pg_free(df->temp_suffix);
274  pg_free(df);
275 
276  return r;
277 }
278 
279 static int
281 {
282  Assert(f != NULL);
283 
284  if (!dir_data->sync)
285  return 0;
286 
287 #ifdef HAVE_LIBZ
288  if (dir_data->compression > 0)
289  {
290  if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
291  return -1;
292  }
293 #endif
294 
295  return fsync(((DirectoryMethodFile *) f)->fd);
296 }
297 
298 static ssize_t
299 dir_get_file_size(const char *pathname)
300 {
301  struct stat statbuf;
302  static char tmppath[MAXPGPATH];
303 
304  snprintf(tmppath, sizeof(tmppath), "%s/%s",
305  dir_data->basedir, pathname);
306 
307  if (stat(tmppath, &statbuf) != 0)
308  return -1;
309 
310  return statbuf.st_size;
311 }
312 
313 static bool
314 dir_existsfile(const char *pathname)
315 {
316  static char tmppath[MAXPGPATH];
317  int fd;
318 
319  snprintf(tmppath, sizeof(tmppath), "%s/%s",
320  dir_data->basedir, pathname);
321 
322  fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
323  if (fd < 0)
324  return false;
325  close(fd);
326  return true;
327 }
328 
329 static bool
331 {
332  if (dir_data->sync)
333  {
334  /*
335  * Files are fsynced when they are closed, but we need to fsync the
336  * directory entry here as well.
337  */
338  if (fsync_fname(dir_data->basedir, true, progname) != 0)
339  return false;
340  }
341  return true;
342 }
343 
344 
346 CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
347 {
348  WalWriteMethod *method;
349 
350  method = pg_malloc0(sizeof(WalWriteMethod));
352  method->write = dir_write;
355  method->close = dir_close;
356  method->sync = dir_sync;
357  method->existsfile = dir_existsfile;
358  method->finish = dir_finish;
359  method->getlasterror = dir_getlasterror;
360 
361  dir_data = pg_malloc0(sizeof(DirectoryMethodData));
362  dir_data->compression = compression;
363  dir_data->basedir = pg_strdup(basedir);
364  dir_data->sync = sync;
365 
366  return method;
367 }
368 
369 void
371 {
372  pg_free(dir_data->basedir);
373  pg_free(dir_data);
374 }
375 
376 
377 /*-------------------------------------------------------------------------
378  * WalTarMethod - write wal to a tar file containing pg_xlog contents
379  *-------------------------------------------------------------------------
380  */
381 
382 typedef struct TarMethodFile
383 {
384  off_t ofs_start; /* Where does the *header* for this file start */
385  off_t currpos;
386  char header[512];
387  char *pathname;
388  size_t pad_to_size;
389 } TarMethodFile;
390 
391 typedef struct TarMethodData
392 {
393  char *tarfilename;
394  int fd;
396  bool sync;
398  char lasterror[1024];
399 #ifdef HAVE_LIBZ
400  z_streamp zp;
401  void *zlibOut;
402 #endif
403 } TarMethodData;
405 
406 #define tar_clear_error() tar_data->lasterror[0] = '\0'
407 #define tar_set_error(msg) strlcpy(tar_data->lasterror, msg, sizeof(tar_data->lasterror))
408 
409 static char *
411 {
412  /*
413  * If a custom error is set, return that one. Otherwise, assume errno is
414  * set and return that one.
415  */
416  if (tar_data->lasterror[0])
417  return tar_data->lasterror;
418  return strerror(errno);
419 }
420 
421 #ifdef HAVE_LIBZ
422 static bool
423 tar_write_compressed_data(void *buf, size_t count, bool flush)
424 {
425  tar_data->zp->next_in = buf;
426  tar_data->zp->avail_in = count;
427 
428  while (tar_data->zp->avail_in || flush)
429  {
430  int r;
431 
432  r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
433  if (r == Z_STREAM_ERROR)
434  {
435  tar_set_error("deflate failed");
436  return false;
437  }
438 
439  if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
440  {
441  size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
442 
443  if (write(tar_data->fd, tar_data->zlibOut, len) != len)
444  return false;
445 
446  tar_data->zp->next_out = tar_data->zlibOut;
447  tar_data->zp->avail_out = ZLIB_OUT_SIZE;
448  }
449 
450  if (r == Z_STREAM_END)
451  break;
452  }
453 
454  if (flush)
455  {
456  /* Reset the stream for writing */
457  if (deflateReset(tar_data->zp) != Z_OK)
458  {
459  tar_set_error("deflateReset failed");
460  return false;
461  }
462  }
463 
464  return true;
465 }
466 #endif
467 
468 static ssize_t
469 tar_write(Walfile f, const void *buf, size_t count)
470 {
471  ssize_t r;
472 
473  Assert(f != NULL);
474  tar_clear_error();
475 
476  /* Tarfile will always be positioned at the end */
477  if (!tar_data->compression)
478  {
479  r = write(tar_data->fd, buf, count);
480  if (r > 0)
481  ((TarMethodFile *) f)->currpos += r;
482  return r;
483  }
484 #ifdef HAVE_LIBZ
485  else
486  {
487  if (!tar_write_compressed_data((void *) buf, count, false))
488  return -1;
489  ((TarMethodFile *) f)->currpos += count;
490  return count;
491  }
492 #else
493  else
494  /* Can't happen - compression enabled with no libz */
495  return -1;
496 #endif
497 }
498 
499 static bool
501 {
502  char *zerobuf = pg_malloc0(XLOG_BLCKSZ);
503  size_t bytesleft = bytes;
504 
505  while (bytesleft)
506  {
507  size_t bytestowrite = bytesleft > XLOG_BLCKSZ ? XLOG_BLCKSZ : bytesleft;
508 
509  ssize_t r = tar_write(f, zerobuf, bytestowrite);
510 
511  if (r < 0)
512  {
513  pg_free(zerobuf);
514  return false;
515  }
516  bytesleft -= r;
517  }
518 
519  pg_free(zerobuf);
520  return true;
521 }
522 
523 static Walfile
524 tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
525 {
526  int save_errno;
527  static char tmppath[MAXPGPATH];
528 
529  tar_clear_error();
530 
531  if (tar_data->fd < 0)
532  {
533  /*
534  * We open the tar file only when we first try to write to it.
535  */
536  tar_data->fd = open(tar_data->tarfilename,
537  O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
538  if (tar_data->fd < 0)
539  return NULL;
540 
541 #ifdef HAVE_LIBZ
542  if (tar_data->compression)
543  {
544  tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
545  tar_data->zp->zalloc = Z_NULL;
546  tar_data->zp->zfree = Z_NULL;
547  tar_data->zp->opaque = Z_NULL;
548  tar_data->zp->next_out = tar_data->zlibOut;
549  tar_data->zp->avail_out = ZLIB_OUT_SIZE;
550 
551  /*
552  * Initialize deflation library. Adding the magic value 16 to the
553  * default 15 for the windowBits parameter makes the output be
554  * gzip instead of zlib.
555  */
556  if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
557  {
558  pg_free(tar_data->zp);
559  tar_data->zp = NULL;
560  tar_set_error("deflateInit2 failed");
561  return NULL;
562  }
563  }
564 #endif
565 
566  /* There's no tar header itself, the file starts with regular files */
567  }
568 
569  Assert(tar_data->currentfile == NULL);
570  if (tar_data->currentfile != NULL)
571  {
572  tar_set_error("implementation error: tar files can't have more than one open file\n");
573  return NULL;
574  }
575 
576  tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
577 
578  snprintf(tmppath, sizeof(tmppath), "%s%s",
579  pathname, temp_suffix ? temp_suffix : "");
580 
581  /* Create a header with size set to 0 - we will fill out the size on close */
582  if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
583  {
584  pg_free(tar_data->currentfile);
585  tar_data->currentfile = NULL;
586  tar_set_error("could not create tar header");
587  return NULL;
588  }
589 
590 #ifdef HAVE_LIBZ
591  if (tar_data->compression)
592  {
593  /* Flush existing data */
594  if (!tar_write_compressed_data(NULL, 0, true))
595  return NULL;
596 
597  /* Turn off compression for header */
598  if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
599  {
600  tar_set_error("deflateParams failed");
601  return NULL;
602  }
603  }
604 #endif
605 
606  tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
607  if (tar_data->currentfile->ofs_start == -1)
608  {
609  save_errno = errno;
610  pg_free(tar_data->currentfile);
611  tar_data->currentfile = NULL;
612  errno = save_errno;
613  return NULL;
614  }
615  tar_data->currentfile->currpos = 0;
616 
617  if (!tar_data->compression)
618  {
619  if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
620  {
621  save_errno = errno;
622  pg_free(tar_data->currentfile);
623  tar_data->currentfile = NULL;
624  errno = save_errno;
625  return NULL;
626  }
627  }
628 #ifdef HAVE_LIBZ
629  else
630  {
631  /* Write header through the zlib APIs but with no compression */
632  if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
633  return NULL;
634 
635  /* Re-enable compression for the rest of the file */
636  if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
637  {
638  tar_set_error("deflateParams failed");
639  return NULL;
640  }
641  }
642 #endif
643 
644  tar_data->currentfile->pathname = pg_strdup(pathname);
645 
646  /*
647  * Uncompressed files are padded on creation, but for compression we can't
648  * do that
649  */
650  if (pad_to_size)
651  {
652  tar_data->currentfile->pad_to_size = pad_to_size;
653  if (!tar_data->compression)
654  {
655  /* Uncompressed, so pad now */
656  tar_write_padding_data(tar_data->currentfile, pad_to_size);
657  /* Seek back to start */
658  if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512)
659  return NULL;
660 
661  tar_data->currentfile->currpos = 0;
662  }
663  }
664 
665  return tar_data->currentfile;
666 }
667 
668 static ssize_t
669 tar_get_file_size(const char *pathname)
670 {
671  tar_clear_error();
672 
673  /* Currently not used, so not supported */
674  errno = ENOSYS;
675  return -1;
676 }
677 
678 static off_t
680 {
681  Assert(f != NULL);
682  tar_clear_error();
683 
684  return ((TarMethodFile *) f)->currpos;
685 }
686 
687 static int
689 {
690  Assert(f != NULL);
691  tar_clear_error();
692 
693  if (!tar_data->sync)
694  return 0;
695 
696  /*
697  * Always sync the whole tarfile, because that's all we can do. This makes
698  * no sense on compressed files, so just ignore those.
699  */
700  if (tar_data->compression)
701  return 0;
702 
703  return fsync(tar_data->fd);
704 }
705 
706 static int
708 {
709  ssize_t filesize;
710  int padding;
711  TarMethodFile *tf = (TarMethodFile *) f;
712 
713  Assert(f != NULL);
714  tar_clear_error();
715 
716  if (method == CLOSE_UNLINK)
717  {
718  if (tar_data->compression)
719  {
720  tar_set_error("unlink not supported with compression");
721  return -1;
722  }
723 
724  /*
725  * Unlink the file that we just wrote to the tar. We do this by
726  * truncating it to the start of the header. This is safe as we only
727  * allow writing of the very last file.
728  */
729  if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
730  return -1;
731 
732  pg_free(tf->pathname);
733  pg_free(tf);
734  tar_data->currentfile = NULL;
735 
736  return 0;
737  }
738 
739  /*
740  * Pad the file itself with zeroes if necessary. Note that this is
741  * different from the tar format padding -- this is the padding we asked
742  * for when the file was opened.
743  */
744  if (tf->pad_to_size)
745  {
746  if (tar_data->compression)
747  {
748  /*
749  * A compressed tarfile is padded on close since we cannot know
750  * the size of the compressed output until the end.
751  */
752  size_t sizeleft = tf->pad_to_size - tf->currpos;
753 
754  if (sizeleft)
755  {
756  if (!tar_write_padding_data(tf, sizeleft))
757  return -1;
758  }
759  }
760  else
761  {
762  /*
763  * An uncompressed tarfile was padded on creation, so just adjust
764  * the current position as if we seeked to the end.
765  */
766  tf->currpos = tf->pad_to_size;
767  }
768  }
769 
770  /*
771  * Get the size of the file, and pad the current data up to the nearest
772  * 512 byte boundary.
773  */
774  filesize = tar_get_current_pos(f);
775  padding = ((filesize + 511) & ~511) - filesize;
776  if (padding)
777  {
778  char zerobuf[512];
779 
780  MemSet(zerobuf, 0, padding);
781  if (tar_write(f, zerobuf, padding) != padding)
782  return -1;
783  }
784 
785 
786 #ifdef HAVE_LIBZ
787  if (tar_data->compression)
788  {
789  /* Flush the current buffer */
790  if (!tar_write_compressed_data(NULL, 0, true))
791  {
792  errno = EINVAL;
793  return -1;
794  }
795  }
796 #endif
797 
798  /*
799  * Now go back and update the header with the correct filesize and
800  * possibly also renaming the file. We overwrite the entire current header
801  * when done, including the checksum.
802  */
803  print_tar_number(&(tf->header[124]), 12, filesize);
804 
805  if (method == CLOSE_NORMAL)
806 
807  /*
808  * We overwrite it with what it was before if we have no tempname,
809  * since we're going to write the buffer anyway.
810  */
811  strlcpy(&(tf->header[0]), tf->pathname, 100);
812 
813  print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
814  if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
815  return -1;
816  if (!tar_data->compression)
817  {
818  if (write(tar_data->fd, tf->header, 512) != 512)
819  return -1;
820  }
821 #ifdef HAVE_LIBZ
822  else
823  {
824  /* Turn off compression */
825  if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
826  {
827  tar_set_error("deflateParams failed");
828  return -1;
829  }
830 
831  /* Overwrite the header, assuming the size will be the same */
832  if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
833  return -1;
834 
835  /* Turn compression back on */
836  if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
837  {
838  tar_set_error("deflateParams failed");
839  return -1;
840  }
841  }
842 #endif
843 
844  /* Move file pointer back down to end, so we can write the next file */
845  if (lseek(tar_data->fd, 0, SEEK_END) < 0)
846  return -1;
847 
848  /* Always fsync on close, so the padding gets fsynced */
849  tar_sync(f);
850 
851  /* Clean up and done */
852  pg_free(tf->pathname);
853  pg_free(tf);
854  tar_data->currentfile = NULL;
855 
856  return 0;
857 }
858 
859 static bool
860 tar_existsfile(const char *pathname)
861 {
862  tar_clear_error();
863  /* We only deal with new tarfiles, so nothing externally created exists */
864  return false;
865 }
866 
867 static bool
869 {
870  char zerobuf[1024];
871 
872  tar_clear_error();
873 
874  if (tar_data->currentfile)
875  {
876  if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
877  return false;
878  }
879 
880  /* A tarfile always ends with two empty blocks */
881  MemSet(zerobuf, 0, sizeof(zerobuf));
882  if (!tar_data->compression)
883  {
884  if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
885  return false;
886  }
887 #ifdef HAVE_LIBZ
888  else
889  {
890  if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
891  return false;
892 
893  /* Also flush all data to make sure the gzip stream is finished */
894  tar_data->zp->next_in = NULL;
895  tar_data->zp->avail_in = 0;
896  while (true)
897  {
898  int r;
899 
900  r = deflate(tar_data->zp, Z_FINISH);
901 
902  if (r == Z_STREAM_ERROR)
903  {
904  tar_set_error("deflate failed");
905  return false;
906  }
907  if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
908  {
909  size_t len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
910 
911  if (write(tar_data->fd, tar_data->zlibOut, len) != len)
912  return false;
913  }
914  if (r == Z_STREAM_END)
915  break;
916  }
917 
918  if (deflateEnd(tar_data->zp) != Z_OK)
919  {
920  tar_set_error("deflateEnd failed");
921  return false;
922  }
923  }
924 #endif
925 
926  /* sync the empty blocks as well, since they're after the last file */
927  if (tar_data->sync)
928  fsync(tar_data->fd);
929 
930  if (close(tar_data->fd) != 0)
931  return false;
932 
933  tar_data->fd = -1;
934 
935  if (tar_data->sync)
936  {
937  if (fsync_fname(tar_data->tarfilename, false, progname) != 0)
938  return false;
939  if (fsync_parent_path(tar_data->tarfilename, progname) != 0)
940  return false;
941  }
942 
943  return true;
944 }
945 
947 CreateWalTarMethod(const char *tarbase, int compression, bool sync)
948 {
949  WalWriteMethod *method;
950  const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
951 
952  method = pg_malloc0(sizeof(WalWriteMethod));
954  method->write = tar_write;
957  method->close = tar_close;
958  method->sync = tar_sync;
959  method->existsfile = tar_existsfile;
960  method->finish = tar_finish;
961  method->getlasterror = tar_getlasterror;
962 
963  tar_data = pg_malloc0(sizeof(TarMethodData));
964  tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
965  sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
966  tar_data->fd = -1;
967  tar_data->compression = compression;
968  tar_data->sync = sync;
969 #ifdef HAVE_LIBZ
970  if (compression)
971  tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
972 #endif
973 
974  return method;
975 }
976 
977 void
979 {
980  pg_free(tar_data->tarfilename);
981 #ifdef HAVE_LIBZ
982  if (tar_data->compression)
983  pg_free(tar_data->zlibOut);
984 #endif
985  pg_free(tar_data);
986 }
WalCloseMethod
Definition: walmethods.h:15
static off_t dir_get_current_pos(Walfile f)
Definition: walmethods.c:202
char *(* getlasterror)(void)
Definition: walmethods.h:34
void * Walfile
Definition: walmethods.h:13
void * pg_malloc(size_t size)
Definition: fe_memutils.c:47
#define write(a, b, c)
Definition: win32.h:19
char * pathname
Definition: walmethods.c:387
struct TarMethodData TarMethodData
Definition: pgtar.h:17
bool(* existsfile)(const char *pathname)
Definition: walmethods.h:27
void fsync_fname(const char *fname, bool isdir)
Definition: fd.c:567
static bool tar_finish(void)
Definition: walmethods.c:868
#define MemSet(start, val, len)
Definition: c.h:857
int snprintf(char *str, size_t count, const char *fmt,...) pg_attribute_printf(3
const char * progname
Definition: pg_standby.c:37
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PG_BINARY
Definition: c.h:1038
static char * basedir
Definition: pg_basebackup.c:77
WalWriteMethod * CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
Definition: walmethods.c:346
static ssize_t tar_write(Walfile f, const void *buf, size_t count)
Definition: walmethods.c:469
bool(* finish)(void)
Definition: walmethods.h:33
ssize_t(* get_file_size)(const char *pathname)
Definition: walmethods.h:28
size_t pad_to_size
Definition: walmethods.c:388
static int tar_close(Walfile f, WalCloseMethod method)
Definition: walmethods.c:707
static char * tar_getlasterror(void)
Definition: walmethods.c:410
char header[512]
Definition: walmethods.c:386
static bool dir_finish(void)
Definition: walmethods.c:330
void * pg_malloc0(size_t size)
Definition: fe_memutils.c:53
#define MAXPGPATH
struct TarMethodFile TarMethodFile
static char * buf
Definition: pg_test_fsync.c:65
static Walfile tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:524
off_t(* get_current_pos)(Walfile f)
Definition: walmethods.h:31
char * pg_strdup(const char *in)
Definition: fe_memutils.c:85
z_stream * z_streamp
#define tar_set_error(msg)
Definition: walmethods.c:407
static ssize_t dir_write(Walfile f, const void *buf, size_t count)
Definition: walmethods.c:183
static char * dir_getlasterror(void)
Definition: walmethods.c:65
int unlink(const char *filename)
#define fsync(fd)
Definition: win32.h:70
int durable_rename(const char *oldfile, const char *newfile, int elevel)
Definition: fd.c:593
static bool tar_write_padding_data(TarMethodFile *f, size_t bytes)
Definition: walmethods.c:500
static ssize_t tar_get_file_size(const char *pathname)
Definition: walmethods.c:669
int(* sync)(Walfile f)
Definition: walmethods.h:32
enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget, pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime)
Definition: tar.c:112
void FreeWalTarMethod(void)
Definition: walmethods.c:978
static DirectoryMethodData * dir_data
Definition: walmethods.c:47
static bool tar_existsfile(const char *pathname)
Definition: walmethods.c:860
static int dir_close(Walfile f, WalCloseMethod method)
Definition: walmethods.c:211
ssize_t(* write)(Walfile f, const void *buf, size_t count)
Definition: walmethods.h:30
static off_t tar_get_current_pos(Walfile f)
Definition: walmethods.c:679
int tarChecksum(char *header)
Definition: tar.c:88
char * tarfilename
Definition: walmethods.c:393
TarMethodFile * currentfile
Definition: walmethods.c:397
size_t strlcpy(char *dst, const char *src, size_t siz)
Definition: strlcpy.c:45
#define ftruncate(a, b)
Definition: win32.h:67
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
WalWriteMethod * CreateWalTarMethod(const char *tarbase, int compression, bool sync)
Definition: walmethods.c:947
Walfile(* open_for_write)(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.h:25
void print_tar_number(char *s, int len, uint64 val)
Definition: tar.c:20
char lasterror[1024]
Definition: walmethods.c:398
#define tar_clear_error()
Definition: walmethods.c:406
void pg_free(void *ptr)
Definition: fe_memutils.c:105
#define ZLIB_OUT_SIZE
Definition: walmethods.c:31
static void header(const char *fmt,...) pg_attribute_printf(1
Definition: pg_regress.c:207
static Walfile dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
Definition: walmethods.c:72
const char * strerror(int errnum)
Definition: strerror.c:19
static ssize_t dir_get_file_size(const char *pathname)
Definition: walmethods.c:299
#define close(a)
Definition: win32.h:17
static int fsync_parent_path(const char *fname, int elevel)
Definition: fd.c:3196
off_t ofs_start
Definition: walmethods.c:384
static TarMethodData * tar_data
Definition: walmethods.c:404
void FreeWalDirectoryMethod(void)
Definition: walmethods.c:370
static bool dir_existsfile(const char *pathname)
Definition: walmethods.c:314
static int tar_sync(Walfile f)
Definition: walmethods.c:688
static int dir_sync(Walfile f)
Definition: walmethods.c:280
struct DirectoryMethodData DirectoryMethodData
int(* close)(Walfile f, WalCloseMethod method)
Definition: walmethods.h:26
struct DirectoryMethodFile DirectoryMethodFile