61 #define PGARCH_AUTOWAKE_INTERVAL 60
63 #define PGARCH_RESTART_INTERVAL 10
70 #define NUM_ARCHIVE_RETRIES 3
76 #define NUM_ORPHAN_CLEANUP_RETRIES 3
81 #define NUM_FILES_PER_DIRECTORY_SCAN 64
199 static time_t last_pgarch_start_time = 0;
200 time_t curtime = time(NULL);
206 if ((
unsigned int) (curtime - last_pgarch_start_time) <
210 last_pgarch_start_time = curtime;
219 Assert(startup_data_len == 0);
338 time_t curtime = time(NULL);
361 WAIT_EVENT_ARCHIVER_MAIN);
396 int failures_orphan = 0;
400 struct stat stat_buf;
428 (
errmsg(
"\"archive_mode\" enabled, yet archiving is not configured"),
444 if (
stat(pathname, &stat_buf) != 0 && errno == ENOENT)
449 if (unlink(xlogready) == 0)
452 (
errmsg(
"removed orphan archive status file \"%s\"",
462 (
errmsg(
"removal of orphan archive status file \"%s\" failed too many times, will try again later",
498 (
errmsg(
"archiving write-ahead log file \"%s\" failed too many times, will try again later",
518 sigjmp_buf local_sigjmp_buf;
527 snprintf(activitymsg,
sizeof(activitymsg),
"archiving %s", xlog);
547 if (sigsetjmp(local_sigjmp_buf, 1) != 0)
612 snprintf(activitymsg,
sizeof(activitymsg),
"last was %s", xlog);
614 snprintf(activitymsg,
sizeof(activitymsg),
"failed on %s", xlog);
672 if (
stat(status_file, &st) == 0)
674 strcpy(xlog, arch_file);
677 else if (errno != ENOENT)
680 errmsg(
"could not stat file \"%s\": %m", status_file)));
693 while ((rlde =
ReadDir(rldir, XLogArchiveStatusDir)) != NULL)
695 int basenamelen = (int) strlen(rlde->
d_name) - 6;
709 if (strcmp(rlde->
d_name + basenamelen,
".ready") != 0)
713 memcpy(basename, rlde->
d_name, basenamelen);
714 basename[basenamelen] =
'\0';
723 strcpy(arch_file, basename);
738 strcpy(arch_file, basename);
787 if (a_history != b_history)
788 return a_history ? -1 : 1;
791 return strcmp(a_str, b_str);
831 if (rename(rlogready, rlogdone) < 0)
834 errmsg(
"could not rename file \"%s\" to \"%s\": %m",
835 rlogready, rlogdone)));
871 bool archiveLibChanged;
878 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
879 errmsg(
"both \"archive_command\" and \"archive_library\" set"),
880 errdetail(
"Only one of \"archive_command\", \"archive_library\" may be set.")));
885 if (archiveLibChanged)
897 (
errmsg(
"restarting archiver process because value of "
898 "\"archive_library\" was changed")));
917 (
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
918 errmsg(
"both \"archive_command\" and \"archive_library\" set"),
919 errdetail(
"Only one of \"archive_command\", \"archive_library\" may be set.")));
930 "_PG_archive_module_init",
false, NULL);
932 if (archive_init == NULL)
934 (
errmsg(
"archive modules have to define the symbol %s",
"_PG_archive_module_init")));
940 (
errmsg(
"archive modules must register an archive callback")));
const ArchiveModuleCallbacks *(* ArchiveModuleInit)(void)
static void pg_atomic_init_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static void pg_atomic_write_membarrier_u32(volatile pg_atomic_uint32 *ptr, uint32 val)
static uint32 pg_atomic_exchange_u32(volatile pg_atomic_uint32 *ptr, uint32 newval)
void AuxiliaryProcessMainCommon(void)
void binaryheap_build(binaryheap *heap)
void binaryheap_reset(binaryheap *heap)
bh_node_type binaryheap_first(binaryheap *heap)
void binaryheap_add(binaryheap *heap, bh_node_type d)
bh_node_type binaryheap_remove_first(binaryheap *heap)
binaryheap * binaryheap_allocate(int capacity, binaryheap_comparator compare, void *arg)
void binaryheap_add_unordered(binaryheap *heap, bh_node_type d)
#define Assert(condition)
#define MemSet(start, val, len)
bool ConditionVariableCancelSleep(void)
void * load_external_function(const char *filename, const char *funcname, bool signalNotFound, void **filehandle)
void AtEOXact_HashTables(bool isCommit)
void EmitErrorReport(void)
int errdetail_internal(const char *fmt,...)
int errcode_for_file_access(void)
int errdetail(const char *fmt,...)
ErrorContextCallback * error_context_stack
void FlushErrorState(void)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
sigjmp_buf * PG_exception_stack
#define ereport(elevel,...)
struct dirent * ReadDir(DIR *dir, const char *dirname)
void AtEOXact_Files(bool isCommit)
DIR * AllocateDir(const char *dirname)
volatile sig_atomic_t LogMemoryContextPending
volatile sig_atomic_t ProcSignalBarrierPending
void ProcessConfigFile(GucContext context)
void SignalHandlerForShutdownRequest(SIGNAL_ARGS)
volatile sig_atomic_t ShutdownRequestPending
volatile sig_atomic_t ConfigReloadPending
void SignalHandlerForConfigReload(SIGNAL_ARGS)
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
void before_shmem_exit(pg_on_exit_callback function, Datum arg)
void SetLatch(Latch *latch)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH
void LWLockReleaseAll(void)
void MemoryContextReset(MemoryContext context)
char * pstrdup(const char *in)
void pfree(void *pointer)
MemoryContext TopMemoryContext
void * palloc0(Size size)
void ProcessLogMemoryContextInterrupt(void)
#define AllocSetContextCreate
#define ALLOCSET_DEFAULT_SIZES
#define RESUME_INTERRUPTS()
#define HOLD_INTERRUPTS()
BackendType MyBackendType
static volatile sig_atomic_t time_to_stop
static bool pgarch_archiveXlog(char *xlog)
static void pgarch_die(int code, Datum arg)
#define PGARCH_RESTART_INTERVAL
bool PgArchCanRestart(void)
static bool pgarch_readyXlog(char *xlog)
static PgArchData * PgArch
char * arch_module_check_errdetail_string
static void pgarch_MainLoop(void)
#define NUM_ARCHIVE_RETRIES
struct PgArchData PgArchData
static void pgarch_waken_stop(SIGNAL_ARGS)
static volatile sig_atomic_t ready_to_stop
static struct arch_files_state * arch_files
char * XLogArchiveLibrary
Size PgArchShmemSize(void)
static void LoadArchiveLibrary(void)
static void pgarch_ArchiverCopyLoop(void)
static int ready_file_comparator(Datum a, Datum b, void *arg)
#define NUM_FILES_PER_DIRECTORY_SCAN
void PgArchForceDirScan(void)
static void HandlePgArchInterrupts(void)
static const ArchiveModuleCallbacks * ArchiveCallbacks
static ArchiveModuleState * archive_module_state
void PgArchShmemInit(void)
static void pgarch_call_module_shutdown_cb(int code, Datum arg)
void PgArchiverMain(char *startup_data, size_t startup_data_len)
static void pgarch_archiveDone(char *xlog)
#define NUM_ORPHAN_CLEANUP_RETRIES
static MemoryContext archive_context
#define PGARCH_AUTOWAKE_INTERVAL
static time_t last_sigterm_time
void pgstat_report_archiver(const char *xlog, bool failed)
#define PostmasterIsAlive()
pqsigfunc pqsignal(int signo, pqsigfunc func)
static char * DatumGetCString(Datum X)
static Datum CStringGetDatum(const char *X)
#define INVALID_PROC_NUMBER
void ProcessProcSignalBarrier(void)
void procsignal_sigusr1_handler(SIGNAL_ARGS)
static void set_ps_display(const char *activity)
MemoryContextSwitchTo(old_ctx)
void ReleaseAuxProcessResources(bool isCommit)
const ArchiveModuleCallbacks * shell_archive_init(void)
Size add_size(Size s1, Size s2)
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
void pg_usleep(long microsec)
static pg_noinline void Size size
ArchiveFileCB archive_file_cb
ArchiveShutdownCB shutdown_cb
ArchiveCheckConfiguredCB check_configured_cb
ArchiveStartupCB startup_cb
pg_atomic_uint32 force_dir_scan
char arch_filenames[NUM_FILES_PER_DIRECTORY_SCAN][MAX_XFN_CHARS+1]
char * arch_files[NUM_FILES_PER_DIRECTORY_SCAN]
void disable_all_timeouts(bool keep_indicators)
static void pgstat_report_wait_end(void)
char * XLogArchiveCommand
#define XLogArchivingActive()
static bool IsTLHistoryFileName(const char *fname)
static void StatusFilePath(char *path, const char *xlog, const char *suffix)