PostgreSQL Source Code git master
Loading...
Searching...
No Matches
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/protocol.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/aio_subsys.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define WALSENDER_STATS_FLUSH_INTERVAL   1000
 
#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static pg_noreturn void WalSndShutdown (void)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
static void IdentifySystem (void)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void ProcessStandbyMessage (void)
 
static void ProcessStandbyReplyMessage (void)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static void ProcessStandbyPSRequestMessage (void)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessPendingWrites (void)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void WalSndCheckTimeOut (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
void InitWalSender (void)
 
void WalSndErrorCleanup (void)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
static void WalSndHandleConfigReload (void)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
bool exec_replication_command (const char *cmd_string)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 227 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 115 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSENDER_STATS_FLUSH_INTERVAL

#define WALSENDER_STATS_FLUSH_INTERVAL   1000

Definition at line 104 of file walsender.c.

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 259 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd * cmd)
static

Definition at line 1421 of file walsender.c.

1422{
1423 bool failover_given = false;
1424 bool two_phase_given = false;
1425 bool failover;
1426 bool two_phase;
1427
1428 /* Parse options */
1430 {
1431 if (strcmp(defel->defname, "failover") == 0)
1432 {
1433 if (failover_given)
1434 ereport(ERROR,
1436 errmsg("conflicting or redundant options")));
1437 failover_given = true;
1439 }
1440 else if (strcmp(defel->defname, "two_phase") == 0)
1441 {
1442 if (two_phase_given)
1443 ereport(ERROR,
1445 errmsg("conflicting or redundant options")));
1446 two_phase_given = true;
1448 }
1449 else
1450 elog(ERROR, "unrecognized option: %s", defel->defname);
1451 }
1452
1456}
bool defGetBoolean(DefElem *def)
Definition define.c:93
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
static char * errmsg
#define foreach_ptr(type, var, lst)
Definition pg_list.h:469
static bool two_phase
static bool failover
static int fb(int x)
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition slot.c:953

References defGetBoolean(), elog, ereport, errcode(), errmsg, ERROR, failover, fb(), foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1199 of file walsender.c.

1200{
1201 const char *snapshot_name = NULL;
1202 char xloc[MAXFNAMELEN];
1203 char *slot_name;
1204 bool reserve_wal = false;
1205 bool two_phase = false;
1206 bool failover = false;
1210 TupleDesc tupdesc;
1211 Datum values[4];
1212 bool nulls[4] = {0};
1213
1215
1217 &failover);
1218
1219 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1220 {
1221 ReplicationSlotCreate(cmd->slotname, false,
1223 false, false, false);
1224
1225 if (reserve_wal)
1226 {
1228
1230
1231 /* Write this slot to disk if it's a permanent one. */
1232 if (!cmd->temporary)
1234 }
1235 }
1236 else
1237 {
1239 bool need_full_snapshot = false;
1240
1242
1244
1245 /*
1246 * Initially create persistent slot as ephemeral - that allows us to
1247 * nicely handle errors during initialization because it'll get
1248 * dropped if this transaction fails. We'll make it persistent at the
1249 * end. Temporary slots can be created as temporary from beginning as
1250 * they get dropped on error as well.
1251 */
1255
1256 /*
1257 * Do options check early so that we can bail before calling the
1258 * DecodingContextFindStartpoint which can take long time.
1259 */
1261 {
1262 if (IsTransactionBlock())
1263 ereport(ERROR,
1264 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1265 (errmsg("%s must not be called inside a transaction",
1266 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1267
1268 need_full_snapshot = true;
1269 }
1271 {
1272 if (!IsTransactionBlock())
1273 ereport(ERROR,
1274 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1275 (errmsg("%s must be called inside a transaction",
1276 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1277
1279 ereport(ERROR,
1280 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1281 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1282 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1283 if (!XactReadOnly)
1284 ereport(ERROR,
1285 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1286 (errmsg("%s must be called in a read-only transaction",
1287 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1288
1289 if (FirstSnapshotSet)
1290 ereport(ERROR,
1291 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1292 (errmsg("%s must be called before any query",
1293 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1294
1295 if (IsSubTransaction())
1296 ereport(ERROR,
1297 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1298 (errmsg("%s must not be called in a subtransaction",
1299 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1300
1301 need_full_snapshot = true;
1302 }
1303
1304 /*
1305 * Ensure the logical decoding is enabled before initializing the
1306 * logical decoding context.
1307 */
1310
1314 .segment_open = WalSndSegmentOpen,
1315 .segment_close = wal_segment_close),
1318
1319 /*
1320 * Signal that we don't need the timeout mechanism. We're just
1321 * creating the replication slot and don't yet accept feedback
1322 * messages or send keepalives. As we possibly need to wait for
1323 * further WAL the walsender would otherwise possibly be killed too
1324 * soon.
1325 */
1327
1328 /* build initial snapshot, might take a while */
1330
1331 /*
1332 * Export or use the snapshot if we've been asked to do so.
1333 *
1334 * NB. We will convert the snapbuild.c kind of snapshot to normal
1335 * snapshot when doing this.
1336 */
1338 {
1340 }
1342 {
1343 Snapshot snap;
1344
1347 }
1348
1349 /* don't need the decoding context anymore */
1351
1352 if (!cmd->temporary)
1354 }
1355
1356 snprintf(xloc, sizeof(xloc), "%X/%08X",
1358
1360
1361 /*----------
1362 * Need a tuple descriptor representing four columns:
1363 * - first field: the slot name
1364 * - second field: LSN at which we became consistent
1365 * - third field: exported snapshot's name
1366 * - fourth field: output plugin
1367 */
1368 tupdesc = CreateTemplateTupleDesc(4);
1369 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1370 TEXTOID, -1, 0);
1371 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1372 TEXTOID, -1, 0);
1373 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1374 TEXTOID, -1, 0);
1375 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1376 TEXTOID, -1, 0);
1377 TupleDescFinalize(tupdesc);
1378
1379 /* prepare for projection of tuples */
1381
1382 /* slot_name */
1383 slot_name = NameStr(MyReplicationSlot->data.name);
1384 values[0] = CStringGetTextDatum(slot_name);
1385
1386 /* consistent wal location */
1388
1389 /* snapshot name, or NULL if none */
1390 if (snapshot_name != NULL)
1392 else
1393 nulls[2] = true;
1394
1395 /* plugin, or NULL if none */
1396 if (cmd->plugin != NULL)
1398 else
1399 nulls[3] = true;
1400
1401 /* send it to dest */
1402 do_tup_output(tstate, values, nulls);
1404
1406}
int16 AttrNumber
Definition attnum.h:21
static Datum values[MAXATTR]
Definition bootstrap.c:188
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define NameStr(name)
Definition c.h:837
#define Assert(condition)
Definition c.h:945
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition dest.c:113
@ DestRemoteSimple
Definition dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
const TupleTableSlotOps TTSOpsVirtual
Definition execTuples.c:84
void end_tup_output(TupOutputState *tstate)
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
#define false
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition logical.c:668
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition logical.c:624
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:321
void CheckLogicalDecodingRequirements(void)
Definition logical.c:111
bool IsLogicalDecodingEnabled(void)
Definition logicalctl.c:205
void EnsureLogicalDecodingEnabled(void)
Definition logicalctl.c:306
#define NIL
Definition pg_list.h:68
#define snprintf
Definition port.h:260
uint64_t Datum
Definition postgres.h:70
@ REPLICATION_KIND_PHYSICAL
Definition replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition slot.c:380
void ReplicationSlotMarkDirty(void)
Definition slot.c:1177
void ReplicationSlotReserveWal(void)
Definition slot.c:1697
void ReplicationSlotPersist(void)
Definition slot.c:1194
ReplicationSlot * MyReplicationSlot
Definition slot.c:149
void ReplicationSlotSave(void)
Definition slot.c:1159
void ReplicationSlotRelease(void)
Definition slot.c:762
@ RS_PERSISTENT
Definition slot.h:45
@ RS_EPHEMERAL
Definition slot.h:46
@ RS_TEMPORARY
Definition slot.h:47
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition snapbuild.c:443
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition snapbuild.c:541
bool FirstSnapshotSet
Definition snapmgr.c:193
void RestoreTransactionSnapshot(Snapshot snapshot, PGPROC *source_pgproc)
Definition snapmgr.c:1853
PGPROC * MyProc
Definition proc.c:68
ReplicationKind kind
Definition replnodes.h:56
struct SnapBuild * snapshot_builder
Definition logical.h:44
ReplicationSlotPersistentData data
Definition slot.h:213
TupleDesc CreateTemplateTupleDesc(int natts)
Definition tupdesc.c:165
void TupleDescFinalize(TupleDesc tupdesc)
Definition tupdesc.c:508
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition tupdesc.c:973
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition walsender.c:1122
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition walsender.c:3136
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1583
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition walsender.c:1700
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition walsender.c:1049
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition walsender.c:1556
static TimestampTz last_reply_timestamp
Definition walsender.c:188
CRSSnapshotAction
Definition walsender.h:21
@ CRS_USE_SNAPSHOT
Definition walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition walsender.h:22
bool XactReadOnly
Definition xact.c:84
int XactIsoLevel
Definition xact.c:81
bool IsSubTransaction(void)
Definition xact.c:5067
bool IsTransactionBlock(void)
Definition xact.c:4994
#define XACT_REPEATABLE_READ
Definition xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
#define InvalidXLogRecPtr
Definition xlogdefs.h:28
#define XL_ROUTINE(...)
Definition xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition xlogutils.c:831

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), DestRemoteSimple, do_tup_output(), end_tup_output(), EnsureLogicalDecodingEnabled(), ereport, errmsg, ERROR, failover, fb(), FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsLogicalDecodingEnabled(), IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1412 of file walsender.c.

1413{
1414 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1415}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:913

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char * cmd_string)

Definition at line 2022 of file walsender.c.

2023{
2024 yyscan_t scanner;
2025 int parse_rc;
2026 Node *cmd_node;
2027 const char *cmdtag;
2029
2030 /* We save and re-use the cmd_context across calls */
2032
2033 /*
2034 * If WAL sender has been told that shutdown is getting close, switch its
2035 * status accordingly to handle the next replication commands correctly.
2036 */
2037 if (got_STOPPING)
2039
2040 /*
2041 * Throw error if in stopping mode. We need prevent commands that could
2042 * generate WAL while the shutdown checkpoint is being written. To be
2043 * safe, we just prohibit all new commands.
2044 */
2046 ereport(ERROR,
2048 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
2049
2050 /*
2051 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
2052 * command arrives. Clean up the old stuff if there's anything.
2053 */
2055
2057
2058 /*
2059 * Prepare to parse and execute the command.
2060 *
2061 * Because replication command execution can involve beginning or ending
2062 * transactions, we need a working context that will survive that, so we
2063 * make it a child of TopMemoryContext. That in turn creates a hazard of
2064 * long-lived memory leaks if we lose track of the working context. We
2065 * deal with that by creating it only once per walsender, and resetting it
2066 * for each new command. (Normally this reset is a no-op, but if the
2067 * prior exec_replication_command call failed with an error, it won't be.)
2068 *
2069 * This is subtler than it looks. The transactions we manage can extend
2070 * across replication commands, indeed SnapBuildClearExportedSnapshot
2071 * might have just ended one. Because transaction exit will revert to the
2072 * memory context that was current at transaction start, we need to be
2073 * sure that that context is still valid. That motivates re-using the
2074 * same cmd_context rather than making a new one each time.
2075 */
2076 if (cmd_context == NULL)
2078 "Replication command context",
2080 else
2082
2084
2086
2087 /*
2088 * Is it a WalSender command?
2089 */
2091 {
2092 /* Nope; clean up and get out. */
2094
2097
2098 /* XXX this is a pretty random place to make this check */
2099 if (MyDatabaseId == InvalidOid)
2100 ereport(ERROR,
2102 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2103
2104 /* Tell the caller that this wasn't a WalSender command. */
2105 return false;
2106 }
2107
2108 /*
2109 * Looks like a WalSender command, so parse it.
2110 */
2112 if (parse_rc != 0)
2113 ereport(ERROR,
2115 errmsg_internal("replication command parser returned %d",
2116 parse_rc)));
2118
2119 /*
2120 * Report query to various monitoring facilities. For this purpose, we
2121 * report replication commands just like SQL commands.
2122 */
2124
2126
2127 /*
2128 * Log replication command if log_replication_commands is enabled. Even
2129 * when it's disabled, log the command with DEBUG1 level for backward
2130 * compatibility.
2131 */
2133 (errmsg("received replication command: %s", cmd_string)));
2134
2135 /*
2136 * Disallow replication commands in aborted transaction blocks.
2137 */
2139 ereport(ERROR,
2141 errmsg("current transaction is aborted, "
2142 "commands ignored until end of transaction block")));
2143
2145
2146 /*
2147 * Allocate buffers that will be used for each outgoing and incoming
2148 * message. We do this just once per command to reduce palloc overhead.
2149 */
2153
2154 switch (cmd_node->type)
2155 {
2157 cmdtag = "IDENTIFY_SYSTEM";
2161 break;
2162
2164 cmdtag = "READ_REPLICATION_SLOT";
2168 break;
2169
2170 case T_BaseBackupCmd:
2171 cmdtag = "BASE_BACKUP";
2176 break;
2177
2179 cmdtag = "CREATE_REPLICATION_SLOT";
2183 break;
2184
2186 cmdtag = "DROP_REPLICATION_SLOT";
2190 break;
2191
2193 cmdtag = "ALTER_REPLICATION_SLOT";
2197 break;
2198
2200 {
2202
2203 cmdtag = "START_REPLICATION";
2206
2207 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2208 StartReplication(cmd);
2209 else
2211
2212 /* dupe, but necessary per libpqrcv_endstreaming */
2214
2216 break;
2217 }
2218
2220 cmdtag = "TIMELINE_HISTORY";
2225 break;
2226
2227 case T_VariableShowStmt:
2228 {
2231
2232 cmdtag = "SHOW";
2234
2235 /* syscache access needs a transaction environment */
2237 GetPGVariable(n->name, dest);
2240 }
2241 break;
2242
2244 cmdtag = "UPLOAD_MANIFEST";
2249 break;
2250
2251 default:
2252 elog(ERROR, "unrecognized replication command node tag: %u",
2253 cmd_node->type);
2254 }
2255
2256 /*
2257 * Done. Revert to caller's memory context, and clean out the cmd_context
2258 * to recover memory right away.
2259 */
2262
2263 /*
2264 * We need not update ps display or pg_stat_activity, because PostgresMain
2265 * will reset those to "idle". But we must reset debug_query_string to
2266 * ensure it doesn't become a dangling pointer.
2267 */
2269
2270 return true;
2271}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition basebackup.c:992
void * yyscan_t
Definition cubedata.h:65
void EndReplicationCommand(const char *commandTag)
Definition dest.c:206
#define LOG
Definition elog.h:31
int int errmsg_internal(const char *fmt,...) pg_attribute_printf(1
#define DEBUG1
Definition elog.h:30
Oid MyDatabaseId
Definition globals.c:94
void GetPGVariable(const char *name, DestReceiver *dest)
Definition guc_funcs.c:410
void MemoryContextReset(MemoryContext context)
Definition mcxt.c:403
MemoryContext TopMemoryContext
Definition mcxt.c:166
MemoryContext CurrentMemoryContext
Definition mcxt.c:160
#define AllocSetContextCreate
Definition memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition miscadmin.h:123
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition palloc.h:124
const char * debug_query_string
Definition postgres.c:91
#define InvalidOid
static void set_ps_display(const char *activity)
Definition ps_status.h:40
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
void replication_scanner_finish(yyscan_t yyscanner)
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
void SnapBuildClearExportedSnapshot(void)
Definition snapbuild.c:602
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
Definition nodes.h:135
ReplicationKind kind
Definition replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition walsender.c:1421
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition walsender.c:583
WalSnd * MyWalSnd
Definition walsender.c:121
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition walsender.c:483
static StringInfoData tmpbuf
Definition walsender.c:179
static void IdentifySystem(void)
Definition walsender.c:401
static StringInfoData reply_message
Definition walsender.c:178
void WalSndSetState(WalSndState state)
Definition walsender.c:3965
static StringInfoData output_message
Definition walsender.c:177
static void UploadManifest(void)
Definition walsender.c:674
static volatile sig_atomic_t got_STOPPING
Definition walsender.c:207
bool log_replication_commands
Definition walsender.c:134
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition walsender.c:1199
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition walsender.c:1463
static IncrementalBackupInfo * uploaded_manifest
Definition walsender.c:156
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition walsender.c:1412
static void StartReplication(StartReplicationCmd *cmd)
Definition walsender.c:816
static XLogReaderState * xlogreader
Definition walsender.c:146
@ WALSNDSTATE_STOPPING
int replication_yyparse(Node **replication_parse_result_p, yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition xact.c:3670
void StartTransactionCommand(void)
Definition xact.c:3081
bool IsAbortedTransactionBlockState(void)
Definition xact.c:409
void CommitTransactionCommand(void)
Definition xact.c:3179

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg, errmsg_internal(), ERROR, fb(), GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextReset(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, TopMemoryContext, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID * tli)

Definition at line 3660 of file walsender.c.

3661{
3663 TimeLineID replayTLI;
3666 XLogRecPtr result;
3667
3669
3670 /*
3671 * We can safely send what's already been replayed. Also, if walreceiver
3672 * is streaming WAL from the same timeline, we can send anything that it
3673 * has streamed, but hasn't been replayed yet.
3674 */
3675
3677 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3678
3679 if (tli)
3680 *tli = replayTLI;
3681
3682 result = replayPtr;
3683 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3684 result = receivePtr;
3685
3686 return result;
3687}
bool IsSyncingReplicationSlots(void)
Definition slotsync.c:1825
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition walsender.c:125
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint32 TimeLineID
Definition xlogdefs.h:63
static TimeLineID receiveTLI
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, fb(), GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), update_local_synced_slot(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t * offset,
IncrementalBackupInfo * ib 
)
static

Definition at line 740 of file walsender.c.

742{
743 int mtype;
744 int maxmsglen;
745
747
749 mtype = pq_getbyte();
750 if (mtype == EOF)
753 errmsg("unexpected EOF on client connection with an open transaction")));
754
755 switch (mtype)
756 {
757 case PqMsg_CopyData:
759 break;
760 case PqMsg_CopyDone:
761 case PqMsg_CopyFail:
762 case PqMsg_Flush:
763 case PqMsg_Sync:
765 break;
766 default:
769 errmsg("unexpected message type 0x%02X during COPY from stdin",
770 mtype)));
771 maxmsglen = 0; /* keep compiler quiet */
772 break;
773 }
774
775 /* Now collect the message body */
779 errmsg("unexpected EOF on client connection with an open transaction")));
781
782 /* Process the message */
783 switch (mtype)
784 {
785 case PqMsg_CopyData:
787 return true;
788
789 case PqMsg_CopyDone:
790 return false;
791
792 case PqMsg_Sync:
793 case PqMsg_Flush:
794 /* Ignore these while in CopyIn mode as we do elsewhere. */
795 return true;
796
797 case PqMsg_CopyFail:
800 errmsg("COPY from stdin failed: %s",
802 }
803
804 /* Not reached. */
805 Assert(false);
806 return false;
807}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define ERRCODE_PROTOCOL_VIOLATION
Definition fe-connect.c:96
#define PQ_SMALL_MESSAGE_LIMIT
Definition libpq.h:33
#define PQ_LARGE_MESSAGE_LIMIT
Definition libpq.h:34
#define HOLD_CANCEL_INTERRUPTS()
Definition miscadmin.h:142
#define RESUME_CANCEL_INTERRUPTS()
Definition miscadmin.h:144
static char buf[DEFAULT_XLOG_SEG_SIZE]
int pq_getmessage(StringInfo s, int maxlen)
Definition pqcomm.c:1204
int pq_getbyte(void)
Definition pqcomm.c:964
void pq_startmsgread(void)
Definition pqcomm.c:1142
const char * pq_getmsgstring(StringInfo msg)
Definition pqformat.c:578
#define PqMsg_CopyDone
Definition protocol.h:64
#define PqMsg_CopyData
Definition protocol.h:65
#define PqMsg_Sync
Definition protocol.h:27
#define PqMsg_CopyFail
Definition protocol.h:29
#define PqMsg_Flush
Definition protocol.h:24

References AppendIncrementalManifestData(), Assert, buf, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, ERROR, fb(), HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_CopyFail, PqMsg_Flush, PqMsg_Sync, and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3716 of file walsender.c.

3717{
3719
3720 /*
3721 * If replication has not yet started, die like with SIGTERM. If
3722 * replication is active, only set a flag and wake up the main loop. It
3723 * will send any outstanding WAL, wait for it to be replicated to the
3724 * standby, and then exit gracefully.
3725 */
3726 if (!replication_active)
3728 else
3729 got_STOPPING = true;
3730}
int MyProcPid
Definition globals.c:47
bool am_walsender
Definition walsender.c:124
static volatile sig_atomic_t replication_active
Definition walsender.c:215
#define kill(pid, sig)
Definition win32_port.h:490

References am_walsender, Assert, fb(), got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 401 of file walsender.c.

402{
403 char sysid[32];
404 char xloc[MAXFNAMELEN];
406 char *dbname = NULL;
409 TupleDesc tupdesc;
410 Datum values[4];
411 bool nulls[4] = {0};
412 TimeLineID currTLI;
413
414 /*
415 * Reply with a result set with one row, four columns. First col is system
416 * ID, second is timeline ID, third is current xlog location and the
417 * fourth contains the database name if we are connected to one.
418 */
419
422
425 logptr = GetStandbyFlushRecPtr(&currTLI);
426 else
427 logptr = GetFlushRecPtr(&currTLI);
428
429 snprintf(xloc, sizeof(xloc), "%X/%08X", LSN_FORMAT_ARGS(logptr));
430
432 {
434
435 /* syscache access needs a transaction env. */
438 /* copy dbname out of TX context */
441 }
442
444
445 /* need a tuple descriptor representing four columns */
446 tupdesc = CreateTemplateTupleDesc(4);
447 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
448 TEXTOID, -1, 0);
449 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
450 INT8OID, -1, 0);
451 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
452 TEXTOID, -1, 0);
453 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
454 TEXTOID, -1, 0);
455 TupleDescFinalize(tupdesc);
456
457 /* prepare for projection of tuples */
459
460 /* column 1: system identifier */
462
463 /* column 2: timeline */
464 values[1] = Int64GetDatum(currTLI);
465
466 /* column 3: wal location */
468
469 /* column 4: database name, or NULL if none */
470 if (dbname)
472 else
473 nulls[3] = true;
474
475 /* send it to dest */
476 do_tup_output(tstate, values, nulls);
477
479}
#define UINT64_FORMAT
Definition c.h:637
struct cursor * cur
Definition ecpg.c:29
char * get_database_name(Oid dbid)
Definition lsyscache.c:1312
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition mcxt.c:1768
static Datum Int64GetDatum(int64 X)
Definition postgres.h:413
char * dbname
Definition streamutil.c:49
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition walsender.c:3660
uint64 GetSystemIdentifier(void)
Definition xlog.c:4611
bool RecoveryInProgress(void)
Definition xlog.c:6444
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6609

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, DestRemoteSimple, do_tup_output(), end_tup_output(), fb(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSender()

void InitWalSender ( void  )

Definition at line 302 of file walsender.c.

303{
305
306 /* Create a per-walsender data structure in shared memory */
308
309 /* need resource owner for e.g. basebackups */
311
312 /*
313 * Let postmaster know that we're a WAL sender. Once we've declared us as
314 * a WAL sender process, postmaster will let us outlive the bgwriter and
315 * kill us last in the shutdown sequence, so we get a chance to stream all
316 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
317 * there's no going back, and we mustn't write any WAL records after this.
318 */
321
322 /*
323 * If the client didn't specify a database to connect to, show in PGPROC
324 * that our advertised xmin should affect vacuum horizons in all
325 * databases. This allows physical replication clients to send hot
326 * standby feedback that will delay vacuum cleanup in all databases.
327 */
329 {
335 }
336
337 /* Initialize empty timestamp buffer for lag tracking. */
339}
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1177
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1794
@ LW_EXCLUSIVE
Definition lwlock.h:112
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition mcxt.c:1266
void SendPostmasterSignal(PMSignalReason reason)
Definition pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition pmsignal.h:43
#define PROC_AFFECTS_ALL_HORIZONS
Definition proc.h:63
void CreateAuxProcessResourceOwner(void)
Definition resowner.c:996
PROC_HDR * ProcGlobal
Definition proc.c:71
TransactionId xmin
Definition proc.h:239
uint8 statusFlags
Definition proc.h:207
int pgxactoff
Definition proc.h:204
uint8 * statusFlags
Definition proc.h:453
#define InvalidTransactionId
Definition transam.h:31
static void InitWalSenderSlot(void)
Definition walsender.c:3044
static LagTracker * lag_tracker
Definition walsender.c:253

References am_cascading_walsender, Assert, CreateAuxProcessResourceOwner(), fb(), InitWalSenderSlot(), InvalidOid, InvalidTransactionId, lag_tracker, LW_EXCLUSIVE, LWLockAcquire(), LWLockRelease(), MarkPostmasterChildWalSender(), MemoryContextAllocZero(), MyDatabaseId, MyProc, PGPROC::pgxactoff, PMSIGNAL_ADVANCE_STATE_MACHINE, PROC_AFFECTS_ALL_HORIZONS, ProcGlobal, RecoveryInProgress(), SendPostmasterSignal(), PGPROC::statusFlags, PROC_HDR::statusFlags, TopMemoryContext, and PGPROC::xmin.

Referenced by PostgresMain().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 3044 of file walsender.c.

3045{
3046 int i;
3047
3048 /*
3049 * WalSndCtl should be set up already (we inherit this by fork() or
3050 * EXEC_BACKEND mechanism from the postmaster).
3051 */
3052 Assert(WalSndCtl != NULL);
3053 Assert(MyWalSnd == NULL);
3054
3055 /*
3056 * Find a free walsender slot and reserve it. This must not fail due to
3057 * the prior check for free WAL senders in InitProcess().
3058 */
3059 for (i = 0; i < max_wal_senders; i++)
3060 {
3062
3063 SpinLockAcquire(&walsnd->mutex);
3064
3065 if (walsnd->pid != 0)
3066 {
3067 SpinLockRelease(&walsnd->mutex);
3068 continue;
3069 }
3070 else
3071 {
3072 /*
3073 * Found a free slot. Reserve it for us.
3074 */
3075 walsnd->pid = MyProcPid;
3076 walsnd->state = WALSNDSTATE_STARTUP;
3077 walsnd->sentPtr = InvalidXLogRecPtr;
3078 walsnd->needreload = false;
3079 walsnd->write = InvalidXLogRecPtr;
3080 walsnd->flush = InvalidXLogRecPtr;
3081 walsnd->apply = InvalidXLogRecPtr;
3082 walsnd->writeLag = -1;
3083 walsnd->flushLag = -1;
3084 walsnd->applyLag = -1;
3085 walsnd->sync_standby_priority = 0;
3086 walsnd->replyTime = 0;
3087
3088 /*
3089 * The kind assignment is done here and not in StartReplication()
3090 * and StartLogicalReplication(). Indeed, the logical walsender
3091 * needs to read WAL records (like snapshot of running
3092 * transactions) during the slot creation. So it needs to be woken
3093 * up based on its kind.
3094 *
3095 * The kind assignment could also be done in StartReplication(),
3096 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
3097 * seems better to set it in one place.
3098 */
3099 if (MyDatabaseId == InvalidOid)
3101 else
3103
3104 SpinLockRelease(&walsnd->mutex);
3105 /* don't need the lock anymore */
3106 MyWalSnd = walsnd;
3107
3108 break;
3109 }
3110 }
3111
3112 Assert(MyWalSnd != NULL);
3113
3114 /* Arrange to clean up at walsender exit */
3116}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition ipc.c:372
int i
Definition isn.c:77
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
int max_wal_senders
Definition walsender.c:130
static void WalSndKill(int code, Datum arg)
Definition walsender.c:3120
WalSndCtlData * WalSndCtl
Definition walsender.c:118
@ WALSNDSTATE_STARTUP

References Assert, fb(), i, InvalidOid, InvalidXLogRecPtr, max_wal_senders, MyDatabaseId, MyProcPid, MyWalSnd, on_shmem_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, SpinLockAcquire(), SpinLockRelease(), WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, and WALSNDSTATE_STARTUP.

Referenced by InitWalSender().

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4309 of file walsender.c.

4310{
4311 TimestampTz time = 0;
4312
4313 /*
4314 * If 'lsn' has not passed the WAL position stored in the overflow entry,
4315 * return the elapsed time (in microseconds) since the saved local flush
4316 * time. If the flush time is in the future (due to clock drift), return
4317 * -1 to treat as no valid sample.
4318 *
4319 * Otherwise, switch back to using the buffer to control the read head and
4320 * compute the elapsed time. The read head is then reset to point to the
4321 * oldest entry in the buffer.
4322 */
4323 if (lag_tracker->read_heads[head] == -1)
4324 {
4325 if (lag_tracker->overflowed[head].lsn > lsn)
4326 return (now >= lag_tracker->overflowed[head].time) ?
4327 now - lag_tracker->overflowed[head].time : -1;
4328
4329 time = lag_tracker->overflowed[head].time;
4331 lag_tracker->read_heads[head] =
4333 }
4334
4335 /* Read all unread samples up to this LSN or end of buffer. */
4336 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4338 {
4340 lag_tracker->last_read[head] =
4342 lag_tracker->read_heads[head] =
4344 }
4345
4346 /*
4347 * If the lag tracker is empty, that means the standby has processed
4348 * everything we've ever sent so we should now clear 'last_read'. If we
4349 * didn't do that, we'd risk using a stale and irrelevant sample for
4350 * interpolation at the beginning of the next burst of WAL after a period
4351 * of idleness.
4352 */
4354 lag_tracker->last_read[head].time = 0;
4355
4356 if (time > now)
4357 {
4358 /* If the clock somehow went backwards, treat as not found. */
4359 return -1;
4360 }
4361 else if (time == 0)
4362 {
4363 /*
4364 * We didn't cross a time. If there is a future sample that we
4365 * haven't reached yet, and we've already reached at least one sample,
4366 * let's interpolate the local flushed time. This is mainly useful
4367 * for reporting a completely stuck apply position as having
4368 * increasing lag, since otherwise we'd have to wait for it to
4369 * eventually start moving again and cross one of our samples before
4370 * we can show the lag increasing.
4371 */
4373 {
4374 /* There are no future samples, so we can't interpolate. */
4375 return -1;
4376 }
4377 else if (lag_tracker->last_read[head].time != 0)
4378 {
4379 /* We can interpolate between last_read and the next sample. */
4380 double fraction;
4381 WalTimeSample prev = lag_tracker->last_read[head];
4383
4384 if (lsn < prev.lsn)
4385 {
4386 /*
4387 * Reported LSNs shouldn't normally go backwards, but it's
4388 * possible when there is a timeline change. Treat as not
4389 * found.
4390 */
4391 return -1;
4392 }
4393
4394 Assert(prev.lsn < next.lsn);
4395
4396 if (prev.time > next.time)
4397 {
4398 /* If the clock somehow went backwards, treat as not found. */
4399 return -1;
4400 }
4401
4402 /* See how far we are between the previous and next samples. */
4403 fraction =
4404 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4405
4406 /* Scale the local flush time proportionally. */
4407 time = (TimestampTz)
4408 ((double) prev.time + (next.time - prev.time) * fraction);
4409 }
4410 else
4411 {
4412 /*
4413 * We have only a future sample, implying that we were entirely
4414 * caught up, but now there is a new burst of WAL and the
4415 * standby hasn't processed the first sample yet. Until the
4416 * standby reaches the future sample the best we can do is report
4417 * the hypothetical lag if that sample were to be replayed now.
4418 */
4420 }
4421 }
4422
4423 /* Return the elapsed time since local flush time in microseconds. */
4424 Assert(time != 0);
4425 return now - time;
4426}
Datum now(PG_FUNCTION_ARGS)
Definition timestamp.c:1600
static int32 next
Definition blutils.c:225
int64 TimestampTz
Definition timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition walsender.c:233
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:235
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:236
int write_head
Definition walsender.c:234
WalTimeSample overflowed[NUM_SYNC_REP_WAIT_MODE]
Definition walsender.c:250
TimestampTz time
Definition walsender.c:223
XLogRecPtr lsn
Definition walsender.c:222
#define LAG_TRACKER_BUFFER_SIZE
Definition walsender.c:227

References Assert, LagTracker::buffer, fb(), lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4251 of file walsender.c.

4252{
4253 int new_write_head;
4254 int i;
4255
4256 if (!am_walsender)
4257 return;
4258
4259 /*
4260 * If the lsn hasn't advanced since last time, then do nothing. This way
4261 * we only record a new sample when new WAL has been written.
4262 */
4263 if (lag_tracker->last_lsn == lsn)
4264 return;
4265 lag_tracker->last_lsn = lsn;
4266
4267 /*
4268 * If advancing the write head of the circular buffer would crash into any
4269 * of the read heads, then the buffer is full. In other words, the
4270 * slowest reader (presumably apply) is the one that controls the release
4271 * of space.
4272 */
4274 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4275 {
4276 /*
4277 * If the buffer is full, move the slowest reader to a separate
4278 * overflow entry and free its space in the buffer so the write head
4279 * can advance.
4280 */
4282 {
4285 lag_tracker->read_heads[i] = -1;
4286 }
4287 }
4288
4289 /* Store a sample at the current write head position. */
4293}
XLogRecPtr last_lsn
Definition walsender.c:232
#define NUM_SYNC_REP_WAIT_MODE
Definition syncrep.h:27

References am_walsender, LagTracker::buffer, fb(), i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::overflowed, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState * state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char * cur_page 
)
static

Definition at line 1049 of file walsender.c.

1051{
1053 int count;
1055 XLogSegNo segno;
1056 TimeLineID currTLI;
1057
1058 /*
1059 * Make sure we have enough WAL available before retrieving the current
1060 * timeline.
1061 */
1063
1064 /* Fail if not enough (implies we are going to shut down) */
1066 return -1;
1067
1068 /*
1069 * Since logical decoding is also permitted on a standby server, we need
1070 * to check if the server is in recovery to decide how to get the current
1071 * timeline ID (so that it also covers the promotion or timeline change
1072 * cases). We must determine am_cascading_walsender after waiting for the
1073 * required WAL so that it is correct when the walsender wakes up after a
1074 * promotion.
1075 */
1077
1079 GetXLogReplayRecPtr(&currTLI);
1080 else
1081 currTLI = GetWALInsertionTimeLine();
1082
1084 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1085 sendTimeLine = state->currTLI;
1086 sendTimeLineValidUpto = state->currTLIValidUntil;
1087 sendTimeLineNextTLI = state->nextTLI;
1088
1090 count = XLOG_BLCKSZ; /* more than one block available */
1091 else
1092 count = flushptr - targetPagePtr; /* part of the page available */
1093
1094 /* now actually read the data, we know it's there */
1095 if (!WALRead(state,
1096 cur_page,
1098 count,
1099 currTLI, /* Pass the current TLI because only
1100 * WalSndSegmentOpen controls whether new TLI
1101 * is needed. */
1102 &errinfo))
1104
1105 /*
1106 * After reading into the buffer, check that what we read was valid. We do
1107 * this after reading, because even though the segment was present when we
1108 * opened it, it might get recycled or removed while we read it. The
1109 * read() succeeds in that case, but the data we tried to read might
1110 * already have been overwritten with new WAL records.
1111 */
1112 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1113 CheckXLogRemoved(segno, state->seg.ws_tli);
1114
1115 return count;
1116}
static TimeLineID sendTimeLine
Definition walsender.c:165
static bool sendTimeLineIsHistoric
Definition walsender.c:167
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition walsender.c:1850
static TimeLineID sendTimeLineNextTLI
Definition walsender.c:166
static XLogRecPtr sendTimeLineValidUpto
Definition walsender.c:168
TimeLineID GetWALInsertionTimeLine(void)
Definition xlog.c:6630
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition xlog.c:3748
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition xlogdefs.h:52
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition xlogutils.c:707
void WALReadRaiseError(WALReadError *errinfo)
Definition xlogutils.c:1011

References am_cascading_walsender, CheckXLogRemoved(), fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 * wait_event 
)
static

Definition at line 1790 of file walsender.c.

1791{
1792 int elevel = got_STOPPING ? ERROR : WARNING;
1793 bool failover_slot;
1794
1796
1797 /*
1798 * Note that after receiving the shutdown signal, an ERROR is reported if
1799 * any slots are dropped, invalidated, or inactive. This measure is taken
1800 * to prevent the walsender from waiting indefinitely.
1801 */
1803 {
1805 return true;
1806 }
1807
1808 *wait_event = 0;
1809 return false;
1810}
#define WARNING
Definition elog.h:36
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition slot.c:3093

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, fb(), got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 * wait_event 
)
static

Definition at line 1822 of file walsender.c.

1824{
1825 /* Check if we need to wait for WALs to be flushed to disk */
1826 if (target_lsn > flushed_lsn)
1827 {
1829 return true;
1830 }
1831
1832 /* Check if the standby slots have caught up to the flushed position */
1834}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1790

References fb(), and NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 4003 of file walsender.c.

4004{
4005 Interval *result = palloc_object(Interval);
4006
4007 result->month = 0;
4008 result->day = 0;
4009 result->time = offset;
4010
4011 return result;
4012}
#define palloc_object(type)
Definition fe_memutils.h:74
int32 day
Definition timestamp.h:51
int32 month
Definition timestamp.h:52
TimeOffset time
Definition timestamp.h:49

References Interval::day, Interval::month, palloc_object, and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd * cmd,
bool * reserve_wal,
CRSSnapshotAction * snapshot_action,
bool * two_phase,
bool * failover 
)
static

Definition at line 1122 of file walsender.c.

1126{
1127 ListCell *lc;
1128 bool snapshot_action_given = false;
1129 bool reserve_wal_given = false;
1130 bool two_phase_given = false;
1131 bool failover_given = false;
1132
1133 /* Parse options */
1134 foreach(lc, cmd->options)
1135 {
1136 DefElem *defel = (DefElem *) lfirst(lc);
1137
1138 if (strcmp(defel->defname, "snapshot") == 0)
1139 {
1140 char *action;
1141
1143 ereport(ERROR,
1145 errmsg("conflicting or redundant options")));
1146
1148 snapshot_action_given = true;
1149
1150 if (strcmp(action, "export") == 0)
1152 else if (strcmp(action, "nothing") == 0)
1154 else if (strcmp(action, "use") == 0)
1156 else
1157 ereport(ERROR,
1159 errmsg("unrecognized value for %s option \"%s\": \"%s\"",
1160 "CREATE_REPLICATION_SLOT", defel->defname, action)));
1161 }
1162 else if (strcmp(defel->defname, "reserve_wal") == 0)
1163 {
1165 ereport(ERROR,
1167 errmsg("conflicting or redundant options")));
1168
1169 reserve_wal_given = true;
1171 }
1172 else if (strcmp(defel->defname, "two_phase") == 0)
1173 {
1175 ereport(ERROR,
1177 errmsg("conflicting or redundant options")));
1178 two_phase_given = true;
1180 }
1181 else if (strcmp(defel->defname, "failover") == 0)
1182 {
1184 ereport(ERROR,
1186 errmsg("conflicting or redundant options")));
1187 failover_given = true;
1189 }
1190 else
1191 elog(ERROR, "unrecognized option: %s", defel->defname);
1192 }
1193}
char * defGetString(DefElem *def)
Definition define.c:34
#define lfirst(lc)
Definition pg_list.h:172
@ CRS_NOEXPORT_SNAPSHOT
Definition walsender.h:23

References CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), elog, ereport, errcode(), errmsg, ERROR, failover, fb(), CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 4019 of file walsender.c.

4020{
4021#define PG_STAT_GET_WAL_SENDERS_COLS 12
4022 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
4024 int num_standbys;
4025 int i;
4026
4027 InitMaterializedSRF(fcinfo, 0);
4028
4029 /*
4030 * Get the currently active synchronous standbys. This could be out of
4031 * date before we're done, but we'll use the data anyway.
4032 */
4034
4035 for (i = 0; i < max_wal_senders; i++)
4036 {
4040 XLogRecPtr flush;
4041 XLogRecPtr apply;
4042 TimeOffset writeLag;
4043 TimeOffset flushLag;
4044 TimeOffset applyLag;
4045 int priority;
4046 int pid;
4048 TimestampTz replyTime;
4049 bool is_sync_standby;
4051 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
4052 int j;
4053
4054 /* Collect data from shared memory */
4055 SpinLockAcquire(&walsnd->mutex);
4056 if (walsnd->pid == 0)
4057 {
4058 SpinLockRelease(&walsnd->mutex);
4059 continue;
4060 }
4061 pid = walsnd->pid;
4062 sent_ptr = walsnd->sentPtr;
4063 state = walsnd->state;
4064 write = walsnd->write;
4065 flush = walsnd->flush;
4066 apply = walsnd->apply;
4067 writeLag = walsnd->writeLag;
4068 flushLag = walsnd->flushLag;
4069 applyLag = walsnd->applyLag;
4070 priority = walsnd->sync_standby_priority;
4071 replyTime = walsnd->replyTime;
4072 SpinLockRelease(&walsnd->mutex);
4073
4074 /*
4075 * Detect whether walsender is/was considered synchronous. We can
4076 * provide some protection against stale data by checking the PID
4077 * along with walsnd_index.
4078 */
4079 is_sync_standby = false;
4080 for (j = 0; j < num_standbys; j++)
4081 {
4082 if (sync_standbys[j].walsnd_index == i &&
4083 sync_standbys[j].pid == pid)
4084 {
4085 is_sync_standby = true;
4086 break;
4087 }
4088 }
4089
4090 values[0] = Int32GetDatum(pid);
4091
4093 {
4094 /*
4095 * Only superusers and roles with privileges of pg_read_all_stats
4096 * can see details. Other users only get the pid value to know
4097 * it's a walsender, but no details.
4098 */
4099 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
4100 }
4101 else
4102 {
4104
4106 nulls[2] = true;
4108
4110 nulls[3] = true;
4111 values[3] = LSNGetDatum(write);
4112
4113 if (!XLogRecPtrIsValid(flush))
4114 nulls[4] = true;
4115 values[4] = LSNGetDatum(flush);
4116
4117 if (!XLogRecPtrIsValid(apply))
4118 nulls[5] = true;
4119 values[5] = LSNGetDatum(apply);
4120
4121 /*
4122 * Treat a standby such as a pg_basebackup background process
4123 * which always returns an invalid flush location, as an
4124 * asynchronous standby.
4125 */
4126 priority = XLogRecPtrIsValid(flush) ? priority : 0;
4127
4128 if (writeLag < 0)
4129 nulls[6] = true;
4130 else
4132
4133 if (flushLag < 0)
4134 nulls[7] = true;
4135 else
4137
4138 if (applyLag < 0)
4139 nulls[8] = true;
4140 else
4142
4144
4145 /*
4146 * More easily understood version of standby state. This is purely
4147 * informational.
4148 *
4149 * In quorum-based sync replication, the role of each standby
4150 * listed in synchronous_standby_names can be changing very
4151 * frequently. Any standbys considered as "sync" at one moment can
4152 * be switched to "potential" ones at the next moment. So, it's
4153 * basically useless to report "sync" or "potential" as their sync
4154 * states. We report just "quorum" for them.
4155 */
4156 if (priority == 0)
4157 values[10] = CStringGetTextDatum("async");
4158 else if (is_sync_standby)
4160 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
4161 else
4162 values[10] = CStringGetTextDatum("potential");
4163
4164 if (replyTime == 0)
4165 nulls[11] = true;
4166 else
4167 values[11] = TimestampTzGetDatum(replyTime);
4168 }
4169
4170 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
4171 values, nulls);
4172 }
4173
4174 return (Datum) 0;
4175}
bool has_privs_of_role(Oid member, Oid role)
Definition acl.c:5314
#define MemSet(start, val, len)
Definition c.h:1109
int64 TimeOffset
Definition timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition funcapi.c:76
#define write(a, b, c)
Definition win32.h:14
int j
Definition isn.c:78
Oid GetUserId(void)
Definition miscinit.c:470
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
uint8 syncrep_method
Definition syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition syncrep.c:98
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition syncrep.c:755
#define SYNC_REP_PRIORITY
Definition syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition walsender.c:4003
static const char * WalSndGetStateString(WalSndState state)
Definition walsender.c:3984
WalSndState
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29

References CStringGetTextDatum, fb(), GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, SpinLockAcquire(), SpinLockRelease(), SYNC_REP_PRIORITY, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, write, and XLogRecPtrIsValid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2429 of file walsender.c.

2430{
2431 bool changed = false;
2433
2435 SpinLockAcquire(&slot->mutex);
2436 if (slot->data.restart_lsn != lsn)
2437 {
2438 changed = true;
2439 slot->data.restart_lsn = lsn;
2440 }
2441 SpinLockRelease(&slot->mutex);
2442
2443 if (changed)
2444 {
2448 }
2449
2450 /*
2451 * One could argue that the slot should be saved to disk now, but that'd
2452 * be energy wasted - the worst thing lost information could cause here is
2453 * to give wrong information in a statistics view - we'll just potentially
2454 * be more conservative in removing files.
2455 */
2456}
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1301
slock_t mutex
Definition slot.h:183
void PhysicalWakeupLogicalWalSnd(void)
Definition walsender.c:1765

References Assert, ReplicationSlot::data, fb(), ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire(), SpinLockRelease(), and XLogRecPtrIsValid.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2567 of file walsender.c.

2568{
2569 bool changed = false;
2571
2572 SpinLockAcquire(&slot->mutex);
2574
2575 /*
2576 * For physical replication we don't need the interlock provided by xmin
2577 * and effective_xmin since the consequences of a missed increase are
2578 * limited to query cancellations, so set both at once.
2579 */
2580 if (!TransactionIdIsNormal(slot->data.xmin) ||
2583 {
2584 changed = true;
2585 slot->data.xmin = feedbackXmin;
2587 }
2591 {
2592 changed = true;
2595 }
2596 SpinLockRelease(&slot->mutex);
2597
2598 if (changed)
2599 {
2602 }
2603}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1219
TransactionId catalog_xmin
Definition slot.h:122
TransactionId effective_catalog_xmin
Definition slot.h:210
TransactionId effective_xmin
Definition slot.h:209
#define TransactionIdIsNormal(xid)
Definition transam.h:42
static bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition transam.h:263

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, fb(), InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire(), SpinLockRelease(), TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1765 of file walsender.c.

1766{
1768
1769 /*
1770 * If we are running in a standby, there is no need to wake up walsenders.
1771 * This is because we do not support syncing slots to cascading standbys,
1772 * so, there are no walsenders waiting for standbys to catch up.
1773 */
1774 if (RecoveryInProgress())
1775 return;
1776
1779}
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition slot.c:3060
#define SlotIsPhysical(slot)
Definition slot.h:287
ConditionVariable wal_confirm_rcv_cv

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1651 of file walsender.c.

1652{
1653 for (;;)
1654 {
1655 long sleeptime;
1656
1657 /* Check for input from the client */
1659
1660 /* die if timeout was reached */
1662
1663 /* Send keepalive if the time has come */
1665
1666 if (!pq_is_send_pending())
1667 break;
1668
1670
1671 /* Sleep until something happens or we time out */
1674
1675 /* Clear any already-pending wakeups */
1677
1679
1680 /* Process any requests or signals received recently */
1682
1683 /* Try to flush pending output to the client */
1684 if (pq_flush_if_writable() != 0)
1686 }
1687
1688 /* reactivate latch so WalSndLoop knows to continue */
1690}
TimestampTz GetCurrentTimestamp(void)
Definition timestamp.c:1636
struct Latch * MyLatch
Definition globals.c:63
void SetLatch(Latch *latch)
Definition latch.c:290
void ResetLatch(Latch *latch)
Definition latch.c:374
#define pq_flush_if_writable()
Definition libpq.h:50
#define pq_is_send_pending()
Definition libpq.h:51
#define WL_SOCKET_READABLE
#define WL_SOCKET_WRITEABLE
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition walsender.c:3843
static void WalSndCheckTimeOut(void)
Definition walsender.c:2880
static void ProcessRepliesIfAny(void)
Definition walsender.c:2278
static void WalSndKeepaliveIfNecessary(void)
Definition walsender.c:4213
static void WalSndHandleConfigReload(void)
Definition walsender.c:1628
static pg_noreturn void WalSndShutdown(void)
Definition walsender.c:385
static long WalSndComputeSleeptime(TimestampTz now)
Definition walsender.c:2836

References CHECK_FOR_INTERRUPTS, fb(), GetCurrentTimestamp(), MyLatch, pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), ResetLatch(), SetLatch(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndHandleConfigReload(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2278 of file walsender.c.

2279{
2280 unsigned char firstchar;
2281 int maxmsglen;
2282 int r;
2283 bool received = false;
2284
2286
2287 /*
2288 * If we already received a CopyDone from the frontend, any subsequent
2289 * message is the beginning of a new command, and should be processed in
2290 * the main processing loop.
2291 */
2292 while (!streamingDoneReceiving)
2293 {
2296 if (r < 0)
2297 {
2298 /* unexpected error or EOF */
2301 errmsg("unexpected EOF on standby connection")));
2302 proc_exit(0);
2303 }
2304 if (r == 0)
2305 {
2306 /* no data available without blocking */
2307 pq_endmsgread();
2308 break;
2309 }
2310
2311 /* Validate message type and set packet size limit */
2312 switch (firstchar)
2313 {
2314 case PqMsg_CopyData:
2316 break;
2317 case PqMsg_CopyDone:
2318 case PqMsg_Terminate:
2320 break;
2321 default:
2322 ereport(FATAL,
2324 errmsg("invalid standby message type \"%c\"",
2325 firstchar)));
2326 maxmsglen = 0; /* keep compiler quiet */
2327 break;
2328 }
2329
2330 /* Read the message contents */
2333 {
2336 errmsg("unexpected EOF on standby connection")));
2337 proc_exit(0);
2338 }
2339
2340 /* ... and process it */
2341 switch (firstchar)
2342 {
2343 /*
2344 * PqMsg_CopyData means a standby reply wrapped in a CopyData
2345 * packet.
2346 */
2347 case PqMsg_CopyData:
2349 received = true;
2350 break;
2351
2352 /*
2353 * PqMsg_CopyDone means the standby requested to finish
2354 * streaming. Reply with CopyDone, if we had not sent that
2355 * already.
2356 */
2357 case PqMsg_CopyDone:
2359 {
2361 streamingDoneSending = true;
2362 }
2363
2365 received = true;
2366 break;
2367
2368 /*
2369 * PqMsg_Terminate means that the standby is closing down the
2370 * socket.
2371 */
2372 case PqMsg_Terminate:
2373 proc_exit(0);
2374
2375 default:
2376 Assert(false); /* NOT REACHED */
2377 }
2378 }
2379
2380 /*
2381 * Save the last reply timestamp if we've received at least one reply.
2382 */
2383 if (received)
2384 {
2387 }
2388}
#define COMMERROR
Definition elog.h:33
#define FATAL
Definition elog.h:41
void proc_exit(int code)
Definition ipc.c:105
#define pq_putmessage_noblock(msgtype, s, len)
Definition libpq.h:54
int pq_getbyte_if_available(unsigned char *c)
Definition pqcomm.c:1004
void pq_endmsgread(void)
Definition pqcomm.c:1166
#define PqMsg_Terminate
Definition protocol.h:28
void resetStringInfo(StringInfo str)
Definition stringinfo.c:126
static bool waiting_for_ping_response
Definition walsender.c:191
static TimestampTz last_processing
Definition walsender.c:182
static bool streamingDoneSending
Definition walsender.c:199
static void ProcessStandbyMessage(void)
Definition walsender.c:2394
static bool streamingDoneReceiving
Definition walsender.c:200

References Assert, COMMERROR, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, FATAL, fb(), GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2647 of file walsender.c.

2648{
2653 TimestampTz replyTime;
2654
2655 /*
2656 * Decipher the reply message. The caller already consumed the msgtype
2657 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2658 * of this message.
2659 */
2660 replyTime = pq_getmsgint64(&reply_message);
2665
2667 {
2668 char *replyTimeStr;
2669
2670 /* Copy because timestamptz_to_str returns a static buffer */
2672
2673 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2678 replyTimeStr);
2679
2681 }
2682
2683 /*
2684 * Update shared state for this WalSender process based on reply data from
2685 * standby.
2686 */
2687 {
2689
2690 SpinLockAcquire(&walsnd->mutex);
2691 walsnd->replyTime = replyTime;
2692 SpinLockRelease(&walsnd->mutex);
2693 }
2694
2695 /*
2696 * Unset WalSender's xmins if the feedback message values are invalid.
2697 * This happens when the downstream turned hot_standby_feedback off.
2698 */
2701 {
2703 if (MyReplicationSlot != NULL)
2705 return;
2706 }
2707
2708 /*
2709 * Check that the provided xmin/epoch are sane, that is, not in the future
2710 * and not so far back as to be already wrapped around. Ignore if not.
2711 */
2714 return;
2715
2718 return;
2719
2720 /*
2721 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2722 * the xmin will be taken into account by GetSnapshotData() /
2723 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2724 * thereby prevent the generation of cleanup conflicts on the standby
2725 * server.
2726 *
2727 * There is a small window for a race condition here: although we just
2728 * checked that feedbackXmin precedes nextXid, the nextXid could have
2729 * gotten advanced between our fetching it and applying the xmin below,
2730 * perhaps far enough to make feedbackXmin wrap around. In that case the
2731 * xmin we set here would be "in the future" and have no effect. No point
2732 * in worrying about this since it's too late to save the desired data
2733 * anyway. Assuming that the standby sends us an increasing sequence of
2734 * xmins, this could only happen during the first reply cycle, else our
2735 * own xmin would prevent nextXid from advancing so far.
2736 *
2737 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2738 * is assumed atomic, and there's no real need to prevent concurrent
2739 * horizon determinations. (If we're moving our xmin forward, this is
2740 * obviously safe, and if we're moving it backwards, well, the data is at
2741 * risk already since a VACUUM could already have determined the horizon.)
2742 *
2743 * If we're using a replication slot we reserve the xmin via that,
2744 * otherwise via the walsender's PGPROC entry. We can only track the
2745 * catalog xmin separately when using a slot, so we store the least of the
2746 * two provided when not using a slot.
2747 *
2748 * XXX: It might make sense to generalize the ephemeral slot concept and
2749 * always use the slot mechanism to handle the feedback xmin.
2750 */
2751 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2753 else
2754 {
2758 else
2760 }
2761}
const char * timestamptz_to_str(TimestampTz t)
Definition timestamp.c:1853
uint32_t uint32
Definition c.h:618
uint32 TransactionId
Definition c.h:738
bool message_level_is_interesting(int elevel)
Definition elog.c:284
#define DEBUG2
Definition elog.h:29
char * pstrdup(const char *in)
Definition mcxt.c:1781
void pfree(void *pointer)
Definition mcxt.c:1616
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition pqformat.c:414
int64 pq_getmsgint64(StringInfo msg)
Definition pqformat.c:452
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition walsender.c:2567
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition walsender.c:2616

References DEBUG2, elog, fb(), InvalidTransactionId, message_level_is_interesting(), MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, SpinLockAcquire(), SpinLockRelease(), timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2394 of file walsender.c.

2395{
2396 char msgtype;
2397
2398 /*
2399 * Check message type from the first byte.
2400 */
2402
2403 switch (msgtype)
2404 {
2407 break;
2408
2411 break;
2412
2415 break;
2416
2417 default:
2420 errmsg("unexpected message type \"%c\"", msgtype)));
2421 proc_exit(0);
2422 }
2423}
int pq_getmsgbyte(StringInfo msg)
Definition pqformat.c:398
#define PqReplMsg_PrimaryStatusRequest
Definition protocol.h:83
#define PqReplMsg_HotStandbyFeedback
Definition protocol.h:82
#define PqReplMsg_StandbyStatusUpdate
Definition protocol.h:84
static void ProcessStandbyHSFeedbackMessage(void)
Definition walsender.c:2647
static void ProcessStandbyPSRequestMessage(void)
Definition walsender.c:2767
static void ProcessStandbyReplyMessage(void)
Definition walsender.c:2462

References COMMERROR, ereport, errcode(), ERRCODE_PROTOCOL_VIOLATION, errmsg, fb(), pq_getmsgbyte(), PqReplMsg_HotStandbyFeedback, PqReplMsg_PrimaryStatusRequest, PqReplMsg_StandbyStatusUpdate, proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyPSRequestMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyPSRequestMessage()

static void ProcessStandbyPSRequestMessage ( void  )
static

Definition at line 2767 of file walsender.c.

2768{
2775 TimestampTz replyTime;
2776
2777 /*
2778 * This shouldn't happen: a server that is still in recovery (that is, one
2779 * that is itself a standby) cannot answer a primary status request.
2780 */
2781 if (RecoveryInProgress())
2782 elog(ERROR, "the primary status is unavailable during recovery");
2783
2784 replyTime = pq_getmsgint64(&reply_message);
2785
2786 /*
2787 * Update shared state for this WalSender process based on reply data from
2788 * standby.
2789 */
2790 SpinLockAcquire(&walsnd->mutex);
2791 walsnd->replyTime = replyTime;
2792 SpinLockRelease(&walsnd->mutex);
2793
2794 /*
2795 * Consider transactions in the current database, as only these are the
2796 * ones replicated.
2797 */
2800
2801 /*
2802 * Update the oldest xid for standby transmission if an older prepared
2803 * transaction exists and is currently in commit phase.
2804 */
2808
2812 lsn = GetXLogWriteRecPtr();
2813
2814 elog(DEBUG2, "sending primary status");
2815
2816 /* construct the message... */
2823
2824 /* ... and send it wrapped in CopyData */
2826}
int64_t int64
Definition c.h:615
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition pqformat.h:160
static void pq_sendint64(StringInfo buf, uint64 i)
Definition pqformat.h:152
TransactionId GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
Definition procarray.c:2832
#define PqReplMsg_PrimaryStatusUpdate
Definition protocol.h:76
static FullTransactionId FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid, TransactionId xid)
Definition transam.h:443
#define U64FromFullTransactionId(x)
Definition transam.h:49
#define TransactionIdIsValid(xid)
Definition transam.h:41
TransactionId TwoPhaseGetOldestXidInCommit(void)
Definition twophase.c:2832
FullTransactionId ReadNextFullTransactionId(void)
Definition varsup.c:288
XLogRecPtr GetXLogWriteRecPtr(void)
Definition xlog.c:9646

References StringInfoData::data, DEBUG2, elog, ERROR, fb(), FullTransactionIdFromAllowableAt(), GetCurrentTimestamp(), GetOldestActiveTransactionId(), GetXLogWriteRecPtr(), InvalidXLogRecPtr, StringInfoData::len, MyWalSnd, output_message, pq_getmsgint64(), pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqReplMsg_PrimaryStatusUpdate, ReadNextFullTransactionId(), RecoveryInProgress(), reply_message, resetStringInfo(), SpinLockAcquire(), SpinLockRelease(), TransactionIdIsValid, TransactionIdPrecedes(), TwoPhaseGetOldestXidInCommit(), and U64FromFullTransactionId.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2462 of file walsender.c.

2463{
2465 flushPtr,
2466 applyPtr;
2467 bool replyRequested;
2468 TimeOffset writeLag,
2469 flushLag,
2470 applyLag;
2471 bool clearLagTimes;
2473 TimestampTz replyTime;
2474
2475 static bool fullyAppliedLastTime = false;
2476
2477 /* the caller already consumed the msgtype byte */
2481 replyTime = pq_getmsgint64(&reply_message);
2483
2485 {
2486 char *replyTimeStr;
2487
2488 /* Copy because timestamptz_to_str returns a static buffer */
2490
2491 elog(DEBUG2, "write %X/%08X flush %X/%08X apply %X/%08X%s reply_time %s",
2495 replyRequested ? " (reply requested)" : "",
2496 replyTimeStr);
2497
2499 }
2500
2501 /* See if we can compute the round-trip lag for these positions. */
2506
2507 /*
2508 * If the standby reports that it has fully replayed the WAL in two
2509 * consecutive reply messages, then the second such message must result
2510 * from wal_receiver_status_interval expiring on the standby. This is a
2511 * convenient time to forget the lag times measured when it last
2512 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2513 * until more WAL traffic arrives.
2514 */
2515 clearLagTimes = false;
2516 if (applyPtr == sentPtr)
2517 {
2519 clearLagTimes = true;
2520 fullyAppliedLastTime = true;
2521 }
2522 else
2523 fullyAppliedLastTime = false;
2524
2525 /* Send a reply if the standby requested one. */
2526 if (replyRequested)
2528
2529 /*
2530 * Update shared state for this WalSender process based on reply data from
2531 * standby.
2532 */
2533 {
2535
2536 SpinLockAcquire(&walsnd->mutex);
2537 walsnd->write = writePtr;
2538 walsnd->flush = flushPtr;
2539 walsnd->apply = applyPtr;
2540 if (writeLag != -1 || clearLagTimes)
2541 walsnd->writeLag = writeLag;
2542 if (flushLag != -1 || clearLagTimes)
2543 walsnd->flushLag = flushLag;
2544 if (applyLag != -1 || clearLagTimes)
2545 walsnd->applyLag = applyLag;
2546 walsnd->replyTime = replyTime;
2547 SpinLockRelease(&walsnd->mutex);
2548 }
2549
2552
2553 /*
2554 * Advance our local xmin horizon when the client confirmed a flush.
2555 */
2557 {
2560 else
2562 }
2563}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition logical.c:1811
#define SlotIsLogical(slot)
Definition slot.h:288
void SyncRepReleaseWaiters(void)
Definition syncrep.c:475
#define SYNC_REP_WAIT_WRITE
Definition syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition syncrep.h:25
static XLogRecPtr sentPtr
Definition walsender.c:174
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition walsender.c:2429
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition walsender.c:4190
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition walsender.c:4309

References am_cascading_walsender, DEBUG2, elog, fb(), GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, sentPtr, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), and XLogRecPtrIsValid.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 483 of file walsender.c.

484{
485#define READ_REPLICATION_SLOT_COLS 3
486 ReplicationSlot *slot;
489 TupleDesc tupdesc;
491 bool nulls[READ_REPLICATION_SLOT_COLS];
492
494 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
495 TEXTOID, -1, 0);
496 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
497 TEXTOID, -1, 0);
498 /* TimeLineID is unsigned, so int4 is not wide enough. */
499 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
500 INT8OID, -1, 0);
501 TupleDescFinalize(tupdesc);
502
503 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
504
506 slot = SearchNamedReplicationSlot(cmd->slotname, false);
507 if (slot == NULL || !slot->in_use)
508 {
510 }
511 else
512 {
514 int i = 0;
515
516 /* Copy slot contents while holding spinlock */
517 SpinLockAcquire(&slot->mutex);
518 slot_contents = *slot;
519 SpinLockRelease(&slot->mutex);
521
522 if (OidIsValid(slot_contents.data.database))
525 errmsg("cannot use %s with a logical replication slot",
526 "READ_REPLICATION_SLOT"));
527
528 /* slot type */
529 values[i] = CStringGetTextDatum("physical");
530 nulls[i] = false;
531 i++;
532
533 /* start LSN */
534 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
535 {
536 char xloc[64];
537
538 snprintf(xloc, sizeof(xloc), "%X/%08X",
539 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
541 nulls[i] = false;
542 }
543 i++;
544
545 /* timeline this WAL was produced on */
546 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
547 {
551
552 /*
553 * While in recovery, use as timeline the currently-replaying one
554 * to get the LSN position's history.
555 */
556 if (RecoveryInProgress())
558 else
560
565 nulls[i] = false;
566 }
567 i++;
568
570 }
571
574 do_tup_output(tstate, values, nulls);
576}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition timeline.c:77
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition timeline.c:545
#define OidIsValid(objectId)
Definition c.h:860
@ LW_SHARED
Definition lwlock.h:113
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition slot.c:542
Definition pg_list.h:54
bool in_use
Definition slot.h:186
#define READ_REPLICATION_SLOT_COLS

References Assert, begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg, ERROR, fb(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire(), SpinLockRelease(), tliOfPointInHistory(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsValid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 583 of file walsender.c.

584{
586 TupleDesc tupdesc;
589 char path[MAXPGPATH];
590 int fd;
593 Size len;
594
596
597 /*
598 * Reply with a result set with one row and two columns. The first column
599 * is the name of the history file, the second is its contents.
600 */
601 tupdesc = CreateTemplateTupleDesc(2);
602 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
603 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
604 TupleDescFinalize(tupdesc);
605
607 TLHistoryFilePath(path, cmd->timeline);
608
609 /* Send a RowDescription message */
610 dest->rStartup(dest, CMD_SELECT, tupdesc);
611
612 /* Send a DataRow message */
614 pq_sendint16(&buf, 2); /* # of columns */
616 pq_sendint32(&buf, len); /* col1 len */
618
620 if (fd < 0)
623 errmsg("could not open file \"%s\": %m", path)));
624
625 /* Determine file length and send it to client */
627 if (histfilelen < 0)
630 errmsg("could not seek to end of file \"%s\": %m", path)));
631 if (lseek(fd, 0, SEEK_SET) != 0)
634 errmsg("could not seek to beginning of file \"%s\": %m", path)));
635
636 pq_sendint32(&buf, histfilelen); /* col2 len */
637
639 while (bytesleft > 0)
640 {
642 int nread;
643
645 nread = read(fd, rbuf.data, sizeof(rbuf));
647 if (nread < 0)
650 errmsg("could not read file \"%s\": %m",
651 path)));
652 else if (nread == 0)
655 errmsg("could not read file \"%s\": read %d of %zu",
656 path, nread, (Size) bytesleft)));
657
658 pq_sendbytes(&buf, rbuf.data, nread);
659 bytesleft -= nread;
660 }
661
662 if (CloseTransientFile(fd) != 0)
665 errmsg("could not close file \"%s\": %m", path)));
666
668}
#define PG_BINARY
Definition c.h:1376
size_t Size
Definition c.h:691
int errcode_for_file_access(void)
Definition elog.c:897
int CloseTransientFile(int fd)
Definition fd.c:2855
int OpenTransientFile(const char *fileName, int fileFlags)
Definition fd.c:2678
#define read(a, b, c)
Definition win32.h:13
@ CMD_SELECT
Definition nodes.h:275
#define ERRCODE_DATA_CORRUPTED
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition pqformat.h:136
static int fd(const char *x, int i)
#define PqMsg_DataRow
Definition protocol.h:43
TimeLineID timeline
Definition replnodes.h:120
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition wait_event.h:69
static void pgstat_report_wait_end(void)
Definition wait_event.h:85
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg, ERROR, fb(), fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), TupleDescFinalize(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1463 of file walsender.c.

1464{
1466 QueryCompletion qc;
1467
1468 /* make sure that our requirements are still fulfilled */
1470
1472
1473 ReplicationSlotAcquire(cmd->slotname, true, true);
1474
1475 /*
1476 * Force a disconnect, so that the decoding code doesn't need to care
1477 * about an eventual switch from running in recovery, to running in a
1478 * normal environment. Client code is expected to handle reconnects.
1479 */
1481 {
1482 ereport(LOG,
1483 (errmsg("terminating walsender process after promotion")));
1484 got_STOPPING = true;
1485 }
1486
1487 /*
1488 * Create our decoding context, making it start at the previously ack'ed
1489 * position.
1490 *
1491 * Do this before sending a CopyBothResponse message, so that any errors
1492 * are reported early.
1493 */
1495 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1497 .segment_open = WalSndSegmentOpen,
1498 .segment_close = wal_segment_close),
1502
1504
1505 /* Send a CopyBothResponse message, and start streaming */
1507 pq_sendbyte(&buf, 0);
1508 pq_sendint16(&buf, 0);
1510 pq_flush();
1511
1512 /* Start reading WAL from the oldest required WAL. */
1515
1516 /*
1517 * Report the location after which we'll send out further commits as the
1518 * current sentPtr.
1519 */
1521
1522 /* Also update the sent position status in shared memory */
1526
1527 replication_active = true;
1528
1530
1531 /* Main loop of walsender */
1533
1536
1537 replication_active = false;
1538 if (got_STOPPING)
1539 proc_exit(0);
1541
1542 /* Get out of COPY mode (CommandComplete). */
1544 EndCommand(&qc, DestRemote, false);
1545}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition dest.c:169
@ DestRemote
Definition dest.h:89
#define pq_flush()
Definition libpq.h:49
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:489
#define PqMsg_CopyBothResponse
Definition protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:622
XLogReaderState * reader
Definition logical.h:42
XLogRecPtr startpoint
Definition replnodes.h:97
slock_t mutex
XLogRecPtr sentPtr
void SyncRepInitConfig(void)
Definition syncrep.c:446
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition walsender.c:2907
static LogicalDecodingContext * logical_decoding_ctx
Definition walsender.c:217
static void XLogSendLogical(void)
Definition walsender.c:3524
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition xlogreader.c:233

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg, fb(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire(), SpinLockRelease(), StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 816 of file walsender.c.

817{
821
822 /* create xlogreader for physical replication */
823 xlogreader =
825 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
826 .segment_close = wal_segment_close),
827 NULL);
828
829 if (!xlogreader)
832 errmsg("out of memory"),
833 errdetail("Failed while allocating a WAL reading processor.")));
834
835 /*
836 * We assume here that we're logging enough information in the WAL for
837 * log-shipping, since this is checked in PostmasterMain().
838 *
839 * NOTE: wal_level can only change at shutdown, so in most cases it is
840 * difficult for there to be WAL data that we can still see that was
841 * written at wal_level='minimal'.
842 */
843
844 if (cmd->slotname)
845 {
846 ReplicationSlotAcquire(cmd->slotname, true, true);
850 errmsg("cannot use a logical replication slot for physical replication")));
851
852 /*
853 * We don't need to verify the slot's restart_lsn here; instead we
854 * rely on the caller requesting the starting point to use. If the
855 * WAL segment doesn't exist, we'll fail later.
856 */
857 }
858
859 /*
860 * Select the timeline. If it was given explicitly by the client, use
861 * that. Otherwise use the timeline of the last replayed record.
862 */
866 else
868
869 if (cmd->timeline != 0)
870 {
872
873 sendTimeLine = cmd->timeline;
874 if (sendTimeLine == FlushTLI)
875 {
878 }
879 else
880 {
882
884
885 /*
886 * Check that the timeline the client requested exists, and the
887 * requested start location is on that timeline.
888 */
893
894 /*
895 * Found the requested timeline in the history. Check that
896 * requested startpoint is on that timeline in our history.
897 *
898 * This is quite loose on purpose. We only check that we didn't
899 * fork off the requested timeline before the switchpoint. We
900 * don't check that we switched *to* it before the requested
901 * starting point. This is because the client can legitimately
902 * request to start replication from the beginning of the WAL
903 * segment that contains switchpoint, but on the new timeline, so
904 * that it doesn't end up with a partial segment. If you ask for
905 * too old a starting point, you'll get an error later when we
906 * fail to find the requested WAL segment in pg_wal.
907 *
908 * XXX: we could be more strict here and only allow a startpoint
909 * that's older than the switchpoint, if it's still in the same
910 * WAL segment.
911 */
913 switchpoint < cmd->startpoint)
914 {
916 errmsg("requested starting point %X/%08X on timeline %u is not in this server's history",
918 cmd->timeline),
919 errdetail("This server's history forked from timeline %u at %X/%08X.",
920 cmd->timeline,
922 }
924 }
925 }
926 else
927 {
931 }
932
934
935 /* If there is nothing to stream, don't even enter COPY mode */
937 {
938 /*
939 * When we first start replication the standby will be behind the
940 * primary. For some applications, for example synchronous
941 * replication, it is important to have a clear state for this initial
942 * catchup mode, so we can trigger actions when we change streaming
943 * state later. We may stay in this state for a long time, which is
944 * exactly why we want to be able to monitor whether or not we are
945 * still here.
946 */
948
949 /* Send a CopyBothResponse message, and start streaming */
951 pq_sendbyte(&buf, 0);
952 pq_sendint16(&buf, 0);
954 pq_flush();
955
956 /*
957 * Don't allow a request to stream from a future point in WAL that
958 * hasn't been flushed to disk in this server yet.
959 */
960 if (FlushPtr < cmd->startpoint)
961 {
963 errmsg("requested starting point %X/%08X is ahead of the WAL flush position of this server %X/%08X",
966 }
967
968 /* Start streaming from the requested point */
969 sentPtr = cmd->startpoint;
970
971 /* Initialize shared memory status, too */
975
977
978 /* Main loop of walsender */
979 replication_active = true;
980
982
983 replication_active = false;
984 if (got_STOPPING)
985 proc_exit(0);
987
989 }
990
991 if (cmd->slotname)
993
994 /*
995 * Copy is finished now. Send a single-row result set indicating the next
996 * timeline.
997 */
999 {
1000 char startpos_str[8 + 1 + 8 + 1];
1003 TupleDesc tupdesc;
1004 Datum values[2];
1005 bool nulls[2] = {0};
1006
1007 snprintf(startpos_str, sizeof(startpos_str), "%X/%08X",
1009
1011
1012 /*
1013 * Need a tuple descriptor representing two columns. int8 may seem
1014 * like a surprising data type for this, but in theory int4 would not
1015 * be wide enough for this, as TimeLineID is unsigned.
1016 */
1017 tupdesc = CreateTemplateTupleDesc(2);
1018 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
1019 INT8OID, -1, 0);
1020 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
1021 TEXTOID, -1, 0);
1022 TupleDescFinalize(tupdesc);
1023
1024 /* prepare for projection of tuple */
1026
1029
1030 /* send it to dest */
1031 do_tup_output(tstate, values, nulls);
1032
1034 }
1035
1036 /* Send CommandComplete message */
1037 EndReplicationCommand("START_STREAMING");
1038}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition timeline.c:573
int errdetail(const char *fmt,...) pg_attribute_printf(1
void list_free_deep(List *list)
Definition list.c:1560
TimeLineID timeline
Definition replnodes.h:96
static void XLogSendPhysical(void)
Definition walsender.c:3214
int wal_segment_size
Definition xlog.c:147
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition xlogreader.c:108

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg, ERROR, fb(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire(), SpinLockRelease(), StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescFinalize(), TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsValid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2616 of file walsender.c.

2617{
2619 TransactionId nextXid;
2621
2625
2626 if (xid <= nextXid)
2627 {
2628 if (epoch != nextEpoch)
2629 return false;
2630 }
2631 else
2632 {
2633 if (epoch + 1 != nextEpoch)
2634 return false;
2635 }
2636
2637 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2638 return false; /* epoch OK, but it's wrapped around */
2639
2640 return true;
2641}
#define EpochFromFullTransactionId(x)
Definition transam.h:47
static bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition transam.h:282
#define XidFromFullTransactionId(x)
Definition transam.h:48
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, fb(), ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 674 of file walsender.c.

675{
676 MemoryContext mcxt;
678 off_t offset = 0;
680
681 /*
682 * parsing the manifest will use the cryptohash stuff, which requires a
683 * resource owner
684 */
689
690 /* Prepare to read manifest data into a temporary context. */
692 "incremental backup information",
695
696 /* Send a CopyInResponse message */
698 pq_sendbyte(&buf, 0);
699 pq_sendint16(&buf, 0);
701 pq_flush();
702
703 /* Receive packets from client until done. */
704 while (HandleUploadManifestPacket(&buf, &offset, ib))
705 ;
706
707 /* Finish up manifest processing. */
709
710 /*
711 * Discard any old manifest information and arrange to preserve the new
712 * information we just got.
713 *
714 * We assume that MemoryContextDelete and MemoryContextSetParent won't
715 * fail, and thus we shouldn't end up bailing out of here in such a way as
716 * to leave dangling pointers.
717 */
723
724 /* clean up the resource owner we created */
726}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition mcxt.c:686
MemoryContext CacheMemoryContext
Definition mcxt.c:169
void MemoryContextDelete(MemoryContext context)
Definition mcxt.c:472
void pq_endmessage_reuse(StringInfo buf)
Definition pqformat.c:313
#define PqMsg_CopyInResponse
Definition protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition resowner.c:1016
ResourceOwner CurrentResourceOwner
Definition resowner.c:173
ResourceOwner AuxProcessResourceOwner
Definition resowner.c:176
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition walsender.c:740
static MemoryContext uploaded_manifest_mcxt
Definition walsender.c:157

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, fb(), FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2880 of file walsender.c.

2881{
2883
2884 /* don't bail out if we're doing something that doesn't require timeouts */
2885 if (last_reply_timestamp <= 0)
2886 return;
2887
2890
2892 {
2893 /*
2894 * Since typically expiration of replication timeout means
2895 * communication problem, we don't send the error message to the
2896 * standby.
2897 */
2899 (errmsg("terminating walsender process due to replication timeout")));
2900
2902 }
2903}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition timestamp.h:85
int wal_sender_timeout
Definition walsender.c:132

References COMMERROR, ereport, errmsg, fb(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2836 of file walsender.c.

2837{
2838 long sleeptime = 10000; /* 10 s */
2839
2841 {
2843
2844 /*
2845 * At the latest stop sleeping once wal_sender_timeout has been
2846 * reached.
2847 */
2850
2851 /*
2852 * If no ping has been sent yet, wakeup when it's time to do so.
2853 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2854 * the timeout passed without a response.
2855 */
2858 wal_sender_timeout / 2);
2859
2860 /* Compute relative time until wakeup. */
2862 }
2863
2864 return sleeptime;
2865}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition timestamp.c:1748

References fb(), last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3617 of file walsender.c.

3618{
3620
3621 /* ... let's just be real sure we're caught up ... */
3622 send_data();
3623
3624 /*
3625 * To figure out whether all WAL has successfully been replicated, check
3626 * flush location if valid, write otherwise. Tools like pg_receivewal will
3627 * usually (unless in synchronous mode) return an invalid flush location.
3628 */
3631
3634 {
3635 QueryCompletion qc;
3636
3637 /* Inform the standby that XLOG streaming is done */
3639 EndCommand(&qc, DestRemote, false);
3640 pq_flush();
3641
3642 proc_exit(0);
3643 }
3646}
XLogRecPtr flush
XLogRecPtr write
static bool WalSndCaughtUp
Definition walsender.c:203

References DestRemote, EndCommand(), fb(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsValid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 349 of file walsender.c.

350{
355
356 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
358
359 if (MyReplicationSlot != NULL)
361
363
364 replication_active = false;
365
366 /*
367 * If there is a transaction in progress, it will clean up our
368 * ResourceOwner, but if a replication command set up a resource owner
369 * without a transaction, we've got to clean that up now.
370 */
373
375 proc_exit(0);
376
377 /* Revert back to startup state */
379}
void pgaio_error_cleanup(void)
Definition aio.c:1165
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition lwlock.c:1893
void ReplicationSlotCleanup(bool synced_only)
Definition slot.c:861
WALOpenSegment seg
Definition xlogreader.h:271
static volatile sig_atomic_t got_SIGUSR2
Definition walsender.c:206
bool IsTransactionOrTransactionBlock(void)
Definition xact.c:5012

References ConditionVariableCancelSleep(), fb(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgaio_error_cleanup(), pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 3984 of file walsender.c.

3985{
3986 switch (state)
3987 {
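3988 case WALSNDSTATE_STARTUP:
3989 return "startup";
3990 case WALSNDSTATE_BACKUP:
3991 return "backup";
3992 case WALSNDSTATE_CATCHUP:
3993 return "catchup";
3994 case WALSNDSTATE_STREAMING:
3995 return "streaming";
3996 case WALSNDSTATE_STOPPING:
3997 return "stopping";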
3998 }
3999 return "UNKNOWN";
4000}
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndHandleConfigReload()

static void WalSndHandleConfigReload ( void  )
static

Definition at line 1628 of file walsender.c.

1629{
1630 if (!ConfigReloadPending)
1631 return;
1632
1633 ConfigReloadPending = false;
1634 ProcessConfigFile(PGC_SIGHUP);
1635 SyncRepInitConfig();
1636
1637 /*
1638 * Recheck and release any now-satisfied waiters after config reload
1639 * changes synchronous replication requirements (e.g., reducing the number
1640 * of sync standbys or changing the standby names).
1641 */
1642 if (!am_cascading_walsender)
1643 SyncRepReleaseWaiters();
1644}
void ProcessConfigFile(GucContext context)
Definition guc-file.l:120
@ PGC_SIGHUP
Definition guc.h:75
volatile sig_atomic_t ConfigReloadPending
Definition interrupt.c:27

References am_cascading_walsender, ConfigReloadPending, PGC_SIGHUP, ProcessConfigFile(), SyncRepInitConfig(), and SyncRepReleaseWaiters().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3901 of file walsender.c.

3902{
3903 int i;
3904
3905 for (i = 0; i < max_wal_senders; i++)
3906 {
3907 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3908 pid_t pid;
3909
3910 SpinLockAcquire(&walsnd->mutex);
3911 pid = walsnd->pid;
3912 SpinLockRelease(&walsnd->mutex);
3913
3914 if (pid == 0)
3915 continue;
3916
3917 SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, INVALID_PROC_NUMBER);
3918 }
3919}
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition procsignal.c:287
@ PROCSIG_WALSND_INIT_STOPPING
Definition procsignal.h:35

References fb(), i, INVALID_PROC_NUMBER, max_wal_senders, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire(), SpinLockRelease(), WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4213 of file walsender.c.

4214{
4216
4217 /*
4218 * Don't send keepalive messages if timeouts are globally disabled or
4219 * we're doing something not partaking in timeouts.
4220 */
4222 return;
4223
4225 return;
4226
4227 /*
4228 * If half of wal_sender_timeout has lapsed without receiving any reply
4229 * from the standby, send a keep-alive message to the standby requesting
4230 * an immediate reply.
4231 */
4233 wal_sender_timeout / 2);
4235 {
4237
4238 /* Try to flush pending output to the client */
4239 if (pq_flush_if_writable() != 0)
4241 }
4242}

References fb(), InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 3120 of file walsender.c.

3121{
3122 WalSnd *walsnd = MyWalSnd;
3123
3124 Assert(walsnd != NULL);
3125
3126 MyWalSnd = NULL;
3127
3128 SpinLockAcquire(&walsnd->mutex);
3129 /* Mark WalSnd struct as no longer being in use. */
3130 walsnd->pid = 0;
3131 SpinLockRelease(&walsnd->mutex);
3132}

References Assert, fb(), MyWalSnd, SpinLockAcquire(), and SpinLockRelease().

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3738 of file walsender.c.

3739{
3740 got_SIGUSR2 = true;
3741 SetLatch(MyLatch);
3742}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2907 of file walsender.c.

2908{
2910
2911 /*
2912 * Initialize the last reply timestamp. That enables timeout processing
2913 * from hereon.
2914 */
2917
2918 /*
2919 * Loop until we reach the end of this timeline or the client requests to
2920 * stop streaming.
2921 */
2922 for (;;)
2923 {
2924 /* Clear any already-pending wakeups */
2926
2928
2929 /* Process any requests or signals received recently */
2931
2932 /* Check for input from the client */
2934
2935 /*
2936 * If we have received CopyDone from the client, sent CopyDone
2937 * ourselves, and the output buffer is empty, it's time to exit
2938 * streaming.
2939 */
2942 break;
2943
2944 /*
2945 * If we don't have any pending data in the output buffer, try to send
2946 * some more. If there is some, we don't bother to call send_data
2947 * again until we've flushed it ... but we'd better assume we are not
2948 * caught up.
2949 */
2950 if (!pq_is_send_pending())
2951 send_data();
2952 else
2953 WalSndCaughtUp = false;
2954
2955 /* Try to flush pending output to the client */
2956 if (pq_flush_if_writable() != 0)
2958
2959 /* If nothing remains to be sent right now ... */
2961 {
2962 /*
2963 * If we're in catchup state, move to streaming. This is an
2964 * important state change for users to know about, since before
2965 * this point data loss might occur if the primary dies and we
2966 * need to failover to the standby. The state change is also
2967 * important for synchronous replication, since commits that
2968 * started to wait at that point might wait for some time.
2969 */
2971 {
2973 (errmsg_internal("\"%s\" has now caught up with upstream server",
2976 }
2977
2978 /*
2979 * When SIGUSR2 arrives, we send any outstanding logs up to the
2980 * shutdown checkpoint record (i.e., the latest record), wait for
2981 * them to be replicated to the standby, and exit. This may be a
2982 * normal termination at shutdown, or a promotion, the walsender
2983 * is not sure which.
2984 */
2985 if (got_SIGUSR2)
2987 }
2988
2989 /* Check for replication timeout. */
2991
2992 /* Send keepalive if the time has come */
2994
2995 /*
2996 * Block if we have unsent data. XXX For logical replication, let
2997 * WalSndWaitForWal() handle any other blocking; idle receivers need
2998 * its additional actions. For physical replication, also block if
2999 * caught up; its send_data does not block.
3000 *
3001 * The IO statistics are reported in WalSndWaitForWal() for the
3002 * logical WAL senders.
3003 */
3007 {
3008 long sleeptime;
3009 int wakeEvents;
3011
3014 else
3015 wakeEvents = 0;
3016
3017 /*
3018 * Use fresh timestamp, not last_processing, to reduce the chance
3019 * of reaching wal_sender_timeout before sending a keepalive.
3020 */
3023
3024 if (pq_is_send_pending())
3026
3027 /* Report IO statistics, if needed */
3030 {
3031 pgstat_flush_io(false);
3033 last_flush = now;
3034 }
3035
3036 /* Sleep until something happens or we time out */
3038 }
3039 }
3040}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition timestamp.c:1772
char * application_name
Definition guc_tables.c:571
bool pgstat_flush_backend(bool nowait, bits32 flags)
#define PGSTAT_BACKEND_FLUSH_IO
void pgstat_flush_io(bool nowait)
Definition pgstat_io.c:175
#define WALSENDER_STATS_FLUSH_INTERVAL
Definition walsender.c:104
static void WalSndDone(WalSndSendDataCallback send_data)
Definition walsender.c:3617

References application_name, CHECK_FOR_INTERRUPTS, DEBUG1, ereport, errmsg_internal(), fb(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, now(), PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndHandleConfigReload(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1556 of file walsender.c.

1557{
1558 /* can't have sync rep confused by sending the same LSN several times */
1559 if (!last_write)
1560 lsn = InvalidXLogRecPtr;
1561
1562 resetStringInfo(ctx->out);
1563
1564 pq_sendbyte(ctx->out, PqReplMsg_WALData);
1565 pq_sendint64(ctx->out, lsn); /* dataStart */
1566 pq_sendint64(ctx->out, lsn); /* walEnd */
1567
1568 /*
1569 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1570 * reserve space here.
1571 */
1572 pq_sendint64(ctx->out, 0); /* sendtime */
1573}
#define PqReplMsg_WALData
Definition protocol.h:77

References fb(), InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), PqReplMsg_WALData, and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3693 of file walsender.c.

3694{
3695 int i;
3696
3697 for (i = 0; i < max_wal_senders; i++)
3698 {
3699 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3700
3701 SpinLockAcquire(&walsnd->mutex);
3702 if (walsnd->pid == 0)
3703 {
3704 SpinLockRelease(&walsnd->mutex);
3705 continue;
3706 }
3707 walsnd->needreload = true;
3708 SpinLockRelease(&walsnd->mutex);
3709 }
3710}

References fb(), i, max_wal_senders, SpinLockAcquire(), SpinLockRelease(), WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 3136 of file walsender.c.

3138{
3139 char path[MAXPGPATH];
3140
3141 /*-------
3142 * When reading from a historic timeline, and there is a timeline switch
3143 * within this segment, read from the WAL segment belonging to the new
3144 * timeline.
3145 *
3146 * For example, imagine that this server is currently on timeline 5, and
3147 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
3148 * 0/13002088. In pg_wal, we have these files:
3149 *
3150 * ...
3151 * 000000040000000000000012
3152 * 000000040000000000000013
3153 * 000000050000000000000013
3154 * 000000050000000000000014
3155 * ...
3156 *
3157 * In this situation, when requested to send the WAL from segment 0x13, on
3158 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
3159 * recovery prefers files from newer timelines, so if the segment was
3160 * restored from the archive on this server, the file belonging to the old
3161 * timeline, 000000040000000000000013, might not exist. Their contents are
3162 * equal up to the switchpoint, because at a timeline switch, the used
3163 * portion of the old segment is copied to the new file.
3164 */
3167 {
3169
3170 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
3171 if (nextSegNo == endSegNo)
3173 }
3174
3175 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3176 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3177 if (state->seg.ws_file >= 0)
3178 return;
3179
3180 /*
3181 * If the file is not found, assume it's because the standby asked for a
3182 * too old WAL segment that has already been removed or recycled.
3183 */
3184 if (errno == ENOENT)
3185 {
3186 char xlogfname[MAXFNAMELEN];
3187 int save_errno = errno;
3188
3190 errno = save_errno;
3191 ereport(ERROR,
3193 errmsg("requested WAL segment %s has already been removed",
3194 xlogfname)));
3195 }
3196 else
3197 ereport(ERROR,
3199 errmsg("could not open file \"%s\": %m",
3200 path)));
3201}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition fd.c:1090
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg, ERROR, fb(), MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3965 of file walsender.c.

3966{
3967 WalSnd *walsnd = MyWalSnd;
3968
3969 Assert(am_walsender);
3970
3971 if (walsnd->state == state)
3972 return;
3973
3974 SpinLockAcquire(&walsnd->mutex);
3975 walsnd->state = state;
3976 SpinLockRelease(&walsnd->mutex);
3977}

References am_walsender, Assert, fb(), MyWalSnd, SpinLockAcquire(), and SpinLockRelease().

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3777 of file walsender.c.

3778{
3779 bool found;
3780 int i;
3781
3783 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3784
3785 if (!found)
3786 {
3787 /* First time through, so initialize */
3789
3790 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3792
3793 for (i = 0; i < max_wal_senders; i++)
3794 {
3796
3797 SpinLockInit(&walsnd->mutex);
3798 }
3799
3803 }
3804}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition shmem.c:381
static void SpinLockInit(volatile slock_t *lock)
Definition spin.h:50
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition walsender.c:3765

References ConditionVariableInit(), dlist_init(), fb(), i, max_wal_senders, MemSet, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit(), WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3765 of file walsender.c.

3766{
3767 Size size = 0;
3768
3769 size = offsetof(WalSndCtlData, walsnds);
3770 size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3771
3772 return size;
3773}
Size add_size(Size s1, Size s2)
Definition shmem.c:485
Size mul_size(Size s1, Size s2)
Definition shmem.c:500

References add_size(), fb(), max_wal_senders, and mul_size().

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 385 of file walsender.c.

386{
387 /*
388 * Reset whereToSendOutput to prevent ereport from attempting to send any
389 * more messages to the standby.
390 */
391 if (whereToSendOutput == DestRemote)
392 whereToSendOutput = DestNone;
393
394 proc_exit(0);
395}
@ DestNone
Definition dest.h:87
CommandDest whereToSendOutput
Definition postgres.c:94

References DestNone, DestRemote, proc_exit(), and whereToSendOutput.

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3746 of file walsender.c.

3747{
3748 /* Set up signal handlers */
3750 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3751 pqsignal(SIGTERM, die); /* request shutdown */
3752 /* SIGQUIT handler was already set up by InitPostmasterChild */
3753 InitializeTimeouts(); /* establishes SIGALRM handler */
3756 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3757 * shutdown */
3758
3759 /* Reset some signals that are accepted by postmaster but not here */
3761}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition interrupt.c:61
#define die(msg)
#define pqsignal
Definition port.h:547
void StatementCancelHandler(SIGNAL_ARGS)
Definition postgres.c:3042
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition procsignal.c:680
void InitializeTimeouts(void)
Definition timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition walsender.c:3738
#define SIGCHLD
Definition win32_port.h:168
#define SIGHUP
Definition win32_port.h:158
#define SIGPIPE
Definition win32_port.h:163
#define SIGUSR1
Definition win32_port.h:170
#define SIGUSR2
Definition win32_port.h:171

References die, fb(), InitializeTimeouts(), pqsignal, procsignal_sigusr1_handler(), SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1700 of file walsender.c.

1702{
1703 static TimestampTz sendTime = 0;
1705 bool pending_writes = false;
1706 bool end_xact = ctx->end_xact;
1707
1708 /*
1709 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1710 * avoid flooding the lag tracker when we commit frequently.
1711 *
1712 * We don't have a mechanism to get the ack for any LSN other than end
1713 * xact LSN from the downstream. So, we track lag only for end of
1714 * transaction LSN.
1715 */
1716#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1717 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1719 {
1720 LagTrackerWrite(lsn, now);
1721 sendTime = now;
1722 }
1723
1724 /*
1725 * When skipping empty transactions in synchronous replication, we send a
1726 * keepalive message to avoid delaying such transactions.
1727 *
1728 * It is okay to check sync_standbys_status without lock here as in the
1729 * worst case we will just send an extra keepalive message when it is
1730 * really not required.
1731 */
1732 if (skipped_xact &&
1733 SyncRepRequested() &&
1734 (((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_status & SYNC_STANDBY_DEFINED))
1735 {
1736 WalSndKeepalive(false, lsn);
1737
1738 /* Try to flush pending output to the client */
1739 if (pq_flush_if_writable() != 0)
1741
1742 /* If we have pending write here, make sure it's actually flushed */
1743 if (pq_is_send_pending())
1744 pending_writes = true;
1745 }
1746
1747 /*
1748 * Process pending writes if any or try to send a keepalive if required.
1749 * We don't need to try sending keep alive messages at the transaction end
1750 * as that will be done at a later point in time. This is required only
1751 * for large transactions where we don't send any changes to the
1752 * downstream and the receiver can timeout due to that.
1753 */
1754 if (pending_writes || (!end_xact &&
1756 wal_sender_timeout / 2)))
1758}
#define SyncRepRequested()
Definition syncrep.h:18
static void ProcessPendingWrites(void)
Definition walsender.c:1651
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition walsender.c:4251
#define SYNC_STANDBY_DEFINED

References LogicalDecodingContext::end_xact, fb(), GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SYNC_STANDBY_DEFINED, SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3843 of file walsender.c.

3844{
3845 WaitEvent event;
3846
3848
3849 /*
3850 * We use a condition variable to efficiently wake up walsenders in
3851 * WalSndWakeup().
3852 *
3853 * Every walsender prepares to sleep on a shared memory CV. Note that it
3854 * just prepares to sleep on the CV (i.e., adds itself to the CV's
3855 * waitlist), but does not actually wait on the CV (IOW, it never calls
3856 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3857 * waiting, because we also need to wait for socket events. The processes
3858 * (startup process, walreceiver etc.) wanting to wake up walsenders use
3859 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3860 * walsenders come out of WaitEventSetWait().
3861 *
3862 * This approach is simple and efficient because one doesn't have to loop
3863 * through all the walsenders slots, with a spinlock acquisition and
3864 * release for every iteration, just to wake up only the waiting
3865 * walsenders. It makes WalSndWakeup() callers' life easy.
3866 *
3867 * XXX: A desirable future improvement would be to add support for CVs
3868 * into WaitEventSetWait().
3869 *
3870 * And, we use separate shared memory CVs for physical and logical
3871 * walsenders for selective wake ups, see WalSndWakeup() for more details.
3872 *
3873 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
3874 * until awakened by physical walsenders after the walreceiver confirms
3875 * the receipt of the LSN.
3876 */
3883
3884 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3885 (event.events & WL_POSTMASTER_DEATH))
3886 {
3888 proc_exit(1);
3889 }
3890
3892}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
#define FeBeWaitSetSocketPos
Definition libpq.h:66
WaitEventSet * FeBeWaitSet
Definition pqcomm.c:167
uint32 events
ReplicationKind kind
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
#define WL_POSTMASTER_DEATH

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, fb(), FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1850 of file walsender.c.

1851{
1852 int wakeEvents;
1853 uint32 wait_event = 0;
1856
1857 /*
1858 * Fast path to avoid acquiring the spinlock in case we already know we
1859 * have enough WAL available and all the standby servers have confirmed
1860 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1861 * if we're far behind.
1862 */
1865 return RecentFlushPtr;
1866
1867 /*
1868 * Within the loop, we wait for the necessary WALs to be flushed to disk
1869 * first, followed by waiting for standbys to catch up if there are enough
1870 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1871 */
1872 for (;;)
1873 {
1874 bool wait_for_standby_at_stop = false;
1875 long sleeptime;
1877
1878 /* Clear any already-pending wakeups */
1880
1882
1883 /* Process any requests or signals received recently */
1885
1886 /* Check for input from the client */
1888
1889 /*
1890 * If we're shutting down, trigger pending WAL to be written out,
1891 * otherwise we'd possibly end up waiting for WAL that never gets
1892 * written, because walwriter has shut down already.
1893 *
1894 * Note that GetXLogInsertEndRecPtr() is used to obtain the WAL flush
1895 * request location instead of GetXLogInsertRecPtr(). Because if the
1896 * last WAL record ends at a page boundary, GetXLogInsertRecPtr() can
1897 * return an LSN pointing past the page header, which may cause
1898 * XLogFlush() to report an error.
1899 */
1902
1903 /*
1904 * To avoid the scenario where standbys need to catch up to a newer
1905 * WAL location in each iteration, we update our idea of the currently
1906 * flushed position only if we are not waiting for standbys to catch
1907 * up.
1908 */
1910 {
1911 if (!RecoveryInProgress())
1913 else
1915 }
1916
1917 /*
1918 * If postmaster asked us to stop and the standby slots have caught up
1919 * to the flushed position, don't wait anymore.
1920 *
1921 * It's important to do this check after the recomputation of
1922 * RecentFlushPtr, so we can send all remaining data before shutting
1923 * down.
1924 */
1925 if (got_STOPPING)
1926 {
1929 else
1930 break;
1931 }
1932
1933 /*
1934 * We only send regular messages to the client for full decoded
1935 * transactions, but synchronous replication and walsender shutdown
1936 * may be waiting for a later location. So, before sleeping, we
1937 * send a ping containing the flush location. If the receiver is
1938 * otherwise idle, this keepalive will trigger a reply. Processing the
1939 * reply will update these MyWalSnd locations.
1940 */
1941 if (MyWalSnd->flush < sentPtr &&
1942 MyWalSnd->write < sentPtr &&
1945
1946 /*
1947 * Exit the loop if already caught up and doesn't need to wait for
1948 * standby slots.
1949 */
1952 break;
1953
1954 /*
1955 * Waiting for new WAL or waiting for standbys to catch up. Since we
1956 * need to wait, we're now caught up.
1957 */
1958 WalSndCaughtUp = true;
1959
1960 /*
1961 * Try to flush any pending output to the client.
1962 */
1963 if (pq_flush_if_writable() != 0)
1965
1966 /*
1967 * If we have received CopyDone from the client, sent CopyDone
1968 * ourselves, and the output buffer is empty, it's time to exit
1969 * streaming, so fail the current WAL fetch request.
1970 */
1973 break;
1974
1975 /* die if timeout was reached */
1977
1978 /* Send keepalive if the time has come */
1980
1981 /*
1982 * Sleep until something happens or we time out. Also wait for the
1983 * socket becoming writable, if there's still pending output.
1984 * Otherwise we might sit on sendable output data while waiting for
1985 * new WAL to be generated. (But if we have nothing to send, we don't
1986 * want to wake on socket-writable.)
1987 */
1990
1992
1993 if (pq_is_send_pending())
1995
1996 Assert(wait_event != 0);
1997
1998 /* Report IO statistics, if needed */
2001 {
2002 pgstat_flush_io(false);
2004 last_flush = now;
2005 }
2006
2008 }
2009
2010 /* reactivate latch so WalSndLoop knows to continue */
2012 return RecentFlushPtr;
2013}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition walsender.c:1822
XLogRecPtr GetXLogInsertEndRecPtr(void)
Definition xlog.c:9630
void XLogFlush(XLogRecPtr record)
Definition xlog.c:2767

References Assert, CHECK_FOR_INTERRUPTS, fb(), WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogInsertEndRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), now(), PGSTAT_BACKEND_FLUSH_IO, pgstat_flush_backend(), pgstat_flush_io(), pq_flush_if_writable, pq_is_send_pending, ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, TimestampDifferenceExceeds(), waiting_for_ping_response, WALSENDER_STATS_FLUSH_INTERVAL, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndHandleConfigReload(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogFlush(), and XLogRecPtrIsValid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3927 of file walsender.c.

3928{
3929 for (;;)
3930 {
3931 int i;
3932 bool all_stopped = true;
3933
3934 for (i = 0; i < max_wal_senders; i++)
3935 {
3937
3938 SpinLockAcquire(&walsnd->mutex);
3939
3940 if (walsnd->pid == 0)
3941 {
3942 SpinLockRelease(&walsnd->mutex);
3943 continue;
3944 }
3945
3946 if (walsnd->state != WALSNDSTATE_STOPPING)
3947 {
3948 all_stopped = false;
3949 SpinLockRelease(&walsnd->mutex);
3950 break;
3951 }
3952 SpinLockRelease(&walsnd->mutex);
3953 }
3954
3955 /* safe to leave if confirmation is done for all WAL senders */
3956 if (all_stopped)
3957 return;
3958
3959 pg_usleep(10000L); /* wait for 10 msec */
3960 }
3961}
void pg_usleep(long microsec)
Definition signal.c:53

References fb(), i, max_wal_senders, pg_usleep(), SpinLockAcquire(), SpinLockRelease(), WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3822 of file walsender.c.

3823{
3824 /*
3825 * Wake up all the walsenders waiting on WAL being flushed or replayed
3826 * respectively. Note that waiting walsender would have prepared to sleep
3827 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3828 * before actually waiting.
3829 */
3830 if (physical)
3832
3833 if (logical)
3835}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1583 of file walsender.c.

1585{
1587
1588 /*
1589 * Fill the send timestamp last, so that it is taken as late as possible.
1590 * This is somewhat ugly, but the protocol is set as it's already used for
1591 * several releases by streaming physical replication.
1592 */
1596 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1597 tmpbuf.data, sizeof(int64));
1598
1599 /* output previously gathered data in a CopyData packet */
1601
1603
1604 /* Try to flush pending output to the client */
1605 if (pq_flush_if_writable() != 0)
1607
1608 /* Try taking fast path unless we get too close to walsender timeout. */
1610 wal_sender_timeout / 2) &&
1612 {
1613 return;
1614 }
1615
1616 /* If we have pending write here, go to slow path */
1618}

References CHECK_FOR_INTERRUPTS, StringInfoData::data, fb(), GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), PqMsg_CopyData, ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3524 of file walsender.c.

3525{
3526 XLogRecord *record;
3527 char *errm;
3528
3529 /*
3530 * We'll use the current flush point to determine whether we've caught up.
3531 * This variable is static in order to cache it across calls. Caching is
3532 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3533 * spinlock.
3534 */
3536
3537 /*
3538 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3539 * true in WalSndWaitForWal, if we're actually waiting. We also set it to
3540 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3541 * didn't wait - i.e. when we're shutting down.
3542 */
3543 WalSndCaughtUp = false;
3544
3546
3547 /* xlog record was invalid */
3548 if (errm != NULL)
3549 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3550 errm);
3551
3552 if (record != NULL)
3553 {
3554 /*
3555 * Note the lack of any call to LagTrackerWrite() which is handled by
3556 * WalSndUpdateProgress which is called by output plugin through
3557 * logical decoding write api.
3558 */
3560
3562 }
3563
3564 /*
3565 * If first time through in this session, initialize flushPtr. Otherwise,
3566 * we only need to update flushPtr if EndRecPtr is past it.
3567 */
3570 {
3571 /*
3572 * For cascading logical WAL senders, we use the replay LSN instead of
3573 * the flush LSN, since logical decoding on a standby only processes
3574 * WAL that has been replayed. This distinction becomes particularly
3575 * important during shutdown, as new WAL is no longer replayed and the
3576 * last replayed LSN marks the furthest point up to which decoding can
3577 * proceed.
3578 */
3581 else
3583 }
3584
3585 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3587 WalSndCaughtUp = true;
3588
3589 /*
3590 * If we're caught up and have been requested to stop, have WalSndLoop()
3591 * terminate the connection in an orderly manner, after writing out all
3592 * the pending data.
3593 */
3595 got_SIGUSR2 = true;
3596
3597 /* Update shared memory status */
3598 {
3600
3601 SpinLockAcquire(&walsnd->mutex);
3602 walsnd->sentPtr = sentPtr;
3603 SpinLockRelease(&walsnd->mutex);
3604 }
3605}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition decode.c:88
XLogRecPtr EndRecPtr
Definition xlogreader.h:206
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition xlogreader.c:391

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, fb(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), MyWalSnd, LogicalDecodingContext::reader, sentPtr, SpinLockAcquire(), SpinLockRelease(), WalSndCaughtUp, XLogReadRecord(), and XLogRecPtrIsValid.

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3214 of file walsender.c.

3215{
3217 XLogRecPtr startptr;
3218 XLogRecPtr endptr;
3219 Size nbytes;
3220 XLogSegNo segno;
3222 Size rbytes;
3223
3224 /* If requested switch the WAL sender to the stopping state. */
3225 if (got_STOPPING)
3227
3229 {
3230 WalSndCaughtUp = true;
3231 return;
3232 }
3233
3234 /* Figure out how far we can safely send the WAL. */
3236 {
3237 /*
3238 * Streaming an old timeline that's in this server's history, but is
3239 * not the one we're currently inserting or replaying. It can be
3240 * streamed up to the point where we switched off that timeline.
3241 */
3243 }
3244 else if (am_cascading_walsender)
3245 {
3247
3248 /*
3249 * Streaming the latest timeline on a standby.
3250 *
3251 * Attempt to send all WAL that has already been replayed, so that we
3252 * know it's valid. If we're receiving WAL through streaming
3253 * replication, it's also OK to send any WAL that has been received
3254 * but not replayed.
3255 *
3256 * The timeline we're recovering from can change, or we can be
3257 * promoted. In either case, the current timeline becomes historic. We
3258 * need to detect that so that we don't try to stream past the point
3259 * where we switched to another timeline. We check for promotion or
3260 * timeline switch after calculating FlushPtr, to avoid a race
3261 * condition: if the timeline becomes historic just after we checked
3262 * that it was still current, it's still OK to stream it up to the
3263 * FlushPtr that was calculated before it became historic.
3264 */
3265 bool becameHistoric = false;
3266
3268
3269 if (!RecoveryInProgress())
3270 {
3271 /* We have been promoted. */
3273 am_cascading_walsender = false;
3274 becameHistoric = true;
3275 }
3276 else
3277 {
3278 /*
3279 * Still a cascading standby. But is the timeline we're sending
3280 * still the one recovery is recovering from?
3281 */
3283 becameHistoric = true;
3284 }
3285
3286 if (becameHistoric)
3287 {
3288 /*
3289 * The timeline we were sending has become historic. Read the
3290 * timeline history file of the new timeline to see where exactly
3291 * we forked off from the timeline we were sending.
3292 */
3293 List *history;
3294
3297
3300
3302
3304 }
3305 }
3306 else
3307 {
3308 /*
3309 * Streaming the current timeline on a primary.
3310 *
3311 * Attempt to send all data that's already been written out and
3312 * fsync'd to disk. We cannot go further than what's been written out
3313 * given the current implementation of WALRead(). And in any case
3314 * it's unsafe to send WAL that is not securely down to disk on the
3315 * primary: if the primary subsequently crashes and restarts, standbys
3316 * must not have applied any WAL that got lost on the primary.
3317 */
3319 }
3320
3321 /*
3322 * Record the current system time as an approximation of the time at which
3323 * this WAL location was written for the purposes of lag tracking.
3324 *
3325 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3326 * is flushed and we could get that time as well as the LSN when we call
3327 * GetFlushRecPtr() above (and likewise for the cascading standby
3328 * equivalent), but rather than putting any new code into the hot WAL path
3329 * it seems good enough to capture the time here. We should reach this
3330 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3331 * may take some time, we read the WAL flush pointer and take the time
3332 * very close together here so that we'll get a later position if it is
3333 * still moving.
3334 *
3335 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3336 * this gives us a cheap approximation for the WAL flush time for this
3337 * LSN.
3338 *
3339 * Note that the LSN is not necessarily the LSN for the data contained in
3340 * the present message; it's the end of the WAL, which might be further
3341 * ahead. All the lag tracking machinery cares about is finding out when
3342 * that arbitrary LSN is eventually reported as written, flushed and
3343 * applied, so that it can measure the elapsed time.
3344 */
3346
3347 /*
3348 * If this is a historic timeline and we've reached the point where we
3349 * forked to the next timeline, stop streaming.
3350 *
3351 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3352 * startup process will normally replay all WAL that has been received
3353 * from the primary, before promoting, but if the WAL streaming is
3354 * terminated at a WAL page boundary, the valid portion of the timeline
3355 * might end in the middle of a WAL record. We might've already sent the
3356 * first half of that partial WAL record to the cascading standby, so that
3357 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3358 * replay the partial WAL record either, so it can still follow our
3359 * timeline switch.
3360 */
3362 {
3363 /* close the current file. */
3364 if (xlogreader->seg.ws_file >= 0)
3366
3367 /* Send CopyDone */
3369 streamingDoneSending = true;
3370
3371 WalSndCaughtUp = true;
3372
3373 elog(DEBUG1, "walsender reached end of timeline at %X/%08X (sent up to %X/%08X)",
3376 return;
3377 }
3378
3379 /* Do we have any work to do? */
3381 if (SendRqstPtr <= sentPtr)
3382 {
3383 WalSndCaughtUp = true;
3384 return;
3385 }
3386
3387 /*
3388 * Figure out how much to send in one message. If there's no more than
3389 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3390 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3391 *
3392 * The rounding is not only for performance reasons. Walreceiver relies on
3393 * the fact that we never split a WAL record across two messages. Since a
3394 * long WAL record is split at page boundary into continuation records,
3395 * page boundary is always a safe cut-off point. We also assume that
3396 * SendRqstPtr never points to the middle of a WAL record.
3397 */
3398 startptr = sentPtr;
3399 endptr = startptr;
3400 endptr += MAX_SEND_SIZE;
3401
3402 /* if we went beyond SendRqstPtr, back off */
3403 if (SendRqstPtr <= endptr)
3404 {
3405 endptr = SendRqstPtr;
3407 WalSndCaughtUp = false;
3408 else
3409 WalSndCaughtUp = true;
3410 }
3411 else
3412 {
3413 /* round down to page boundary. */
3414 endptr -= (endptr % XLOG_BLCKSZ);
3415 WalSndCaughtUp = false;
3416 }
3417
3418 nbytes = endptr - startptr;
3419 Assert(nbytes <= MAX_SEND_SIZE);
3420
3421 /*
3422 * OK to read and send the slice.
3423 */
3426
3427 pq_sendint64(&output_message, startptr); /* dataStart */
3428 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3429 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3430
3431 /*
3432 * Read the log directly into the output buffer to avoid extra memcpy
3433 * calls.
3434 */
3436
3437retry:
3438 /* attempt to read WAL from WAL buffers first */
3440 startptr, nbytes, xlogreader->seg.ws_tli);
3442 startptr += rbytes;
3443 nbytes -= rbytes;
3444
3445 /* now read the remaining WAL from WAL file */
3446 if (nbytes > 0 &&
3449 startptr,
3450 nbytes,
3451 xlogreader->seg.ws_tli, /* Pass the current TLI because
3452 * only WalSndSegmentOpen controls
3453 * whether new TLI is needed. */
3454 &errinfo))
3456
3457 /* See logical_read_xlog_page(). */
3458 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3460
3461 /*
3462 * During recovery, the currently-open WAL file might be replaced with the
3463 * file of the same name retrieved from archive. So we always need to
3464 * check what we read was valid after reading into the buffer. If it's
3465 * invalid, we try to open and read the file again.
3466 */
3468 {
3470 bool reload;
3471
3472 SpinLockAcquire(&walsnd->mutex);
3473 reload = walsnd->needreload;
3474 walsnd->needreload = false;
3475 SpinLockRelease(&walsnd->mutex);
3476
3477 if (reload && xlogreader->seg.ws_file >= 0)
3478 {
3480
3481 goto retry;
3482 }
3483 }
3484
3485 output_message.len += nbytes;
3487
3488 /*
3489 * Fill the send timestamp last, so that it is taken as late as possible.
3490 */
3493 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3494 tmpbuf.data, sizeof(int64));
3495
3497
3498 sentPtr = endptr;
3499
3500 /* Update shared memory status */
3501 {
3503
3504 SpinLockAcquire(&walsnd->mutex);
3505 walsnd->sentPtr = sentPtr;
3506 SpinLockRelease(&walsnd->mutex);
3507 }
3508
3509 /* Report progress of XLOG streaming in PS display */
3511 {
3512 char activitymsg[50];
3513
3514 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
3517 }
3518}
bool update_process_title
Definition ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition stringinfo.c:337
TimeLineID ws_tli
Definition xlogreader.h:49
WALSegmentContext segcxt
Definition xlogreader.h:270
#define MAX_SEND_SIZE
Definition walsender.c:115
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition xlog.c:1755

References am_cascading_walsender, Assert, CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), fb(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, MyWalSnd, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), PqMsg_CopyData, PqMsg_CopyDone, PqReplMsg_WALData, readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, set_ps_display(), snprintf, SpinLockAcquire(), SpinLockRelease(), streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 127 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

volatile sig_atomic_t got_SIGUSR2 = false
static

◆ got_STOPPING

◆ lag_tracker

LagTracker* lag_tracker
static

Definition at line 253 of file walsender.c.

Referenced by InitWalSender(), LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

TimestampTz last_processing = 0
static

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

LogicalDecodingContext* logical_decoding_ctx = NULL
static

Definition at line 217 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

◆ replication_active

◆ reply_message

◆ sendTimeLine

TimeLineID sendTimeLine = 0
static

◆ sendTimeLineIsHistoric

bool sendTimeLineIsHistoric = false
static

◆ sendTimeLineNextTLI

TimeLineID sendTimeLineNextTLI = 0
static

◆ sendTimeLineValidUpto

XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
static

◆ sentPtr

◆ streamingDoneReceiving

bool streamingDoneReceiving
static

Definition at line 200 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

bool streamingDoneSending
static

◆ tmpbuf

◆ uploaded_manifest

IncrementalBackupInfo* uploaded_manifest = NULL
static

Definition at line 156 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

MemoryContext uploaded_manifest_mcxt = NULL
static

Definition at line 157 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 139 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

bool WalSndCaughtUp = false
static

◆ WalSndCtl

◆ xlogreader