PostgreSQL Source Code git master
Loading...
Searching...
No Matches
slotfuncs.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
#include "funcapi.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "storage/proc.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/pg_lsn.h"
Include dependency graph for slotfuncs.c:

Go to the source code of this file.

Macros

#define PG_GET_REPLICATION_SLOTS_COLS   21
 

Functions

static void create_physical_replication_slot (char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
 
Datum pg_create_physical_replication_slot (PG_FUNCTION_ARGS)
 
static void create_logical_replication_slot (char *name, char *plugin, bool temporary, bool two_phase, bool failover, XLogRecPtr restart_lsn, bool find_startpoint)
 
Datum pg_create_logical_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_drop_replication_slot (PG_FUNCTION_ARGS)
 
Datum pg_get_replication_slots (PG_FUNCTION_ARGS)
 
static XLogRecPtr pg_physical_replication_slot_advance (XLogRecPtr moveto)
 
static XLogRecPtr pg_logical_replication_slot_advance (XLogRecPtr moveto)
 
Datum pg_replication_slot_advance (PG_FUNCTION_ARGS)
 
static Datum copy_replication_slot (FunctionCallInfo fcinfo, bool logical_slot)
 
Datum pg_copy_logical_replication_slot_a (PG_FUNCTION_ARGS)
 
Datum pg_copy_logical_replication_slot_b (PG_FUNCTION_ARGS)
 
Datum pg_copy_logical_replication_slot_c (PG_FUNCTION_ARGS)
 
Datum pg_copy_physical_replication_slot_a (PG_FUNCTION_ARGS)
 
Datum pg_copy_physical_replication_slot_b (PG_FUNCTION_ARGS)
 
Datum pg_sync_replication_slots (PG_FUNCTION_ARGS)
 

Variables

static const char * SlotSyncSkipReasonNames []
 

Macro Definition Documentation

◆ PG_GET_REPLICATION_SLOTS_COLS

#define PG_GET_REPLICATION_SLOTS_COLS   21

Function Documentation

◆ copy_replication_slot()

static Datum copy_replication_slot ( FunctionCallInfo  fcinfo,
bool  logical_slot 
)
static

Definition at line 629 of file slotfuncs.c.

630{
633 ReplicationSlot *src = NULL;
637 bool src_islogical;
638 bool temporary;
639 char *plugin;
640 Datum values[2];
641 bool nulls[2];
643 TupleDesc tupdesc;
644 HeapTuple tuple;
645
646 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
647 elog(ERROR, "return type must be a row type");
648
650
651 if (logical_slot)
653 else
655
657
658 /*
659 * We need to prevent the source slot's reserved WAL from being removed,
660 * but we don't want to lock that slot for very long, and it can advance
661 * in the meantime. So obtain the source slot's data, and create a new
662 * slot using its restart_lsn. Afterwards we lock the source slot again
663 * and verify that the data we copied (name, type) has not changed
664 * incompatibly. No inconvenient WAL removal can occur once the new slot
665 * is created -- but since WAL removal could have occurred before we
666 * managed to create the new slot, we advance the new slot's restart_lsn
667 * to the source slot's updated restart_lsn the second time we lock it.
668 */
670 {
672
673 if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
674 {
675 /* Copy the slot contents while holding spinlock */
679 src = s;
680 break;
681 }
682 }
683
685
686 if (src == NULL)
689 errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
690
692 src_restart_lsn = first_slot_contents.data.restart_lsn;
693 temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
695
696 /* Check type of replication slot */
701 errmsg("cannot copy physical replication slot \"%s\" as a logical replication slot",
702 NameStr(*src_name)) :
703 errmsg("cannot copy logical replication slot \"%s\" as a physical replication slot",
704 NameStr(*src_name))));
705
706 /* Copying non-reserved slot doesn't make sense */
710 errmsg("cannot copy a replication slot that doesn't reserve WAL")));
711
712 /* Cannot copy an invalidated replication slot */
713 if (first_slot_contents.data.invalidated != RS_INVAL_NONE)
716 errmsg("cannot copy invalidated replication slot \"%s\"",
717 NameStr(*src_name)));
718
719 /* Overwrite params from optional arguments */
720 if (PG_NARGS() >= 3)
721 temporary = PG_GETARG_BOOL(2);
722 if (PG_NARGS() >= 4)
723 {
726 }
727
728 /* Create new slot and acquire it */
729 if (logical_slot)
730 {
731 /*
732 * We must not try to read WAL, since we haven't reserved it yet --
733 * hence pass find_startpoint false. confirmed_flush will be set
734 * below, by copying from the source slot.
735 *
736 * We don't copy the failover option to prevent potential issues with
737 * slot synchronization. For instance, if a slot was synchronized to
738 * the standby, then dropped on the primary, and immediately recreated
739 * by copying from another existing slot with much earlier restart_lsn
740 * and confirmed_flush_lsn, the slot synchronization would only
741 * observe the LSN of the same slot moving backward. As slot
742 * synchronization does not copy the restart_lsn and
743 * confirmed_flush_lsn backward (see update_local_synced_slot() for
744 * details), if a failover happens before the primary's slot catches
745 * up, logical replication cannot continue using the synchronized slot
746 * on the promoted standby because the slot retains the restart_lsn
747 * and confirmed_flush_lsn that are much later than expected.
748 */
750 plugin,
751 temporary,
752 false,
753 false,
755 false);
756 }
757 else
759 true,
760 temporary,
762
763 /*
764 * Update the destination slot to current values of the source slot;
765 * recheck that the source slot is still the one we saw previously.
766 */
767 {
774 bool copy_islogical;
775 char *copy_name;
776
777 /* Copy data of source slot again */
778 SpinLockAcquire(&src->mutex);
780 SpinLockRelease(&src->mutex);
781
783 copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
784
786 copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
787 copy_restart_lsn = second_slot_contents.data.restart_lsn;
788 copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
789
790 /* for existence check */
793
794 /*
795 * Check if the source slot still exists and is valid. We regard it as
796 * invalid if the type of replication slot or name has been changed,
797 * or the restart_lsn either is invalid or has gone backward. (The
798 * restart_lsn could go backwards if the source slot is dropped and
799 * copied from an older slot during installation.)
800 *
801 * Since erroring out will release and drop the destination slot we
802 * don't need to release it here.
803 */
808 (errmsg("could not copy replication slot \"%s\"",
809 NameStr(*src_name)),
810 errdetail("The source replication slot was modified incompatibly during the copy operation.")));
811
812 /* The source slot must have a consistent snapshot */
816 errmsg("cannot copy unfinished logical replication slot \"%s\"",
817 NameStr(*src_name)),
818 errhint("Retry when the source replication slot's confirmed_flush_lsn is valid.")));
819
820 /*
821 * Copying an invalid slot doesn't make sense. Note that the source
822 * slot can become invalid after we create the new slot and copy the
823 * data of source slot. This is possible because the operations in
824 * InvalidateObsoleteReplicationSlots() are not serialized with this
825 * function. Even though we can't detect such a case here, the copied
826 * slot will become invalid in the next checkpoint cycle.
827 */
828 if (second_slot_contents.data.invalidated != RS_INVAL_NONE)
830 errmsg("cannot copy replication slot \"%s\"",
831 NameStr(*src_name)),
832 errdetail("The source replication slot was invalidated during the copy operation."));
833
834 /* Install copied values again */
838
844
849
850#ifdef USE_ASSERT_CHECKING
851 /* Check that the restart_lsn is available */
852 {
853 XLogSegNo segno;
854
857 }
858#endif
859 }
860
861 /* target slot fully created, mark as persistent if needed */
862 if (logical_slot && !temporary)
864
865 /* All done. Set up the return values */
867 nulls[0] = false;
869 {
871 nulls[1] = false;
872 }
873 else
874 nulls[1] = true;
875
876 tuple = heap_form_tuple(tupdesc, values, nulls);
877 result = HeapTupleGetDatum(tuple);
878
880
882}
static Datum values[MAXATTR]
Definition bootstrap.c:190
#define NameStr(name)
Definition c.h:835
#define Assert(condition)
Definition c.h:943
uint32 TransactionId
Definition c.h:736
uint32 result
int errcode(int sqlerrcode)
Definition elog.c:875
int errhint(const char *fmt,...) pg_attribute_printf(1
int errdetail(const char *fmt,...) pg_attribute_printf(1
#define ERROR
Definition elog.h:40
#define elog(elevel,...)
Definition elog.h:228
#define ereport(elevel,...)
Definition elog.h:152
#define PG_NARGS()
Definition fmgr.h:203
#define PG_GETARG_NAME(n)
Definition fmgr.h:279
#define PG_GETARG_BOOL(n)
Definition fmgr.h:274
#define PG_RETURN_DATUM(x)
Definition fmgr.h:354
TypeFuncClass get_call_result_type(FunctionCallInfo fcinfo, Oid *resultTypeId, TupleDesc *resultTupleDesc)
Definition funcapi.c:276
@ TYPEFUNC_COMPOSITE
Definition funcapi.h:149
static Datum HeapTupleGetDatum(const HeapTupleData *tuple)
Definition funcapi.h:230
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition heaptuple.c:1025
int i
Definition isn.c:77
void CheckLogicalDecodingRequirements(bool repack)
Definition logical.c:111
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition lwlock.c:1150
void LWLockRelease(LWLock *lock)
Definition lwlock.c:1767
@ LW_SHARED
Definition lwlock.h:105
static char * errmsg
static Datum LSNGetDatum(XLogRecPtr X)
Definition pg_lsn.h:31
static const char * plugin
static Datum NameGetDatum(const NameData *X)
Definition postgres.h:393
uint64_t Datum
Definition postgres.h:70
static int fb(int x)
void ReplicationSlotMarkDirty(void)
Definition slot.c:1184
void ReplicationSlotsComputeRequiredXmin(bool already_locked)
Definition slot.c:1226
void ReplicationSlotPersist(void)
Definition slot.c:1201
ReplicationSlot * MyReplicationSlot
Definition slot.c:158
void ReplicationSlotSave(void)
Definition slot.c:1166
void CheckSlotPermissions(void)
Definition slot.c:1694
void ReplicationSlotRelease(void)
Definition slot.c:769
int max_replication_slots
Definition slot.c:161
ReplicationSlotCtlData * ReplicationSlotCtl
Definition slot.c:147
void ReplicationSlotsComputeRequiredLSN(void)
Definition slot.c:1308
int max_repack_replication_slots
Definition slot.c:163
void CheckSlotRequirements(bool repack)
Definition slot.c:1665
@ RS_TEMPORARY
Definition slot.h:47
@ RS_INVAL_NONE
Definition slot.h:60
#define SlotIsLogical(slot)
Definition slot.h:288
static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, bool failover, XLogRecPtr restart_lsn, bool find_startpoint)
Definition slotfuncs.c:129
static void create_physical_replication_slot(char *name, bool immediately_reserve, bool temporary, XLogRecPtr restart_lsn)
Definition slotfuncs.c:48
static void SpinLockRelease(volatile slock_t *lock)
Definition spin.h:62
static void SpinLockAcquire(volatile slock_t *lock)
Definition spin.h:56
ReplicationSlot replication_slots[1]
Definition slot.h:299
TransactionId catalog_xmin
Definition slot.h:122
TransactionId effective_catalog_xmin
Definition slot.h:210
slock_t mutex
Definition slot.h:183
bool in_use
Definition slot.h:186
TransactionId effective_xmin
Definition slot.h:209
ReplicationSlotPersistentData data
Definition slot.h:213
Definition c.h:830
XLogSegNo XLogGetLastRemovedSegno(void)
Definition xlog.c:3813
int wal_segment_size
Definition xlog.c:150
#define XLByteToSeg(xlrp, logSegNo, wal_segsz_bytes)
#define XLogRecPtrIsValid(r)
Definition xlogdefs.h:29
uint64 XLogRecPtr
Definition xlogdefs.h:21
uint64 XLogSegNo
Definition xlogdefs.h:52

References Assert, ReplicationSlotPersistentData::catalog_xmin, CheckLogicalDecodingRequirements(), CheckSlotPermissions(), CheckSlotRequirements(), ReplicationSlotPersistentData::confirmed_flush, create_logical_replication_slot(), create_physical_replication_slot(), ReplicationSlot::data, ReplicationSlot::effective_catalog_xmin, ReplicationSlot::effective_xmin, elog, ereport, errcode(), errdetail(), errhint(), errmsg, ERROR, fb(), get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum(), i, ReplicationSlot::in_use, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), max_repack_replication_slots, max_replication_slots, ReplicationSlot::mutex, MyReplicationSlot, ReplicationSlotPersistentData::name, NameGetDatum(), NameStr, PG_GETARG_BOOL, PG_GETARG_NAME, PG_NARGS, PG_RETURN_DATUM, plugin, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotMarkDirty(), ReplicationSlotPersist(), ReplicationSlotRelease(), ReplicationSlotSave(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotPersistentData::restart_lsn, result, RS_INVAL_NONE, RS_TEMPORARY, SlotIsLogical, SpinLockAcquire(), SpinLockRelease(), TYPEFUNC_COMPOSITE, values, wal_segment_size, XLByteToSeg, XLogGetLastRemovedSegno(), XLogRecPtrIsValid, and ReplicationSlotPersistentData::xmin.

Referenced by pg_copy_logical_replication_slot_a(), pg_copy_logical_replication_slot_b(), pg_copy_logical_replication_slot_c(), pg_copy_physical_replication_slot_a(), and pg_copy_physical_replication_slot_b().

◆ create_logical_replication_slot()

static void create_logical_replication_slot ( char * name,
char * plugin,
bool  temporary,
bool  two_phase,
bool  failover,
XLogRecPtr  restart_lsn,
bool  find_startpoint 
)
static

Definition at line 129 of file slotfuncs.c.

134{
136
138
139 /*
140 * Acquire a logical decoding slot, this will check for conflicting names.
141 * Initially create persistent slot as ephemeral - that allows us to
142 * nicely handle errors during initialization because it'll get dropped if
143 * this transaction fails. We'll make it persistent at the end. Temporary
144 * slots can be created as temporary from beginning as they get dropped on
145 * error as well.
146 */
148 temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
150
151 /*
152 * Ensure the logical decoding is enabled before initializing the logical
153 * decoding context.
154 */
157
158 /*
159 * Create logical decoding context to find start point or, if we don't
160 * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
161 *
162 * Note: when !find_startpoint this is still important, because it's at
163 * this point that the output plugin is validated.
164 */
166 false, /* just catalogs is OK */
167 false, /* not repack */
168 restart_lsn,
169 XL_ROUTINE(.page_read = read_local_xlog_page,
170 .segment_open = wal_segment_open,
171 .segment_close = wal_segment_close),
172 NULL, NULL, NULL);
173
174 /*
175 * If caller needs us to determine the decoding start point, do so now.
176 * This might take a while.
177 */
178 if (find_startpoint)
180
181 /* don't need the decoding context anymore */
183}
#define false
void FreeDecodingContext(LogicalDecodingContext *ctx)
Definition logical.c:673
void DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
Definition logical.c:629
LogicalDecodingContext * CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, bool for_repack, XLogRecPtr restart_lsn, XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress)
Definition logical.c:325
bool IsLogicalDecodingEnabled(void)
Definition logicalctl.c:202
void EnsureLogicalDecodingEnabled(void)
Definition logicalctl.c:303
#define NIL
Definition pg_list.h:68
static bool two_phase
static bool failover
void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool repack, bool failover, bool synced)
Definition slot.c:378
@ RS_EPHEMERAL
Definition slot.h:46
const char * name
#define XL_ROUTINE(...)
Definition xlogreader.h:117
void wal_segment_close(XLogReaderState *state)
Definition xlogutils.c:831
void wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p)
Definition xlogutils.c:806
int read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page)
Definition xlogutils.c:845

References Assert, CreateInitDecodingContext(), DecodingContextFindStartpoint(), EnsureLogicalDecodingEnabled(), failover, fb(), FreeDecodingContext(), IsLogicalDecodingEnabled(), MyReplicationSlot, name, NIL, plugin, read_local_xlog_page(), ReplicationSlotCreate(), RS_EPHEMERAL, RS_TEMPORARY, two_phase, wal_segment_close(), wal_segment_open(), and XL_ROUTINE.

Referenced by copy_replication_slot(), and pg_create_logical_replication_slot().

◆ create_physical_replication_slot()

static void create_physical_replication_slot ( char * name,
bool  immediately_reserve,
bool  temporary,
XLogRecPtr  restart_lsn 
)
static

Definition at line 48 of file slotfuncs.c.

50{
52
53 /* acquire replication slot, this will check for conflicting names */
55 temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
56 false, false, false);
57
59 {
60 /* Reserve WAL as the user asked for it */
61 if (!XLogRecPtrIsValid(restart_lsn))
63 else
64 MyReplicationSlot->data.restart_lsn = restart_lsn;
65
66 /* Write this slot to disk */
69 }
70}
void ReplicationSlotReserveWal(void)
Definition slot.c:1711
@ RS_PERSISTENT
Definition slot.h:45

References Assert, ReplicationSlot::data, fb(), MyReplicationSlot, name, ReplicationSlotCreate(), ReplicationSlotMarkDirty(), ReplicationSlotReserveWal(), ReplicationSlotSave(), ReplicationSlotPersistentData::restart_lsn, RS_PERSISTENT, RS_TEMPORARY, and XLogRecPtrIsValid.

Referenced by copy_replication_slot(), and pg_create_physical_replication_slot().

◆ pg_copy_logical_replication_slot_a()

Datum pg_copy_logical_replication_slot_a ( PG_FUNCTION_ARGS  )

Definition at line 886 of file slotfuncs.c.

887{
888 return copy_replication_slot(fcinfo, true);
889}
static Datum copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Definition slotfuncs.c:629

References copy_replication_slot().

◆ pg_copy_logical_replication_slot_b()

Datum pg_copy_logical_replication_slot_b ( PG_FUNCTION_ARGS  )

Definition at line 892 of file slotfuncs.c.

893{
894 return copy_replication_slot(fcinfo, true);
895}

References copy_replication_slot().

◆ pg_copy_logical_replication_slot_c()

Datum pg_copy_logical_replication_slot_c ( PG_FUNCTION_ARGS  )

Definition at line 898 of file slotfuncs.c.

899{
900 return copy_replication_slot(fcinfo, true);
901}

References copy_replication_slot().

◆ pg_copy_physical_replication_slot_a()

Datum pg_copy_physical_replication_slot_a ( PG_FUNCTION_ARGS  )

Definition at line 904 of file slotfuncs.c.

905{
906 return copy_replication_slot(fcinfo, false);
907}

References copy_replication_slot().

◆ pg_copy_physical_replication_slot_b()

Datum pg_copy_physical_replication_slot_b ( PG_FUNCTION_ARGS  )

Definition at line 910 of file slotfuncs.c.

911{
912 return copy_replication_slot(fcinfo, false);
913}

References copy_replication_slot().

◆ pg_create_logical_replication_slot()

Datum pg_create_logical_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 189 of file slotfuncs.c.

190{
193 bool temporary = PG_GETARG_BOOL(2);
194 bool two_phase = PG_GETARG_BOOL(3);
195 bool failover = PG_GETARG_BOOL(4);
197 TupleDesc tupdesc;
198 HeapTuple tuple;
199 Datum values[2];
200 bool nulls[2];
201
202 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
203 elog(ERROR, "return type must be a row type");
204
206
208
210 NameStr(*plugin),
211 temporary,
212 two_phase,
213 failover,
215 true);
216
219
220 memset(nulls, 0, sizeof(nulls));
221
222 tuple = heap_form_tuple(tupdesc, values, nulls);
223 result = HeapTupleGetDatum(tuple);
224
225 /* ok, slot is now fully created, mark it as persistent if needed */
226 if (!temporary)
229
231}
#define InvalidXLogRecPtr
Definition xlogdefs.h:28

References CheckLogicalDecodingRequirements(), CheckSlotPermissions(), ReplicationSlotPersistentData::confirmed_flush, create_logical_replication_slot(), ReplicationSlot::data, elog, ERROR, failover, fb(), get_call_result_type(), heap_form_tuple(), HeapTupleGetDatum(), InvalidXLogRecPtr, LSNGetDatum(), MyReplicationSlot, name, ReplicationSlotPersistentData::name, NameGetDatum(), NameStr, PG_GETARG_BOOL, PG_GETARG_NAME, PG_RETURN_DATUM, plugin, ReplicationSlotPersist(), ReplicationSlotRelease(), result, two_phase, TYPEFUNC_COMPOSITE, and values.

◆ pg_create_physical_replication_slot()

Datum pg_create_physical_replication_slot ( PG_FUNCTION_ARGS  )

◆ pg_drop_replication_slot()

Datum pg_drop_replication_slot ( PG_FUNCTION_ARGS  )

Definition at line 238 of file slotfuncs.c.

239{
241
243
245
247
249}
#define PG_RETURN_VOID()
Definition fmgr.h:350
void ReplicationSlotDrop(const char *name, bool nowait)
Definition slot.c:920

References CheckSlotPermissions(), CheckSlotRequirements(), name, NameStr, PG_GETARG_NAME, PG_RETURN_VOID, and ReplicationSlotDrop().

◆ pg_get_replication_slots()

Datum pg_get_replication_slots ( PG_FUNCTION_ARGS  )

Definition at line 256 of file slotfuncs.c.

257{
258#define PG_GET_REPLICATION_SLOTS_COLS 21
259 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
261 int slotno;
262
263 /*
264 * We don't require any special permission to see this function's data
265 * because nothing should be sensitive. The most critical being the slot
266 * name, which shouldn't contain anything particularly sensitive.
267 */
268
269 InitMaterializedSRF(fcinfo, 0);
270
272
275 {
281 int i;
283
284 if (!slot->in_use)
285 continue;
286
287 /* Copy slot contents while holding spinlock, then examine at leisure */
288 SpinLockAcquire(&slot->mutex);
289 slot_contents = *slot;
290 SpinLockRelease(&slot->mutex);
291
292 memset(values, 0, sizeof(values));
293 memset(nulls, 0, sizeof(nulls));
294
295 i = 0;
296 values[i++] = NameGetDatum(&slot_contents.data.name);
297
298 if (slot_contents.data.database == InvalidOid)
299 nulls[i++] = true;
300 else
301 values[i++] = NameGetDatum(&slot_contents.data.plugin);
302
303 if (slot_contents.data.database == InvalidOid)
304 values[i++] = CStringGetTextDatum("physical");
305 else
306 values[i++] = CStringGetTextDatum("logical");
307
308 if (slot_contents.data.database == InvalidOid)
309 nulls[i++] = true;
310 else
311 values[i++] = ObjectIdGetDatum(slot_contents.data.database);
312
313 values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
315
316 if (slot_contents.active_proc != INVALID_PROC_NUMBER)
317 values[i++] = Int32GetDatum(GetPGProcByNumber(slot_contents.active_proc)->pid);
318 else
319 nulls[i++] = true;
320
321 if (slot_contents.data.xmin != InvalidTransactionId)
323 else
324 nulls[i++] = true;
325
326 if (slot_contents.data.catalog_xmin != InvalidTransactionId)
327 values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
328 else
329 nulls[i++] = true;
330
331 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
332 values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
333 else
334 nulls[i++] = true;
335
336 if (XLogRecPtrIsValid(slot_contents.data.confirmed_flush))
337 values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
338 else
339 nulls[i++] = true;
340
341 /*
342 * If the slot has not been invalidated, test availability from
343 * restart_lsn.
344 */
345 if (slot_contents.data.invalidated != RS_INVAL_NONE)
347 else
348 walstate = GetWALAvailability(slot_contents.data.restart_lsn);
349
350 switch (walstate)
351 {
353 nulls[i++] = true;
354 break;
355
357 values[i++] = CStringGetTextDatum("reserved");
358 break;
359
361 values[i++] = CStringGetTextDatum("extended");
362 break;
363
365 values[i++] = CStringGetTextDatum("unreserved");
366 break;
367
368 case WALAVAIL_REMOVED:
369
370 /*
371 * If we read the restart_lsn long enough ago, maybe that file
372 * has been removed by now. However, the walsender could have
373 * moved forward enough that it jumped to another file after
374 * we looked. If checkpointer signalled the process to
375 * termination, then it's definitely lost; but if a process is
376 * still alive, then "unreserved" seems more appropriate.
377 *
378 * If we do change it, save the state for safe_wal_size below.
379 */
380 if (XLogRecPtrIsValid(slot_contents.data.restart_lsn))
381 {
382 ProcNumber procno;
383
384 SpinLockAcquire(&slot->mutex);
385 procno = slot->active_proc;
386 slot_contents.data.restart_lsn = slot->data.restart_lsn;
387 SpinLockRelease(&slot->mutex);
388 if (procno != INVALID_PROC_NUMBER)
389 {
390 values[i++] = CStringGetTextDatum("unreserved");
392 break;
393 }
394 }
395 values[i++] = CStringGetTextDatum("lost");
396 break;
397 }
398
399 /*
400 * safe_wal_size is only computed for slots that have not been lost,
401 * and only if there's a configured maximum size.
402 */
404 nulls[i++] = true;
405 else
406 {
412
414
415 /* determine how many segments can be kept by slots */
417 /* ditto for wal_keep_size */
419
420 /* if currpos reaches failLSN, we lose our segment */
423
425 }
426
427 values[i++] = BoolGetDatum(slot_contents.data.two_phase);
428
429 if (slot_contents.data.two_phase &&
430 XLogRecPtrIsValid(slot_contents.data.two_phase_at))
431 values[i++] = LSNGetDatum(slot_contents.data.two_phase_at);
432 else
433 nulls[i++] = true;
434
435 if (slot_contents.inactive_since > 0)
436 values[i++] = TimestampTzGetDatum(slot_contents.inactive_since);
437 else
438 nulls[i++] = true;
439
440 cause = slot_contents.data.invalidated;
441
443 nulls[i++] = true;
444 else
445 {
446 /*
447 * rows_removed and wal_level_insufficient are the only two
448 * reasons for the logical slot's conflict with recovery.
449 */
450 if (cause == RS_INVAL_HORIZON ||
451 cause == RS_INVAL_WAL_LEVEL)
452 values[i++] = BoolGetDatum(true);
453 else
454 values[i++] = BoolGetDatum(false);
455 }
456
457 if (cause == RS_INVAL_NONE)
458 nulls[i++] = true;
459 else
461
462 values[i++] = BoolGetDatum(slot_contents.data.failover);
463
464 values[i++] = BoolGetDatum(slot_contents.data.synced);
465
466 if (slot_contents.slotsync_skip_reason == SS_SKIP_NONE)
467 nulls[i++] = true;
468 else
470
472
473 tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
474 values, nulls);
475 }
476
478
479 return (Datum) 0;
480}
#define CStringGetTextDatum(s)
Definition builtins.h:98
#define Max(x, y)
Definition c.h:1085
uint64_t uint64
Definition c.h:625
void InitMaterializedSRF(FunctionCallInfo fcinfo, uint32 flags)
Definition funcapi.c:76
static Datum Int64GetDatum(int64 X)
Definition postgres.h:413
static Datum TransactionIdGetDatum(TransactionId X)
Definition postgres.h:292
static Datum BoolGetDatum(bool X)
Definition postgres.h:112
static Datum ObjectIdGetDatum(Oid X)
Definition postgres.h:252
static Datum Int32GetDatum(int32 X)
Definition postgres.h:212
#define InvalidOid
#define GetPGProcByNumber(n)
Definition proc.h:504
#define INVALID_PROC_NUMBER
Definition procnumber.h:26
int ProcNumber
Definition procnumber.h:24
const char * GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
Definition slot.c:2956
#define SlotIsPhysical(slot)
Definition slot.h:287
ReplicationSlotInvalidationCause
Definition slot.h:59
@ RS_INVAL_HORIZON
Definition slot.h:64
@ RS_INVAL_WAL_LEVEL
Definition slot.h:66
@ SS_SKIP_NONE
Definition slot.h:82
#define PG_GET_REPLICATION_SLOTS_COLS
static const char * SlotSyncSkipReasonNames[]
Definition slotfuncs.c:31
ProcNumber active_proc
Definition slot.h:192
#define InvalidTransactionId
Definition transam.h:31
void tuplestore_putvalues(Tuplestorestate *state, TupleDesc tdesc, const Datum *values, const bool *isnull)
Definition tuplestore.c:785
static Datum TimestampTzGetDatum(TimestampTz X)
Definition timestamp.h:52
int wal_keep_size_mb
Definition xlog.c:123
WALAvailability GetWALAvailability(XLogRecPtr targetLSN)
Definition xlog.c:8414
int max_slot_wal_keep_size_mb
Definition xlog.c:142
XLogRecPtr GetXLogWriteRecPtr(void)
Definition xlog.c:10140
WALAvailability
Definition xlog.h:200
@ WALAVAIL_REMOVED
Definition xlog.h:206
@ WALAVAIL_RESERVED
Definition xlog.h:202
@ WALAVAIL_UNRESERVED
Definition xlog.h:205
@ WALAVAIL_EXTENDED
Definition xlog.h:203
@ WALAVAIL_INVALID_LSN
Definition xlog.h:201
#define XLogSegNoOffsetToRecPtr(segno, offset, wal_segsz_bytes, dest)
#define XLogMBVarToSegs(mbvar, wal_segsz_bytes)

References ReplicationSlot::active_proc, Assert, BoolGetDatum(), CStringGetTextDatum, ReplicationSlot::data, fb(), GetPGProcByNumber, GetSlotInvalidationCauseName(), GetWALAvailability(), GetXLogWriteRecPtr(), i, ReplicationSlot::in_use, InitMaterializedSRF(), Int32GetDatum(), Int64GetDatum(), INVALID_PROC_NUMBER, InvalidOid, InvalidTransactionId, LSNGetDatum(), LW_SHARED, LWLockAcquire(), LWLockRelease(), Max, max_repack_replication_slots, max_replication_slots, max_slot_wal_keep_size_mb, ReplicationSlot::mutex, NameGetDatum(), ObjectIdGetDatum(), PG_GET_REPLICATION_SLOTS_COLS, ReplicationSlotCtlData::replication_slots, ReplicationSlotCtl, ReplicationSlotPersistentData::restart_lsn, RS_INVAL_HORIZON, RS_INVAL_NONE, RS_INVAL_WAL_LEVEL, RS_TEMPORARY, SlotIsPhysical, SlotSyncSkipReasonNames, SpinLockAcquire(), SpinLockRelease(), SS_SKIP_NONE, TimestampTzGetDatum(), TransactionIdGetDatum(), tuplestore_putvalues(), values, wal_keep_size_mb, wal_segment_size, WALAVAIL_EXTENDED, WALAVAIL_INVALID_LSN, WALAVAIL_REMOVED, WALAVAIL_RESERVED, WALAVAIL_UNRESERVED, XLByteToSeg, XLogMBVarToSegs, XLogRecPtrIsValid, and XLogSegNoOffsetToRecPtr.

◆ pg_logical_replication_slot_advance()

static XLogRecPtr pg_logical_replication_slot_advance ( XLogRecPtr  moveto)
static

Definition at line 526 of file slotfuncs.c.

527{
529}
XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *found_consistent_snapshot)
Definition logical.c:2099

References fb(), and LogicalSlotAdvanceAndCheckSnapState().

Referenced by pg_replication_slot_advance().

◆ pg_physical_replication_slot_advance()

static XLogRecPtr pg_physical_replication_slot_advance ( XLogRecPtr  moveto)
static

Definition at line 490 of file slotfuncs.c.

491{
494
496
497 if (startlsn < moveto)
498 {
502 retlsn = moveto;
503
504 /*
505 * Dirty the slot so that it is written out at the next checkpoint. Note
506 * that the LSN position advanced may still be lost in the event of a
507 * crash, but this makes the data consistent after a clean shutdown.
508 */
510
511 /*
512 * Wake up logical walsenders holding logical failover slots after
513 * updating the restart_lsn of the physical slot.
514 */
516 }
517
518 return retlsn;
519}
void PhysicalWakeupLogicalWalSnd(void)
Definition walsender.c:1794

References Assert, ReplicationSlot::data, fb(), ReplicationSlot::mutex, MyReplicationSlot, PhysicalWakeupLogicalWalSnd(), ReplicationSlotMarkDirty(), ReplicationSlotPersistentData::restart_lsn, SpinLockAcquire(), SpinLockRelease(), and XLogRecPtrIsValid.

Referenced by pg_replication_slot_advance().

◆ pg_replication_slot_advance()

Datum pg_replication_slot_advance ( PG_FUNCTION_ARGS  )

Definition at line 535 of file slotfuncs.c.

536{
537 Name slotname = PG_GETARG_NAME(0);
541 TupleDesc tupdesc;
542 Datum values[2];
543 bool nulls[2];
544 HeapTuple tuple;
546
548
550
554 errmsg("invalid target WAL LSN")));
555
556 /* Build a tuple descriptor for our result type */
557 if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
558 elog(ERROR, "return type must be a row type");
559
560 /*
561 * We can't move slot past what's been flushed/replayed so clamp the
562 * target position accordingly.
563 */
564 if (!RecoveryInProgress())
566 else
568
569 /* Acquire the slot so we "own" it */
570 ReplicationSlotAcquire(NameStr(*slotname), true, true);
571
572 /* A slot whose restart_lsn has never been reserved cannot be advanced */
576 errmsg("replication slot \"%s\" cannot be advanced",
577 NameStr(*slotname)),
578 errdetail("This slot has never previously reserved WAL, or it has been invalidated.")));
579
580 /*
581 * Check if the slot is not moving backwards. Physical slots rely simply
582 * on restart_lsn as a minimum point, while logical slots have confirmed
583 * consumption up to confirmed_flush, meaning that in both cases data
584 * older than that is not available anymore.
585 */
588 else
590
591 if (moveto < minlsn)
594 errmsg("cannot advance replication slot to %X/%08X, minimum is %X/%08X",
596
597 /* Do the actual slot update, depending on the slot type */
600 else
602
604 nulls[0] = false;
605
606 /*
607 * Recompute the minimum LSN and xmin across all slots to adjust with the
608 * advancing potentially done.
609 */
612
614
615 /* Return the reached position. */
617 nulls[1] = false;
618
619 tuple = heap_form_tuple(tupdesc, values, nulls);
620 result = HeapTupleGetDatum(tuple);
621
623}
#define Min(x, y)
Definition c.h:1091
#define OidIsValid(objectId)
Definition c.h:858
#define PG_GETARG_LSN(n)
Definition pg_lsn.h:36
void ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
Definition slot.c:629
static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto)
Definition slotfuncs.c:526
static XLogRecPtr pg_physical_replication_slot_advance(XLogRecPtr moveto)
Definition slotfuncs.c:490
bool RecoveryInProgress(void)
Definition xlog.c:6830
XLogRecPtr GetFlushRecPtr(TimeLineID *insertTLI)
Definition xlog.c:6995
#define LSN_FORMAT_ARGS(lsn)
Definition xlogdefs.h:47
XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI)

References Assert, CheckSlotPermissions(), ReplicationSlotPersistentData::confirmed_flush, ReplicationSlot::data, ReplicationSlotPersistentData::database, elog, ereport, errcode(), errdetail(), errmsg, ERROR, fb(), get_call_result_type(), GetFlushRecPtr(), GetXLogReplayRecPtr(), heap_form_tuple(), HeapTupleGetDatum(), LSN_FORMAT_ARGS, LSNGetDatum(), Min, MyReplicationSlot, ReplicationSlotPersistentData::name, NameGetDatum(), NameStr, OidIsValid, PG_GETARG_LSN, PG_GETARG_NAME, pg_logical_replication_slot_advance(), pg_physical_replication_slot_advance(), PG_RETURN_DATUM, RecoveryInProgress(), ReplicationSlotAcquire(), ReplicationSlotRelease(), ReplicationSlotsComputeRequiredLSN(), ReplicationSlotsComputeRequiredXmin(), ReplicationSlotPersistentData::restart_lsn, result, TYPEFUNC_COMPOSITE, values, and XLogRecPtrIsValid.

◆ pg_sync_replication_slots()

Datum pg_sync_replication_slots ( PG_FUNCTION_ARGS  )

Definition at line 920 of file slotfuncs.c.

921{
923 char *err;
925
927
928 if (!RecoveryInProgress())
931 errmsg("replication slots can only be synchronized to a standby server"));
932
934
935 /* Load the libpq-specific functions */
936 load_file("libpqwalreceiver", false);
937
939
941 if (cluster_name[0])
942 appendStringInfo(&app_name, "%s_slotsync", cluster_name);
943 else
944 appendStringInfoString(&app_name, "slotsync");
945
946 /* Connect to the primary server. */
947 wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
948 app_name.data, &err);
949
950 if (!wrconn)
953 errmsg("synchronization worker \"%s\" could not connect to the primary server: %s",
954 app_name.data, err));
955
956 pfree(app_name.data);
957
959
961
963}
void load_file(const char *filename, bool restricted)
Definition dfmgr.c:149
void err(int eval, const char *fmt,...)
Definition err.c:43
char * cluster_name
Definition guc_tables.c:582
void pfree(void *pointer)
Definition mcxt.c:1616
void SyncReplicationSlots(WalReceiverConn *wrconn)
Definition slotsync.c:2003
char * CheckAndGetDbnameFromConninfo(void)
Definition slotsync.c:1151
bool ValidateSlotSyncParams(int elevel)
Definition slotsync.c:1178
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition stringinfo.c:230
void initStringInfo(StringInfo str)
Definition stringinfo.c:97
static WalReceiverConn * wrconn
Definition walreceiver.c:95
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_disconnect(conn)
char * PrimaryConnInfo

References appendStringInfo(), appendStringInfoString(), CheckAndGetDbnameFromConninfo(), CheckSlotPermissions(), cluster_name, ereport, err(), errcode(), errmsg, ERROR, fb(), initStringInfo(), load_file(), pfree(), PG_RETURN_VOID, PrimaryConnInfo, RecoveryInProgress(), SyncReplicationSlots(), ValidateSlotSyncParams(), walrcv_connect, walrcv_disconnect, and wrconn.

Variable Documentation

◆ SlotSyncSkipReasonNames

const char* SlotSyncSkipReasonNames[]
static
Initial value:
= {
[SS_SKIP_NONE] = "none",
[SS_SKIP_WAL_NOT_FLUSHED] = "wal_not_flushed",
[SS_SKIP_WAL_OR_ROWS_REMOVED] = "wal_or_rows_removed",
[SS_SKIP_NO_CONSISTENT_SNAPSHOT] = "no_consistent_snapshot",
[SS_SKIP_INVALID] = "slot_invalidated"
}
@ SS_SKIP_WAL_NOT_FLUSHED
Definition slot.h:83
@ SS_SKIP_NO_CONSISTENT_SNAPSHOT
Definition slot.h:87
@ SS_SKIP_INVALID
Definition slot.h:89
@ SS_SKIP_WAL_OR_ROWS_REMOVED
Definition slot.h:85

Definition at line 31 of file slotfuncs.c.

31 static const char *SlotSyncSkipReasonNames[] = {
32 [SS_SKIP_NONE] = "none",
33 [SS_SKIP_WAL_NOT_FLUSHED] = "wal_not_flushed",
34 [SS_SKIP_WAL_OR_ROWS_REMOVED] = "wal_or_rows_removed",
35 [SS_SKIP_NO_CONSISTENT_SNAPSHOT] = "no_consistent_snapshot",
36 [SS_SKIP_INVALID] = "slot_invalidated"
37};

Referenced by pg_get_replication_slots().