196#define NAPTIME_PER_CYCLE 1000
337#define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn)))
427 char *originname,
Size szoriginname)
432 snprintf(originname, szoriginname,
"pg_%u_%u", suboid, relid);
437 snprintf(originname, szoriginname,
"pg_%u", suboid);
475 if (rel->
state != SUBREL_STATE_READY &&
476 rel->
state != SUBREL_STATE_UNKNOWN)
478 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
479 errmsg(
"logical replication parallel apply worker for subscription \"%s\" will stop",
481 errdetail(
"Cannot handle streamed replication transactions using parallel apply workers until all tables have been synchronized.")));
483 return rel->
state == SUBREL_STATE_READY;
486 return (rel->
state == SUBREL_STATE_READY ||
487 (rel->
state == SUBREL_STATE_SYNCDONE &&
588 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
589 errmsg_internal(
"invalid transaction ID in streamed replication transaction")));
591 switch (apply_action)
638 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
742 int num_phys_attrs = desc->
natts;
756 defmap = (
int *)
palloc(num_phys_attrs *
sizeof(
int));
779 defmap[num_defaults] =
attnum;
784 for (
i = 0;
i < num_defaults;
i++)
805 for (
i = 0;
i < natts;
i++)
810 if (!att->attisdropped && remoteattnum >= 0)
814 Assert(remoteattnum < tupleData->ncols);
827 typioparam, att->atttypmod);
844 typioparam, att->atttypmod);
849 (
errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
850 errmsg(
"incorrect binary data format in logical replication column %d",
917 for (
i = 0;
i < natts;
i++)
922 if (remoteattnum < 0)
925 Assert(remoteattnum < tupleData->ncols);
942 typioparam, att->atttypmod);
959 typioparam, att->atttypmod);
964 (
errcode(ERRCODE_INVALID_BINARY_REPRESENTATION),
965 errmsg(
"incorrect binary data format in logical replication column %d",
1022 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1023 errmsg_internal(
"incorrect commit LSN %X/%X in commit message (expected %X/%X)",
1047 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1048 errmsg_internal(
"tablesync worker received a BEGIN PREPARE message")));
1114 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1115 errmsg_internal(
"incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
1295 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1301 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1302 errmsg_internal(
"tablesync worker received a STREAM PREPARE message")));
1309 switch (apply_action)
1336 elog(
DEBUG1,
"finished processing the STREAM PREPARE command");
1398 elog(
DEBUG1,
"finished processing the STREAM PREPARE command");
1402 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
1439 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1499 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1513 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1514 errmsg_internal(
"invalid transaction ID in streamed replication transaction")));
1524 switch (apply_action)
1610 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
1654 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1659 switch (apply_action)
1696 elog(
DEBUG1,
"applied %u changes in the streaming chunk",
1726 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
1847 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
1854 xid = abort_data.
xid;
1855 subxid = abort_data.
subxid;
1856 toplevel_xact = (xid == subxid);
1862 switch (apply_action)
1872 elog(
DEBUG1,
"finished processing the STREAM ABORT command");
1975 elog(
DEBUG1,
"finished processing the STREAM ABORT command");
1979 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
2013 if (last_fileno != fileno || last_offset != offset)
2014 elog(
ERROR,
"unexpected message left in streaming transaction's changes file \"%s\"",
2027 char *buffer = NULL;
2048 elog(
DEBUG1,
"replaying changes from file \"%s\"", path);
2098 elog(
ERROR,
"incorrect length %d in streaming transaction's changes file \"%s\"",
2134 if (nchanges % 1000 == 0)
2135 elog(
DEBUG1,
"replayed %d changes from file \"%s\"",
2142 elog(
DEBUG1,
"replayed %d (all) changes from file \"%s\"",
2164 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
2172 switch (apply_action)
2188 elog(
DEBUG1,
"finished processing the STREAM COMMIT command");
2242 elog(
DEBUG1,
"finished processing the STREAM COMMIT command");
2246 elog(
ERROR,
"unexpected apply action: %d", (
int) apply_action);
2381 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2382 errmsg(
"user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
2521 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2522 errmsg(
"publisher did not send replica identity column "
2523 "expected by the logical replication target relation \"%s.%s\"",
2528 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
2529 errmsg(
"logical replication target relation \"%s.%s\" has "
2530 "neither REPLICA IDENTITY index nor PRIMARY "
2531 "KEY and published relation does not have "
2532 "REPLICA IDENTITY FULL",
2616 if (!att->attisdropped && remoteattnum >= 0)
2629 has_oldtup ? &oldtup : &newtup);
2679 remoteslot, &localslot);
2706 remoteslot, localslot, newslot,
2707 InvalidOid, localxmin, localorigin, localts);
2736 remoteslot, NULL, newslot,
2856 remoteslot, &localslot);
2872 remoteslot, localslot, NULL,
2873 InvalidOid, localxmin, localorigin, localts);
2888 remoteslot, NULL, NULL,
2924 (remoterel->
replident == REPLICA_IDENTITY_FULL));
2928#ifdef USE_ASSERT_CHECKING
2933 (remoterel->
replident == REPLICA_IDENTITY_FULL &&
2941 remoteslot, *localslot);
2945 remoteslot, *localslot);
2986 Assert(remoteslot != NULL);
2989 remoteslot, estate);
2990 Assert(partrelinfo != NULL);
3008 if (remoteslot_part == NULL)
3019 remoteslot_part =
ExecCopySlot(remoteslot_part, remoteslot);
3067 remoteslot_part, &localslot);
3081 remoteslot_part, NULL, newslot,
3102 remoteslot_part, localslot, newslot,
3122 if (!partrel->
rd_rel->relispartition ||
3140 localslot, remoteslot_part);
3161 remoteslot_part, remoteslot);
3165 remoteslot =
ExecCopySlot(remoteslot, remoteslot_part);
3175 Assert(partrelinfo_new != partrelinfo);
3198 if (remoteslot_part == NULL)
3225 elog(
ERROR,
"unrecognized CmdType: %d", (
int) operation);
3238 bool cascade =
false;
3239 bool restart_seqs =
false;
3261 foreach(lc, remote_relids)
3277 remote_rels =
lappend(remote_rels, rel);
3295 foreach(child, children)
3317 rels =
lappend(rels, childrel);
3318 part_rels =
lappend(part_rels, childrel);
3322 relids_logged =
lappend_oid(relids_logged, childrelid);
3343 foreach(lc, remote_rels)
3349 foreach(lc, part_rels)
3462 (
errcode(ERRCODE_PROTOCOL_VIOLATION),
3463 errmsg(
"invalid logical replication message type \"??? (%d)\"",
action)));
3485 bool *have_pending_txes)
3516 *have_pending_txes =
true;
3573 bool ping_sent =
false;
3582 "ApplyMessageContext",
3590 "LogicalStreamingContext",
3612 bool endofstream =
false;
3635 (
errmsg(
"data stream from publisher has ended")));
3671 if (last_received < start_lsn)
3672 last_received = start_lsn;
3674 if (last_received < end_lsn)
3675 last_received = end_lsn;
3685 bool reply_requested;
3691 if (last_received < end_lsn)
3692 last_received = end_lsn;
3747 WAIT_EVENT_LOGICAL_APPLY_MAIN);
3771 bool requestReply =
false;
3788 (
errcode(ERRCODE_CONNECTION_FAILURE),
3789 errmsg(
"terminating logical replication worker due to timeout")));
3798 requestReply =
true;
3846 bool have_pending_txes;
3856 if (recvpos < last_recvpos)
3857 recvpos = last_recvpos;
3865 if (!have_pending_txes)
3866 flushpos = writepos = recvpos;
3868 if (writepos < last_writepos)
3869 writepos = last_writepos;
3871 if (flushpos < last_flushpos)
3872 flushpos = last_flushpos;
3878 writepos == last_writepos &&
3879 flushpos == last_flushpos &&
3902 elog(
DEBUG2,
"sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
3911 if (recvpos > last_recvpos)
3912 last_recvpos = recvpos;
3913 if (writepos > last_writepos)
3914 last_writepos = writepos;
3915 if (flushpos > last_flushpos)
3916 last_flushpos = flushpos;
3961 bool started_tx =
false;
3986 (
errmsg(
"logical replication worker for subscription \"%s\" will stop because the subscription was removed",
4000 (
errmsg(
"logical replication worker for subscription \"%s\" will stop because the subscription was disabled",
4031 (
errmsg(
"logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
4035 (
errmsg(
"logical replication worker for subscription \"%s\" will restart because of a parameter change",
4049 errmsg(
"logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
4053 errmsg(
"logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
4062 elog(
ERROR,
"subscription %u changed unexpectedly",
4237 if (subxacts[
i - 1].xid == xid)
4335 elog(
DEBUG1,
"opening file \"%s\" for streamed changes", path);
4357 path, O_RDWR,
false);
4440 options->startpoint = *origin_startpos;
4444 options->proto.logical.proto_version =
4460 options->proto.logical.streaming_str =
"parallel";
4466 options->proto.logical.streaming_str =
"on";
4471 options->proto.logical.streaming_str = NULL;
4475 options->proto.logical.twophase =
false;
4538 char *slotname = NULL;
4543 bool must_use_password;
4554 (
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
4555 errmsg(
"subscription has no replication slot set")));
4559 originname,
sizeof(originname));
4574 true, must_use_password,
4579 (
errcode(ERRCODE_CONNECTION_FAILURE),
4580 errmsg(
"apply worker for subscription \"%s\" could not connect to the publisher: %s",
4606 options.proto.logical.twophase =
true;
4620 (
errmsg_internal(
"logical replication apply worker for subscription \"%s\" two_phase is %s",
4669 (
errmsg(
"logical replication worker for subscription %u will not start because the subscription was removed during startup",
4685 (
errmsg(
"logical replication worker for subscription \"%s\" will not start because the subscription was disabled during startup",
4709 (
errmsg(
"logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
4714 (
errmsg(
"logical replication apply worker for subscription \"%s\" has started",
4773 elog(
DEBUG1,
"connecting to publisher using connection string \"%s\"",
4836 errmsg(
"subscription \"%s\" has been disabled because of an error",
4884 errmsg(
"logical replication starts skipping transaction at LSN %X/%X",
4898 (
errmsg(
"logical replication completed skipping transaction at LSN %X/%X",
4920 bool started_tx =
false;
4960 if (subform->subskiplsn == myskiplsn)
4962 bool nulls[Natts_pg_subscription];
4963 bool replaces[Natts_pg_subscription];
4967 memset(nulls,
false,
sizeof(nulls));
4968 memset(replaces,
false,
sizeof(replaces));
4972 replaces[Anum_pg_subscription_subskiplsn - 1] =
true;
4978 if (myskiplsn != finish_lsn)
4981 errdetail(
"Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
5012 if (elevel >=
ERROR)
5015 if (errarg->
rel == NULL)
5018 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\"",
5022 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u",
5027 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
5038 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u",
5045 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
5056 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
5064 errcontext(
"processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
5130 foreach(lc2, workers)