PostgreSQL Source Code git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
inv_api.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * inv_api.c
 *    routines for manipulating inversion fs large objects. This file
 *    contains the user-level large object application interface routines.
 *
 *
 * Note: we access pg_largeobject.data using its C struct declaration.
 * This is safe because it immediately follows pageno which is an int4 field,
 * and therefore the data field will always be 4-byte aligned, even if it
 * is in the short 1-byte-header format.  We have to detoast it since it's
 * quite likely to be in compressed or short format.  We also need to check
 * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
 *
 * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
 * does most of the backend code.  We expect that CurrentMemoryContext will
 * be a short-lived context.  Data that must persist across function calls
 * is kept either in CacheMemoryContext (the Relation structs) or in the
 * memory context given to inv_open (for LargeObjectDesc structs).
 *
 *
 * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/storage/large_object/inv_api.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <limits.h>

#include "access/detoast.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_largeobject.h"
#include "libpq/libpq-fs.h"
#include "miscadmin.h"
#include "storage/large_object.h"
#include "utils/acl.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
51
52
53/*
54 * GUC: backwards-compatibility flag to suppress LO permission checks
55 */
57
58/*
59 * All accesses to pg_largeobject and its index make use of a single
60 * Relation reference. To guarantee that the relcache entry remains
61 * in the cache, on the first reference inside a subtransaction, we
62 * execute a slightly klugy maneuver to assign ownership of the
63 * Relation reference to TopTransactionResourceOwner.
64 */
65static Relation lo_heap_r = NULL;
66static Relation lo_index_r = NULL;
67
68
69/*
70 * Open pg_largeobject and its index, if not already done in current xact
71 */
72static void
74{
75 ResourceOwner currentOwner;
76
77 if (lo_heap_r && lo_index_r)
78 return; /* already open in current xact */
79
80 /* Arrange for the top xact to own these relation references */
81 currentOwner = CurrentResourceOwner;
83
84 /* Use RowExclusiveLock since we might either read or write */
85 if (lo_heap_r == NULL)
86 lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
87 if (lo_index_r == NULL)
88 lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
89
90 CurrentResourceOwner = currentOwner;
91}
92
93/*
94 * Clean up at main transaction end
95 */
96void
97close_lo_relation(bool isCommit)
98{
99 if (lo_heap_r || lo_index_r)
100 {
101 /*
102 * Only bother to close if committing; else abort cleanup will handle
103 * it
104 */
105 if (isCommit)
106 {
107 ResourceOwner currentOwner;
108
109 currentOwner = CurrentResourceOwner;
111
112 if (lo_index_r)
114 if (lo_heap_r)
116
117 CurrentResourceOwner = currentOwner;
118 }
119 lo_heap_r = NULL;
120 lo_index_r = NULL;
121 }
122}
123
124
125/*
126 * Extract data field from a pg_largeobject tuple, detoasting if needed
127 * and verifying that the length is sane. Returns data pointer (a bytea *),
128 * data length, and an indication of whether to pfree the data pointer.
129 */
130static void
132 bytea **pdatafield,
133 int *plen,
134 bool *pfreeit)
135{
136 bytea *datafield;
137 int len;
138 bool freeit;
139
140 datafield = &(tuple->data); /* see note at top of file */
141 freeit = false;
142 if (VARATT_IS_EXTENDED(datafield))
143 {
144 datafield = (bytea *)
145 detoast_attr((struct varlena *) datafield);
146 freeit = true;
147 }
148 len = VARSIZE(datafield) - VARHDRSZ;
149 if (len < 0 || len > LOBLKSIZE)
152 errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
153 tuple->loid, tuple->pageno, len)));
154 *pdatafield = datafield;
155 *plen = len;
156 *pfreeit = freeit;
157}
158
159
160/*
161 * inv_create -- create a new large object
162 *
163 * Arguments:
164 * lobjId - OID to use for new large object, or InvalidOid to pick one
165 *
166 * Returns:
167 * OID of new object
168 *
169 * If lobjId is not InvalidOid, then an error occurs if the OID is already
170 * in use.
171 */
172Oid
174{
175 Oid lobjId_new;
176
177 /*
178 * Create a new largeobject with empty data pages
179 */
180 lobjId_new = LargeObjectCreate(lobjId);
181
182 /*
183 * dependency on the owner of largeobject
184 *
185 * Note that LO dependencies are recorded using classId
186 * LargeObjectRelationId for backwards-compatibility reasons. Using
187 * LargeObjectMetadataRelationId instead would simplify matters for the
188 * backend, but it'd complicate pg_dump and possibly break other clients.
189 */
190 recordDependencyOnOwner(LargeObjectRelationId,
191 lobjId_new, GetUserId());
192
193 /* Post creation hook for new large object */
194 InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
195
196 /*
197 * Advance command counter to make new tuple visible to later operations.
198 */
200
201 return lobjId_new;
202}
203
204/*
205 * inv_open -- access an existing large object.
206 *
207 * Returns a large object descriptor, appropriately filled in.
208 * The descriptor and subsidiary data are allocated in the specified
209 * memory context, which must be suitably long-lived for the caller's
210 * purposes. If the returned descriptor has a snapshot associated
211 * with it, the caller must ensure that it also lives long enough,
212 * e.g. by calling RegisterSnapshotOnOwner
213 */
215inv_open(Oid lobjId, int flags, MemoryContext mcxt)
216{
217 LargeObjectDesc *retval;
218 Snapshot snapshot = NULL;
219 int descflags = 0;
220
221 /*
222 * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
223 * | INV_READ), the caller being allowed to read the large object
224 * descriptor in either case.
225 */
226 if (flags & INV_WRITE)
227 descflags |= IFS_WRLOCK | IFS_RDLOCK;
228 if (flags & INV_READ)
229 descflags |= IFS_RDLOCK;
230
231 if (descflags == 0)
233 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
234 errmsg("invalid flags for opening a large object: %d",
235 flags)));
236
237 /* Get snapshot. If write is requested, use an instantaneous snapshot. */
238 if (descflags & IFS_WRLOCK)
239 snapshot = NULL;
240 else
241 snapshot = GetActiveSnapshot();
242
243 /* Can't use LargeObjectExists here because we need to specify snapshot */
244 if (!LargeObjectExistsWithSnapshot(lobjId, snapshot))
246 (errcode(ERRCODE_UNDEFINED_OBJECT),
247 errmsg("large object %u does not exist", lobjId)));
248
249 /* Apply permission checks, again specifying snapshot */
250 if ((descflags & IFS_RDLOCK) != 0)
251 {
254 GetUserId(),
256 snapshot) != ACLCHECK_OK)
258 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
259 errmsg("permission denied for large object %u",
260 lobjId)));
261 }
262 if ((descflags & IFS_WRLOCK) != 0)
263 {
266 GetUserId(),
268 snapshot) != ACLCHECK_OK)
270 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
271 errmsg("permission denied for large object %u",
272 lobjId)));
273 }
274
275 /* OK to create a descriptor */
276 retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
277 sizeof(LargeObjectDesc));
278 retval->id = lobjId;
279 retval->offset = 0;
280 retval->flags = descflags;
281
282 /* caller sets if needed, not used by the functions in this file */
284
285 /*
286 * The snapshot (if any) is just the currently active snapshot. The
287 * caller will replace it with a longer-lived copy if needed.
288 */
289 retval->snapshot = snapshot;
290
291 return retval;
292}
293
294/*
295 * Closes a large object descriptor previously made by inv_open(), and
296 * releases the long-term memory used by it.
297 */
298void
300{
301 Assert(PointerIsValid(obj_desc));
302 pfree(obj_desc);
303}
304
305/*
306 * Destroys an existing large object (not to be confused with a descriptor!)
307 *
308 * Note we expect caller to have done any required permissions check.
309 */
310int
312{
313 ObjectAddress object;
314
315 /*
316 * Delete any comments and dependencies on the large object
317 */
318 object.classId = LargeObjectRelationId;
319 object.objectId = lobjId;
320 object.objectSubId = 0;
321 performDeletion(&object, DROP_CASCADE, 0);
322
323 /*
324 * Advance command counter so that tuple removal will be seen by later
325 * large-object operations in this transaction.
326 */
328
329 /* For historical reasons, we always return 1 on success. */
330 return 1;
331}
332
333/*
334 * Determine size of a large object
335 *
336 * NOTE: LOs can contain gaps, just like Unix files. We actually return
337 * the offset of the last byte + 1.
338 */
339static uint64
341{
342 uint64 lastbyte = 0;
343 ScanKeyData skey[1];
344 SysScanDesc sd;
345 HeapTuple tuple;
346
347 Assert(PointerIsValid(obj_desc));
348
350
351 ScanKeyInit(&skey[0],
352 Anum_pg_largeobject_loid,
353 BTEqualStrategyNumber, F_OIDEQ,
354 ObjectIdGetDatum(obj_desc->id));
355
357 obj_desc->snapshot, 1, skey);
358
359 /*
360 * Because the pg_largeobject index is on both loid and pageno, but we
361 * constrain only loid, a backwards scan should visit all pages of the
362 * large object in reverse pageno order. So, it's sufficient to examine
363 * the first valid tuple (== last valid page).
364 */
366 if (HeapTupleIsValid(tuple))
367 {
369 bytea *datafield;
370 int len;
371 bool pfreeit;
372
373 if (HeapTupleHasNulls(tuple)) /* paranoia */
374 elog(ERROR, "null field found in pg_largeobject");
376 getdatafield(data, &datafield, &len, &pfreeit);
377 lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
378 if (pfreeit)
379 pfree(datafield);
380 }
381
383
384 return lastbyte;
385}
386
387int64
388inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
389{
390 int64 newoffset;
391
392 Assert(PointerIsValid(obj_desc));
393
394 /*
395 * We allow seek/tell if you have either read or write permission, so no
396 * need for a permission check here.
397 */
398
399 /*
400 * Note: overflow in the additions is possible, but since we will reject
401 * negative results, we don't need any extra test for that.
402 */
403 switch (whence)
404 {
405 case SEEK_SET:
406 newoffset = offset;
407 break;
408 case SEEK_CUR:
409 newoffset = obj_desc->offset + offset;
410 break;
411 case SEEK_END:
412 newoffset = inv_getsize(obj_desc) + offset;
413 break;
414 default:
416 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
417 errmsg("invalid whence setting: %d", whence)));
418 newoffset = 0; /* keep compiler quiet */
419 break;
420 }
421
422 /*
423 * use errmsg_internal here because we don't want to expose INT64_FORMAT
424 * in translatable strings; doing better is not worth the trouble
425 */
426 if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
428 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
429 errmsg_internal("invalid large object seek target: " INT64_FORMAT,
430 newoffset)));
431
432 obj_desc->offset = newoffset;
433 return newoffset;
434}
435
436int64
438{
439 Assert(PointerIsValid(obj_desc));
440
441 /*
442 * We allow seek/tell if you have either read or write permission, so no
443 * need for a permission check here.
444 */
445
446 return obj_desc->offset;
447}
448
449int
450inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
451{
452 int nread = 0;
453 int64 n;
454 int64 off;
455 int len;
456 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
457 uint64 pageoff;
458 ScanKeyData skey[2];
459 SysScanDesc sd;
460 HeapTuple tuple;
461
462 Assert(PointerIsValid(obj_desc));
463 Assert(buf != NULL);
464
465 if ((obj_desc->flags & IFS_RDLOCK) == 0)
467 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
468 errmsg("permission denied for large object %u",
469 obj_desc->id)));
470
471 if (nbytes <= 0)
472 return 0;
473
475
476 ScanKeyInit(&skey[0],
477 Anum_pg_largeobject_loid,
478 BTEqualStrategyNumber, F_OIDEQ,
479 ObjectIdGetDatum(obj_desc->id));
480
481 ScanKeyInit(&skey[1],
482 Anum_pg_largeobject_pageno,
484 Int32GetDatum(pageno));
485
487 obj_desc->snapshot, 2, skey);
488
489 while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
490 {
492 bytea *datafield;
493 bool pfreeit;
494
495 if (HeapTupleHasNulls(tuple)) /* paranoia */
496 elog(ERROR, "null field found in pg_largeobject");
498
499 /*
500 * We expect the indexscan will deliver pages in order. However,
501 * there may be missing pages if the LO contains unwritten "holes". We
502 * want missing sections to read out as zeroes.
503 */
504 pageoff = ((uint64) data->pageno) * LOBLKSIZE;
505 if (pageoff > obj_desc->offset)
506 {
507 n = pageoff - obj_desc->offset;
508 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
509 MemSet(buf + nread, 0, n);
510 nread += n;
511 obj_desc->offset += n;
512 }
513
514 if (nread < nbytes)
515 {
516 Assert(obj_desc->offset >= pageoff);
517 off = (int) (obj_desc->offset - pageoff);
518 Assert(off >= 0 && off < LOBLKSIZE);
519
520 getdatafield(data, &datafield, &len, &pfreeit);
521 if (len > off)
522 {
523 n = len - off;
524 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
525 memcpy(buf + nread, VARDATA(datafield) + off, n);
526 nread += n;
527 obj_desc->offset += n;
528 }
529 if (pfreeit)
530 pfree(datafield);
531 }
532
533 if (nread >= nbytes)
534 break;
535 }
536
538
539 return nread;
540}
541
542int
543inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
544{
545 int nwritten = 0;
546 int n;
547 int off;
548 int len;
549 int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
550 ScanKeyData skey[2];
551 SysScanDesc sd;
552 HeapTuple oldtuple;
553 Form_pg_largeobject olddata;
554 bool neednextpage;
555 bytea *datafield;
556 bool pfreeit;
557 union
558 {
559 bytea hdr;
560 /* this is to make the union big enough for a LO data chunk: */
561 char data[LOBLKSIZE + VARHDRSZ];
562 /* ensure union is aligned well enough: */
563 int32 align_it;
564 } workbuf;
565 char *workb = VARDATA(&workbuf.hdr);
566 HeapTuple newtup;
567 Datum values[Natts_pg_largeobject];
568 bool nulls[Natts_pg_largeobject];
569 bool replace[Natts_pg_largeobject];
570 CatalogIndexState indstate;
571
572 Assert(PointerIsValid(obj_desc));
573 Assert(buf != NULL);
574
575 /* enforce writability because snapshot is probably wrong otherwise */
576 if ((obj_desc->flags & IFS_WRLOCK) == 0)
578 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
579 errmsg("permission denied for large object %u",
580 obj_desc->id)));
581
582 if (nbytes <= 0)
583 return 0;
584
585 /* this addition can't overflow because nbytes is only int32 */
586 if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
588 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
589 errmsg("invalid large object write request size: %d",
590 nbytes)));
591
593
594 indstate = CatalogOpenIndexes(lo_heap_r);
595
596 ScanKeyInit(&skey[0],
597 Anum_pg_largeobject_loid,
598 BTEqualStrategyNumber, F_OIDEQ,
599 ObjectIdGetDatum(obj_desc->id));
600
601 ScanKeyInit(&skey[1],
602 Anum_pg_largeobject_pageno,
604 Int32GetDatum(pageno));
605
607 obj_desc->snapshot, 2, skey);
608
609 oldtuple = NULL;
610 olddata = NULL;
611 neednextpage = true;
612
613 while (nwritten < nbytes)
614 {
615 /*
616 * If possible, get next pre-existing page of the LO. We expect the
617 * indexscan will deliver these in order --- but there may be holes.
618 */
619 if (neednextpage)
620 {
621 if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
622 {
623 if (HeapTupleHasNulls(oldtuple)) /* paranoia */
624 elog(ERROR, "null field found in pg_largeobject");
625 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
626 Assert(olddata->pageno >= pageno);
627 }
628 neednextpage = false;
629 }
630
631 /*
632 * If we have a pre-existing page, see if it is the page we want to
633 * write, or a later one.
634 */
635 if (olddata != NULL && olddata->pageno == pageno)
636 {
637 /*
638 * Update an existing page with fresh data.
639 *
640 * First, load old data into workbuf
641 */
642 getdatafield(olddata, &datafield, &len, &pfreeit);
643 memcpy(workb, VARDATA(datafield), len);
644 if (pfreeit)
645 pfree(datafield);
646
647 /*
648 * Fill any hole
649 */
650 off = (int) (obj_desc->offset % LOBLKSIZE);
651 if (off > len)
652 MemSet(workb + len, 0, off - len);
653
654 /*
655 * Insert appropriate portion of new data
656 */
657 n = LOBLKSIZE - off;
658 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
659 memcpy(workb + off, buf + nwritten, n);
660 nwritten += n;
661 obj_desc->offset += n;
662 off += n;
663 /* compute valid length of new page */
664 len = (len >= off) ? len : off;
665 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
666
667 /*
668 * Form and insert updated tuple
669 */
670 memset(values, 0, sizeof(values));
671 memset(nulls, false, sizeof(nulls));
672 memset(replace, false, sizeof(replace));
673 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
674 replace[Anum_pg_largeobject_data - 1] = true;
675 newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
676 values, nulls, replace);
678 indstate);
679 heap_freetuple(newtup);
680
681 /*
682 * We're done with this old page.
683 */
684 oldtuple = NULL;
685 olddata = NULL;
686 neednextpage = true;
687 }
688 else
689 {
690 /*
691 * Write a brand new page.
692 *
693 * First, fill any hole
694 */
695 off = (int) (obj_desc->offset % LOBLKSIZE);
696 if (off > 0)
697 MemSet(workb, 0, off);
698
699 /*
700 * Insert appropriate portion of new data
701 */
702 n = LOBLKSIZE - off;
703 n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
704 memcpy(workb + off, buf + nwritten, n);
705 nwritten += n;
706 obj_desc->offset += n;
707 /* compute valid length of new page */
708 len = off + n;
709 SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
710
711 /*
712 * Form and insert updated tuple
713 */
714 memset(values, 0, sizeof(values));
715 memset(nulls, false, sizeof(nulls));
716 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
717 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
718 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
719 newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
720 CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
721 heap_freetuple(newtup);
722 }
723 pageno++;
724 }
725
727
728 CatalogCloseIndexes(indstate);
729
730 /*
731 * Advance command counter so that my tuple updates will be seen by later
732 * large-object operations in this transaction.
733 */
735
736 return nwritten;
737}
738
739void
741{
742 int32 pageno = (int32) (len / LOBLKSIZE);
743 int32 off;
744 ScanKeyData skey[2];
745 SysScanDesc sd;
746 HeapTuple oldtuple;
747 Form_pg_largeobject olddata;
748 union
749 {
750 bytea hdr;
751 /* this is to make the union big enough for a LO data chunk: */
752 char data[LOBLKSIZE + VARHDRSZ];
753 /* ensure union is aligned well enough: */
754 int32 align_it;
755 } workbuf;
756 char *workb = VARDATA(&workbuf.hdr);
757 HeapTuple newtup;
758 Datum values[Natts_pg_largeobject];
759 bool nulls[Natts_pg_largeobject];
760 bool replace[Natts_pg_largeobject];
761 CatalogIndexState indstate;
762
763 Assert(PointerIsValid(obj_desc));
764
765 /* enforce writability because snapshot is probably wrong otherwise */
766 if ((obj_desc->flags & IFS_WRLOCK) == 0)
768 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
769 errmsg("permission denied for large object %u",
770 obj_desc->id)));
771
772 /*
773 * use errmsg_internal here because we don't want to expose INT64_FORMAT
774 * in translatable strings; doing better is not worth the trouble
775 */
776 if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
778 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
779 errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
780 len)));
781
783
784 indstate = CatalogOpenIndexes(lo_heap_r);
785
786 /*
787 * Set up to find all pages with desired loid and pageno >= target
788 */
789 ScanKeyInit(&skey[0],
790 Anum_pg_largeobject_loid,
791 BTEqualStrategyNumber, F_OIDEQ,
792 ObjectIdGetDatum(obj_desc->id));
793
794 ScanKeyInit(&skey[1],
795 Anum_pg_largeobject_pageno,
797 Int32GetDatum(pageno));
798
800 obj_desc->snapshot, 2, skey);
801
802 /*
803 * If possible, get the page the truncation point is in. The truncation
804 * point may be beyond the end of the LO or in a hole.
805 */
806 olddata = NULL;
807 if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
808 {
809 if (HeapTupleHasNulls(oldtuple)) /* paranoia */
810 elog(ERROR, "null field found in pg_largeobject");
811 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
812 Assert(olddata->pageno >= pageno);
813 }
814
815 /*
816 * If we found the page of the truncation point we need to truncate the
817 * data in it. Otherwise if we're in a hole, we need to create a page to
818 * mark the end of data.
819 */
820 if (olddata != NULL && olddata->pageno == pageno)
821 {
822 /* First, load old data into workbuf */
823 bytea *datafield;
824 int pagelen;
825 bool pfreeit;
826
827 getdatafield(olddata, &datafield, &pagelen, &pfreeit);
828 memcpy(workb, VARDATA(datafield), pagelen);
829 if (pfreeit)
830 pfree(datafield);
831
832 /*
833 * Fill any hole
834 */
835 off = len % LOBLKSIZE;
836 if (off > pagelen)
837 MemSet(workb + pagelen, 0, off - pagelen);
838
839 /* compute length of new page */
840 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
841
842 /*
843 * Form and insert updated tuple
844 */
845 memset(values, 0, sizeof(values));
846 memset(nulls, false, sizeof(nulls));
847 memset(replace, false, sizeof(replace));
848 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
849 replace[Anum_pg_largeobject_data - 1] = true;
850 newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
851 values, nulls, replace);
853 indstate);
854 heap_freetuple(newtup);
855 }
856 else
857 {
858 /*
859 * If the first page we found was after the truncation point, we're in
860 * a hole that we'll fill, but we need to delete the later page
861 * because the loop below won't visit it again.
862 */
863 if (olddata != NULL)
864 {
865 Assert(olddata->pageno > pageno);
867 }
868
869 /*
870 * Write a brand new page.
871 *
872 * Fill the hole up to the truncation point
873 */
874 off = len % LOBLKSIZE;
875 if (off > 0)
876 MemSet(workb, 0, off);
877
878 /* compute length of new page */
879 SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
880
881 /*
882 * Form and insert new tuple
883 */
884 memset(values, 0, sizeof(values));
885 memset(nulls, false, sizeof(nulls));
886 values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
887 values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
888 values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
889 newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
890 CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
891 heap_freetuple(newtup);
892 }
893
894 /*
895 * Delete any pages after the truncation point. If the initial search
896 * didn't find a page, then of course there's nothing more to do.
897 */
898 if (olddata != NULL)
899 {
900 while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
901 {
903 }
904 }
905
907
908 CatalogCloseIndexes(indstate);
909
910 /*
911 * Advance command counter so that tuple updates will be seen by later
912 * large-object operations in this transaction.
913 */
915}
@ ACLCHECK_OK
Definition: acl.h:183
AclResult pg_largeobject_aclcheck_snapshot(Oid lobj_oid, Oid roleid, AclMode mode, Snapshot snapshot)
Definition: aclchk.c:4050
static Datum values[MAXATTR]
Definition: bootstrap.c:151
#define InvalidSubTransactionId
Definition: c.h:612
#define INT64_FORMAT
Definition: c.h:503
#define VARHDRSZ
Definition: c.h:646
#define Assert(condition)
Definition: c.h:812
int64_t int64
Definition: c.h:482
#define PointerIsValid(pointer)
Definition: c.h:717
int32_t int32
Definition: c.h:481
uint64_t uint64
Definition: c.h:486
#define MemSet(start, val, len)
Definition: c.h:974
void performDeletion(const ObjectAddress *object, DropBehavior behavior, int flags)
Definition: dependency.c:273
struct varlena * detoast_attr(struct varlena *attr)
Definition: detoast.c:116
int errmsg_internal(const char *fmt,...)
Definition: elog.c:1157
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
SysScanDesc systable_beginscan_ordered(Relation heapRelation, Relation indexRelation, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:653
void systable_endscan_ordered(SysScanDesc sysscan)
Definition: genam.c:760
HeapTuple systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
Definition: genam.c:735
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
Definition: heaptuple.c:1210
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, const Datum *values, const bool *isnull)
Definition: heaptuple.c:1117
void heap_freetuple(HeapTuple htup)
Definition: heaptuple.c:1435
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
#define GETSTRUCT(TUP)
Definition: htup_details.h:653
#define HeapTupleHasNulls(tuple)
Definition: htup_details.h:659
void index_close(Relation relation, LOCKMODE lockmode)
Definition: indexam.c:177
Relation index_open(Oid relationId, LOCKMODE lockmode)
Definition: indexam.c:133
void CatalogTupleInsertWithInfo(Relation heapRel, HeapTuple tup, CatalogIndexState indstate)
Definition: indexing.c:256
void CatalogCloseIndexes(CatalogIndexState indstate)
Definition: indexing.c:61
CatalogIndexState CatalogOpenIndexes(Relation heapRel)
Definition: indexing.c:43
void CatalogTupleDelete(Relation heapRel, ItemPointer tid)
Definition: indexing.c:365
void CatalogTupleUpdateWithInfo(Relation heapRel, ItemPointer otid, HeapTuple tup, CatalogIndexState indstate)
Definition: indexing.c:337
LargeObjectDesc * inv_open(Oid lobjId, int flags, MemoryContext mcxt)
Definition: inv_api.c:215
static Relation lo_index_r
Definition: inv_api.c:66
void inv_truncate(LargeObjectDesc *obj_desc, int64 len)
Definition: inv_api.c:740
int inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
Definition: inv_api.c:450
static void getdatafield(Form_pg_largeobject tuple, bytea **pdatafield, int *plen, bool *pfreeit)
Definition: inv_api.c:131
Oid inv_create(Oid lobjId)
Definition: inv_api.c:173
static Relation lo_heap_r
Definition: inv_api.c:65
static void open_lo_relation(void)
Definition: inv_api.c:73
int64 inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
Definition: inv_api.c:388
void close_lo_relation(bool isCommit)
Definition: inv_api.c:97
int inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
Definition: inv_api.c:543
int64 inv_tell(LargeObjectDesc *obj_desc)
Definition: inv_api.c:437
bool lo_compat_privileges
Definition: inv_api.c:56
void inv_close(LargeObjectDesc *obj_desc)
Definition: inv_api.c:299
static uint64 inv_getsize(LargeObjectDesc *obj_desc)
Definition: inv_api.c:340
int inv_drop(Oid lobjId)
Definition: inv_api.c:311
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
#define IFS_RDLOCK
Definition: large_object.h:48
#define MAX_LARGE_OBJECT_SIZE
Definition: large_object.h:76
#define LOBLKSIZE
Definition: large_object.h:70
#define IFS_WRLOCK
Definition: large_object.h:49
#define INV_READ
Definition: libpq-fs.h:22
#define INV_WRITE
Definition: libpq-fs.h:21
#define NoLock
Definition: lockdefs.h:34
#define RowExclusiveLock
Definition: lockdefs.h:38
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:1181
void pfree(void *pointer)
Definition: mcxt.c:1521
Oid GetUserId(void)
Definition: miscinit.c:517
#define InvokeObjectPostCreateHook(classId, objectId, subId)
Definition: objectaccess.h:173
#define ACL_UPDATE
Definition: parsenodes.h:78
@ DROP_CASCADE
Definition: parsenodes.h:2342
#define ACL_SELECT
Definition: parsenodes.h:77
#define ERRCODE_DATA_CORRUPTED
Definition: pg_basebackup.c:41
const void size_t len
const void * data
bool LargeObjectExistsWithSnapshot(Oid loid, Snapshot snapshot)
Oid LargeObjectCreate(Oid loid)
FormData_pg_largeobject * Form_pg_largeobject
void recordDependencyOnOwner(Oid classId, Oid objectId, Oid owner)
Definition: pg_shdepend.c:168
static char * buf
Definition: pg_test_fsync.c:72
static Datum PointerGetDatum(const void *X)
Definition: postgres.h:322
uintptr_t Datum
Definition: postgres.h:64
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:252
static Datum Int32GetDatum(int32 X)
Definition: postgres.h:212
unsigned int Oid
Definition: postgres_ext.h:31
#define RelationGetDescr(relation)
Definition: rel.h:531
ResourceOwner TopTransactionResourceOwner
Definition: resowner.c:167
ResourceOwner CurrentResourceOwner
Definition: resowner.c:165
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
@ BackwardScanDirection
Definition: sdir.h:26
@ ForwardScanDirection
Definition: sdir.h:28
Snapshot GetActiveSnapshot(void)
Definition: snapmgr.c:728
#define BTEqualStrategyNumber
Definition: stratnum.h:31
#define BTGreaterEqualStrategyNumber
Definition: stratnum.h:32
ItemPointerData t_self
Definition: htup.h:65
Snapshot snapshot
Definition: large_object.h:42
SubTransactionId subid
Definition: large_object.h:43
TupleDesc rd_att
Definition: rel.h:112
Definition: c.h:641
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
#define VARATT_IS_EXTENDED(PTR)
Definition: varatt.h:303
#define VARDATA(PTR)
Definition: varatt.h:278
#define SET_VARSIZE(PTR, len)
Definition: varatt.h:305
#define VARSIZE(PTR)
Definition: varatt.h:279
void CommandCounterIncrement(void)
Definition: xact.c:1099