slot.h 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. /*-------------------------------------------------------------------------
  2. * slot.h
  3. * Replication slot management.
  4. *
  5. * Copyright (c) 2012-2016, PostgreSQL Global Development Group
  6. *
  7. *-------------------------------------------------------------------------
  8. */
  9. #ifndef SLOT_H
  10. #define SLOT_H
  11. #include "fmgr.h"
  12. #include "access/xlog.h"
  13. #include "access/xlogreader.h"
  14. #include "storage/lwlock.h"
  15. #include "storage/shmem.h"
  16. #include "storage/spin.h"
  17. /*
  18. * Behaviour of replication slots, upon release or crash.
  19. *
  20. * Slots marked as PERSISTENT are crashsafe and will not be dropped when
  21. * released. Slots marked as EPHEMERAL will be dropped when released or after
  22. * restarts.
  23. *
  24. * EPHEMERAL slots can be made PERSISTENT by calling ReplicationSlotPersist().
  25. */
  26. typedef enum ReplicationSlotPersistency
  27. {
  28. RS_PERSISTENT,
  29. RS_EPHEMERAL
  30. } ReplicationSlotPersistency;
  31. /*
  32. * On-Disk data of a replication slot, preserved across restarts.
  33. */
  34. typedef struct ReplicationSlotPersistentData
  35. {
  36. /* The slot's identifier */
  37. NameData name;
  38. /* database the slot is active on */
  39. Oid database;
  40. /*
  41. * The slot's behaviour when being dropped (or restored after a crash).
  42. */
  43. ReplicationSlotPersistency persistency;
  44. /*
  45. * xmin horizon for data
  46. *
  47. * NB: This may represent a value that hasn't been written to disk yet;
  48. * see notes for effective_xmin, below.
  49. */
  50. TransactionId xmin;
  51. /*
  52. * xmin horizon for catalog tuples
  53. *
  54. * NB: This may represent a value that hasn't been written to disk yet;
  55. * see notes for effective_xmin, below.
  56. */
  57. TransactionId catalog_xmin;
  58. /* oldest LSN that might be required by this replication slot */
  59. XLogRecPtr restart_lsn;
  60. /*
  61. * Oldest LSN that the client has acked receipt for. This is used as the
  62. * start_lsn point in case the client doesn't specify one, and also as a
  63. * safety measure to jump forwards in case the client specifies a
  64. * start_lsn that's further in the past than this value.
  65. */
  66. XLogRecPtr confirmed_flush;
  67. /* plugin name */
  68. NameData plugin;
  69. } ReplicationSlotPersistentData;
  70. /*
  71. * Shared memory state of a single replication slot.
  72. */
  73. typedef struct ReplicationSlot
  74. {
  75. /* lock, on same cacheline as effective_xmin */
  76. slock_t mutex;
  77. /* is this slot defined */
  78. bool in_use;
  79. /* Who is streaming out changes for this slot? 0 in unused slots. */
  80. pid_t active_pid;
  81. /* any outstanding modifications? */
  82. bool just_dirtied;
  83. bool dirty;
  84. /*
  85. * For logical decoding, it's extremely important that we never remove any
  86. * data that's still needed for decoding purposes, even after a crash;
  87. * otherwise, decoding will produce wrong answers. Ordinary streaming
  88. * replication also needs to prevent old row versions from being removed
  89. * too soon, but the worst consequence we might encounter there is
  90. * unwanted query cancellations on the standby. Thus, for logical
  91. * decoding, this value represents the latest xmin that has actually been
  92. * written to disk, whereas for streaming replication, it's just the same
  93. * as the persistent value (data.xmin).
  94. */
  95. TransactionId effective_xmin;
  96. TransactionId effective_catalog_xmin;
  97. /* data surviving shutdowns and crashes */
  98. ReplicationSlotPersistentData data;
  99. /* is somebody performing io on this slot? */
  100. LWLock io_in_progress_lock;
  101. /* all the remaining data is only used for logical slots */
  102. /*
  103. * When the client has confirmed flushes >= candidate_xmin_lsn we can
  104. * advance the catalog xmin. When restart_valid has been passed,
  105. * restart_lsn can be increased.
  106. */
  107. TransactionId candidate_catalog_xmin;
  108. XLogRecPtr candidate_xmin_lsn;
  109. XLogRecPtr candidate_restart_valid;
  110. XLogRecPtr candidate_restart_lsn;
  111. } ReplicationSlot;
  112. #define SlotIsPhysical(slot) (slot->data.database == InvalidOid)
  113. #define SlotIsLogical(slot) (slot->data.database != InvalidOid)
  114. /*
  115. * Shared memory control area for all of replication slots.
  116. */
  117. typedef struct ReplicationSlotCtlData
  118. {
  119. /*
  120. * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
  121. * reason you can't do that in an otherwise-empty struct.
  122. */
  123. ReplicationSlot replication_slots[1];
  124. } ReplicationSlotCtlData;
  125. /*
  126. * Pointers to shared memory
  127. */
  128. extern ReplicationSlotCtlData *ReplicationSlotCtl;
  129. extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
  130. /* GUCs */
  131. extern PGDLLIMPORT int max_replication_slots;
  132. /* shmem initialization functions */
  133. extern Size ReplicationSlotsShmemSize(void);
  134. extern void ReplicationSlotsShmemInit(void);
  135. /* management of individual slots */
  136. extern void ReplicationSlotCreate(const char *name, bool db_specific,
  137. ReplicationSlotPersistency p);
  138. extern void ReplicationSlotPersist(void);
  139. extern void ReplicationSlotDrop(const char *name);
  140. extern void ReplicationSlotAcquire(const char *name);
  141. extern void ReplicationSlotRelease(void);
  142. extern void ReplicationSlotSave(void);
  143. extern void ReplicationSlotMarkDirty(void);
  144. /* misc stuff */
  145. extern bool ReplicationSlotValidateName(const char *name, int elevel);
  146. extern void ReplicationSlotReserveWal(void);
  147. extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
  148. extern void ReplicationSlotsComputeRequiredLSN(void);
  149. extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
  150. extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
  151. extern void StartupReplicationSlots(void);
  152. extern void CheckPointReplicationSlots(void);
  153. extern void CheckSlotRequirements(void);
  154. /* SQL callable functions */
  155. extern Datum pg_create_physical_replication_slot(PG_FUNCTION_ARGS);
  156. extern Datum pg_create_logical_replication_slot(PG_FUNCTION_ARGS);
  157. extern Datum pg_drop_replication_slot(PG_FUNCTION_ARGS);
  158. extern Datum pg_get_replication_slots(PG_FUNCTION_ARGS);
  159. #endif /* SLOT_H */