PostgreSQL Source Code  git master
inv_api.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * inv_api.c
4  * routines for manipulating inversion fs large objects. This file
5  * contains the user-level large object application interface routines.
6  *
7  *
8  * Note: we access pg_largeobject.data using its C struct declaration.
9  * This is safe because it immediately follows pageno which is an int4 field,
10  * and therefore the data field will always be 4-byte aligned, even if it
11  * is in the short 1-byte-header format. We have to detoast it since it's
12  * quite likely to be in compressed or short format. We also need to check
13  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14  *
15  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16  * does most of the backend code. We expect that CurrentMemoryContext will
17  * be a short-lived context. Data that must persist across function calls
18  * is kept either in CacheMemoryContext (the Relation structs) or in the
19  * memory context given to inv_open (for LargeObjectDesc structs).
20  *
21  *
22  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
23  * Portions Copyright (c) 1994, Regents of the University of California
24  *
25  *
26  * IDENTIFICATION
27  * src/backend/storage/large_object/inv_api.c
28  *
29  *-------------------------------------------------------------------------
30  */
#include "postgres.h"

#include <limits.h>

#include "access/detoast.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_largeobject.h"
#include "catalog/pg_largeobject_metadata.h"
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/large_object.h"
#include "utils/acl.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
52 
53 
54 /*
55  * GUC: backwards-compatibility flag to suppress LO permission checks
56  */
58 
59 /*
60  * All accesses to pg_largeobject and its index make use of a single
61  * Relation reference. To guarantee that the relcache entry remains
62  * in the cache, on the first reference inside a subtransaction, we
63  * execute a slightly klugy maneuver to assign ownership of the
64  * Relation reference to TopTransactionResourceOwner.
65  */
66 static Relation lo_heap_r = NULL;
67 static Relation lo_index_r = NULL;
68 
69 
70 /*
71  * Open pg_largeobject and its index, if not already done in current xact
72  */
73 static void
75 {
76  ResourceOwner currentOwner;
77 
78  if (lo_heap_r && lo_index_r)
79  return; /* already open in current xact */
80 
81  /* Arrange for the top xact to own these relation references */
82  currentOwner = CurrentResourceOwner;
84 
85  /* Use RowExclusiveLock since we might either read or write */
86  if (lo_heap_r == NULL)
87  lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
88  if (lo_index_r == NULL)
89  lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
90 
91  CurrentResourceOwner = currentOwner;
92 }
93 
94 /*
95  * Clean up at main transaction end
96  */
97 void
98 close_lo_relation(bool isCommit)
99 {
100  if (lo_heap_r || lo_index_r)
101  {
102  /*
103  * Only bother to close if committing; else abort cleanup will handle
104  * it
105  */
106  if (isCommit)
107  {
108  ResourceOwner currentOwner;
109 
110  currentOwner = CurrentResourceOwner;
112 
113  if (lo_index_r)
115  if (lo_heap_r)
117 
118  CurrentResourceOwner = currentOwner;
119  }
120  lo_heap_r = NULL;
121  lo_index_r = NULL;
122  }
123 }
124 
125 
126 /*
127  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
128  * read with can be specified.
129  */
130 static bool
132 {
133  Relation pg_lo_meta;
134  ScanKeyData skey[1];
135  SysScanDesc sd;
136  HeapTuple tuple;
137  bool retval = false;
138 
139  ScanKeyInit(&skey[0],
140  Anum_pg_largeobject_metadata_oid,
141  BTEqualStrategyNumber, F_OIDEQ,
142  ObjectIdGetDatum(loid));
143 
144  pg_lo_meta = table_open(LargeObjectMetadataRelationId,
146 
147  sd = systable_beginscan(pg_lo_meta,
148  LargeObjectMetadataOidIndexId, true,
149  snapshot, 1, skey);
150 
151  tuple = systable_getnext(sd);
152  if (HeapTupleIsValid(tuple))
153  retval = true;
154 
155  systable_endscan(sd);
156 
157  table_close(pg_lo_meta, AccessShareLock);
158 
159  return retval;
160 }
161 
162 
163 /*
164  * Extract data field from a pg_largeobject tuple, detoasting if needed
165  * and verifying that the length is sane. Returns data pointer (a bytea *),
166  * data length, and an indication of whether to pfree the data pointer.
167  */
168 static void
170  bytea **pdatafield,
171  int *plen,
172  bool *pfreeit)
173 {
174  bytea *datafield;
175  int len;
176  bool freeit;
177 
178  datafield = &(tuple->data); /* see note at top of file */
179  freeit = false;
180  if (VARATT_IS_EXTENDED(datafield))
181  {
182  datafield = (bytea *)
183  detoast_attr((struct varlena *) datafield);
184  freeit = true;
185  }
186  len = VARSIZE(datafield) - VARHDRSZ;
187  if (len < 0 || len > LOBLKSIZE)
188  ereport(ERROR,
190  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
191  tuple->loid, tuple->pageno, len)));
192  *pdatafield = datafield;
193  *plen = len;
194  *pfreeit = freeit;
195 }
196 
197 
198 /*
199  * inv_create -- create a new large object
200  *
201  * Arguments:
202  * lobjId - OID to use for new large object, or InvalidOid to pick one
203  *
204  * Returns:
205  * OID of new object
206  *
207  * If lobjId is not InvalidOid, then an error occurs if the OID is already
208  * in use.
209  */
210 Oid
212 {
213  Oid lobjId_new;
214 
215  /*
216  * Create a new largeobject with empty data pages
217  */
218  lobjId_new = LargeObjectCreate(lobjId);
219 
220  /*
221  * dependency on the owner of largeobject
222  *
223  * Note that LO dependencies are recorded using classId
224  * LargeObjectRelationId for backwards-compatibility reasons. Using
225  * LargeObjectMetadataRelationId instead would simplify matters for the
226  * backend, but it'd complicate pg_dump and possibly break other clients.
227  */
228  recordDependencyOnOwner(LargeObjectRelationId,
229  lobjId_new, GetUserId());
230 
231  /* Post creation hook for new large object */
232  InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
233 
234  /*
235  * Advance command counter to make new tuple visible to later operations.
236  */
238 
239  return lobjId_new;
240 }
241 
242 /*
243  * inv_open -- access an existing large object.
244  *
245  * Returns a large object descriptor, appropriately filled in.
246  * The descriptor and subsidiary data are allocated in the specified
247  * memory context, which must be suitably long-lived for the caller's
248  * purposes. If the returned descriptor has a snapshot associated
249  * with it, the caller must ensure that it also lives long enough,
250  * e.g. by calling RegisterSnapshotOnOwner
251  */
253 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
254 {
255  LargeObjectDesc *retval;
256  Snapshot snapshot = NULL;
257  int descflags = 0;
258 
259  /*
260  * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
261  * | INV_READ), the caller being allowed to read the large object
262  * descriptor in either case.
263  */
264  if (flags & INV_WRITE)
265  descflags |= IFS_WRLOCK | IFS_RDLOCK;
266  if (flags & INV_READ)
267  descflags |= IFS_RDLOCK;
268 
269  if (descflags == 0)
270  ereport(ERROR,
271  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
272  errmsg("invalid flags for opening a large object: %d",
273  flags)));
274 
275  /* Get snapshot. If write is requested, use an instantaneous snapshot. */
276  if (descflags & IFS_WRLOCK)
277  snapshot = NULL;
278  else
279  snapshot = GetActiveSnapshot();
280 
281  /* Can't use LargeObjectExists here because we need to specify snapshot */
282  if (!myLargeObjectExists(lobjId, snapshot))
283  ereport(ERROR,
284  (errcode(ERRCODE_UNDEFINED_OBJECT),
285  errmsg("large object %u does not exist", lobjId)));
286 
287  /* Apply permission checks, again specifying snapshot */
288  if ((descflags & IFS_RDLOCK) != 0)
289  {
290  if (!lo_compat_privileges &&
292  GetUserId(),
293  ACL_SELECT,
294  snapshot) != ACLCHECK_OK)
295  ereport(ERROR,
296  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
297  errmsg("permission denied for large object %u",
298  lobjId)));
299  }
300  if ((descflags & IFS_WRLOCK) != 0)
301  {
302  if (!lo_compat_privileges &&
304  GetUserId(),
305  ACL_UPDATE,
306  snapshot) != ACLCHECK_OK)
307  ereport(ERROR,
308  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
309  errmsg("permission denied for large object %u",
310  lobjId)));
311  }
312 
313  /* OK to create a descriptor */
314  retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
315  sizeof(LargeObjectDesc));
316  retval->id = lobjId;
317  retval->offset = 0;
318  retval->flags = descflags;
319 
320  /* caller sets if needed, not used by the functions in this file */
321  retval->subid = InvalidSubTransactionId;
322 
323  /*
324  * The snapshot (if any) is just the currently active snapshot. The
325  * caller will replace it with a longer-lived copy if needed.
326  */
327  retval->snapshot = snapshot;
328 
329  return retval;
330 }
331 
332 /*
333  * Closes a large object descriptor previously made by inv_open(), and
334  * releases the long-term memory used by it.
335  */
336 void
338 {
339  Assert(PointerIsValid(obj_desc));
340  pfree(obj_desc);
341 }
342 
343 /*
344  * Destroys an existing large object (not to be confused with a descriptor!)
345  *
346  * Note we expect caller to have done any required permissions check.
347  */
348 int
349 inv_drop(Oid lobjId)
350 {
351  ObjectAddress object;
352 
353  /*
354  * Delete any comments and dependencies on the large object
355  */
356  object.classId = LargeObjectRelationId;
357  object.objectId = lobjId;
358  object.objectSubId = 0;
359  performDeletion(&object, DROP_CASCADE, 0);
360 
361  /*
362  * Advance command counter so that tuple removal will be seen by later
363  * large-object operations in this transaction.
364  */
366 
367  /* For historical reasons, we always return 1 on success. */
368  return 1;
369 }
370 
371 /*
372  * Determine size of a large object
373  *
374  * NOTE: LOs can contain gaps, just like Unix files. We actually return
375  * the offset of the last byte + 1.
376  */
377 static uint64
379 {
380  uint64 lastbyte = 0;
381  ScanKeyData skey[1];
382  SysScanDesc sd;
383  HeapTuple tuple;
384 
385  Assert(PointerIsValid(obj_desc));
386 
388 
389  ScanKeyInit(&skey[0],
390  Anum_pg_largeobject_loid,
391  BTEqualStrategyNumber, F_OIDEQ,
392  ObjectIdGetDatum(obj_desc->id));
393 
395  obj_desc->snapshot, 1, skey);
396 
397  /*
398  * Because the pg_largeobject index is on both loid and pageno, but we
399  * constrain only loid, a backwards scan should visit all pages of the
400  * large object in reverse pageno order. So, it's sufficient to examine
401  * the first valid tuple (== last valid page).
402  */
404  if (HeapTupleIsValid(tuple))
405  {
407  bytea *datafield;
408  int len;
409  bool pfreeit;
410 
411  if (HeapTupleHasNulls(tuple)) /* paranoia */
412  elog(ERROR, "null field found in pg_largeobject");
414  getdatafield(data, &datafield, &len, &pfreeit);
415  lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
416  if (pfreeit)
417  pfree(datafield);
418  }
419 
421 
422  return lastbyte;
423 }
424 
425 int64
426 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
427 {
428  int64 newoffset;
429 
430  Assert(PointerIsValid(obj_desc));
431 
432  /*
433  * We allow seek/tell if you have either read or write permission, so no
434  * need for a permission check here.
435  */
436 
437  /*
438  * Note: overflow in the additions is possible, but since we will reject
439  * negative results, we don't need any extra test for that.
440  */
441  switch (whence)
442  {
443  case SEEK_SET:
444  newoffset = offset;
445  break;
446  case SEEK_CUR:
447  newoffset = obj_desc->offset + offset;
448  break;
449  case SEEK_END:
450  newoffset = inv_getsize(obj_desc) + offset;
451  break;
452  default:
453  ereport(ERROR,
454  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
455  errmsg("invalid whence setting: %d", whence)));
456  newoffset = 0; /* keep compiler quiet */
457  break;
458  }
459 
460  /*
461  * use errmsg_internal here because we don't want to expose INT64_FORMAT
462  * in translatable strings; doing better is not worth the trouble
463  */
464  if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
465  ereport(ERROR,
466  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
467  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
468  newoffset)));
469 
470  obj_desc->offset = newoffset;
471  return newoffset;
472 }
473 
474 int64
476 {
477  Assert(PointerIsValid(obj_desc));
478 
479  /*
480  * We allow seek/tell if you have either read or write permission, so no
481  * need for a permission check here.
482  */
483 
484  return obj_desc->offset;
485 }
486 
487 int
488 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
489 {
490  int nread = 0;
491  int64 n;
492  int64 off;
493  int len;
494  int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
495  uint64 pageoff;
496  ScanKeyData skey[2];
497  SysScanDesc sd;
498  HeapTuple tuple;
499 
500  Assert(PointerIsValid(obj_desc));
501  Assert(buf != NULL);
502 
503  if ((obj_desc->flags & IFS_RDLOCK) == 0)
504  ereport(ERROR,
505  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
506  errmsg("permission denied for large object %u",
507  obj_desc->id)));
508 
509  if (nbytes <= 0)
510  return 0;
511 
513 
514  ScanKeyInit(&skey[0],
515  Anum_pg_largeobject_loid,
516  BTEqualStrategyNumber, F_OIDEQ,
517  ObjectIdGetDatum(obj_desc->id));
518 
519  ScanKeyInit(&skey[1],
520  Anum_pg_largeobject_pageno,
522  Int32GetDatum(pageno));
523 
525  obj_desc->snapshot, 2, skey);
526 
527  while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
528  {
530  bytea *datafield;
531  bool pfreeit;
532 
533  if (HeapTupleHasNulls(tuple)) /* paranoia */
534  elog(ERROR, "null field found in pg_largeobject");
536 
537  /*
538  * We expect the indexscan will deliver pages in order. However,
539  * there may be missing pages if the LO contains unwritten "holes". We
540  * want missing sections to read out as zeroes.
541  */
542  pageoff = ((uint64) data->pageno) * LOBLKSIZE;
543  if (pageoff > obj_desc->offset)
544  {
545  n = pageoff - obj_desc->offset;
546  n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
547  MemSet(buf + nread, 0, n);
548  nread += n;
549  obj_desc->offset += n;
550  }
551 
552  if (nread < nbytes)
553  {
554  Assert(obj_desc->offset >= pageoff);
555  off = (int) (obj_desc->offset - pageoff);
556  Assert(off >= 0 && off < LOBLKSIZE);
557 
558  getdatafield(data, &datafield, &len, &pfreeit);
559  if (len > off)
560  {
561  n = len - off;
562  n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
563  memcpy(buf + nread, VARDATA(datafield) + off, n);
564  nread += n;
565  obj_desc->offset += n;
566  }
567  if (pfreeit)
568  pfree(datafield);
569  }
570 
571  if (nread >= nbytes)
572  break;
573  }
574 
576 
577  return nread;
578 }
579 
580 int
581 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
582 {
583  int nwritten = 0;
584  int n;
585  int off;
586  int len;
587  int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
588  ScanKeyData skey[2];
589  SysScanDesc sd;
590  HeapTuple oldtuple;
591  Form_pg_largeobject olddata;
592  bool neednextpage;
593  bytea *datafield;
594  bool pfreeit;
595  union
596  {
597  bytea hdr;
598  /* this is to make the union big enough for a LO data chunk: */
599  char data[LOBLKSIZE + VARHDRSZ];
600  /* ensure union is aligned well enough: */
601  int32 align_it;
602  } workbuf;
603  char *workb = VARDATA(&workbuf.hdr);
604  HeapTuple newtup;
605  Datum values[Natts_pg_largeobject];
606  bool nulls[Natts_pg_largeobject];
607  bool replace[Natts_pg_largeobject];
608  CatalogIndexState indstate;
609 
610  Assert(PointerIsValid(obj_desc));
611  Assert(buf != NULL);
612 
613  /* enforce writability because snapshot is probably wrong otherwise */
614  if ((obj_desc->flags & IFS_WRLOCK) == 0)
615  ereport(ERROR,
616  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
617  errmsg("permission denied for large object %u",
618  obj_desc->id)));
619 
620  if (nbytes <= 0)
621  return 0;
622 
623  /* this addition can't overflow because nbytes is only int32 */
624  if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
625  ereport(ERROR,
626  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
627  errmsg("invalid large object write request size: %d",
628  nbytes)));
629 
631 
632  indstate = CatalogOpenIndexes(lo_heap_r);
633 
634  ScanKeyInit(&skey[0],
635  Anum_pg_largeobject_loid,
636  BTEqualStrategyNumber, F_OIDEQ,
637  ObjectIdGetDatum(obj_desc->id));
638 
639  ScanKeyInit(&skey[1],
640  Anum_pg_largeobject_pageno,
642  Int32GetDatum(pageno));
643 
645  obj_desc->snapshot, 2, skey);
646 
647  oldtuple = NULL;
648  olddata = NULL;
649  neednextpage = true;
650 
651  while (nwritten < nbytes)
652  {
653  /*
654  * If possible, get next pre-existing page of the LO. We expect the
655  * indexscan will deliver these in order --- but there may be holes.
656  */
657  if (neednextpage)
658  {
659  if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
660  {
661  if (HeapTupleHasNulls(oldtuple)) /* paranoia */
662  elog(ERROR, "null field found in pg_largeobject");
663  olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
664  Assert(olddata->pageno >= pageno);
665  }
666  neednextpage = false;
667  }
668 
669  /*
670  * If we have a pre-existing page, see if it is the page we want to
671  * write, or a later one.
672  */
673  if (olddata != NULL && olddata->pageno == pageno)
674  {
675  /*
676  * Update an existing page with fresh data.
677  *
678  * First, load old data into workbuf
679  */
680  getdatafield(olddata, &datafield, &len, &pfreeit);
681  memcpy(workb, VARDATA(datafield), len);
682  if (pfreeit)
683  pfree(datafield);
684 
685  /*
686  * Fill any hole
687  */
688  off = (int) (obj_desc->offset % LOBLKSIZE);
689  if (off > len)
690  MemSet(workb + len, 0, off - len);
691 
692  /*
693  * Insert appropriate portion of new data
694  */
695  n = LOBLKSIZE - off;
696  n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
697  memcpy(workb + off, buf + nwritten, n);
698  nwritten += n;
699  obj_desc->offset += n;
700  off += n;
701  /* compute valid length of new page */
702  len = (len >= off) ? len : off;
703  SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
704 
705  /*
706  * Form and insert updated tuple
707  */
708  memset(values, 0, sizeof(values));
709  memset(nulls, false, sizeof(nulls));
710  memset(replace, false, sizeof(replace));
711  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
712  replace[Anum_pg_largeobject_data - 1] = true;
713  newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
714  values, nulls, replace);
715  CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
716  indstate);
717  heap_freetuple(newtup);
718 
719  /*
720  * We're done with this old page.
721  */
722  oldtuple = NULL;
723  olddata = NULL;
724  neednextpage = true;
725  }
726  else
727  {
728  /*
729  * Write a brand new page.
730  *
731  * First, fill any hole
732  */
733  off = (int) (obj_desc->offset % LOBLKSIZE);
734  if (off > 0)
735  MemSet(workb, 0, off);
736 
737  /*
738  * Insert appropriate portion of new data
739  */
740  n = LOBLKSIZE - off;
741  n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
742  memcpy(workb + off, buf + nwritten, n);
743  nwritten += n;
744  obj_desc->offset += n;
745  /* compute valid length of new page */
746  len = off + n;
747  SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
748 
749  /*
750  * Form and insert updated tuple
751  */
752  memset(values, 0, sizeof(values));
753  memset(nulls, false, sizeof(nulls));
754  values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
755  values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
756  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
757  newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
758  CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
759  heap_freetuple(newtup);
760  }
761  pageno++;
762  }
763 
765 
766  CatalogCloseIndexes(indstate);
767 
768  /*
769  * Advance command counter so that my tuple updates will be seen by later
770  * large-object operations in this transaction.
771  */
773 
774  return nwritten;
775 }
776 
777 void
779 {
780  int32 pageno = (int32) (len / LOBLKSIZE);
781  int32 off;
782  ScanKeyData skey[2];
783  SysScanDesc sd;
784  HeapTuple oldtuple;
785  Form_pg_largeobject olddata;
786  union
787  {
788  bytea hdr;
789  /* this is to make the union big enough for a LO data chunk: */
790  char data[LOBLKSIZE + VARHDRSZ];
791  /* ensure union is aligned well enough: */
792  int32 align_it;
793  } workbuf;
794  char *workb = VARDATA(&workbuf.hdr);
795  HeapTuple newtup;
796  Datum values[Natts_pg_largeobject];
797  bool nulls[Natts_pg_largeobject];
798  bool replace[Natts_pg_largeobject];
799  CatalogIndexState indstate;
800 
801  Assert(PointerIsValid(obj_desc));
802 
803  /* enforce writability because snapshot is probably wrong otherwise */
804  if ((obj_desc->flags & IFS_WRLOCK) == 0)
805  ereport(ERROR,
806  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
807  errmsg("permission denied for large object %u",
808  obj_desc->id)));
809 
810  /*
811  * use errmsg_internal here because we don't want to expose INT64_FORMAT
812  * in translatable strings; doing better is not worth the trouble
813  */
814  if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
815  ereport(ERROR,
816  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
817  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
818  len)));
819 
821 
822  indstate = CatalogOpenIndexes(lo_heap_r);
823 
824  /*
825  * Set up to find all pages with desired loid and pageno >= target
826  */
827  ScanKeyInit(&skey[0],
828  Anum_pg_largeobject_loid,
829  BTEqualStrategyNumber, F_OIDEQ,
830  ObjectIdGetDatum(obj_desc->id));
831 
832  ScanKeyInit(&skey[1],
833  Anum_pg_largeobject_pageno,
835  Int32GetDatum(pageno));
836 
838  obj_desc->snapshot, 2, skey);
839 
840  /*
841  * If possible, get the page the truncation point is in. The truncation
842  * point may be beyond the end of the LO or in a hole.
843  */
844  olddata = NULL;
845  if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
846  {
847  if (HeapTupleHasNulls(oldtuple)) /* paranoia */
848  elog(ERROR, "null field found in pg_largeobject");
849  olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
850  Assert(olddata->pageno >= pageno);
851  }
852 
853  /*
854  * If we found the page of the truncation point we need to truncate the
855  * data in it. Otherwise if we're in a hole, we need to create a page to
856  * mark the end of data.
857  */
858  if (olddata != NULL && olddata->pageno == pageno)
859  {
860  /* First, load old data into workbuf */
861  bytea *datafield;
862  int pagelen;
863  bool pfreeit;
864 
865  getdatafield(olddata, &datafield, &pagelen, &pfreeit);
866  memcpy(workb, VARDATA(datafield), pagelen);
867  if (pfreeit)
868  pfree(datafield);
869 
870  /*
871  * Fill any hole
872  */
873  off = len % LOBLKSIZE;
874  if (off > pagelen)
875  MemSet(workb + pagelen, 0, off - pagelen);
876 
877  /* compute length of new page */
878  SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
879 
880  /*
881  * Form and insert updated tuple
882  */
883  memset(values, 0, sizeof(values));
884  memset(nulls, false, sizeof(nulls));
885  memset(replace, false, sizeof(replace));
886  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
887  replace[Anum_pg_largeobject_data - 1] = true;
888  newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
889  values, nulls, replace);
890  CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
891  indstate);
892  heap_freetuple(newtup);
893  }
894  else
895  {
896  /*
897  * If the first page we found was after the truncation point, we're in
898  * a hole that we'll fill, but we need to delete the later page
899  * because the loop below won't visit it again.
900  */
901  if (olddata != NULL)
902  {
903  Assert(olddata->pageno > pageno);
904  CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
905  }
906 
907  /*
908  * Write a brand new page.
909  *
910  * Fill the hole up to the truncation point
911  */
912  off = len % LOBLKSIZE;
913  if (off > 0)
914  MemSet(workb, 0, off);
915 
916  /* compute length of new page */
917  SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
918 
919  /*
920  * Form and insert new tuple
921  */
922  memset(values, 0, sizeof(values));
923  memset(nulls, false, sizeof(nulls));
924  values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
925  values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
926  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
927  newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
928  CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
929  heap_freetuple(newtup);
930  }
931 
932  /*
933  * Delete any pages after the truncation point. If the initial search
934  * didn't find a page, then of course there's nothing more to do.
935  */
936  if (olddata != NULL)
937  {
938  while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
939  {
940  CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
941  }
942  }
943 
945 
946  CatalogCloseIndexes(indstate);
947 
948  /*
949  * Advance command counter so that tuple updates will be seen by later
950  * large-object operations in this transaction.
951  */
953 }
@ ACLCHECK_OK
Definition: acl.h:183
AclResult pg_largeobject_aclcheck_snapshot(Oid lobj_oid, Oid roleid, AclMode mode, Snapshot snapshot)
Definition: aclchk.c:4130
static Datum values[MAXATTR]
Definition: bootstrap.c:152
signed int int32
Definition: c.h:494
#define InvalidSubTransactionId
Definition: c.h:658
#define INT64_FORMAT
Definition: c.h:548
#define VARHDRSZ
Definition: c.h:692
#define Assert(condition)
Definition: c.h:858
#define PointerIsValid(pointer)
Definition: c.h:763
#define MemSet(start, val, len)
Definition: c.h:1020
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:273
struct varlena * detoast_attr(struct varlena *attr)
Definition: detoast.c:116
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
int errcode(int sqlerrcode)
Definition: elog.c:857
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:224
#define ereport(elevel,...)
Definition: elog.h:149
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:596
SysScanDesc systable_beginscan_ordered(Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:643
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:503
void systable_endscan_ordered(SysScanDesc sysscan)
Definition: genam.c:735
HeapTuple systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
Definition: genam.c:710
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:384
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1209
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1116
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1434
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define HeapTupleHasNulls(tuple)
Definition: htup_details.h:659
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
void CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup, CatalogIndexState indstate)
Definition: indexing.c:256
void CatalogCloseIndexes(CatalogIndexState indstate)
Definition: indexing.c:61
CatalogIndexState CatalogOpenIndexes(Relation heapRel)
Definition: indexing.c:43
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void CatalogTupleUpdateWithInfo(Relation heapRel, ItemPointer otid, HeapTuple tup, CatalogIndexState indstate)
Definition: indexing.c:337
static Relation lo_index_r
Definition: inv_api.c:67
void inv_truncate(LargeObjectDesc *obj_desc, int64 len)
Definition: inv_api.c:778
LargeObjectDesc * inv_open(Oid lobjId, int flags, MemoryContext mcxt)
Definition: inv_api.c:253
int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Definition: inv_api.c:488
static void getdatafield(Form_pg_largeobject tuple, bytea **pdatafield, int *plen, bool *pfreeit)
Definition: inv_api.c:169
Oid inv_create(Oid lobjId)
Definition: inv_api.c:211
static Relation lo_heap_r
Definition: inv_api.c:66
static void open_lo_relation(void)
Definition: inv_api.c:74
int64 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
Definition: inv_api.c:426
void close_lo_relation(bool isCommit)
Definition: inv_api.c:98
int inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
Definition: inv_api.c:581
int64 inv_tell(LargeObjectDesc *obj_desc)
Definition: inv_api.c:475
static bool myLargeObjectExists(Oid loid, Snapshot snapshot)
Definition: inv_api.c:131
bool lo_compat_privileges
Definition: inv_api.c:57
void inv_close(LargeObjectDesc *obj_desc)
Definition: inv_api.c:337
static uint64 inv_getsize(LargeObjectDesc *obj_desc)
Definition: inv_api.c:378
int inv_drop(Oid lobjId)
Definition: inv_api.c:349
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
#define IFS_RDLOCK
Definition: large_object.h:48
#define MAX_LARGE_OBJECT_SIZE
Definition: large_object.h:76
#define LOBLKSIZE
Definition: large_object.h:70
#define IFS_WRLOCK
Definition: large_object.h:49
#define INV_READ
Definition: libpq-fs.h:22
#define INV_WRITE
Definition: libpq-fs.h:21
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
void pfree(void *pointer)
Definition: mcxt.c:1520
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1180
Oid GetUserId(void)
Definition: miscinit.c:514
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
#define ACL_UPDATE
Definition: parsenodes.h:78
@ DROP_CASCADE
Definition: parsenodes.h:2335
#define ACL_SELECT
Definition: parsenodes.h:77
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
const void size_t len
const void * data
Oid LargeObjectCreate(Oid loid)
FormData_pg_largeobject * Form_pg_largeobject
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:165
static char * buf
Definition: pg_test_fsync.c:73
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
unsigned int Oid
Definition: postgres_ext.h:31
#define RelationGetDescr(relation)
Definition: rel.h:531
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:167
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
@ BackwardScanDirection
Definition: sdir.h:26
@ ForwardScanDirection
Definition: sdir.h:28
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:770
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define BTGreaterEqualStrategyNumber
Definition: stratnum.h:32
ItemPointerData t_self
Definition: htup.h:65
Snapshot snapshot
Definition: large_object.h:42
SubTransactionId subid
Definition: large_object.h:43
TupleDesc rd_att
Definition: rel.h:112
Definition: c.h:687
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
#define VARATT_IS_EXTENDED(PTR)
Definition: varatt.h:303
#define VARDATA(PTR)
Definition: varatt.h:278
#define SET_VARSIZE(PTR, len)
Definition: varatt.h:305
#define VARSIZE(PTR)
Definition: varatt.h:279
void CommandCounterIncrement(void)
Definition: xact.c:1097