lwlock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lwlock.c
4  * Lightweight lock manager
5  *
6  * Lightweight locks are intended primarily to provide mutual exclusion of
7  * access to shared-memory data structures. Therefore, they offer both
8  * exclusive and shared lock modes (to support read/write and read-only
9  * access to a shared object). There are few other frammishes. User-level
10  * locking should be done with the full lock manager --- which depends on
11  * LWLocks to protect its shared state.
12  *
13  * In addition to exclusive and shared modes, lightweight locks can be used to
14  * wait until a variable changes value. Acquiring the lock with LWLockAcquire
15  * does not reset the variable; it keeps the value it was set to when the
16  * lock was last released, and it can be updated without releasing the
17  * lock by calling LWLockUpdateVar. LWLockWaitForVar
18  * waits for the variable to be updated, or until the lock is free. When
19  * releasing the lock with LWLockReleaseClearVar() the value can be set to an
20  * appropriate value for a free lock. The meaning of the variable is up to
21  * the caller, the lightweight lock code just assigns and compares it.
22  *
23  * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
24  * Portions Copyright (c) 1994, Regents of the University of California
25  *
26  * IDENTIFICATION
27  * src/backend/storage/lmgr/lwlock.c
28  *
29  * NOTES:
30  *
31  * This used to be a pretty straightforward reader-writer lock
32  * implementation, in which the internal state was protected by a
33  * spinlock. Unfortunately the overhead of taking the spinlock proved to be
34  * too high for workloads/locks that were taken in shared mode very
35  * frequently. Often we were spinning in the (obviously exclusive) spinlock,
36  * while trying to acquire a shared lock that was actually free.
37  *
38  * Thus a new implementation was devised that provides wait-free shared lock
39  * acquisition for locks that aren't exclusively locked.
40  *
41  * The basic idea is to have a single atomic variable 'lockcount' instead of
42  * the formerly separate shared and exclusive counters and to use atomic
43  * operations to acquire the lock. That's fairly easy to do for plain
44  * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
45  * in the OS.
46  *
47  * For lock acquisition we use an atomic compare-and-exchange on the lockcount
48  * variable. For exclusive lock we swap in a sentinel value
49  * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
50  *
51  * To release the lock we use an atomic decrement. If the
52  * new value is zero (we get that atomically), we know we can/have to release
53  * waiters.
54  *
55  * Obviously it is important that the sentinel value for exclusive locks
56  * doesn't conflict with the maximum number of possible share lockers -
57  * luckily MAX_BACKENDS makes that easily possible.
58  *
59  *
60  * The attentive reader might have noticed that naively doing the above has a
61  * glaring race condition: We try to lock using the atomic operations and
62  * notice that we have to wait. Unfortunately by the time we have finished
63  * queuing, the former locker very well might have already finished its
64  * work. That's problematic because we're now stuck waiting inside the OS.
65  *
66  * To mitigate those races we use a two-phase attempt at locking:
67  * Phase 1: Try to do it atomically, if we succeed, nice
68  * Phase 2: Add ourselves to the waitqueue of the lock
69  * Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
70  * the queue
71  * Phase 4: Sleep till wake-up, goto Phase 1
72  *
73  * This protects us against the problem above: nobody can release too
74  * quickly before we're queued, since after Phase 2 we're already queued.
75  * -------------------------------------------------------------------------
76  */
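/*
 * A minimal caller-side sketch of the exclusive/shared API described above.
 * The lock pointer and the shared structure are hypothetical; the functions
 * and modes are the ones defined in this file and in lwlock.h:
 *
 *		// writer: serialize against readers and other writers
 *		LWLockAcquire(my_lock, LW_EXCLUSIVE);
 *		my_shared_struct->counter++;
 *		LWLockRelease(my_lock);
 *
 *		// reader: many backends may hold the lock in shared mode at once
 *		LWLockAcquire(my_lock, LW_SHARED);
 *		snapshot = my_shared_struct->counter;
 *		LWLockRelease(my_lock);
 *
 * Cancel/die interrupts are held off between acquire and release, so the
 * protected section should be short; LWLockReleaseAll cleans up any locks
 * still held during error recovery.
 */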
77 #include "postgres.h"
78 
79 #include "miscadmin.h"
80 #include "pgstat.h"
81 #include "pg_trace.h"
82 #include "postmaster/postmaster.h"
83 #include "replication/slot.h"
84 #include "storage/ipc.h"
85 #include "storage/predicate.h"
86 #include "storage/proc.h"
87 #include "storage/proclist.h"
88 #include "storage/spin.h"
89 #include "utils/memutils.h"
90 
91 #ifdef LWLOCK_STATS
92 #include "utils/hsearch.h"
93 #endif
94 
95 
96 /* We use the ShmemLock spinlock to protect LWLockCounter */
97 extern slock_t *ShmemLock;
98 
99 #define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30)
100 #define LW_FLAG_RELEASE_OK ((uint32) 1 << 29)
101 #define LW_FLAG_LOCKED ((uint32) 1 << 28)
102 
103 #define LW_VAL_EXCLUSIVE ((uint32) 1 << 24)
104 #define LW_VAL_SHARED 1
105 
106 #define LW_LOCK_MASK ((uint32) ((1 << 25)-1))
107 /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
108 #define LW_SHARED_MASK ((uint32) ((1 << 24)-1))
109 
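/*
 * Putting the flag bits and the holder count together, the 32-bit 'state'
 * word of an LWLock is laid out roughly as follows (derived from the
 * definitions above):
 *
 *	 bit 30       LW_FLAG_HAS_WAITERS  - somebody is (being) queued
 *	 bit 29       LW_FLAG_RELEASE_OK   - wake waiters on release
 *	 bit 28       LW_FLAG_LOCKED       - wait list is locked
 *	 bit 24       LW_VAL_EXCLUSIVE     - sentinel for an exclusive holder
 *	 bits 0..23   shared-holder count  - each holder adds LW_VAL_SHARED (1)
 *
 * LW_LOCK_MASK covers bits 0..24, i.e. both the exclusive sentinel and the
 * shared count, so "(state & LW_LOCK_MASK) == 0" means no holders at all.
 * For example, a single exclusive holder yields (state & LW_LOCK_MASK) ==
 * 0x1000000, while three shared holders yield 0x0000003. Since MAX_BACKENDS
 * is 2^23-1, the shared count can never collide with the exclusive sentinel.
 */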
110 /*
111  * This is indexed by tranche ID and stores the names of all tranches known
112  * to the current backend.
113  */
114 static const char **LWLockTrancheArray = NULL;
115 static int LWLockTranchesAllocated = 0;
116 
117 #define T_NAME(lock) \
118  (LWLockTrancheArray[(lock)->tranche])
119 
120 /*
121  * This points to the main array of LWLocks in shared memory. Backends inherit
122  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
123  * where we have special measures to pass it down).
124  */
125 LWLockPadded *MainLWLockArray = NULL;
126 
127 /*
128  * We use this structure to keep track of locked LWLocks for release
129  * during error recovery. Normally, only a few will be held at once, but
130  * occasionally the number can be much higher; for example, the pg_buffercache
131  * extension locks all buffer partitions simultaneously.
132  */
133 #define MAX_SIMUL_LWLOCKS 200
134 
135 /* struct representing the LWLocks we're holding */
136 typedef struct LWLockHandle
137 {
138  LWLock *lock;
139  LWLockMode mode;
140 } LWLockHandle;
141 
142 static int num_held_lwlocks = 0;
143 static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
144 
145 /* struct representing the LWLock tranche request for named tranche */
146 typedef struct NamedLWLockTrancheRequest
147 {
148  char tranche_name[NAMEDATALEN];
149  int num_lwlocks;
150 } NamedLWLockTrancheRequest;
151 
152 NamedLWLockTrancheRequest *NamedLWLockTrancheRequestArray = NULL;
153 static int NamedLWLockTrancheRequestsAllocated = 0;
154 int NamedLWLockTrancheRequests = 0;
155 
156 NamedLWLockTranche *NamedLWLockTrancheArray = NULL;
157 
158 static bool lock_named_request_allowed = true;
159 
160 static void InitializeLWLocks(void);
161 static void RegisterLWLockTranches(void);
162 
163 static inline void LWLockReportWaitStart(LWLock *lock);
164 static inline void LWLockReportWaitEnd(void);
165 
166 #ifdef LWLOCK_STATS
167 typedef struct lwlock_stats_key
168 {
169  int tranche;
170  void *instance;
171 } lwlock_stats_key;
172 
173 typedef struct lwlock_stats
174 {
175  lwlock_stats_key key;
176  int sh_acquire_count;
177  int ex_acquire_count;
178  int block_count;
179  int dequeue_self_count;
180  int spin_delay_count;
181 } lwlock_stats;
182 
183 static HTAB *lwlock_stats_htab;
184 static lwlock_stats lwlock_stats_dummy;
185 #endif
186 
187 #ifdef LOCK_DEBUG
188 bool Trace_lwlocks = false;
189 
190 inline static void
191 PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
192 {
193  /* hide statement & context here, otherwise the log is just too verbose */
194  if (Trace_lwlocks)
195  {
196  uint32 state = pg_atomic_read_u32(&lock->state);
197 
198  ereport(LOG,
199  (errhidestmt(true),
200  errhidecontext(true),
201  errmsg_internal("%d: %s(%s %p): excl %u shared %u haswaiters %u waiters %u rOK %d",
202  MyProcPid,
203  where, T_NAME(lock), lock,
204  (state & LW_VAL_EXCLUSIVE) != 0,
205  state & LW_SHARED_MASK,
206  (state & LW_FLAG_HAS_WAITERS) != 0,
207  pg_atomic_read_u32(&lock->nwaiters),
208  (state & LW_FLAG_RELEASE_OK) != 0)));
209  }
210 }
211 
212 inline static void
213 LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
214 {
215  /* hide statement & context here, otherwise the log is just too verbose */
216  if (Trace_lwlocks)
217  {
218  ereport(LOG,
219  (errhidestmt(true),
220  errhidecontext(true),
221  errmsg_internal("%s(%s %p): %s", where,
222  T_NAME(lock), lock, msg)));
223  }
224 }
225 
226 #else /* not LOCK_DEBUG */
227 #define PRINT_LWDEBUG(a,b,c) ((void)0)
228 #define LOG_LWDEBUG(a,b,c) ((void)0)
229 #endif /* LOCK_DEBUG */
230 
231 #ifdef LWLOCK_STATS
232 
233 static void init_lwlock_stats(void);
234 static void print_lwlock_stats(int code, Datum arg);
235 static lwlock_stats * get_lwlock_stats_entry(LWLock *lockid);
236 
237 static void
238 init_lwlock_stats(void)
239 {
240  HASHCTL ctl;
241  static MemoryContext lwlock_stats_cxt = NULL;
242  static bool exit_registered = false;
243 
244  if (lwlock_stats_cxt != NULL)
245  MemoryContextDelete(lwlock_stats_cxt);
246 
247  /*
248  * The LWLock stats will be updated within a critical section, which
249  * requires allocating new hash entries. Allocations within a critical
250  * section are normally not allowed because running out of memory would
251  * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
252  * turned on in production, so that's an acceptable risk. The hash entries
253  * are small, so the risk of running out of memory is minimal in practice.
254  */
255  lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
256  "LWLock stats",
257  ALLOCSET_DEFAULT_SIZES);
258  MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
259 
260  MemSet(&ctl, 0, sizeof(ctl));
261  ctl.keysize = sizeof(lwlock_stats_key);
262  ctl.entrysize = sizeof(lwlock_stats);
263  ctl.hcxt = lwlock_stats_cxt;
264  lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
265  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
266  if (!exit_registered)
267  {
268  on_shmem_exit(print_lwlock_stats, 0);
269  exit_registered = true;
270  }
271 }
272 
273 static void
274 print_lwlock_stats(int code, Datum arg)
275 {
276  HASH_SEQ_STATUS scan;
277  lwlock_stats *lwstats;
278 
279  hash_seq_init(&scan, lwlock_stats_htab);
280 
281  /* Grab an LWLock to keep different backends from mixing reports */
282  LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
283 
284  while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
285  {
286  fprintf(stderr,
287  "PID %d lwlock %s %p: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
288  MyProcPid, LWLockTrancheArray[lwstats->key.tranche],
289  lwstats->key.instance, lwstats->sh_acquire_count,
290  lwstats->ex_acquire_count, lwstats->block_count,
291  lwstats->spin_delay_count, lwstats->dequeue_self_count);
292  }
293 
294  LWLockRelease(&MainLWLockArray[0].lock);
295 }
296 
297 static lwlock_stats *
298 get_lwlock_stats_entry(LWLock *lock)
299 {
300  lwlock_stats_key key;
301  lwlock_stats *lwstats;
302  bool found;
303 
304  /*
305  * During shared memory initialization, the hash table doesn't exist yet.
306  * Stats of that phase aren't very interesting, so just collect operations
307  * on all locks in a single dummy entry.
308  */
309  if (lwlock_stats_htab == NULL)
310  return &lwlock_stats_dummy;
311 
312  /* Fetch or create the entry. */
313  key.tranche = lock->tranche;
314  key.instance = lock;
315  lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
316  if (!found)
317  {
318  lwstats->sh_acquire_count = 0;
319  lwstats->ex_acquire_count = 0;
320  lwstats->block_count = 0;
321  lwstats->dequeue_self_count = 0;
322  lwstats->spin_delay_count = 0;
323  }
324  return lwstats;
325 }
326 #endif /* LWLOCK_STATS */
327 
328 
329 /*
330  * Compute number of LWLocks required by named tranches. These will be
331  * allocated in the main array.
332  */
333 static int
334 NumLWLocksByNamedTranches(void)
335 {
336  int numLocks = 0;
337  int i;
338 
339  for (i = 0; i < NamedLWLockTrancheRequests; i++)
340  numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
341 
342  return numLocks;
343 }
344 
345 /*
346  * Compute shmem space needed for LWLocks and named tranches.
347  */
348 Size
349 LWLockShmemSize(void)
350 {
351  Size size;
352  int i;
353  int numLocks = NUM_FIXED_LWLOCKS;
354 
355  numLocks += NumLWLocksByNamedTranches();
356 
357  /* Space for the LWLock array. */
358  size = mul_size(numLocks, sizeof(LWLockPadded));
359 
360  /* Space for dynamic allocation counter, plus room for alignment. */
361  size = add_size(size, sizeof(int) + LWLOCK_PADDED_SIZE);
362 
363  /* space for named tranches. */
364  size = add_size(size, mul_size(NamedLWLockTrancheRequests, sizeof(NamedLWLockTranche)));
365 
366  /* space for name of each tranche. */
367  for (i = 0; i < NamedLWLockTrancheRequests; i++)
368  size = add_size(size, strlen(NamedLWLockTrancheRequestArray[i].tranche_name) + 1);
369 
370  /* Disallow named LWLocks' requests after startup */
371  lock_named_request_allowed = false;
372 
373  return size;
374 }
375 
376 /*
377  * Allocate shmem space for the main LWLock array and all tranches and
378  * initialize it. We also register all the LWLock tranches here.
379  */
380 void
381 CreateLWLocks(void)
382 {
383  StaticAssertExpr(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
384  "MAX_BACKENDS too big for lwlock.c");
385 
386  StaticAssertExpr(sizeof(LWLock) <= LWLOCK_MINIMAL_SIZE &&
387  sizeof(LWLock) <= LWLOCK_PADDED_SIZE,
388  "Miscalculated LWLock padding");
389 
390  if (!IsUnderPostmaster)
391  {
392  Size spaceLocks = LWLockShmemSize();
393  int *LWLockCounter;
394  char *ptr;
395 
396  /* Allocate space */
397  ptr = (char *) ShmemAlloc(spaceLocks);
398 
399  /* Leave room for dynamic allocation of tranches */
400  ptr += sizeof(int);
401 
402  /* Ensure desired alignment of LWLock array */
403  ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
404 
405  MainLWLockArray = (LWLockPadded *) ptr;
406 
407  /*
408  * Initialize the dynamic-allocation counter for tranches, which is
409  * stored just before the first LWLock.
410  */
411  LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
412  *LWLockCounter = LWTRANCHE_FIRST_USER_DEFINED;
413 
414  /* Initialize all LWLocks */
415  InitializeLWLocks();
416  }
417 
418  /* Register all LWLock tranches */
419  RegisterLWLockTranches();
420 }
421 
422 /*
423  * Initialize LWLocks that are fixed and those belonging to named tranches.
424  */
425 static void
426 InitializeLWLocks(void)
427 {
428  int numNamedLocks = NumLWLocksByNamedTranches();
429  int id;
430  int i;
431  int j;
432  LWLockPadded *lock;
433 
434  /* Initialize all individual LWLocks in main array */
435  for (id = 0, lock = MainLWLockArray; id < NUM_INDIVIDUAL_LWLOCKS; id++, lock++)
436  LWLockInitialize(&lock->lock, id);
437 
438  /* Initialize buffer mapping LWLocks in main array */
439  lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS;
440  for (id = 0; id < NUM_BUFFER_PARTITIONS; id++, lock++)
441  LWLockInitialize(&lock->lock, LWTRANCHE_BUFFER_MAPPING);
442 
443  /* Initialize lmgrs' LWLocks in main array */
444  lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS + NUM_BUFFER_PARTITIONS;
445  for (id = 0; id < NUM_LOCK_PARTITIONS; id++, lock++)
446  LWLockInitialize(&lock->lock, LWTRANCHE_LOCK_MANAGER);
447 
448  /* Initialize predicate lmgrs' LWLocks in main array */
449  lock = MainLWLockArray + NUM_INDIVIDUAL_LWLOCKS +
450  NUM_BUFFER_PARTITIONS + NUM_LOCK_PARTITIONS;
451  for (id = 0; id < NUM_PREDICATELOCK_PARTITIONS; id++, lock++)
452  LWLockInitialize(&lock->lock, LWTRANCHE_PREDICATE_LOCK_MANAGER);
453 
454  /* Initialize named tranches. */
455  if (NamedLWLockTrancheRequests > 0)
456  {
457  char *trancheNames;
458 
459  NamedLWLockTrancheArray = (NamedLWLockTranche *)
460  &MainLWLockArray[NUM_FIXED_LWLOCKS + numNamedLocks];
461 
462  trancheNames = (char *) NamedLWLockTrancheArray +
463  (NamedLWLockTrancheRequests * sizeof(NamedLWLockTranche));
464  lock = &MainLWLockArray[NUM_FIXED_LWLOCKS];
465 
466  for (i = 0; i < NamedLWLockTrancheRequests; i++)
467  {
468  NamedLWLockTrancheRequest *request;
469  NamedLWLockTranche *tranche;
470  char *name;
471 
472  request = &NamedLWLockTrancheRequestArray[i];
473  tranche = &NamedLWLockTrancheArray[i];
474 
475  name = trancheNames;
476  trancheNames += strlen(request->tranche_name) + 1;
477  strcpy(name, request->tranche_name);
478  tranche->trancheId = LWLockNewTrancheId();
479  tranche->trancheName = name;
480 
481  for (j = 0; j < request->num_lwlocks; j++, lock++)
482  LWLockInitialize(&lock->lock, tranche->trancheId);
483  }
484  }
485 }
486 
487 /*
488  * Register named tranches and tranches for fixed LWLocks.
489  */
490 static void
491 RegisterLWLockTranches(void)
492 {
493  int i;
494 
495  if (LWLockTrancheArray == NULL)
496  {
497  LWLockTranchesAllocated = 128;
498  LWLockTrancheArray = (const char **)
499  MemoryContextAllocZero(TopMemoryContext,
500  LWLockTranchesAllocated * sizeof(char *));
501  Assert(LWLockTranchesAllocated >= LWTRANCHE_FIRST_USER_DEFINED);
502  }
503 
504  for (i = 0; i < NUM_INDIVIDUAL_LWLOCKS; ++i)
505  LWLockRegisterTranche(i, MainLWLockNames[i]);
506 
507  LWLockRegisterTranche(LWTRANCHE_BUFFER_MAPPING, "buffer_mapping");
508  LWLockRegisterTranche(LWTRANCHE_LOCK_MANAGER, "lock_manager");
509  LWLockRegisterTranche(LWTRANCHE_PREDICATE_LOCK_MANAGER,
510  "predicate_lock_manager");
511  LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA,
512  "parallel_query_dsa");
513  LWLockRegisterTranche(LWTRANCHE_SESSION_DSA,
514  "session_dsa");
515  LWLockRegisterTranche(LWTRANCHE_SESSION_RECORD_TABLE,
516  "session_record_table");
517  LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE,
518  "session_typmod_table");
521 
522  /* Register named tranches. */
523  for (i = 0; i < NamedLWLockTrancheRequests; i++)
524  LWLockRegisterTranche(NamedLWLockTrancheArray[i].trancheId,
525  NamedLWLockTrancheArray[i].trancheName);
526 }
527 
528 /*
529  * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
530  */
531 void
532 InitLWLockAccess(void)
533 {
534 #ifdef LWLOCK_STATS
535  init_lwlock_stats();
536 #endif
537 }
538 
539 /*
540  * GetNamedLWLockTranche - returns the base address of LWLock from the
541  * specified tranche.
542  *
543  * Caller needs to retrieve the requested number of LWLocks starting from
544  * the base lock address returned by this API. This can be used for
545  * tranches that are requested by using RequestNamedLWLockTranche() API.
546  */
547 LWLockPadded *
548 GetNamedLWLockTranche(const char *tranche_name)
549 {
550  int lock_pos;
551  int i;
552 
553  /*
554  * Obtain the position of base address of LWLock belonging to requested
555  * tranche_name in MainLWLockArray. LWLocks for named tranches are placed
556  * in MainLWLockArray after fixed locks.
557  */
558  lock_pos = NUM_FIXED_LWLOCKS;
559  for (i = 0; i < NamedLWLockTrancheRequests; i++)
560  {
561  if (strcmp(NamedLWLockTrancheRequestArray[i].tranche_name,
562  tranche_name) == 0)
563  return &MainLWLockArray[lock_pos];
564 
565  lock_pos += NamedLWLockTrancheRequestArray[i].num_lwlocks;
566  }
567 
568  if (i >= NamedLWLockTrancheRequests)
569  elog(ERROR, "requested tranche is not registered");
570 
571  /* just to keep compiler quiet */
572  return NULL;
573 }
574 
575 /*
576  * Allocate a new tranche ID.
577  */
578 int
579 LWLockNewTrancheId(void)
580 {
581  int result;
582  int *LWLockCounter;
583 
584  LWLockCounter = (int *) ((char *) MainLWLockArray - sizeof(int));
585  SpinLockAcquire(ShmemLock);
586  result = (*LWLockCounter)++;
587  SpinLockRelease(ShmemLock);
588 
589  return result;
590 }
591 
592 /*
593  * Register a tranche ID in the lookup table for the current process. This
594  * routine will save a pointer to the tranche name passed as an argument,
595  * so the name should be allocated in a backend-lifetime context
596  * (TopMemoryContext, static variable, or similar).
597  */
598 void
599 LWLockRegisterTranche(int tranche_id, const char *tranche_name)
600 {
601  Assert(LWLockTrancheArray != NULL);
602 
603  if (tranche_id >= LWLockTranchesAllocated)
604  {
605  int i = LWLockTranchesAllocated;
606  int j = LWLockTranchesAllocated;
607 
608  while (i <= tranche_id)
609  i *= 2;
610 
611  LWLockTrancheArray = (const char **)
612  repalloc(LWLockTrancheArray, i * sizeof(char *));
613  LWLockTranchesAllocated = i;
614  while (j < LWLockTranchesAllocated)
615  LWLockTrancheArray[j++] = NULL;
616  }
617 
618  LWLockTrancheArray[tranche_id] = tranche_name;
619 }
620 
621 /*
622  * RequestNamedLWLockTranche
623  * Request that extra LWLocks be allocated during postmaster
624  * startup.
625  *
626  * This is only useful for extensions if called from the _PG_init hook
627  * of a library that is loaded into the postmaster via
628  * shared_preload_libraries. Once shared memory has been allocated, calls
629  * will be ignored. (We could raise an error, but it seems better to make
630  * it a no-op, so that libraries containing such calls can be reloaded if
631  * needed.)
632  */
633 void
634 RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
635 {
636  NamedLWLockTrancheRequest *request;
637 
638  if (!lock_named_request_allowed)
639  return; /* too late */
640 
641  if (NamedLWLockTrancheRequestArray == NULL)
642  {
643  NamedLWLockTrancheRequestsAllocated = 16;
644  NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
645  MemoryContextAlloc(TopMemoryContext,
646  NamedLWLockTrancheRequestsAllocated
647  * sizeof(NamedLWLockTrancheRequest));
648  }
649 
650  if (NamedLWLockTrancheRequests >= NamedLWLockTrancheRequestsAllocated)
651  {
652  int i = NamedLWLockTrancheRequestsAllocated;
653 
654  while (i <= NamedLWLockTrancheRequests)
655  i *= 2;
656 
657  NamedLWLockTrancheRequestArray = (NamedLWLockTrancheRequest *)
658  repalloc(NamedLWLockTrancheRequestArray,
659  i * sizeof(NamedLWLockTrancheRequest));
660  NamedLWLockTrancheRequestsAllocated = i;
661  }
662 
663  request = &NamedLWLockTrancheRequestArray[NamedLWLockTrancheRequests];
664  Assert(strlen(tranche_name) + 1 < NAMEDATALEN);
665  StrNCpy(request->tranche_name, tranche_name, NAMEDATALEN);
666  request->num_lwlocks = num_lwlocks;
667  NamedLWLockTrancheRequests++;
668 }
669 
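/*
 * An illustrative sketch of how an extension would use the named-tranche
 * API above; the tranche name "my_extension", the lock count, and the
 * shmem-startup function are hypothetical:
 *
 *	void
 *	_PG_init(void)
 *	{
 *		// only honored when loaded via shared_preload_libraries
 *		RequestNamedLWLockTranche("my_extension", 4);
 *	}
 *
 *	static void
 *	my_extension_shmem_startup(void)
 *	{
 *		LWLockPadded *locks = GetNamedLWLockTranche("my_extension");
 *
 *		LWLockAcquire(&locks[0].lock, LW_EXCLUSIVE);
 *		// ... initialize or touch the extension's shared state ...
 *		LWLockRelease(&locks[0].lock);
 *	}
 */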
670 /*
671  * LWLockInitialize - initialize a new lwlock; it's initially unlocked
672  */
673 void
674 LWLockInitialize(LWLock *lock, int tranche_id)
675 {
676  pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
677 #ifdef LOCK_DEBUG
678  pg_atomic_init_u32(&lock->nwaiters, 0);
679 #endif
680  lock->tranche = tranche_id;
681  proclist_init(&lock->waiters);
682 }
683 
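/*
 * A minimal sketch of initializing LWLocks outside the main array (for
 * instance in a DSM segment), using the tranche APIs defined above; the
 * tranche name and the structure holding the lock are hypothetical:
 *
 *	int		tranche_id = LWLockNewTrancheId();
 *
 *	LWLockRegisterTranche(tranche_id, "my_dsm_area");
 *	LWLockInitialize(&my_shared_area->lock, tranche_id);
 *
 * Any other backend that touches the lock should also call
 * LWLockRegisterTranche() with the same id; otherwise the lock's wait
 * events are reported as "extension" (see GetLWLockIdentifier below).
 */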
684 /*
685  * Report start of wait event for light-weight locks.
686  *
687  * This function will be used by all the light-weight lock calls which
688  * need to wait to acquire the lock. This function distinguishes the wait
689  * event based on tranche and lock id.
690  */
691 static inline void
692 LWLockReportWaitStart(LWLock *lock)
693 {
694  pgstat_report_wait_start(PG_WAIT_LWLOCK | lock->tranche);
695 }
696 
697 /*
698  * Report end of wait event for light-weight locks.
699  */
700 static inline void
701 LWLockReportWaitEnd(void)
702 {
703  pgstat_report_wait_end();
704 }
705 
706 /*
707  * Return an identifier for an LWLock based on the wait class and event.
708  */
709 const char *
710 GetLWLockIdentifier(uint32 classId, uint16 eventId)
711 {
712  Assert(classId == PG_WAIT_LWLOCK);
713 
714  /*
715  * It is quite possible that the user has registered the tranche in one of the
716  * backends (e.g. by allocating lwlocks in dynamic shared memory) but not
717  * all of them, so we can't assume the tranche is registered here.
718  */
719  if (eventId >= LWLockTranchesAllocated ||
720  LWLockTrancheArray[eventId] == NULL)
721  return "extension";
722 
723  return LWLockTrancheArray[eventId];
724 }
725 
726 /*
727  * Internal function that tries to atomically acquire the lwlock in the passed
728  * in mode.
729  *
730  * This function will not block waiting for a lock to become free - that's the
731  * caller's job.
732  *
733  * Returns true if the lock isn't free and we need to wait.
734  */
735 static bool
736 LWLockAttemptLock(LWLock *lock, LWLockMode mode)
737 {
738  uint32 old_state;
739 
740  AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
741 
742  /*
743  * Read once outside the loop, later iterations will get the newer value
744  * via compare & exchange.
745  */
746  old_state = pg_atomic_read_u32(&lock->state);
747 
748  /* loop until we've determined whether we could acquire the lock or not */
749  while (true)
750  {
751  uint32 desired_state;
752  bool lock_free;
753 
754  desired_state = old_state;
755 
756  if (mode == LW_EXCLUSIVE)
757  {
758  lock_free = (old_state & LW_LOCK_MASK) == 0;
759  if (lock_free)
760  desired_state += LW_VAL_EXCLUSIVE;
761  }
762  else
763  {
764  lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
765  if (lock_free)
766  desired_state += LW_VAL_SHARED;
767  }
768 
769  /*
770  * Attempt to swap in the state we are expecting. If we didn't see
771  * lock to be free, that's just the old value. If we saw it as free,
772  * we'll attempt to mark it acquired. The reason that we always swap
773  * in the value is that this doubles as a memory barrier. We could try
774  * to be smarter and only swap in values if we saw the lock as free,
775  * but benchmarks haven't shown it to be beneficial so far.
776  *
777  * Retry if the value changed since we last looked at it.
778  */
779  if (pg_atomic_compare_exchange_u32(&lock->state,
780  &old_state, desired_state))
781  {
782  if (lock_free)
783  {
784  /* Great! Got the lock. */
785 #ifdef LOCK_DEBUG
786  if (mode == LW_EXCLUSIVE)
787  lock->owner = MyProc;
788 #endif
789  return false;
790  }
791  else
792  return true; /* somebody else has the lock */
793  }
794  }
795  pg_unreachable();
796 }
797 
798 /*
799  * Lock the LWLock's wait list against concurrent activity.
800  *
801  * NB: even though the wait list is locked, non-conflicting lock operations
802  * may still happen concurrently.
803  *
804  * Time spent holding mutex should be short!
805  */
806 static void
807 LWLockWaitListLock(LWLock *lock)
808 {
809  uint32 old_state;
810 #ifdef LWLOCK_STATS
811  lwlock_stats *lwstats;
812  uint32 delays = 0;
813 
814  lwstats = get_lwlock_stats_entry(lock);
815 #endif
816 
817  while (true)
818  {
819  /* always try once to acquire lock directly */
820  old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
821  if (!(old_state & LW_FLAG_LOCKED))
822  break; /* got lock */
823 
824  /* and then spin without atomic operations until lock is released */
825  {
826  SpinDelayStatus delayStatus;
827 
828  init_local_spin_delay(&delayStatus);
829 
830  while (old_state & LW_FLAG_LOCKED)
831  {
832  perform_spin_delay(&delayStatus);
833  old_state = pg_atomic_read_u32(&lock->state);
834  }
835 #ifdef LWLOCK_STATS
836  delays += delayStatus.delays;
837 #endif
838  finish_spin_delay(&delayStatus);
839  }
840 
841  /*
842  * Retry. The lock might obviously already be re-acquired by the time
843  * we're attempting to get it again.
844  */
845  }
846 
847 #ifdef LWLOCK_STATS
848  lwstats->spin_delay_count += delays;
849 #endif
850 }
851 
852 /*
853  * Unlock the LWLock's wait list.
854  *
855  * Note that it can be more efficient to manipulate flags and release the
856  * locks in a single atomic operation.
857  */
858 static void
859 LWLockWaitListUnlock(LWLock *lock)
860 {
861  uint32 old_state PG_USED_FOR_ASSERTS_ONLY;
862 
863  old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
864 
865  Assert(old_state & LW_FLAG_LOCKED);
866 }
867 
868 /*
869  * Wakeup all the lockers that currently have a chance to acquire the lock.
870  */
871 static void
872 LWLockWakeup(LWLock *lock)
873 {
874  bool new_release_ok;
875  bool wokeup_somebody = false;
876  proclist_head wakeup;
877  proclist_mutable_iter iter;
878 
879  proclist_init(&wakeup);
880 
881  new_release_ok = true;
882 
883  /* lock wait list while collecting backends to wake up */
884  LWLockWaitListLock(lock);
885 
886  proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
887  {
888  PGPROC *waiter = GetPGProcByNumber(iter.cur);
889 
890  if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
891  continue;
892 
893  proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
894  proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
895 
896  if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
897  {
898  /*
899  * Prevent additional wakeups until retryer gets to run. Backends
900  * that are just waiting for the lock to become free don't retry
901  * automatically.
902  */
903  new_release_ok = false;
904 
905  /*
906  * Don't wakeup (further) exclusive locks.
907  */
908  wokeup_somebody = true;
909  }
910 
911  /*
912  * Once we've woken up an exclusive lock, there's no point in waking
913  * up anybody else.
914  */
915  if (waiter->lwWaitMode == LW_EXCLUSIVE)
916  break;
917  }
918 
919  Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
920 
921  /* unset required flags, and release lock, in one fell swoop */
922  {
923  uint32 old_state;
924  uint32 desired_state;
925 
926  old_state = pg_atomic_read_u32(&lock->state);
927  while (true)
928  {
929  desired_state = old_state;
930 
931  /* compute desired flags */
932 
933  if (new_release_ok)
934  desired_state |= LW_FLAG_RELEASE_OK;
935  else
936  desired_state &= ~LW_FLAG_RELEASE_OK;
937 
938  if (proclist_is_empty(&wakeup))
939  desired_state &= ~LW_FLAG_HAS_WAITERS;
940 
941  desired_state &= ~LW_FLAG_LOCKED; /* release lock */
942 
943  if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
944  desired_state))
945  break;
946  }
947  }
948 
949  /* Awaken any waiters I removed from the queue. */
950  proclist_foreach_modify(iter, &wakeup, lwWaitLink)
951  {
952  PGPROC *waiter = GetPGProcByNumber(iter.cur);
953 
954  LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
955  proclist_delete(&wakeup, iter.cur, lwWaitLink);
956 
957  /*
958  * Guarantee that lwWaiting being unset only becomes visible once the
959  * unlink from the wait list has completed. Otherwise the target backend
960  * could be woken up for other reason and enqueue for a new lock - if
961  * that happens before the list unlink happens, the list would end up
962  * being corrupted.
963  *
964  * The barrier pairs with the LWLockWaitListLock() when enqueuing for
965  * another lock.
966  */
967  pg_write_barrier();
968  waiter->lwWaiting = false;
969  PGSemaphoreUnlock(waiter->sem);
970  }
971 }
972 
973 /*
974  * Add ourselves to the end of the queue.
975  *
976  * NB: Mode can be LW_WAIT_UNTIL_FREE here!
977  */
978 static void
979 LWLockQueueSelf(LWLock *lock, LWLockMode mode)
980 {
981  /*
982  * If we don't have a PGPROC structure, there's no way to wait. This
983  * should never occur, since MyProc should only be null during shared
984  * memory initialization.
985  */
986  if (MyProc == NULL)
987  elog(PANIC, "cannot wait without a PGPROC structure");
988 
989  if (MyProc->lwWaiting)
990  elog(PANIC, "queueing for lock while waiting on another one");
991 
992  LWLockWaitListLock(lock);
993 
994  /* setting the flag is protected by the spinlock */
995  pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
996 
997  MyProc->lwWaiting = true;
998  MyProc->lwWaitMode = mode;
999 
1000  /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
1001  if (mode == LW_WAIT_UNTIL_FREE)
1002  proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1003  else
1004  proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);
1005 
1006  /* Can release the mutex now */
1007  LWLockWaitListUnlock(lock);
1008 
1009 #ifdef LOCK_DEBUG
1010  pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
1011 #endif
1012 
1013 }
1014 
1015 /*
1016  * Remove ourselves from the waitlist.
1017  *
1018  * This is used if we queued ourselves because we thought we needed to sleep
1019  * but, after further checking, we discovered that we don't actually need to
1020  * do so.
1021  */
1022 static void
1023 LWLockDequeueSelf(LWLock *lock)
1024 {
1025  bool found = false;
1026  proclist_mutable_iter iter;
1027 
1028 #ifdef LWLOCK_STATS
1029  lwlock_stats *lwstats;
1030 
1031  lwstats = get_lwlock_stats_entry(lock);
1032 
1033  lwstats->dequeue_self_count++;
1034 #endif
1035 
1036  LWLockWaitListLock(lock);
1037 
1038  /*
1039  * We can't just remove ourselves from the list; we need to iterate over
1040  * all entries, as somebody else could have dequeued us already.
1041  */
1042  proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1043  {
1044  if (iter.cur == MyProc->pgprocno)
1045  {
1046  found = true;
1047  proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1048  break;
1049  }
1050  }
1051 
1052  if (proclist_is_empty(&lock->waiters) &&
1053  (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
1054  {
1055  pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
1056  }
1057 
1058  /* XXX: combine with fetch_and above? */
1059  LWLockWaitListUnlock(lock);
1060 
1061  /* clear waiting state again, nice for debugging */
1062  if (found)
1063  MyProc->lwWaiting = false;
1064  else
1065  {
1066  int extraWaits = 0;
1067 
1068  /*
1069  * Somebody else dequeued us and has or will wake us up. Deal with the
1070  * superfluous absorption of a wakeup.
1071  */
1072 
1073  /*
1074  * Reset releaseOk if somebody woke us before we removed ourselves -
1075  * they'll have set it to false.
1076  */
1077  pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1078 
1079  /*
1080  * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
1081  * get reset at some inconvenient point later. Most of the time this
1082  * will immediately return.
1083  */
1084  for (;;)
1085  {
1086  PGSemaphoreLock(MyProc->sem);
1087  if (!MyProc->lwWaiting)
1088  break;
1089  extraWaits++;
1090  }
1091 
1092  /*
1093  * Fix the process wait semaphore's count for any absorbed wakeups.
1094  */
1095  while (extraWaits-- > 0)
1096  PGSemaphoreUnlock(MyProc->sem);
1097  }
1098 
1099 #ifdef LOCK_DEBUG
1100  {
1101  /* not waiting anymore */
1102  uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1103 
1104  Assert(nwaiters < MAX_BACKENDS);
1105  }
1106 #endif
1107 }
1108 
1109 /*
1110  * LWLockAcquire - acquire a lightweight lock in the specified mode
1111  *
1112  * If the lock is not available, sleep until it is. Returns true if the lock
1113  * was available immediately, false if we had to sleep.
1114  *
1115  * Side effect: cancel/die interrupts are held off until lock release.
1116  */
1117 bool
1118 LWLockAcquire(LWLock *lock, LWLockMode mode)
1119 {
1120  PGPROC *proc = MyProc;
1121  bool result = true;
1122  int extraWaits = 0;
1123 #ifdef LWLOCK_STATS
1124  lwlock_stats *lwstats;
1125 
1126  lwstats = get_lwlock_stats_entry(lock);
1127 #endif
1128 
1129  AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1130 
1131  PRINT_LWDEBUG("LWLockAcquire", lock, mode);
1132 
1133 #ifdef LWLOCK_STATS
1134  /* Count lock acquisition attempts */
1135  if (mode == LW_EXCLUSIVE)
1136  lwstats->ex_acquire_count++;
1137  else
1138  lwstats->sh_acquire_count++;
1139 #endif /* LWLOCK_STATS */
1140 
1141  /*
1142  * We can't wait if we haven't got a PGPROC. This should only occur
1143  * during bootstrap or shared memory initialization. Put an Assert here
1144  * to catch unsafe coding practices.
1145  */
1146  Assert(!(proc == NULL && IsUnderPostmaster));
1147 
1148  /* Ensure we will have room to remember the lock */
1149  if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1150  elog(ERROR, "too many LWLocks taken");
1151 
1152  /*
1153  * Lock out cancel/die interrupts until we exit the code section protected
1154  * by the LWLock. This ensures that interrupts will not interfere with
1155  * manipulations of data structures in shared memory.
1156  */
1157  HOLD_INTERRUPTS();
1158 
1159  /*
1160  * Loop here to try to acquire lock after each time we are signaled by
1161  * LWLockRelease.
1162  *
1163  * NOTE: it might seem better to have LWLockRelease actually grant us the
1164  * lock, rather than retrying and possibly having to go back to sleep. But
1165  * in practice that is no good because it means a process swap for every
1166  * lock acquisition when two or more processes are contending for the same
1167  * lock. Since LWLocks are normally used to protect not-very-long
1168  * sections of computation, a process needs to be able to acquire and
1169  * release the same lock many times during a single CPU time slice, even
1170  * in the presence of contention. The efficiency of being able to do that
1171  * outweighs the inefficiency of sometimes wasting a process dispatch
1172  * cycle because the lock is not free when a released waiter finally gets
1173  * to run. See pgsql-hackers archives for 29-Dec-01.
1174  */
1175  for (;;)
1176  {
1177  bool mustwait;
1178 
1179  /*
1180  * Try to grab the lock the first time, we're not in the waitqueue
1181  * yet/anymore.
1182  */
1183  mustwait = LWLockAttemptLock(lock, mode);
1184 
1185  if (!mustwait)
1186  {
1187  LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
1188  break; /* got the lock */
1189  }
1190 
1191  /*
1192  * Ok, at this point we couldn't grab the lock on the first try. We
1193  * cannot simply queue ourselves to the end of the list and wait to be
1194  * woken up because by now the lock could long have been released.
1195  * Instead add us to the queue and try to grab the lock again. If we
1196  * succeed we need to revert the queuing and be happy, otherwise we
1197  * recheck the lock. If we still couldn't grab it, we know that the
1198  * other locker will see our queue entries when releasing since they
1199  * existed before we checked for the lock.
1200  */
1201 
1202  /* add to the queue */
1203  LWLockQueueSelf(lock, mode);
1204 
1205  /* we're now guaranteed to be woken up if necessary */
1206  mustwait = LWLockAttemptLock(lock, mode);
1207 
1208  /* ok, grabbed the lock the second time round, need to undo queueing */
1209  if (!mustwait)
1210  {
1211  LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1212 
1213  LWLockDequeueSelf(lock);
1214  break;
1215  }
1216 
1217  /*
1218  * Wait until awakened.
1219  *
1220  * Since we share the process wait semaphore with the regular lock
1221  * manager and ProcWaitForSignal, and we may need to acquire an LWLock
1222  * while one of those is pending, it is possible that we get awakened
1223  * for a reason other than being signaled by LWLockRelease. If so,
1224  * loop back and wait again. Once we've gotten the LWLock,
1225  * re-increment the sema by the number of additional signals received,
1226  * so that the lock manager or signal manager will see the received
1227  * signal when it next waits.
1228  */
1229  LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1230 
1231 #ifdef LWLOCK_STATS
1232  lwstats->block_count++;
1233 #endif
1234 
1235  LWLockReportWaitStart(lock);
1236  TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1237 
1238  for (;;)
1239  {
1240  PGSemaphoreLock(proc->sem);
1241  if (!proc->lwWaiting)
1242  break;
1243  extraWaits++;
1244  }
1245 
1246  /* Retrying, allow LWLockRelease to release waiters again. */
1247  pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1248 
1249 #ifdef LOCK_DEBUG
1250  {
1251  /* not waiting anymore */
1252  uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1253 
1254  Assert(nwaiters < MAX_BACKENDS);
1255  }
1256 #endif
1257 
1258  TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1259  LWLockReportWaitEnd();
1260 
1261  LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1262 
1263  /* Now loop back and try to acquire lock again. */
1264  result = false;
1265  }
1266 
1267  TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), mode);
1268 
1269  /* Add lock to list of locks held by this backend */
1270  held_lwlocks[num_held_lwlocks].lock = lock;
1271  held_lwlocks[num_held_lwlocks++].mode = mode;
1272 
1273  /*
1274  * Fix the process wait semaphore's count for any absorbed wakeups.
1275  */
1276  while (extraWaits-- > 0)
1277  PGSemaphoreUnlock(proc->sem);
1278 
1279  return result;
1280 }
1281 
1282 /*
1283  * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1284  *
1285  * If the lock is not available, return false with no side-effects.
1286  *
1287  * If successful, cancel/die interrupts are held off until lock release.
1288  */
1289 bool
1290 LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1291 {
1292  bool mustwait;
1293 
1294  AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1295 
1296  PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1297 
1298  /* Ensure we will have room to remember the lock */
1299  if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1300  elog(ERROR, "too many LWLocks taken");
1301 
1302  /*
1303  * Lock out cancel/die interrupts until we exit the code section protected
1304  * by the LWLock. This ensures that interrupts will not interfere with
1305  * manipulations of data structures in shared memory.
1306  */
1307  HOLD_INTERRUPTS();
1308 
1309  /* Check for the lock */
1310  mustwait = LWLockAttemptLock(lock, mode);
1311 
1312  if (mustwait)
1313  {
1314  /* Failed to get lock, so release interrupt holdoff */
1315  RESUME_INTERRUPTS();
1316 
1317  LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1318  TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), mode);
1319  }
1320  else
1321  {
1322  /* Add lock to list of locks held by this backend */
1323  held_lwlocks[num_held_lwlocks].lock = lock;
1324  held_lwlocks[num_held_lwlocks++].mode = mode;
1325  TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), mode);
1326  }
1327  return !mustwait;
1328 }
1329 
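/*
 * Illustrative caller pattern for LWLockConditionalAcquire; the lock and
 * the work being skipped are hypothetical. This suits opportunistic work
 * that can simply be skipped or retried later if somebody else holds the
 * lock:
 *
 *	if (LWLockConditionalAcquire(my_lock, LW_EXCLUSIVE))
 *	{
 *		// got it without sleeping; do the optional maintenance work
 *		do_optional_cleanup();
 *		LWLockRelease(my_lock);
 *	}
 *	else
 *	{
 *		// lock busy: no interrupt holdoff was taken, just move on
 *	}
 */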
1330 /*
1331  * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1332  *
1333  * The semantics of this function are a bit funky. If the lock is currently
1334  * free, it is acquired in the given mode, and the function returns true. If
1335  * the lock isn't immediately free, the function waits until it is released
1336  * and returns false, but does not acquire the lock.
1337  *
1338  * This is currently used for WALWriteLock: when a backend flushes the WAL,
1339  * holding WALWriteLock, it can flush the commit records of many other
1340  * backends as a side-effect. Those other backends need to wait until the
1341  * flush finishes, but don't need to acquire the lock anymore. They can just
1342  * wake up, observe that their records have already been flushed, and return.
1343  */
1344 bool
1345 LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1346 {
1347  PGPROC *proc = MyProc;
1348  bool mustwait;
1349  int extraWaits = 0;
1350 #ifdef LWLOCK_STATS
1351  lwlock_stats *lwstats;
1352 
1353  lwstats = get_lwlock_stats_entry(lock);
1354 #endif
1355 
1356  Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1357 
1358  PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1359 
1360  /* Ensure we will have room to remember the lock */
1361  if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1362  elog(ERROR, "too many LWLocks taken");
1363 
1364  /*
1365  * Lock out cancel/die interrupts until we exit the code section protected
1366  * by the LWLock. This ensures that interrupts will not interfere with
1367  * manipulations of data structures in shared memory.
1368  */
1369  HOLD_INTERRUPTS();
1370 
1371  /*
1372  * NB: We're using nearly the same twice-in-a-row lock acquisition
1373  * protocol as LWLockAcquire(). Check its comments for details.
1374  */
1375  mustwait = LWLockAttemptLock(lock, mode);
1376 
1377  if (mustwait)
1378  {
1379  LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1380 
1381  mustwait = LWLockAttemptLock(lock, mode);
1382 
1383  if (mustwait)
1384  {
1385  /*
1386  * Wait until awakened. Like in LWLockAcquire, be prepared for
1387  * bogus wakeups, because we share the semaphore with
1388  * ProcWaitForSignal.
1389  */
1390  LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1391 
1392 #ifdef LWLOCK_STATS
1393  lwstats->block_count++;
1394 #endif
1395 
1396  LWLockReportWaitStart(lock);
1397  TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
1398 
1399  for (;;)
1400  {
1401  PGSemaphoreLock(proc->sem);
1402  if (!proc->lwWaiting)
1403  break;
1404  extraWaits++;
1405  }
1406 
1407 #ifdef LOCK_DEBUG
1408  {
1409  /* not waiting anymore */
1410  uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1411 
1412  Assert(nwaiters < MAX_BACKENDS);
1413  }
1414 #endif
1415  TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
1416  LWLockReportWaitEnd();
1417 
1418  LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1419  }
1420  else
1421  {
1422  LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1423 
1424  /*
1425  * Got lock in the second attempt, undo queueing. We need to treat
1426  * this as having successfully acquired the lock, otherwise we'd
1427  * not necessarily wake up people we've prevented from acquiring
1428  * the lock.
1429  */
1430  LWLockDequeueSelf(lock);
1431  }
1432  }
1433 
1434  /*
1435  * Fix the process wait semaphore's count for any absorbed wakeups.
1436  */
1437  while (extraWaits-- > 0)
1438  PGSemaphoreUnlock(proc->sem);
1439 
1440  if (mustwait)
1441  {
1442  /* Failed to get lock, so release interrupt holdoff */
1443  RESUME_INTERRUPTS();
1444  LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1445  TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), mode);
1446  }
1447  else
1448  {
1449  LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1450  /* Add lock to list of locks held by this backend */
1451  held_lwlocks[num_held_lwlocks].lock = lock;
1452  held_lwlocks[num_held_lwlocks++].mode = mode;
1453  TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), mode);
1454  }
1455 
1456  return !mustwait;
1457 }
1458 
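/*
 * Illustrative sketch of the intended LWLockAcquireOrWait pattern, modeled
 * on the WALWriteLock usage described above; the flush-related names are
 * hypothetical:
 *
 *	if (my_target_already_done(target))
 *		return;					// fast path, nothing to wait for
 *
 *	if (LWLockAcquireOrWait(my_write_lock, LW_EXCLUSIVE))
 *	{
 *		// we got the lock: do the work on behalf of everyone waiting
 *		do_the_flush(target);
 *		LWLockRelease(my_write_lock);
 *	}
 *	else
 *	{
 *		// lock was held; we merely waited until the holder released it.
 *		// Recheck whether the holder already covered our request before
 *		// deciding to acquire the lock for real and do the work ourselves.
 *	}
 */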
1459 /*
1460  * Does the lwlock in its current state need to wait for the variable value to
1461  * change?
1462  *
1463  * If we don't need to wait, and it's because the value of the variable has
1464  * changed, store the current value in newval.
1465  *
1466  * *result is set to true if the lock was free, and false otherwise.
1467  */
1468 static bool
1469 LWLockConflictsWithVar(LWLock *lock,
1470  uint64 *valptr, uint64 oldval, uint64 *newval,
1471  bool *result)
1472 {
1473  bool mustwait;
1474  uint64 value;
1475 
1476  /*
1477  * Test first to see if the slot is free right now.
1478  *
1479  * XXX: the caller uses a spinlock before this, so we don't need a memory
1480  * barrier here as far as the current usage is concerned. But that might
1481  * not be safe in general.
1482  */
1483  mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1484 
1485  if (!mustwait)
1486  {
1487  *result = true;
1488  return false;
1489  }
1490 
1491  *result = false;
1492 
1493  /*
1494  * Read value using the lwlock's wait list lock, as we can't generally
1495  * rely on atomic 64 bit reads/stores. TODO: On platforms with a way to
1496  * do atomic 64 bit reads/writes the spinlock should be optimized away.
1497  */
1498  LWLockWaitListLock(lock);
1499  value = *valptr;
1500  LWLockWaitListUnlock(lock);
1501 
1502  if (value != oldval)
1503  {
1504  mustwait = false;
1505  *newval = value;
1506  }
1507  else
1508  {
1509  mustwait = true;
1510  }
1511 
1512  return mustwait;
1513 }
1514 
1515 /*
1516  * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1517  *
1518  * If the lock is held and *valptr equals oldval, waits until the lock is
1519  * either freed, or the lock holder updates *valptr by calling
1520  * LWLockUpdateVar. If the lock is free on exit (immediately or after
1521  * waiting), returns true. If the lock is still held, but *valptr no longer
1522  * matches oldval, returns false and sets *newval to the current value in
1523  * *valptr.
1524  *
1525  * Note: this function ignores shared lock holders; if the lock is held
1526  * in shared mode, returns 'true'.
1527  */
1528 bool
1529 LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
1530 {
1531  PGPROC *proc = MyProc;
1532  int extraWaits = 0;
1533  bool result = false;
1534 #ifdef LWLOCK_STATS
1535  lwlock_stats *lwstats;
1536 
1537  lwstats = get_lwlock_stats_entry(lock);
1538 #endif
1539 
1540  PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1541 
1542  /*
1543  * Lock out cancel/die interrupts while we sleep on the lock. There is no
1544  * cleanup mechanism to remove us from the wait queue if we got
1545  * interrupted.
1546  */
1547  HOLD_INTERRUPTS();
1548 
1549  /*
1550  * Loop here to check the lock's status after each time we are signaled.
1551  */
1552  for (;;)
1553  {
1554  bool mustwait;
1555 
1556  mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1557  &result);
1558 
1559  if (!mustwait)
1560  break; /* the lock was free or value didn't match */
1561 
1562  /*
1563  * Add myself to wait queue. Note that this is racy, somebody else
1564  * could wakeup before we're finished queuing. NB: We're using nearly
1565  * the same twice-in-a-row lock acquisition protocol as
1566  * LWLockAcquire(). Check its comments for details. The only
1567  * difference is that we also have to check the variable's values when
1568  * checking the state of the lock.
1569  */
1570  LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1571 
1572  /*
1573  * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1574  * lock is released.
1575  */
1576  pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1577 
1578  /*
1579  * We're now guaranteed to be woken up if necessary. Recheck the lock
1580  * and variables state.
1581  */
1582  mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
1583  &result);
1584 
1585  /* Ok, no conflict after we queued ourselves. Undo queueing. */
1586  if (!mustwait)
1587  {
1588  LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1589 
1590  LWLockDequeueSelf(lock);
1591  break;
1592  }
1593 
1594  /*
1595  * Wait until awakened.
1596  *
1597  * Since we share the process wait semaphore with the regular lock
1598  * manager and ProcWaitForSignal, and we may need to acquire an LWLock
1599  * while one of those is pending, it is possible that we get awakened
1600  * for a reason other than being signaled by LWLockRelease. If so,
1601  * loop back and wait again. Once we've gotten the LWLock,
1602  * re-increment the sema by the number of additional signals received,
1603  * so that the lock manager or signal manager will see the received
1604  * signal when it next waits.
1605  */
1606  LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1607 
1608 #ifdef LWLOCK_STATS
1609  lwstats->block_count++;
1610 #endif
1611 
1612  LWLockReportWaitStart(lock);
1613  TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), LW_EXCLUSIVE);
1614 
1615  for (;;)
1616  {
1617  PGSemaphoreLock(proc->sem);
1618  if (!proc->lwWaiting)
1619  break;
1620  extraWaits++;
1621  }
1622 
1623 #ifdef LOCK_DEBUG
1624  {
1625  /* not waiting anymore */
1626  uint32 nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1627 
1628  Assert(nwaiters < MAX_BACKENDS);
1629  }
1630 #endif
1631 
1632  TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), LW_EXCLUSIVE);
1633  LWLockReportWaitEnd();
1634 
1635  LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1636 
1637  /* Now loop back and check the status of the lock again. */
1638  }
1639 
1640  TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), LW_EXCLUSIVE);
1641 
1642  /*
1643  * Fix the process wait semaphore's count for any absorbed wakeups.
1644  */
1645  while (extraWaits-- > 0)
1646  PGSemaphoreUnlock(proc->sem);
1647 
1648  /*
1649  * Now okay to allow cancel/die interrupts.
1650  */
1651  RESUME_INTERRUPTS();
1652 
1653  return result;
1654 }
1655 
1656 
1657 /*
1658  * LWLockUpdateVar - Update a variable and wake up waiters atomically
1659  *
1660  * Sets *valptr to 'val', and wakes up all processes waiting for us with
1661  * LWLockWaitForVar(). Setting the value and waking up the processes happen
1662  * atomically so that any process calling LWLockWaitForVar() on the same lock
1663  * is guaranteed to see the new value, and act accordingly.
1664  *
1665  * The caller must be holding the lock in exclusive mode.
1666  */
1667 void
1668 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
1669 {
1670  proclist_head wakeup;
1671  proclist_mutable_iter iter;
1672 
1673  PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1674 
1675  proclist_init(&wakeup);
1676 
1677  LWLockWaitListLock(lock);
1678 
1679  Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1680 
1681  /* Update the lock's value */
1682  *valptr = val;
1683 
1684  /*
1685  * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1686  * up. They are always in the front of the queue.
1687  */
1688  proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
1689  {
1690  PGPROC *waiter = GetPGProcByNumber(iter.cur);
1691 
1692  if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1693  break;
1694 
1695  proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
1696  proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
1697  }
1698 
1699  /* We are done updating shared state of the lock itself. */
1700  LWLockWaitListUnlock(lock);
1701 
1702  /*
1703  * Awaken any waiters I removed from the queue.
1704  */
1705  proclist_foreach_modify(iter, &wakeup, lwWaitLink)
1706  {
1707  PGPROC *waiter = GetPGProcByNumber(iter.cur);
1708 
1709  proclist_delete(&wakeup, iter.cur, lwWaitLink);
1710  /* check comment in LWLockWakeup() about this barrier */
1711  pg_write_barrier();
1712  waiter->lwWaiting = false;
1713  PGSemaphoreUnlock(waiter->sem);
1714  }
1715 }
1716 
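/*
 * Illustrative sketch of the variable-wait protocol implemented by
 * LWLockWaitForVar/LWLockUpdateVar/LWLockReleaseClearVar; the progress
 * variable and its values are hypothetical:
 *
 *	// holder side: advertise progress without releasing the lock
 *	LWLockAcquire(my_lock, LW_EXCLUSIVE);
 *	...
 *	LWLockUpdateVar(my_lock, &shared->progress, new_position);
 *	...
 *	LWLockReleaseClearVar(my_lock, &shared->progress, 0);	// 0 = "free"
 *
 *	// waiter side: sleep until the lock is free or progress has advanced
 *	uint64	seen = last_seen_position;
 *
 *	while (!LWLockWaitForVar(my_lock, &shared->progress, seen, &seen))
 *	{
 *		// lock still held, but 'seen' now holds the updated value;
 *		// stop if it has advanced far enough, else wait again
 *		if (seen >= target_position)
 *			break;
 *	}
 */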
1717 
1718 /*
1719  * LWLockRelease - release a previously acquired lock
1720  */
1721 void
1722 LWLockRelease(LWLock *lock)
1723 {
1724  LWLockMode mode;
1725  uint32 oldstate;
1726  bool check_waiters;
1727  int i;
1728 
1729  /*
1730  * Remove lock from list of locks held. Usually, but not always, it will
1731  * be the latest-acquired lock; so search array backwards.
1732  */
1733  for (i = num_held_lwlocks; --i >= 0;)
1734  if (lock == held_lwlocks[i].lock)
1735  break;
1736 
1737  if (i < 0)
1738  elog(ERROR, "lock %s is not held", T_NAME(lock));
1739 
1740  mode = held_lwlocks[i].mode;
1741 
1742  num_held_lwlocks--;
1743  for (; i < num_held_lwlocks; i++)
1744  held_lwlocks[i] = held_lwlocks[i + 1];
1745 
1746  PRINT_LWDEBUG("LWLockRelease", lock, mode);
1747 
1748  /*
1749  * Release my hold on lock, after that it can immediately be acquired by
1750  * others, even if we still have to wakeup other waiters.
1751  */
1752  if (mode == LW_EXCLUSIVE)
1753  oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1754  else
1755  oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1756 
1757  /* nobody else can have that kind of lock */
1758  Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1759 
1760 
1761  /*
1762  * We're still waiting for backends to get scheduled, don't wake them up
1763  * again.
1764  */
1765  if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1766  (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1767  (oldstate & LW_LOCK_MASK) == 0)
1768  check_waiters = true;
1769  else
1770  check_waiters = false;
1771 
1772  /*
1773  * As waking up waiters requires the spinlock to be acquired, only do so
1774  * if necessary.
1775  */
1776  if (check_waiters)
1777  {
1778  /* XXX: remove before commit? */
1779  LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1780  LWLockWakeup(lock);
1781  }
1782 
1783  TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
1784 
1785  /*
1786  * Now okay to allow cancel/die interrupts.
1787  */
1788  RESUME_INTERRUPTS();
1789 }
1790 
1791 /*
1792  * LWLockReleaseClearVar - release a previously acquired lock, reset variable
1793  */
1794 void
1795 LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
1796 {
1797  LWLockWaitListLock(lock);
1798 
1799  /*
1800  * Set the variable's value before releasing the lock; that prevents a
1801  * race condition wherein a new locker acquires the lock but hasn't yet
1802  * set the variable's value.
1803  */
1804  *valptr = val;
1805  LWLockWaitListUnlock(lock);
1806 
1807  LWLockRelease(lock);
1808 }
1809 
1810 
1811 /*
1812  * LWLockReleaseAll - release all currently-held locks
1813  *
1814  * Used to clean up after ereport(ERROR). An important difference between this
1815  * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1816  * unchanged by this operation. This is necessary since InterruptHoldoffCount
1817  * has been set to an appropriate level earlier in error recovery. We could
1818  * decrement it below zero if we allow it to drop for each released lock!
1819  */
1820 void
1821 LWLockReleaseAll(void)
1822 {
1823  while (num_held_lwlocks > 0)
1824  {
1825  HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */
1826 
1827  LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1828  }
1829 }
1830 
1831 
1832 /*
1833  * LWLockHeldByMe - test whether my process holds a lock in any mode
1834  *
1835  * This is meant as debug support only.
1836  */
1837 bool
1838 LWLockHeldByMe(LWLock *l)
1839 {
1840  int i;
1841 
1842  for (i = 0; i < num_held_lwlocks; i++)
1843  {
1844  if (held_lwlocks[i].lock == l)
1845  return true;
1846  }
1847  return false;
1848 }
1849 
1850 /*
1851  * LWLockHeldByMeInMode - test whether my process holds a lock in given mode
1852  *
1853  * This is meant as debug support only.
1854  */
1855 bool
1856 LWLockHeldByMeInMode(LWLock *l, LWLockMode mode)
1857 {
1858  int i;
1859 
1860  for (i = 0; i < num_held_lwlocks; i++)
1861  {
1862  if (held_lwlocks[i].lock == l && held_lwlocks[i].mode == mode)
1863  return true;
1864  }
1865  return false;
1866 }