228 #define TargetTagIsCoveredBy(covered_target, covering_target) \
229 ((GET_PREDICATELOCKTARGETTAG_RELATION(covered_target) == \
230 GET_PREDICATELOCKTARGETTAG_RELATION(covering_target)) \
231 && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering_target) == \
232 InvalidOffsetNumber) \
233 && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered_target) != \
234 InvalidOffsetNumber) \
235 && (GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) == \
236 GET_PREDICATELOCKTARGETTAG_PAGE(covered_target))) \
237 || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering_target) == \
238 InvalidBlockNumber) \
239 && (GET_PREDICATELOCKTARGETTAG_PAGE(covered_target) \
240 != InvalidBlockNumber))) \
241 && (GET_PREDICATELOCKTARGETTAG_DB(covered_target) == \
242 GET_PREDICATELOCKTARGETTAG_DB(covering_target)))
251 #define PredicateLockHashPartition(hashcode) \
252 ((hashcode) % NUM_PREDICATELOCK_PARTITIONS)
253 #define PredicateLockHashPartitionLock(hashcode) \
254 (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + \
255 PredicateLockHashPartition(hashcode)].lock)
256 #define PredicateLockHashPartitionLockByIndex(i) \
257 (&MainLWLockArray[PREDICATELOCK_MANAGER_LWLOCK_OFFSET + (i)].lock)
259 #define NPREDICATELOCKTARGETENTS() \
260 mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
262 #define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))
272 #define SxactIsCommitted(sxact) (((sxact)->flags & SXACT_FLAG_COMMITTED) != 0)
273 #define SxactIsPrepared(sxact) (((sxact)->flags & SXACT_FLAG_PREPARED) != 0)
274 #define SxactIsRolledBack(sxact) (((sxact)->flags & SXACT_FLAG_ROLLED_BACK) != 0)
275 #define SxactIsDoomed(sxact) (((sxact)->flags & SXACT_FLAG_DOOMED) != 0)
276 #define SxactIsReadOnly(sxact) (((sxact)->flags & SXACT_FLAG_READ_ONLY) != 0)
277 #define SxactHasSummaryConflictIn(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_IN) != 0)
278 #define SxactHasSummaryConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_SUMMARY_CONFLICT_OUT) != 0)
284 #define SxactHasConflictOut(sxact) (((sxact)->flags & SXACT_FLAG_CONFLICT_OUT) != 0)
285 #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
286 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
287 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
288 #define SxactIsPartiallyReleased(sxact) (((sxact)->flags & SXACT_FLAG_PARTIALLY_RELEASED) != 0)
298 #define PredicateLockTargetTagHashCode(predicatelocktargettag) \
299 get_hash_value(PredicateLockTargetHash, predicatelocktargettag)
311 #define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash) \
312 ((targethash) ^ ((uint32) PointerGetDatum((predicatelocktag)->myXact)) \
313 << LOG2_NUM_PREDICATELOCK_PARTITIONS)
321 #define SerialSlruCtl (&SerialSlruCtlData)
323 #define SERIAL_PAGESIZE BLCKSZ
324 #define SERIAL_ENTRYSIZE sizeof(SerCommitSeqNo)
325 #define SERIAL_ENTRIESPERPAGE (SERIAL_PAGESIZE / SERIAL_ENTRYSIZE)
330 #define SERIAL_MAX_PAGE (MaxTransactionId / SERIAL_ENTRIESPERPAGE)
332 #define SerialNextPage(page) (((page) >= SERIAL_MAX_PAGE) ? 0 : (page) + 1)
334 #define SerialValue(slotno, xid) (*((SerCommitSeqNo *) \
335 (SerialSlruCtl->shared->page_buffer[slotno] + \
336 ((((uint32) (xid)) % SERIAL_ENTRIESPERPAGE) * SERIAL_ENTRYSIZE))))
338 #define SerialPage(xid) (((uint32) (xid)) / SERIAL_ENTRIESPERPAGE)
499 relation->
rd_rel->relkind == RELKIND_MATVIEW);
673 if (conflict->
sxactIn == writer)
699 (
errcode(ERRCODE_OUT_OF_MEMORY),
700 errmsg(
"not enough elements in RWConflictPool to record a read/write conflict"),
701 errhint(
"You might need to run fewer transactions at a time or increase max_connections.")));
717 Assert(roXact != activeXact);
727 (
errcode(ERRCODE_OUT_OF_MEMORY),
728 errmsg(
"not enough elements in RWConflictPool to record a potential read/write conflict"),
729 errhint(
"You might need to run fewer transactions at a time or increase max_connections.")));
780 conflict = nextConflict;
805 #ifdef USE_ASSERT_CHECKING
807 SerialPagePrecedesLogicallyUnitTests(
void)
810 offset = per_page / 2;
820 newestXact = newestPage * per_page + offset;
821 Assert(newestXact / per_page == newestPage);
822 oldestXact = newestXact + 1;
823 oldestXact -= 1U << 31;
824 oldestPage = oldestXact / per_page;
836 targetPage = oldestPage;
854 targetPage = newestPage;
877 #ifdef USE_ASSERT_CHECKING
878 SerialPagePrecedesLogicallyUnitTests();
955 while (firstZeroPage != targetPage)
1156 long max_table_size;
1160 #ifndef EXEC_BACKEND
1212 max_table_size *= 2;
1235 max_table_size *= 10;
1258 for (
i = 0;
i < max_table_size;
i++)
1310 max_table_size *= 5;
1326 for (
i = 0;
i < max_table_size;
i++)
1359 long max_table_size;
1367 max_table_size *= 2;
1379 max_table_size *= 10;
1389 max_table_size *= 5;
1465 data->nelements = els;
1606 errmsg_internal(
"deferrable snapshot was unsafe; trying a new one")));
1631 int num_written = 0;
1639 if (sxact->
pid == blocked_pid)
1654 while (possibleUnsafeConflict != NULL && num_written < output_size)
1659 &possibleUnsafeConflict->
inLink,
1693 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1694 errmsg(
"cannot use serializable mode in a hot standby"),
1695 errdetail(
"\"default_transaction_isolation\" is set to \"serializable\"."),
1696 errhint(
"You can use \"SET default_transaction_isolation = 'repeatable read'\" to change the default.")));
1746 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1747 errmsg(
"a snapshot-importing transaction must not be READ ONLY DEFERRABLE")));
1783 elog(
ERROR,
"cannot establish serializable snapshot during a parallel operation");
1801 #ifdef TEST_SUMMARIZE_SERIAL
1825 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1826 errmsg(
"could not import the requested snapshot"),
1827 errdetail(
"The source process with PID %d is not running anymore.",
2009 &targettag, targettaghash,
2013 return (target != NULL);
2100 targettag = *newtargettag;
2105 targettag = parenttag;
2182 Assert(rmtarget == target);
2219 predlocksxactlink = &(predlock->
xactLink);
2225 oldlocktag = predlock->
tag;
2228 oldtargettag = oldtarget->
tag;
2249 Assert(rmpredlock == predlock);
2258 predlock = nextpredlock;
2331 targettag = *reqtag;
2336 targettag = nexttag;
2343 parentlock->
held =
false;
2358 promotiontag = targettag;
2390 parenttag = *targettag;
2398 parenttag = nexttag;
2402 &parenttag, targettaghash,
2410 if (parentlock == NULL)
2430 &parenttag, targettaghash,
2432 Assert(rmlock == parentlock);
2467 targettag, targettaghash,
2471 (
errcode(ERRCODE_OUT_OF_MEMORY),
2472 errmsg(
"out of shared memory"),
2473 errhint(
"You might need to increase max_pred_locks_per_transaction.")));
2486 (
errcode(ERRCODE_OUT_OF_MEMORY),
2487 errmsg(
"out of shared memory"),
2488 errhint(
"You might need to increase max_pred_locks_per_transaction.")));
2531 targettag, targettaghash,
2533 locallock->
held =
true;
2682 predlocktargetlink = &(predlock->
targetLink);
2699 predlock = nextpredlock;
2740 LWLock *oldpartitionLock;
2743 LWLock *newpartitionLock;
2745 bool outOfShmem =
false;
2769 if (oldpartitionLock < newpartitionLock)
2775 else if (oldpartitionLock > newpartitionLock)
2817 newpredlocktag.
myTarget = newtarget;
2835 predlocktargetlink = &(oldpredlock->
targetLink);
2889 oldpredlock = nextpredlock;
2903 if (oldpartitionLock < newpartitionLock)
2908 else if (oldpartitionLock > newpartitionLock)
2963 uint32 heaptargettaghash;
2978 relId = relation->
rd_id;
2987 heapId = relation->
rd_index->indrelid;
2990 Assert(transfer || !isIndex);
2994 heaptargettaghash = 0;
3024 if (transfer && !isIndex
3039 if (transfer && heaptarget == NULL)
3095 newpredlocktag.
myTarget = heaptarget;
3096 newpredlocktag.
myXact = oldXact;
3123 oldpredlock = nextpredlock;
3192 Assert(oldblkno != newblkno);
3339 possibleUnsafeConflict;
3351 bool topLevelIsDeclaredReadOnly;
3354 Assert(!(isCommit && isReadOnlySafe));
3357 if (!isReadOnlySafe)
3504 if (!topLevelIsDeclaredReadOnly)
3532 while (possibleUnsafeConflict)
3536 &possibleUnsafeConflict->
inLink,
3544 possibleUnsafeConflict = nextConflict;
3593 conflict = nextConflict;
3616 conflict = nextConflict;
3619 if (!topLevelIsDeclaredReadOnly)
3631 while (possibleUnsafeConflict)
3635 &possibleUnsafeConflict->
outLink,
3638 roXact = possibleUnsafeConflict->
sxactIn;
3676 possibleUnsafeConflict = nextConflict;
3687 needToClear =
false;
3760 while (finishedSxact)
3815 finishedSxact = nextSxact;
3830 bool canDoPartialCleanup;
3847 if (canDoPartialCleanup)
3855 tag = predlock->
tag;
3857 targettag = target->
tag;
3875 predlock = nextpredlock;
3941 tag = predlock->
tag;
3944 targettag = target->
tag;
3968 (
errcode(ERRCODE_OUT_OF_MEMORY),
3969 errmsg(
"out of shared memory"),
3970 errhint(
"You might need to increase max_pred_locks_per_transaction.")));
3992 predlock = nextpredlock;
4024 conflict = nextConflict;
4042 conflict = nextConflict;
4081 for (
i = 0;
i < snap->
xcnt;
i++)
4083 if (xid == snap->
xip[
i])
4101 errmsg(
"could not serialize access due to read/write dependencies among transactions"),
4102 errdetail_internal(
"Reason code: Canceled on identification as a pivot, during conflict out checking."),
4103 errhint(
"The transaction might succeed if retried.")));
4137 errmsg(
"could not serialize access due to read/write dependencies among transactions"),
4138 errdetail_internal(
"Reason code: Canceled on identification as a pivot, during conflict out checking."),
4139 errhint(
"The transaction might succeed if retried.")));
4162 if (conflictCommitSeqNo != 0)
4166 || conflictCommitSeqNo
4170 errmsg(
"could not serialize access due to read/write dependencies among transactions"),
4172 errhint(
"The transaction might succeed if retried.")));
4178 errmsg(
"could not serialize access due to read/write dependencies among transactions"),
4179 errdetail_internal(
"Reason code: Canceled on identification as a pivot, with conflict out to old committed transaction %u.", xid),
4180 errhint(
"The transaction might succeed if retried.")));
4217 errmsg(
"could not serialize access due to read/write dependencies among transactions"),
4219 errhint(
"The transaction might succeed if retried.")));
4285 targettag, targettaghash,
4309 predlocktargetlink = &(predlock->
targetLink);
4331 mypredlock = predlock;
4332 mypredlocktag = predlock->
tag;
4361 predlock = nextpredlock;
4374 if (mypredlock != NULL)
4391 (&mypredlocktag, targettaghash);
4397 if (rmpredlock != NULL)
4399 Assert(rmpredlock == mypredlock);
4409 Assert(rmpredlock == mypredlock);
4420 if (rmpredlock != NULL)
4428 targettag, targettaghash,
4459 errmsg(
"could not serialize access due to read/write dependencies among transactions"),
4460 errdetail_internal(
"Reason code: Canceled on identification as a pivot, during conflict in checking."),
4461 errhint(
"The transaction might succeed if retried.")));
4560 heapId = relation->
rd_id;
4604 predlock = nextpredlock;
4625 Assert(reader != writer);
4796 errmsg(
"could not serialize access due to read/write dependencies among transactions"),
4797 errdetail_internal(
"Reason code: Canceled on identification as a pivot, during write."),
4798 errhint(
"The transaction might succeed if retried.")));
4808 errmsg(
"could not serialize access due to read/write dependencies among transactions"),
4810 errhint(
"The transaction might succeed if retried.")));
4851 errmsg(
"could not serialize access due to read/write dependencies among transactions"),
4852 errdetail_internal(
"Reason code: Canceled on identification as a pivot, during commit attempt."),
4853 errhint(
"The transaction might succeed if retried.")));
4860 while (nearConflict)
4889 errmsg(
"could not serialize access due to read/write dependencies among transactions"),
4890 errdetail_internal(
"Reason code: Canceled on commit attempt with conflict in from prepared pivot."),
4891 errhint(
"The transaction might succeed if retried.")));
4955 &record,
sizeof(record));
4978 while (predlock != NULL)
4984 &record,
sizeof(record));
5080 (
errcode(ERRCODE_OUT_OF_MEMORY),
5081 errmsg(
"out of shared memory")));
bool ParallelContextActive(void)
#define InvalidBlockNumber
#define BlockNumberIsValid(blockNumber)
#define offsetof(type, field)
uint32 LocalTransactionId
#define PG_USED_FOR_ASSERTS_ONLY
void hash_destroy(HTAB *hashp)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, long nelem, const HASHCTL *info, int flags)
long hash_get_num_entries(HTAB *hashp)
Size hash_estimate_size(long num_entries, Size entrysize)
void * hash_search_with_hash_value(HTAB *hashp, const void *keyPtr, uint32 hashvalue, HASHACTION action, bool *foundPtr)
void * hash_seq_search(HASH_SEQ_STATUS *status)
void hash_seq_init(HASH_SEQ_STATUS *status, HTAB *hashp)
int errmsg_internal(const char *fmt,...)
int errdetail_internal(const char *fmt,...)
int errdetail(const char *fmt,...)
int errhint(const char *fmt,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
#define IsParallelWorker()
#define ItemPointerGetBlockNumber(pointer)
#define ItemPointerGetOffsetNumber(pointer)
Assert(fmt[strlen(fmt) - 1] !='\n')
#define SetInvalidVirtualTransactionId(vxid)
#define GET_VXID_FROM_PGPROC(vxid, proc)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
bool LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
void LWLockInitialize(LWLock *lock, int tranche_id)
bool LWLockHeldByMe(LWLock *l)
@ LWTRANCHE_SERIAL_BUFFER
@ LWTRANCHE_PER_XACT_PREDICATE_LIST
#define NUM_PREDICATELOCK_PARTITIONS
static void output(uint64 loop_count)
#define ERRCODE_T_R_SERIALIZATION_FAILURE
void CheckPointPredicate(void)
void PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, BlockNumber newblkno)
static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
static HTAB * PredicateLockHash
static SHM_QUEUE * FinishedSerializableTransactions
static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact)
#define PredicateLockTargetTagHashCode(predicatelocktargettag)
static void SetNewSxactGlobalXmin(void)
static SERIALIZABLEXACT * FirstPredXact(void)
void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
PredicateLockData * GetPredicateLockStatusData(void)
void InitPredicateLocks(void)
static void ReleasePredXact(SERIALIZABLEXACT *sxact)
void SetSerializableTransactionSnapshot(Snapshot snapshot, VirtualTransactionId *sourcevxid, int sourcepid)
static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
static bool PredicateLockingNeededForRelation(Relation relation)
static bool SerializationNeededForRead(Relation relation, Snapshot snapshot)
#define SxactIsCommitted(sxact)
static SerialControl serialControl
void PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot)
#define SxactIsROUnsafe(sxact)
static Snapshot GetSerializableTransactionSnapshotInt(Snapshot snapshot, VirtualTransactionId *sourcevxid, int sourcepid)
static LWLock * ScratchPartitionLock
static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
#define SxactIsDeferrableWaiting(sxact)
static void ReleasePredicateLocksLocal(void)
static HTAB * LocalPredicateLockHash
int max_predicate_locks_per_page
struct SerialControlData * SerialControl
static PredXactList PredXact
static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
int GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size)
static uint32 ScratchTargetTagHash
static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
static uint32 predicatelock_hash(const void *key, Size keysize)
void CheckForSerializableConflictOut(Relation relation, TransactionId xid, Snapshot snapshot)
#define SxactIsReadOnly(sxact)
#define SerialNextPage(page)
static void DropAllPredicateLocksFromTable(Relation relation, bool transfer)
bool PageIsPredicateLocked(Relation relation, BlockNumber blkno)
static SlruCtlData SerialSlruCtlData
static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, uint32 targettaghash, SERIALIZABLEXACT *sxact)
static SERIALIZABLEXACT * NextPredXact(SERIALIZABLEXACT *sxact)
static void SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
static void ClearOldPredicateLocks(void)
#define SxactHasSummaryConflictIn(sxact)
static SERIALIZABLEXACT * CreatePredXact(void)
static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag, PREDICATELOCKTARGETTAG *parent)
#define PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash)
static void RestoreScratchTarget(bool lockheld)
#define SerialValue(slotno, xid)
static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
static Snapshot GetSafeSnapshot(Snapshot snapshot)
static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
static SERIALIZABLEXACT * OldCommittedSxact
#define SxactHasConflictOut(sxact)
void CheckForSerializableConflictIn(Relation relation, ItemPointer tid, BlockNumber blkno)
static bool MyXactDidWrite
static int MaxPredicateChildLocks(const PREDICATELOCKTARGETTAG *tag)
static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
static void SerialInit(void)
void CheckTableForSerializableConflictIn(Relation relation)
#define SxactIsPrepared(sxact)
void PredicateLockTID(Relation relation, ItemPointer tid, Snapshot snapshot, TransactionId tuple_xid)
void AttachSerializableXact(SerializableXactHandle handle)
struct SerialControlData SerialControlData
SerializableXactHandle ShareSerializableXact(void)
static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
static void RemoveScratchTarget(bool lockheld)
#define SxactIsOnFinishedList(sxact)
#define SxactIsPartiallyReleased(sxact)
static void SerialSetActiveSerXmin(TransactionId xid)
static bool SerializationNeededForWrite(Relation relation)
static HTAB * SerializableXidHash
static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
void PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, BlockNumber newblkno)
static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
int max_predicate_locks_per_relation
#define SxactIsROSafe(sxact)
void PreCommit_CheckForSerializationFailure(void)
void ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
static const PREDICATELOCKTARGETTAG ScratchTargetTag
#define PredicateLockHashPartitionLockByIndex(i)
static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
static bool SerialPagePrecedesLogically(int page1, int page2)
static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
void PredicateLockRelation(Relation relation, Snapshot snapshot)
static SERIALIZABLEXACT * MySerializableXact
void predicatelock_twophase_recover(TransactionId xid, uint16 info, void *recdata, uint32 len)
Size PredicateLockShmemSize(void)
#define SxactIsDoomed(sxact)
#define NPREDICATELOCKTARGETENTS()
static SerCommitSeqNo SerialGetMinConflictCommitSeqNo(TransactionId xid)
static void SummarizeOldestCommittedSxact(void)
#define TargetTagIsCoveredBy(covered_target, covering_target)
static RWConflictPoolHeader RWConflictPool
static void ReleaseRWConflict(RWConflict conflict)
static bool TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, PREDICATELOCKTARGETTAG newtargettag, bool removeOld)
void AtPrepare_PredicateLocks(void)
void RegisterPredicateLockingXid(TransactionId xid)
#define PredicateLockHashPartitionLock(hashcode)
#define SERIAL_ENTRIESPERPAGE
static bool XidIsConcurrent(TransactionId xid)
static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial, bool summarize)
static HTAB * PredicateLockTargetHash
bool CheckForSerializableConflictOutNeeded(Relation relation, Snapshot snapshot)
#define SxactIsRolledBack(sxact)
static SERIALIZABLEXACT * SavedSerializableXact
#define SxactHasSummaryConflictOut(sxact)
void TransferPredicateLocksToHeapRelation(Relation relation)
void PostPrepare_PredicateLocks(TransactionId xid)
static void CreateLocalPredicateLockHash(void)
int max_predicate_locks_per_xact
Snapshot GetSerializableTransactionSnapshot(Snapshot snapshot)
void * SerializableXactHandle