PostgreSQL Source Code git master
Loading...
Searching...
No Matches
crosstabview.c
Go to the documentation of this file.
1/*
2 * psql - the PostgreSQL interactive terminal
3 *
4 * Copyright (c) 2000-2026, PostgreSQL Global Development Group
5 *
6 * src/bin/psql/crosstabview.c
7 */
8#include "postgres_fe.h"
9
10#include "common.h"
11#include "common/int.h"
12#include "common/logging.h"
13#include "crosstabview.h"
14#include "pqexpbuffer.h"
15#include "psqlscanslash.h"
16#include "settings.h"
17
18/*
19 * Value/position from the resultset that goes into the horizontal or vertical
20 * crosstabview header.
21 */
22typedef struct _pivot_field
23{
24 /*
25 * Pointer obtained from PQgetvalue() for colV or colH. Each distinct
26 * value becomes an entry in the vertical header (colV), or horizontal
27 * header (colH). A Null value is represented by a NULL pointer.
28 */
29 char *name;
30
31 /*
32 * When a sort is requested on an alternative column, this holds
33 * PQgetvalue() for the sort column corresponding to <name>. If <name>
34 * appear multiple times, it's the first value in the order of the results
35 * that is kept. A Null value is represented by a NULL pointer.
36 */
38
39 /*
40 * Rank of this value, starting at 0. Initially, it's the relative
41 * position of the first appearance of <name> in the resultset. For
42 * example, if successive rows contain B,A,C,A,D then it's B:0,A:1,C:2,D:3
43 * When a sort column is specified, ranks get updated in a final pass to
44 * reflect the desired order.
45 */
46 int rank;
48
49/* Node in avl_tree */
50typedef struct _avl_node
51{
52 /* Node contents */
54
55 /*
56 * Height of this node in the tree (number of nodes on the longest path to
57 * a leaf).
58 */
59 int height;
60
61 /*
62 * Child nodes. [0] points to left subtree, [1] to right subtree. Never
63 * NULL, points to the empty node avl_tree.end when no left or right
64 * value.
65 */
66 struct _avl_node *children[2];
68
69/*
70 * Control structure for the AVL tree (binary search tree kept
71 * balanced with the AVL algorithm)
72 */
73typedef struct _avl_tree
74{
75 int count; /* Total number of nodes */
76 avl_node *root; /* root of the tree */
77 avl_node *end; /* Immutable dereferenceable empty tree */
79
80
81static bool printCrosstab(const PGresult *result,
84 int field_for_data);
85static void avlInit(avl_tree *tree);
86static void avlMergeValue(avl_tree *tree, char *name, char *sort_value);
87static int avlCollectFields(avl_tree *tree, avl_node *node,
88 pivot_field *fields, int idx);
89static void avlFree(avl_tree *tree, avl_node *node);
91static int indexOfColumn(char *arg, const PGresult *res);
92static int pivotFieldCompare(const void *a, const void *b);
93static int rankCompare(const void *a, const void *b);
94
95
96/*
97 * Main entry point to this module.
98 *
99 * Process the data from *res according to the options in pset (global),
100 * to generate the horizontal and vertical headers contents,
101 * then call printCrosstab() for the actual output.
102 */
103bool
105{
106 bool retval = false;
111 int num_columns = 0;
112 int num_rows = 0;
113 int field_for_rows;
115 int field_for_data;
117 int rn;
118
121
123 {
124 pg_log_error("\\crosstabview: statement did not return a result set");
125 goto error_return;
126 }
127
128 if (PQnfields(res) < 3)
129 {
130 pg_log_error("\\crosstabview: query must return at least three columns");
131 goto error_return;
132 }
133
134 /* Process first optional arg (vertical header column) */
135 if (pset.ctv_args[0] == NULL)
136 field_for_rows = 0;
137 else
138 {
140 if (field_for_rows < 0)
141 goto error_return;
142 }
143
144 /* Process second optional arg (horizontal header column) */
145 if (pset.ctv_args[1] == NULL)
147 else
148 {
150 if (field_for_columns < 0)
151 goto error_return;
152 }
153
154 /* Insist that header columns be distinct */
156 {
157 pg_log_error("\\crosstabview: vertical and horizontal headers must be different columns");
158 goto error_return;
159 }
160
161 /* Process third optional arg (data column) */
162 if (pset.ctv_args[2] == NULL)
163 {
164 int i;
165
166 /*
167 * If the data column was not specified, we search for the one not
168 * used as either vertical or horizontal headers. Must be exactly
169 * three columns, or this won't be unique.
170 */
171 if (PQnfields(res) != 3)
172 {
173 pg_log_error("\\crosstabview: data column must be specified when query returns more than three columns");
174 goto error_return;
175 }
176
177 field_for_data = -1;
178 for (i = 0; i < PQnfields(res); i++)
179 {
180 if (i != field_for_rows && i != field_for_columns)
181 {
183 break;
184 }
185 }
187 }
188 else
189 {
191 if (field_for_data < 0)
192 goto error_return;
193 }
194
195 /* Process fourth optional arg (horizontal header sort column) */
196 if (pset.ctv_args[3] == NULL)
197 sort_field_for_columns = -1; /* no sort column */
198 else
199 {
202 goto error_return;
203 }
204
205 /*
206 * First part: accumulate the names that go into the vertical and
207 * horizontal headers, each into an AVL binary tree to build the set of
208 * DISTINCT values.
209 */
210
211 for (rn = 0; rn < PQntuples(res); rn++)
212 {
213 char *val;
214 char *val1;
215
216 /* horizontal */
219 val1 = NULL;
220
221 if (sort_field_for_columns >= 0 &&
224
226
228 {
229 pg_log_error("\\crosstabview: maximum number of columns (%d) exceeded",
231 goto error_return;
232 }
233
234 /* vertical */
237
239 }
240
241 /*
242 * Second part: Generate sorted arrays from the AVL trees.
243 */
244
245 num_columns = piv_columns.count;
246 num_rows = piv_rows.count;
247
249
251
254
255 /*
256 * Third part: optionally, process the ranking data for the horizontal
257 * header
258 */
259 if (sort_field_for_columns >= 0)
261
262 /*
263 * Fourth part: print the crosstab'ed result.
264 */
265 retval = printCrosstab(res,
269
272 avlFree(&piv_rows, piv_rows.root);
275
276 return retval;
277}
278
279/*
280 * Output the pivoted resultset with the printTable* functions. Return true
281 * if successful, false otherwise.
282 */
283static bool
287 int field_for_data)
288{
289 printQueryOpt popt = pset.popt;
291 int i,
292 rn;
293 char col_align;
294 int *horiz_map;
295 bool retval = false;
296
297 printTableInit(&cont, &popt.topt, popt.title, num_columns + 1, num_rows);
298
299 /* Step 1: set target column names (horizontal header) */
300
301 /* The name of the first column is kept unchanged by the pivoting */
303 PQfname(result, field_for_rows),
304 false,
307
308 /*
309 * To iterate over piv_columns[] by piv_columns[].rank, create a reverse
310 * map associating each piv_columns[].rank to its index in piv_columns.
311 * This avoids an O(N^2) loop later.
312 */
314 for (i = 0; i < num_columns; i++)
315 horiz_map[piv_columns[i].rank] = i;
316
317 /*
318 * The display alignment depends on its PQftype().
319 */
321
322 for (i = 0; i < num_columns; i++)
323 {
324 char *colname;
325
326 colname = piv_columns[horiz_map[i]].name ?
327 piv_columns[horiz_map[i]].name :
328 (popt.nullPrint ? popt.nullPrint : "");
329
330 printTableAddHeader(&cont, colname, false, col_align);
331 }
333
334 /* Step 2: set row names in the first output column (vertical header) */
335 for (i = 0; i < num_rows; i++)
336 {
337 int k = piv_rows[i].rank;
338
339 cont.cells[k * (num_columns + 1)] = piv_rows[i].name ?
340 piv_rows[i].name :
341 (popt.nullPrint ? popt.nullPrint : "");
342 }
343 cont.cellsadded = num_rows * (num_columns + 1);
344
345 /*
346 * Step 3: fill in the content cells.
347 */
348 for (rn = 0; rn < PQntuples(result); rn++)
349 {
350 int row_number;
351 int col_number;
353 *cp;
355
356 /* Find target row */
357 if (!PQgetisnull(result, rn, field_for_rows))
358 elt.name = PQgetvalue(result, rn, field_for_rows);
359 else
360 elt.name = NULL;
361 rp = (pivot_field *) bsearch(&elt,
362 piv_rows,
363 num_rows,
364 sizeof(pivot_field),
366 Assert(rp != NULL);
367 row_number = rp->rank;
368
369 /* Find target column */
370 if (!PQgetisnull(result, rn, field_for_columns))
371 elt.name = PQgetvalue(result, rn, field_for_columns);
372 else
373 elt.name = NULL;
374
375 cp = (pivot_field *) bsearch(&elt,
378 sizeof(pivot_field),
380 Assert(cp != NULL);
381 col_number = cp->rank;
382
383 /* Place value into cell */
384 if (col_number >= 0 && row_number >= 0)
385 {
386 int idx;
387
388 /* index into the cont.cells array */
389 idx = 1 + col_number + row_number * (num_columns + 1);
390
391 /*
392 * If the cell already contains a value, raise an error.
393 */
394 if (cont.cells[idx] != NULL)
395 {
396 pg_log_error("\\crosstabview: query result contains multiple data values for row \"%s\", column \"%s\"",
397 rp->name ? rp->name :
398 (popt.nullPrint ? popt.nullPrint : "(null)"),
399 cp->name ? cp->name :
400 (popt.nullPrint ? popt.nullPrint : "(null)"));
401 goto error;
402 }
403
404 cont.cells[idx] = !PQgetisnull(result, rn, field_for_data) ?
405 PQgetvalue(result, rn, field_for_data) :
406 (popt.nullPrint ? popt.nullPrint : "");
407 }
408 }
409
410 /*
411 * The non-initialized cells must be set to an empty string for the print
412 * functions
413 */
414 for (i = 0; i < cont.cellsadded; i++)
415 {
416 if (cont.cells[i] == NULL)
417 cont.cells[i] = "";
418 }
419
421 retval = true;
422
423error:
425
426 return retval;
427}
428
429/*
430 * The avl* functions below provide a minimalistic implementation of AVL binary
431 * trees, to efficiently collect the distinct values that will form the horizontal
432 * and vertical headers. It only supports adding new values, no removal or even
433 * search.
434 */
435static void
437{
439 tree->end->children[0] = tree->end->children[1] = tree->end;
440 tree->count = 0;
441 tree->root = tree->end;
442}
443
444/* Deallocate recursively an AVL tree, starting from node */
445static void
447{
448 if (node->children[0] != tree->end)
449 {
450 avlFree(tree, node->children[0]);
451 pg_free(node->children[0]);
452 }
453 if (node->children[1] != tree->end)
454 {
455 avlFree(tree, node->children[1]);
456 pg_free(node->children[1]);
457 }
458 if (node == tree->root)
459 {
460 /* free the root separately as it's not child of anything */
461 if (node != tree->end)
462 pg_free(node);
463 /* free the tree->end struct only once and when all else is freed */
464 pg_free(tree->end);
465 }
466}
467
468/* Set the height to 1 plus the greatest of left and right heights */
469static void
471{
472 n->height = 1 + (n->children[0]->height > n->children[1]->height ?
473 n->children[0]->height :
474 n->children[1]->height);
475}
476
477/* Rotate a subtree left (dir=0) or right (dir=1). Not recursive */
478static avl_node *
479avlRotate(avl_node **current, int dir)
480{
481 avl_node *before = *current;
482 avl_node *after = (*current)->children[dir];
483
484 *current = after;
485 before->children[dir] = after->children[!dir];
487 after->children[!dir] = before;
488
489 return after;
490}
491
492static int
494{
495 return n->children[0]->height - n->children[1]->height;
496}
497
498/*
499 * After an insertion, possibly rebalance the tree so that the left and right
500 * node heights don't differ by more than 1.
501 * May update *node.
502 */
503static void
505{
506 avl_node *current = *node;
507 int b = avlBalance(current) / 2;
508
509 if (b != 0)
510 {
511 int dir = (1 - b) / 2;
512
513 if (avlBalance(current->children[dir]) == -b)
514 avlRotate(&current->children[dir], !dir);
515 current = avlRotate(node, dir);
516 }
517 if (current != tree->end)
518 avlUpdateHeight(current);
519}
520
521/*
522 * Insert a new value/field, starting from *node, reaching the correct position
523 * in the tree by recursion. Possibly rebalance the tree and possibly update
524 * *node. Do nothing if the value is already present in the tree.
525 */
526static void
528{
529 avl_node *current = *node;
530
531 if (current == tree->end)
532 {
534
535 new_node->height = 1;
536 new_node->field = field;
537 new_node->children[0] = new_node->children[1] = tree->end;
538 tree->count++;
539 *node = new_node;
540 }
541 else
542 {
543 int cmp = pivotFieldCompare(&field, &current->field);
544
545 if (cmp != 0)
546 {
548 cmp > 0 ? &current->children[1] : &current->children[0],
549 field);
550 avlAdjustBalance(tree, node);
551 }
552 }
553}
554
555/* Insert the value into the AVL tree, if it does not preexist */
556static void
557avlMergeValue(avl_tree *tree, char *name, char *sort_value)
558{
559 pivot_field field;
560
561 field.name = name;
562 field.rank = tree->count;
563 field.sort_value = sort_value;
564 avlInsertNode(tree, &tree->root, field);
565}
566
567/*
568 * Recursively extract node values into the names array, in sorted order with a
569 * left-to-right tree traversal.
570 * Return the next candidate offset to write into the names array.
571 * fields[] must be preallocated to hold tree->count entries
572 */
573static int
575{
576 if (node == tree->end)
577 return idx;
578
579 idx = avlCollectFields(tree, node->children[0], fields, idx);
580 fields[idx] = node->field;
581 return avlCollectFields(tree, node->children[1], fields, idx + 1);
582}
583
584static void
586{
587 int *hmap; /* [[offset in piv_columns, rank], ...for
588 * every header entry] */
589 int i;
590
591 hmap = pg_malloc_array(int, num_columns * 2);
592 for (i = 0; i < num_columns; i++)
593 {
594 char *val = piv_columns[i].sort_value;
595
596 /* ranking information is valid if non null and matches /^-?\d+$/ */
597 if (val &&
598 ((*val == '-' &&
599 strspn(val + 1, "0123456789") == strlen(val + 1)) ||
600 strspn(val, "0123456789") == strlen(val)))
601 {
602 hmap[i * 2] = atoi(val);
603 hmap[i * 2 + 1] = i;
604 }
605 else
606 {
607 /* invalid rank information ignored (equivalent to rank 0) */
608 hmap[i * 2] = 0;
609 hmap[i * 2 + 1] = i;
610 }
611 }
612
613 qsort(hmap, num_columns, sizeof(int) * 2, rankCompare);
614
615 for (i = 0; i < num_columns; i++)
616 {
617 piv_columns[hmap[i * 2 + 1]].rank = i;
618 }
619
620 pg_free(hmap);
621}
622
623/*
624 * Look up a column reference, which can be either:
625 * - a number from 1 to PQnfields(res)
626 * - a column name matching one of PQfname(res,...)
627 *
628 * Returns zero-based column number, or -1 if not found or ambiguous.
629 *
630 * Note: may modify contents of "arg" string.
631 */
632static int
633indexOfColumn(char *arg, const PGresult *res)
634{
635 int idx;
636
637 if (arg[0] && strspn(arg, "0123456789") == strlen(arg))
638 {
639 /* if arg contains only digits, it's a column number */
640 idx = atoi(arg) - 1;
641 if (idx < 0 || idx >= PQnfields(res))
642 {
643 pg_log_error("\\crosstabview: column number %d is out of range 1..%d",
644 idx + 1, PQnfields(res));
645 return -1;
646 }
647 }
648 else
649 {
650 int i;
651
652 /*
653 * Dequote and downcase the column name. By checking for all-digits
654 * before doing this, we can ensure that a quoted name is treated as a
655 * name even if it's all digits.
656 */
658
659 /* Now look for match(es) among res' column names */
660 idx = -1;
661 for (i = 0; i < PQnfields(res); i++)
662 {
663 if (strcmp(arg, PQfname(res, i)) == 0)
664 {
665 if (idx >= 0)
666 {
667 /* another idx was already found for the same name */
668 pg_log_error("\\crosstabview: ambiguous column name: \"%s\"", arg);
669 return -1;
670 }
671 idx = i;
672 }
673 }
674 if (idx == -1)
675 {
676 pg_log_error("\\crosstabview: column name not found: \"%s\"", arg);
677 return -1;
678 }
679 }
680
681 return idx;
682}
683
684/*
685 * Value comparator for vertical and horizontal headers
686 * used for deduplication only.
687 * - null values are considered equal
688 * - non-null < null
689 * - non-null values are compared with strcmp()
690 */
691static int
692pivotFieldCompare(const void *a, const void *b)
693{
694 const pivot_field *pa = (const pivot_field *) a;
695 const pivot_field *pb = (const pivot_field *) b;
696
697 /* test null values */
698 if (!pb->name)
699 return pa->name ? -1 : 0;
700 else if (!pa->name)
701 return 1;
702
703 /* non-null values */
704 return strcmp(pa->name, pb->name);
705}
706
707static int
708rankCompare(const void *a, const void *b)
709{
710 return pg_cmp_s32(*(const int *) a, *(const int *) b);
711}
Datum idx(PG_FUNCTION_ARGS)
Definition _int_op.c:262
#define Assert(condition)
Definition c.h:906
static void avlUpdateHeight(avl_node *n)
static int avlCollectFields(avl_tree *tree, avl_node *node, pivot_field *fields, int idx)
static int pivotFieldCompare(const void *a, const void *b)
static void avlInsertNode(avl_tree *tree, avl_node **node, pivot_field field)
static int rankCompare(const void *a, const void *b)
struct _avl_tree avl_tree
static void avlFree(avl_tree *tree, avl_node *node)
static bool printCrosstab(const PGresult *result, int num_columns, pivot_field *piv_columns, int field_for_columns, int num_rows, pivot_field *piv_rows, int field_for_rows, int field_for_data)
static void rankSort(int num_columns, pivot_field *piv_columns)
bool PrintResultInCrosstab(const PGresult *res)
static int avlBalance(avl_node *n)
static int indexOfColumn(char *arg, const PGresult *res)
static avl_node * avlRotate(avl_node **current, int dir)
static void avlInit(avl_tree *tree)
static void avlMergeValue(avl_tree *tree, char *name, char *sort_value)
struct _avl_node avl_node
struct _pivot_field pivot_field
static void avlAdjustBalance(avl_tree *tree, avl_node **node)
#define CROSSTABVIEW_MAX_COLUMNS
Datum arg
Definition elog.c:1322
Oid PQftype(const PGresult *res, int field_num)
Definition fe-exec.c:3736
void pg_free(void *ptr)
#define pg_malloc_array(type, count)
Definition fe_memutils.h:56
#define pg_malloc0_object(type)
Definition fe_memutils.h:51
#define pg_malloc_object(type)
Definition fe_memutils.h:50
void printTableInit(printTableContent *const content, const printTableOpt *opt, const char *title, const int ncolumns, const int nrows)
Definition print.c:3191
void printTableCleanup(printTableContent *const content)
Definition print.c:3372
char column_type_alignment(Oid ftype)
Definition print.c:3811
void printTable(const printTableContent *cont, FILE *fout, bool is_pager, FILE *flog)
Definition print.c:3636
void printTableAddHeader(printTableContent *const content, char *header, const bool translate, const char align)
Definition print.c:3239
long val
Definition informix.c:689
static int pg_cmp_s32(int32 a, int32 b)
Definition int.h:713
int b
Definition isn.c:74
int a
Definition isn.c:73
int i
Definition isn.c:77
#define PQgetvalue
#define PQnfields
#define PQresultStatus
#define PQgetisnull
#define PQfname
#define PQntuples
@ PGRES_TUPLES_OK
Definition libpq-fe.h:128
#define pg_log_error(...)
Definition logging.h:106
#define qsort(a, b, c, d)
Definition port.h:495
static int fb(int x)
void dequote_downcase_identifier(char *str, bool downcase, int encoding)
tree
Definition radixtree.h:1828
static int cmp(const chr *x, const chr *y, size_t len)
static int before(chr x, chr y)
PsqlSettings pset
Definition startup.c:32
static void error(void)
struct _avl_node * children[2]
pivot_field field
avl_node * root
avl_node * end
char * sort_value
printQueryOpt popt
Definition settings.h:112
FILE * logfile
Definition settings.h:149
char * ctv_args[4]
Definition settings.h:133
FILE * queryFout
Definition settings.h:105
printTableOpt topt
Definition print.h:185
char * nullPrint
Definition print.h:186
char * title
Definition print.h:189
const char * name