PostgreSQL Source Code  git master
inv_api.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * inv_api.c
 *	  routines for manipulating inversion fs large objects. This file
 *	  contains the user-level large object application interface routines.
 *
 *
 * Note: we access pg_largeobject.data using its C struct declaration.
 * This is safe because it immediately follows pageno which is an int4 field,
 * and therefore the data field will always be 4-byte aligned, even if it
 * is in the short 1-byte-header format.  We have to detoast it since it's
 * quite likely to be in compressed or short format.  We also need to check
 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
 *
 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
 * does most of the backend code.  We expect that CurrentMemoryContext will
 * be a short-lived context.  Data that must persist across function calls
 * is kept either in CacheMemoryContext (the Relation structs) or in the
 * memory context given to inv_open (for LargeObjectDesc structs).
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/large_object/inv_api.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <limits.h>

#include "access/detoast.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_largeobject.h"
#include "catalog/pg_largeobject_metadata.h"
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/large_object.h"
#include "utils/acl.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
53 
54 
55 /*
56  * GUC: backwards-compatibility flag to suppress LO permission checks
57  */
59 
60 /*
61  * All accesses to pg_largeobject and its index make use of a single
62  * Relation reference. To guarantee that the relcache entry remains
63  * in the cache, on the first reference inside a subtransaction, we
64  * execute a slightly klugy maneuver to assign ownership of the
65  * Relation reference to TopTransactionResourceOwner.
66  */
67 static Relation lo_heap_r = NULL;
68 static Relation lo_index_r = NULL;
69 
70 
71 /*
72  * Open pg_largeobject and its index, if not already done in current xact
73  */
74 static void
76 {
77  ResourceOwner currentOwner;
78 
79  if (lo_heap_r && lo_index_r)
80  return; /* already open in current xact */
81 
82  /* Arrange for the top xact to own these relation references */
83  currentOwner = CurrentResourceOwner;
85 
86  /* Use RowExclusiveLock since we might either read or write */
87  if (lo_heap_r == NULL)
88  lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
89  if (lo_index_r == NULL)
90  lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
91 
92  CurrentResourceOwner = currentOwner;
93 }
94 
95 /*
96  * Clean up at main transaction end
97  */
98 void
99 close_lo_relation(bool isCommit)
100 {
101  if (lo_heap_r || lo_index_r)
102  {
103  /*
104  * Only bother to close if committing; else abort cleanup will handle
105  * it
106  */
107  if (isCommit)
108  {
109  ResourceOwner currentOwner;
110 
111  currentOwner = CurrentResourceOwner;
113 
114  if (lo_index_r)
116  if (lo_heap_r)
118 
119  CurrentResourceOwner = currentOwner;
120  }
121  lo_heap_r = NULL;
122  lo_index_r = NULL;
123  }
124 }
125 
126 
127 /*
128  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
129  * read with can be specified.
130  */
131 static bool
133 {
134  Relation pg_lo_meta;
135  ScanKeyData skey[1];
136  SysScanDesc sd;
137  HeapTuple tuple;
138  bool retval = false;
139 
140  ScanKeyInit(&skey[0],
141  Anum_pg_largeobject_metadata_oid,
142  BTEqualStrategyNumber, F_OIDEQ,
143  ObjectIdGetDatum(loid));
144 
145  pg_lo_meta = table_open(LargeObjectMetadataRelationId,
147 
148  sd = systable_beginscan(pg_lo_meta,
149  LargeObjectMetadataOidIndexId, true,
150  snapshot, 1, skey);
151 
152  tuple = systable_getnext(sd);
153  if (HeapTupleIsValid(tuple))
154  retval = true;
155 
156  systable_endscan(sd);
157 
158  table_close(pg_lo_meta, AccessShareLock);
159 
160  return retval;
161 }
162 
163 
164 /*
165  * Extract data field from a pg_largeobject tuple, detoasting if needed
166  * and verifying that the length is sane. Returns data pointer (a bytea *),
167  * data length, and an indication of whether to pfree the data pointer.
168  */
169 static void
171  bytea **pdatafield,
172  int *plen,
173  bool *pfreeit)
174 {
175  bytea *datafield;
176  int len;
177  bool freeit;
178 
179  datafield = &(tuple->data); /* see note at top of file */
180  freeit = false;
181  if (VARATT_IS_EXTENDED(datafield))
182  {
183  datafield = (bytea *)
184  detoast_attr((struct varlena *) datafield);
185  freeit = true;
186  }
187  len = VARSIZE(datafield) - VARHDRSZ;
188  if (len < 0 || len > LOBLKSIZE)
189  ereport(ERROR,
191  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
192  tuple->loid, tuple->pageno, len)));
193  *pdatafield = datafield;
194  *plen = len;
195  *pfreeit = freeit;
196 }
197 
198 
199 /*
200  * inv_create -- create a new large object
201  *
202  * Arguments:
203  * lobjId - OID to use for new large object, or InvalidOid to pick one
204  *
205  * Returns:
206  * OID of new object
207  *
208  * If lobjId is not InvalidOid, then an error occurs if the OID is already
209  * in use.
210  */
211 Oid
213 {
214  Oid lobjId_new;
215 
216  /*
217  * Create a new largeobject with empty data pages
218  */
219  lobjId_new = LargeObjectCreate(lobjId);
220 
221  /*
222  * dependency on the owner of largeobject
223  *
224  * Note that LO dependencies are recorded using classId
225  * LargeObjectRelationId for backwards-compatibility reasons. Using
226  * LargeObjectMetadataRelationId instead would simplify matters for the
227  * backend, but it'd complicate pg_dump and possibly break other clients.
228  */
229  recordDependencyOnOwner(LargeObjectRelationId,
230  lobjId_new, GetUserId());
231 
232  /* Post creation hook for new large object */
233  InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
234 
235  /*
236  * Advance command counter to make new tuple visible to later operations.
237  */
239 
240  return lobjId_new;
241 }
242 
243 /*
244  * inv_open -- access an existing large object.
245  *
246  * Returns a large object descriptor, appropriately filled in.
247  * The descriptor and subsidiary data are allocated in the specified
248  * memory context, which must be suitably long-lived for the caller's
249  * purposes. If the returned descriptor has a snapshot associated
250  * with it, the caller must ensure that it also lives long enough,
251  * e.g. by calling RegisterSnapshotOnOwner
252  */
254 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
255 {
256  LargeObjectDesc *retval;
257  Snapshot snapshot = NULL;
258  int descflags = 0;
259 
260  /*
261  * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
262  * | INV_READ), the caller being allowed to read the large object
263  * descriptor in either case.
264  */
265  if (flags & INV_WRITE)
266  descflags |= IFS_WRLOCK | IFS_RDLOCK;
267  if (flags & INV_READ)
268  descflags |= IFS_RDLOCK;
269 
270  if (descflags == 0)
271  ereport(ERROR,
272  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
273  errmsg("invalid flags for opening a large object: %d",
274  flags)));
275 
276  /* Get snapshot. If write is requested, use an instantaneous snapshot. */
277  if (descflags & IFS_WRLOCK)
278  snapshot = NULL;
279  else
280  snapshot = GetActiveSnapshot();
281 
282  /* Can't use LargeObjectExists here because we need to specify snapshot */
283  if (!myLargeObjectExists(lobjId, snapshot))
284  ereport(ERROR,
285  (errcode(ERRCODE_UNDEFINED_OBJECT),
286  errmsg("large object %u does not exist", lobjId)));
287 
288  /* Apply permission checks, again specifying snapshot */
289  if ((descflags & IFS_RDLOCK) != 0)
290  {
291  if (!lo_compat_privileges &&
293  GetUserId(),
294  ACL_SELECT,
295  snapshot) != ACLCHECK_OK)
296  ereport(ERROR,
297  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
298  errmsg("permission denied for large object %u",
299  lobjId)));
300  }
301  if ((descflags & IFS_WRLOCK) != 0)
302  {
303  if (!lo_compat_privileges &&
305  GetUserId(),
306  ACL_UPDATE,
307  snapshot) != ACLCHECK_OK)
308  ereport(ERROR,
309  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
310  errmsg("permission denied for large object %u",
311  lobjId)));
312  }
313 
314  /* OK to create a descriptor */
315  retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
316  sizeof(LargeObjectDesc));
317  retval->id = lobjId;
318  retval->offset = 0;
319  retval->flags = descflags;
320 
321  /* caller sets if needed, not used by the functions in this file */
322  retval->subid = InvalidSubTransactionId;
323 
324  /*
325  * The snapshot (if any) is just the currently active snapshot. The
326  * caller will replace it with a longer-lived copy if needed.
327  */
328  retval->snapshot = snapshot;
329 
330  return retval;
331 }
332 
333 /*
334  * Closes a large object descriptor previously made by inv_open(), and
335  * releases the long-term memory used by it.
336  */
337 void
339 {
340  Assert(PointerIsValid(obj_desc));
341  pfree(obj_desc);
342 }
343 
344 /*
345  * Destroys an existing large object (not to be confused with a descriptor!)
346  *
347  * Note we expect caller to have done any required permissions check.
348  */
349 int
350 inv_drop(Oid lobjId)
351 {
352  ObjectAddress object;
353 
354  /*
355  * Delete any comments and dependencies on the large object
356  */
357  object.classId = LargeObjectRelationId;
358  object.objectId = lobjId;
359  object.objectSubId = 0;
360  performDeletion(&object, DROP_CASCADE, 0);
361 
362  /*
363  * Advance command counter so that tuple removal will be seen by later
364  * large-object operations in this transaction.
365  */
367 
368  /* For historical reasons, we always return 1 on success. */
369  return 1;
370 }
371 
372 /*
373  * Determine size of a large object
374  *
375  * NOTE: LOs can contain gaps, just like Unix files. We actually return
376  * the offset of the last byte + 1.
377  */
378 static uint64
380 {
381  uint64 lastbyte = 0;
382  ScanKeyData skey[1];
383  SysScanDesc sd;
384  HeapTuple tuple;
385 
386  Assert(PointerIsValid(obj_desc));
387 
389 
390  ScanKeyInit(&skey[0],
391  Anum_pg_largeobject_loid,
392  BTEqualStrategyNumber, F_OIDEQ,
393  ObjectIdGetDatum(obj_desc->id));
394 
396  obj_desc->snapshot, 1, skey);
397 
398  /*
399  * Because the pg_largeobject index is on both loid and pageno, but we
400  * constrain only loid, a backwards scan should visit all pages of the
401  * large object in reverse pageno order. So, it's sufficient to examine
402  * the first valid tuple (== last valid page).
403  */
405  if (HeapTupleIsValid(tuple))
406  {
408  bytea *datafield;
409  int len;
410  bool pfreeit;
411 
412  if (HeapTupleHasNulls(tuple)) /* paranoia */
413  elog(ERROR, "null field found in pg_largeobject");
415  getdatafield(data, &datafield, &len, &pfreeit);
416  lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
417  if (pfreeit)
418  pfree(datafield);
419  }
420 
422 
423  return lastbyte;
424 }
425 
426 int64
427 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
428 {
429  int64 newoffset;
430 
431  Assert(PointerIsValid(obj_desc));
432 
433  /*
434  * We allow seek/tell if you have either read or write permission, so no
435  * need for a permission check here.
436  */
437 
438  /*
439  * Note: overflow in the additions is possible, but since we will reject
440  * negative results, we don't need any extra test for that.
441  */
442  switch (whence)
443  {
444  case SEEK_SET:
445  newoffset = offset;
446  break;
447  case SEEK_CUR:
448  newoffset = obj_desc->offset + offset;
449  break;
450  case SEEK_END:
451  newoffset = inv_getsize(obj_desc) + offset;
452  break;
453  default:
454  ereport(ERROR,
455  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
456  errmsg("invalid whence setting: %d", whence)));
457  newoffset = 0; /* keep compiler quiet */
458  break;
459  }
460 
461  /*
462  * use errmsg_internal here because we don't want to expose INT64_FORMAT
463  * in translatable strings; doing better is not worth the trouble
464  */
465  if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
466  ereport(ERROR,
467  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
468  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
469  newoffset)));
470 
471  obj_desc->offset = newoffset;
472  return newoffset;
473 }
474 
475 int64
477 {
478  Assert(PointerIsValid(obj_desc));
479 
480  /*
481  * We allow seek/tell if you have either read or write permission, so no
482  * need for a permission check here.
483  */
484 
485  return obj_desc->offset;
486 }
487 
488 int
489 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
490 {
491  int nread = 0;
492  int64 n;
493  int64 off;
494  int len;
495  int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
496  uint64 pageoff;
497  ScanKeyData skey[2];
498  SysScanDesc sd;
499  HeapTuple tuple;
500 
501  Assert(PointerIsValid(obj_desc));
502  Assert(buf != NULL);
503 
504  if ((obj_desc->flags & IFS_RDLOCK) == 0)
505  ereport(ERROR,
506  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
507  errmsg("permission denied for large object %u",
508  obj_desc->id)));
509 
510  if (nbytes <= 0)
511  return 0;
512 
514 
515  ScanKeyInit(&skey[0],
516  Anum_pg_largeobject_loid,
517  BTEqualStrategyNumber, F_OIDEQ,
518  ObjectIdGetDatum(obj_desc->id));
519 
520  ScanKeyInit(&skey[1],
521  Anum_pg_largeobject_pageno,
523  Int32GetDatum(pageno));
524 
526  obj_desc->snapshot, 2, skey);
527 
528  while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
529  {
531  bytea *datafield;
532  bool pfreeit;
533 
534  if (HeapTupleHasNulls(tuple)) /* paranoia */
535  elog(ERROR, "null field found in pg_largeobject");
537 
538  /*
539  * We expect the indexscan will deliver pages in order. However,
540  * there may be missing pages if the LO contains unwritten "holes". We
541  * want missing sections to read out as zeroes.
542  */
543  pageoff = ((uint64) data->pageno) * LOBLKSIZE;
544  if (pageoff > obj_desc->offset)
545  {
546  n = pageoff - obj_desc->offset;
547  n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
548  MemSet(buf + nread, 0, n);
549  nread += n;
550  obj_desc->offset += n;
551  }
552 
553  if (nread < nbytes)
554  {
555  Assert(obj_desc->offset >= pageoff);
556  off = (int) (obj_desc->offset - pageoff);
557  Assert(off >= 0 && off < LOBLKSIZE);
558 
559  getdatafield(data, &datafield, &len, &pfreeit);
560  if (len > off)
561  {
562  n = len - off;
563  n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
564  memcpy(buf + nread, VARDATA(datafield) + off, n);
565  nread += n;
566  obj_desc->offset += n;
567  }
568  if (pfreeit)
569  pfree(datafield);
570  }
571 
572  if (nread >= nbytes)
573  break;
574  }
575 
577 
578  return nread;
579 }
580 
581 int
582 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
583 {
584  int nwritten = 0;
585  int n;
586  int off;
587  int len;
588  int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
589  ScanKeyData skey[2];
590  SysScanDesc sd;
591  HeapTuple oldtuple;
592  Form_pg_largeobject olddata;
593  bool neednextpage;
594  bytea *datafield;
595  bool pfreeit;
596  union
597  {
598  bytea hdr;
599  /* this is to make the union big enough for a LO data chunk: */
600  char data[LOBLKSIZE + VARHDRSZ];
601  /* ensure union is aligned well enough: */
602  int32 align_it;
603  } workbuf;
604  char *workb = VARDATA(&workbuf.hdr);
605  HeapTuple newtup;
606  Datum values[Natts_pg_largeobject];
607  bool nulls[Natts_pg_largeobject];
608  bool replace[Natts_pg_largeobject];
609  CatalogIndexState indstate;
610 
611  Assert(PointerIsValid(obj_desc));
612  Assert(buf != NULL);
613 
614  /* enforce writability because snapshot is probably wrong otherwise */
615  if ((obj_desc->flags & IFS_WRLOCK) == 0)
616  ereport(ERROR,
617  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
618  errmsg("permission denied for large object %u",
619  obj_desc->id)));
620 
621  if (nbytes <= 0)
622  return 0;
623 
624  /* this addition can't overflow because nbytes is only int32 */
625  if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
626  ereport(ERROR,
627  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
628  errmsg("invalid large object write request size: %d",
629  nbytes)));
630 
632 
633  indstate = CatalogOpenIndexes(lo_heap_r);
634 
635  ScanKeyInit(&skey[0],
636  Anum_pg_largeobject_loid,
637  BTEqualStrategyNumber, F_OIDEQ,
638  ObjectIdGetDatum(obj_desc->id));
639 
640  ScanKeyInit(&skey[1],
641  Anum_pg_largeobject_pageno,
643  Int32GetDatum(pageno));
644 
646  obj_desc->snapshot, 2, skey);
647 
648  oldtuple = NULL;
649  olddata = NULL;
650  neednextpage = true;
651 
652  while (nwritten < nbytes)
653  {
654  /*
655  * If possible, get next pre-existing page of the LO. We expect the
656  * indexscan will deliver these in order --- but there may be holes.
657  */
658  if (neednextpage)
659  {
660  if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
661  {
662  if (HeapTupleHasNulls(oldtuple)) /* paranoia */
663  elog(ERROR, "null field found in pg_largeobject");
664  olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
665  Assert(olddata->pageno >= pageno);
666  }
667  neednextpage = false;
668  }
669 
670  /*
671  * If we have a pre-existing page, see if it is the page we want to
672  * write, or a later one.
673  */
674  if (olddata != NULL && olddata->pageno == pageno)
675  {
676  /*
677  * Update an existing page with fresh data.
678  *
679  * First, load old data into workbuf
680  */
681  getdatafield(olddata, &datafield, &len, &pfreeit);
682  memcpy(workb, VARDATA(datafield), len);
683  if (pfreeit)
684  pfree(datafield);
685 
686  /*
687  * Fill any hole
688  */
689  off = (int) (obj_desc->offset % LOBLKSIZE);
690  if (off > len)
691  MemSet(workb + len, 0, off - len);
692 
693  /*
694  * Insert appropriate portion of new data
695  */
696  n = LOBLKSIZE - off;
697  n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
698  memcpy(workb + off, buf + nwritten, n);
699  nwritten += n;
700  obj_desc->offset += n;
701  off += n;
702  /* compute valid length of new page */
703  len = (len >= off) ? len : off;
704  SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
705 
706  /*
707  * Form and insert updated tuple
708  */
709  memset(values, 0, sizeof(values));
710  memset(nulls, false, sizeof(nulls));
711  memset(replace, false, sizeof(replace));
712  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
713  replace[Anum_pg_largeobject_data - 1] = true;
714  newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
715  values, nulls, replace);
716  CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
717  indstate);
718  heap_freetuple(newtup);
719 
720  /*
721  * We're done with this old page.
722  */
723  oldtuple = NULL;
724  olddata = NULL;
725  neednextpage = true;
726  }
727  else
728  {
729  /*
730  * Write a brand new page.
731  *
732  * First, fill any hole
733  */
734  off = (int) (obj_desc->offset % LOBLKSIZE);
735  if (off > 0)
736  MemSet(workb, 0, off);
737 
738  /*
739  * Insert appropriate portion of new data
740  */
741  n = LOBLKSIZE - off;
742  n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
743  memcpy(workb + off, buf + nwritten, n);
744  nwritten += n;
745  obj_desc->offset += n;
746  /* compute valid length of new page */
747  len = off + n;
748  SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
749 
750  /*
751  * Form and insert updated tuple
752  */
753  memset(values, 0, sizeof(values));
754  memset(nulls, false, sizeof(nulls));
755  values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
756  values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
757  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
758  newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
759  CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
760  heap_freetuple(newtup);
761  }
762  pageno++;
763  }
764 
766 
767  CatalogCloseIndexes(indstate);
768 
769  /*
770  * Advance command counter so that my tuple updates will be seen by later
771  * large-object operations in this transaction.
772  */
774 
775  return nwritten;
776 }
777 
778 void
780 {
781  int32 pageno = (int32) (len / LOBLKSIZE);
782  int32 off;
783  ScanKeyData skey[2];
784  SysScanDesc sd;
785  HeapTuple oldtuple;
786  Form_pg_largeobject olddata;
787  union
788  {
789  bytea hdr;
790  /* this is to make the union big enough for a LO data chunk: */
791  char data[LOBLKSIZE + VARHDRSZ];
792  /* ensure union is aligned well enough: */
793  int32 align_it;
794  } workbuf;
795  char *workb = VARDATA(&workbuf.hdr);
796  HeapTuple newtup;
797  Datum values[Natts_pg_largeobject];
798  bool nulls[Natts_pg_largeobject];
799  bool replace[Natts_pg_largeobject];
800  CatalogIndexState indstate;
801 
802  Assert(PointerIsValid(obj_desc));
803 
804  /* enforce writability because snapshot is probably wrong otherwise */
805  if ((obj_desc->flags & IFS_WRLOCK) == 0)
806  ereport(ERROR,
807  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
808  errmsg("permission denied for large object %u",
809  obj_desc->id)));
810 
811  /*
812  * use errmsg_internal here because we don't want to expose INT64_FORMAT
813  * in translatable strings; doing better is not worth the trouble
814  */
815  if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
816  ereport(ERROR,
817  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
818  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
819  len)));
820 
822 
823  indstate = CatalogOpenIndexes(lo_heap_r);
824 
825  /*
826  * Set up to find all pages with desired loid and pageno >= target
827  */
828  ScanKeyInit(&skey[0],
829  Anum_pg_largeobject_loid,
830  BTEqualStrategyNumber, F_OIDEQ,
831  ObjectIdGetDatum(obj_desc->id));
832 
833  ScanKeyInit(&skey[1],
834  Anum_pg_largeobject_pageno,
836  Int32GetDatum(pageno));
837 
839  obj_desc->snapshot, 2, skey);
840 
841  /*
842  * If possible, get the page the truncation point is in. The truncation
843  * point may be beyond the end of the LO or in a hole.
844  */
845  olddata = NULL;
846  if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
847  {
848  if (HeapTupleHasNulls(oldtuple)) /* paranoia */
849  elog(ERROR, "null field found in pg_largeobject");
850  olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
851  Assert(olddata->pageno >= pageno);
852  }
853 
854  /*
855  * If we found the page of the truncation point we need to truncate the
856  * data in it. Otherwise if we're in a hole, we need to create a page to
857  * mark the end of data.
858  */
859  if (olddata != NULL && olddata->pageno == pageno)
860  {
861  /* First, load old data into workbuf */
862  bytea *datafield;
863  int pagelen;
864  bool pfreeit;
865 
866  getdatafield(olddata, &datafield, &pagelen, &pfreeit);
867  memcpy(workb, VARDATA(datafield), pagelen);
868  if (pfreeit)
869  pfree(datafield);
870 
871  /*
872  * Fill any hole
873  */
874  off = len % LOBLKSIZE;
875  if (off > pagelen)
876  MemSet(workb + pagelen, 0, off - pagelen);
877 
878  /* compute length of new page */
879  SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
880 
881  /*
882  * Form and insert updated tuple
883  */
884  memset(values, 0, sizeof(values));
885  memset(nulls, false, sizeof(nulls));
886  memset(replace, false, sizeof(replace));
887  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
888  replace[Anum_pg_largeobject_data - 1] = true;
889  newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
890  values, nulls, replace);
891  CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
892  indstate);
893  heap_freetuple(newtup);
894  }
895  else
896  {
897  /*
898  * If the first page we found was after the truncation point, we're in
899  * a hole that we'll fill, but we need to delete the later page
900  * because the loop below won't visit it again.
901  */
902  if (olddata != NULL)
903  {
904  Assert(olddata->pageno > pageno);
905  CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
906  }
907 
908  /*
909  * Write a brand new page.
910  *
911  * Fill the hole up to the truncation point
912  */
913  off = len % LOBLKSIZE;
914  if (off > 0)
915  MemSet(workb, 0, off);
916 
917  /* compute length of new page */
918  SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
919 
920  /*
921  * Form and insert new tuple
922  */
923  memset(values, 0, sizeof(values));
924  memset(nulls, false, sizeof(nulls));
925  values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
926  values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
927  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
928  newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
929  CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
930  heap_freetuple(newtup);
931  }
932 
933  /*
934  * Delete any pages after the truncation point. If the initial search
935  * didn't find a page, then of course there's nothing more to do.
936  */
937  if (olddata != NULL)
938  {
939  while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
940  {
941  CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
942  }
943  }
944 
946 
947  CatalogCloseIndexes(indstate);
948 
949  /*
950  * Advance command counter so that tuple updates will be seen by later
951  * large-object operations in this transaction.
952  */
954 }
@ ACLCHECK_OK
Definition: acl.h:182
AclResult pg_largeobject_aclcheck_snapshot(Oid lobj_oid, Oid roleid, AclMode mode, Snapshot snapshot)
Definition: aclchk.c:4118
static Datum values[MAXATTR]
Definition: bootstrap.c:156
signed int int32
Definition: c.h:483
#define InvalidSubTransactionId
Definition: c.h:647
#define INT64_FORMAT
Definition: c.h:537
#define VARHDRSZ
Definition: c.h:681
#define PointerIsValid(pointer)
Definition: c.h:752
#define MemSet(start, val, len)
Definition: c.h:1009
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:329
struct varlena * detoast_attr(struct varlena *attr)
Definition: detoast.c:116
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1162
int errcode(int sqlerrcode)
Definition: elog.c:860
int errmsg(const char *fmt,...)
Definition: elog.c:1075
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:599
SysScanDesc systable_beginscan_ordered(Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:646
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:506
void systable_endscan_ordered(SysScanDesc sysscan)
Definition: genam.c:736
HeapTuple systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
Definition: genam.c:711
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:387
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define HeapTupleHasNulls(tuple)
Definition: htup_details.h:659
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
void CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup, CatalogIndexState indstate)
Definition: indexing.c:256
void CatalogCloseIndexes(CatalogIndexState indstate)
Definition: indexing.c:61
CatalogIndexState CatalogOpenIndexes(Relation heapRel)
Definition: indexing.c:43
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void CatalogTupleUpdateWithInfo(Relation heapRel, ItemPointer otid, HeapTuple tup, CatalogIndexState indstate)
Definition: indexing.c:337
static Relation lo_index_r
Definition: inv_api.c:68
void inv_truncate(LargeObjectDesc *obj_desc, int64 len)
Definition: inv_api.c:779
LargeObjectDesc * inv_open(Oid lobjId, int flags, MemoryContext mcxt)
Definition: inv_api.c:254
int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Definition: inv_api.c:489
static void getdatafield(Form_pg_largeobject tuple, bytea **pdatafield, int *plen, bool *pfreeit)
Definition: inv_api.c:170
Oid inv_create(Oid lobjId)
Definition: inv_api.c:212
static Relation lo_heap_r
Definition: inv_api.c:67
static void open_lo_relation(void)
Definition: inv_api.c:75
int64 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
Definition: inv_api.c:427
void close_lo_relation(bool isCommit)
Definition: inv_api.c:99
int inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
Definition: inv_api.c:582
int64 inv_tell(LargeObjectDesc *obj_desc)
Definition: inv_api.c:476
static bool myLargeObjectExists(Oid loid, Snapshot snapshot)
Definition: inv_api.c:132
bool lo_compat_privileges
Definition: inv_api.c:58
void inv_close(LargeObjectDesc *obj_desc)
Definition: inv_api.c:338
static uint64 inv_getsize(LargeObjectDesc *obj_desc)
Definition: inv_api.c:379
int inv_drop(Oid lobjId)
Definition: inv_api.c:350
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
#define IFS_RDLOCK
Definition: large_object.h:48
#define MAX_LARGE_OBJECT_SIZE
Definition: large_object.h:76
#define LOBLKSIZE
Definition: large_object.h:70
#define IFS_WRLOCK
Definition: large_object.h:49
#define INV_READ
Definition: libpq-fs.h:22
#define INV_WRITE
Definition: libpq-fs.h:21
Assert(fmt[strlen(fmt) - 1] !='\n')
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
void pfree(void *pointer)
Definition: mcxt.c:1431
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1034
Oid GetUserId(void)
Definition: miscinit.c:515
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
#define ACL_UPDATE
Definition: parsenodes.h:78
@ DROP_CASCADE
Definition: parsenodes.h:2170
#define ACL_SELECT
Definition: parsenodes.h:77
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
const void size_t len
const void * data
Oid LargeObjectCreate(Oid loid)
FormData_pg_largeobject * Form_pg_largeobject
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:165
static char * buf
Definition: pg_test_fsync.c:73
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
unsigned int Oid
Definition: postgres_ext.h:31
#define RelationGetDescr(relation)
Definition: rel.h:530
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:167
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
@ BackwardScanDirection
Definition: sdir.h:26
@ ForwardScanDirection
Definition: sdir.h:28
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:777
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define BTGreaterEqualStrategyNumber
Definition: stratnum.h:32
ItemPointerData t_self
Definition: htup.h:65
Snapshot snapshot
Definition: large_object.h:42
SubTransactionId subid
Definition: large_object.h:43
TupleDesc rd_att
Definition: rel.h:112
Definition: c.h:676
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
#define VARATT_IS_EXTENDED(PTR)
Definition: varatt.h:303
#define VARDATA(PTR)
Definition: varatt.h:278
#define SET_VARSIZE(PTR, len)
Definition: varatt.h:305
#define VARSIZE(PTR)
Definition: varatt.h:279
void CommandCounterIncrement(void)
Definition: xact.c:1078