PostgreSQL Source Code  git master
inv_api.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * inv_api.c
4  * routines for manipulating inversion fs large objects. This file
5  * contains the user-level large object application interface routines.
6  *
7  *
8  * Note: we access pg_largeobject.data using its C struct declaration.
9  * This is safe because it immediately follows pageno which is an int4 field,
10  * and therefore the data field will always be 4-byte aligned, even if it
11  * is in the short 1-byte-header format. We have to detoast it since it's
12  * quite likely to be in compressed or short format. We also need to check
13  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14  *
15  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16  * does most of the backend code. We expect that CurrentMemoryContext will
17  * be a short-lived context. Data that must persist across function calls
18  * is kept either in CacheMemoryContext (the Relation structs) or in the
19  * memory context given to inv_open (for LargeObjectDesc structs).
20  *
21  *
22  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
23  * Portions Copyright (c) 1994, Regents of the University of California
24  *
25  *
26  * IDENTIFICATION
27  * src/backend/storage/large_object/inv_api.c
28  *
29  *-------------------------------------------------------------------------
30  */
#include "postgres.h"

#include <limits.h>

#include "access/genam.h"
#include "access/sysattr.h"
#include "access/table.h"
#include "access/tuptoaster.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_largeobject.h"
#include "catalog/pg_largeobject_metadata.h"
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/large_object.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
51 
52 
/*
 * GUC: backwards-compatibility flag to suppress LO permission checks.
 *
 * When set, inv_open() skips its SELECT/UPDATE privilege checks.
 */
bool        lo_compat_privileges;
57 
58 /*
59  * All accesses to pg_largeobject and its index make use of a single Relation
60  * reference, so that we only need to open pg_largeobject once per transaction.
61  * To avoid problems when the first such reference occurs inside a
62  * subtransaction, we execute a slightly klugy maneuver to assign ownership of
63  * the Relation reference to TopTransactionResourceOwner.
64  */
65 static Relation lo_heap_r = NULL;
66 static Relation lo_index_r = NULL;
67 
68 
69 /*
70  * Open pg_largeobject and its index, if not already done in current xact
71  */
72 static void
74 {
75  ResourceOwner currentOwner;
76 
77  if (lo_heap_r && lo_index_r)
78  return; /* already open in current xact */
79 
80  /* Arrange for the top xact to own these relation references */
81  currentOwner = CurrentResourceOwner;
83 
84  /* Use RowExclusiveLock since we might either read or write */
85  if (lo_heap_r == NULL)
86  lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
87  if (lo_index_r == NULL)
89 
90  CurrentResourceOwner = currentOwner;
91 }
92 
93 /*
94  * Clean up at main transaction end
95  */
96 void
97 close_lo_relation(bool isCommit)
98 {
99  if (lo_heap_r || lo_index_r)
100  {
101  /*
102  * Only bother to close if committing; else abort cleanup will handle
103  * it
104  */
105  if (isCommit)
106  {
107  ResourceOwner currentOwner;
108 
109  currentOwner = CurrentResourceOwner;
111 
112  if (lo_index_r)
113  index_close(lo_index_r, NoLock);
114  if (lo_heap_r)
115  table_close(lo_heap_r, NoLock);
116 
117  CurrentResourceOwner = currentOwner;
118  }
119  lo_heap_r = NULL;
120  lo_index_r = NULL;
121  }
122 }
123 
124 
125 /*
126  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
127  * read with can be specified.
128  */
129 static bool
131 {
132  Relation pg_lo_meta;
133  ScanKeyData skey[1];
134  SysScanDesc sd;
135  HeapTuple tuple;
136  bool retval = false;
137 
138  ScanKeyInit(&skey[0],
139  Anum_pg_largeobject_metadata_oid,
140  BTEqualStrategyNumber, F_OIDEQ,
141  ObjectIdGetDatum(loid));
142 
143  pg_lo_meta = table_open(LargeObjectMetadataRelationId,
145 
146  sd = systable_beginscan(pg_lo_meta,
148  snapshot, 1, skey);
149 
150  tuple = systable_getnext(sd);
151  if (HeapTupleIsValid(tuple))
152  retval = true;
153 
154  systable_endscan(sd);
155 
156  table_close(pg_lo_meta, AccessShareLock);
157 
158  return retval;
159 }
160 
161 
162 /*
163  * Extract data field from a pg_largeobject tuple, detoasting if needed
164  * and verifying that the length is sane. Returns data pointer (a bytea *),
165  * data length, and an indication of whether to pfree the data pointer.
166  */
167 static void
169  bytea **pdatafield,
170  int *plen,
171  bool *pfreeit)
172 {
173  bytea *datafield;
174  int len;
175  bool freeit;
176 
177  datafield = &(tuple->data); /* see note at top of file */
178  freeit = false;
179  if (VARATT_IS_EXTENDED(datafield))
180  {
181  datafield = (bytea *)
182  heap_tuple_untoast_attr((struct varlena *) datafield);
183  freeit = true;
184  }
185  len = VARSIZE(datafield) - VARHDRSZ;
186  if (len < 0 || len > LOBLKSIZE)
187  ereport(ERROR,
189  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
190  tuple->loid, tuple->pageno, len)));
191  *pdatafield = datafield;
192  *plen = len;
193  *pfreeit = freeit;
194 }
195 
196 
197 /*
198  * inv_create -- create a new large object
199  *
200  * Arguments:
201  * lobjId - OID to use for new large object, or InvalidOid to pick one
202  *
203  * Returns:
204  * OID of new object
205  *
206  * If lobjId is not InvalidOid, then an error occurs if the OID is already
207  * in use.
208  */
209 Oid
211 {
212  Oid lobjId_new;
213 
214  /*
215  * Create a new largeobject with empty data pages
216  */
217  lobjId_new = LargeObjectCreate(lobjId);
218 
219  /*
220  * dependency on the owner of largeobject
221  *
222  * The reason why we use LargeObjectRelationId instead of
223  * LargeObjectMetadataRelationId here is to provide backward compatibility
224  * to the applications which utilize a knowledge about internal layout of
225  * system catalogs. OID of pg_largeobject_metadata and loid of
226  * pg_largeobject are same value, so there are no actual differences here.
227  */
228  recordDependencyOnOwner(LargeObjectRelationId,
229  lobjId_new, GetUserId());
230 
231  /* Post creation hook for new large object */
232  InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
233 
234  /*
235  * Advance command counter to make new tuple visible to later operations.
236  */
238 
239  return lobjId_new;
240 }
241 
242 /*
243  * inv_open -- access an existing large object.
244  *
245  * Returns:
246  * Large object descriptor, appropriately filled in. The descriptor
247  * and subsidiary data are allocated in the specified memory context,
248  * which must be suitably long-lived for the caller's purposes.
249  */
251 inv_open(Oid lobjId, int flags, MemoryContext mcxt)
252 {
253  LargeObjectDesc *retval;
254  Snapshot snapshot = NULL;
255  int descflags = 0;
256 
257  /*
258  * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
259  * | INV_READ), the caller being allowed to read the large object
260  * descriptor in either case.
261  */
262  if (flags & INV_WRITE)
263  descflags |= IFS_WRLOCK | IFS_RDLOCK;
264  if (flags & INV_READ)
265  descflags |= IFS_RDLOCK;
266 
267  if (descflags == 0)
268  ereport(ERROR,
269  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
270  errmsg("invalid flags for opening a large object: %d",
271  flags)));
272 
273  /* Get snapshot. If write is requested, use an instantaneous snapshot. */
274  if (descflags & IFS_WRLOCK)
275  snapshot = NULL;
276  else
277  snapshot = GetActiveSnapshot();
278 
279  /* Can't use LargeObjectExists here because we need to specify snapshot */
280  if (!myLargeObjectExists(lobjId, snapshot))
281  ereport(ERROR,
282  (errcode(ERRCODE_UNDEFINED_OBJECT),
283  errmsg("large object %u does not exist", lobjId)));
284 
285  /* Apply permission checks, again specifying snapshot */
286  if ((descflags & IFS_RDLOCK) != 0)
287  {
288  if (!lo_compat_privileges &&
290  GetUserId(),
291  ACL_SELECT,
292  snapshot) != ACLCHECK_OK)
293  ereport(ERROR,
294  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
295  errmsg("permission denied for large object %u",
296  lobjId)));
297  }
298  if ((descflags & IFS_WRLOCK) != 0)
299  {
300  if (!lo_compat_privileges &&
302  GetUserId(),
303  ACL_UPDATE,
304  snapshot) != ACLCHECK_OK)
305  ereport(ERROR,
306  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
307  errmsg("permission denied for large object %u",
308  lobjId)));
309  }
310 
311  /* OK to create a descriptor */
312  retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
313  sizeof(LargeObjectDesc));
314  retval->id = lobjId;
315  retval->subid = GetCurrentSubTransactionId();
316  retval->offset = 0;
317  retval->flags = descflags;
318 
319  /*
320  * We must register the snapshot in TopTransaction's resowner, because it
321  * must stay alive until the LO is closed rather than until the current
322  * portal shuts down. Do this last to avoid uselessly leaking the
323  * snapshot if an error is thrown above.
324  */
325  if (snapshot)
326  snapshot = RegisterSnapshotOnOwner(snapshot,
328  retval->snapshot = snapshot;
329 
330  return retval;
331 }
332 
333 /*
334  * Closes a large object descriptor previously made by inv_open(), and
335  * releases the long-term memory used by it.
336  */
337 void
339 {
340  Assert(PointerIsValid(obj_desc));
341 
344 
345  pfree(obj_desc);
346 }
347 
348 /*
349  * Destroys an existing large object (not to be confused with a descriptor!)
350  *
351  * Note we expect caller to have done any required permissions check.
352  */
353 int
354 inv_drop(Oid lobjId)
355 {
356  ObjectAddress object;
357 
358  /*
359  * Delete any comments and dependencies on the large object
360  */
361  object.classId = LargeObjectRelationId;
362  object.objectId = lobjId;
363  object.objectSubId = 0;
364  performDeletion(&object, DROP_CASCADE, 0);
365 
366  /*
367  * Advance command counter so that tuple removal will be seen by later
368  * large-object operations in this transaction.
369  */
371 
372  /* For historical reasons, we always return 1 on success. */
373  return 1;
374 }
375 
376 /*
377  * Determine size of a large object
378  *
379  * NOTE: LOs can contain gaps, just like Unix files. We actually return
380  * the offset of the last byte + 1.
381  */
382 static uint64
384 {
385  uint64 lastbyte = 0;
386  ScanKeyData skey[1];
387  SysScanDesc sd;
388  HeapTuple tuple;
389 
390  Assert(PointerIsValid(obj_desc));
391 
393 
394  ScanKeyInit(&skey[0],
395  Anum_pg_largeobject_loid,
396  BTEqualStrategyNumber, F_OIDEQ,
397  ObjectIdGetDatum(obj_desc->id));
398 
399  sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
400  obj_desc->snapshot, 1, skey);
401 
402  /*
403  * Because the pg_largeobject index is on both loid and pageno, but we
404  * constrain only loid, a backwards scan should visit all pages of the
405  * large object in reverse pageno order. So, it's sufficient to examine
406  * the first valid tuple (== last valid page).
407  */
409  if (HeapTupleIsValid(tuple))
410  {
411  Form_pg_largeobject data;
412  bytea *datafield;
413  int len;
414  bool pfreeit;
415 
416  if (HeapTupleHasNulls(tuple)) /* paranoia */
417  elog(ERROR, "null field found in pg_largeobject");
418  data = (Form_pg_largeobject) GETSTRUCT(tuple);
419  getdatafield(data, &datafield, &len, &pfreeit);
420  lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
421  if (pfreeit)
422  pfree(datafield);
423  }
424 
426 
427  return lastbyte;
428 }
429 
430 int64
431 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
432 {
433  int64 newoffset;
434 
435  Assert(PointerIsValid(obj_desc));
436 
437  /*
438  * We allow seek/tell if you have either read or write permission, so no
439  * need for a permission check here.
440  */
441 
442  /*
443  * Note: overflow in the additions is possible, but since we will reject
444  * negative results, we don't need any extra test for that.
445  */
446  switch (whence)
447  {
448  case SEEK_SET:
449  newoffset = offset;
450  break;
451  case SEEK_CUR:
452  newoffset = obj_desc->offset + offset;
453  break;
454  case SEEK_END:
455  newoffset = inv_getsize(obj_desc) + offset;
456  break;
457  default:
458  ereport(ERROR,
459  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
460  errmsg("invalid whence setting: %d", whence)));
461  newoffset = 0; /* keep compiler quiet */
462  break;
463  }
464 
465  /*
466  * use errmsg_internal here because we don't want to expose INT64_FORMAT
467  * in translatable strings; doing better is not worth the trouble
468  */
469  if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
470  ereport(ERROR,
471  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
472  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
473  newoffset)));
474 
475  obj_desc->offset = newoffset;
476  return newoffset;
477 }
478 
479 int64
481 {
482  Assert(PointerIsValid(obj_desc));
483 
484  /*
485  * We allow seek/tell if you have either read or write permission, so no
486  * need for a permission check here.
487  */
488 
489  return obj_desc->offset;
490 }
491 
492 int
493 inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
494 {
495  int nread = 0;
496  int64 n;
497  int64 off;
498  int len;
499  int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
500  uint64 pageoff;
501  ScanKeyData skey[2];
502  SysScanDesc sd;
503  HeapTuple tuple;
504 
505  Assert(PointerIsValid(obj_desc));
506  Assert(buf != NULL);
507 
508  if ((obj_desc->flags & IFS_RDLOCK) == 0)
509  ereport(ERROR,
510  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
511  errmsg("permission denied for large object %u",
512  obj_desc->id)));
513 
514  if (nbytes <= 0)
515  return 0;
516 
518 
519  ScanKeyInit(&skey[0],
520  Anum_pg_largeobject_loid,
521  BTEqualStrategyNumber, F_OIDEQ,
522  ObjectIdGetDatum(obj_desc->id));
523 
524  ScanKeyInit(&skey[1],
525  Anum_pg_largeobject_pageno,
527  Int32GetDatum(pageno));
528 
529  sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
530  obj_desc->snapshot, 2, skey);
531 
532  while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
533  {
534  Form_pg_largeobject data;
535  bytea *datafield;
536  bool pfreeit;
537 
538  if (HeapTupleHasNulls(tuple)) /* paranoia */
539  elog(ERROR, "null field found in pg_largeobject");
540  data = (Form_pg_largeobject) GETSTRUCT(tuple);
541 
542  /*
543  * We expect the indexscan will deliver pages in order. However,
544  * there may be missing pages if the LO contains unwritten "holes". We
545  * want missing sections to read out as zeroes.
546  */
547  pageoff = ((uint64) data->pageno) * LOBLKSIZE;
548  if (pageoff > obj_desc->offset)
549  {
550  n = pageoff - obj_desc->offset;
551  n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
552  MemSet(buf + nread, 0, n);
553  nread += n;
554  obj_desc->offset += n;
555  }
556 
557  if (nread < nbytes)
558  {
559  Assert(obj_desc->offset >= pageoff);
560  off = (int) (obj_desc->offset - pageoff);
561  Assert(off >= 0 && off < LOBLKSIZE);
562 
563  getdatafield(data, &datafield, &len, &pfreeit);
564  if (len > off)
565  {
566  n = len - off;
567  n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
568  memcpy(buf + nread, VARDATA(datafield) + off, n);
569  nread += n;
570  obj_desc->offset += n;
571  }
572  if (pfreeit)
573  pfree(datafield);
574  }
575 
576  if (nread >= nbytes)
577  break;
578  }
579 
581 
582  return nread;
583 }
584 
585 int
586 inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
587 {
588  int nwritten = 0;
589  int n;
590  int off;
591  int len;
592  int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
593  ScanKeyData skey[2];
594  SysScanDesc sd;
595  HeapTuple oldtuple;
596  Form_pg_largeobject olddata;
597  bool neednextpage;
598  bytea *datafield;
599  bool pfreeit;
600  union
601  {
602  bytea hdr;
603  /* this is to make the union big enough for a LO data chunk: */
604  char data[LOBLKSIZE + VARHDRSZ];
605  /* ensure union is aligned well enough: */
606  int32 align_it;
607  } workbuf;
608  char *workb = VARDATA(&workbuf.hdr);
609  HeapTuple newtup;
610  Datum values[Natts_pg_largeobject];
611  bool nulls[Natts_pg_largeobject];
612  bool replace[Natts_pg_largeobject];
613  CatalogIndexState indstate;
614 
615  Assert(PointerIsValid(obj_desc));
616  Assert(buf != NULL);
617 
618  /* enforce writability because snapshot is probably wrong otherwise */
619  if ((obj_desc->flags & IFS_WRLOCK) == 0)
620  ereport(ERROR,
621  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
622  errmsg("permission denied for large object %u",
623  obj_desc->id)));
624 
625  if (nbytes <= 0)
626  return 0;
627 
628  /* this addition can't overflow because nbytes is only int32 */
629  if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
630  ereport(ERROR,
631  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
632  errmsg("invalid large object write request size: %d",
633  nbytes)));
634 
636 
637  indstate = CatalogOpenIndexes(lo_heap_r);
638 
639  ScanKeyInit(&skey[0],
640  Anum_pg_largeobject_loid,
641  BTEqualStrategyNumber, F_OIDEQ,
642  ObjectIdGetDatum(obj_desc->id));
643 
644  ScanKeyInit(&skey[1],
645  Anum_pg_largeobject_pageno,
647  Int32GetDatum(pageno));
648 
649  sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
650  obj_desc->snapshot, 2, skey);
651 
652  oldtuple = NULL;
653  olddata = NULL;
654  neednextpage = true;
655 
656  while (nwritten < nbytes)
657  {
658  /*
659  * If possible, get next pre-existing page of the LO. We expect the
660  * indexscan will deliver these in order --- but there may be holes.
661  */
662  if (neednextpage)
663  {
664  if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
665  {
666  if (HeapTupleHasNulls(oldtuple)) /* paranoia */
667  elog(ERROR, "null field found in pg_largeobject");
668  olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
669  Assert(olddata->pageno >= pageno);
670  }
671  neednextpage = false;
672  }
673 
674  /*
675  * If we have a pre-existing page, see if it is the page we want to
676  * write, or a later one.
677  */
678  if (olddata != NULL && olddata->pageno == pageno)
679  {
680  /*
681  * Update an existing page with fresh data.
682  *
683  * First, load old data into workbuf
684  */
685  getdatafield(olddata, &datafield, &len, &pfreeit);
686  memcpy(workb, VARDATA(datafield), len);
687  if (pfreeit)
688  pfree(datafield);
689 
690  /*
691  * Fill any hole
692  */
693  off = (int) (obj_desc->offset % LOBLKSIZE);
694  if (off > len)
695  MemSet(workb + len, 0, off - len);
696 
697  /*
698  * Insert appropriate portion of new data
699  */
700  n = LOBLKSIZE - off;
701  n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
702  memcpy(workb + off, buf + nwritten, n);
703  nwritten += n;
704  obj_desc->offset += n;
705  off += n;
706  /* compute valid length of new page */
707  len = (len >= off) ? len : off;
708  SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
709 
710  /*
711  * Form and insert updated tuple
712  */
713  memset(values, 0, sizeof(values));
714  memset(nulls, false, sizeof(nulls));
715  memset(replace, false, sizeof(replace));
716  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
717  replace[Anum_pg_largeobject_data - 1] = true;
718  newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
719  values, nulls, replace);
720  CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
721  indstate);
722  heap_freetuple(newtup);
723 
724  /*
725  * We're done with this old page.
726  */
727  oldtuple = NULL;
728  olddata = NULL;
729  neednextpage = true;
730  }
731  else
732  {
733  /*
734  * Write a brand new page.
735  *
736  * First, fill any hole
737  */
738  off = (int) (obj_desc->offset % LOBLKSIZE);
739  if (off > 0)
740  MemSet(workb, 0, off);
741 
742  /*
743  * Insert appropriate portion of new data
744  */
745  n = LOBLKSIZE - off;
746  n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
747  memcpy(workb + off, buf + nwritten, n);
748  nwritten += n;
749  obj_desc->offset += n;
750  /* compute valid length of new page */
751  len = off + n;
752  SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
753 
754  /*
755  * Form and insert updated tuple
756  */
757  memset(values, 0, sizeof(values));
758  memset(nulls, false, sizeof(nulls));
759  values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
760  values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
761  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
762  newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
763  CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
764  heap_freetuple(newtup);
765  }
766  pageno++;
767  }
768 
770 
771  CatalogCloseIndexes(indstate);
772 
773  /*
774  * Advance command counter so that my tuple updates will be seen by later
775  * large-object operations in this transaction.
776  */
778 
779  return nwritten;
780 }
781 
782 void
783 inv_truncate(LargeObjectDesc *obj_desc, int64 len)
784 {
785  int32 pageno = (int32) (len / LOBLKSIZE);
786  int32 off;
787  ScanKeyData skey[2];
788  SysScanDesc sd;
789  HeapTuple oldtuple;
790  Form_pg_largeobject olddata;
791  union
792  {
793  bytea hdr;
794  /* this is to make the union big enough for a LO data chunk: */
795  char data[LOBLKSIZE + VARHDRSZ];
796  /* ensure union is aligned well enough: */
797  int32 align_it;
798  } workbuf;
799  char *workb = VARDATA(&workbuf.hdr);
800  HeapTuple newtup;
801  Datum values[Natts_pg_largeobject];
802  bool nulls[Natts_pg_largeobject];
803  bool replace[Natts_pg_largeobject];
804  CatalogIndexState indstate;
805 
806  Assert(PointerIsValid(obj_desc));
807 
808  /* enforce writability because snapshot is probably wrong otherwise */
809  if ((obj_desc->flags & IFS_WRLOCK) == 0)
810  ereport(ERROR,
811  (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
812  errmsg("permission denied for large object %u",
813  obj_desc->id)));
814 
815  /*
816  * use errmsg_internal here because we don't want to expose INT64_FORMAT
817  * in translatable strings; doing better is not worth the trouble
818  */
819  if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
820  ereport(ERROR,
821  (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
822  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
823  len)));
824 
826 
827  indstate = CatalogOpenIndexes(lo_heap_r);
828 
829  /*
830  * Set up to find all pages with desired loid and pageno >= target
831  */
832  ScanKeyInit(&skey[0],
833  Anum_pg_largeobject_loid,
834  BTEqualStrategyNumber, F_OIDEQ,
835  ObjectIdGetDatum(obj_desc->id));
836 
837  ScanKeyInit(&skey[1],
838  Anum_pg_largeobject_pageno,
840  Int32GetDatum(pageno));
841 
842  sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
843  obj_desc->snapshot, 2, skey);
844 
845  /*
846  * If possible, get the page the truncation point is in. The truncation
847  * point may be beyond the end of the LO or in a hole.
848  */
849  olddata = NULL;
850  if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
851  {
852  if (HeapTupleHasNulls(oldtuple)) /* paranoia */
853  elog(ERROR, "null field found in pg_largeobject");
854  olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
855  Assert(olddata->pageno >= pageno);
856  }
857 
858  /*
859  * If we found the page of the truncation point we need to truncate the
860  * data in it. Otherwise if we're in a hole, we need to create a page to
861  * mark the end of data.
862  */
863  if (olddata != NULL && olddata->pageno == pageno)
864  {
865  /* First, load old data into workbuf */
866  bytea *datafield;
867  int pagelen;
868  bool pfreeit;
869 
870  getdatafield(olddata, &datafield, &pagelen, &pfreeit);
871  memcpy(workb, VARDATA(datafield), pagelen);
872  if (pfreeit)
873  pfree(datafield);
874 
875  /*
876  * Fill any hole
877  */
878  off = len % LOBLKSIZE;
879  if (off > pagelen)
880  MemSet(workb + pagelen, 0, off - pagelen);
881 
882  /* compute length of new page */
883  SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
884 
885  /*
886  * Form and insert updated tuple
887  */
888  memset(values, 0, sizeof(values));
889  memset(nulls, false, sizeof(nulls));
890  memset(replace, false, sizeof(replace));
891  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
892  replace[Anum_pg_largeobject_data - 1] = true;
893  newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
894  values, nulls, replace);
895  CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
896  indstate);
897  heap_freetuple(newtup);
898  }
899  else
900  {
901  /*
902  * If the first page we found was after the truncation point, we're in
903  * a hole that we'll fill, but we need to delete the later page
904  * because the loop below won't visit it again.
905  */
906  if (olddata != NULL)
907  {
908  Assert(olddata->pageno > pageno);
909  CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
910  }
911 
912  /*
913  * Write a brand new page.
914  *
915  * Fill the hole up to the truncation point
916  */
917  off = len % LOBLKSIZE;
918  if (off > 0)
919  MemSet(workb, 0, off);
920 
921  /* compute length of new page */
922  SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
923 
924  /*
925  * Form and insert new tuple
926  */
927  memset(values, 0, sizeof(values));
928  memset(nulls, false, sizeof(nulls));
929  values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
930  values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
931  values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
932  newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
933  CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
934  heap_freetuple(newtup);
935  }
936 
937  /*
938  * Delete any pages after the truncation point. If the initial search
939  * didn't find a page, then of course there's nothing more to do.
940  */
941  if (olddata != NULL)
942  {
943  while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
944  {
945  CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
946  }
947  }
948 
950 
951  CatalogCloseIndexes(indstate);
952 
953  /*
954  * Advance command counter so that tuple updates will be seen by later
955  * large-object operations in this transaction.
956  */
958 }
static bool myLargeObjectExists(Oid loid, Snapshot snapshot)
Definition: inv_api.c:130
#define MAX_LARGE_OBJECT_SIZE
Definition: large_object.h:76
static void open_lo_relation(void)
Definition: inv_api.c:73
#define LOBLKSIZE
Definition: large_object.h:70
#define IFS_RDLOCK
Definition: large_object.h:48
int64 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
Definition: inv_api.c:431
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:133
#define VARDATA(PTR)
Definition: postgres.h:302
void inv_truncate(LargeObjectDesc *obj_desc, int64 len)
Definition: inv_api.c:783
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:525
#define GETSTRUCT(TUP)
Definition: htup_details.h:655
bool lo_compat_privileges
Definition: inv_api.c:56
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:145
Oid inv_create(Oid lobjId)
Definition: inv_api.c:210
#define RelationGetDescr(relation)
Definition: rel.h:442
Oid GetUserId(void)
Definition: miscinit.c:380
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:144
#define VARSIZE(PTR)
Definition: postgres.h:303
#define PointerGetDatum(X)
Definition: postgres.h:556
#define VARHDRSZ
Definition: c.h:555
ResourceOwner CurrentResourceOwner
Definition: resowner.c:142
void inv_close(LargeObjectDesc *obj_desc)
Definition: inv_api.c:338
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:841
#define AccessShareLock
Definition: lockdefs.h:36
int errcode(int sqlerrcode)
Definition: elog.c:570
#define MemSet(start, val, len)
Definition: c.h:955
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:269
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:1020
struct varlena * heap_tuple_untoast_attr(struct varlena *attr)
Definition: tuptoaster.c:172
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:165
Oid LargeObjectCreate(Oid loid)
HeapTuple systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
Definition: genam.c:630
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1338
unsigned int Oid
Definition: postgres_ext.h:31
SubTransactionId subid
Definition: large_object.h:43
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:352
static Relation lo_heap_r
Definition: inv_api.c:65
#define IFS_WRLOCK
Definition: large_object.h:49
signed int int32
Definition: c.h:346
int64 inv_tell(LargeObjectDesc *obj_desc)
Definition: inv_api.c:480
static Relation lo_index_r
Definition: inv_api.c:66
#define INV_READ
Definition: libpq-fs.h:22
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:444
void pfree(void *pointer)
Definition: mcxt.c:1031
#define ObjectIdGetDatum(X)
Definition: postgres.h:507
#define ERROR
Definition: elog.h:43
Snapshot snapshot
Definition: large_object.h:42
ItemPointerData t_self
Definition: htup.h:65
int inv_drop(Oid lobjId)
Definition: inv_api.c:354
#define HeapTupleHasNulls(tuple)
Definition: htup_details.h:661
#define LargeObjectLOidPNIndexId
Definition: indexing.h:184
#define NoLock
Definition: lockdefs.h:34
static char * buf
Definition: pg_test_fsync.c:68
#define RowExclusiveLock
Definition: lockdefs.h:38
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:315
#define ereport(elevel, rest)
Definition: elog.h:141
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:44
#define LargeObjectMetadataOidIndexId
Definition: indexing.h:187
void CatalogTupleUpdateWithInfo(Relation heapRel, ItemPointer otid, HeapTuple tup, CatalogIndexState indstate)
Definition: indexing.c:245
#define ACL_UPDATE
Definition: parsenodes.h:76
static uint64 inv_getsize(LargeObjectDesc *obj_desc)
Definition: inv_api.c:383
void close_lo_relation(bool isCommit)
Definition: inv_api.c:97
uintptr_t Datum
Definition: postgres.h:367
void CommandCounterIncrement(void)
Definition: xact.c:1003
#define ACL_SELECT
Definition: parsenodes.h:75
void CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup, CatalogIndexState indstate)
Definition: indexing.c:204
void systable_endscan_ordered(SysScanDesc sysscan)
Definition: genam.c:649
TupleDesc rd_att
Definition: rel.h:84
Snapshot RegisterSnapshotOnOwner(Snapshot snapshot, ResourceOwner owner)
Definition: snapmgr.c:878
int errmsg_internal(const char *fmt,...)
Definition: elog.c:814
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define Assert(condition)
Definition: c.h:732
static void getdatafield(Form_pg_largeobject tuple, bytea **pdatafield, int *plen, bool *pfreeit)
Definition: inv_api.c:168
SubTransactionId GetCurrentSubTransactionId(void)
Definition: xact.c:708
CatalogIndexState CatalogOpenIndexes(Relation heapRel)
Definition: indexing.c:42
void UnregisterSnapshotFromOwner(Snapshot snapshot, ResourceOwner owner)
Definition: snapmgr.c:920
#define INT64_FORMAT
Definition: c.h:400
#define INV_WRITE
Definition: libpq-fs.h:21
#define VARATT_IS_EXTENDED(PTR)
Definition: postgres.h:327
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:152
static Datum values[MAXATTR]
Definition: bootstrap.c:167
#define Int32GetDatum(X)
Definition: postgres.h:479
LargeObjectDesc * inv_open(Oid lobjId, int flags, MemoryContext mcxt)
Definition: inv_api.c:251
int errmsg(const char *fmt,...)
Definition: elog.c:784
SysScanDesc systable_beginscan_ordered(Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:565
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:771
#define elog(elevel,...)
Definition: elog.h:226
AclResult pg_largeobject_aclcheck_snapshot(Oid lobj_oid, Oid roleid, AclMode mode, Snapshot snapshot)
Definition: aclchk.c:4679
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
void CatalogCloseIndexes(CatalogIndexState indstate)
Definition: indexing.c:60
Definition: c.h:549
#define SET_VARSIZE(PTR, len)
Definition: postgres.h:329
FormData_pg_largeobject * Form_pg_largeobject
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:39
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *replValues, bool *replIsnull, bool *doReplace)
Definition: heaptuple.c:1113
int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Definition: inv_api.c:493
#define PointerIsValid(pointer)
Definition: c.h:626
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:126
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define BTGreaterEqualStrategyNumber
Definition: stratnum.h:32
int inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
Definition: inv_api.c:586