Definition at line 453 of file receivelog.c.
454{
455 char query[128];
456 char slotcmd[128];
459
460
461
462
463
465 return false;
466
467
468
469
470
471
472
473
474
475
476
477
479 {
482 }
483 else
484 {
487 else
489 slotcmd[0] = 0;
490 }
491
493 {
494 char *sysidentifier = NULL;
496
497
498
499
501 {
503 return false;
504 }
505
507 {
508 pg_log_error(
"system identifier does not match between base backup and streaming connection");
510 return false;
511 }
513
515 {
516 pg_log_error(
"starting timeline %u is not present in the server",
518 return false;
519 }
520 }
521
522
523
524
525
527
528 while (1)
529 {
530
531
532
533
534
535
537 {
541 {
542
543 pg_log_error(
"could not send replication command \"%s\": %s",
546 return false;
547 }
548
549
550
551
552
554 {
555 pg_log_warning(
"unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields",
557 }
558
559
563
565 }
566
567
568
569
570
572 return true;
573
574
575 snprintf(query,
sizeof(query),
"START_REPLICATION %s%X/%08X TIMELINE %u",
576 slotcmd,
581 {
582 pg_log_error(
"could not send replication command \"%s\": %s",
585 return false;
586 }
588
589
591 if (res == NULL)
593
594
595
596
597
598
599
600
601
602
603
605 {
606
607
608
609
610
611
612
613
614
616 bool parsed;
617
620 if (!parsed)
622
623
624 if (newtimeline <= stream->timeline)
625 {
626 pg_log_error(
"server reported unexpected next timeline %u, following timeline %u",
629 }
631 {
632 pg_log_error(
"server stopped streaming timeline %u at %X/%08X, but reported next timeline %u to begin at %X/%08X",
636 }
637
638
641 {
642 pg_log_error(
"unexpected termination of replication stream: %s",
646 }
648
649
650
651
652
656 continue;
657 }
659 {
661
662
663
664
665
666
667
669 return true;
670 else
671 {
672 pg_log_error(
"replication stream was terminated before stop point");
674 }
675 }
676 else
677 {
678
679 pg_log_error(
"unexpected termination of replication stream: %s",
683 }
684 }
685
691 return false;
692}
PGresult * PQexec(PGconn *conn, const char *query)
#define PQresultErrorMessage
#define pg_log_warning(...)
static PGresult * HandleCopyStream(PGconn *conn, StreamCtl *stream, XLogRecPtr *stoppos)
static bool reportFlushPosition
static bool existsTimeLineHistoryFile(StreamCtl *stream)
static bool writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
bool CheckServerVersionForStreaming(PGconn *conn)
static XLogRecPtr lastFlushPosition
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
bool RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli, XLogRecPtr *startpos, char **db_name)
stream_stop_callback stream_stop
WalWriteMethod * walmethod
int(* close)(Walfile *f, WalCloseMethod method)
const WalWriteMethodOps * ops
const char * GetLastWalMethodError(WalWriteMethod *wwmethod)
#define XLogSegmentOffset(xlogptr, wal_segsz_bytes)
#define LSN_FORMAT_ARGS(lsn)
References CheckServerVersionForStreaming(), WalWriteMethodOps::close, CLOSE_NO_RENAME, conn, error(), existsTimeLineHistoryFile(), GetLastWalMethodError(), HandleCopyStream(), lastFlushPosition, LSN_FORMAT_ARGS, WalWriteMethod::ops, Walfile::pathname, pg_free(), pg_log_error, pg_log_warning, PGRES_COMMAND_OK, PGRES_COPY_BOTH, PGRES_TUPLES_OK, PQclear, PQexec(), PQgetResult, PQgetvalue, PQnfields, PQntuples, PQresultErrorMessage, PQresultStatus, ReadEndOfStreamingResult(), StreamCtl::replication_slot, reportFlushPosition, RunIdentifySystem(), snprintf, sprintf, StreamCtl::startpos, StreamCtl::stream_stop, StreamCtl::synchronous, StreamCtl::sysidentifier, StreamCtl::timeline, walfile, StreamCtl::walmethod, WalSegSz, writeTimeLineHistoryFile(), and XLogSegmentOffset.
Referenced by LogStreamerMain(), and StreamLog().