PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
walsender.c File Reference
#include "postgres.h"
#include <signal.h>
#include <unistd.h>
#include "access/timeline.h"
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "backup/basebackup.h"
#include "backup/basebackup_incremental.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/defrem.h"
#include "funcapi.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
#include "nodes/replnodes.h"
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
Include dependency graph for walsender.c:

Go to the source code of this file.

Data Structures

struct  WalTimeSample
 
struct  LagTracker
 

Macros

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)
 
#define LAG_TRACKER_BUFFER_SIZE   8192
 
#define READ_REPLICATION_SLOT_COLS   3
 
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
 
#define PG_STAT_GET_WAL_SENDERS_COLS   12
 

Typedefs

typedef void(* WalSndSendDataCallback) (void)
 

Functions

static void WalSndLastCycleHandler (SIGNAL_ARGS)
 
static void WalSndLoop (WalSndSendDataCallback send_data)
 
static void InitWalSenderSlot (void)
 
static void WalSndKill (int code, Datum arg)
 
static void WalSndShutdown (void)
 
void WalSndErrorCleanup (void)
 
static void IdentifySystem (void)
 
static void ReadReplicationSlot (ReadReplicationSlotCmd *cmd)
 
static void SendTimeLineHistory (TimeLineHistoryCmd *cmd)
 
static void UploadManifest (void)
 
static bool HandleUploadManifestPacket (StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
 
static void StartReplication (StartReplicationCmd *cmd)
 
static int logical_read_xlog_page (XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
 
static void parseCreateReplSlotOptions (CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
 
static void CreateReplicationSlot (CreateReplicationSlotCmd *cmd)
 
static void DropReplicationSlot (DropReplicationSlotCmd *cmd)
 
static void AlterReplicationSlot (AlterReplicationSlotCmd *cmd)
 
static void StartLogicalReplication (StartReplicationCmd *cmd)
 
static void WalSndPrepareWrite (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void WalSndWriteData (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
 
static void ProcessPendingWrites (void)
 
static void WalSndUpdateProgress (LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
 
void PhysicalWakeupLogicalWalSnd (void)
 
static bool NeedToWaitForStandbys (XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static bool NeedToWaitForWal (XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
 
static XLogRecPtr WalSndWaitForWal (XLogRecPtr loc)
 
bool exec_replication_command (const char *cmd_string)
 
static void ProcessRepliesIfAny (void)
 
static void ProcessStandbyMessage (void)
 
static void PhysicalConfirmReceivedLocation (XLogRecPtr lsn)
 
static void ProcessStandbyReplyMessage (void)
 
static void PhysicalReplicationSlotNewXmin (TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
 
static bool TransactionIdInRecentPast (TransactionId xid, uint32 epoch)
 
static void ProcessStandbyHSFeedbackMessage (void)
 
static long WalSndComputeSleeptime (TimestampTz now)
 
static void WalSndCheckTimeOut (void)
 
static void WalSndSegmentOpen (XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
 
static void XLogSendPhysical (void)
 
static void XLogSendLogical (void)
 
static void WalSndDone (WalSndSendDataCallback send_data)
 
XLogRecPtr GetStandbyFlushRecPtr (TimeLineID *tli)
 
void WalSndRqstFileReload (void)
 
void HandleWalSndInitStopping (void)
 
void WalSndSignals (void)
 
Size WalSndShmemSize (void)
 
void WalSndShmemInit (void)
 
void WalSndWakeup (bool physical, bool logical)
 
static void WalSndWait (uint32 socket_events, long timeout, uint32 wait_event)
 
void WalSndInitStopping (void)
 
void WalSndWaitStopping (void)
 
void WalSndSetState (WalSndState state)
 
static const char * WalSndGetStateString (WalSndState state)
 
static Interval * offset_to_interval (TimeOffset offset)
 
Datum pg_stat_get_wal_senders (PG_FUNCTION_ARGS)
 
static void WalSndKeepalive (bool requestReply, XLogRecPtr writePtr)
 
static void WalSndKeepaliveIfNecessary (void)
 
static void LagTrackerWrite (XLogRecPtr lsn, TimestampTz local_flush_time)
 
static TimeOffset LagTrackerRead (int head, XLogRecPtr lsn, TimestampTz now)
 

Variables

WalSndCtlData * WalSndCtl = NULL
 
WalSnd * MyWalSnd = NULL
 
bool am_walsender = false
 
bool am_cascading_walsender = false
 
bool am_db_walsender = false
 
int max_wal_senders = 10
 
int wal_sender_timeout = 60 * 1000
 
bool log_replication_commands = false
 
bool wake_wal_senders = false
 
static XLogReaderState * xlogreader = NULL
 
static IncrementalBackupInfo * uploaded_manifest = NULL
 
static MemoryContext uploaded_manifest_mcxt = NULL
 
static TimeLineID sendTimeLine = 0
 
static TimeLineID sendTimeLineNextTLI = 0
 
static bool sendTimeLineIsHistoric = false
 
static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr
 
static XLogRecPtr sentPtr = InvalidXLogRecPtr
 
static StringInfoData output_message
 
static StringInfoData reply_message
 
static StringInfoData tmpbuf
 
static TimestampTz last_processing = 0
 
static TimestampTz last_reply_timestamp = 0
 
static bool waiting_for_ping_response = false
 
static bool streamingDoneSending
 
static bool streamingDoneReceiving
 
static bool WalSndCaughtUp = false
 
static volatile sig_atomic_t got_SIGUSR2 = false
 
static volatile sig_atomic_t got_STOPPING = false
 
static volatile sig_atomic_t replication_active = false
 
static LogicalDecodingContext * logical_decoding_ctx = NULL
 
static LagTracker * lag_tracker
 

Macro Definition Documentation

◆ LAG_TRACKER_BUFFER_SIZE

#define LAG_TRACKER_BUFFER_SIZE   8192

Definition at line 218 of file walsender.c.

◆ MAX_SEND_SIZE

#define MAX_SEND_SIZE   (XLOG_BLCKSZ * 16)

Definition at line 106 of file walsender.c.

◆ PG_STAT_GET_WAL_SENDERS_COLS

#define PG_STAT_GET_WAL_SENDERS_COLS   12

◆ READ_REPLICATION_SLOT_COLS

#define READ_REPLICATION_SLOT_COLS   3

◆ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS

#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000

Typedef Documentation

◆ WalSndSendDataCallback

typedef void(* WalSndSendDataCallback) (void)

Definition at line 236 of file walsender.c.

Function Documentation

◆ AlterReplicationSlot()

static void AlterReplicationSlot ( AlterReplicationSlotCmd * cmd)
static

Definition at line 1385 of file walsender.c.

1386{
1387 bool failover_given = false;
1388 bool two_phase_given = false;
1389 bool failover;
1390 bool two_phase;
1391
1392 /* Parse options */
1393 foreach_ptr(DefElem, defel, cmd->options)
1394 {
1395 if (strcmp(defel->defname, "failover") == 0)
1396 {
1397 if (failover_given)
1398 ereport(ERROR,
1399 (errcode(ERRCODE_SYNTAX_ERROR),
1400 errmsg("conflicting or redundant options")));
1401 failover_given = true;
1402 failover = defGetBoolean(defel);
1403 }
1404 else if (strcmp(defel->defname, "two_phase") == 0)
1405 {
1406 if (two_phase_given)
1407 ereport(ERROR,
1408 (errcode(ERRCODE_SYNTAX_ERROR),
1409 errmsg("conflicting or redundant options")));
1410 two_phase_given = true;
1411 two_phase = defGetBoolean(defel);
1412 }
1413 else
1414 elog(ERROR, "unrecognized option: %s", defel->defname);
1415 }
1416
1417 ReplicationSlotAlter(cmd->slotname,
1418 failover_given ? &failover : NULL,
1419 two_phase_given ? &two_phase : NULL);
1420}
bool defGetBoolean(DefElem *def)
Definition: define.c:94
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
#define foreach_ptr(type, var, lst)
Definition: pg_list.h:469
static bool two_phase
void ReplicationSlotAlter(const char *name, const bool *failover, const bool *two_phase)
Definition: slot.c:807

References defGetBoolean(), elog, ereport, errcode(), errmsg(), ERROR, foreach_ptr, AlterReplicationSlotCmd::options, ReplicationSlotAlter(), AlterReplicationSlotCmd::slotname, and two_phase.

Referenced by exec_replication_command().

◆ CreateReplicationSlot()

static void CreateReplicationSlot ( CreateReplicationSlotCmd * cmd)
static

Definition at line 1171 of file walsender.c.

1172{
1173 const char *snapshot_name = NULL;
1174 char xloc[MAXFNAMELEN];
1175 char *slot_name;
1176 bool reserve_wal = false;
1177 bool two_phase = false;
1178 bool failover = false;
1179 CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1181 TupOutputState *tstate;
1182 TupleDesc tupdesc;
1183 Datum values[4];
1184 bool nulls[4] = {0};
1185
1187
1188 parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase,
1189 &failover);
1190
1191 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1192 {
1193 ReplicationSlotCreate(cmd->slotname, false,
1195 false, false, false);
1196
1197 if (reserve_wal)
1198 {
1200
1202
1203 /* Write this slot to disk if it's a permanent one. */
1204 if (!cmd->temporary)
1205 ReplicationSlotSave();
1206 }
1207 }
1208 else
1209 {
1211 bool need_full_snapshot = false;
1212
1214
1216
1217 /*
1218 * Initially create persistent slot as ephemeral - that allows us to
1219 * nicely handle errors during initialization because it'll get
1220 * dropped if this transaction fails. We'll make it persistent at the
1221 * end. Temporary slots can be created as temporary from beginning as
1222 * they get dropped on error as well.
1223 */
1226 two_phase, failover, false);
1227
1228 /*
1229 * Do options check early so that we can bail before calling the
1230 * DecodingContextFindStartpoint which can take long time.
1231 */
1232 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1233 {
1234 if (IsTransactionBlock())
1235 ereport(ERROR,
1236 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1237 (errmsg("%s must not be called inside a transaction",
1238 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1239
1240 need_full_snapshot = true;
1241 }
1242 else if (snapshot_action == CRS_USE_SNAPSHOT)
1243 {
1244 if (!IsTransactionBlock())
1245 ereport(ERROR,
1246 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1247 (errmsg("%s must be called inside a transaction",
1248 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1249
1250 if (XactIsoLevel != XACT_REPEATABLE_READ)
1251 ereport(ERROR,
1252 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1253 (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1254 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1255 if (!XactReadOnly)
1256 ereport(ERROR,
1257 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1258 (errmsg("%s must be called in a read-only transaction",
1259 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1260
1261 if (FirstSnapshotSet)
1262 ereport(ERROR,
1263 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1264 (errmsg("%s must be called before any query",
1265 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1266
1267 if (IsSubTransaction())
1268 ereport(ERROR,
1269 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1270 (errmsg("%s must not be called in a subtransaction",
1271 "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1272
1273 need_full_snapshot = true;
1274 }
1275
1276 ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1279 .segment_open = WalSndSegmentOpen,
1280 .segment_close = wal_segment_close),
1283
1284 /*
1285 * Signal that we don't need the timeout mechanism. We're just
1286 * creating the replication slot and don't yet accept feedback
1287 * messages or send keepalives. As we possibly need to wait for
1288 * further WAL the walsender would otherwise possibly be killed too
1289 * soon.
1290 */
1292
1293 /* build initial snapshot, might take a while */
1295
1296 /*
1297 * Export or use the snapshot if we've been asked to do so.
1298 *
1299 * NB. We will convert the snapbuild.c kind of snapshot to normal
1300 * snapshot when doing this.
1301 */
1302 if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1303 {
1304 snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1305 }
1306 else if (snapshot_action == CRS_USE_SNAPSHOT)
1307 {
1308 Snapshot snap;
1309
1312 }
1313
1314 /* don't need the decoding context anymore */
1316
1317 if (!cmd->temporary)
1318 ReplicationSlotPersist();
1319 }
1320
1321 snprintf(xloc, sizeof(xloc), "%X/%X",
1322 LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1323
1325
1326 /*----------
1327 * Need a tuple descriptor representing four columns:
1328 * - first field: the slot name
1329 * - second field: LSN at which we became consistent
1330 * - third field: exported snapshot's name
1331 * - fourth field: output plugin
1332 */
1333 tupdesc = CreateTemplateTupleDesc(4);
1334 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1335 TEXTOID, -1, 0);
1336 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1337 TEXTOID, -1, 0);
1338 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1339 TEXTOID, -1, 0);
1340 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1341 TEXTOID, -1, 0);
1342
1343 /* prepare for projection of tuples */
1344 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1345
1346 /* slot_name */
1347 slot_name = NameStr(MyReplicationSlot->data.name);
1348 values[0] = CStringGetTextDatum(slot_name);
1349
1350 /* consistent wal location */
1351 values[1] = CStringGetTextDatum(xloc);
1352
1353 /* snapshot name, or NULL if none */
1354 if (snapshot_name != NULL)
1355 values[2] = CStringGetTextDatum(snapshot_name);
1356 else
1357 nulls[2] = true;
1358
1359 /* plugin, or NULL if none */
1360 if (cmd->plugin != NULL)
1361 values[3] = CStringGetTextDatum(cmd->plugin);
1362 else
1363 nulls[3] = true;
1364
1365 /* send it to dest */
1366 do_tup_output(tstate, values, nulls);
1367 end_tup_output(tstate);
1368
1370}
int16 AttrNumber
Definition: attnum.h:21
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define CStringGetTextDatum(s)
Definition: builtins.h:97
#define NameStr(name)
Definition: c.h:700
#define Assert(condition)
Definition: c.h:812
DestReceiver * CreateDestReceiver(CommandDest dest)
Definition: dest.c:113
@ DestRemoteSimple
Definition: dest.h:91
void do_tup_output(TupOutputState *tstate, const Datum *values, const bool *isnull)
Definition: execTuples.c:2462
const TupleTableSlotOps TTSOpsVirtual
Definition: execTuples.c:84
void end_tup_output(TupOutputState *tstate)
Definition: execTuples.c:2520
TupOutputState * begin_tup_output_tupdesc(DestReceiver *dest, TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:2442
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition: logical.c:694
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition: logical.c:650
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:330
void CheckLogicalDecodingRequirements(void)
Definition: logical.c:109
#define NIL
Definition: pg_list.h:68
#define snprintf
Definition: port.h:238
uintptr_t Datum
Definition: postgres.h:64
@ REPLICATION_KIND_PHYSICAL
Definition: replnodes.h:22
@ REPLICATION_KIND_LOGICAL
Definition: replnodes.h:23
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, bool synced)
Definition: slot.c:309
void ReplicationSlotMarkDirty(void)
Definition: slot.c:1038
void ReplicationSlotReserveWal(void)
Definition: slot.c:1429
void ReplicationSlotPersist(void)
Definition: slot.c:1055
ReplicationSlot * MyReplicationSlot
Definition: slot.c:138
void ReplicationSlotSave(void)
Definition: slot.c:1020
void ReplicationSlotRelease(void)
Definition: slot.c:652
@ RS_PERSISTENT
Definition: slot.h:38
@ RS_EPHEMERAL
Definition: slot.h:39
@ RS_TEMPORARY
Definition: slot.h:40
Snapshot SnapBuildInitialSnapshot(SnapBuild *builder)
Definition: snapbuild.c:440
const char * SnapBuildExportSnapshot(SnapBuild *builder)
Definition: snapbuild.c:539
bool FirstSnapshotSet
Definition: snapmgr.c:133
void RestoreTransactionSnapshot(Snapshot snapshot, void *source_pgproc)
Definition: snapmgr.c:1784
PGPROC * MyProc
Definition: proc.c:66
ReplicationKind kind
Definition: replnodes.h:56
struct SnapBuild * snapshot_builder
Definition: logical.h:44
XLogRecPtr confirmed_flush
Definition: slot.h:107
ReplicationSlotPersistentData data
Definition: slot.h:181
TupleDesc CreateTemplateTupleDesc(int natts)
Definition: tupdesc.c:165
void TupleDescInitBuiltinEntry(TupleDesc desc, AttrNumber attributeNumber, const char *attributeName, Oid oidtypeid, int32 typmod, int attdim)
Definition: tupdesc.c:873
static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, bool *two_phase, bool *failover)
Definition: walsender.c:1094
static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition: walsender.c:2961
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1547
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool skipped_xact)
Definition: walsender.c:1643
static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition: walsender.c:1021
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
Definition: walsender.c:1520
static TimestampTz last_reply_timestamp
Definition: walsender.c:179
CRSSnapshotAction
Definition: walsender.h:21
@ CRS_USE_SNAPSHOT
Definition: walsender.h:24
@ CRS_EXPORT_SNAPSHOT
Definition: walsender.h:22
bool XactReadOnly
Definition: xact.c:81
int XactIsoLevel
Definition: xact.c:78
bool IsSubTransaction(void)
Definition: xact.c:5036
bool IsTransactionBlock(void)
Definition: xact.c:4963
#define XACT_REPEATABLE_READ
Definition: xact.h:38
#define MAXFNAMELEN
#define LSN_FORMAT_ARGS(lsn)
Definition: xlogdefs.h:43
#define InvalidXLogRecPtr
Definition: xlogdefs.h:28
#define XL_ROUTINE(...)
Definition: xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition: xlogutils.c:842

References Assert, begin_tup_output_tupdesc(), CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDestReceiver(), CreateInitDecodingContext(), CreateTemplateTupleDesc(), CRS_EXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, CStringGetTextDatum, ReplicationSlot::data, DecodingContextFindStartpoint(), generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errmsg(), ERROR, FirstSnapshotSet, FreeDecodingContext(), InvalidXLogRecPtr, IsSubTransaction(), IsTransactionBlock(), CreateReplicationSlotCmd::kind, last_reply_timestamp, logical_read_xlog_page(), LSN_FORMAT_ARGS, MAXFNAMELEN, MyProc, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, NIL, parseCreateReplSlotOptions(), CreateReplicationSlotCmd::plugin, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotReserveWal(), ReplicationSlotSave(), RestoreTransactionSnapshot(), RS_EPHEMERAL, RS_PERSISTENT, RS_TEMPORARY, CreateReplicationSlotCmd::slotname, SnapBuildExportSnapshot(), SnapBuildInitialSnapshot(), LogicalDecodingContext::snapshot_builder, snprintf, CreateReplicationSlotCmd::temporary, TTSOpsVirtual, TupleDescInitBuiltinEntry(), two_phase, values, wal_segment_close(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndUpdateProgress(), WalSndWriteData(), XACT_REPEATABLE_READ, XactIsoLevel, XactReadOnly, and XL_ROUTINE.

Referenced by exec_replication_command(), main(), and StartLogStreamer().

◆ DropReplicationSlot()

static void DropReplicationSlot ( DropReplicationSlotCmd * cmd)
static

Definition at line 1376 of file walsender.c.

1377{
1378 ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1379}
void ReplicationSlotDrop(const char *name, bool nowait)
Definition: slot.c:784

References ReplicationSlotDrop(), DropReplicationSlotCmd::slotname, and DropReplicationSlotCmd::wait.

Referenced by exec_replication_command(), and main().

◆ exec_replication_command()

bool exec_replication_command ( const char *  cmd_string)

Definition at line 1952 of file walsender.c.

1953{
1954 yyscan_t scanner;
1955 int parse_rc;
1956 Node *cmd_node;
1957 const char *cmdtag;
1958 MemoryContext cmd_context;
1959 MemoryContext old_context;
1960
1961 /*
1962 * If WAL sender has been told that shutdown is getting close, switch its
1963 * status accordingly to handle the next replication commands correctly.
1964 */
1965 if (got_STOPPING)
1966 WalSndSetState(WALSNDSTATE_STOPPING);
1967
1968 /*
1969 * Throw error if in stopping mode. We need prevent commands that could
1970 * generate WAL while the shutdown checkpoint is being written. To be
1971 * safe, we just prohibit all new commands.
1972 */
1973 if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1974 ereport(ERROR,
1975 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1976 errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1977
1978 /*
1979 * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1980 * command arrives. Clean up the old stuff if there's anything.
1981 */
1983
1985
1986 /*
1987 * Prepare to parse and execute the command.
1988 */
1990 "Replication command context",
1992 old_context = MemoryContextSwitchTo(cmd_context);
1993
1994 replication_scanner_init(cmd_string, &scanner);
1995
1996 /*
1997 * Is it a WalSender command?
1998 */
1999 if (!replication_scanner_is_replication_command(scanner))
2000 {
2001 /* Nope; clean up and get out. */
2003
2004 MemoryContextSwitchTo(old_context);
2005 MemoryContextDelete(cmd_context);
2006
2007 /* XXX this is a pretty random place to make this check */
2008 if (MyDatabaseId == InvalidOid)
2009 ereport(ERROR,
2010 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2011 errmsg("cannot execute SQL commands in WAL sender for physical replication")));
2012
2013 /* Tell the caller that this wasn't a WalSender command. */
2014 return false;
2015 }
2016
2017 /*
2018 * Looks like a WalSender command, so parse it.
2019 */
2020 parse_rc = replication_yyparse(scanner);
2021 if (parse_rc != 0)
2022 ereport(ERROR,
2023 (errcode(ERRCODE_SYNTAX_ERROR),
2024 errmsg_internal("replication command parser returned %d",
2025 parse_rc)));
2027
2028 cmd_node = replication_parse_result;
2029
2030 /*
2031 * Report query to various monitoring facilities. For this purpose, we
2032 * report replication commands just like SQL commands.
2033 */
2034 debug_query_string = cmd_string;
2035
2037
2038 /*
2039 * Log replication command if log_replication_commands is enabled. Even
2040 * when it's disabled, log the command with DEBUG1 level for backward
2041 * compatibility.
2042 */
2043 ereport(log_replication_commands ? LOG : DEBUG1,
2044 (errmsg("received replication command: %s", cmd_string)));
2045
2046 /*
2047 * Disallow replication commands in aborted transaction blocks.
2048 */
2049 if (IsAbortedTransactionBlockState())
2050 ereport(ERROR,
2051 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
2052 errmsg("current transaction is aborted, "
2053 "commands ignored until end of transaction block")));
2054
2056
2057 /*
2058 * Allocate buffers that will be used for each outgoing and incoming
2059 * message. We do this just once per command to reduce palloc overhead.
2060 */
2064
2065 switch (cmd_node->type)
2066 {
2067 case T_IdentifySystemCmd:
2068 cmdtag = "IDENTIFY_SYSTEM";
2069 set_ps_display(cmdtag);
2071 EndReplicationCommand(cmdtag);
2072 break;
2073
2074 case T_ReadReplicationSlotCmd:
2075 cmdtag = "READ_REPLICATION_SLOT";
2076 set_ps_display(cmdtag);
2078 EndReplicationCommand(cmdtag);
2079 break;
2080
2081 case T_BaseBackupCmd:
2082 cmdtag = "BASE_BACKUP";
2083 set_ps_display(cmdtag);
2084 PreventInTransactionBlock(true, cmdtag);
2086 EndReplicationCommand(cmdtag);
2087 break;
2088
2089 case T_CreateReplicationSlotCmd:
2090 cmdtag = "CREATE_REPLICATION_SLOT";
2091 set_ps_display(cmdtag);
2093 EndReplicationCommand(cmdtag);
2094 break;
2095
2096 case T_DropReplicationSlotCmd:
2097 cmdtag = "DROP_REPLICATION_SLOT";
2098 set_ps_display(cmdtag);
2100 EndReplicationCommand(cmdtag);
2101 break;
2102
2103 case T_AlterReplicationSlotCmd:
2104 cmdtag = "ALTER_REPLICATION_SLOT";
2105 set_ps_display(cmdtag);
2107 EndReplicationCommand(cmdtag);
2108 break;
2109
2110 case T_StartReplicationCmd:
2111 {
2112 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
2113
2114 cmdtag = "START_REPLICATION";
2115 set_ps_display(cmdtag);
2116 PreventInTransactionBlock(true, cmdtag);
2117
2118 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
2119 StartReplication(cmd);
2120 else
2121 StartLogicalReplication(cmd);
2122
2123 /* dupe, but necessary per libpqrcv_endstreaming */
2124 EndReplicationCommand(cmdtag);
2125
2126 Assert(xlogreader != NULL);
2127 break;
2128 }
2129
2130 case T_TimeLineHistoryCmd:
2131 cmdtag = "TIMELINE_HISTORY";
2132 set_ps_display(cmdtag);
2133 PreventInTransactionBlock(true, cmdtag);
2135 EndReplicationCommand(cmdtag);
2136 break;
2137
2138 case T_VariableShowStmt:
2139 {
2141 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2142
2143 cmdtag = "SHOW";
2144 set_ps_display(cmdtag);
2145
2146 /* syscache access needs a transaction environment */
2148 GetPGVariable(n->name, dest);
2150 EndReplicationCommand(cmdtag);
2151 }
2152 break;
2153
2154 case T_UploadManifestCmd:
2155 cmdtag = "UPLOAD_MANIFEST";
2156 set_ps_display(cmdtag);
2157 PreventInTransactionBlock(true, cmdtag);
2159 EndReplicationCommand(cmdtag);
2160 break;
2161
2162 default:
2163 elog(ERROR, "unrecognized replication command node tag: %u",
2164 cmd_node->type);
2165 }
2166
2167 /* done */
2168 MemoryContextSwitchTo(old_context);
2169 MemoryContextDelete(cmd_context);
2170
2171 /*
2172 * We need not update ps display or pg_stat_activity, because PostgresMain
2173 * will reset those to "idle". But we must reset debug_query_string to
2174 * ensure it doesn't become a dangling pointer.
2175 */
2176 debug_query_string = NULL;
2177
2178 return true;
2179}
void pgstat_report_activity(BackendState state, const char *cmd_str)
@ STATE_RUNNING
void SendBaseBackup(BaseBackupCmd *cmd, IncrementalBackupInfo *ib)
Definition: basebackup.c:990
void * yyscan_t
Definition: cubedata.h:67
void EndReplicationCommand(const char *commandTag)
Definition: dest.c:205
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
#define LOG
Definition: elog.h:31
#define DEBUG1
Definition: elog.h:30
Oid MyDatabaseId
Definition: globals.c:93
void GetPGVariable(const char *name, DestReceiver *dest)
Definition: guc_funcs.c:382
MemoryContext CurrentMemoryContext
Definition: mcxt.c:143
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:454
#define AllocSetContextCreate
Definition: memutils.h:129
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:160
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:122
const char * debug_query_string
Definition: postgres.c:87
#define InvalidOid
Definition: postgres_ext.h:36
static void set_ps_display(const char *activity)
Definition: ps_status.h:40
MemoryContextSwitchTo(old_ctx)
bool replication_scanner_is_replication_command(yyscan_t yyscanner)
Definition: repl_scanner.l:295
void replication_scanner_finish(yyscan_t yyscanner)
Definition: repl_scanner.l:280
void replication_scanner_init(const char *str, yyscan_t *yyscannerp)
Definition: repl_scanner.l:264
void SnapBuildClearExportedSnapshot(void)
Definition: snapbuild.c:600
void initStringInfo(StringInfo str)
Definition: stringinfo.c:56
Node
Definition: nodes.h:129
NodeTag type
Definition: nodes.h:130
ReplicationKind kind
Definition: replnodes.h:94
WalSndState state
static void AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
Definition: walsender.c:1385
static void SendTimeLineHistory(TimeLineHistoryCmd *cmd)
Definition: walsender.c:557
WalSnd * MyWalSnd
Definition: walsender.c:112
static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
Definition: walsender.c:458
static StringInfoData tmpbuf
Definition: walsender.c:170
static void IdentifySystem(void)
Definition: walsender.c:377
static StringInfoData reply_message
Definition: walsender.c:169
void WalSndSetState(WalSndState state)
Definition: walsender.c:3782
static StringInfoData output_message
Definition: walsender.c:168
static void UploadManifest(void)
Definition: walsender.c:647
static volatile sig_atomic_t got_STOPPING
Definition: walsender.c:198
bool log_replication_commands
Definition: walsender.c:125
static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
Definition: walsender.c:1171
static void StartLogicalReplication(StartReplicationCmd *cmd)
Definition: walsender.c:1427
static IncrementalBackupInfo * uploaded_manifest
Definition: walsender.c:147
static void DropReplicationSlot(DropReplicationSlotCmd *cmd)
Definition: walsender.c:1376
static void StartReplication(StartReplicationCmd *cmd)
Definition: walsender.c:789
static XLogReaderState * xlogreader
Definition: walsender.c:137
PGDLLIMPORT Node * replication_parse_result
@ WALSNDSTATE_STOPPING
int replication_yyparse(yyscan_t yyscanner)
void PreventInTransactionBlock(bool isTopLevel, const char *stmtType)
Definition: xact.c:3640
void StartTransactionCommand(void)
Definition: xact.c:3051
bool IsAbortedTransactionBlockState(void)
Definition: xact.c:406
void CommitTransactionCommand(void)
Definition: xact.c:3149

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, AlterReplicationSlot(), Assert, CHECK_FOR_INTERRUPTS, CommitTransactionCommand(), CreateDestReceiver(), CreateReplicationSlot(), CurrentMemoryContext, DEBUG1, debug_query_string, generate_unaccent_rules::dest, DestRemoteSimple, DropReplicationSlot(), elog, EndReplicationCommand(), ereport, errcode(), errmsg(), errmsg_internal(), ERROR, GetPGVariable(), got_STOPPING, IdentifySystem(), initStringInfo(), InvalidOid, IsAbortedTransactionBlockState(), StartReplicationCmd::kind, LOG, log_replication_commands, MemoryContextDelete(), MemoryContextSwitchTo(), MyDatabaseId, MyWalSnd, VariableShowStmt::name, output_message, pgstat_report_activity(), PreventInTransactionBlock(), ReadReplicationSlot(), REPLICATION_KIND_PHYSICAL, replication_parse_result, replication_scanner_finish(), replication_scanner_init(), replication_scanner_is_replication_command(), replication_yyparse(), reply_message, SendBaseBackup(), SendTimeLineHistory(), set_ps_display(), SnapBuildClearExportedSnapshot(), StartLogicalReplication(), StartReplication(), StartTransactionCommand(), WalSnd::state, STATE_RUNNING, tmpbuf, Node::type, ReadReplicationSlotCmd::type, uploaded_manifest, UploadManifest(), WalSndSetState(), WALSNDSTATE_STOPPING, and xlogreader.

Referenced by PostgresMain().

◆ GetStandbyFlushRecPtr()

XLogRecPtr GetStandbyFlushRecPtr ( TimeLineID *  tli)

Definition at line 3477 of file walsender.c.

3478{
3479 XLogRecPtr replayPtr;
3480 TimeLineID replayTLI;
3481 XLogRecPtr receivePtr;
3483 XLogRecPtr result;
3484
3486
3487 /*
3488 * We can safely send what's already been replayed. Also, if walreceiver
3489 * is streaming WAL from the same timeline, we can send anything that it
3490 * has streamed, but hasn't been replayed yet.
3491 */
3492
3493 receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3494 replayPtr = GetXLogReplayRecPtr(&replayTLI);
3495
3496 if (tli)
3497 *tli = replayTLI;
3498
3499 result = replayPtr;
3500 if (receiveTLI == replayTLI && receivePtr > replayPtr)
3501 result = receivePtr;
3502
3503 return result;
3504}
bool IsSyncingReplicationSlots(void)
Definition: slotsync.c:1649
XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
bool am_cascading_walsender
Definition: walsender.c:116
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59
static TimeLineID receiveTLI
Definition: xlogrecovery.c:263
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References am_cascading_walsender, Assert, GetWalRcvFlushRecPtr(), GetXLogReplayRecPtr(), IsSyncingReplicationSlots(), and receiveTLI.

Referenced by IdentifySystem(), StartReplication(), synchronize_one_slot(), XLogSendLogical(), and XLogSendPhysical().

◆ HandleUploadManifestPacket()

static bool HandleUploadManifestPacket ( StringInfo  buf,
off_t *  offset,
IncrementalBackupInfo *  ib 
)
static

Definition at line 713 of file walsender.c.

715{
716 int mtype;
717 int maxmsglen;
718
720
722 mtype = pq_getbyte();
723 if (mtype == EOF)
725 (errcode(ERRCODE_CONNECTION_FAILURE),
726 errmsg("unexpected EOF on client connection with an open transaction")));
727
728 switch (mtype)
729 {
730 case 'd': /* CopyData */
731 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
732 break;
733 case 'c': /* CopyDone */
734 case 'f': /* CopyFail */
735 case 'H': /* Flush */
736 case 'S': /* Sync */
737 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
738 break;
739 default:
741 (errcode(ERRCODE_PROTOCOL_VIOLATION),
742 errmsg("unexpected message type 0x%02X during COPY from stdin",
743 mtype)));
744 maxmsglen = 0; /* keep compiler quiet */
745 break;
746 }
747
748 /* Now collect the message body */
749 if (pq_getmessage(buf, maxmsglen))
751 (errcode(ERRCODE_CONNECTION_FAILURE),
752 errmsg("unexpected EOF on client connection with an open transaction")));
754
755 /* Process the message */
756 switch (mtype)
757 {
758 case 'd': /* CopyData */
759 AppendIncrementalManifestData(ib, buf->data, buf->len);
760 return true;
761
762 case 'c': /* CopyDone */
763 return false;
764
765 case 'H': /* Flush */
766 case 'S': /* Sync */
767 /* Ignore these while in CopyOut mode as we do elsewhere. */
768 return true;
769
770 case 'f':
771 ereport(ERROR,
772 (errcode(ERRCODE_QUERY_CANCELED),
773 errmsg("COPY from stdin failed: %s",
774 pq_getmsgstring(buf))));
775 }
776
777 /* Not reached. */
778 Assert(false);
779 return false;
780}
void AppendIncrementalManifestData(IncrementalBackupInfo *ib, const char *data, int len)
#define PQ_SMALL_MESSAGE_LIMIT
Definition: libpq.h:30
#define PQ_LARGE_MESSAGE_LIMIT
Definition: libpq.h:31
#define HOLD_CANCEL_INTERRUPTS()
Definition: miscadmin.h:141
#define RESUME_CANCEL_INTERRUPTS()
Definition: miscadmin.h:143
static char * buf
Definition: pg_test_fsync.c:72
int pq_getmessage(StringInfo s, int maxlen)
Definition: pqcomm.c:1203
int pq_getbyte(void)
Definition: pqcomm.c:964
void pq_startmsgread(void)
Definition: pqcomm.c:1141
const char * pq_getmsgstring(StringInfo msg)
Definition: pqformat.c:579

References AppendIncrementalManifestData(), Assert, buf, ereport, errcode(), errmsg(), ERROR, HOLD_CANCEL_INTERRUPTS, pq_getbyte(), pq_getmessage(), pq_getmsgstring(), PQ_LARGE_MESSAGE_LIMIT, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), and RESUME_CANCEL_INTERRUPTS.

Referenced by UploadManifest().

◆ HandleWalSndInitStopping()

void HandleWalSndInitStopping ( void  )

Definition at line 3533 of file walsender.c.

3534{
3536
3537 /*
3538 * If replication has not yet started, die like with SIGTERM. If
3539 * replication is active, only set a flag and wake up the main loop. It
3540 * will send any outstanding WAL, wait for it to be replicated to the
3541 * standby, and then exit gracefully.
3542 */
3543 if (!replication_active)
3544 kill(MyProcPid, SIGTERM);
3545 else
3546 got_STOPPING = true;
3547}
int MyProcPid
Definition: globals.c:46
bool am_walsender
Definition: walsender.c:115
static volatile sig_atomic_t replication_active
Definition: walsender.c:206
#define kill(pid, sig)
Definition: win32_port.h:503

References am_walsender, Assert, got_STOPPING, kill, MyProcPid, and replication_active.

Referenced by procsignal_sigusr1_handler().

◆ IdentifySystem()

static void IdentifySystem ( void  )
static

Definition at line 377 of file walsender.c.

378{
379 char sysid[32];
380 char xloc[MAXFNAMELEN];
381 XLogRecPtr logptr;
382 char *dbname = NULL;
384 TupOutputState *tstate;
385 TupleDesc tupdesc;
386 Datum values[4];
387 bool nulls[4] = {0};
388 TimeLineID currTLI;
389
390 /*
391 * Reply with a result set with one row, four columns. First col is system
392 * ID, second is timeline ID, third is current xlog location and the
393 * fourth contains the database name if we are connected to one.
394 */
395
396 snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
398
401 logptr = GetStandbyFlushRecPtr(&currTLI);
402 else
403 logptr = GetFlushRecPtr(&currTLI);
404
405 snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
406
408 {
410
411 /* syscache access needs a transaction env. */
414 /* copy dbname out of TX context */
417 }
418
420
421 /* need a tuple descriptor representing four columns */
422 tupdesc = CreateTemplateTupleDesc(4);
423 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
424 TEXTOID, -1, 0);
425 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
426 INT8OID, -1, 0);
427 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
428 TEXTOID, -1, 0);
429 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
430 TEXTOID, -1, 0);
431
432 /* prepare for projection of tuples */
433 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
434
435 /* column 1: system identifier */
436 values[0] = CStringGetTextDatum(sysid);
437
438 /* column 2: timeline */
439 values[1] = Int64GetDatum(currTLI);
440
441 /* column 3: wal location */
442 values[2] = CStringGetTextDatum(xloc);
443
444 /* column 4: database name, or NULL if none */
445 if (dbname)
447 else
448 nulls[3] = true;
449
450 /* send it to dest */
451 do_tup_output(tstate, values, nulls);
452
453 end_tup_output(tstate);
454}
#define UINT64_FORMAT
Definition: c.h:504
char * get_database_name(Oid dbid)
Definition: dbcommands.c:3187
struct cursor * cur
Definition: ecpg.c:29
Datum Int64GetDatum(int64 X)
Definition: fmgr.c:1807
char * MemoryContextStrdup(MemoryContext context, const char *string)
Definition: mcxt.c:1683
char * dbname
Definition: streamutil.c:50
XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli)
Definition: walsender.c:3477
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4568
bool RecoveryInProgress(void)
Definition: xlog.c:6334
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition: xlog.c:6499

References am_cascading_walsender, begin_tup_output_tupdesc(), CommitTransactionCommand(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, cur, CurrentMemoryContext, dbname, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), get_database_name(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetSystemIdentifier(), Int64GetDatum(), InvalidOid, LSN_FORMAT_ARGS, MAXFNAMELEN, MemoryContextStrdup(), MyDatabaseId, RecoveryInProgress(), snprintf, StartTransactionCommand(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), UINT64_FORMAT, and values.

Referenced by exec_replication_command().

◆ InitWalSenderSlot()

static void InitWalSenderSlot ( void  )
static

Definition at line 2869 of file walsender.c.

2870{
2871 int i;
2872
2873 /*
2874 * WalSndCtl should be set up already (we inherit this by fork() or
2875 * EXEC_BACKEND mechanism from the postmaster).
2876 */
2877 Assert(WalSndCtl != NULL);
2878 Assert(MyWalSnd == NULL);
2879
2880 /*
2881 * Find a free walsender slot and reserve it. This must not fail due to
2882 * the prior check for free WAL senders in InitProcess().
2883 */
2884 for (i = 0; i < max_wal_senders; i++)
2885 {
2886 WalSnd *walsnd = &WalSndCtl->walsnds[i];
2887
2888 SpinLockAcquire(&walsnd->mutex);
2889
2890 if (walsnd->pid != 0)
2891 {
2892 SpinLockRelease(&walsnd->mutex);
2893 continue;
2894 }
2895 else
2896 {
2897 /*
2898 * Found a free slot. Reserve it for us.
2899 */
2900 walsnd->pid = MyProcPid;
2901 walsnd->state = WALSNDSTATE_STARTUP;
2902 walsnd->sentPtr = InvalidXLogRecPtr;
2903 walsnd->needreload = false;
2904 walsnd->write = InvalidXLogRecPtr;
2905 walsnd->flush = InvalidXLogRecPtr;
2906 walsnd->apply = InvalidXLogRecPtr;
2907 walsnd->writeLag = -1;
2908 walsnd->flushLag = -1;
2909 walsnd->applyLag = -1;
2910 walsnd->sync_standby_priority = 0;
2911 walsnd->replyTime = 0;
2912
2913 /*
2914 * The kind assignment is done here and not in StartReplication()
2915 * and StartLogicalReplication(). Indeed, the logical walsender
2916 * needs to read WAL records (like snapshot of running
2917 * transactions) during the slot creation. So it needs to be woken
2918 * up based on its kind.
2919 *
2920 * The kind assignment could also be done in StartReplication(),
2921 * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2922 * seems better to set it in one place.
2923 */
2924 if (MyDatabaseId == InvalidOid)
2926 else
2928
2929 SpinLockRelease(&walsnd->mutex);
2930 /* don't need the lock anymore */
2931 MyWalSnd = (WalSnd *) walsnd;
2932
2933 break;
2934 }
2935 }
2936
2937 Assert(MyWalSnd != NULL);
2938
2939 /* Arrange to clean up at walsender exit */
2941}
void on_shmem_exit(pg_on_exit_callback function, Datum arg)
Definition: ipc.c:365
int i
Definition: isn.c:72
#define SpinLockRelease(lock)
Definition: spin.h:61
#define SpinLockAcquire(lock)
Definition: spin.h:59
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]
TimeOffset writeLag
slock_t mutex
XLogRecPtr flush
XLogRecPtr sentPtr
TimeOffset flushLag
ReplicationKind kind
XLogRecPtr write
TimeOffset applyLag
int sync_standby_priority
bool needreload
TimestampTz replyTime
XLogRecPtr apply
int max_wal_senders
Definition: walsender.c:121
static void WalSndKill(int code, Datum arg)
Definition: walsender.c:2945
WalSndCtlData * WalSndCtl
Definition: walsender.c:109
@ WALSNDSTATE_STARTUP

References WalSnd::apply, WalSnd::applyLag, Assert, WalSnd::flush, WalSnd::flushLag, i, InvalidOid, InvalidXLogRecPtr, WalSnd::kind, max_wal_senders, WalSnd::mutex, MyDatabaseId, MyProcPid, MyWalSnd, WalSnd::needreload, on_shmem_exit(), WalSnd::pid, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WalSnd::replyTime, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSnd::sync_standby_priority, WalSndCtl, WalSndKill(), WalSndCtlData::walsnds, WALSNDSTATE_STARTUP, WalSnd::write, and WalSnd::writeLag.

◆ LagTrackerRead()

static TimeOffset LagTrackerRead ( int  head,
XLogRecPtr  lsn,
TimestampTz  now 
)
static

Definition at line 4133 of file walsender.c.

4134{
4135 TimestampTz time = 0;
4136
4137 /* Read all unread samples up to this LSN or end of buffer. */
4138 while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
4140 {
4142 lag_tracker->last_read[head] =
4144 lag_tracker->read_heads[head] =
4146 }
4147
4148 /*
4149 * If the lag tracker is empty, that means the standby has processed
4150 * everything we've ever sent so we should now clear 'last_read'. If we
4151 * didn't do that, we'd risk using a stale and irrelevant sample for
4152 * interpolation at the beginning of the next burst of WAL after a period
4153 * of idleness.
4154 */
4156 lag_tracker->last_read[head].time = 0;
4157
4158 if (time > now)
4159 {
4160 /* If the clock somehow went backwards, treat as not found. */
4161 return -1;
4162 }
4163 else if (time == 0)
4164 {
4165 /*
4166 * We didn't cross a time. If there is a future sample that we
4167 * haven't reached yet, and we've already reached at least one sample,
4168 * let's interpolate the local flushed time. This is mainly useful
4169 * for reporting a completely stuck apply position as having
4170 * increasing lag, since otherwise we'd have to wait for it to
4171 * eventually start moving again and cross one of our samples before
4172 * we can show the lag increasing.
4173 */
4175 {
4176 /* There are no future samples, so we can't interpolate. */
4177 return -1;
4178 }
4179 else if (lag_tracker->last_read[head].time != 0)
4180 {
4181 /* We can interpolate between last_read and the next sample. */
4182 double fraction;
4183 WalTimeSample prev = lag_tracker->last_read[head];
4185
4186 if (lsn < prev.lsn)
4187 {
4188 /*
4189 * Reported LSNs shouldn't normally go backwards, but it's
4190 * possible when there is a timeline change. Treat as not
4191 * found.
4192 */
4193 return -1;
4194 }
4195
4196 Assert(prev.lsn < next.lsn);
4197
4198 if (prev.time > next.time)
4199 {
4200 /* If the clock somehow went backwards, treat as not found. */
4201 return -1;
4202 }
4203
4204 /* See how far we are between the previous and next samples. */
4205 fraction =
4206 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
4207
4208 /* Scale the local flush time proportionally. */
4209 time = (TimestampTz)
4210 ((double) prev.time + (next.time - prev.time) * fraction);
4211 }
4212 else
4213 {
4214 /*
4215 * We have only a future sample, implying that we were entirely
4216 * caught up, but now there is a new burst of WAL and the
4217 * standby hasn't processed the first sample yet. Until the
4218 * standby reaches the future sample the best we can do is report
4219 * the hypothetical lag if that sample were to be replayed now.
4220 */
4222 }
4223 }
4224
4225 /* Return the elapsed time since local flush time in microseconds. */
4226 Assert(time != 0);
4227 return now - time;
4228}
Datum now(PG_FUNCTION_ARGS)
Definition: timestamp.c:1608
static int32 next
Definition: blutils.c:219
int64 TimestampTz
Definition: timestamp.h:39
WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]
Definition: walsender.c:224
int read_heads[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:226
WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]
Definition: walsender.c:227
int write_head
Definition: walsender.c:225
TimestampTz time
Definition: walsender.c:214
XLogRecPtr lsn
Definition: walsender.c:213
static LagTracker * lag_tracker
Definition: walsender.c:230
#define LAG_TRACKER_BUFFER_SIZE
Definition: walsender.c:218

References Assert, LagTracker::buffer, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_read, WalTimeSample::lsn, next, now(), LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by ProcessStandbyReplyMessage().

◆ LagTrackerWrite()

static void LagTrackerWrite ( XLogRecPtr  lsn,
TimestampTz  local_flush_time 
)
static

Definition at line 4068 of file walsender.c.

4069{
4070 bool buffer_full;
4071 int new_write_head;
4072 int i;
4073
4074 if (!am_walsender)
4075 return;
4076
4077 /*
4078 * If the lsn hasn't advanced since last time, then do nothing. This way
4079 * we only record a new sample when new WAL has been written.
4080 */
4081 if (lag_tracker->last_lsn == lsn)
4082 return;
4083 lag_tracker->last_lsn = lsn;
4084
4085 /*
4086 * If advancing the write head of the circular buffer would crash into any
4087 * of the read heads, then the buffer is full. In other words, the
4088 * slowest reader (presumably apply) is the one that controls the release
4089 * of space.
4090 */
4091 new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
4092 buffer_full = false;
4093 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
4094 {
4095 if (new_write_head == lag_tracker->read_heads[i])
4096 buffer_full = true;
4097 }
4098
4099 /*
4100 * If the buffer is full, for now we just rewind by one slot and overwrite
4101 * the last sample, as a simple (if somewhat uneven) way to lower the
4102 * sampling rate. There may be better adaptive compaction algorithms.
4103 */
4104 if (buffer_full)
4105 {
4106 new_write_head = lag_tracker->write_head;
4107 if (lag_tracker->write_head > 0)
4109 else
4111 }
4112
4113 /* Store a sample at the current write head position. */
4115 lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
4116 lag_tracker->write_head = new_write_head;
4117}
XLogRecPtr last_lsn
Definition: walsender.c:223
#define NUM_SYNC_REP_WAIT_MODE
Definition: syncrep.h:27

References am_walsender, LagTracker::buffer, i, lag_tracker, LAG_TRACKER_BUFFER_SIZE, LagTracker::last_lsn, WalTimeSample::lsn, NUM_SYNC_REP_WAIT_MODE, LagTracker::read_heads, WalTimeSample::time, and LagTracker::write_head.

Referenced by WalSndUpdateProgress(), and XLogSendPhysical().

◆ logical_read_xlog_page()

static int logical_read_xlog_page ( XLogReaderState *  state,
XLogRecPtr  targetPagePtr,
int  reqLen,
XLogRecPtr  targetRecPtr,
char *  cur_page 
)
static

Definition at line 1021 of file walsender.c.

1023{
1024 XLogRecPtr flushptr;
1025 int count;
1026 WALReadError errinfo;
1027 XLogSegNo segno;
1028 TimeLineID currTLI;
1029
1030 /*
1031 * Make sure we have enough WAL available before retrieving the current
1032 * timeline.
1033 */
1034 flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
1035
1036 /* Fail if not enough (implies we are going to shut down) */
1037 if (flushptr < targetPagePtr + reqLen)
1038 return -1;
1039
1040 /*
1041 * Since logical decoding is also permitted on a standby server, we need
1042 * to check if the server is in recovery to decide how to get the current
1043 * timeline ID (so that it also covers the promotion or timeline change
1044 * cases). We must determine am_cascading_walsender after waiting for the
1045 * required WAL so that it is correct when the walsender wakes up after a
1046 * promotion.
1047 */
1049
1051 GetXLogReplayRecPtr(&currTLI);
1052 else
1053 currTLI = GetWALInsertionTimeLine();
1054
1055 XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
1056 sendTimeLineIsHistoric = (state->currTLI != currTLI);
1057 sendTimeLine = state->currTLI;
1058 sendTimeLineValidUpto = state->currTLIValidUntil;
1059 sendTimeLineNextTLI = state->nextTLI;
1060
1061 if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
1062 count = XLOG_BLCKSZ; /* more than one block available */
1063 else
1064 count = flushptr - targetPagePtr; /* part of the page available */
1065
1066 /* now actually read the data, we know it's there */
1067 if (!WALRead(state,
1068 cur_page,
1069 targetPagePtr,
1070 count,
1071 currTLI, /* Pass the current TLI because only
1072 * WalSndSegmentOpen controls whether new TLI
1073 * is needed. */
1074 &errinfo))
1075 WALReadRaiseError(&errinfo);
1076
1077 /*
1078 * After reading into the buffer, check that what we read was valid. We do
1079 * this after reading, because even though the segment was present when we
1080 * opened it, it might get recycled or removed while we read it. The
1081 * read() succeeds in that case, but the data we tried to read might
1082 * already have been overwritten with new WAL records.
1083 */
1084 XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
1085 CheckXLogRemoved(segno, state->seg.ws_tli);
1086
1087 return count;
1088}
Definition: regguts.h:323
static TimeLineID sendTimeLine
Definition: walsender.c:156
static bool sendTimeLineIsHistoric
Definition: walsender.c:158
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc)
Definition: walsender.c:1793
static TimeLineID sendTimeLineNextTLI
Definition: walsender.c:157
static XLogRecPtr sendTimeLineValidUpto
Definition: walsender.c:159
TimeLineID GetWALInsertionTimeLine(void)
Definition: xlog.c:6520
void CheckXLogRemoved(XLogSegNo segno, TimeLineID tli)
Definition: xlog.c:3727
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
uint64 XLogSegNo
Definition: xlogdefs.h:48
bool WALRead(XLogReaderState *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli, WALReadError *errinfo)
Definition: xlogreader.c:1503
void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength, TimeLineID currTLI)
Definition: xlogutils.c:718
void WALReadRaiseError(WALReadError *errinfo)
Definition: xlogutils.c:1020

References am_cascading_walsender, CheckXLogRemoved(), GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), RecoveryInProgress(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, WALRead(), WALReadRaiseError(), WalSndWaitForWal(), XLByteToSeg, and XLogReadDetermineTimeline().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ NeedToWaitForStandbys()

static bool NeedToWaitForStandbys ( XLogRecPtr  flushed_lsn,
uint32 *  wait_event 
)
static

Definition at line 1733 of file walsender.c.

1734{
1735 int elevel = got_STOPPING ? ERROR : WARNING;
1736 bool failover_slot;
1737
1738 failover_slot = (replication_active && MyReplicationSlot->data.failover);
1739
1740 /*
1741 * Note that after receiving the shutdown signal, an ERROR is reported if
1742 * any slots are dropped, invalidated, or inactive. This measure is taken
1743 * to prevent the walsender from waiting indefinitely.
1744 */
1745 if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
1746 {
1747 *wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
1748 return true;
1749 }
1750
1751 *wait_event = 0;
1752 return false;
1753}
#define WARNING
Definition: elog.h:36
bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
Definition: slot.c:2605

References ReplicationSlot::data, ERROR, ReplicationSlotPersistentData::failover, got_STOPPING, MyReplicationSlot, replication_active, StandbySlotsHaveCaughtup(), and WARNING.

Referenced by NeedToWaitForWal(), and WalSndWaitForWal().

◆ NeedToWaitForWal()

static bool NeedToWaitForWal ( XLogRecPtr  target_lsn,
XLogRecPtr  flushed_lsn,
uint32 *  wait_event 
)
static

Definition at line 1765 of file walsender.c.

1767{
1768 /* Check if we need to wait for WALs to be flushed to disk */
1769 if (target_lsn > flushed_lsn)
1770 {
1771 *wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
1772 return true;
1773 }
1774
1775 /* Check if the standby slots have caught up to the flushed position */
1776 return NeedToWaitForStandbys(flushed_lsn, wait_event);
1777}
static bool NeedToWaitForStandbys(XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1733

References NeedToWaitForStandbys().

Referenced by WalSndWaitForWal().

◆ offset_to_interval()

static Interval * offset_to_interval ( TimeOffset  offset)
static

Definition at line 3820 of file walsender.c.

3821{
3822 Interval *result = palloc(sizeof(Interval));
3823
3824 result->month = 0;
3825 result->day = 0;
3826 result->time = offset;
3827
3828 return result;
3829}
void * palloc(Size size)
Definition: mcxt.c:1317
int32 day
Definition: timestamp.h:51
int32 month
Definition: timestamp.h:52
TimeOffset time
Definition: timestamp.h:49

References Interval::day, Interval::month, palloc(), and Interval::time.

Referenced by pg_stat_get_wal_senders().

◆ parseCreateReplSlotOptions()

static void parseCreateReplSlotOptions ( CreateReplicationSlotCmd *  cmd,
bool *  reserve_wal,
CRSSnapshotAction *  snapshot_action,
bool *  two_phase,
bool *  failover 
)
static

Definition at line 1094 of file walsender.c.

1098{
1099 ListCell *lc;
1100 bool snapshot_action_given = false;
1101 bool reserve_wal_given = false;
1102 bool two_phase_given = false;
1103 bool failover_given = false;
1104
1105 /* Parse options */
1106 foreach(lc, cmd->options)
1107 {
1108 DefElem *defel = (DefElem *) lfirst(lc);
1109
1110 if (strcmp(defel->defname, "snapshot") == 0)
1111 {
1112 char *action;
1113
1114 if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1115 ereport(ERROR,
1116 (errcode(ERRCODE_SYNTAX_ERROR),
1117 errmsg("conflicting or redundant options")));
1118
1119 action = defGetString(defel);
1120 snapshot_action_given = true;
1121
1122 if (strcmp(action, "export") == 0)
1123 *snapshot_action = CRS_EXPORT_SNAPSHOT;
1124 else if (strcmp(action, "nothing") == 0)
1125 *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1126 else if (strcmp(action, "use") == 0)
1127 *snapshot_action = CRS_USE_SNAPSHOT;
1128 else
1129 ereport(ERROR,
1130 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1131 errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1132 defel->defname, action)));
1133 }
1134 else if (strcmp(defel->defname, "reserve_wal") == 0)
1135 {
1136 if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1137 ereport(ERROR,
1138 (errcode(ERRCODE_SYNTAX_ERROR),
1139 errmsg("conflicting or redundant options")));
1140
1141 reserve_wal_given = true;
1142 *reserve_wal = defGetBoolean(defel);
1143 }
1144 else if (strcmp(defel->defname, "two_phase") == 0)
1145 {
1146 if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1147 ereport(ERROR,
1148 (errcode(ERRCODE_SYNTAX_ERROR),
1149 errmsg("conflicting or redundant options")));
1150 two_phase_given = true;
1151 *two_phase = defGetBoolean(defel);
1152 }
1153 else if (strcmp(defel->defname, "failover") == 0)
1154 {
1155 if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1156 ereport(ERROR,
1157 (errcode(ERRCODE_SYNTAX_ERROR),
1158 errmsg("conflicting or redundant options")));
1159 failover_given = true;
1160 *failover = defGetBoolean(defel);
1161 }
1162 else
1163 elog(ERROR, "unrecognized option: %s", defel->defname);
1164 }
1165}
char * defGetString(DefElem *def)
Definition: define.c:35
#define lfirst(lc)
Definition: pg_list.h:172
char * defname
Definition: parsenodes.h:817
@ CRS_NOEXPORT_SNAPSHOT
Definition: walsender.h:23

References generate_unaccent_rules::action, CRS_EXPORT_SNAPSHOT, CRS_NOEXPORT_SNAPSHOT, CRS_USE_SNAPSHOT, defGetBoolean(), defGetString(), DefElem::defname, elog, ereport, errcode(), errmsg(), ERROR, CreateReplicationSlotCmd::kind, lfirst, CreateReplicationSlotCmd::options, REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, and two_phase.

Referenced by CreateReplicationSlot().

◆ pg_stat_get_wal_senders()

Datum pg_stat_get_wal_senders ( PG_FUNCTION_ARGS  )

Definition at line 3836 of file walsender.c.

3837{
3838#define PG_STAT_GET_WAL_SENDERS_COLS 12
3839 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3840 SyncRepStandbyData *sync_standbys;
3841 int num_standbys;
3842 int i;
3843
3844 InitMaterializedSRF(fcinfo, 0);
3845
3846 /*
3847 * Get the currently active synchronous standbys. This could be out of
3848 * date before we're done, but we'll use the data anyway.
3849 */
3850 num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3851
3852 for (i = 0; i < max_wal_senders; i++)
3853 {
3854 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3855 XLogRecPtr sent_ptr;
3857 XLogRecPtr flush;
3858 XLogRecPtr apply;
3859 TimeOffset writeLag;
3860 TimeOffset flushLag;
3861 TimeOffset applyLag;
3862 int priority;
3863 int pid;
3865 TimestampTz replyTime;
3866 bool is_sync_standby;
3868 bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3869 int j;
3870
3871 /* Collect data from shared memory */
3872 SpinLockAcquire(&walsnd->mutex);
3873 if (walsnd->pid == 0)
3874 {
3875 SpinLockRelease(&walsnd->mutex);
3876 continue;
3877 }
3878 pid = walsnd->pid;
3879 sent_ptr = walsnd->sentPtr;
3880 state = walsnd->state;
3881 write = walsnd->write;
3882 flush = walsnd->flush;
3883 apply = walsnd->apply;
3884 writeLag = walsnd->writeLag;
3885 flushLag = walsnd->flushLag;
3886 applyLag = walsnd->applyLag;
3887 priority = walsnd->sync_standby_priority;
3888 replyTime = walsnd->replyTime;
3889 SpinLockRelease(&walsnd->mutex);
3890
3891 /*
3892 * Detect whether walsender is/was considered synchronous. We can
3893 * provide some protection against stale data by checking the PID
3894 * along with walsnd_index.
3895 */
3896 is_sync_standby = false;
3897 for (j = 0; j < num_standbys; j++)
3898 {
3899 if (sync_standbys[j].walsnd_index == i &&
3900 sync_standbys[j].pid == pid)
3901 {
3902 is_sync_standby = true;
3903 break;
3904 }
3905 }
3906
3907 values[0] = Int32GetDatum(pid);
3908
3909 if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3910 {
3911 /*
3912 * Only superusers and roles with privileges of pg_read_all_stats
3913 * can see details. Other users only get the pid value to know
3914 * it's a walsender, but no details.
3915 */
3916 MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3917 }
3918 else
3919 {
3921
3922 if (XLogRecPtrIsInvalid(sent_ptr))
3923 nulls[2] = true;
3924 values[2] = LSNGetDatum(sent_ptr);
3925
3927 nulls[3] = true;
3928 values[3] = LSNGetDatum(write);
3929
3930 if (XLogRecPtrIsInvalid(flush))
3931 nulls[4] = true;
3932 values[4] = LSNGetDatum(flush);
3933
3934 if (XLogRecPtrIsInvalid(apply))
3935 nulls[5] = true;
3936 values[5] = LSNGetDatum(apply);
3937
3938 /*
3939 * Treat a standby such as a pg_basebackup background process
3940 * which always returns an invalid flush location, as an
3941 * asynchronous standby.
3942 */
3943 priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3944
3945 if (writeLag < 0)
3946 nulls[6] = true;
3947 else
3949
3950 if (flushLag < 0)
3951 nulls[7] = true;
3952 else
3954
3955 if (applyLag < 0)
3956 nulls[8] = true;
3957 else
3959
3960 values[9] = Int32GetDatum(priority);
3961
3962 /*
3963 * More easily understood version of standby state. This is purely
3964 * informational.
3965 *
3966 * In quorum-based sync replication, the role of each standby
3967 * listed in synchronous_standby_names can be changing very
3968 * frequently. Any standbys considered as "sync" at one moment can
3969 * be switched to "potential" ones at the next moment. So, it's
3970 * basically useless to report "sync" or "potential" as their sync
3971 * states. We report just "quorum" for them.
3972 */
3973 if (priority == 0)
3974 values[10] = CStringGetTextDatum("async");
3975 else if (is_sync_standby)
3977 CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3978 else
3979 values[10] = CStringGetTextDatum("potential");
3980
3981 if (replyTime == 0)
3982 nulls[11] = true;
3983 else
3984 values[11] = TimestampTzGetDatum(replyTime);
3985 }
3986
3987 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3988 values, nulls);
3989 }
3990
3991 return (Datum) 0;
3992}
bool has_privs_of_role(Oid member, Oid role)
Definition: acl.c:5268
#define MemSet(start, val, len)
Definition: c.h:974
int64 TimeOffset
Definition: timestamp.h:40
void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
Definition: funcapi.c:76
#define write(a, b, c)
Definition: win32.h:14
int j
Definition: isn.c:73
Oid GetUserId(void)
Definition: miscinit.c:517
static Datum LSNGetDatum(XLogRecPtr X)
Definition: pg_lsn.h:28
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
TupleDesc setDesc
Definition: execnodes.h:343
Tuplestorestate * setResult
Definition: execnodes.h:342
uint8 syncrep_method
Definition: syncrep.h:68
SyncRepConfigData * SyncRepConfig
Definition: syncrep.c:97
int SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
Definition: syncrep.c:711
#define SYNC_REP_PRIORITY
Definition: syncrep.h:35
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition: tuplestore.c:784
static Datum TimestampTzGetDatum(TimestampTz X)
Definition: timestamp.h:52
static Datum IntervalPGetDatum(const Interval *X)
Definition: timestamp.h:58
#define PG_STAT_GET_WAL_SENDERS_COLS
static Interval * offset_to_interval(TimeOffset offset)
Definition: walsender.c:3820
static const char * WalSndGetStateString(WalSndState state)
Definition: walsender.c:3801
WalSndState
#define XLogRecPtrIsInvalid(r)
Definition: xlogdefs.h:29

References WalSnd::apply, WalSnd::applyLag, CStringGetTextDatum, WalSnd::flush, WalSnd::flushLag, GetUserId(), has_privs_of_role(), i, InitMaterializedSRF(), Int32GetDatum(), IntervalPGetDatum(), j, LSNGetDatum(), max_wal_senders, MemSet, WalSnd::mutex, offset_to_interval(), PG_STAT_GET_WAL_SENDERS_COLS, WalSnd::pid, WalSnd::replyTime, WalSnd::sentPtr, ReturnSetInfo::setDesc, ReturnSetInfo::setResult, SpinLockAcquire, SpinLockRelease, WalSnd::state, SYNC_REP_PRIORITY, WalSnd::sync_standby_priority, SyncRepConfigData::syncrep_method, SyncRepConfig, SyncRepGetCandidateStandbys(), TimestampTzGetDatum(), tuplestore_putvalues(), values, WalSndCtl, WalSndGetStateString(), WalSndCtlData::walsnds, WalSnd::write, write, WalSnd::writeLag, and XLogRecPtrIsInvalid.

◆ PhysicalConfirmReceivedLocation()

static void PhysicalConfirmReceivedLocation ( XLogRecPtr  lsn)
static

Definition at line 2330 of file walsender.c.

2331{ /* Store lsn as MyReplicationSlot's restart_lsn; on change, mark the slot dirty, recompute the required LSN and wake logical walsenders. */
2332 bool changed = false;
2333 ReplicationSlot *slot = MyReplicationSlot;
2334
2335 Assert(lsn != InvalidXLogRecPtr);
2336 SpinLockAcquire(&slot->mutex);
2337 if (slot->data.restart_lsn != lsn)
2338 {
2339 changed = true;
2340 slot->data.restart_lsn = lsn;
2341 }
2342 SpinLockRelease(&slot->mutex);
2343
2344 if (changed)
2345 {
2346 ReplicationSlotMarkDirty();
2347 ReplicationSlotsComputeRequiredLSN();
2348 PhysicalWakeupLogicalWalSnd();
2349 }
2350
2351 /*
2352 * One could argue that the slot should be saved to disk now, but that'd
2353 * be energy wasted - the worst thing lost information could cause here is
2354 * to give wrong information in a statistics view - we'll just potentially
2355 * be more conservative in removing files.
2356 */
2357}
void ReplicationSlotsComputeRequiredLSN(void)
Definition: slot.c:1133
XLogRecPtr restart_lsn
Definition: slot.h:96
slock_t mutex
Definition: slot.h:154
void PhysicalWakeupLogicalWalSnd(void)
Definition: walsender.c:1708

References Assert, ReplicationSlot::data, InvalidXLogRecPtr, ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire, and SpinLockRelease.

Referenced by ProcessStandbyReplyMessage().

◆ PhysicalReplicationSlotNewXmin()

static void PhysicalReplicationSlotNewXmin ( TransactionId  feedbackXmin,
TransactionId  feedbackCatalogXmin 
)
static

Definition at line 2468 of file walsender.c.

2469{ /* Apply the standby's feedback xmin and catalog xmin to MyReplicationSlot, and recompute the global required xmin if either changed. */
2470 bool changed = false;
2471 ReplicationSlot *slot = MyReplicationSlot;
2472
2473 SpinLockAcquire(&slot->mutex);
2474 MyProc->xmin = InvalidTransactionId;
2475
2476 /*
2477 * For physical replication we don't need the interlock provided by xmin
2478 * and effective_xmin since the consequences of a missed increase are
2479 * limited to query cancellations, so set both at once.
2480 */
2481 if (!TransactionIdIsNormal(slot->data.xmin) ||
2482 !TransactionIdIsNormal(feedbackXmin) ||
2483 TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2484 {
2485 changed = true;
2486 slot->data.xmin = feedbackXmin;
2487 slot->effective_xmin = feedbackXmin;
2488 }
2489 if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2490 !TransactionIdIsNormal(feedbackCatalogXmin) ||
2491 TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2492 {
2493 changed = true;
2494 slot->data.catalog_xmin = feedbackCatalogXmin;
2495 slot->effective_catalog_xmin = feedbackCatalogXmin;
2496 }
2497 SpinLockRelease(&slot->mutex);
2498
2499 if (changed)
2500 {
2501 ReplicationSlotMarkDirty();
2502 ReplicationSlotsComputeRequiredXmin(false);
2503 }
2504}
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition: slot.c:1077
TransactionId xmin
Definition: proc.h:177
TransactionId xmin
Definition: slot.h:85
TransactionId catalog_xmin
Definition: slot.h:93
TransactionId effective_catalog_xmin
Definition: slot.h:178
TransactionId effective_xmin
Definition: slot.h:177
bool TransactionIdPrecedes(TransactionId id1, TransactionId id2)
Definition: transam.c:280
#define InvalidTransactionId
Definition: transam.h:31
#define TransactionIdIsNormal(xid)
Definition: transam.h:42

References ReplicationSlotPersistentData::catalog_xmin, ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, InvalidTransactionId, ReplicationSlot::mutex, MyProc, MyReplicationSlot, ReplicationSlotMarkDirty(), ReplicationSlotsComputeRequiredXmin(), SpinLockAcquire, SpinLockRelease, TransactionIdIsNormal, TransactionIdPrecedes(), ReplicationSlotPersistentData::xmin, and PGPROC::xmin.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ PhysicalWakeupLogicalWalSnd()

void PhysicalWakeupLogicalWalSnd ( void  )

Definition at line 1708 of file walsender.c.

1709{ /* Broadcast wal_confirm_rcv_cv when the current physical slot's name is found by SlotExistsInSyncStandbySlots(). */
1710 Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot));
1711
1712 /*
1713 * If we are running in a standby, there is no need to wake up walsenders.
1714 * This is because we do not support syncing slots to cascading standbys,
1715 * so, there are no walsenders waiting for standbys to catch up.
1716 */
1717 if (RecoveryInProgress())
1718 return;
1719
1720 if (SlotExistsInSyncStandbySlots(NameStr(MyReplicationSlot->data.name)))
1721 ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
1722}
void ConditionVariableBroadcast(ConditionVariable *cv)
bool SlotExistsInSyncStandbySlots(const char *slot_name)
Definition: slot.c:2572
#define SlotIsPhysical(slot)
Definition: slot.h:216
ConditionVariable wal_confirm_rcv_cv

References Assert, ConditionVariableBroadcast(), ReplicationSlot::data, MyReplicationSlot, ReplicationSlotPersistentData::name, NameStr, RecoveryInProgress(), SlotExistsInSyncStandbySlots(), SlotIsPhysical, WalSndCtlData::wal_confirm_rcv_cv, and WalSndCtl.

Referenced by pg_physical_replication_slot_advance(), and PhysicalConfirmReceivedLocation().

◆ ProcessPendingWrites()

static void ProcessPendingWrites ( void  )
static

Definition at line 1589 of file walsender.c.

1590{ /* Loop until no output to the client is pending, servicing replies, timeouts, keepalives and config reloads in between. */
1591 for (;;)
1592 {
1593 long sleeptime;
1594
1595 /* Check for input from the client */
1596 ProcessRepliesIfAny();
1597
1598 /* die if timeout was reached */
1599 WalSndCheckTimeOut();
1600
1601 /* Send keepalive if the time has come */
1602 WalSndKeepaliveIfNecessary();
1603
1604 if (!pq_is_send_pending())
1605 break;
1606
1607 sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1608
1609 /* Sleep until something happens or we time out */
1610 WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
1611 WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1612
1613 /* Clear any already-pending wakeups */
1614 ResetLatch(MyLatch);
1615
1616 CHECK_FOR_INTERRUPTS();
1617
1618 /* Process any requests or signals received recently */
1619 if (ConfigReloadPending)
1620 {
1621 ConfigReloadPending = false;
1622 ProcessConfigFile(PGC_SIGHUP);
1623 SyncRepInitConfig();
1624 }
1625
1626 /* Try to flush pending output to the client */
1627 if (pq_flush_if_writable() != 0)
1628 WalSndShutdown();
1629 }
1630
1631 /* reactivate latch so WalSndLoop knows to continue */
1632 SetLatch(MyLatch);
1633}
TimestampTz GetCurrentTimestamp(void)
Definition: timestamp.c:1644
struct Latch * MyLatch
Definition: globals.c:62
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:71
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
void SetLatch(Latch *latch)
Definition: latch.c:632
void ResetLatch(Latch *latch)
Definition: latch.c:724
#define WL_SOCKET_READABLE
Definition: latch.h:128
#define WL_SOCKET_WRITEABLE
Definition: latch.h:129
#define pq_flush_if_writable()
Definition: libpq.h:47
#define pq_is_send_pending()
Definition: libpq.h:48
void SyncRepInitConfig(void)
Definition: syncrep.c:402
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
Definition: walsender.c:3660
static void WalSndCheckTimeOut(void)
Definition: walsender.c:2716
static void ProcessRepliesIfAny(void)
Definition: walsender.c:2186
static void WalSndKeepaliveIfNecessary(void)
Definition: walsender.c:4030
static void WalSndShutdown(void)
Definition: walsender.c:240
static long WalSndComputeSleeptime(TimestampTz now)
Definition: walsender.c:2672

References CHECK_FOR_INTERRUPTS, ConfigReloadPending, GetCurrentTimestamp(), MyLatch, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), SetLatch(), SyncRepInitConfig(), WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, and WL_SOCKET_WRITEABLE.

Referenced by WalSndUpdateProgress(), and WalSndWriteData().

◆ ProcessRepliesIfAny()

static void ProcessRepliesIfAny ( void  )
static

Definition at line 2186 of file walsender.c.

2187{
2188 unsigned char firstchar;
2189 int maxmsglen;
2190 int r;
2191 bool received = false;
2192
2194
2195 /*
2196 * If we already received a CopyDone from the frontend, any subsequent
2197 * message is the beginning of a new command, and should be processed in
2198 * the main processing loop.
2199 */
2200 while (!streamingDoneReceiving)
2201 {
2203 r = pq_getbyte_if_available(&firstchar);
2204 if (r < 0)
2205 {
2206 /* unexpected error or EOF */
2208 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2209 errmsg("unexpected EOF on standby connection")));
2210 proc_exit(0);
2211 }
2212 if (r == 0)
2213 {
2214 /* no data available without blocking */
2215 pq_endmsgread();
2216 break;
2217 }
2218
2219 /* Validate message type and set packet size limit */
2220 switch (firstchar)
2221 {
2222 case PqMsg_CopyData:
2223 maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
2224 break;
2225 case PqMsg_CopyDone:
2226 case PqMsg_Terminate:
2227 maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
2228 break;
2229 default:
2230 ereport(FATAL,
2231 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2232 errmsg("invalid standby message type \"%c\"",
2233 firstchar)));
2234 maxmsglen = 0; /* keep compiler quiet */
2235 break;
2236 }
2237
2238 /* Read the message contents */
2240 if (pq_getmessage(&reply_message, maxmsglen))
2241 {
2243 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2244 errmsg("unexpected EOF on standby connection")));
2245 proc_exit(0);
2246 }
2247
2248 /* ... and process it */
2249 switch (firstchar)
2250 {
2251 /*
2252 * 'd' means a standby reply wrapped in a CopyData packet.
2253 */
2254 case PqMsg_CopyData:
2256 received = true;
2257 break;
2258
2259 /*
2260 * CopyDone means the standby requested to finish streaming.
2261 * Reply with CopyDone, if we had not sent that already.
2262 */
2263 case PqMsg_CopyDone:
2265 {
2266 pq_putmessage_noblock('c', NULL, 0);
2267 streamingDoneSending = true;
2268 }
2269
2271 received = true;
2272 break;
2273
2274 /*
2275 * 'X' means that the standby is closing down the socket.
2276 */
2277 case PqMsg_Terminate:
2278 proc_exit(0);
2279
2280 default:
2281 Assert(false); /* NOT REACHED */
2282 }
2283 }
2284
2285 /*
2286 * Save the last reply timestamp if we've received at least one reply.
2287 */
2288 if (received)
2289 {
2292 }
2293}
#define COMMERROR
Definition: elog.h:33
#define FATAL
Definition: elog.h:41
void proc_exit(int code)
Definition: ipc.c:104
#define pq_putmessage_noblock(msgtype, s, len)
Definition: libpq.h:51
int pq_getbyte_if_available(unsigned char *c)
Definition: pqcomm.c:1004
void pq_endmsgread(void)
Definition: pqcomm.c:1165
#define PqMsg_CopyDone
Definition: protocol.h:64
#define PqMsg_CopyData
Definition: protocol.h:65
#define PqMsg_Terminate
Definition: protocol.h:28
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:75
static bool waiting_for_ping_response
Definition: walsender.c:182
static TimestampTz last_processing
Definition: walsender.c:173
static bool streamingDoneSending
Definition: walsender.c:190
static void ProcessStandbyMessage(void)
Definition: walsender.c:2299
static bool streamingDoneReceiving
Definition: walsender.c:191

References Assert, COMMERROR, ereport, errcode(), errmsg(), FATAL, GetCurrentTimestamp(), last_processing, last_reply_timestamp, pq_endmsgread(), pq_getbyte_if_available(), pq_getmessage(), PQ_LARGE_MESSAGE_LIMIT, pq_putmessage_noblock, PQ_SMALL_MESSAGE_LIMIT, pq_startmsgread(), PqMsg_CopyData, PqMsg_CopyDone, PqMsg_Terminate, proc_exit(), ProcessStandbyMessage(), reply_message, resetStringInfo(), streamingDoneReceiving, streamingDoneSending, and waiting_for_ping_response.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ ProcessStandbyHSFeedbackMessage()

static void ProcessStandbyHSFeedbackMessage ( void  )
static

Definition at line 2548 of file walsender.c.

2549{ /* Parse a hot standby feedback message from reply_message, record its reply time, and apply the reported xmins to the slot or to MyProc. */
2550 TransactionId feedbackXmin;
2551 uint32 feedbackEpoch;
2552 TransactionId feedbackCatalogXmin;
2553 uint32 feedbackCatalogEpoch;
2554 TimestampTz replyTime;
2555
2556 /*
2557 * Decipher the reply message. The caller already consumed the msgtype
2558 * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2559 * of this message.
2560 */
2561 replyTime = pq_getmsgint64(&reply_message);
2562 feedbackXmin = pq_getmsgint(&reply_message, 4);
2563 feedbackEpoch = pq_getmsgint(&reply_message, 4);
2564 feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2565 feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2566
2567 if (message_level_is_interesting(DEBUG2))
2568 {
2569 char *replyTimeStr;
2570
2571 /* Copy because timestamptz_to_str returns a static buffer */
2572 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2573
2574 elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2575 feedbackXmin,
2576 feedbackEpoch,
2577 feedbackCatalogXmin,
2578 feedbackCatalogEpoch,
2579 replyTimeStr);
2580
2581 pfree(replyTimeStr);
2582 }
2583
2584 /*
2585 * Update shared state for this WalSender process based on reply data from
2586 * standby.
2587 */
2588 {
2589 WalSnd *walsnd = MyWalSnd;
2590
2591 SpinLockAcquire(&walsnd->mutex);
2592 walsnd->replyTime = replyTime;
2593 SpinLockRelease(&walsnd->mutex);
2594 }
2595
2596 /*
2597 * Unset WalSender's xmins if the feedback message values are invalid.
2598 * This happens when the downstream turned hot_standby_feedback off.
2599 */
2600 if (!TransactionIdIsNormal(feedbackXmin)
2601 && !TransactionIdIsNormal(feedbackCatalogXmin))
2602 {
2603 MyProc->xmin = InvalidTransactionId;
2604 if (MyReplicationSlot != NULL)
2605 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2606 return;
2607 }
2608
2609 /*
2610 * Check that the provided xmin/epoch are sane, that is, not in the future
2611 * and not so far back as to be already wrapped around. Ignore if not.
2612 */
2613 if (TransactionIdIsNormal(feedbackXmin) &&
2614 !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2615 return;
2616
2617 if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2618 !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2619 return;
2620
2621 /*
2622 * Set the WalSender's xmin equal to the standby's requested xmin, so that
2623 * the xmin will be taken into account by GetSnapshotData() /
2624 * ComputeXidHorizons(). This will hold back the removal of dead rows and
2625 * thereby prevent the generation of cleanup conflicts on the standby
2626 * server.
2627 *
2628 * There is a small window for a race condition here: although we just
2629 * checked that feedbackXmin precedes nextXid, the nextXid could have
2630 * gotten advanced between our fetching it and applying the xmin below,
2631 * perhaps far enough to make feedbackXmin wrap around. In that case the
2632 * xmin we set here would be "in the future" and have no effect. No point
2633 * in worrying about this since it's too late to save the desired data
2634 * anyway. Assuming that the standby sends us an increasing sequence of
2635 * xmins, this could only happen during the first reply cycle, else our
2636 * own xmin would prevent nextXid from advancing so far.
2637 *
2638 * We don't bother taking the ProcArrayLock here. Setting the xmin field
2639 * is assumed atomic, and there's no real need to prevent concurrent
2640 * horizon determinations. (If we're moving our xmin forward, this is
2641 * obviously safe, and if we're moving it backwards, well, the data is at
2642 * risk already since a VACUUM could already have determined the horizon.)
2643 *
2644 * If we're using a replication slot we reserve the xmin via that,
2645 * otherwise via the walsender's PGPROC entry. We can only track the
2646 * catalog xmin separately when using a slot, so we store the least of the
2647 * two provided when not using a slot.
2648 *
2649 * XXX: It might make sense to generalize the ephemeral slot concept and
2650 * always use the slot mechanism to handle the feedback xmin.
2651 */
2652 if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2653 PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2654 else
2655 {
2656 if (TransactionIdIsNormal(feedbackCatalogXmin)
2657 && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2658 MyProc->xmin = feedbackCatalogXmin;
2659 else
2660 MyProc->xmin = feedbackXmin;
2661 }
2662}
const char * timestamptz_to_str(TimestampTz t)
Definition: timestamp.c:1843
uint32_t uint32
Definition: c.h:485
uint32 TransactionId
Definition: c.h:606
bool message_level_is_interesting(int elevel)
Definition: elog.c:272
#define DEBUG2
Definition: elog.h:29
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void pfree(void *pointer)
Definition: mcxt.c:1521
unsigned int pq_getmsgint(StringInfo msg, int b)
Definition: pqformat.c:415
int64 pq_getmsgint64(StringInfo msg)
Definition: pqformat.c:453
static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
Definition: walsender.c:2468
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
Definition: walsender.c:2517

References DEBUG2, elog, InvalidTransactionId, message_level_is_interesting(), WalSnd::mutex, MyProc, MyReplicationSlot, MyWalSnd, pfree(), PhysicalReplicationSlotNewXmin(), pq_getmsgint(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, SpinLockAcquire, SpinLockRelease, timestamptz_to_str(), TransactionIdInRecentPast(), TransactionIdIsNormal, TransactionIdPrecedes(), and PGPROC::xmin.

Referenced by ProcessStandbyMessage().

◆ ProcessStandbyMessage()

static void ProcessStandbyMessage ( void  )
static

Definition at line 2299 of file walsender.c.

2300{ /* Dispatch one standby message from reply_message by its leading type byte: 'r' = status reply, 'h' = hot standby feedback. */
2301 char msgtype;
2302
2303 /*
2304 * Check message type from the first byte.
2305 */
2306 msgtype = pq_getmsgbyte(&reply_message);
2307
2308 switch (msgtype)
2309 {
2310 case 'r':
2311 ProcessStandbyReplyMessage();
2312 break;
2313
2314 case 'h':
2315 ProcessStandbyHSFeedbackMessage();
2316 break;
2317
2318 default:
2319 ereport(COMMERROR,
2320 (errcode(ERRCODE_PROTOCOL_VIOLATION),
2321 errmsg("unexpected message type \"%c\"", msgtype)));
2322 proc_exit(0);
2323 }
2324}
int pq_getmsgbyte(StringInfo msg)
Definition: pqformat.c:399
static void ProcessStandbyHSFeedbackMessage(void)
Definition: walsender.c:2548
static void ProcessStandbyReplyMessage(void)
Definition: walsender.c:2363

References COMMERROR, ereport, errcode(), errmsg(), pq_getmsgbyte(), proc_exit(), ProcessStandbyHSFeedbackMessage(), ProcessStandbyReplyMessage(), and reply_message.

Referenced by ProcessRepliesIfAny().

◆ ProcessStandbyReplyMessage()

static void ProcessStandbyReplyMessage ( void  )
static

Definition at line 2363 of file walsender.c.

2364{ /* Parse a standby status reply from reply_message, publish the positions and lag in MyWalSnd, and confirm the flush position on the slot. */
2365 XLogRecPtr writePtr,
2366 flushPtr,
2367 applyPtr;
2368 bool replyRequested;
2369 TimeOffset writeLag,
2370 flushLag,
2371 applyLag;
2372 bool clearLagTimes;
2373 TimestampTz now;
2374 TimestampTz replyTime;
2375
2376 static bool fullyAppliedLastTime = false;
2377
2378 /* the caller already consumed the msgtype byte */
2379 writePtr = pq_getmsgint64(&reply_message);
2380 flushPtr = pq_getmsgint64(&reply_message);
2381 applyPtr = pq_getmsgint64(&reply_message);
2382 replyTime = pq_getmsgint64(&reply_message);
2383 replyRequested = pq_getmsgbyte(&reply_message);
2384
2385 if (message_level_is_interesting(DEBUG2))
2386 {
2387 char *replyTimeStr;
2388
2389 /* Copy because timestamptz_to_str returns a static buffer */
2390 replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2391
2392 elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2393 LSN_FORMAT_ARGS(writePtr),
2394 LSN_FORMAT_ARGS(flushPtr),
2395 LSN_FORMAT_ARGS(applyPtr),
2396 replyRequested ? " (reply requested)" : "",
2397 replyTimeStr);
2398
2399 pfree(replyTimeStr);
2400 }
2401
2402 /* See if we can compute the round-trip lag for these positions. */
2403 now = GetCurrentTimestamp();
2404 writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2405 flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2406 applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2407
2408 /*
2409 * If the standby reports that it has fully replayed the WAL in two
2410 * consecutive reply messages, then the second such message must result
2411 * from wal_receiver_status_interval expiring on the standby. This is a
2412 * convenient time to forget the lag times measured when it last
2413 * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2414 * until more WAL traffic arrives.
2415 */
2416 clearLagTimes = false;
2417 if (applyPtr == sentPtr)
2418 {
2419 if (fullyAppliedLastTime)
2420 clearLagTimes = true;
2421 fullyAppliedLastTime = true;
2422 }
2423 else
2424 fullyAppliedLastTime = false;
2425
2426 /* Send a reply if the standby requested one. */
2427 if (replyRequested)
2428 WalSndKeepalive(false, InvalidXLogRecPtr);
2429
2430 /*
2431 * Update shared state for this WalSender process based on reply data from
2432 * standby.
2433 */
2434 {
2435 WalSnd *walsnd = MyWalSnd;
2436
2437 SpinLockAcquire(&walsnd->mutex);
2438 walsnd->write = writePtr;
2439 walsnd->flush = flushPtr;
2440 walsnd->apply = applyPtr;
2441 if (writeLag != -1 || clearLagTimes)
2442 walsnd->writeLag = writeLag;
2443 if (flushLag != -1 || clearLagTimes)
2444 walsnd->flushLag = flushLag;
2445 if (applyLag != -1 || clearLagTimes)
2446 walsnd->applyLag = applyLag;
2447 walsnd->replyTime = replyTime;
2448 SpinLockRelease(&walsnd->mutex);
2449 }
2450
2451 if (!am_cascading_walsender)
2452 SyncRepReleaseWaiters();
2453
2454 /*
2455 * Advance our local xmin horizon when the client confirmed a flush.
2456 */
2457 if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2458 {
2459 if (SlotIsLogical(MyReplicationSlot))
2460 LogicalConfirmReceivedLocation(flushPtr);
2461 else
2462 PhysicalConfirmReceivedLocation(flushPtr);
2463 }
2464}
void LogicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: logical.c:1837
#define SlotIsLogical(slot)
Definition: slot.h:217
void SyncRepReleaseWaiters(void)
Definition: syncrep.c:431
#define SYNC_REP_WAIT_WRITE
Definition: syncrep.h:23
#define SYNC_REP_WAIT_FLUSH
Definition: syncrep.h:24
#define SYNC_REP_WAIT_APPLY
Definition: syncrep.h:25
static XLogRecPtr sentPtr
Definition: walsender.c:165
static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
Definition: walsender.c:2330
static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
Definition: walsender.c:4007
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
Definition: walsender.c:4133

References am_cascading_walsender, WalSnd::apply, WalSnd::applyLag, DEBUG2, elog, WalSnd::flush, WalSnd::flushLag, GetCurrentTimestamp(), InvalidXLogRecPtr, LagTrackerRead(), LogicalConfirmReceivedLocation(), LSN_FORMAT_ARGS, message_level_is_interesting(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, now(), pfree(), PhysicalConfirmReceivedLocation(), pq_getmsgbyte(), pq_getmsgint64(), pstrdup(), reply_message, WalSnd::replyTime, sentPtr, SlotIsLogical, SpinLockAcquire, SpinLockRelease, SYNC_REP_WAIT_APPLY, SYNC_REP_WAIT_FLUSH, SYNC_REP_WAIT_WRITE, SyncRepReleaseWaiters(), timestamptz_to_str(), WalSndKeepalive(), WalSnd::write, and WalSnd::writeLag.

Referenced by ProcessStandbyMessage().

◆ ReadReplicationSlot()

static void ReadReplicationSlot ( ReadReplicationSlotCmd cmd)
static

Definition at line 458 of file walsender.c.

459{ /* Send one row (slot_type, restart_lsn, restart_tli) for the named physical slot; all columns stay NULL if the slot is missing or not in use. */
460#define READ_REPLICATION_SLOT_COLS 3
461 ReplicationSlot *slot;
462 DestReceiver *dest;
463 TupOutputState *tstate;
464 TupleDesc tupdesc;
465 Datum values[READ_REPLICATION_SLOT_COLS] = {0};
466 bool nulls[READ_REPLICATION_SLOT_COLS];
467
468 tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
469 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
470 TEXTOID, -1, 0);
471 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
472 TEXTOID, -1, 0);
473 /* TimeLineID is unsigned, so int4 is not wide enough. */
474 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
475 INT8OID, -1, 0);
476
477 memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
478
479 LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
480 slot = SearchNamedReplicationSlot(cmd->slotname, false);
481 if (slot == NULL || !slot->in_use)
482 {
483 LWLockRelease(ReplicationSlotControlLock);
484 }
485 else
486 {
487 ReplicationSlot slot_contents;
488 int i = 0;
489
490 /* Copy slot contents while holding spinlock */
491 SpinLockAcquire(&slot->mutex);
492 slot_contents = *slot;
493 SpinLockRelease(&slot->mutex);
494 LWLockRelease(ReplicationSlotControlLock);
495
496 if (OidIsValid(slot_contents.data.database))
497 ereport(ERROR,
498 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
499 errmsg("cannot use %s with a logical replication slot",
500 "READ_REPLICATION_SLOT"));
501
502 /* slot type */
503 values[i] = CStringGetTextDatum("physical");
504 nulls[i] = false;
505 i++;
506
507 /* start LSN */
508 if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
509 {
510 char xloc[64];
511
512 snprintf(xloc, sizeof(xloc), "%X/%X",
513 LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
514 values[i] = CStringGetTextDatum(xloc);
515 nulls[i] = false;
516 }
517 i++;
518
519 /* timeline this WAL was produced on */
520 if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
521 {
522 TimeLineID slots_position_timeline;
523 TimeLineID current_timeline;
524 List *timeline_history = NIL;
525
526 /*
527 * While in recovery, use as timeline the currently-replaying one
528 * to get the LSN position's history.
529 */
530 if (RecoveryInProgress())
531 (void) GetXLogReplayRecPtr(&current_timeline);
532 else
533 current_timeline = GetWALInsertionTimeLine();
534
535 timeline_history = readTimeLineHistory(current_timeline);
536 slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
537 timeline_history);
538 values[i] = Int64GetDatum((int64) slots_position_timeline);
539 nulls[i] = false;
540 }
541 i++;
542
543 Assert(i == READ_REPLICATION_SLOT_COLS);
544 }
545
546 dest = CreateDestReceiver(DestRemoteSimple);
547 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
548 do_tup_output(tstate, values, nulls);
549 end_tup_output(tstate);
550}
List * readTimeLineHistory(TimeLineID targetTLI)
Definition: timeline.c:76
TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history)
Definition: timeline.c:544
int64_t int64
Definition: c.h:482
#define OidIsValid(objectId)
Definition: c.h:729
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1168
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1781
@ LW_SHARED
Definition: lwlock.h:115
ReplicationSlot * SearchNamedReplicationSlot(const char *name, bool need_lock)
Definition: slot.c:464
Definition: pg_list.h:54
bool in_use
Definition: slot.h:157
#define READ_REPLICATION_SLOT_COLS

References Assert, begin_tup_output_tupdesc(), CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, ReplicationSlot::data, ReplicationSlotPersistentData::database, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), ereport, errcode(), errmsg(), ERROR, GetWALInsertionTimeLine(), GetXLogReplayRecPtr(), i, ReplicationSlot::in_use, Int64GetDatum(), LSN_FORMAT_ARGS, LW_SHARED, LWLockAcquire(), LWLockRelease(), ReplicationSlot::mutex, NIL, OidIsValid, READ_REPLICATION_SLOT_COLS, readTimeLineHistory(), RecoveryInProgress(), ReplicationSlotPersistentData::restart_lsn, SearchNamedReplicationSlot(), ReadReplicationSlotCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, tliOfPointInHistory(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, and XLogRecPtrIsInvalid.

Referenced by exec_replication_command().

◆ SendTimeLineHistory()

static void SendTimeLineHistory ( TimeLineHistoryCmd cmd)
static

Definition at line 557 of file walsender.c.

558{ /* Send a one-row result holding the timeline history file's name and its full contents for cmd->timeline. */
559 DestReceiver *dest;
560 TupleDesc tupdesc;
561 StringInfoData buf;
562 char histfname[MAXFNAMELEN];
563 char path[MAXPGPATH];
564 int fd;
565 off_t histfilelen;
566 off_t bytesleft;
567 Size len;
568
569 dest = CreateDestReceiver(DestRemoteSimple);
570
571 /*
572 * Reply with a result set with one row, and two columns. The first col is
573 * the name of the history file, 2nd is the contents.
574 */
575 tupdesc = CreateTemplateTupleDesc(2);
576 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
577 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
578
579 TLHistoryFileName(histfname, cmd->timeline);
580 TLHistoryFilePath(path, cmd->timeline);
581
582 /* Send a RowDescription message */
583 dest->rStartup(dest, CMD_SELECT, tupdesc);
584
585 /* Send a DataRow message */
586 pq_beginmessage(&buf, PqMsg_DataRow);
587 pq_sendint16(&buf, 2); /* # of columns */
588 len = strlen(histfname);
589 pq_sendint32(&buf, len); /* col1 len */
590 pq_sendbytes(&buf, histfname, len);
591
592 fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
593 if (fd < 0)
594 ereport(ERROR,
595 (errcode_for_file_access(),
596 errmsg("could not open file \"%s\": %m", path)));
597
598 /* Determine file length and send it to client */
599 histfilelen = lseek(fd, 0, SEEK_END);
600 if (histfilelen < 0)
601 ereport(ERROR,
602 (errcode_for_file_access(),
603 errmsg("could not seek to end of file \"%s\": %m", path)));
604 if (lseek(fd, 0, SEEK_SET) != 0)
605 ereport(ERROR,
606 (errcode_for_file_access(),
607 errmsg("could not seek to beginning of file \"%s\": %m", path)));
608
609 pq_sendint32(&buf, histfilelen); /* col2 len */
610
611 bytesleft = histfilelen;
612 while (bytesleft > 0)
613 {
614 PGAlignedBlock rbuf;
615 int nread;
616
617 pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
618 nread = read(fd, rbuf.data, sizeof(rbuf));
619 pgstat_report_wait_end();
620 if (nread < 0)
621 ereport(ERROR,
622 (errcode_for_file_access(),
623 errmsg("could not read file \"%s\": %m",
624 path)));
625 else if (nread == 0)
626 ereport(ERROR,
627 (errcode(ERRCODE_DATA_CORRUPTED),
628 errmsg("could not read file \"%s\": read %d of %zu",
629 path, nread, (Size) bytesleft)));
630
631 pq_sendbytes(&buf, rbuf.data, nread);
632 bytesleft -= nread;
633 }
634
635 if (CloseTransientFile(fd) != 0)
636 ereport(ERROR,
637 (errcode_for_file_access(),
638 errmsg("could not close file \"%s\": %m", path)));
639
640 pq_endmessage(&buf);
641}
#define PG_BINARY
Definition: c.h:1227
size_t Size
Definition: c.h:559
int errcode_for_file_access(void)
Definition: elog.c:876
int CloseTransientFile(int fd)
Definition: fd.c:2831
int OpenTransientFile(const char *fileName, int fileFlags)
Definition: fd.c:2655
#define read(a, b, c)
Definition: win32.h:13
@ CMD_SELECT
Definition: nodes.h:265
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
#define MAXPGPATH
const void size_t len
void pq_sendbytes(StringInfo buf, const void *data, int datalen)
Definition: pqformat.c:126
void pq_endmessage(StringInfo buf)
Definition: pqformat.c:296
void pq_beginmessage(StringInfo buf, char msgtype)
Definition: pqformat.c:88
static void pq_sendint32(StringInfo buf, uint32 i)
Definition: pqformat.h:144
static void pq_sendint16(StringInfo buf, uint16 i)
Definition: pqformat.h:136
static int fd(const char *x, int i)
Definition: preproc-init.c:105
#define PqMsg_DataRow
Definition: protocol.h:43
TimeLineID timeline
Definition: replnodes.h:120
char data[BLCKSZ]
Definition: c.h:1073
static void pgstat_report_wait_start(uint32 wait_event_info)
Definition: wait_event.h:85
static void pgstat_report_wait_end(void)
Definition: wait_event.h:101
static void TLHistoryFilePath(char *path, TimeLineID tli)
static void TLHistoryFileName(char *fname, TimeLineID tli)

References buf, CloseTransientFile(), CMD_SELECT, CreateDestReceiver(), CreateTemplateTupleDesc(), PGAlignedBlock::data, generate_unaccent_rules::dest, DestRemoteSimple, ereport, errcode(), ERRCODE_DATA_CORRUPTED, errcode_for_file_access(), errmsg(), ERROR, fd(), len, MAXFNAMELEN, MAXPGPATH, OpenTransientFile(), PG_BINARY, pgstat_report_wait_end(), pgstat_report_wait_start(), pq_beginmessage(), pq_endmessage(), pq_sendbytes(), pq_sendint16(), pq_sendint32(), PqMsg_DataRow, read, TimeLineHistoryCmd::timeline, TLHistoryFileName(), TLHistoryFilePath(), and TupleDescInitBuiltinEntry().

Referenced by exec_replication_command().

◆ StartLogicalReplication()

static void StartLogicalReplication ( StartReplicationCmd cmd)
static

Definition at line 1427 of file walsender.c.

1428{
1430 QueryCompletion qc;
1431
1432 /* make sure that our requirements are still fulfilled */
1434
1436
1437 ReplicationSlotAcquire(cmd->slotname, true);
1438
1439 /*
1440 * Force a disconnect, so that the decoding code doesn't need to care
1441 * about an eventual switch from running in recovery, to running in a
1442 * normal environment. Client code is expected to handle reconnects.
1443 */
1445 {
1446 ereport(LOG,
1447 (errmsg("terminating walsender process after promotion")));
1448 got_STOPPING = true;
1449 }
1450
1451 /*
1452 * Create our decoding context, making it start at the previously ack'ed
1453 * position.
1454 *
1455 * Do this before sending a CopyBothResponse message, so that any errors
1456 * are reported early.
1457 */
1459 CreateDecodingContext(cmd->startpoint, cmd->options, false,
1461 .segment_open = WalSndSegmentOpen,
1462 .segment_close = wal_segment_close),
1466
1468
1469 /* Send a CopyBothResponse message, and start streaming */
1471 pq_sendbyte(&buf, 0);
1472 pq_sendint16(&buf, 0);
1474 pq_flush();
1475
1476 /* Start reading WAL from the oldest required WAL. */
1479
1480 /*
1481 * Report the location after which we'll send out further commits as the
1482 * current sentPtr.
1483 */
1485
1486 /* Also update the sent position status in shared memory */
1490
1491 replication_active = true;
1492
1494
1495 /* Main loop of walsender */
1497
1500
1501 replication_active = false;
1502 if (got_STOPPING)
1503 proc_exit(0);
1505
1506 /* Get out of COPY mode (CommandComplete). */
1507 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1508 EndCommand(&qc, DestRemote, false);
1509}
static void SetQueryCompletion(QueryCompletion *qc, CommandTag commandTag, uint64 nprocessed)
Definition: cmdtag.h:37
void EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_output)
Definition: dest.c:169
@ DestRemote
Definition: dest.h:89
#define pq_flush()
Definition: libpq.h:46
LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition: logical.c:496
static void pq_sendbyte(StringInfo buf, uint8 byt)
Definition: pqformat.h:160
#define PqMsg_CopyBothResponse
Definition: protocol.h:54
void ReplicationSlotAcquire(const char *name, bool nowait)
Definition: slot.c:540
XLogReaderState * reader
Definition: logical.h:42
XLogRecPtr startpoint
Definition: replnodes.h:97
static void WalSndLoop(WalSndSendDataCallback send_data)
Definition: walsender.c:2743
static LogicalDecodingContext * logical_decoding_ctx
Definition: walsender.c:208
static void XLogSendLogical(void)
Definition: walsender.c:3349
@ WALSNDSTATE_CATCHUP
void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
Definition: xlogreader.c:231

References am_cascading_walsender, Assert, buf, CheckLogicalDecodingRequirements(), ReplicationSlotPersistentData::confirmed_flush, CreateDecodingContext(), ReplicationSlot::data, DestRemote, EndCommand(), ereport, errmsg(), FreeDecodingContext(), got_STOPPING, LOG, logical_decoding_ctx, logical_read_xlog_page(), WalSnd::mutex, MyReplicationSlot, MyWalSnd, StartReplicationCmd::options, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), LogicalDecodingContext::reader, RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotPersistentData::restart_lsn, sentPtr, WalSnd::sentPtr, SetQueryCompletion(), StartReplicationCmd::slotname, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, SyncRepInitConfig(), wal_segment_close(), WalSndLoop(), WalSndPrepareWrite(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WalSndUpdateProgress(), WalSndWriteData(), XL_ROUTINE, XLogBeginRead(), xlogreader, and XLogSendLogical().

Referenced by exec_replication_command().

◆ StartReplication()

static void StartReplication ( StartReplicationCmd cmd)
static

Definition at line 789 of file walsender.c.

790{
792 XLogRecPtr FlushPtr;
793 TimeLineID FlushTLI;
794
795 /* create xlogreader for physical replication */
796 xlogreader =
798 XL_ROUTINE(.segment_open = WalSndSegmentOpen,
799 .segment_close = wal_segment_close),
800 NULL);
801
802 if (!xlogreader)
804 (errcode(ERRCODE_OUT_OF_MEMORY),
805 errmsg("out of memory"),
806 errdetail("Failed while allocating a WAL reading processor.")));
807
808 /*
809 * We assume here that we're logging enough information in the WAL for
810 * log-shipping, since this is checked in PostmasterMain().
811 *
812 * NOTE: wal_level can only change at shutdown, so in most cases it is
813 * difficult for there to be WAL data that we can still see that was
814 * written at wal_level='minimal'.
815 */
816
817 if (cmd->slotname)
818 {
822 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
823 errmsg("cannot use a logical replication slot for physical replication")));
824
825 /*
826 * We don't need to verify the slot's restart_lsn here; instead we
827 * rely on the caller requesting the starting point to use. If the
828 * WAL segment doesn't exist, we'll fail later.
829 */
830 }
831
832 /*
833 * Select the timeline. If it was given explicitly by the client, use
834 * that. Otherwise use the timeline of the last replayed record.
835 */
838 FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
839 else
840 FlushPtr = GetFlushRecPtr(&FlushTLI);
841
842 if (cmd->timeline != 0)
843 {
844 XLogRecPtr switchpoint;
845
846 sendTimeLine = cmd->timeline;
847 if (sendTimeLine == FlushTLI)
848 {
851 }
852 else
853 {
854 List *timeLineHistory;
855
857
858 /*
859 * Check that the timeline the client requested exists, and the
860 * requested start location is on that timeline.
861 */
862 timeLineHistory = readTimeLineHistory(FlushTLI);
863 switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
865 list_free_deep(timeLineHistory);
866
867 /*
868 * Found the requested timeline in the history. Check that
869 * requested startpoint is on that timeline in our history.
870 *
871 * This is quite loose on purpose. We only check that we didn't
872 * fork off the requested timeline before the switchpoint. We
873 * don't check that we switched *to* it before the requested
874 * starting point. This is because the client can legitimately
875 * request to start replication from the beginning of the WAL
876 * segment that contains switchpoint, but on the new timeline, so
877 * that it doesn't end up with a partial segment. If you ask for
878 * too old a starting point, you'll get an error later when we
879 * fail to find the requested WAL segment in pg_wal.
880 *
881 * XXX: we could be more strict here and only allow a startpoint
882 * that's older than the switchpoint, if it's still in the same
883 * WAL segment.
884 */
885 if (!XLogRecPtrIsInvalid(switchpoint) &&
886 switchpoint < cmd->startpoint)
887 {
889 (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
891 cmd->timeline),
892 errdetail("This server's history forked from timeline %u at %X/%X.",
893 cmd->timeline,
894 LSN_FORMAT_ARGS(switchpoint))));
895 }
896 sendTimeLineValidUpto = switchpoint;
897 }
898 }
899 else
900 {
901 sendTimeLine = FlushTLI;
904 }
905
907
908 /* If there is nothing to stream, don't even enter COPY mode */
910 {
911 /*
912 * When we first start replication the standby will be behind the
913 * primary. For some applications, for example synchronous
914 * replication, it is important to have a clear state for this initial
915 * catchup mode, so we can trigger actions when we change streaming
916 * state later. We may stay in this state for a long time, which is
917 * exactly why we want to be able to monitor whether or not we are
918 * still here.
919 */
921
922 /* Send a CopyBothResponse message, and start streaming */
924 pq_sendbyte(&buf, 0);
925 pq_sendint16(&buf, 0);
927 pq_flush();
928
929 /*
930 * Don't allow a request to stream from a future point in WAL that
931 * hasn't been flushed to disk in this server yet.
932 */
933 if (FlushPtr < cmd->startpoint)
934 {
936 (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
938 LSN_FORMAT_ARGS(FlushPtr))));
939 }
940
941 /* Start streaming from the requested point */
942 sentPtr = cmd->startpoint;
943
944 /* Initialize shared memory status, too */
948
950
951 /* Main loop of walsender */
952 replication_active = true;
953
955
956 replication_active = false;
957 if (got_STOPPING)
958 proc_exit(0);
960
962 }
963
964 if (cmd->slotname)
966
967 /*
968 * Copy is finished now. Send a single-row result set indicating the next
969 * timeline.
970 */
972 {
973 char startpos_str[8 + 1 + 8 + 1];
975 TupOutputState *tstate;
976 TupleDesc tupdesc;
977 Datum values[2];
978 bool nulls[2] = {0};
979
980 snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
982
984
985 /*
986 * Need a tuple descriptor representing two columns. int8 may seem
987 * like a surprising data type for this, but in theory int4 would not
988 * be wide enough for this, as TimeLineID is unsigned.
989 */
990 tupdesc = CreateTemplateTupleDesc(2);
991 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
992 INT8OID, -1, 0);
993 TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
994 TEXTOID, -1, 0);
995
996 /* prepare for projection of tuple */
997 tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
998
1000 values[1] = CStringGetTextDatum(startpos_str);
1001
1002 /* send it to dest */
1003 do_tup_output(tstate, values, nulls);
1004
1005 end_tup_output(tstate);
1006 }
1007
1008 /* Send CommandComplete message */
1009 EndReplicationCommand("START_STREAMING");
1010}
XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history, TimeLineID *nextTLI)
Definition: timeline.c:572
int errdetail(const char *fmt,...)
Definition: elog.c:1203
void list_free_deep(List *list)
Definition: list.c:1560
TimeLineID timeline
Definition: replnodes.h:96
static void XLogSendPhysical(void)
Definition: walsender.c:3039
int wal_segment_size
Definition: xlog.c:143
XLogReaderState * XLogReaderAllocate(int wal_segment_size, const char *waldir, XLogReaderRoutine *routine, void *private_data)
Definition: xlogreader.c:106

References am_cascading_walsender, Assert, begin_tup_output_tupdesc(), buf, CreateDestReceiver(), CreateTemplateTupleDesc(), CStringGetTextDatum, generate_unaccent_rules::dest, DestRemoteSimple, do_tup_output(), end_tup_output(), EndReplicationCommand(), ereport, errcode(), errdetail(), errmsg(), ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_STOPPING, Int64GetDatum(), InvalidXLogRecPtr, list_free_deep(), LSN_FORMAT_ARGS, WalSnd::mutex, MyReplicationSlot, MyWalSnd, pq_beginmessage(), pq_endmessage(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyBothResponse, proc_exit(), readTimeLineHistory(), RecoveryInProgress(), replication_active, ReplicationSlotAcquire(), ReplicationSlotRelease(), sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, SlotIsLogical, StartReplicationCmd::slotname, snprintf, SpinLockAcquire, SpinLockRelease, StartReplicationCmd::startpoint, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), StartReplicationCmd::timeline, tliSwitchPoint(), TTSOpsVirtual, TupleDescInitBuiltinEntry(), values, wal_segment_close(), wal_segment_size, WalSndLoop(), WalSndSegmentOpen(), WalSndSetState(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, XL_ROUTINE, xlogreader, XLogReaderAllocate(), XLogRecPtrIsInvalid, and XLogSendPhysical().

Referenced by exec_replication_command().

◆ TransactionIdInRecentPast()

static bool TransactionIdInRecentPast ( TransactionId  xid,
uint32  epoch 
)
static

Definition at line 2517 of file walsender.c.

2518{
2519 FullTransactionId nextFullXid;
2520 TransactionId nextXid;
2521 uint32 nextEpoch;
2522
2523 nextFullXid = ReadNextFullTransactionId();
2524 nextXid = XidFromFullTransactionId(nextFullXid);
2525 nextEpoch = EpochFromFullTransactionId(nextFullXid);
2526
2527 if (xid <= nextXid)
2528 {
2529 if (epoch != nextEpoch)
2530 return false;
2531 }
2532 else
2533 {
2534 if (epoch + 1 != nextEpoch)
2535 return false;
2536 }
2537
2538 if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2539 return false; /* epoch OK, but it's wrapped around */
2540
2541 return true;
2542}
bool TransactionIdPrecedesOrEquals(TransactionId id1, TransactionId id2)
Definition: transam.c:299
#define EpochFromFullTransactionId(x)
Definition: transam.h:47
#define XidFromFullTransactionId(x)
Definition: transam.h:48
FullTransactionId ReadNextFullTransactionId(void)
Definition: varsup.c:288
static const unsigned __int64 epoch

References epoch, EpochFromFullTransactionId, ReadNextFullTransactionId(), TransactionIdPrecedesOrEquals(), and XidFromFullTransactionId.

Referenced by ProcessStandbyHSFeedbackMessage().

◆ UploadManifest()

static void UploadManifest ( void  )
static

Definition at line 647 of file walsender.c.

648{
649 MemoryContext mcxt;
651 off_t offset = 0;
653
654 /*
655 * parsing the manifest will use the cryptohash stuff, which requires a
656 * resource owner
657 */
660 CurrentResourceOwner == NULL);
662
663 /* Prepare to read manifest data into a temporary context. */
665 "incremental backup information",
668
669 /* Send a CopyInResponse message */
671 pq_sendbyte(&buf, 0);
672 pq_sendint16(&buf, 0);
674 pq_flush();
675
676 /* Receive packets from client until done. */
677 while (HandleUploadManifestPacket(&buf, &offset, ib))
678 ;
679
680 /* Finish up manifest processing. */
682
683 /*
684 * Discard any old manifest information and arrange to preserve the new
685 * information we just got.
686 *
687 * We assume that MemoryContextDelete and MemoryContextSetParent won't
688 * fail, and thus we shouldn't end up bailing out of here in such a way as
689 * to leave dangling pointers.
690 */
691 if (uploaded_manifest_mcxt != NULL)
696
697 /* clean up the resource owner we created */
699}
IncrementalBackupInfo * CreateIncrementalBackupInfo(MemoryContext mcxt)
void FinalizeIncrementalManifest(IncrementalBackupInfo *ib)
void MemoryContextSetParent(MemoryContext context, MemoryContext new_parent)
Definition: mcxt.c:637
MemoryContext CacheMemoryContext
Definition: mcxt.c:152
void pq_endmessage_reuse(StringInfo buf)
Definition: pqformat.c:314
#define PqMsg_CopyInResponse
Definition: protocol.h:45
void ReleaseAuxProcessResources(bool isCommit)
Definition: resowner.c:1002
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
ResourceOwner AuxProcessResourceOwner
Definition: resowner.c:168
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset, IncrementalBackupInfo *ib)
Definition: walsender.c:713
static MemoryContext uploaded_manifest_mcxt
Definition: walsender.c:148

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate, Assert, AuxProcessResourceOwner, buf, CacheMemoryContext, CreateIncrementalBackupInfo(), CurrentMemoryContext, CurrentResourceOwner, FinalizeIncrementalManifest(), HandleUploadManifestPacket(), MemoryContextDelete(), MemoryContextSetParent(), pq_beginmessage(), pq_endmessage_reuse(), pq_flush, pq_sendbyte(), pq_sendint16(), PqMsg_CopyInResponse, ReleaseAuxProcessResources(), uploaded_manifest, and uploaded_manifest_mcxt.

Referenced by exec_replication_command().

◆ WalSndCheckTimeOut()

static void WalSndCheckTimeOut ( void  )
static

Definition at line 2716 of file walsender.c.

2717{
2718 TimestampTz timeout;
2719
2720 /* don't bail out if we're doing something that doesn't require timeouts */
2721 if (last_reply_timestamp <= 0)
2722 return;
2723
2726
2727 if (wal_sender_timeout > 0 && last_processing >= timeout)
2728 {
2729 /*
2730 * Since typically expiration of replication timeout means
2731 * communication problem, we don't send the error message to the
2732 * standby.
2733 */
2735 (errmsg("terminating walsender process due to replication timeout")));
2736
2738 }
2739}
#define TimestampTzPlusMilliseconds(tz, ms)
Definition: timestamp.h:85
int wal_sender_timeout
Definition: walsender.c:123

References COMMERROR, ereport, errmsg(), last_processing, last_reply_timestamp, TimestampTzPlusMilliseconds, wal_sender_timeout, and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndComputeSleeptime()

static long WalSndComputeSleeptime ( TimestampTz  now)
static

Definition at line 2672 of file walsender.c.

2673{
2674 long sleeptime = 10000; /* 10 s */
2675
2677 {
2678 TimestampTz wakeup_time;
2679
2680 /*
2681 * At the latest stop sleeping once wal_sender_timeout has been
2682 * reached.
2683 */
2686
2687 /*
2688 * If no ping has been sent yet, wakeup when it's time to do so.
2689 * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2690 * the timeout passed without a response.
2691 */
2694 wal_sender_timeout / 2);
2695
2696 /* Compute relative time until wakeup. */
2697 sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2698 }
2699
2700 return sleeptime;
2701}
long TimestampDifferenceMilliseconds(TimestampTz start_time, TimestampTz stop_time)
Definition: timestamp.c:1756

References last_reply_timestamp, now(), TimestampDifferenceMilliseconds(), TimestampTzPlusMilliseconds, waiting_for_ping_response, and wal_sender_timeout.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndDone()

static void WalSndDone ( WalSndSendDataCallback  send_data)
static

Definition at line 3434 of file walsender.c.

3435{
3436 XLogRecPtr replicatedPtr;
3437
3438 /* ... let's just be real sure we're caught up ... */
3439 send_data();
3440
3441 /*
3442 * To figure out whether all WAL has successfully been replicated, check
3443 * flush location if valid, write otherwise. Tools like pg_receivewal will
3444 * usually (unless in synchronous mode) return an invalid flush location.
3445 */
3446 replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3448
3449 if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3451 {
3452 QueryCompletion qc;
3453
3454 /* Inform the standby that XLOG streaming is done */
3455 SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3456 EndCommand(&qc, DestRemote, false);
3457 pq_flush();
3458
3459 proc_exit(0);
3460 }
3463}
static bool WalSndCaughtUp
Definition: walsender.c:194

References DestRemote, EndCommand(), WalSnd::flush, InvalidXLogRecPtr, MyWalSnd, pq_flush, pq_is_send_pending, proc_exit(), sentPtr, SetQueryCompletion(), waiting_for_ping_response, WalSndCaughtUp, WalSndKeepalive(), WalSnd::write, and XLogRecPtrIsInvalid.

Referenced by WalSndLoop().

◆ WalSndErrorCleanup()

void WalSndErrorCleanup ( void  )

Definition at line 325 of file walsender.c.

326{
330
331 if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
333
334 if (MyReplicationSlot != NULL)
336
338
339 replication_active = false;
340
341 /*
342 * If there is a transaction in progress, it will clean up our
343 * ResourceOwner, but if a replication command set up a resource owner
344 * without a transaction, we've got to clean that up now.
345 */
348
350 proc_exit(0);
351
352 /* Revert back to startup state */
354}
bool ConditionVariableCancelSleep(void)
void LWLockReleaseAll(void)
Definition: lwlock.c:1876
void ReplicationSlotCleanup(bool synced_only)
Definition: slot.c:745
WALOpenSegment seg
Definition: xlogreader.h:272
static volatile sig_atomic_t got_SIGUSR2
Definition: walsender.c:197
bool IsTransactionOrTransactionBlock(void)
Definition: xact.c:4981

References ConditionVariableCancelSleep(), got_SIGUSR2, got_STOPPING, IsTransactionOrTransactionBlock(), LWLockReleaseAll(), MyReplicationSlot, pgstat_report_wait_end(), proc_exit(), ReleaseAuxProcessResources(), replication_active, ReplicationSlotCleanup(), ReplicationSlotRelease(), XLogReaderState::seg, wal_segment_close(), WalSndSetState(), WALSNDSTATE_STARTUP, WALOpenSegment::ws_file, and xlogreader.

Referenced by PostgresMain().

◆ WalSndGetStateString()

static const char * WalSndGetStateString ( WalSndState  state)
static

Definition at line 3801 of file walsender.c.

3802{
3803 switch (state)
3804 {
3806 return "startup";
3807 case WALSNDSTATE_BACKUP:
3808 return "backup";
3810 return "catchup";
3812 return "streaming";
3814 return "stopping";
3815 }
3816 return "UNKNOWN";
3817}
@ WALSNDSTATE_STREAMING
@ WALSNDSTATE_BACKUP

References WALSNDSTATE_BACKUP, WALSNDSTATE_CATCHUP, WALSNDSTATE_STARTUP, WALSNDSTATE_STOPPING, and WALSNDSTATE_STREAMING.

Referenced by pg_stat_get_wal_senders().

◆ WalSndInitStopping()

void WalSndInitStopping ( void  )

Definition at line 3718 of file walsender.c.

3719{
3720 int i;
3721
3722 for (i = 0; i < max_wal_senders; i++)
3723 {
3724 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3725 pid_t pid;
3726
3727 SpinLockAcquire(&walsnd->mutex);
3728 pid = walsnd->pid;
3729 SpinLockRelease(&walsnd->mutex);
3730
3731 if (pid == 0)
3732 continue;
3733
3735 }
3736}
#define INVALID_PROC_NUMBER
Definition: procnumber.h:26
int SendProcSignal(pid_t pid, ProcSignalReason reason, ProcNumber procNumber)
Definition: procsignal.c:281
@ PROCSIG_WALSND_INIT_STOPPING
Definition: procsignal.h:35

References i, INVALID_PROC_NUMBER, max_wal_senders, WalSnd::mutex, WalSnd::pid, PROCSIG_WALSND_INIT_STOPPING, SendProcSignal(), SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by ShutdownXLOG().

◆ WalSndKeepalive()

static void WalSndKeepalive ( bool  requestReply,
XLogRecPtr  writePtr 
)
static

Definition at line 4007 of file walsender.c.

4008{
4009 elog(DEBUG2, "sending replication keepalive");
4010
4011 /* construct the message... */
4014 pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
4016 pq_sendbyte(&output_message, requestReply ? 1 : 0);
4017
4018 /* ... and send it wrapped in CopyData */
4020
4021 /* Set local flag */
4022 if (requestReply)
4024}
static void pq_sendint64(StringInfo buf, uint64 i)
Definition: pqformat.h:152

References StringInfoData::data, DEBUG2, elog, GetCurrentTimestamp(), StringInfoData::len, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), resetStringInfo(), sentPtr, waiting_for_ping_response, and XLogRecPtrIsInvalid.

Referenced by ProcessStandbyReplyMessage(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndUpdateProgress(), and WalSndWaitForWal().

◆ WalSndKeepaliveIfNecessary()

static void WalSndKeepaliveIfNecessary ( void  )
static

Definition at line 4030 of file walsender.c.

4031{
4032 TimestampTz ping_time;
4033
4034 /*
4035 * Don't send keepalive messages if timeouts are globally disabled or
4036 * we're doing something not partaking in timeouts.
4037 */
4039 return;
4040
4042 return;
4043
4044 /*
4045 * If half of wal_sender_timeout has lapsed without receiving any reply
4046 * from the standby, send a keep-alive message to the standby requesting
4047 * an immediate reply.
4048 */
4050 wal_sender_timeout / 2);
4051 if (last_processing >= ping_time)
4052 {
4054
4055 /* Try to flush pending output to the client */
4056 if (pq_flush_if_writable() != 0)
4058 }
4059}

References InvalidXLogRecPtr, last_processing, last_reply_timestamp, pq_flush_if_writable, TimestampTzPlusMilliseconds, waiting_for_ping_response, wal_sender_timeout, WalSndKeepalive(), and WalSndShutdown().

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndKill()

static void WalSndKill ( int  code,
Datum  arg 
)
static

Definition at line 2945 of file walsender.c.

2946{
2947 WalSnd *walsnd = MyWalSnd;
2948
2949 Assert(walsnd != NULL);
2950
2951 MyWalSnd = NULL;
2952
2953 SpinLockAcquire(&walsnd->mutex);
2954 /* Mark WalSnd struct as no longer being in use. */
2955 walsnd->pid = 0;
2956 SpinLockRelease(&walsnd->mutex);
2957}

References Assert, WalSnd::mutex, MyWalSnd, WalSnd::pid, SpinLockAcquire, and SpinLockRelease.

Referenced by InitWalSenderSlot().

◆ WalSndLastCycleHandler()

static void WalSndLastCycleHandler ( SIGNAL_ARGS  )
static

Definition at line 3555 of file walsender.c.

3556{
3557 got_SIGUSR2 = true;
3559}

References got_SIGUSR2, MyLatch, and SetLatch().

Referenced by WalSndSignals().

◆ WalSndLoop()

static void WalSndLoop ( WalSndSendDataCallback  send_data)
static

Definition at line 2743 of file walsender.c.

2744{
2745 /*
2746 * Initialize the last reply timestamp. That enables timeout processing
2747 * from here on.
2748 */
2751
2752 /*
2753 * Loop until we reach the end of this timeline or the client requests to
2754 * stop streaming.
2755 */
2756 for (;;)
2757 {
2758 /* Clear any already-pending wakeups */
2760
2762
2763 /* Process any requests or signals received recently */
2765 {
2766 ConfigReloadPending = false;
2769 }
2770
2771 /* Check for input from the client */
2773
2774 /*
2775 * If we have received CopyDone from the client, sent CopyDone
2776 * ourselves, and the output buffer is empty, it's time to exit
2777 * streaming.
2778 */
2781 break;
2782
2783 /*
2784 * If we don't have any pending data in the output buffer, try to send
2785 * some more. If there is some, we don't bother to call send_data
2786 * again until we've flushed it ... but we'd better assume we are not
2787 * caught up.
2788 */
2789 if (!pq_is_send_pending())
2790 send_data();
2791 else
2792 WalSndCaughtUp = false;
2793
2794 /* Try to flush pending output to the client */
2795 if (pq_flush_if_writable() != 0)
2797
2798 /* If nothing remains to be sent right now ... */
2800 {
2801 /*
2802 * If we're in catchup state, move to streaming. This is an
2803 * important state change for users to know about, since before
2804 * this point data loss might occur if the primary dies and we
2805 * need to failover to the standby. The state change is also
2806 * important for synchronous replication, since commits that
2807 * started to wait at that point might wait for some time.
2808 */
2810 {
2812 (errmsg_internal("\"%s\" has now caught up with upstream server",
2815 }
2816
2817 /*
2818 * When SIGUSR2 arrives, we send any outstanding logs up to the
2819 * shutdown checkpoint record (i.e., the latest record), wait for
2820 * them to be replicated to the standby, and exit. This may be a
2821 * normal termination at shutdown or a promotion; the walsender
2822 * is not sure which.
2823 */
2824 if (got_SIGUSR2)
2825 WalSndDone(send_data);
2826 }
2827
2828 /* Check for replication timeout. */
2830
2831 /* Send keepalive if the time has come */
2833
2834 /*
2835 * Block if we have unsent data. XXX For logical replication, let
2836 * WalSndWaitForWal() handle any other blocking; idle receivers need
2837 * its additional actions. For physical replication, also block if
2838 * caught up; its send_data does not block.
2839 */
2840 if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2843 {
2844 long sleeptime;
2845 int wakeEvents;
2846
2848 wakeEvents = WL_SOCKET_READABLE;
2849 else
2850 wakeEvents = 0;
2851
2852 /*
2853 * Use fresh timestamp, not last_processing, to reduce the chance
2854 * of reaching wal_sender_timeout before sending a keepalive.
2855 */
2857
2858 if (pq_is_send_pending())
2859 wakeEvents |= WL_SOCKET_WRITEABLE;
2860
2861 /* Sleep until something happens or we time out */
2862 WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2863 }
2864 }
2865}
char * application_name
Definition: guc_tables.c:543
static void WalSndDone(WalSndSendDataCallback send_data)
Definition: walsender.c:3434

References application_name, CHECK_FOR_INTERRUPTS, ConfigReloadPending, DEBUG1, ereport, errmsg_internal(), GetCurrentTimestamp(), got_SIGUSR2, last_reply_timestamp, MyLatch, MyWalSnd, PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), ResetLatch(), WalSnd::state, streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndDone(), WalSndKeepaliveIfNecessary(), WalSndSetState(), WalSndShutdown(), WALSNDSTATE_CATCHUP, WALSNDSTATE_STREAMING, WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, and XLogSendLogical().

Referenced by StartLogicalReplication(), and StartReplication().

◆ WalSndPrepareWrite()

static void WalSndPrepareWrite ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1520 of file walsender.c.

1521{
1522 /* can't have sync rep confused by sending the same LSN several times */
1523 if (!last_write)
1524 lsn = InvalidXLogRecPtr;
1525
1526 resetStringInfo(ctx->out);
1527
1528 pq_sendbyte(ctx->out, 'w');
1529 pq_sendint64(ctx->out, lsn); /* dataStart */
1530 pq_sendint64(ctx->out, lsn); /* walEnd */
1531
1532 /*
1533 * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1534 * reserve space here.
1535 */
1536 pq_sendint64(ctx->out, 0); /* sendtime */
1537}
StringInfo out
Definition: logical.h:71

References InvalidXLogRecPtr, LogicalDecodingContext::out, pq_sendbyte(), pq_sendint64(), and resetStringInfo().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndRqstFileReload()

void WalSndRqstFileReload ( void  )

Definition at line 3510 of file walsender.c.

3511{
3512 int i;
3513
3514 for (i = 0; i < max_wal_senders; i++)
3515 {
3516 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3517
3518 SpinLockAcquire(&walsnd->mutex);
3519 if (walsnd->pid == 0)
3520 {
3521 SpinLockRelease(&walsnd->mutex);
3522 continue;
3523 }
3524 walsnd->needreload = true;
3525 SpinLockRelease(&walsnd->mutex);
3526 }
3527}

References i, max_wal_senders, WalSnd::mutex, WalSnd::needreload, WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSndCtl, and WalSndCtlData::walsnds.

Referenced by KeepFileRestoredFromArchive().

◆ WalSndSegmentOpen()

static void WalSndSegmentOpen ( XLogReaderState state,
XLogSegNo  nextSegNo,
TimeLineID tli_p 
)
static

Definition at line 2961 of file walsender.c.

2963{
2964 char path[MAXPGPATH];
2965
2966 /*-------
2967 * When reading from a historic timeline, and there is a timeline switch
2968 * within this segment, read from the WAL segment belonging to the new
2969 * timeline.
2970 *
2971 * For example, imagine that this server is currently on timeline 5, and
2972 * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2973 * 0/13002088. In pg_wal, we have these files:
2974 *
2975 * ...
2976 * 000000040000000000000012
2977 * 000000040000000000000013
2978 * 000000050000000000000013
2979 * 000000050000000000000014
2980 * ...
2981 *
2982 * In this situation, when requested to send the WAL from segment 0x13, on
2983 * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2984 * recovery prefers files from newer timelines, so if the segment was
2985 * restored from the archive on this server, the file belonging to the old
2986 * timeline, 000000040000000000000013, might not exist. Their contents are
2987 * equal up to the switchpoint, because at a timeline switch, the used
2988 * portion of the old segment is copied to the new file.
2989 */
2990 *tli_p = sendTimeLine;
2992 {
2993 XLogSegNo endSegNo;
2994
2995 XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2996 if (nextSegNo == endSegNo)
2997 *tli_p = sendTimeLineNextTLI;
2998 }
2999
3000 XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
3001 state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
3002 if (state->seg.ws_file >= 0)
3003 return;
3004
3005 /*
3006 * If the file is not found, assume it's because the standby asked for a
3007 * WAL segment that is too old and has already been removed or recycled.
3008 */
3009 if (errno == ENOENT)
3010 {
3011 char xlogfname[MAXFNAMELEN];
3012 int save_errno = errno;
3013
3014 XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
3015 errno = save_errno;
3016 ereport(ERROR,
3018 errmsg("requested WAL segment %s has already been removed",
3019 xlogfname)));
3020 }
3021 else
3022 ereport(ERROR,
3024 errmsg("could not open file \"%s\": %m",
3025 path)));
3026}
int BasicOpenFile(const char *fileName, int fileFlags)
Definition: fd.c:1086
static void XLogFilePath(char *path, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)
static void XLogFileName(char *fname, TimeLineID tli, XLogSegNo logSegNo, int wal_segsz_bytes)

References BasicOpenFile(), ereport, errcode_for_file_access(), errmsg(), ERROR, MAXFNAMELEN, MAXPGPATH, PG_BINARY, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, wal_segment_size, XLByteToSeg, XLogFileName(), and XLogFilePath().

Referenced by CreateReplicationSlot(), StartLogicalReplication(), and StartReplication().

◆ WalSndSetState()

void WalSndSetState ( WalSndState  state)

Definition at line 3782 of file walsender.c.

3783{
3784 WalSnd *walsnd = MyWalSnd;
3785
3787
3788 if (walsnd->state == state)
3789 return;
3790
3791 SpinLockAcquire(&walsnd->mutex);
3792 walsnd->state = state;
3793 SpinLockRelease(&walsnd->mutex);
3794}

References am_walsender, Assert, WalSnd::mutex, MyWalSnd, SpinLockAcquire, SpinLockRelease, and WalSnd::state.

Referenced by exec_replication_command(), SendBaseBackup(), StartLogicalReplication(), StartReplication(), WalSndErrorCleanup(), WalSndLoop(), and XLogSendPhysical().

◆ WalSndShmemInit()

void WalSndShmemInit ( void  )

Definition at line 3594 of file walsender.c.

3595{
3596 bool found;
3597 int i;
3598
3600 ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3601
3602 if (!found)
3603 {
3604 /* First time through, so initialize */
3606
3607 for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3609
3610 for (i = 0; i < max_wal_senders; i++)
3611 {
3612 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3613
3614 SpinLockInit(&walsnd->mutex);
3615 }
3616
3620 }
3621}
void ConditionVariableInit(ConditionVariable *cv)
static void dlist_init(dlist_head *head)
Definition: ilist.h:314
void * ShmemInitStruct(const char *name, Size size, bool *foundPtr)
Definition: shmem.c:382
#define SpinLockInit(lock)
Definition: spin.h:57
ConditionVariable wal_replay_cv
dlist_head SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]
ConditionVariable wal_flush_cv
Size WalSndShmemSize(void)
Definition: walsender.c:3582

References ConditionVariableInit(), dlist_init(), i, max_wal_senders, MemSet, WalSnd::mutex, NUM_SYNC_REP_WAIT_MODE, ShmemInitStruct(), SpinLockInit, WalSndCtlData::SyncRepQueue, WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, WalSndCtlData::walsnds, and WalSndShmemSize().

Referenced by CreateOrAttachShmemStructs().

◆ WalSndShmemSize()

Size WalSndShmemSize ( void  )

Definition at line 3582 of file walsender.c.

3583{
3584 Size size = 0;
3585
3586 size = offsetof(WalSndCtlData, walsnds);
3588
3589 return size;
3590}
Size add_size(Size s1, Size s2)
Definition: shmem.c:488
Size mul_size(Size s1, Size s2)
Definition: shmem.c:505
static pg_noinline void Size size
Definition: slab.c:607

References add_size(), max_wal_senders, mul_size(), and size.

Referenced by CalculateShmemSize(), and WalSndShmemInit().

◆ WalSndShutdown()

static void WalSndShutdown ( void  )
static

Definition at line 240 of file walsender.c.

279{
281
282 /* Create a per-walsender data structure in shared memory */
284
285 /* need resource owner for e.g. basebackups */
287
288 /*
289 * Let postmaster know that we're a WAL sender. Once we've declared us as
290 * a WAL sender process, postmaster will let us outlive the bgwriter and
291 * kill us last in the shutdown sequence, so we get a chance to stream all
292 * remaining WAL at shutdown, including the shutdown checkpoint. Note that
293 * there's no going back, and we mustn't write any WAL records after this.
294 */
297
298 /*
299 * If the client didn't specify a database to connect to, show in PGPROC
300 * that our advertised xmin should affect vacuum horizons in all
301 * databases. This allows physical replication clients to send hot
302 * standby feedback that will delay vacuum cleanup in all databases.
303 */
305 {
307 LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
310 LWLockRelease(ProcArrayLock);
311 }
312
313 /* Initialize empty timestamp buffer for lag tracking. */
315}
@ LW_EXCLUSIVE
Definition: lwlock.h:114
void * MemoryContextAllocZero(MemoryContext context, Size size)
Definition: mcxt.c:1215
MemoryContext TopMemoryContext
Definition: mcxt.c:149
void SendPostmasterSignal(PMSignalReason reason)
Definition: pmsignal.c:165
void MarkPostmasterChildWalSender(void)
Definition: pmsignal.c:309
@ PMSIGNAL_ADVANCE_STATE_MACHINE
Definition: pmsignal.h:42
#define PROC_AFFECTS_ALL_HORIZONS
Definition: proc.h:62
void CreateAuxProcessResourceOwner(void)
Definition: resowner.c:982
PROC_HDR * ProcGlobal
Definition: proc.c:78
uint8 statusFlags
Definition: proc.h:242
int pgxactoff
Definition: proc.h:184
uint8 * statusFlags
Definition: proc.h:399
static void InitWalSenderSlot(void)
Definition: walsender.c:2869

Referenced by ProcessPendingWrites(), WalSndCheckTimeOut(), WalSndKeepaliveIfNecessary(), WalSndLoop(), WalSndUpdateProgress(), WalSndWaitForWal(), and WalSndWriteData().

◆ WalSndSignals()

void WalSndSignals ( void  )

Definition at line 3563 of file walsender.c.

3564{
3565 /* Set up signal handlers */
3567 pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3568 pqsignal(SIGTERM, die); /* request shutdown */
3569 /* SIGQUIT handler was already set up by InitPostmasterChild */
3570 InitializeTimeouts(); /* establishes SIGALRM handler */
3573 pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3574 * shutdown */
3575
3576 /* Reset some signals that are accepted by postmaster but not here */
3578}
void SignalHandlerForConfigReload(SIGNAL_ARGS)
Definition: interrupt.c:61
#define die(msg)
pqsigfunc pqsignal(int signo, pqsigfunc func)
void StatementCancelHandler(SIGNAL_ARGS)
Definition: postgres.c:3031
void procsignal_sigusr1_handler(SIGNAL_ARGS)
Definition: procsignal.c:671
void InitializeTimeouts(void)
Definition: timeout.c:470
static void WalSndLastCycleHandler(SIGNAL_ARGS)
Definition: walsender.c:3555
#define SIGCHLD
Definition: win32_port.h:178
#define SIGHUP
Definition: win32_port.h:168
#define SIG_DFL
Definition: win32_port.h:163
#define SIGPIPE
Definition: win32_port.h:173
#define SIGUSR1
Definition: win32_port.h:180
#define SIGUSR2
Definition: win32_port.h:181
#define SIG_IGN
Definition: win32_port.h:165

References die, InitializeTimeouts(), pqsignal(), procsignal_sigusr1_handler(), SIG_DFL, SIG_IGN, SIGCHLD, SIGHUP, SignalHandlerForConfigReload(), SIGPIPE, SIGUSR1, SIGUSR2, StatementCancelHandler(), and WalSndLastCycleHandler().

Referenced by PostgresMain().

◆ WalSndUpdateProgress()

static void WalSndUpdateProgress ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  skipped_xact 
)
static

Definition at line 1643 of file walsender.c.

1645{
1646 static TimestampTz sendTime = 0;
1648 bool pending_writes = false;
1649 bool end_xact = ctx->end_xact;
1650
1651 /*
1652 * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1653 * avoid flooding the lag tracker when we commit frequently.
1654 *
1655 * We don't have a mechanism to get the ack for any LSN other than end
1656 * xact LSN from the downstream. So, we track lag only for end of
1657 * transaction LSN.
1658 */
1659#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1660 if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1662 {
1663 LagTrackerWrite(lsn, now);
1664 sendTime = now;
1665 }
1666
1667 /*
1668 * When skipping empty transactions in synchronous replication, we send a
1669 * keepalive message to avoid delaying such transactions.
1670 *
1671 * It is okay to check sync_standbys_defined flag without lock here as in
1672 * the worst case we will just send an extra keepalive message when it is
1673 * really not required.
1674 */
1675 if (skipped_xact &&
1676 SyncRepRequested() &&
1677 ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1678 {
1679 WalSndKeepalive(false, lsn);
1680
1681 /* Try to flush pending output to the client */
1682 if (pq_flush_if_writable() != 0)
1684
1685 /* If we have pending write here, make sure it's actually flushed */
1686 if (pq_is_send_pending())
1687 pending_writes = true;
1688 }
1689
1690 /*
1691 * Process pending writes if any or try to send a keepalive if required.
1692 * We don't need to try sending keepalive messages at the transaction end
1693 * as that will be done at a later point in time. This is required only
1694 * for large transactions where we don't send any changes to the
1695 * downstream and the receiver can timeout due to that.
1696 */
1697 if (pending_writes || (!end_xact &&
1699 wal_sender_timeout / 2)))
1701}
bool TimestampDifferenceExceeds(TimestampTz start_time, TimestampTz stop_time, int msec)
Definition: timestamp.c:1780
#define SyncRepRequested()
Definition: syncrep.h:18
static void ProcessPendingWrites(void)
Definition: walsender.c:1589
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
Definition: walsender.c:4068

References LogicalDecodingContext::end_xact, GetCurrentTimestamp(), LagTrackerWrite(), last_reply_timestamp, now(), pq_flush_if_writable, pq_is_send_pending, ProcessPendingWrites(), SyncRepRequested, TimestampDifferenceExceeds(), TimestampTzPlusMilliseconds, wal_sender_timeout, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS, WalSndCtl, WalSndKeepalive(), and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ WalSndWait()

static void WalSndWait ( uint32  socket_events,
long  timeout,
uint32  wait_event 
)
static

Definition at line 3660 of file walsender.c.

3661{
3662 WaitEvent event;
3663
3664 ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3665
3666 /*
3667 * We use a condition variable to efficiently wake up walsenders in
3668 * WalSndWakeup().
3669 *
3670 * Every walsender prepares to sleep on a shared memory CV. Note that it
3671 * just prepares to sleep on the CV (i.e., adds itself to the CV's
3672 * waitlist), but does not actually wait on the CV (IOW, it never calls
3673 * ConditionVariableSleep()). It still uses WaitEventSetWait() for
3674 * waiting, because we also need to wait for socket events. The processes
3675 * (startup process, walreceiver etc.) wanting to wake up walsenders use
3676 * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
3677 * walsenders come out of WaitEventSetWait().
3678 *
3679 * This approach is simple and efficient because one doesn't have to loop
3680 * through all the walsender slots, with a spinlock acquisition and
3681 * release for every iteration, just to wake up only the waiting
3682 * walsenders. It makes WalSndWakeup() callers' life easy.
3683 *
3684 * XXX: A desirable future improvement would be to add support for CVs
3685 * into WaitEventSetWait().
3686 *
3687 * And, we use separate shared memory CVs for physical and logical
3688 * walsenders for selective wake ups, see WalSndWakeup() for more details.
3689 *
3690 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
3691 * until awakened by physical walsenders after the walreceiver confirms
3692 * the receipt of the LSN.
3693 */
3694 if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
3700
3701 if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3702 (event.events & WL_POSTMASTER_DEATH))
3703 {
3705 proc_exit(1);
3706 }
3707
3709}
void ConditionVariablePrepareToSleep(ConditionVariable *cv)
void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
Definition: latch.c:1043
int WaitEventSetWait(WaitEventSet *set, long timeout, WaitEvent *occurred_events, int nevents, uint32 wait_event_info)
Definition: latch.c:1418
#define WL_POSTMASTER_DEATH
Definition: latch.h:131
#define FeBeWaitSetSocketPos
Definition: libpq.h:63
WaitEventSet * FeBeWaitSet
Definition: pqcomm.c:166
uint32 events
Definition: latch.h:155

References ConditionVariableCancelSleep(), ConditionVariablePrepareToSleep(), WaitEvent::events, FeBeWaitSet, FeBeWaitSetSocketPos, WalSnd::kind, ModifyWaitEvent(), MyWalSnd, proc_exit(), REPLICATION_KIND_LOGICAL, REPLICATION_KIND_PHYSICAL, WaitEventSetWait(), WalSndCtlData::wal_confirm_rcv_cv, WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, WalSndCtl, and WL_POSTMASTER_DEATH.

Referenced by ProcessPendingWrites(), WalSndLoop(), and WalSndWaitForWal().

◆ WalSndWaitForWal()

static XLogRecPtr WalSndWaitForWal ( XLogRecPtr  loc)
static

Definition at line 1793 of file walsender.c.

1794{
1795 int wakeEvents;
1796 uint32 wait_event = 0;
1797 static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1798
1799 /*
1800 * Fast path to avoid acquiring the spinlock in case we already know we
1801 * have enough WAL available and all the standby servers have confirmed
1802 * receipt of WAL up to RecentFlushPtr. This is particularly interesting
1803 * if we're far behind.
1804 */
1805 if (!XLogRecPtrIsInvalid(RecentFlushPtr) &&
1806 !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1807 return RecentFlushPtr;
1808
1809 /*
1810 * Within the loop, we wait for the necessary WALs to be flushed to disk
1811 * first, followed by waiting for standbys to catch up if there are enough
1812 * WALs (see NeedToWaitForWal()) or upon receiving the shutdown signal.
1813 */
1814 for (;;)
1815 {
1816 bool wait_for_standby_at_stop = false;
1817 long sleeptime;
1818
1819 /* Clear any already-pending wakeups */
1821
1823
1824 /* Process any requests or signals received recently */
1826 {
1827 ConfigReloadPending = false;
1830 }
1831
1832 /* Check for input from the client */
1834
1835 /*
1836 * If we're shutting down, trigger pending WAL to be written out,
1837 * otherwise we'd possibly end up waiting for WAL that never gets
1838 * written, because walwriter has shut down already.
1839 */
1840 if (got_STOPPING)
1842
1843 /*
1844 * To avoid the scenario where standbys need to catch up to a newer
1845 * WAL location in each iteration, we update our idea of the currently
1846 * flushed position only if we are not waiting for standbys to catch
1847 * up.
1848 */
1849 if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
1850 {
1851 if (!RecoveryInProgress())
1852 RecentFlushPtr = GetFlushRecPtr(NULL);
1853 else
1854 RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1855 }
1856
1857 /*
1858 * If postmaster asked us to stop and the standby slots have caught up
1859 * to the flushed position, don't wait anymore.
1860 *
1861 * It's important to do this check after the recomputation of
1862 * RecentFlushPtr, so we can send all remaining data before shutting
1863 * down.
1864 */
1865 if (got_STOPPING)
1866 {
1867 if (NeedToWaitForStandbys(RecentFlushPtr, &wait_event))
1868 wait_for_standby_at_stop = true;
1869 else
1870 break;
1871 }
1872
1873 /*
1874 * We only send regular messages to the client for full decoded
1875 * transactions, but synchronous replication and walsender shutdown
1876 * may be waiting for a later location. So, before sleeping, we
1877 * send a ping containing the flush location. If the receiver is
1878 * otherwise idle, this keepalive will trigger a reply. Processing the
1879 * reply will update these MyWalSnd locations.
1880 */
1881 if (MyWalSnd->flush < sentPtr &&
1882 MyWalSnd->write < sentPtr &&
1885
1886 /*
1887 * Exit the loop if we have already caught up and don't need to wait for
1888 * standby slots.
1889 */
1890 if (!wait_for_standby_at_stop &&
1891 !NeedToWaitForWal(loc, RecentFlushPtr, &wait_event))
1892 break;
1893
1894 /*
1895 * Waiting for new WAL or waiting for standbys to catch up. Since we
1896 * need to wait, we're now caught up.
1897 */
1898 WalSndCaughtUp = true;
1899
1900 /*
1901 * Try to flush any pending output to the client.
1902 */
1903 if (pq_flush_if_writable() != 0)
1905
1906 /*
1907 * If we have received CopyDone from the client, sent CopyDone
1908 * ourselves, and the output buffer is empty, it's time to exit
1909 * streaming, so fail the current WAL fetch request.
1910 */
1913 break;
1914
1915 /* die if timeout was reached */
1917
1918 /* Send keepalive if the time has come */
1920
1921 /*
1922 * Sleep until something happens or we time out. Also wait for the
1923 * socket becoming writable, if there's still pending output.
1924 * Otherwise we might sit on sendable output data while waiting for
1925 * new WAL to be generated. (But if we have nothing to send, we don't
1926 * want to wake on socket-writable.)
1927 */
1929
1930 wakeEvents = WL_SOCKET_READABLE;
1931
1932 if (pq_is_send_pending())
1933 wakeEvents |= WL_SOCKET_WRITEABLE;
1934
1935 Assert(wait_event != 0);
1936
1937 WalSndWait(wakeEvents, sleeptime, wait_event);
1938 }
1939
1940 /* reactivate latch so WalSndLoop knows to continue */
1942 return RecentFlushPtr;
1943}
static bool NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn, uint32 *wait_event)
Definition: walsender.c:1765
bool XLogBackgroundFlush(void)
Definition: xlog.c:2990

References Assert, CHECK_FOR_INTERRUPTS, ConfigReloadPending, WalSnd::flush, GetCurrentTimestamp(), GetFlushRecPtr(), GetXLogReplayRecPtr(), got_STOPPING, InvalidXLogRecPtr, MyLatch, MyWalSnd, NeedToWaitForStandbys(), NeedToWaitForWal(), PGC_SIGHUP, pq_flush_if_writable, pq_is_send_pending, ProcessConfigFile(), ProcessRepliesIfAny(), RecoveryInProgress(), ResetLatch(), sentPtr, SetLatch(), streamingDoneReceiving, streamingDoneSending, SyncRepInitConfig(), waiting_for_ping_response, WalSndCaughtUp, WalSndCheckTimeOut(), WalSndComputeSleeptime(), WalSndKeepalive(), WalSndKeepaliveIfNecessary(), WalSndShutdown(), WalSndWait(), WL_SOCKET_READABLE, WL_SOCKET_WRITEABLE, WalSnd::write, XLogBackgroundFlush(), and XLogRecPtrIsInvalid.

Referenced by logical_read_xlog_page().

◆ WalSndWaitStopping()

void WalSndWaitStopping ( void  )

Definition at line 3744 of file walsender.c.

3745{
3746 for (;;)
3747 {
3748 int i;
3749 bool all_stopped = true;
3750
3751 for (i = 0; i < max_wal_senders; i++)
3752 {
3753 WalSnd *walsnd = &WalSndCtl->walsnds[i];
3754
3755 SpinLockAcquire(&walsnd->mutex);
3756
3757 if (walsnd->pid == 0)
3758 {
3759 SpinLockRelease(&walsnd->mutex);
3760 continue;
3761 }
3762
3763 if (walsnd->state != WALSNDSTATE_STOPPING)
3764 {
3765 all_stopped = false;
3766 SpinLockRelease(&walsnd->mutex);
3767 break;
3768 }
3769 SpinLockRelease(&walsnd->mutex);
3770 }
3771
3772 /* safe to leave if confirmation is done for all WAL senders */
3773 if (all_stopped)
3774 return;
3775
3776 pg_usleep(10000L); /* wait for 10 msec */
3777 }
3778}
void pg_usleep(long microsec)
Definition: signal.c:53

References i, max_wal_senders, WalSnd::mutex, pg_usleep(), WalSnd::pid, SpinLockAcquire, SpinLockRelease, WalSnd::state, WalSndCtl, WalSndCtlData::walsnds, and WALSNDSTATE_STOPPING.

Referenced by ShutdownXLOG().

◆ WalSndWakeup()

void WalSndWakeup ( bool  physical,
bool  logical 
)

Definition at line 3639 of file walsender.c.

3640{
3641 /*
3642 * Wake up all the walsenders waiting on WAL being flushed or replayed
3643 * respectively. Note that waiting walsender would have prepared to sleep
3644 * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
3645 * before actually waiting.
3646 */
3647 if (physical)
3649
3650 if (logical)
3652}

References ConditionVariableBroadcast(), WalSndCtlData::wal_flush_cv, WalSndCtlData::wal_replay_cv, and WalSndCtl.

Referenced by ApplyWalRecord(), KeepFileRestoredFromArchive(), StartupXLOG(), WalSndWakeupProcessRequests(), and XLogWalRcvFlush().

◆ WalSndWriteData()

static void WalSndWriteData ( LogicalDecodingContext ctx,
XLogRecPtr  lsn,
TransactionId  xid,
bool  last_write 
)
static

Definition at line 1547 of file walsender.c.

1549{
1551
1552 /*
1553 * Fill the send timestamp last, so that it is taken as late as possible.
1554 * This is somewhat ugly, but the protocol is set as it's already used for
1555 * several releases by streaming physical replication.
1556 */
1560 memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1561 tmpbuf.data, sizeof(int64));
1562
1563 /* output previously gathered data in a CopyData packet */
1564 pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1565
1567
1568 /* Try to flush pending output to the client */
1569 if (pq_flush_if_writable() != 0)
1571
1572 /* Try taking fast path unless we get too close to walsender timeout. */
1574 wal_sender_timeout / 2) &&
1576 {
1577 return;
1578 }
1579
1580 /* If we have pending write here, go to slow path */
1582}

References CHECK_FOR_INTERRUPTS, StringInfoData::data, GetCurrentTimestamp(), last_reply_timestamp, StringInfoData::len, now(), LogicalDecodingContext::out, pq_flush_if_writable, pq_is_send_pending, pq_putmessage_noblock, pq_sendint64(), ProcessPendingWrites(), resetStringInfo(), TimestampTzPlusMilliseconds, tmpbuf, wal_sender_timeout, and WalSndShutdown().

Referenced by CreateReplicationSlot(), and StartLogicalReplication().

◆ XLogSendLogical()

static void XLogSendLogical ( void  )
static

Definition at line 3349 of file walsender.c.

3350{
3351 XLogRecord *record;
3352 char *errm;
3353
3354 /*
3355 * We'll use the current flush point to determine whether we've caught up.
3356 * This variable is static in order to cache it across calls. Caching is
3357 * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3358 * spinlock.
3359 */
3360 static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3361
3362 /*
3363 * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3364 * true in WalSndWaitForWal, if we're actually waiting. We also set it to
3365 * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3366 * didn't wait - i.e. when we're shutting down.
3367 */
3368 WalSndCaughtUp = false;
3369
3370 record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3371
3372 /* xlog record was invalid */
3373 if (errm != NULL)
3374 elog(ERROR, "could not find record while sending logically-decoded data: %s",
3375 errm);
3376
3377 if (record != NULL)
3378 {
3379 /*
3380 * Note the lack of any call to LagTrackerWrite() which is handled by
3381 * WalSndUpdateProgress which is called by output plugin through
3382 * logical decoding write api.
3383 */
3385
3387 }
3388
3389 /*
3390 * If first time through in this session, initialize flushPtr. Otherwise,
3391 * we only need to update flushPtr if EndRecPtr is past it.
3392 */
3393 if (flushPtr == InvalidXLogRecPtr ||
3395 {
3397 flushPtr = GetStandbyFlushRecPtr(NULL);
3398 else
3399 flushPtr = GetFlushRecPtr(NULL);
3400 }
3401
3402 /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3403 if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3404 WalSndCaughtUp = true;
3405
3406 /*
3407 * If we're caught up and have been requested to stop, have WalSndLoop()
3408 * terminate the connection in an orderly manner, after writing out all
3409 * the pending data.
3410 */
3412 got_SIGUSR2 = true;
3413
3414 /* Update shared memory status */
3415 {
3416 WalSnd *walsnd = MyWalSnd;
3417
3418 SpinLockAcquire(&walsnd->mutex);
3419 walsnd->sentPtr = sentPtr;
3420 SpinLockRelease(&walsnd->mutex);
3421 }
3422}
void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
Definition: decode.c:88
XLogRecPtr EndRecPtr
Definition: xlogreader.h:207
XLogRecord * XLogReadRecord(XLogReaderState *state, char **errormsg)
Definition: xlogreader.c:389

References am_cascading_walsender, elog, XLogReaderState::EndRecPtr, ERROR, GetFlushRecPtr(), GetStandbyFlushRecPtr(), got_SIGUSR2, got_STOPPING, InvalidXLogRecPtr, logical_decoding_ctx, LogicalDecodingProcessRecord(), WalSnd::mutex, MyWalSnd, LogicalDecodingContext::reader, sentPtr, WalSnd::sentPtr, SpinLockAcquire, SpinLockRelease, WalSndCaughtUp, and XLogReadRecord().

Referenced by StartLogicalReplication(), and WalSndLoop().

◆ XLogSendPhysical()

static void XLogSendPhysical ( void  )
static

Definition at line 3039 of file walsender.c.

3040{
3041 XLogRecPtr SendRqstPtr;
3042 XLogRecPtr startptr;
3043 XLogRecPtr endptr;
3044 Size nbytes;
3045 XLogSegNo segno;
3046 WALReadError errinfo;
3047 Size rbytes;
3048
3049 /* If requested switch the WAL sender to the stopping state. */
3050 if (got_STOPPING)
3052
3054 {
3055 WalSndCaughtUp = true;
3056 return;
3057 }
3058
3059 /* Figure out how far we can safely send the WAL. */
3061 {
3062 /*
3063 * Streaming an old timeline that's in this server's history, but is
3064 * not the one we're currently inserting or replaying. It can be
3065 * streamed up to the point where we switched off that timeline.
3066 */
3067 SendRqstPtr = sendTimeLineValidUpto;
3068 }
3069 else if (am_cascading_walsender)
3070 {
3071 TimeLineID SendRqstTLI;
3072
3073 /*
3074 * Streaming the latest timeline on a standby.
3075 *
3076 * Attempt to send all WAL that has already been replayed, so that we
3077 * know it's valid. If we're receiving WAL through streaming
3078 * replication, it's also OK to send any WAL that has been received
3079 * but not replayed.
3080 *
3081 * The timeline we're recovering from can change, or we can be
3082 * promoted. In either case, the current timeline becomes historic. We
3083 * need to detect that so that we don't try to stream past the point
3084 * where we switched to another timeline. We check for promotion or
3085 * timeline switch after calculating FlushPtr, to avoid a race
3086 * condition: if the timeline becomes historic just after we checked
3087 * that it was still current, it's still OK to stream it up to the
3088 * FlushPtr that was calculated before it became historic.
3089 */
3090 bool becameHistoric = false;
3091
3092 SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
3093
3094 if (!RecoveryInProgress())
3095 {
3096 /* We have been promoted. */
3097 SendRqstTLI = GetWALInsertionTimeLine();
3098 am_cascading_walsender = false;
3099 becameHistoric = true;
3100 }
3101 else
3102 {
3103 /*
3104 * Still a cascading standby. But is the timeline we're sending
3105 * still the one recovery is recovering from?
3106 */
3107 if (sendTimeLine != SendRqstTLI)
3108 becameHistoric = true;
3109 }
3110
3111 if (becameHistoric)
3112 {
3113 /*
3114 * The timeline we were sending has become historic. Read the
3115 * timeline history file of the new timeline to see where exactly
3116 * we forked off from the timeline we were sending.
3117 */
3118 List *history;
3119
3120 history = readTimeLineHistory(SendRqstTLI);
3122
3124 list_free_deep(history);
3125
3127
3128 SendRqstPtr = sendTimeLineValidUpto;
3129 }
3130 }
3131 else
3132 {
3133 /*
3134 * Streaming the current timeline on a primary.
3135 *
3136 * Attempt to send all data that's already been written out and
3137 * fsync'd to disk. We cannot go further than what's been written out
3138 * given the current implementation of WALRead(). And in any case
3139 * it's unsafe to send WAL that is not securely down to disk on the
3140 * primary: if the primary subsequently crashes and restarts, standbys
3141 * must not have applied any WAL that got lost on the primary.
3142 */
3143 SendRqstPtr = GetFlushRecPtr(NULL);
3144 }
3145
3146 /*
3147 * Record the current system time as an approximation of the time at which
3148 * this WAL location was written for the purposes of lag tracking.
3149 *
3150 * In theory we could make XLogFlush() record a time in shmem whenever WAL
3151 * is flushed and we could get that time as well as the LSN when we call
3152 * GetFlushRecPtr() above (and likewise for the cascading standby
3153 * equivalent), but rather than putting any new code into the hot WAL path
3154 * it seems good enough to capture the time here. We should reach this
3155 * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
3156 * may take some time, we read the WAL flush pointer and take the time
3157 * very close together here so that we'll get a later position if it is
3158 * still moving.
3159 *
3160 * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
3161 * this gives us a cheap approximation for the WAL flush time for this
3162 * LSN.
3163 *
3164 * Note that the LSN is not necessarily the LSN for the data contained in
3165 * the present message; it's the end of the WAL, which might be further
3166 * ahead. All the lag tracking machinery cares about is finding out when
3167 * that arbitrary LSN is eventually reported as written, flushed and
3168 * applied, so that it can measure the elapsed time.
3169 */
3170 LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
3171
3172 /*
3173 * If this is a historic timeline and we've reached the point where we
3174 * forked to the next timeline, stop streaming.
3175 *
3176 * Note: We might already have sent WAL > sendTimeLineValidUpto. The
3177 * startup process will normally replay all WAL that has been received
3178 * from the primary, before promoting, but if the WAL streaming is
3179 * terminated at a WAL page boundary, the valid portion of the timeline
3180 * might end in the middle of a WAL record. We might've already sent the
3181 * first half of that partial WAL record to the cascading standby, so that
3182 * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
3183 * replay the partial WAL record either, so it can still follow our
3184 * timeline switch.
3185 */
3187 {
3188 /* close the current file. */
3189 if (xlogreader->seg.ws_file >= 0)
3191
3192 /* Send CopyDone */
3193 pq_putmessage_noblock('c', NULL, 0);
3194 streamingDoneSending = true;
3195
3196 WalSndCaughtUp = true;
3197
3198 elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
3201 return;
3202 }
3203
3204 /* Do we have any work to do? */
3205 Assert(sentPtr <= SendRqstPtr);
3206 if (SendRqstPtr <= sentPtr)
3207 {
3208 WalSndCaughtUp = true;
3209 return;
3210 }
3211
3212 /*
3213 * Figure out how much to send in one message. If there's no more than
3214 * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
3215 * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
3216 *
3217 * The rounding is not only for performance reasons. Walreceiver relies on
3218 * the fact that we never split a WAL record across two messages. Since a
3219 * long WAL record is split at page boundary into continuation records,
3220 * page boundary is always a safe cut-off point. We also assume that
3221 * SendRqstPtr never points to the middle of a WAL record.
3222 */
3223 startptr = sentPtr;
3224 endptr = startptr;
3225 endptr += MAX_SEND_SIZE;
3226
3227 /* if we went beyond SendRqstPtr, back off */
3228 if (SendRqstPtr <= endptr)
3229 {
3230 endptr = SendRqstPtr;
3232 WalSndCaughtUp = false;
3233 else
3234 WalSndCaughtUp = true;
3235 }
3236 else
3237 {
3238 /* round down to page boundary. */
3239 endptr -= (endptr % XLOG_BLCKSZ);
3240 WalSndCaughtUp = false;
3241 }
3242
3243 nbytes = endptr - startptr;
3244 Assert(nbytes <= MAX_SEND_SIZE);
3245
3246 /*
3247 * OK to read and send the slice.
3248 */
3251
3252 pq_sendint64(&output_message, startptr); /* dataStart */
3253 pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3254 pq_sendint64(&output_message, 0); /* sendtime, filled in last */
3255
3256 /*
3257 * Read the log directly into the output buffer to avoid extra memcpy
3258 * calls.
3259 */
3261
3262retry:
3263 /* attempt to read WAL from WAL buffers first */
3265 startptr, nbytes, xlogreader->seg.ws_tli);
3266 output_message.len += rbytes;
3267 startptr += rbytes;
3268 nbytes -= rbytes;
3269
3270 /* now read the remaining WAL from WAL file */
3271 if (nbytes > 0 &&
3274 startptr,
3275 nbytes,
3276 xlogreader->seg.ws_tli, /* Pass the current TLI because
3277 * only WalSndSegmentOpen controls
3278 * whether new TLI is needed. */
3279 &errinfo))
3280 WALReadRaiseError(&errinfo);
3281
3282 /* See logical_read_xlog_page(). */
3283 XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
3285
3286 /*
3287 * During recovery, the currently-open WAL file might be replaced with the
3288 * file of the same name retrieved from archive. So we always need to
3289 * check what we read was valid after reading into the buffer. If it's
3290 * invalid, we try to open and read the file again.
3291 */
3293 {
3294 WalSnd *walsnd = MyWalSnd;
3295 bool reload;
3296
3297 SpinLockAcquire(&walsnd->mutex);
3298 reload = walsnd->needreload;
3299 walsnd->needreload = false;
3300 SpinLockRelease(&walsnd->mutex);
3301
3302 if (reload && xlogreader->seg.ws_file >= 0)
3303 {
3305
3306 goto retry;
3307 }
3308 }
3309
3310 output_message.len += nbytes;
3312
3313 /*
3314 * Fill the send timestamp last, so that it is taken as late as possible.
3315 */
3318 memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3319 tmpbuf.data, sizeof(int64));
3320
3322
3323 sentPtr = endptr;
3324
3325 /* Update shared memory status */
3326 {
3327 WalSnd *walsnd = MyWalSnd;
3328
3329 SpinLockAcquire(&walsnd->mutex);
3330 walsnd->sentPtr = sentPtr;
3331 SpinLockRelease(&walsnd->mutex);
3332 }
3333
3334 /* Report progress of XLOG streaming in PS display */
3336 {
3337 char activitymsg[50];
3338
3339 snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
3341 set_ps_display(activitymsg);
3342 }
3343}
bool update_process_title
Definition: ps_status.c:31
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:286
TimeLineID ws_tli
Definition: xlogreader.h:49
WALSegmentContext segcxt
Definition: xlogreader.h:271
#define MAX_SEND_SIZE
Definition: walsender.c:106
Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count, TimeLineID tli)
Definition: xlog.c:1748

References am_cascading_walsender, Assert, CheckXLogRemoved(), StringInfoData::data, DEBUG1, elog, enlargeStringInfo(), GetCurrentTimestamp(), GetFlushRecPtr(), GetStandbyFlushRecPtr(), GetWALInsertionTimeLine(), got_STOPPING, LagTrackerWrite(), StringInfoData::len, list_free_deep(), LSN_FORMAT_ARGS, MAX_SEND_SIZE, WalSnd::mutex, MyWalSnd, WalSnd::needreload, output_message, pq_putmessage_noblock, pq_sendbyte(), pq_sendint64(), readTimeLineHistory(), RecoveryInProgress(), resetStringInfo(), XLogReaderState::seg, XLogReaderState::segcxt, sendTimeLine, sendTimeLineIsHistoric, sendTimeLineNextTLI, sendTimeLineValidUpto, sentPtr, WalSnd::sentPtr, set_ps_display(), snprintf, SpinLockAcquire, SpinLockRelease, streamingDoneSending, tliSwitchPoint(), tmpbuf, update_process_title, wal_segment_close(), WALRead(), WALReadFromBuffers(), WALReadRaiseError(), WalSndCaughtUp, WalSndSetState(), WALSNDSTATE_STOPPING, WALOpenSegment::ws_file, WALSegmentContext::ws_segsize, WALOpenSegment::ws_tli, XLByteToSeg, and xlogreader.

Referenced by StartReplication().

Variable Documentation

◆ am_cascading_walsender

◆ am_db_walsender

bool am_db_walsender = false

Definition at line 118 of file walsender.c.

Referenced by check_db(), ClientAuthentication(), InitPostgres(), and ProcessStartupPacket().

◆ am_walsender

◆ got_SIGUSR2

static volatile sig_atomic_t got_SIGUSR2 = false

◆ got_STOPPING

◆ lag_tracker

static LagTracker *lag_tracker

Definition at line 230 of file walsender.c.

Referenced by LagTrackerRead(), and LagTrackerWrite().

◆ last_processing

static TimestampTz last_processing = 0

◆ last_reply_timestamp

◆ log_replication_commands

bool log_replication_commands = false

◆ logical_decoding_ctx

static LogicalDecodingContext *logical_decoding_ctx = NULL

Definition at line 208 of file walsender.c.

Referenced by StartLogicalReplication(), and XLogSendLogical().

◆ max_wal_senders

◆ MyWalSnd

◆ output_message

static StringInfoData output_message

Definition at line 168 of file walsender.c.

Referenced by exec_replication_command(), WalSndKeepalive(), and XLogSendPhysical().

◆ replication_active

static volatile sig_atomic_t replication_active = false

◆ reply_message

◆ sendTimeLine

static TimeLineID sendTimeLine = 0

◆ sendTimeLineIsHistoric

static bool sendTimeLineIsHistoric = false

◆ sendTimeLineNextTLI

static TimeLineID sendTimeLineNextTLI = 0

◆ sendTimeLineValidUpto

static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr

◆ sentPtr

◆ streamingDoneReceiving

static bool streamingDoneReceiving

Definition at line 191 of file walsender.c.

Referenced by ProcessRepliesIfAny(), StartReplication(), WalSndLoop(), and WalSndWaitForWal().

◆ streamingDoneSending

static bool streamingDoneSending

◆ tmpbuf

◆ uploaded_manifest

static IncrementalBackupInfo *uploaded_manifest = NULL

Definition at line 147 of file walsender.c.

Referenced by exec_replication_command(), and UploadManifest().

◆ uploaded_manifest_mcxt

static MemoryContext uploaded_manifest_mcxt = NULL

Definition at line 148 of file walsender.c.

Referenced by UploadManifest().

◆ waiting_for_ping_response

static bool waiting_for_ping_response = false

◆ wake_wal_senders

bool wake_wal_senders = false

Definition at line 130 of file walsender.c.

Referenced by WalSndWakeupProcessRequests().

◆ wal_sender_timeout

int wal_sender_timeout = 60 * 1000

◆ WalSndCaughtUp

static bool WalSndCaughtUp = false

◆ WalSndCtl

◆ xlogreader