Definition at line 452 of file receivelog.c.
453{
454 char query[128];
455 char slotcmd[128];
458
459
460
461
462
464 return false;
465
466
467
468
469
470
471
472
473
474
475
476
478 {
481 }
482 else
483 {
486 else
488 slotcmd[0] = 0;
489 }
490
492 {
493 char *sysidentifier = NULL;
495
496
497
498
500 {
502 return false;
503 }
504
506 {
507 pg_log_error(
"system identifier does not match between base backup and streaming connection");
509 return false;
510 }
512
514 {
515 pg_log_error(
"starting timeline %u is not present in the server",
517 return false;
518 }
519 }
520
521
522
523
524
526
527 while (1)
528 {
529
530
531
532
533
534
536 {
540 {
541
542 pg_log_error(
"could not send replication command \"%s\": %s",
545 return false;
546 }
547
548
549
550
551
553 {
554 pg_log_warning(
"unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
556 }
557
558
562
564 }
565
566
567
568
569
571 return true;
572
573
574 snprintf(query,
sizeof(query),
"START_REPLICATION %s%X/%X TIMELINE %u",
575 slotcmd,
580 {
581 pg_log_error(
"could not send replication command \"%s\": %s",
584 return false;
585 }
587
588
592
593
594
595
596
597
598
599
600
601
602
604 {
605
606
607
608
609
610
611
612
613
615 bool parsed;
616
619 if (!parsed)
621
622
623 if (newtimeline <= stream->timeline)
624 {
625 pg_log_error(
"server reported unexpected next timeline %u, following timeline %u",
628 }
630 {
631 pg_log_error(
"server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X",
635 }
636
637
640 {
641 pg_log_error(
"unexpected termination of replication stream: %s",
645 }
647
648
649
650
651
655 continue;
656 }
658 {
660
661
662
663
664
665
666
668 return true;
669 else
670 {
671 pg_log_error(
"replication stream was terminated before stop point");
673 }
674 }
675 else
676 {
677
678 pg_log_error(
"unexpected termination of replication stream: %s",
682 }
683 }
684
690 return false;
691}
static void PGresult * res
char * PQgetvalue(const PGresult *res, int tup_num, int field_num)
PGresult * PQgetResult(PGconn *conn)
ExecStatusType PQresultStatus(const PGresult *res)
int PQntuples(const PGresult *res)
char * PQresultErrorMessage(const PGresult *res)
PGresult * PQexec(PGconn *conn, const char *query)
int PQnfields(const PGresult *res)
#define pg_log_warning(...)
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
static bool reportFlushPosition
static bool existsTimeLineHistoryFile(StreamCtl *stream)
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
bool CheckServerVersionForStreaming(PGconn *conn)
static XLogRecPtr lastFlushPosition
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
stream_stop_callback stream_stop
WalWriteMethod * walmethod
int(* close)(Walfile *f, WalCloseMethod method)
const WalWriteMethodOps * ops
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
References CheckServerVersionForStreaming(), WalWriteMethodOps::close, CLOSE_NO_RENAME, conn, error(), existsTimeLineHistoryFile(), GetLastWalMethodError(), HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, WalWriteMethod::ops, Walfile::pathname, pg_free(), pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear(), PQexec(), PQgetResult(), PQgetvalue(), PQnfields(), PQntuples(), PQresultErrorMessage(), PQresultStatus(), ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, res, RunIdentifySystem(), snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.
Referenced by LogStreamerMain() and StreamLog().