PostgreSQL Source Code  git master
 All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros
tqueue.c File Reference
#include "postgres.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "executor/tqueue.h"
#include "funcapi.h"
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "utils/array.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rangetypes.h"
#include "utils/syscache.h"
#include "utils/typcache.h"
Include dependency graph for tqueue.c:

Go to the source code of this file.

Data Structures

struct  ArrayRemapInfo
 
struct  RangeRemapInfo
 
struct  RecordRemapInfo
 
struct  TupleRemapInfo
 
struct  TQueueDestReceiver
 
struct  RecordTypmodMap
 
struct  TupleQueueReader
 

Macros

#define TUPLE_QUEUE_MODE_CONTROL   'c' /* mode-switch message contents */
 
#define TUPLE_QUEUE_MODE_DATA   'd'
 

Typedefs

typedef struct TupleRemapInfo TupleRemapInfo
 
typedef struct ArrayRemapInfo ArrayRemapInfo
 
typedef struct RangeRemapInfo RangeRemapInfo
 
typedef struct RecordRemapInfo RecordRemapInfo
 
typedef struct TQueueDestReceiver TQueueDestReceiver
 
typedef struct RecordTypmodMap RecordTypmodMap
 

Enumerations

enum  TupleRemapClass { TQUEUE_REMAP_ARRAY, TQUEUE_REMAP_RANGE, TQUEUE_REMAP_RECORD }
 

Functions

static void TQExamine (TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
 
static void TQExamineArray (TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo, Datum value)
 
static void TQExamineRange (TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo, Datum value)
 
static void TQExamineRecord (TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo, Datum value)
 
static void TQSendRecordInfo (TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
 
static void TupleQueueHandleControlMessage (TupleQueueReader *reader, Size nbytes, char *data)
 
static HeapTuple TupleQueueHandleDataMessage (TupleQueueReader *reader, Size nbytes, HeapTupleHeader data)
 
static HeapTuple TQRemapTuple (TupleQueueReader *reader, TupleDesc tupledesc, TupleRemapInfo **field_remapinfo, HeapTuple tuple)
 
static Datum TQRemap (TupleQueueReader *reader, TupleRemapInfo *remapinfo, Datum value, bool *changed)
 
static Datum TQRemapArray (TupleQueueReader *reader, ArrayRemapInfo *remapinfo, Datum value, bool *changed)
 
static Datum TQRemapRange (TupleQueueReader *reader, RangeRemapInfo *remapinfo, Datum value, bool *changed)
 
static Datum TQRemapRecord (TupleQueueReader *reader, RecordRemapInfo *remapinfo, Datum value, bool *changed)
 
static TupleRemapInfoBuildTupleRemapInfo (Oid typid, MemoryContext mycontext)
 
static TupleRemapInfoBuildArrayRemapInfo (Oid elemtypid, MemoryContext mycontext)
 
static TupleRemapInfoBuildRangeRemapInfo (Oid rngtypid, MemoryContext mycontext)
 
static TupleRemapInfo ** BuildFieldRemapInfo (TupleDesc tupledesc, MemoryContext mycontext)
 
static bool tqueueReceiveSlot (TupleTableSlot *slot, DestReceiver *self)
 
static void tqueueStartupReceiver (DestReceiver *self, int operation, TupleDesc typeinfo)
 
static void tqueueShutdownReceiver (DestReceiver *self)
 
static void tqueueDestroyReceiver (DestReceiver *self)
 
DestReceiverCreateTupleQueueDestReceiver (shm_mq_handle *handle)
 
TupleQueueReaderCreateTupleQueueReader (shm_mq_handle *handle, TupleDesc tupledesc)
 
void DestroyTupleQueueReader (TupleQueueReader *reader)
 
HeapTuple TupleQueueReaderNext (TupleQueueReader *reader, bool nowait, bool *done)
 

Macro Definition Documentation

#define TUPLE_QUEUE_MODE_CONTROL   'c' /* mode-switch message contents */

Definition at line 60 of file tqueue.c.

Referenced by TQSendRecordInfo(), and TupleQueueReaderNext().

#define TUPLE_QUEUE_MODE_DATA   'd'

Typedef Documentation

Definition at line 91 of file tqueue.c.

Enumeration Type Documentation

Enumerator
TQUEUE_REMAP_ARRAY 
TQUEUE_REMAP_RANGE 
TQUEUE_REMAP_RECORD 

Definition at line 84 of file tqueue.c.

85 {
86  TQUEUE_REMAP_ARRAY, /* array */
87  TQUEUE_REMAP_RANGE, /* range */
88  TQUEUE_REMAP_RECORD /* composite type, named or transient */
TupleRemapClass
Definition: tqueue.c:84

Function Documentation

static TupleRemapInfo * BuildArrayRemapInfo ( Oid  elemtypid,
MemoryContext  mycontext 
)
static

Definition at line 1186 of file tqueue.c.

References TupleRemapInfo::arr, BuildTupleRemapInfo(), ArrayRemapInfo::element_remap, get_typlenbyvalalign(), MemoryContextAlloc(), NULL, TupleRemapInfo::remapclass, TQUEUE_REMAP_ARRAY, ArrayRemapInfo::typalign, ArrayRemapInfo::typbyval, ArrayRemapInfo::typlen, and TupleRemapInfo::u.

Referenced by BuildTupleRemapInfo().

1187 {
1188  TupleRemapInfo *remapinfo;
1189  TupleRemapInfo *element_remapinfo;
1190 
1191  /* See if element type requires remapping. */
1192  element_remapinfo = BuildTupleRemapInfo(elemtypid, mycontext);
1193  /* If not, the array doesn't either. */
1194  if (element_remapinfo == NULL)
1195  return NULL;
1196  /* OK, set up to remap the array. */
1197  remapinfo = (TupleRemapInfo *)
1198  MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
1199  remapinfo->remapclass = TQUEUE_REMAP_ARRAY;
1200  get_typlenbyvalalign(elemtypid,
1201  &remapinfo->u.arr.typlen,
1202  &remapinfo->u.arr.typbyval,
1203  &remapinfo->u.arr.typalign);
1204  remapinfo->u.arr.element_remap = element_remapinfo;
1205  return remapinfo;
1206 }
ArrayRemapInfo arr
Definition: tqueue.c:124
void get_typlenbyvalalign(Oid typid, int16 *typlen, bool *typbyval, char *typalign)
Definition: lsyscache.c:2021
static TupleRemapInfo * BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
Definition: tqueue.c:1122
char typalign
Definition: tqueue.c:97
TupleRemapClass remapclass
Definition: tqueue.c:121
bool typbyval
Definition: tqueue.c:96
#define NULL
Definition: c.h:229
union TupleRemapInfo::@22 u
TupleRemapInfo * element_remap
Definition: tqueue.c:98
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
int16 typlen
Definition: tqueue.c:95
static TupleRemapInfo ** BuildFieldRemapInfo ( TupleDesc  tupledesc,
MemoryContext  mycontext 
)
static

Definition at line 1244 of file tqueue.c.

References tupleDesc::attrs, BuildTupleRemapInfo(), i, MemoryContextAlloc(), tupleDesc::natts, NULL, and pfree().

Referenced by CreateTupleQueueReader(), TQExamineRecord(), TQRemapRecord(), and tqueueReceiveSlot().

1245 {
1246  TupleRemapInfo **remapinfo;
1247  bool noop = true;
1248  int i;
1249 
1250  /* Recursively determine the remapping status of each field. */
1251  remapinfo = (TupleRemapInfo **)
1252  MemoryContextAlloc(mycontext,
1253  tupledesc->natts * sizeof(TupleRemapInfo *));
1254  for (i = 0; i < tupledesc->natts; i++)
1255  {
1256  Form_pg_attribute attr = tupledesc->attrs[i];
1257 
1258  if (attr->attisdropped)
1259  {
1260  remapinfo[i] = NULL;
1261  continue;
1262  }
1263  remapinfo[i] = BuildTupleRemapInfo(attr->atttypid, mycontext);
1264  if (remapinfo[i] != NULL)
1265  noop = false;
1266  }
1267 
1268  /* If no fields require remapping, report that by returning NULL. */
1269  if (noop)
1270  {
1271  pfree(remapinfo);
1272  remapinfo = NULL;
1273  }
1274 
1275  return remapinfo;
1276 }
Form_pg_attribute * attrs
Definition: tupdesc.h:74
static TupleRemapInfo * BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
Definition: tqueue.c:1122
int natts
Definition: tupdesc.h:73
void pfree(void *pointer)
Definition: mcxt.c:950
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
#define NULL
Definition: c.h:229
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
int i
static TupleRemapInfo * BuildRangeRemapInfo ( Oid  rngtypid,
MemoryContext  mycontext 
)
static

Definition at line 1209 of file tqueue.c.

References RangeRemapInfo::bound_remap, BuildTupleRemapInfo(), elog, ERROR, lookup_type_cache(), MemoryContextAlloc(), NULL, TupleRemapInfo::remapclass, TupleRemapInfo::rng, TypeCacheEntry::rngelemtype, TQUEUE_REMAP_RANGE, RangeRemapInfo::typcache, TypeCacheEntry::type_id, TYPECACHE_RANGE_INFO, and TupleRemapInfo::u.

Referenced by BuildTupleRemapInfo().

1210 {
1211  TupleRemapInfo *remapinfo;
1212  TupleRemapInfo *bound_remapinfo;
1213  TypeCacheEntry *typcache;
1214 
1215  /*
1216  * Get range info from the typcache. We assume this pointer will stay
1217  * valid for the duration of the query.
1218  */
1219  typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
1220  if (typcache->rngelemtype == NULL)
1221  elog(ERROR, "type %u is not a range type", rngtypid);
1222 
1223  /* See if range bound type requires remapping. */
1224  bound_remapinfo = BuildTupleRemapInfo(typcache->rngelemtype->type_id,
1225  mycontext);
1226  /* If not, the range doesn't either. */
1227  if (bound_remapinfo == NULL)
1228  return NULL;
1229  /* OK, set up to remap the range. */
1230  remapinfo = (TupleRemapInfo *)
1231  MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
1232  remapinfo->remapclass = TQUEUE_REMAP_RANGE;
1233  remapinfo->u.rng.typcache = typcache;
1234  remapinfo->u.rng.bound_remap = bound_remapinfo;
1235  return remapinfo;
1236 }
#define TYPECACHE_RANGE_INFO
Definition: typcache.h:121
TypeCacheEntry * typcache
Definition: tqueue.c:103
static TupleRemapInfo * BuildTupleRemapInfo(Oid typid, MemoryContext mycontext)
Definition: tqueue.c:1122
TupleRemapClass remapclass
Definition: tqueue.c:121
#define ERROR
Definition: elog.h:43
TupleRemapInfo * bound_remap
Definition: tqueue.c:104
struct TypeCacheEntry * rngelemtype
Definition: typcache.h:84
TypeCacheEntry * lookup_type_cache(Oid type_id, int flags)
Definition: typcache.c:191
#define NULL
Definition: c.h:229
union TupleRemapInfo::@22 u
RangeRemapInfo rng
Definition: tqueue.c:125
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
#define elog
Definition: elog.h:219
static TupleRemapInfo * BuildTupleRemapInfo ( Oid  typid,
MemoryContext  mycontext 
)
static

Definition at line 1122 of file tqueue.c.

References BuildArrayRemapInfo(), BuildRangeRemapInfo(), check_stack_depth(), elog, ERROR, RecordRemapInfo::field_remap, GETSTRUCT, HeapTupleIsValid, InvalidOid, RecordRemapInfo::localtypmod, MemoryContextAlloc(), NULL, ObjectIdGetDatum, OidIsValid, TupleRemapInfo::rec, RECORDOID, RecordRemapInfo::rectypid, RecordRemapInfo::rectypmod, ReleaseSysCache(), TupleRemapInfo::remapclass, SearchSysCache1, TQUEUE_REMAP_RECORD, RecordRemapInfo::tupledesc, TYPEOID, TYPTYPE_COMPOSITE, TYPTYPE_DOMAIN, TYPTYPE_RANGE, and TupleRemapInfo::u.

Referenced by BuildArrayRemapInfo(), BuildFieldRemapInfo(), and BuildRangeRemapInfo().

1123 {
1124  HeapTuple tup;
1125  Form_pg_type typ;
1126 
1127  /* This is recursive, so it could be driven to stack overflow. */
1129 
1130 restart:
1131  tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(typid));
1132  if (!HeapTupleIsValid(tup))
1133  elog(ERROR, "cache lookup failed for type %u", typid);
1134  typ = (Form_pg_type) GETSTRUCT(tup);
1135 
1136  /* Look through domains to underlying base type. */
1137  if (typ->typtype == TYPTYPE_DOMAIN)
1138  {
1139  typid = typ->typbasetype;
1140  ReleaseSysCache(tup);
1141  goto restart;
1142  }
1143 
1144  /* If it's a true array type, deal with it that way. */
1145  if (OidIsValid(typ->typelem) && typ->typlen == -1)
1146  {
1147  typid = typ->typelem;
1148  ReleaseSysCache(tup);
1149  return BuildArrayRemapInfo(typid, mycontext);
1150  }
1151 
1152  /* Similarly, deal with ranges appropriately. */
1153  if (typ->typtype == TYPTYPE_RANGE)
1154  {
1155  ReleaseSysCache(tup);
1156  return BuildRangeRemapInfo(typid, mycontext);
1157  }
1158 
1159  /*
1160  * If it's a composite type (including RECORD), set up for remapping. We
1161  * don't attempt to determine the status of subfields here, since we do
1162  * not have enough information yet; just mark everything invalid.
1163  */
1164  if (typ->typtype == TYPTYPE_COMPOSITE || typid == RECORDOID)
1165  {
1166  TupleRemapInfo *remapinfo;
1167 
1168  remapinfo = (TupleRemapInfo *)
1169  MemoryContextAlloc(mycontext, sizeof(TupleRemapInfo));
1170  remapinfo->remapclass = TQUEUE_REMAP_RECORD;
1171  remapinfo->u.rec.rectypid = InvalidOid;
1172  remapinfo->u.rec.rectypmod = -1;
1173  remapinfo->u.rec.localtypmod = -1;
1174  remapinfo->u.rec.tupledesc = NULL;
1175  remapinfo->u.rec.field_remap = NULL;
1176  ReleaseSysCache(tup);
1177  return remapinfo;
1178  }
1179 
1180  /* Nothing else can possibly need remapping attention. */
1181  ReleaseSysCache(tup);
1182  return NULL;
1183 }
#define TYPTYPE_DOMAIN
Definition: pg_type.h:722
#define GETSTRUCT(TUP)
Definition: htup_details.h:656
#define TYPTYPE_COMPOSITE
Definition: pg_type.h:721
FormData_pg_type * Form_pg_type
Definition: pg_type.h:233
#define OidIsValid(objectId)
Definition: c.h:538
int32 rectypmod
Definition: tqueue.c:111
#define SearchSysCache1(cacheId, key1)
Definition: syscache.h:156
TupleRemapClass remapclass
Definition: tqueue.c:121
#define ObjectIdGetDatum(X)
Definition: postgres.h:513
#define ERROR
Definition: elog.h:43
void check_stack_depth(void)
Definition: postgres.c:3102
#define RECORDOID
Definition: pg_type.h:680
#define TYPTYPE_RANGE
Definition: pg_type.h:725
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:1117
#define InvalidOid
Definition: postgres_ext.h:36
#define HeapTupleIsValid(tuple)
Definition: htup.h:77
#define NULL
Definition: c.h:229
union TupleRemapInfo::@22 u
static TupleRemapInfo * BuildArrayRemapInfo(Oid elemtypid, MemoryContext mycontext)
Definition: tqueue.c:1186
static TupleRemapInfo * BuildRangeRemapInfo(Oid rngtypid, MemoryContext mycontext)
Definition: tqueue.c:1209
void * MemoryContextAlloc(MemoryContext context, Size size)
Definition: mcxt.c:707
TupleRemapInfo ** field_remap
Definition: tqueue.c:116
#define elog
Definition: elog.h:219
RecordRemapInfo rec
Definition: tqueue.c:126
TupleDesc tupledesc
Definition: tqueue.c:115
int32 localtypmod
Definition: tqueue.c:113
DestReceiver* CreateTupleQueueDestReceiver ( shm_mq_handle handle)

Definition at line 606 of file tqueue.c.

References CurrentMemoryContext, DestTupleQueue, NULL, palloc0(), tqueueDestroyReceiver(), tqueueReceiveSlot(), tqueueShutdownReceiver(), tqueueStartupReceiver(), and TUPLE_QUEUE_MODE_DATA.

Referenced by CreateDestReceiver(), and ExecParallelGetReceiver().

607 {
608  TQueueDestReceiver *self;
609 
610  self = (TQueueDestReceiver *) palloc0(sizeof(TQueueDestReceiver));
611 
612  self->pub.receiveSlot = tqueueReceiveSlot;
613  self->pub.rStartup = tqueueStartupReceiver;
614  self->pub.rShutdown = tqueueShutdownReceiver;
615  self->pub.rDestroy = tqueueDestroyReceiver;
616  self->pub.mydest = DestTupleQueue;
617  self->queue = handle;
618  self->mycontext = CurrentMemoryContext;
619  self->tmpcontext = NULL;
620  self->recordhtab = NULL;
621  self->mode = TUPLE_QUEUE_MODE_DATA;
622  /* Top-level tupledesc is not known yet */
623  self->tupledesc = NULL;
624  self->field_remapinfo = NULL;
625 
626  return (DestReceiver *) self;
627 }
static void tqueueDestroyReceiver(DestReceiver *self)
Definition: tqueue.c:588
static void tqueueStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
Definition: tqueue.c:568
static bool tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
Definition: tqueue.c:224
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
void * palloc0(Size size)
Definition: mcxt.c:878
#define NULL
Definition: c.h:229
static void tqueueShutdownReceiver(DestReceiver *self)
Definition: tqueue.c:577
#define TUPLE_QUEUE_MODE_DATA
Definition: tqueue.c:61
TupleQueueReader* CreateTupleQueueReader ( shm_mq_handle handle,
TupleDesc  tupledesc 
)

Definition at line 633 of file tqueue.c.

References BuildFieldRemapInfo(), CurrentMemoryContext, TupleQueueReader::field_remapinfo, TupleQueueReader::mode, TupleQueueReader::mycontext, NULL, palloc0(), TupleQueueReader::queue, TUPLE_QUEUE_MODE_DATA, TupleQueueReader::tupledesc, and TupleQueueReader::typmodmap.

Referenced by ExecGather(), and ExecGatherMerge().

634 {
635  TupleQueueReader *reader = palloc0(sizeof(TupleQueueReader));
636 
637  reader->queue = handle;
639  reader->typmodmap = NULL;
640  reader->mode = TUPLE_QUEUE_MODE_DATA;
641  reader->tupledesc = tupledesc;
642  reader->field_remapinfo = BuildFieldRemapInfo(tupledesc, reader->mycontext);
643 
644  return reader;
645 }
shm_mq_handle * queue
Definition: tqueue.c:170
TupleDesc tupledesc
Definition: tqueue.c:174
HTAB * typmodmap
Definition: tqueue.c:172
static TupleRemapInfo ** BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
Definition: tqueue.c:1244
MemoryContext CurrentMemoryContext
Definition: mcxt.c:37
TupleRemapInfo ** field_remapinfo
Definition: tqueue.c:175
void * palloc0(Size size)
Definition: mcxt.c:878
MemoryContext mycontext
Definition: tqueue.c:171
#define NULL
Definition: c.h:229
#define TUPLE_QUEUE_MODE_DATA
Definition: tqueue.c:61
void DestroyTupleQueueReader ( TupleQueueReader reader)

Definition at line 651 of file tqueue.c.

References TupleQueueReader::field_remapinfo, hash_destroy(), NULL, pfree(), TupleQueueReader::queue, shm_mq_detach(), shm_mq_get_queue(), and TupleQueueReader::typmodmap.

Referenced by ExecShutdownGatherMergeWorkers(), ExecShutdownGatherWorkers(), gather_merge_readnext(), and gather_readnext().

652 {
654  if (reader->typmodmap != NULL)
655  hash_destroy(reader->typmodmap);
656  /* Is it worth trying to free substructure of the remap tree? */
657  if (reader->field_remapinfo != NULL)
658  pfree(reader->field_remapinfo);
659  pfree(reader);
660 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:793
shm_mq_handle * queue
Definition: tqueue.c:170
void shm_mq_detach(shm_mq *mq)
Definition: shm_mq.c:778
void pfree(void *pointer)
Definition: mcxt.c:950
HTAB * typmodmap
Definition: tqueue.c:172
TupleRemapInfo ** field_remapinfo
Definition: tqueue.c:175
#define NULL
Definition: c.h:229
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
Definition: shm_mq.c:802
static void TQExamine ( TQueueDestReceiver tqueue,
TupleRemapInfo remapinfo,
Datum  value 
)
static

Definition at line 331 of file tqueue.c.

References TupleRemapInfo::arr, check_stack_depth(), TupleRemapInfo::rec, TupleRemapInfo::remapclass, TupleRemapInfo::rng, TQExamineArray(), TQExamineRange(), TQExamineRecord(), TQUEUE_REMAP_ARRAY, TQUEUE_REMAP_RANGE, TQUEUE_REMAP_RECORD, and TupleRemapInfo::u.

Referenced by TQExamineArray(), TQExamineRange(), TQExamineRecord(), and tqueueReceiveSlot().

332 {
333  /* This is recursive, so it could be driven to stack overflow. */
335 
336  switch (remapinfo->remapclass)
337  {
338  case TQUEUE_REMAP_ARRAY:
339  TQExamineArray(tqueue, &remapinfo->u.arr, value);
340  break;
341  case TQUEUE_REMAP_RANGE:
342  TQExamineRange(tqueue, &remapinfo->u.rng, value);
343  break;
344  case TQUEUE_REMAP_RECORD:
345  TQExamineRecord(tqueue, &remapinfo->u.rec, value);
346  break;
347  }
348 }
ArrayRemapInfo arr
Definition: tqueue.c:124
static void TQExamineRecord(TQueueDestReceiver *tqueue, RecordRemapInfo *remapinfo, Datum value)
Definition: tqueue.c:355
static void TQExamineArray(TQueueDestReceiver *tqueue, ArrayRemapInfo *remapinfo, Datum value)
Definition: tqueue.c:457
TupleRemapClass remapclass
Definition: tqueue.c:121
static void TQExamineRange(TQueueDestReceiver *tqueue, RangeRemapInfo *remapinfo, Datum value)
Definition: tqueue.c:485
void check_stack_depth(void)
Definition: postgres.c:3102
union TupleRemapInfo::@22 u
RangeRemapInfo rng
Definition: tqueue.c:125
static struct @121 value
RecordRemapInfo rec
Definition: tqueue.c:126
static void TQExamineArray ( TQueueDestReceiver tqueue,
ArrayRemapInfo remapinfo,
Datum  value 
)
static

Definition at line 457 of file tqueue.c.

References ARR_ELEMTYPE, DatumGetArrayTypeP, deconstruct_array(), ArrayRemapInfo::element_remap, i, TQExamine(), ArrayRemapInfo::typalign, ArrayRemapInfo::typbyval, and ArrayRemapInfo::typlen.

Referenced by TQExamine().

459 {
461  Oid typid = ARR_ELEMTYPE(arr);
462  Datum *elem_values;
463  bool *elem_nulls;
464  int num_elems;
465  int i;
466 
467  /* Deconstruct the array. */
468  deconstruct_array(arr, typid, remapinfo->typlen,
469  remapinfo->typbyval, remapinfo->typalign,
470  &elem_values, &elem_nulls, &num_elems);
471 
472  /* Examine each element. */
473  for (i = 0; i < num_elems; i++)
474  {
475  if (!elem_nulls[i])
476  TQExamine(tqueue, remapinfo->element_remap, elem_values[i]);
477  }
478 }
unsigned int Oid
Definition: postgres_ext.h:31
char typalign
Definition: tqueue.c:97
bool typbyval
Definition: tqueue.c:96
static void TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
Definition: tqueue.c:331
uintptr_t Datum
Definition: postgres.h:372
TupleRemapInfo * element_remap
Definition: tqueue.c:98
void deconstruct_array(ArrayType *array, Oid elmtype, int elmlen, bool elmbyval, char elmalign, Datum **elemsp, bool **nullsp, int *nelemsp)
Definition: arrayfuncs.c:3475
int i
static struct @121 value
#define ARR_ELEMTYPE(a)
Definition: array.h:273
int16 typlen
Definition: tqueue.c:95
#define DatumGetArrayTypeP(X)
Definition: array.h:242
static void TQExamineRange ( TQueueDestReceiver tqueue,
RangeRemapInfo remapinfo,
Datum  value 
)
static

Definition at line 485 of file tqueue.c.

References RangeRemapInfo::bound_remap, DatumGetRangeType, RangeBound::infinite, lower(), range(), range_deserialize(), TQExamine(), RangeRemapInfo::typcache, upper(), and RangeBound::val.

Referenced by TQExamine().

487 {
491  bool empty;
492 
493  /* Extract the lower and upper bounds. */
494  range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
495 
496  /* Nothing to do for an empty range. */
497  if (empty)
498  return;
499 
500  /* Examine each bound, if present. */
501  if (!upper.infinite)
502  TQExamine(tqueue, remapinfo->bound_remap, upper.val);
503  if (!lower.infinite)
504  TQExamine(tqueue, remapinfo->bound_remap, lower.val);
505 }
TypeCacheEntry * typcache
Definition: tqueue.c:103
Datum lower(PG_FUNCTION_ARGS)
Definition: oracle_compat.c:43
#define DatumGetRangeType(X)
Definition: rangetypes.h:71
Datum val
Definition: rangetypes.h:62
Datum upper(PG_FUNCTION_ARGS)
Definition: oracle_compat.c:74
TupleRemapInfo * bound_remap
Definition: tqueue.c:104
static struct cvec * range(struct vars *v, chr a, chr b, int cases)
Definition: regc_locale.c:416
static void TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
Definition: tqueue.c:331
void range_deserialize(TypeCacheEntry *typcache, RangeType *range, RangeBound *lower, RangeBound *upper, bool *empty)
Definition: rangetypes.c:1632
bool infinite
Definition: rangetypes.h:63
static struct @121 value
static void TQExamineRecord ( TQueueDestReceiver tqueue,
RecordRemapInfo remapinfo,
Datum  value 
)
static

Definition at line 355 of file tqueue.c.

References BuildFieldRemapInfo(), CreateTupleDescCopy(), DatumGetHeapTupleHeader, DecrTupleDescRefCount(), RecordRemapInfo::field_remap, FreeTupleDesc(), heap_deform_tuple(), HeapTupleHeaderGetDatumLength, HeapTupleHeaderGetTypeId, HeapTupleHeaderGetTypMod, i, InvalidOid, ItemPointerSetInvalid, lookup_rowtype_tupdesc(), MemoryContextSwitchTo(), TQueueDestReceiver::mycontext, tupleDesc::natts, NULL, palloc(), pfree(), RECORDOID, RecordRemapInfo::rectypid, RecordRemapInfo::rectypmod, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, HeapTupleData::t_tableOid, TQExamine(), TQSendRecordInfo(), RecordRemapInfo::tupledesc, and values.

Referenced by TQExamine().

357 {
358  HeapTupleHeader tup;
359  Oid typid;
360  int32 typmod;
361  TupleDesc tupledesc;
362 
363  /* Extract type OID and typmod from tuple. */
365  typid = HeapTupleHeaderGetTypeId(tup);
366  typmod = HeapTupleHeaderGetTypMod(tup);
367 
368  /*
369  * If first time through, or if this isn't the same composite type as last
370  * time, consider sending a control message, and then look up the
371  * necessary information for examining the fields.
372  */
373  if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
374  {
375  /* Free any old data. */
376  if (remapinfo->tupledesc != NULL)
377  FreeTupleDesc(remapinfo->tupledesc);
378  /* Is it worth trying to free substructure of the remap tree? */
379  if (remapinfo->field_remap != NULL)
380  pfree(remapinfo->field_remap);
381 
382  /* Look up tuple descriptor in typcache. */
383  tupledesc = lookup_rowtype_tupdesc(typid, typmod);
384 
385  /*
386  * If this is a transient record type, send the tupledesc in a control
387  * message. (TQSendRecordInfo is smart enough to do this only once
388  * per typmod.)
389  */
390  if (typid == RECORDOID)
391  TQSendRecordInfo(tqueue, typmod, tupledesc);
392 
393  /* Figure out whether fields need recursive processing. */
394  remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
395  tqueue->mycontext);
396  if (remapinfo->field_remap != NULL)
397  {
398  /*
399  * We need to inspect the record contents, so save a copy of the
400  * tupdesc. (We could possibly just reference the typcache's
401  * copy, but then it's problematic when to release the refcount.)
402  */
403  MemoryContext oldcontext = MemoryContextSwitchTo(tqueue->mycontext);
404 
405  remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
406  MemoryContextSwitchTo(oldcontext);
407  }
408  else
409  {
410  /* No fields of the record require remapping. */
411  remapinfo->tupledesc = NULL;
412  }
413  remapinfo->rectypid = typid;
414  remapinfo->rectypmod = typmod;
415 
416  /* Release reference count acquired by lookup_rowtype_tupdesc. */
417  DecrTupleDescRefCount(tupledesc);
418  }
419 
420  /*
421  * If field remapping is required, deform the tuple and examine each
422  * field.
423  */
424  if (remapinfo->field_remap != NULL)
425  {
426  Datum *values;
427  bool *isnull;
428  HeapTupleData tdata;
429  int i;
430 
431  /* Deform the tuple so we can check each column within. */
432  tupledesc = remapinfo->tupledesc;
433  values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
434  isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
436  ItemPointerSetInvalid(&(tdata.t_self));
437  tdata.t_tableOid = InvalidOid;
438  tdata.t_data = tup;
439  heap_deform_tuple(&tdata, tupledesc, values, isnull);
440 
441  /* Recursively check each interesting non-NULL attribute. */
442  for (i = 0; i < tupledesc->natts; i++)
443  {
444  if (!isnull[i] && remapinfo->field_remap[i])
445  TQExamine(tqueue, remapinfo->field_remap[i], values[i]);
446  }
447 
448  /* Need not clean up, since we're in a short-lived context. */
449  }
450 }
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:141
static void TQSendRecordInfo(TQueueDestReceiver *tqueue, int32 typmod, TupleDesc tupledesc)
Definition: tqueue.c:512
TupleDesc lookup_rowtype_tupdesc(Oid type_id, int32 typmod)
Definition: typcache.c:1257
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
unsigned int Oid
Definition: postgres_ext.h:31
#define DatumGetHeapTupleHeader(X)
Definition: fmgr.h:259
int natts
Definition: tupdesc.h:73
int32 rectypmod
Definition: tqueue.c:111
signed int int32
Definition: c.h:256
HeapTupleHeader t_data
Definition: htup.h:67
#define HeapTupleHeaderGetTypMod(tup)
Definition: htup_details.h:455
void pfree(void *pointer)
Definition: mcxt.c:950
MemoryContext mycontext
Definition: tqueue.c:142
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
static TupleRemapInfo ** BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
Definition: tqueue.c:1244
Oid t_tableOid
Definition: htup.h:66
#define RECORDOID
Definition: pg_type.h:680
static void TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
Definition: tqueue.c:331
uintptr_t Datum
Definition: postgres.h:372
#define HeapTupleHeaderGetTypeId(tup)
Definition: htup_details.h:445
#define InvalidOid
Definition: postgres_ext.h:36
#define NULL
Definition: c.h:229
void DecrTupleDescRefCount(TupleDesc tupdesc)
Definition: tupdesc.c:336
void FreeTupleDesc(TupleDesc tupdesc)
Definition: tupdesc.c:268
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:933
static Datum values[MAXATTR]
Definition: bootstrap.c:163
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:149
void * palloc(Size size)
Definition: mcxt.c:849
TupleRemapInfo ** field_remap
Definition: tqueue.c:116
int i
static struct @121 value
TupleDesc tupledesc
Definition: tqueue.c:115
#define HeapTupleHeaderGetDatumLength(tup)
Definition: htup_details.h:439
static Datum TQRemap ( TupleQueueReader reader,
TupleRemapInfo remapinfo,
Datum  value,
bool changed 
)
static

Definition at line 809 of file tqueue.c.

References TupleRemapInfo::arr, check_stack_depth(), elog, ERROR, TupleRemapInfo::rec, TupleRemapInfo::remapclass, TupleRemapInfo::rng, TQRemapArray(), TQRemapRange(), TQRemapRecord(), TQUEUE_REMAP_ARRAY, TQUEUE_REMAP_RANGE, TQUEUE_REMAP_RECORD, and TupleRemapInfo::u.

Referenced by TQRemapArray(), TQRemapRange(), and TQRemapTuple().

811 {
812  /* This is recursive, so it could be driven to stack overflow. */
814 
815  switch (remapinfo->remapclass)
816  {
817  case TQUEUE_REMAP_ARRAY:
818  return TQRemapArray(reader, &remapinfo->u.arr, value, changed);
819 
820  case TQUEUE_REMAP_RANGE:
821  return TQRemapRange(reader, &remapinfo->u.rng, value, changed);
822 
823  case TQUEUE_REMAP_RECORD:
824  return TQRemapRecord(reader, &remapinfo->u.rec, value, changed);
825  }
826 
827  elog(ERROR, "unrecognized tqueue remap class: %d",
828  (int) remapinfo->remapclass);
829  return (Datum) 0;
830 }
ArrayRemapInfo arr
Definition: tqueue.c:124
TupleRemapClass remapclass
Definition: tqueue.c:121
#define ERROR
Definition: elog.h:43
static Datum TQRemapRecord(TupleQueueReader *reader, RecordRemapInfo *remapinfo, Datum value, bool *changed)
Definition: tqueue.c:924
void check_stack_depth(void)
Definition: postgres.c:3102
uintptr_t Datum
Definition: postgres.h:372
static Datum TQRemapArray(TupleQueueReader *reader, ArrayRemapInfo *remapinfo, Datum value, bool *changed)
Definition: tqueue.c:837
union TupleRemapInfo::@22 u
RangeRemapInfo rng
Definition: tqueue.c:125
#define elog
Definition: elog.h:219
static struct @121 value
RecordRemapInfo rec
Definition: tqueue.c:126
static Datum TQRemapRange(TupleQueueReader *reader, RangeRemapInfo *remapinfo, Datum value, bool *changed)
Definition: tqueue.c:883
static Datum TQRemapArray ( TupleQueueReader reader,
ArrayRemapInfo remapinfo,
Datum  value,
bool changed 
)
static

Definition at line 837 of file tqueue.c.

References ARR_DIMS, ARR_ELEMTYPE, ARR_LBOUND, ARR_NDIM, construct_md_array(), DatumGetArrayTypeP, deconstruct_array(), ArrayRemapInfo::element_remap, i, PointerGetDatum, TQRemap(), ArrayRemapInfo::typalign, ArrayRemapInfo::typbyval, ArrayRemapInfo::typlen, and value.

Referenced by TQRemap().

839 {
841  Oid typid = ARR_ELEMTYPE(arr);
842  bool element_changed = false;
843  Datum *elem_values;
844  bool *elem_nulls;
845  int num_elems;
846  int i;
847 
848  /* Deconstruct the array. */
849  deconstruct_array(arr, typid, remapinfo->typlen,
850  remapinfo->typbyval, remapinfo->typalign,
851  &elem_values, &elem_nulls, &num_elems);
852 
853  /* Remap each element. */
854  for (i = 0; i < num_elems; i++)
855  {
856  if (!elem_nulls[i])
857  elem_values[i] = TQRemap(reader,
858  remapinfo->element_remap,
859  elem_values[i],
860  &element_changed);
861  }
862 
863  if (element_changed)
864  {
865  /* Reconstruct and return the array. */
866  *changed = true;
867  arr = construct_md_array(elem_values, elem_nulls,
868  ARR_NDIM(arr), ARR_DIMS(arr), ARR_LBOUND(arr),
869  typid, remapinfo->typlen,
870  remapinfo->typbyval, remapinfo->typalign);
871  return PointerGetDatum(arr);
872  }
873 
874  /* Else just return the value as-is. */
875  return value;
876 }
#define PointerGetDatum(X)
Definition: postgres.h:562
unsigned int Oid
Definition: postgres_ext.h:31
char typalign
Definition: tqueue.c:97
#define ARR_LBOUND(a)
Definition: array.h:277
#define ARR_DIMS(a)
Definition: array.h:275
static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo, Datum value, bool *changed)
Definition: tqueue.c:809
bool typbyval
Definition: tqueue.c:96
uintptr_t Datum
Definition: postgres.h:372
#define ARR_NDIM(a)
Definition: array.h:271
TupleRemapInfo * element_remap
Definition: tqueue.c:98
void deconstruct_array(ArrayType *array, Oid elmtype, int elmlen, bool elmbyval, char elmalign, Datum **elemsp, bool **nullsp, int *nelemsp)
Definition: arrayfuncs.c:3475
int i
static struct @121 value
ArrayType * construct_md_array(Datum *elems, bool *nulls, int ndims, int *dims, int *lbs, Oid elmtype, int elmlen, bool elmbyval, char elmalign)
Definition: arrayfuncs.c:3340
#define ARR_ELEMTYPE(a)
Definition: array.h:273
int16 typlen
Definition: tqueue.c:95
#define DatumGetArrayTypeP(X)
Definition: array.h:242
static Datum TQRemapRange ( TupleQueueReader reader,
RangeRemapInfo remapinfo,
Datum  value,
bool changed 
)
static

Definition at line 883 of file tqueue.c.

References RangeRemapInfo::bound_remap, DatumGetRangeType, RangeBound::infinite, lower(), range(), range_deserialize(), range_serialize(), RangeTypeGetDatum, TQRemap(), RangeRemapInfo::typcache, upper(), RangeBound::val, and value.

Referenced by TQRemap().

885 {
887  bool bound_changed = false;
890  bool empty;
891 
892  /* Extract the lower and upper bounds. */
893  range_deserialize(remapinfo->typcache, range, &lower, &upper, &empty);
894 
895  /* Nothing to do for an empty range. */
896  if (empty)
897  return value;
898 
899  /* Remap each bound, if present. */
900  if (!upper.infinite)
901  upper.val = TQRemap(reader, remapinfo->bound_remap,
902  upper.val, &bound_changed);
903  if (!lower.infinite)
904  lower.val = TQRemap(reader, remapinfo->bound_remap,
905  lower.val, &bound_changed);
906 
907  if (bound_changed)
908  {
909  /* Reserialize. */
910  *changed = true;
911  range = range_serialize(remapinfo->typcache, &lower, &upper, empty);
912  return RangeTypeGetDatum(range);
913  }
914 
915  /* Else just return the value as-is. */
916  return value;
917 }
#define RangeTypeGetDatum(X)
Definition: rangetypes.h:73
TypeCacheEntry * typcache
Definition: tqueue.c:103
Datum lower(PG_FUNCTION_ARGS)
Definition: oracle_compat.c:43
#define DatumGetRangeType(X)
Definition: rangetypes.h:71
Datum val
Definition: rangetypes.h:62
Datum upper(PG_FUNCTION_ARGS)
Definition: oracle_compat.c:74
static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo, Datum value, bool *changed)
Definition: tqueue.c:809
TupleRemapInfo * bound_remap
Definition: tqueue.c:104
static struct cvec * range(struct vars *v, chr a, chr b, int cases)
Definition: regc_locale.c:416
RangeType * range_serialize(TypeCacheEntry *typcache, RangeBound *lower, RangeBound *upper, bool empty)
Definition: rangetypes.c:1503
void range_deserialize(TypeCacheEntry *typcache, RangeType *range, RangeBound *lower, RangeBound *upper, bool *empty)
Definition: rangetypes.c:1632
bool infinite
Definition: rangetypes.h:63
static struct @121 value
static Datum TQRemapRecord ( TupleQueueReader reader,
RecordRemapInfo remapinfo,
Datum  value,
bool changed 
)
static

Definition at line 924 of file tqueue.c.

References Assert, BuildFieldRemapInfo(), CreateTupleDescCopy(), DatumGetHeapTupleHeader, DecrTupleDescRefCount(), elog, ERROR, RecordRemapInfo::field_remap, FreeTupleDesc(), HASH_FIND, hash_search(), HeapTupleHeaderGetDatum(), HeapTupleHeaderGetDatumLength, HeapTupleHeaderGetTypeId, HeapTupleHeaderGetTypMod, HeapTupleHeaderSetDatumLength, HeapTupleHeaderSetTypeId, HeapTupleHeaderSetTypMod, InvalidOid, ItemPointerSetInvalid, RecordRemapInfo::localtypmod, RecordTypmodMap::localtypmod, lookup_rowtype_tupdesc(), MemoryContextSwitchTo(), TupleQueueReader::mycontext, NULL, pfree(), RECORDOID, RecordRemapInfo::rectypid, RecordRemapInfo::rectypmod, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, HeapTupleData::t_tableOid, TQRemapTuple(), RecordRemapInfo::tupledesc, TupleQueueReader::typmodmap, and value.

Referenced by TQRemap().

926 {
927  HeapTupleHeader tup;
928  Oid typid;
929  int32 typmod;
930  bool changed_typmod;
931  TupleDesc tupledesc;
932 
933  /* Extract type OID and typmod from tuple. */
935  typid = HeapTupleHeaderGetTypeId(tup);
936  typmod = HeapTupleHeaderGetTypMod(tup);
937 
938  /*
939  * If first time through, or if this isn't the same composite type as last
940  * time, identify the required typmod mapping, and then look up the
941  * necessary information for processing the fields.
942  */
943  if (typid != remapinfo->rectypid || typmod != remapinfo->rectypmod)
944  {
945  /* Free any old data. */
946  if (remapinfo->tupledesc != NULL)
947  FreeTupleDesc(remapinfo->tupledesc);
948  /* Is it worth trying to free substructure of the remap tree? */
949  if (remapinfo->field_remap != NULL)
950  pfree(remapinfo->field_remap);
951 
952  /* If transient record type, look up matching local typmod. */
953  if (typid == RECORDOID)
954  {
955  RecordTypmodMap *mapent;
956 
957  Assert(reader->typmodmap != NULL);
958  mapent = hash_search(reader->typmodmap, &typmod,
959  HASH_FIND, NULL);
960  if (mapent == NULL)
961  elog(ERROR, "tqueue received unrecognized remote typmod %d",
962  typmod);
963  remapinfo->localtypmod = mapent->localtypmod;
964  }
965  else
966  remapinfo->localtypmod = -1;
967 
968  /* Look up tuple descriptor in typcache. */
969  tupledesc = lookup_rowtype_tupdesc(typid, remapinfo->localtypmod);
970 
971  /* Figure out whether fields need recursive processing. */
972  remapinfo->field_remap = BuildFieldRemapInfo(tupledesc,
973  reader->mycontext);
974  if (remapinfo->field_remap != NULL)
975  {
976  /*
977  * We need to inspect the record contents, so save a copy of the
978  * tupdesc. (We could possibly just reference the typcache's
979  * copy, but then it's problematic when to release the refcount.)
980  */
981  MemoryContext oldcontext = MemoryContextSwitchTo(reader->mycontext);
982 
983  remapinfo->tupledesc = CreateTupleDescCopy(tupledesc);
984  MemoryContextSwitchTo(oldcontext);
985  }
986  else
987  {
988  /* No fields of the record require remapping. */
989  remapinfo->tupledesc = NULL;
990  }
991  remapinfo->rectypid = typid;
992  remapinfo->rectypmod = typmod;
993 
994  /* Release reference count acquired by lookup_rowtype_tupdesc. */
995  DecrTupleDescRefCount(tupledesc);
996  }
997 
998  /* If transient record, replace remote typmod with local typmod. */
999  if (typid == RECORDOID && typmod != remapinfo->localtypmod)
1000  {
1001  typmod = remapinfo->localtypmod;
1002  changed_typmod = true;
1003  }
1004  else
1005  changed_typmod = false;
1006 
1007  /*
1008  * If we need to change the typmod, or if there are any potentially
1009  * remappable fields, replace the tuple.
1010  */
1011  if (changed_typmod || remapinfo->field_remap != NULL)
1012  {
1013  HeapTupleData htup;
1014  HeapTuple atup;
1015 
1016  /* For now, assume we always need to change the tuple in this case. */
1017  *changed = true;
1018 
1019  /* Copy tuple, possibly remapping contained fields. */
1021  htup.t_tableOid = InvalidOid;
1023  htup.t_data = tup;
1024  atup = TQRemapTuple(reader,
1025  remapinfo->tupledesc,
1026  remapinfo->field_remap,
1027  &htup);
1028 
1029  /* Apply the correct labeling for a local Datum. */
1030  HeapTupleHeaderSetTypeId(atup->t_data, typid);
1031  HeapTupleHeaderSetTypMod(atup->t_data, typmod);
1033 
1034  /* And return the results. */
1035  return HeapTupleHeaderGetDatum(atup->t_data);
1036  }
1037 
1038  /* Else just return the value as-is. */
1039  return value;
1040 }
TupleDesc CreateTupleDescCopy(TupleDesc tupdesc)
Definition: tupdesc.c:141
#define HeapTupleHeaderSetTypeId(tup, typeid)
Definition: htup_details.h:450
TupleDesc lookup_rowtype_tupdesc(Oid type_id, int32 typmod)
Definition: typcache.c:1257
int32 localtypmod
Definition: tqueue.c:156
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
unsigned int Oid
Definition: postgres_ext.h:31
#define DatumGetHeapTupleHeader(X)
Definition: fmgr.h:259
int32 rectypmod
Definition: tqueue.c:111
signed int int32
Definition: c.h:256
#define HeapTupleHeaderSetDatumLength(tup, len)
Definition: htup_details.h:442
HeapTupleHeader t_data
Definition: htup.h:67
#define HeapTupleHeaderGetTypMod(tup)
Definition: htup_details.h:455
void pfree(void *pointer)
Definition: mcxt.c:950
#define ERROR
Definition: elog.h:43
ItemPointerData t_self
Definition: htup.h:65
HTAB * typmodmap
Definition: tqueue.c:172
uint32 t_len
Definition: htup.h:64
static TupleRemapInfo ** BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
Definition: tqueue.c:1244
Oid t_tableOid
Definition: htup.h:66
#define RECORDOID
Definition: pg_type.h:680
#define HeapTupleHeaderSetTypMod(tup, typmod)
Definition: htup_details.h:460
#define HeapTupleHeaderGetTypeId(tup)
Definition: htup_details.h:445
MemoryContext mycontext
Definition: tqueue.c:171
#define InvalidOid
Definition: postgres_ext.h:36
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
void DecrTupleDescRefCount(TupleDesc tupdesc)
Definition: tupdesc.c:336
static HeapTuple TQRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc, TupleRemapInfo **field_remapinfo, HeapTuple tuple)
Definition: tqueue.c:763
void FreeTupleDesc(TupleDesc tupdesc)
Definition: tupdesc.c:268
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:149
Datum HeapTupleHeaderGetDatum(HeapTupleHeader tuple)
Definition: execTuples.c:1201
TupleRemapInfo ** field_remap
Definition: tqueue.c:116
#define elog
Definition: elog.h:219
static struct @121 value
TupleDesc tupledesc
Definition: tqueue.c:115
int32 localtypmod
Definition: tqueue.c:113
#define HeapTupleHeaderGetDatumLength(tup)
Definition: htup_details.h:439
static HeapTuple TQRemapTuple ( TupleQueueReader reader,
TupleDesc  tupledesc,
TupleRemapInfo **  field_remapinfo,
HeapTuple  tuple 
)
static

Definition at line 763 of file tqueue.c.

References heap_copytuple(), heap_deform_tuple(), heap_form_tuple(), i, tupleDesc::natts, NULL, palloc(), TQRemap(), and values.

Referenced by TQRemapRecord(), and TupleQueueHandleDataMessage().

767 {
768  Datum *values;
769  bool *isnull;
770  bool changed = false;
771  int i;
772 
773  /*
774  * If no remapping is necessary, just copy the tuple into a single
775  * palloc'd chunk, as caller will expect.
776  */
777  if (field_remapinfo == NULL)
778  return heap_copytuple(tuple);
779 
780  /* Deform tuple so we can remap record typmods for individual attrs. */
781  values = (Datum *) palloc(tupledesc->natts * sizeof(Datum));
782  isnull = (bool *) palloc(tupledesc->natts * sizeof(bool));
783  heap_deform_tuple(tuple, tupledesc, values, isnull);
784 
785  /* Recursively process each interesting non-NULL attribute. */
786  for (i = 0; i < tupledesc->natts; i++)
787  {
788  if (isnull[i] || field_remapinfo[i] == NULL)
789  continue;
790  values[i] = TQRemap(reader, field_remapinfo[i], values[i], &changed);
791  }
792 
793  /* Reconstruct the modified tuple, if anything was modified. */
794  if (changed)
795  return heap_form_tuple(tupledesc, values, isnull);
796  else
797  return heap_copytuple(tuple);
798 }
HeapTuple heap_copytuple(HeapTuple tuple)
Definition: heaptuple.c:608
HeapTuple heap_form_tuple(TupleDesc tupleDescriptor, Datum *values, bool *isnull)
Definition: heaptuple.c:692
int natts
Definition: tupdesc.h:73
static Datum TQRemap(TupleQueueReader *reader, TupleRemapInfo *remapinfo, Datum value, bool *changed)
Definition: tqueue.c:809
uintptr_t Datum
Definition: postgres.h:372
#define NULL
Definition: c.h:229
void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull)
Definition: heaptuple.c:933
static Datum values[MAXATTR]
Definition: bootstrap.c:163
void * palloc(Size size)
Definition: mcxt.c:849
int i
static void TQSendRecordInfo ( TQueueDestReceiver tqueue,
int32  typmod,
TupleDesc  tupledesc 
)
static

Definition at line 512 of file tqueue.c.

References appendBinaryStringInfo(), tupleDesc::attrs, buf, StringInfoData::data, DEBUG3, elog, HASHCTL::entrysize, FormData_pg_attribute, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, i, initStringInfo(), HASHCTL::keysize, StringInfoData::len, MemSet, TQueueDestReceiver::mode, TQueueDestReceiver::mycontext, tupleDesc::natts, NULL, TQueueDestReceiver::queue, TQueueDestReceiver::recordhtab, shm_mq_send(), tupleDesc::tdhasoid, and TUPLE_QUEUE_MODE_CONTROL.

Referenced by TQExamineRecord().

513 {
515  bool found;
516  int i;
517 
518  /* Initialize hash table if not done yet. */
519  if (tqueue->recordhtab == NULL)
520  {
521  HASHCTL ctl;
522 
523  MemSet(&ctl, 0, sizeof(ctl));
524  /* Hash table entries are just typmods */
525  ctl.keysize = sizeof(int32);
526  ctl.entrysize = sizeof(int32);
527  ctl.hcxt = tqueue->mycontext;
528  tqueue->recordhtab = hash_create("tqueue sender record type hashtable",
529  100, &ctl,
531  }
532 
533  /* Have we already seen this record type? If not, must report it. */
534  hash_search(tqueue->recordhtab, &typmod, HASH_ENTER, &found);
535  if (found)
536  return;
537 
538  elog(DEBUG3, "sending tqueue control message for record typmod %d", typmod);
539 
540  /* If message queue is in data mode, switch to control mode. */
541  if (tqueue->mode != TUPLE_QUEUE_MODE_CONTROL)
542  {
543  tqueue->mode = TUPLE_QUEUE_MODE_CONTROL;
544  shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
545  }
546 
547  /* Assemble a control message. */
548  initStringInfo(&buf);
549  appendBinaryStringInfo(&buf, (char *) &typmod, sizeof(int32));
550  appendBinaryStringInfo(&buf, (char *) &tupledesc->natts, sizeof(int));
551  appendBinaryStringInfo(&buf, (char *) &tupledesc->tdhasoid, sizeof(bool));
552  for (i = 0; i < tupledesc->natts; i++)
553  {
554  appendBinaryStringInfo(&buf, (char *) tupledesc->attrs[i],
555  sizeof(FormData_pg_attribute));
556  }
557 
558  /* Send control message. */
559  shm_mq_send(tqueue->queue, buf.len, buf.data, false);
560 
561  /* We assume it's OK to leak buf because we're in a short-lived context. */
562 }
bool tdhasoid
Definition: tupdesc.h:79
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define DEBUG3
Definition: elog.h:23
Form_pg_attribute * attrs
Definition: tupdesc.h:74
Size entrysize
Definition: hsearch.h:73
#define MemSet(start, val, len)
Definition: c.h:857
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
int natts
Definition: tupdesc.h:73
signed int int32
Definition: c.h:256
HTAB * recordhtab
Definition: tqueue.c:144
MemoryContext mycontext
Definition: tqueue.c:142
static char * buf
Definition: pg_test_fsync.c:66
void initStringInfo(StringInfo str)
Definition: stringinfo.c:46
#define HASH_BLOBS
Definition: hsearch.h:88
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
Size keysize
Definition: hsearch.h:72
FormData_pg_attribute
Definition: pg_attribute.h:171
shm_mq_handle * queue
Definition: tqueue.c:141
#define NULL
Definition: c.h:229
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
Definition: shm_mq.c:321
#define TUPLE_QUEUE_MODE_CONTROL
Definition: tqueue.c:60
int i
#define elog
Definition: elog.h:219
void appendBinaryStringInfo(StringInfo str, const char *data, int datalen)
Definition: stringinfo.c:208
static void tqueueDestroyReceiver ( DestReceiver self)
static

Definition at line 588 of file tqueue.c.

References TQueueDestReceiver::field_remapinfo, hash_destroy(), MemoryContextDelete(), NULL, pfree(), TQueueDestReceiver::recordhtab, and TQueueDestReceiver::tmpcontext.

Referenced by CreateTupleQueueDestReceiver().

589 {
590  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
591 
592  if (tqueue->tmpcontext != NULL)
594  if (tqueue->recordhtab != NULL)
595  hash_destroy(tqueue->recordhtab);
596  /* Is it worth trying to free substructure of the remap tree? */
597  if (tqueue->field_remapinfo != NULL)
598  pfree(tqueue->field_remapinfo);
599  pfree(self);
600 }
void hash_destroy(HTAB *hashp)
Definition: dynahash.c:793
void MemoryContextDelete(MemoryContext context)
Definition: mcxt.c:200
MemoryContext tmpcontext
Definition: tqueue.c:143
HTAB * recordhtab
Definition: tqueue.c:144
void pfree(void *pointer)
Definition: mcxt.c:950
#define NULL
Definition: c.h:229
TupleRemapInfo ** field_remapinfo
Definition: tqueue.c:147
static bool tqueueReceiveSlot ( TupleTableSlot slot,
DestReceiver self 
)
static

Definition at line 224 of file tqueue.c.

References ALLOCSET_DEFAULT_SIZES, AllocSetContextCreate(), BuildFieldRemapInfo(), ereport, errcode(), errmsg(), ERROR, ExecMaterializeSlot(), TQueueDestReceiver::field_remapinfo, i, MemoryContextReset(), MemoryContextSwitchTo(), TQueueDestReceiver::mode, TQueueDestReceiver::mycontext, tupleDesc::natts, NULL, pfree(), TQueueDestReceiver::queue, result, SHM_MQ_DETACHED, shm_mq_send(), SHM_MQ_SUCCESS, slot_getallattrs(), HeapTupleData::t_data, HeapTupleData::t_len, TQueueDestReceiver::tmpcontext, TQExamine(), TupleTableSlot::tts_isnull, TupleTableSlot::tts_tupleDescriptor, TupleTableSlot::tts_values, TUPLE_QUEUE_MODE_DATA, and TQueueDestReceiver::tupledesc.

Referenced by CreateTupleQueueDestReceiver().

225 {
226  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
227  TupleDesc tupledesc = slot->tts_tupleDescriptor;
228  HeapTuple tuple;
230 
231  /*
232  * If first time through, compute remapping info for the top-level fields.
233  * On later calls, if the tupledesc has changed, set up for the new
234  * tupledesc. (This is a strange test both because the executor really
235  * shouldn't change the tupledesc, and also because it would be unsafe if
236  * the old tupledesc could be freed and a new one allocated at the same
237  * address. But since some very old code in printtup.c uses a similar
238  * approach, we adopt it here as well.)
239  *
240  * Here and elsewhere in this module, when replacing remapping info we
241  * pfree the top-level object because that's easy, but we don't bother to
242  * recursively free any substructure. This would lead to query-lifespan
243  * memory leaks if the mapping info actually changed frequently, but since
244  * we don't expect that to happen, it doesn't seem worth expending code to
245  * prevent it.
246  */
247  if (tqueue->tupledesc != tupledesc)
248  {
249  /* Is it worth trying to free substructure of the remap tree? */
250  if (tqueue->field_remapinfo != NULL)
251  pfree(tqueue->field_remapinfo);
252  tqueue->field_remapinfo = BuildFieldRemapInfo(tupledesc,
253  tqueue->mycontext);
254  tqueue->tupledesc = tupledesc;
255  }
256 
257  /*
258  * When, because of the types being transmitted, no record typmod mapping
259  * can be needed, we can skip a good deal of work.
260  */
261  if (tqueue->field_remapinfo != NULL)
262  {
263  TupleRemapInfo **remapinfo = tqueue->field_remapinfo;
264  int i;
265  MemoryContext oldcontext = NULL;
266 
267  /* Deform the tuple so we can examine fields, if not done already. */
268  slot_getallattrs(slot);
269 
270  /* Iterate over each attribute and search it for transient typmods. */
271  for (i = 0; i < tupledesc->natts; i++)
272  {
273  /* Ignore nulls and types that don't need special handling. */
274  if (slot->tts_isnull[i] || remapinfo[i] == NULL)
275  continue;
276 
277  /* Switch to temporary memory context to avoid leaking. */
278  if (oldcontext == NULL)
279  {
280  if (tqueue->tmpcontext == NULL)
281  tqueue->tmpcontext =
283  "tqueue sender temp context",
285  oldcontext = MemoryContextSwitchTo(tqueue->tmpcontext);
286  }
287 
288  /* Examine the value. */
289  TQExamine(tqueue, remapinfo[i], slot->tts_values[i]);
290  }
291 
292  /* If we used the temp context, reset it and restore prior context. */
293  if (oldcontext != NULL)
294  {
295  MemoryContextSwitchTo(oldcontext);
297  }
298 
299  /* If we entered control mode, switch back to data mode. */
300  if (tqueue->mode != TUPLE_QUEUE_MODE_DATA)
301  {
302  tqueue->mode = TUPLE_QUEUE_MODE_DATA;
303  shm_mq_send(tqueue->queue, sizeof(char), &tqueue->mode, false);
304  }
305  }
306 
307  /* Send the tuple itself. */
308  tuple = ExecMaterializeSlot(slot);
309  result = shm_mq_send(tqueue->queue, tuple->t_len, tuple->t_data, false);
310 
311  /* Check for failure. */
312  if (result == SHM_MQ_DETACHED)
313  return false;
314  else if (result != SHM_MQ_SUCCESS)
315  ereport(ERROR,
316  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
317  errmsg("could not send tuple to shared-memory queue")));
318 
319  return true;
320 }
MemoryContext tmpcontext
Definition: tqueue.c:143
TupleDesc tupledesc
Definition: tqueue.c:146
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:109
int errcode(int sqlerrcode)
Definition: elog.c:575
Datum * tts_values
Definition: tuptable.h:125
return result
Definition: formatting.c:1632
void MemoryContextReset(MemoryContext context)
Definition: mcxt.c:135
int natts
Definition: tupdesc.h:73
HeapTupleHeader t_data
Definition: htup.h:67
void pfree(void *pointer)
Definition: mcxt.c:950
#define ERROR
Definition: elog.h:43
MemoryContext mycontext
Definition: tqueue.c:142
#define ALLOCSET_DEFAULT_SIZES
Definition: memutils.h:165
uint32 t_len
Definition: htup.h:64
bool * tts_isnull
Definition: tuptable.h:126
static TupleRemapInfo ** BuildFieldRemapInfo(TupleDesc tupledesc, MemoryContext mycontext)
Definition: tqueue.c:1244
#define ereport(elevel, rest)
Definition: elog.h:122
void slot_getallattrs(TupleTableSlot *slot)
Definition: heaptuple.c:1237
static void TQExamine(TQueueDestReceiver *tqueue, TupleRemapInfo *remapinfo, Datum value)
Definition: tqueue.c:331
TupleDesc tts_tupleDescriptor
Definition: tuptable.h:121
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize)
Definition: aset.c:322
shm_mq_result
Definition: shm_mq.h:36
shm_mq_handle * queue
Definition: tqueue.c:141
#define NULL
Definition: c.h:229
shm_mq_result shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
Definition: shm_mq.c:321
HeapTuple ExecMaterializeSlot(TupleTableSlot *slot)
Definition: execTuples.c:725
TupleRemapInfo ** field_remapinfo
Definition: tqueue.c:147
int errmsg(const char *fmt,...)
Definition: elog.c:797
int i
#define TUPLE_QUEUE_MODE_DATA
Definition: tqueue.c:61
static void tqueueShutdownReceiver ( DestReceiver self)
static

Definition at line 577 of file tqueue.c.

References TQueueDestReceiver::queue, shm_mq_detach(), and shm_mq_get_queue().

Referenced by CreateTupleQueueDestReceiver().

578 {
579  TQueueDestReceiver *tqueue = (TQueueDestReceiver *) self;
580 
582 }
void shm_mq_detach(shm_mq *mq)
Definition: shm_mq.c:778
shm_mq_handle * queue
Definition: tqueue.c:141
shm_mq * shm_mq_get_queue(shm_mq_handle *mqh)
Definition: shm_mq.c:802
static void tqueueStartupReceiver ( DestReceiver self,
int  operation,
TupleDesc  typeinfo 
)
static

Definition at line 568 of file tqueue.c.

Referenced by CreateTupleQueueDestReceiver().

569 {
570  /* do nothing */
571 }
static void TupleQueueHandleControlMessage ( TupleQueueReader reader,
Size  nbytes,
char *  data 
)
static

Definition at line 1050 of file tqueue.c.

References Assert, BlessTupleDesc(), CreateTupleDesc(), DEBUG3, elog, HASHCTL::entrysize, ERROR, FormData_pg_attribute, HASH_BLOBS, HASH_CONTEXT, hash_create(), HASH_ELEM, HASH_ENTER, hash_search(), HASHCTL::hcxt, i, HASHCTL::keysize, RecordTypmodMap::localtypmod, MemSet, TupleQueueReader::mycontext, NULL, palloc(), tupleDesc::tdtypmod, and TupleQueueReader::typmodmap.

Referenced by TupleQueueReaderNext().

1052 {
1053  int32 remotetypmod;
1054  int natts;
1055  bool hasoid;
1056  Size offset = 0;
1057  Form_pg_attribute *attrs;
1058  TupleDesc tupledesc;
1059  RecordTypmodMap *mapent;
1060  bool found;
1061  int i;
1062 
1063  /* Extract remote typmod. */
1064  memcpy(&remotetypmod, &data[offset], sizeof(int32));
1065  offset += sizeof(int32);
1066 
1067  /* Extract attribute count. */
1068  memcpy(&natts, &data[offset], sizeof(int));
1069  offset += sizeof(int);
1070 
1071  /* Extract hasoid flag. */
1072  memcpy(&hasoid, &data[offset], sizeof(bool));
1073  offset += sizeof(bool);
1074 
1075  /* Extract attribute details. The tupledesc made here is just transient. */
1076  attrs = palloc(natts * sizeof(Form_pg_attribute));
1077  for (i = 0; i < natts; i++)
1078  {
1079  attrs[i] = palloc(sizeof(FormData_pg_attribute));
1080  memcpy(attrs[i], &data[offset], sizeof(FormData_pg_attribute));
1081  offset += sizeof(FormData_pg_attribute);
1082  }
1083 
1084  /* We should have read the whole message. */
1085  Assert(offset == nbytes);
1086 
1087  /* Construct TupleDesc, and assign a local typmod. */
1088  tupledesc = CreateTupleDesc(natts, hasoid, attrs);
1089  tupledesc = BlessTupleDesc(tupledesc);
1090 
1091  /* Create mapping hashtable if it doesn't exist already. */
1092  if (reader->typmodmap == NULL)
1093  {
1094  HASHCTL ctl;
1095 
1096  MemSet(&ctl, 0, sizeof(ctl));
1097  ctl.keysize = sizeof(int32);
1098  ctl.entrysize = sizeof(RecordTypmodMap);
1099  ctl.hcxt = reader->mycontext;
1100  reader->typmodmap = hash_create("tqueue receiver record type hashtable",
1101  100, &ctl,
1103  }
1104 
1105  /* Create map entry. */
1106  mapent = hash_search(reader->typmodmap, &remotetypmod, HASH_ENTER,
1107  &found);
1108  if (found)
1109  elog(ERROR, "duplicate tqueue control message for typmod %d",
1110  remotetypmod);
1111  mapent->localtypmod = tupledesc->tdtypmod;
1112 
1113  elog(DEBUG3, "tqueue mapping remote typmod %d to local typmod %d",
1114  remotetypmod, mapent->localtypmod);
1115 }
#define HASH_CONTEXT
Definition: hsearch.h:93
#define HASH_ELEM
Definition: hsearch.h:87
MemoryContext hcxt
Definition: hsearch.h:78
#define DEBUG3
Definition: elog.h:23
int32 localtypmod
Definition: tqueue.c:156
Size entrysize
Definition: hsearch.h:73
#define MemSet(start, val, len)
Definition: c.h:857
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
Definition: dynahash.c:885
TupleDesc CreateTupleDesc(int natts, bool hasoid, Form_pg_attribute *attrs)
Definition: tupdesc.c:112
int32 tdtypmod
Definition: tupdesc.h:78
char bool
Definition: c.h:202
signed int int32
Definition: c.h:256
#define ERROR
Definition: elog.h:43
HTAB * typmodmap
Definition: tqueue.c:172
TupleDesc BlessTupleDesc(TupleDesc tupdesc)
Definition: execTuples.c:1031
struct RecordTypmodMap RecordTypmodMap
FormData_pg_attribute * Form_pg_attribute
Definition: pg_attribute.h:187
#define HASH_BLOBS
Definition: hsearch.h:88
HTAB * hash_create(const char *tabname, long nelem, HASHCTL *info, int flags)
Definition: dynahash.c:301
Size keysize
Definition: hsearch.h:72
MemoryContext mycontext
Definition: tqueue.c:171
FormData_pg_attribute
Definition: pg_attribute.h:171
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
void * palloc(Size size)
Definition: mcxt.c:849
int i
#define elog
Definition: elog.h:219
static HeapTuple TupleQueueHandleDataMessage ( TupleQueueReader reader,
Size  nbytes,
HeapTupleHeader  data 
)
static

Definition at line 734 of file tqueue.c.

References TupleQueueReader::field_remapinfo, InvalidOid, ItemPointerSetInvalid, HeapTupleData::t_data, HeapTupleData::t_len, HeapTupleData::t_self, HeapTupleData::t_tableOid, TQRemapTuple(), and TupleQueueReader::tupledesc.

Referenced by TupleQueueReaderNext().

737 {
738  HeapTupleData htup;
739 
740  /*
741  * Set up a dummy HeapTupleData pointing to the data from the shm_mq
742  * (which had better be sufficiently aligned).
743  */
745  htup.t_tableOid = InvalidOid;
746  htup.t_len = nbytes;
747  htup.t_data = data;
748 
749  /*
750  * Either just copy the data into a regular palloc'd tuple, or remap it,
751  * as required.
752  */
753  return TQRemapTuple(reader,
754  reader->tupledesc,
755  reader->field_remapinfo,
756  &htup);
757 }
TupleDesc tupledesc
Definition: tqueue.c:174
HeapTupleHeader t_data
Definition: htup.h:67
ItemPointerData t_self
Definition: htup.h:65
uint32 t_len
Definition: htup.h:64
Oid t_tableOid
Definition: htup.h:66
TupleRemapInfo ** field_remapinfo
Definition: tqueue.c:175
#define InvalidOid
Definition: postgres_ext.h:36
static HeapTuple TQRemapTuple(TupleQueueReader *reader, TupleDesc tupledesc, TupleRemapInfo **field_remapinfo, HeapTuple tuple)
Definition: tqueue.c:763
#define ItemPointerSetInvalid(pointer)
Definition: itemptr.h:149
HeapTuple TupleQueueReaderNext ( TupleQueueReader reader,
bool  nowait,
bool done 
)

Definition at line 679 of file tqueue.c.

References Assert, elog, ERROR, TupleQueueReader::mode, NULL, TupleQueueReader::queue, result, SHM_MQ_DETACHED, shm_mq_receive(), SHM_MQ_SUCCESS, SHM_MQ_WOULD_BLOCK, TUPLE_QUEUE_MODE_CONTROL, TUPLE_QUEUE_MODE_DATA, TupleQueueHandleControlMessage(), and TupleQueueHandleDataMessage().

Referenced by gather_readnext(), and gm_readnext_tuple().

680 {
682 
683  if (done != NULL)
684  *done = false;
685 
686  for (;;)
687  {
688  Size nbytes;
689  void *data;
690 
691  /* Attempt to read a message. */
692  result = shm_mq_receive(reader->queue, &nbytes, &data, nowait);
693 
694  /* If queue is detached, set *done and return NULL. */
695  if (result == SHM_MQ_DETACHED)
696  {
697  if (done != NULL)
698  *done = true;
699  return NULL;
700  }
701 
702  /* In non-blocking mode, bail out if no message ready yet. */
703  if (result == SHM_MQ_WOULD_BLOCK)
704  return NULL;
705  Assert(result == SHM_MQ_SUCCESS);
706 
707  /*
708  * We got a message (see message spec at top of file). Process it.
709  */
710  if (nbytes == 1)
711  {
712  /* Mode switch message. */
713  reader->mode = ((char *) data)[0];
714  }
715  else if (reader->mode == TUPLE_QUEUE_MODE_DATA)
716  {
717  /* Tuple data. */
718  return TupleQueueHandleDataMessage(reader, nbytes, data);
719  }
720  else if (reader->mode == TUPLE_QUEUE_MODE_CONTROL)
721  {
722  /* Control message, describing a transient record type. */
723  TupleQueueHandleControlMessage(reader, nbytes, data);
724  }
725  else
726  elog(ERROR, "unrecognized tqueue mode: %d", (int) reader->mode);
727  }
728 }
static HeapTuple TupleQueueHandleDataMessage(TupleQueueReader *reader, Size nbytes, HeapTupleHeader data)
Definition: tqueue.c:734
shm_mq_handle * queue
Definition: tqueue.c:170
return result
Definition: formatting.c:1632
#define ERROR
Definition: elog.h:43
shm_mq_result
Definition: shm_mq.h:36
#define NULL
Definition: c.h:229
#define Assert(condition)
Definition: c.h:675
size_t Size
Definition: c.h:356
#define TUPLE_QUEUE_MODE_CONTROL
Definition: tqueue.c:60
static void TupleQueueHandleControlMessage(TupleQueueReader *reader, Size nbytes, char *data)
Definition: tqueue.c:1050
#define TUPLE_QUEUE_MODE_DATA
Definition: tqueue.c:61
#define elog
Definition: elog.h:219
shm_mq_result shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
Definition: shm_mq.c:518