contrib/cache_scan/Makefile | 19 + contrib/cache_scan/cache_scan--1.0.sql | 26 + contrib/cache_scan/cache_scan--unpackaged--1.0.sql | 3 + contrib/cache_scan/cache_scan.control | 5 + contrib/cache_scan/cache_scan.h | 68 + contrib/cache_scan/ccache.c | 1410 ++++++++++++++++++++ contrib/cache_scan/cscan.c | 761 +++++++++++ doc/src/sgml/cache-scan.sgml | 224 ++++ doc/src/sgml/contrib.sgml | 1 + doc/src/sgml/custom-scan.sgml | 14 + doc/src/sgml/filelist.sgml | 1 + src/backend/access/heap/pruneheap.c | 13 + src/backend/utils/time/tqual.c | 7 + src/include/access/heapam.h | 7 + 14 files changed, 2559 insertions(+) diff --git a/contrib/cache_scan/Makefile b/contrib/cache_scan/Makefile new file mode 100644 index 0000000..4e68b68 --- /dev/null +++ b/contrib/cache_scan/Makefile @@ -0,0 +1,19 @@ +# contrib/dbcache/Makefile + +MODULE_big = cache_scan +OBJS = cscan.o ccache.o + +EXTENSION = cache_scan +DATA = cache_scan--1.0.sql cache_scan--unpackaged--1.0.sql + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/cache_scan +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif + diff --git a/contrib/cache_scan/cache_scan--1.0.sql b/contrib/cache_scan/cache_scan--1.0.sql new file mode 100644 index 0000000..4bd04d1 --- /dev/null +++ b/contrib/cache_scan/cache_scan--1.0.sql @@ -0,0 +1,26 @@ +CREATE FUNCTION public.cache_scan_synchronizer() +RETURNS trigger +AS 'MODULE_PATHNAME' +LANGUAGE C VOLATILE STRICT; + +CREATE TYPE public.__cache_scan_debuginfo AS +( + tableoid oid, + status text, + chunk text, + upper text, + l_depth int4, + l_chunk text, + r_depth int4, + r_chunk text, + ntuples int4, + usage int4, + min_ctid tid, + max_ctid tid +); +CREATE FUNCTION public.cache_scan_debuginfo() + RETURNS SETOF public.__cache_scan_debuginfo + AS 'MODULE_PATHNAME' + LANGUAGE C STRICT; + + diff --git a/contrib/cache_scan/cache_scan--unpackaged--1.0.sql b/contrib/cache_scan/cache_scan--unpackaged--1.0.sql new file mode 100644 index 0000000..718a2de --- /dev/null +++ b/contrib/cache_scan/cache_scan--unpackaged--1.0.sql @@ -0,0 +1,3 @@ +DROP FUNCTION public.cache_scan_synchronizer() CASCADE; +DROP FUNCTION public.cache_scan_debuginfo() CASCADE; +DROP TYPE public.__cache_scan_debuginfo; diff --git a/contrib/cache_scan/cache_scan.control b/contrib/cache_scan/cache_scan.control new file mode 100644 index 0000000..77946da --- /dev/null +++ b/contrib/cache_scan/cache_scan.control @@ -0,0 +1,5 @@ +# cache_scan extension +comment = 'custom scan provider for cache-only scan' +default_version = '1.0' +module_pathname = '$libdir/cache_scan' +relocatable = false diff --git a/contrib/cache_scan/cache_scan.h b/contrib/cache_scan/cache_scan.h new file mode 100644 index 0000000..d06156e --- /dev/null +++ b/contrib/cache_scan/cache_scan.h @@ -0,0 +1,68 @@ +/* ------------------------------------------------------------------------- + * + * contrib/cache_scan/cache_scan.h + * + * Definitions for the cache_scan extension + * + * Copyright (c) 2010-2013, PostgreSQL Global Development Group + * + * ------------------------------------------------------------------------- + */ +#ifndef CACHE_SCAN_H +#define CACHE_SCAN_H +#include "access/htup_details.h" +#include "lib/ilist.h" +#include "nodes/bitmapset.h" +#include "storage/lwlock.h" +#include "utils/rel.h" + +typedef struct ccache_chunk { + struct ccache_chunk *upper; /* link to the upper node */ + struct ccache_chunk *right; /* link to the greaternode, if 
exist */ + struct ccache_chunk *left; /* link to the less node, if exist */ + int r_depth; /* max depth in right branch */ + int l_depth; /* max depth in left branch */ + uint32 ntups; /* number of tuples being cached */ + uint32 usage; /* usage counter of this chunk */ + HeapTuple tuples[FLEXIBLE_ARRAY_MEMBER]; +} ccache_chunk; + +#define CCACHE_STATUS_INITIALIZED 1 +#define CCACHE_STATUS_IN_PROGRESS 2 +#define CCACHE_STATUS_CONSTRUCTED 3 + +typedef struct { + LWLockId lock; /* used to protect ttree links */ + volatile int refcnt; + int status; + + dlist_node hash_chain; /* linked to ccache_hash->slots[] */ + dlist_node lru_chain; /* linked to ccache_hash->lru_list */ + + Oid tableoid; + ccache_chunk *root_chunk; + Bitmapset attrs_used; /* !Bitmapset is variable length! */ +} ccache_head; + +extern int ccache_max_attribute_number(void); +extern ccache_head *cs_get_ccache(Oid tableoid, Bitmapset *attrs_used, + bool create_on_demand); +extern void cs_put_ccache(ccache_head *ccache); + +extern bool ccache_insert_tuple(ccache_head *ccache, + Relation rel, HeapTuple tuple); +extern bool ccache_delete_tuple(ccache_head *ccache, HeapTuple oldtup); + +extern void ccache_vacuum_page(ccache_head *ccache, Buffer buffer); + +extern HeapTuple ccache_find_tuple(ccache_chunk *cchunk, + ItemPointer ctid, + ScanDirection direction); +extern void ccache_init(void); + +extern Datum cache_scan_synchronizer(PG_FUNCTION_ARGS); +extern Datum cache_scan_debuginfo(PG_FUNCTION_ARGS); + +extern void _PG_init(void); + +#endif /* CACHE_SCAN_H */ diff --git a/contrib/cache_scan/ccache.c b/contrib/cache_scan/ccache.c new file mode 100644 index 0000000..0bb9ff4 --- /dev/null +++ b/contrib/cache_scan/ccache.c @@ -0,0 +1,1410 @@ +/* ------------------------------------------------------------------------- + * + * contrib/cache_scan/ccache.c + * + * Routines for columns-culled cache implementation + * + * Copyright (c) 2013-2014, PostgreSQL Global Development Group + * + * ------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/hash.h" +#include "access/heapam.h" +#include "access/sysattr.h" +#include "catalog/pg_type.h" +#include "funcapi.h" +#include "storage/ipc.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "cache_scan.h" + +/* + * Hash table to manage all the ccache_head + */ +typedef struct { + slock_t lock; /* lock of the hash table */ + dlist_head lru_list; /* list of recently used cache */ + dlist_head free_list; /* list of free ccache_head */ + volatile int lwlocks_usage; + LWLockId *lwlocks; + dlist_head *slots; +} ccache_hash; + +/* + * Data structure to manage blocks on the shared memory segment. + * This extension acquires (shmseg_blocksize) x (shmseg_num_blocks) bytes of + * shared memory, then it shall be split into the fixed-length memory blocks. + * All the memory allocation and relase are done by block, to avoid memory + * fragmentation that eventually makes implementation complicated. + * + * The shmseg_head has a spinlock and global free_list to link free blocks. + * Its blocks[] array contains shmseg_block structures that points a particular + * address of the associated memory block. + * The shmseg_block being chained in the free_list of shmseg_head are available + * to allocate. Elsewhere, this block is already allocated on somewhere. 
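+ *
+ * As a rough illustration (see cs_alloc_shmblock() and cs_free_shmblock()
+ * below), the mapping between a block address and its shmseg_block entry
+ * boils down to a simple division:
+ *
+ *   index   = (address - base_address) / shmseg_blocksize;
+ *   address = base_address + index * shmseg_blocksize;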
+ */ +typedef struct { + dlist_node chain; + Size address; +} shmseg_block; + +typedef struct { + slock_t lock; + dlist_head free_list; + Size base_address; + shmseg_block blocks[FLEXIBLE_ARRAY_MEMBER]; +} shmseg_head; + +/* + * ccache_entry is used to track ccache_head being acquired by this backend. + */ +typedef struct { + dlist_node chain; + ResourceOwner owner; + ccache_head *ccache; +} ccache_entry; + +static dlist_head ccache_local_list; +static dlist_head ccache_free_list; + +/* Static variables */ +static shmem_startup_hook_type shmem_startup_next = NULL; + +static ccache_hash *cs_ccache_hash = NULL; +static shmseg_head *cs_shmseg_head = NULL; + +/* GUC variables */ +static int ccache_hash_size; +static int shmseg_blocksize; +static int shmseg_num_blocks; +static int max_cached_attnum; + +/* Static functions */ +static void *cs_alloc_shmblock(void); +static void cs_free_shmblock(void *address); + +int +ccache_max_attribute_number(void) +{ + return (max_cached_attnum - FirstLowInvalidHeapAttributeNumber + + BITS_PER_BITMAPWORD - 1) / BITS_PER_BITMAPWORD; +} + +/* + * ccache_on_resource_release + * + * It is a callback to put ccache_head being acquired locally, to keep + * consistency of reference counter. + */ +static void +ccache_on_resource_release(ResourceReleasePhase phase, + bool isCommit, + bool isTopLevel, + void *arg) +{ + dlist_mutable_iter iter; + + if (phase != RESOURCE_RELEASE_AFTER_LOCKS) + return; + + dlist_foreach_modify(iter, &ccache_local_list) + { + ccache_entry *entry + = dlist_container(ccache_entry, chain, iter.cur); + + if (entry->owner == CurrentResourceOwner) + { + dlist_delete(&entry->chain); + + if (isCommit) + elog(WARNING, "cache reference leak (tableoid=%u, refcnt=%d)", + entry->ccache->tableoid, entry->ccache->refcnt); + cs_put_ccache(entry->ccache); + + entry->ccache = NULL; + dlist_push_tail(&ccache_free_list, &entry->chain); + } + } +} + +static ccache_chunk * +ccache_alloc_chunk(ccache_head *ccache, ccache_chunk *upper) +{ + ccache_chunk *cchunk = cs_alloc_shmblock(); + + if (cchunk) + { + cchunk->upper = upper; + cchunk->right = NULL; + cchunk->left = NULL; + cchunk->r_depth = 0; + cchunk->l_depth = 0; + cchunk->ntups = 0; + cchunk->usage = shmseg_blocksize; + } + return cchunk; +} + +/* + * ccache_rebalance_tree + * + * It keeps the balance of ccache tree if the supplied chunk has + * unbalanced subtrees. + */ +#define AssertIfNotShmem(addr) \ + Assert((addr) == NULL || \ + (((Size)(addr)) >= cs_shmseg_head->base_address && \ + ((Size)(addr)) < (cs_shmseg_head->base_address + \ + shmseg_num_blocks * shmseg_blocksize))) + +#define TTREE_DEPTH(chunk) \ + ((chunk) == 0 ? 0 : Max((chunk)->l_depth, (chunk)->r_depth) + 1) + +static void +ccache_rebalance_tree(ccache_head *ccache, ccache_chunk *cchunk) +{ + Assert(cchunk->upper != NULL + ? 
(cchunk->upper->left == cchunk || cchunk->upper->right == cchunk)
+		   : (ccache->root_chunk == cchunk));
+
+	if (cchunk->l_depth + 1 < cchunk->r_depth)
+	{
+		/* anticlockwise rotation */
+		ccache_chunk   *rchunk = cchunk->right;
+		ccache_chunk   *upper = cchunk->upper;
+
+		cchunk->right = rchunk->left;
+		cchunk->r_depth = TTREE_DEPTH(cchunk->right);
+		cchunk->upper = rchunk;
+
+		rchunk->left = cchunk;
+		rchunk->l_depth = TTREE_DEPTH(rchunk->left);
+		rchunk->upper = upper;
+
+		if (!upper)
+			ccache->root_chunk = rchunk;
+		else if (upper->left == cchunk)
+		{
+			upper->left = rchunk;
+			upper->l_depth = TTREE_DEPTH(rchunk);
+		}
+		else
+		{
+			upper->right = rchunk;
+			upper->r_depth = TTREE_DEPTH(rchunk);
+		}
+		AssertIfNotShmem(cchunk->right);
+		AssertIfNotShmem(cchunk->left);
+		AssertIfNotShmem(cchunk->upper);
+		AssertIfNotShmem(rchunk->left);
+		AssertIfNotShmem(rchunk->right);
+		AssertIfNotShmem(rchunk->upper);
+	}
+	else if (cchunk->l_depth > cchunk->r_depth + 1)
+	{
+		/* clockwise rotation */
+		ccache_chunk   *lchunk = cchunk->left;
+		ccache_chunk   *upper = cchunk->upper;
+
+		cchunk->left = lchunk->right;
+		cchunk->l_depth = TTREE_DEPTH(cchunk->left);
+		cchunk->upper = lchunk;
+
+		lchunk->right = cchunk;
+		lchunk->r_depth = TTREE_DEPTH(lchunk->right);
+		lchunk->upper = upper;
+
+		if (!upper)
+			ccache->root_chunk = lchunk;
+		else if (upper->right == cchunk)
+		{
+			upper->right = lchunk;
+			upper->r_depth = TTREE_DEPTH(lchunk);
+		}
+		else
+		{
+			upper->left = lchunk;
+			upper->l_depth = TTREE_DEPTH(lchunk);
+		}
+		AssertIfNotShmem(cchunk->right);
+		AssertIfNotShmem(cchunk->left);
+		AssertIfNotShmem(cchunk->upper);
+		AssertIfNotShmem(lchunk->left);
+		AssertIfNotShmem(lchunk->right);
+		AssertIfNotShmem(lchunk->upper);
+	}
+}
+
+/*
+ * ccache_insert_tuple
+ *
+ * It inserts the supplied tuple, with uncached columns dropped (set to
+ * NULL), into the ccache_head. If no space is left in the target chunk,
+ * it expands the T-tree structure with a newly allocated chunk. If no
+ * shared memory is left at all, it returns false.
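+ *
+ * As a rough sketch of the decision made below:
+ *
+ *   ctid < min(chunk)  --> insert here if it fits and there is no left
+ *                          child; otherwise descend into (or create) the
+ *                          left child
+ *   ctid > max(chunk)  --> likewise for the right child
+ *   otherwise          --> insert here; if the chunk is full, push the
+ *                          largest cached ctid down into the right
+ *                          subtree and retry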
+ */ +#define cchunk_freespace(cchunk) \ + ((cchunk)->usage - offsetof(ccache_chunk, tuples[(cchunk)->ntups + 1])) + +static void +do_insert_tuple(ccache_head *ccache, ccache_chunk *cchunk, HeapTuple tuple) +{ + HeapTuple newtup; + ItemPointer ctid = &tuple->t_self; + int i_min = 0; + int i_max = cchunk->ntups; + int i, required = HEAPTUPLESIZE + MAXALIGN(tuple->t_len); + + Assert(required <= cchunk_freespace(cchunk)); + + while (i_min < i_max) + { + int i_mid = (i_min + i_max) / 2; + + if (ItemPointerCompare(ctid, &cchunk->tuples[i_mid]->t_self) <= 0) + i_max = i_mid; + else + i_min = i_mid + 1; + } + + if (i_min < cchunk->ntups) + { + HeapTuple movtup = cchunk->tuples[i_min]; + Size movlen = HEAPTUPLESIZE + MAXALIGN(movtup->t_len); + char *destaddr = (char *)movtup + movlen - required; + + Assert(ItemPointerCompare(&tuple->t_self, &movtup->t_self) < 0); + + memmove((char *)cchunk + cchunk->usage - required, + (char *)cchunk + cchunk->usage, + ((Size)movtup + movlen) - ((Size)cchunk + cchunk->usage)); + for (i=cchunk->ntups; i > i_min; i--) + { + HeapTuple temp; + + temp = (HeapTuple)((char *)cchunk->tuples[i-1] - required); + cchunk->tuples[i] = temp; + temp->t_data = (HeapTupleHeader)((char *)temp->t_data - required); + } + cchunk->tuples[i_min] = newtup = (HeapTuple)destaddr; + memcpy(newtup, tuple, HEAPTUPLESIZE); + newtup->t_data = (HeapTupleHeader)((char *)newtup + HEAPTUPLESIZE); + memcpy(newtup->t_data, tuple->t_data, tuple->t_len); + cchunk->usage -= required; + cchunk->ntups++; + + Assert(cchunk->usage >= offsetof(ccache_chunk, tuples[cchunk->ntups])); + } + else + { + cchunk->usage -= required; + newtup = (HeapTuple)(((char *)cchunk) + cchunk->usage); + memcpy(newtup, tuple, HEAPTUPLESIZE); + newtup->t_data = (HeapTupleHeader)((char *)newtup + HEAPTUPLESIZE); + memcpy(newtup->t_data, tuple->t_data, tuple->t_len); + + cchunk->tuples[i_min] = newtup; + cchunk->ntups++; + + Assert(cchunk->usage >= offsetof(ccache_chunk, tuples[cchunk->ntups])); + } +} + +static void +copy_tuple_properties(HeapTuple newtup, HeapTuple oldtup) +{ + ItemPointerCopy(&oldtup->t_self, &newtup->t_self); + newtup->t_tableOid = oldtup->t_tableOid; + memcpy(&newtup->t_data->t_choice.t_heap, + &oldtup->t_data->t_choice.t_heap, + sizeof(HeapTupleFields)); + ItemPointerCopy(&oldtup->t_data->t_ctid, + &newtup->t_data->t_ctid); + newtup->t_data->t_infomask + = ((newtup->t_data->t_infomask & ~HEAP_XACT_MASK) | + (oldtup->t_data->t_infomask & HEAP_XACT_MASK)); + newtup->t_data->t_infomask2 + = ((newtup->t_data->t_infomask2 & ~HEAP2_XACT_MASK) | + (oldtup->t_data->t_infomask2 & HEAP2_XACT_MASK)); +} + +static bool +ccache_insert_tuple_internal(ccache_head *ccache, + ccache_chunk *cchunk, + HeapTuple newtup) +{ + ItemPointer ctid = &newtup->t_self; + ItemPointer min_ctid; + ItemPointer max_ctid; + int required = MAXALIGN(HEAPTUPLESIZE + newtup->t_len); + + if (cchunk->ntups == 0) + { + HeapTuple tup; + + cchunk->usage -= required; + cchunk->tuples[0] = tup = (HeapTuple)((char *)cchunk + cchunk->usage); + memcpy(tup, newtup, HEAPTUPLESIZE); + tup->t_data = (HeapTupleHeader)((char *)tup + HEAPTUPLESIZE); + memcpy(tup->t_data, newtup->t_data, newtup->t_len); + cchunk->ntups++; + + return true; + } + +retry: + min_ctid = &cchunk->tuples[0]->t_self; + max_ctid = &cchunk->tuples[cchunk->ntups - 1]->t_self; + + if (ItemPointerCompare(ctid, min_ctid) < 0) + { + if (!cchunk->left && required <= cchunk_freespace(cchunk)) + do_insert_tuple(ccache, cchunk, newtup); + else + { + if (!cchunk->left) + { + cchunk->left = 
ccache_alloc_chunk(ccache, cchunk); + if (!cchunk->left) + return false; + cchunk->l_depth = 1; + } + if (!ccache_insert_tuple_internal(ccache, cchunk->left, newtup)) + return false; + cchunk->l_depth = TTREE_DEPTH(cchunk->left); + } + } + else if (ItemPointerCompare(ctid, max_ctid) > 0) + { + if (!cchunk->right && required <= cchunk_freespace(cchunk)) + do_insert_tuple(ccache, cchunk, newtup); + else + { + if (!cchunk->right) + { + cchunk->right = ccache_alloc_chunk(ccache, cchunk); + if (!cchunk->right) + return false; + cchunk->r_depth = 1; + } + if (!ccache_insert_tuple_internal(ccache, cchunk->right, newtup)) + return false; + cchunk->r_depth = TTREE_DEPTH(cchunk->right); + } + } + else + { + if (required <= cchunk_freespace(cchunk)) + do_insert_tuple(ccache, cchunk, newtup); + else + { + HeapTuple movtup; + + /* push out largest ctid until we get enough space */ + if (!cchunk->right) + { + cchunk->right = ccache_alloc_chunk(ccache, cchunk); + if (!cchunk->right) + return false; + cchunk->r_depth = 1; + } + movtup = cchunk->tuples[cchunk->ntups - 1]; + + if (!ccache_insert_tuple_internal(ccache, cchunk->right, movtup)) + return false; + + cchunk->ntups--; + cchunk->usage += MAXALIGN(HEAPTUPLESIZE + movtup->t_len); + cchunk->r_depth = TTREE_DEPTH(cchunk->right); + + goto retry; + } + } + /* Rebalance the tree, if needed */ + ccache_rebalance_tree(ccache, cchunk); + + return true; +} + +bool +ccache_insert_tuple(ccache_head *ccache, Relation rel, HeapTuple tuple) +{ + TupleDesc tupdesc = RelationGetDescr(rel); + HeapTuple newtup; + Datum *cs_values = alloca(sizeof(Datum) * tupdesc->natts); + bool *cs_isnull = alloca(sizeof(bool) * tupdesc->natts); + int i, j; + + /* remove unreferenced columns */ + heap_deform_tuple(tuple, tupdesc, cs_values, cs_isnull); + for (i=0; i < tupdesc->natts; i++) + { + j = i + 1 - FirstLowInvalidHeapAttributeNumber; + + if (!bms_is_member(j, &ccache->attrs_used)) + cs_isnull[i] = true; + } + newtup = heap_form_tuple(tupdesc, cs_values, cs_isnull); + copy_tuple_properties(newtup, tuple); + + return ccache_insert_tuple_internal(ccache, ccache->root_chunk, newtup); +} + +/* + * ccache_find_tuple + * + * It find a tuple that satisfies the supplied ItemPointer according to + * the ScanDirection. If NoMovementScanDirection, it returns a tuple that + * has strictly same ItemPointer. On the other hand, it returns a tuple + * that has the least ItemPointer greater than the supplied one if + * ForwardScanDirection, and also returns a tuple with the greatest + * ItemPointer smaller than the supplied one if BackwardScanDirection. 
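+ *
+ * For example (ctids are only illustrative): if the cache holds tuples at
+ * (0,1), (0,3) and (0,7), then looking up (0,3) returns (0,7) under
+ * ForwardScanDirection, (0,1) under BackwardScanDirection, and (0,3)
+ * itself under NoMovementScanDirection, while looking up (0,4) with
+ * NoMovementScanDirection returns NULL.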
+ */ +HeapTuple +ccache_find_tuple(ccache_chunk *cchunk, ItemPointer ctid, + ScanDirection direction) +{ + ItemPointer min_ctid; + ItemPointer max_ctid; + HeapTuple tuple = NULL; + int i_min = 0; + int i_max = cchunk->ntups - 1; + int rc; + + if (cchunk->ntups == 0) + return false; + + min_ctid = &cchunk->tuples[i_min]->t_self; + max_ctid = &cchunk->tuples[i_max]->t_self; + + if ((rc = ItemPointerCompare(ctid, min_ctid)) <= 0) + { + if (rc == 0 && (direction == NoMovementScanDirection || + direction == ForwardScanDirection)) + { + if (cchunk->ntups > direction) + return cchunk->tuples[direction]; + } + else + { + if (cchunk->left) + tuple = ccache_find_tuple(cchunk->left, ctid, direction); + if (!HeapTupleIsValid(tuple) && direction == ForwardScanDirection) + return cchunk->tuples[0]; + return tuple; + } + } + + if ((rc = ItemPointerCompare(ctid, max_ctid)) >= 0) + { + if (rc == 0 && (direction == NoMovementScanDirection || + direction == BackwardScanDirection)) + { + if (i_max + direction >= 0) + return cchunk->tuples[i_max + direction]; + } + else + { + if (cchunk->right) + tuple = ccache_find_tuple(cchunk->right, ctid, direction); + if (!HeapTupleIsValid(tuple) && direction == BackwardScanDirection) + return cchunk->tuples[i_max]; + return tuple; + } + } + + while (i_min < i_max) + { + int i_mid = (i_min + i_max) / 2; + + if (ItemPointerCompare(ctid, &cchunk->tuples[i_mid]->t_self) <= 0) + i_max = i_mid; + else + i_min = i_mid + 1; + } + Assert(i_min == i_max); + + if (ItemPointerCompare(ctid, &cchunk->tuples[i_min]->t_self) == 0) + { + if (direction == BackwardScanDirection && i_min > 0) + return cchunk->tuples[i_min - 1]; + else if (direction == NoMovementScanDirection) + return cchunk->tuples[i_min]; + else if (direction == ForwardScanDirection) + { + Assert(i_min + 1 < cchunk->ntups); + return cchunk->tuples[i_min + 1]; + } + } + else + { + if (direction == BackwardScanDirection && i_min > 0) + return cchunk->tuples[i_min - 1]; + else if (direction == ForwardScanDirection) + return cchunk->tuples[i_min]; + } + return NULL; +} + +/* + * ccache_delete_tuple + * + * It synchronizes the properties of tuple being already cached, usually + * for deletion. + */ +bool +ccache_delete_tuple(ccache_head *ccache, HeapTuple oldtup) +{ + HeapTuple tuple; + + tuple = ccache_find_tuple(ccache->root_chunk, &oldtup->t_self, + NoMovementScanDirection); + if (!tuple) + return false; + + copy_tuple_properties(tuple, oldtup); + + return true; +} + +/* + * ccache_merge_chunk + * + * It merges two chunks if these have enough free space to consolidate + * its contents into one. 
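+ *
+ * For example (sizes are only illustrative): if this chunk still has 1MB
+ * of unused space while an adjacent chunk in ctid order holds 700kB of
+ * tuples plus its tuple-pointer array, that chunk is folded into this one
+ * and its block is handed back to the allocator via cs_free_shmblock().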
+ */ +static void +ccache_merge_chunk(ccache_head *ccache, ccache_chunk *cchunk) +{ + ccache_chunk *curr; + ccache_chunk **upper; + int *p_depth; + int i; + bool needs_rebalance = false; + + /* find the least right node that has no left node */ + upper = &cchunk->right; + p_depth = &cchunk->r_depth; + curr = cchunk->right; + while (curr != NULL) + { + if (!curr->left) + { + Size shift = shmseg_blocksize - curr->usage; + long total_usage = cchunk->usage - shift; + int total_ntups = cchunk->ntups + curr->ntups; + + if ((long)offsetof(ccache_chunk, tuples[total_ntups]) < total_usage) + { + ccache_chunk *rchunk = curr->right; + + /* merge contents */ + for (i=0; i < curr->ntups; i++) + { + HeapTuple oldtup = curr->tuples[i]; + HeapTuple newtup; + + cchunk->usage -= HEAPTUPLESIZE + MAXALIGN(oldtup->t_len); + newtup = (HeapTuple)((char *)cchunk + cchunk->usage); + memcpy(newtup, oldtup, HEAPTUPLESIZE); + newtup->t_data + = (HeapTupleHeader)((char *)newtup + HEAPTUPLESIZE); + memcpy(newtup->t_data, oldtup->t_data, + MAXALIGN(oldtup->t_len)); + + cchunk->tuples[cchunk->ntups++] = newtup; + } + + /* detach the current chunk */ + *upper = curr->right; + *p_depth = curr->r_depth; + if (rchunk) + rchunk->upper = curr->upper; + + /* release it */ + cs_free_shmblock(curr); + needs_rebalance = true; + } + break; + } + upper = &curr->left; + p_depth = &curr->l_depth; + curr = curr->left; + } + + /* find the greatest left node that has no right node */ + upper = &cchunk->left; + p_depth = &cchunk->l_depth; + curr = cchunk->left; + + while (curr != NULL) + { + if (!curr->right) + { + Size shift = shmseg_blocksize - curr->usage; + long total_usage = cchunk->usage - shift; + int total_ntups = cchunk->ntups + curr->ntups; + + if ((long)offsetof(ccache_chunk, tuples[total_ntups]) < total_usage) + { + ccache_chunk *lchunk = curr->left; + Size offset; + + /* merge contents */ + memmove((char *)cchunk + cchunk->usage - shift, + (char *)cchunk + cchunk->usage, + shmseg_blocksize - cchunk->usage); + for (i=cchunk->ntups - 1; i >= 0; i--) + { + HeapTuple temp + = (HeapTuple)((char *)cchunk->tuples[i] - shift); + + cchunk->tuples[curr->ntups + i] = temp; + temp->t_data = (HeapTupleHeader)((char *)temp + + HEAPTUPLESIZE); + } + cchunk->usage -= shift; + cchunk->ntups += curr->ntups; + + /* merge contents */ + offset = shmseg_blocksize; + for (i=0; i < curr->ntups; i++) + { + HeapTuple oldtup = curr->tuples[i]; + HeapTuple newtup; + + offset -= HEAPTUPLESIZE + MAXALIGN(oldtup->t_len); + newtup = (HeapTuple)((char *)cchunk + offset); + memcpy(newtup, oldtup, HEAPTUPLESIZE); + newtup->t_data + = (HeapTupleHeader)((char *)newtup + HEAPTUPLESIZE); + memcpy(newtup->t_data, oldtup->t_data, + MAXALIGN(oldtup->t_len)); + cchunk->tuples[i] = newtup; + } + + /* detach the current chunk */ + *upper = curr->left; + *p_depth = curr->l_depth; + if (lchunk) + lchunk->upper = curr->upper; + /* release it */ + cs_free_shmblock(curr); + needs_rebalance = true; + } + break; + } + upper = &curr->right; + p_depth = &curr->r_depth; + curr = curr->right; + } + /* Rebalance the tree, if needed */ + if (needs_rebalance) + ccache_rebalance_tree(ccache, cchunk); +} + +/* + * ccache_vacuum_page + * + * It reclaims the tuples being already vacuumed. It shall be kicked on + * the callback function of heap_page_prune_hook to synchronize contents + * of the cache with on-disk image. 
+ */ +static void +ccache_vacuum_tuple(ccache_head *ccache, + ccache_chunk *cchunk, + ItemPointer ctid) +{ + ItemPointer min_ctid; + ItemPointer max_ctid; + int i_min = 0; + int i_max = cchunk->ntups; + + if (cchunk->ntups == 0) + return; + + min_ctid = &cchunk->tuples[i_min]->t_self; + max_ctid = &cchunk->tuples[i_max - 1]->t_self; + + if (ItemPointerCompare(ctid, min_ctid) < 0) + { + if (cchunk->left) + ccache_vacuum_tuple(ccache, cchunk->left, ctid); + } + else if (ItemPointerCompare(ctid, max_ctid) > 0) + { + if (cchunk->right) + ccache_vacuum_tuple(ccache, cchunk->right, ctid); + } + else + { + while (i_min < i_max) + { + int i_mid = (i_min + i_max) / 2; + + if (ItemPointerCompare(ctid, &cchunk->tuples[i_mid]->t_self) <= 0) + i_max = i_mid; + else + i_min = i_mid + 1; + } + Assert(i_min == i_max); + + if (ItemPointerCompare(ctid, &cchunk->tuples[i_min]->t_self) == 0) + { + HeapTuple tuple = cchunk->tuples[i_min]; + int length = MAXALIGN(HEAPTUPLESIZE + tuple->t_len); + + if (i_min < cchunk->ntups - 1) + { + int j; + + memmove((char *)cchunk + cchunk->usage + length, + (char *)cchunk + cchunk->usage, + (Size)tuple - ((Size)cchunk + cchunk->usage)); + for (j=i_min + 1; j < cchunk->ntups; j++) + { + HeapTuple temp; + + temp = (HeapTuple)((char *)cchunk->tuples[j] + length); + cchunk->tuples[j-1] = temp; + temp->t_data + = (HeapTupleHeader)((char *)temp->t_data + length); + } + } + cchunk->usage += length; + cchunk->ntups--; + } + } + /* merge chunks if this chunk has enough space to merge */ + ccache_merge_chunk(ccache, cchunk); +} + +void +ccache_vacuum_page(ccache_head *ccache, Buffer buffer) +{ + /* XXX it needs buffer is valid and pinned */ + BlockNumber blknum = BufferGetBlockNumber(buffer); + Page page = BufferGetPage(buffer); + OffsetNumber maxoff = PageGetMaxOffsetNumber(page); + OffsetNumber offnum; + + for (offnum = FirstOffsetNumber; + offnum <= maxoff; + offnum = OffsetNumberNext(offnum)) + { + ItemPointerData ctid; + ItemId itemid = PageGetItemId(page, offnum); + + if (ItemIdIsNormal(itemid)) + continue; + + ItemPointerSetBlockNumber(&ctid, blknum); + ItemPointerSetOffsetNumber(&ctid, offnum); + + ccache_vacuum_tuple(ccache, ccache->root_chunk, &ctid); + } +} + +static void +ccache_release_all_chunks(ccache_chunk *cchunk) +{ + if (cchunk->left) + ccache_release_all_chunks(cchunk->left); + if (cchunk->right) + ccache_release_all_chunks(cchunk->right); + cs_free_shmblock(cchunk); +} + +static void +track_ccache_locally(ccache_head *ccache) +{ + ccache_entry *entry; + dlist_node *dnode; + + if (dlist_is_empty(&ccache_free_list)) + { + int i; + + PG_TRY(); + { + for (i=0; i < 20; i++) + { + entry = MemoryContextAlloc(TopMemoryContext, + sizeof(ccache_entry)); + dlist_push_tail(&ccache_free_list, &entry->chain); + } + } + PG_CATCH(); + { + cs_put_ccache(ccache); + PG_RE_THROW(); + } + PG_END_TRY(); + } + dnode = dlist_pop_head_node(&ccache_free_list); + entry = dlist_container(ccache_entry, chain, dnode); + entry->owner = CurrentResourceOwner; + entry->ccache = ccache; + dlist_push_tail(&ccache_local_list, &entry->chain); +} + +static void +untrack_ccache_locally(ccache_head *ccache) +{ + dlist_mutable_iter iter; + + dlist_foreach_modify(iter, &ccache_local_list) + { + ccache_entry *entry + = dlist_container(ccache_entry, chain, iter.cur); + + if (entry->ccache == ccache && + entry->owner == CurrentResourceOwner) + { + dlist_delete(&entry->chain); + dlist_push_tail(&ccache_free_list, &entry->chain); + return; + } + } +} + +static void +cs_put_ccache_nolock(ccache_head *ccache) +{ 
+ Assert(ccache->refcnt > 0); + if (--ccache->refcnt == 0) + { + ccache_release_all_chunks(ccache->root_chunk); + dlist_delete(&ccache->hash_chain); + dlist_delete(&ccache->lru_chain); + dlist_push_head(&cs_ccache_hash->free_list, &ccache->hash_chain); + } + untrack_ccache_locally(ccache); +} + +void +cs_put_ccache(ccache_head *cache) +{ + SpinLockAcquire(&cs_ccache_hash->lock); + cs_put_ccache_nolock(cache); + SpinLockRelease(&cs_ccache_hash->lock); +} + +static ccache_head * +cs_create_ccache(Oid tableoid, Bitmapset *attrs_used) +{ + ccache_head *temp; + ccache_head *new_cache; + dlist_node *dnode; + int i; + + /* + * Here is no columnar cache of this relation or cache attributes are + * not enough to run the required query. So, it tries to create a new + * ccache_head for the upcoming cache-scan. + * Also allocate ones, if we have no free ccache_head any more. + */ + if (dlist_is_empty(&cs_ccache_hash->free_list)) + { + char *buffer; + int offset; + int nwords, size; + + buffer = cs_alloc_shmblock(); + if (!buffer) + return NULL; + + nwords = (max_cached_attnum - FirstLowInvalidHeapAttributeNumber + + BITS_PER_BITMAPWORD - 1) / BITS_PER_BITMAPWORD; + size = MAXALIGN(offsetof(ccache_head, + attrs_used.words[nwords + 1])); + for (offset = 0; offset <= shmseg_blocksize - size; offset += size) + { + temp = (ccache_head *)(buffer + offset); + + dlist_push_tail(&cs_ccache_hash->free_list, &temp->hash_chain); + } + } + dnode = dlist_pop_head_node(&cs_ccache_hash->free_list); + new_cache = dlist_container(ccache_head, hash_chain, dnode); + + i = cs_ccache_hash->lwlocks_usage++ % ccache_hash_size; + new_cache->lock = cs_ccache_hash->lwlocks[i]; + new_cache->refcnt = 2; + new_cache->status = CCACHE_STATUS_INITIALIZED; + + new_cache->tableoid = tableoid; + new_cache->root_chunk = ccache_alloc_chunk(new_cache, NULL); + if (!new_cache->root_chunk) + { + dlist_push_head(&cs_ccache_hash->free_list, &new_cache->hash_chain); + return NULL; + } + + if (attrs_used) + memcpy(&new_cache->attrs_used, attrs_used, + offsetof(Bitmapset, words[attrs_used->nwords])); + else + { + new_cache->attrs_used.nwords = 1; + new_cache->attrs_used.words[0] = 0; + } + return new_cache; +} + +ccache_head * +cs_get_ccache(Oid tableoid, Bitmapset *attrs_used, bool create_on_demand) +{ + Datum hash = hash_any((unsigned char *)&tableoid, sizeof(Oid)); + Index i = hash % ccache_hash_size; + dlist_iter iter; + ccache_head *old_cache = NULL; + ccache_head *new_cache = NULL; + ccache_head *temp; + + SpinLockAcquire(&cs_ccache_hash->lock); + PG_TRY(); + { + /* + * Try to find out existing ccache that has all the columns being + * referenced in this query. 
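+		 *
+		 * For example, a cache built for columns (a, b, c) also serves a
+		 * query that references only (a, c), thanks to the bms_is_subset()
+		 * check below, while a query referencing (a, d) builds a new cache
+		 * that covers the union (a, b, c, d) and drops the old one.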
+ */ + dlist_foreach(iter, &cs_ccache_hash->slots[i]) + { + temp = dlist_container(ccache_head, hash_chain, iter.cur); + + if (tableoid != temp->tableoid) + continue; + + if (bms_is_subset(attrs_used, &temp->attrs_used)) + { + temp->refcnt++; + if (create_on_demand) + dlist_move_head(&cs_ccache_hash->lru_list, + &temp->lru_chain); + new_cache = temp; + goto out_unlock; + } + old_cache = temp; + break; + } + + if (create_on_demand) + { + if (old_cache) + attrs_used = bms_union(attrs_used, &old_cache->attrs_used); + + new_cache = cs_create_ccache(tableoid, attrs_used); + if (!new_cache) + goto out_unlock; + + dlist_push_head(&cs_ccache_hash->slots[i], &new_cache->hash_chain); + dlist_push_head(&cs_ccache_hash->lru_list, &new_cache->lru_chain); + if (old_cache) + cs_put_ccache_nolock(old_cache); + } + } + PG_CATCH(); + { + SpinLockRelease(&cs_ccache_hash->lock); + PG_RE_THROW(); + } + PG_END_TRY(); +out_unlock: + SpinLockRelease(&cs_ccache_hash->lock); + + if (new_cache) + track_ccache_locally(new_cache); + + return new_cache; +} + +typedef struct { + Oid tableoid; + int status; + ccache_chunk *cchunk; + ccache_chunk *upper; + ccache_chunk *right; + ccache_chunk *left; + int r_depth; + int l_depth; + uint32 ntups; + uint32 usage; + ItemPointerData min_ctid; + ItemPointerData max_ctid; +} ccache_status; + +static List * +cache_scan_debuginfo_internal(ccache_head *ccache, + ccache_chunk *cchunk, List *result) +{ + ccache_status *cstatus = palloc0(sizeof(ccache_status)); + List *temp; + + if (cchunk->left) + { + temp = cache_scan_debuginfo_internal(ccache, cchunk->left, NIL); + result = list_concat(result, temp); + } + cstatus->tableoid = ccache->tableoid; + cstatus->status = ccache->status; + cstatus->cchunk = cchunk; + cstatus->upper = cchunk->upper; + cstatus->right = cchunk->right; + cstatus->left = cchunk->left; + cstatus->r_depth = cchunk->r_depth; + cstatus->l_depth = cchunk->l_depth; + cstatus->ntups = cchunk->ntups; + cstatus->usage = cchunk->usage; + if (cchunk->ntups > 0) + { + ItemPointerCopy(&cchunk->tuples[0]->t_self, + &cstatus->min_ctid); + ItemPointerCopy(&cchunk->tuples[cchunk->ntups - 1]->t_self, + &cstatus->max_ctid); + } + else + { + ItemPointerSet(&cstatus->min_ctid, + InvalidBlockNumber, + InvalidOffsetNumber); + ItemPointerSet(&cstatus->max_ctid, + InvalidBlockNumber, + InvalidOffsetNumber); + } + result = lappend(result, cstatus); + + if (cchunk->right) + { + temp = cache_scan_debuginfo_internal(ccache, cchunk->right, NIL); + result = list_concat(result, temp); + } + return result; +} + +/* + * cache_scan_debuginfo + * + * It shows the current status of ccache_chunks being allocated. 
+ */ +Datum +cache_scan_debuginfo(PG_FUNCTION_ARGS) +{ + FuncCallContext *fncxt; + List *cstatus_list; + + if (SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc; + MemoryContext oldcxt; + int i; + dlist_iter iter; + List *result = NIL; + + fncxt = SRF_FIRSTCALL_INIT(); + oldcxt = MemoryContextSwitchTo(fncxt->multi_call_memory_ctx); + + /* make definition of tuple-descriptor */ + tupdesc = CreateTemplateTupleDesc(12, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "tableoid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "status", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "chunk", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "upper", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "l_depth", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "l_chunk", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "r_depth", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "r_chunk", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "ntuples", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber)10, "usage", + INT4OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber)11, "min_ctid", + TIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber)12, "max_ctid", + TIDOID, -1, 0); + fncxt->tuple_desc = BlessTupleDesc(tupdesc); + + /* make a snapshot of the current table cache */ + SpinLockAcquire(&cs_ccache_hash->lock); + for (i=0; i < ccache_hash_size; i++) + { + dlist_foreach(iter, &cs_ccache_hash->slots[i]) + { + ccache_head *ccache + = dlist_container(ccache_head, hash_chain, iter.cur); + + ccache->refcnt++; + SpinLockRelease(&cs_ccache_hash->lock); + track_ccache_locally(ccache); + + LWLockAcquire(ccache->lock, LW_SHARED); + result = cache_scan_debuginfo_internal(ccache, + ccache->root_chunk, + result); + LWLockRelease(ccache->lock); + + SpinLockAcquire(&cs_ccache_hash->lock); + cs_put_ccache_nolock(ccache); + } + } + SpinLockRelease(&cs_ccache_hash->lock); + + fncxt->user_fctx = result; + MemoryContextSwitchTo(oldcxt); + } + fncxt = SRF_PERCALL_SETUP(); + + cstatus_list = (List *)fncxt->user_fctx; + if (cstatus_list != NIL && + fncxt->call_cntr < cstatus_list->length) + { + ccache_status *cstatus = list_nth(cstatus_list, fncxt->call_cntr); + Datum values[12]; + bool isnull[12]; + HeapTuple tuple; + + memset(isnull, false, sizeof(isnull)); + values[0] = ObjectIdGetDatum(cstatus->tableoid); + if (cstatus->status == CCACHE_STATUS_INITIALIZED) + values[1] = CStringGetTextDatum("initialized"); + else if (cstatus->status == CCACHE_STATUS_IN_PROGRESS) + values[1] = CStringGetTextDatum("in-progress"); + else if (cstatus->status == CCACHE_STATUS_CONSTRUCTED) + values[1] = CStringGetTextDatum("constructed"); + else + values[1] = CStringGetTextDatum("unknown"); + values[2] = CStringGetTextDatum(psprintf("%p", cstatus->cchunk)); + values[3] = CStringGetTextDatum(psprintf("%p", cstatus->upper)); + values[4] = Int32GetDatum(cstatus->l_depth); + values[5] = CStringGetTextDatum(psprintf("%p", cstatus->left)); + values[6] = Int32GetDatum(cstatus->r_depth); + values[7] = CStringGetTextDatum(psprintf("%p", cstatus->right)); + values[8] = Int32GetDatum(cstatus->ntups); + values[9] = Int32GetDatum(cstatus->usage); + + if (ItemPointerIsValid(&cstatus->min_ctid)) + values[10] = PointerGetDatum(&cstatus->min_ctid); + else + isnull[10] = true; + if (ItemPointerIsValid(&cstatus->max_ctid)) + values[11] = PointerGetDatum(&cstatus->max_ctid); + else + isnull[11] = true; + + tuple = 
heap_form_tuple(fncxt->tuple_desc, values, isnull); + + SRF_RETURN_NEXT(fncxt, HeapTupleGetDatum(tuple)); + } + SRF_RETURN_DONE(fncxt); +} +PG_FUNCTION_INFO_V1(cache_scan_debuginfo); + +/* + * cs_alloc_shmblock + * + * It allocates a fixed-length block. The reason why this routine does not + * support variable length allocation is to simplify the logic for its purpose. + */ +static void * +cs_alloc_shmblock(void) +{ + ccache_head *ccache; + dlist_node *dnode; + shmseg_block *block; + void *address = NULL; + int retry = 2; + +do_retry: + SpinLockAcquire(&cs_shmseg_head->lock); + if (dlist_is_empty(&cs_shmseg_head->free_list) && retry-- > 0) + { + SpinLockRelease(&cs_shmseg_head->lock); + + SpinLockAcquire(&cs_ccache_hash->lock); + if (!dlist_is_empty(&cs_ccache_hash->lru_list)) + { + dnode = dlist_tail_node(&cs_ccache_hash->lru_list); + ccache = dlist_container(ccache_head, lru_chain, dnode); + + cs_put_ccache_nolock(ccache); + } + SpinLockRelease(&cs_ccache_hash->lock); + + goto do_retry; + } + + if (!dlist_is_empty(&cs_shmseg_head->free_list)) + { + dnode = dlist_pop_head_node(&cs_shmseg_head->free_list); + block = dlist_container(shmseg_block, chain, dnode); + + memset(&block->chain, 0, sizeof(dlist_node)); + + address = (void *) block->address; + } + SpinLockRelease(&cs_shmseg_head->lock); + + return address; +} + +/* + * cs_free_shmblock + * + * It release a block being allocated by cs_alloc_shmblock + */ +static void +cs_free_shmblock(void *address) +{ + Size curr = (Size) address; + Size base = cs_shmseg_head->base_address; + ulong index; + shmseg_block *block; + + Assert((curr - base) % shmseg_blocksize == 0); + Assert(curr >= base && curr < base + shmseg_num_blocks * shmseg_blocksize); + index = (curr - base) / shmseg_blocksize; + + SpinLockAcquire(&cs_shmseg_head->lock); + block = &cs_shmseg_head->blocks[index]; + + dlist_push_head(&cs_shmseg_head->free_list, &block->chain); + + SpinLockRelease(&cs_shmseg_head->lock); +} + +static void +ccache_setup(void) +{ + Size curr_address; + ulong i; + bool found; + + /* allocation of a shared memory segment for table's hash */ + cs_ccache_hash = ShmemInitStruct("cache_scan: hash of columnar cache", + MAXALIGN(sizeof(ccache_hash)) + + MAXALIGN(sizeof(LWLockId) * + ccache_hash_size) + + MAXALIGN(sizeof(dlist_node) * + ccache_hash_size), + &found); + Assert(!found); + + SpinLockInit(&cs_ccache_hash->lock); + dlist_init(&cs_ccache_hash->lru_list); + dlist_init(&cs_ccache_hash->free_list); + cs_ccache_hash->lwlocks = (void *)(&cs_ccache_hash[1]); + cs_ccache_hash->slots + = (void *)(&cs_ccache_hash->lwlocks[ccache_hash_size]); + + for (i=0; i < ccache_hash_size; i++) + cs_ccache_hash->lwlocks[i] = LWLockAssign(); + for (i=0; i < ccache_hash_size; i++) + dlist_init(&cs_ccache_hash->slots[i]); + + /* allocation of a shared memory segment for columnar cache */ + cs_shmseg_head = ShmemInitStruct("cache_scan: columnar cache", + offsetof(shmseg_head, + blocks[shmseg_num_blocks]) + + shmseg_num_blocks * shmseg_blocksize, + &found); + Assert(!found); + + SpinLockInit(&cs_shmseg_head->lock); + dlist_init(&cs_shmseg_head->free_list); + + curr_address = MAXALIGN(&cs_shmseg_head->blocks[shmseg_num_blocks]); + + cs_shmseg_head->base_address = curr_address; + for (i=0; i < shmseg_num_blocks; i++) + { + shmseg_block *block = &cs_shmseg_head->blocks[i]; + + block->address = curr_address; + dlist_push_tail(&cs_shmseg_head->free_list, &block->chain); + + curr_address += shmseg_blocksize; + } +} + +void +ccache_init(void) +{ + /* setup GUC variables */ + 
DefineCustomIntVariable("cache_scan.block_size", + "block size of in-memory columnar cache", + NULL, + &shmseg_blocksize, + 2048 * 1024, /* 2MB */ + 1024 * 1024, /* 1MB */ + INT_MAX, + PGC_SIGHUP, + GUC_NOT_IN_SAMPLE, + NULL, NULL, NULL); + if ((shmseg_blocksize & (shmseg_blocksize - 1)) != 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cache_scan.block_size must be power of 2"))); + + DefineCustomIntVariable("cache_scan.num_blocks", + "number of in-memory columnar cache blocks", + NULL, + &shmseg_num_blocks, + 64, + 64, + INT_MAX, + PGC_SIGHUP, + GUC_NOT_IN_SAMPLE, + NULL, NULL, NULL); + + DefineCustomIntVariable("cache_scan.hash_size", + "number of hash slots for columnar cache", + NULL, + &ccache_hash_size, + 128, + 128, + INT_MAX, + PGC_SIGHUP, + GUC_NOT_IN_SAMPLE, + NULL, NULL, NULL); + + DefineCustomIntVariable("cache_scan.max_cached_attnum", + "max attribute number we can cache", + NULL, + &max_cached_attnum, + 256, + sizeof(bitmapword) * BITS_PER_BYTE, + 2048, + PGC_SIGHUP, + GUC_NOT_IN_SAMPLE, + NULL, NULL, NULL); + + /* request shared memory segment for table's cache */ + RequestAddinShmemSpace(MAXALIGN(sizeof(ccache_hash)) + + MAXALIGN(sizeof(dlist_head) * ccache_hash_size) + + MAXALIGN(sizeof(LWLockId) * ccache_hash_size) + + MAXALIGN(offsetof(shmseg_head, + blocks[shmseg_num_blocks])) + + shmseg_num_blocks * shmseg_blocksize); + RequestAddinLWLocks(ccache_hash_size); + + shmem_startup_next = shmem_startup_hook; + shmem_startup_hook = ccache_setup; + + /* register resource-release callback */ + dlist_init(&ccache_local_list); + dlist_init(&ccache_free_list); + RegisterResourceReleaseCallback(ccache_on_resource_release, NULL); +} diff --git a/contrib/cache_scan/cscan.c b/contrib/cache_scan/cscan.c new file mode 100644 index 0000000..0a63c2e --- /dev/null +++ b/contrib/cache_scan/cscan.c @@ -0,0 +1,761 @@ +/* ------------------------------------------------------------------------- + * + * contrib/cache_scan/cscan.c + * + * An extension that offers an alternative way to scan a table utilizing column + * oriented database cache. 
+ * + * Copyright (c) 2010-2013, PostgreSQL Global Development Group + * + * ------------------------------------------------------------------------- + */ +#include "postgres.h" +#include "access/heapam.h" +#include "access/relscan.h" +#include "access/sysattr.h" +#include "catalog/objectaccess.h" +#include "catalog/pg_language.h" +#include "catalog/pg_proc.h" +#include "catalog/pg_trigger.h" +#include "commands/trigger.h" +#include "executor/nodeCustom.h" +#include "miscadmin.h" +#include "optimizer/cost.h" +#include "optimizer/pathnode.h" +#include "optimizer/paths.h" +#include "optimizer/var.h" +#include "storage/bufmgr.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" +#include "utils/guc.h" +#include "utils/spccache.h" +#include "utils/syscache.h" +#include "utils/tqual.h" +#include "cache_scan.h" +#include + +PG_MODULE_MAGIC; + +/* Static variables */ +static add_scan_path_hook_type add_scan_path_next = NULL; +static object_access_hook_type object_access_next = NULL; +static heap_page_prune_hook_type heap_page_prune_next = NULL; + +static bool cache_scan_disabled; + +static bool +cs_estimate_costs(PlannerInfo *root, + RelOptInfo *baserel, + Relation rel, + CustomPath *cpath, + Bitmapset **attrs_used) +{ + ListCell *lc; + ccache_head *ccache; + Oid tableoid = RelationGetRelid(rel); + TupleDesc tupdesc = RelationGetDescr(rel); + int total_width = 0; + int tuple_width = 0; + double hit_ratio; + Cost run_cost = 0.0; + Cost startup_cost = 0.0; + double tablespace_page_cost; + QualCost qpqual_cost; + Cost cpu_per_tuple; + int i; + + /* Mark the path with the correct row estimate */ + if (cpath->path.param_info) + cpath->path.rows = cpath->path.param_info->ppi_rows; + else + cpath->path.rows = baserel->rows; + + /* List up all the columns being in-use */ + pull_varattnos((Node *) baserel->reltargetlist, + baserel->relid, + attrs_used); + foreach(lc, baserel->baserestrictinfo) + { + RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); + + pull_varattnos((Node *) rinfo->clause, + baserel->relid, + attrs_used); + } + + for (i=FirstLowInvalidHeapAttributeNumber + 1; i <= 0; i++) + { + int attidx = i - FirstLowInvalidHeapAttributeNumber; + + if (bms_is_member(attidx, *attrs_used)) + { + /* oid and whole-row reference is not supported */ + if (i == ObjectIdAttributeNumber || i == InvalidAttrNumber) + return false; + + /* clear system attributes from the bitmap */ + *attrs_used = bms_del_member(*attrs_used, attidx); + } + } + + /* + * Because of layout on the shared memory segment, we have to restrict + * the largest attribute number in use to prevent overrun by growth of + * Bitmapset. + */ + if (*attrs_used && + (*attrs_used)->nwords > ccache_max_attribute_number()) + return false; + + /* + * Estimation of average width of cached tuples - it does not make + * sense to construct a new cache if its average width is more than + * 30% of the raw data. 
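+	 *
+	 * For example (widths are made up): on a 400-byte wide record, a query
+	 * referencing columns that sum up to 80 bytes (20%) is still allowed to
+	 * build a new cache, while one referencing 200 bytes (50%) keeps the
+	 * plain heap-scan path instead.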
+ */ + for (i=0; i < tupdesc->natts; i++) + { + Form_pg_attribute attr = tupdesc->attrs[i]; + int attidx = i + 1 - FirstLowInvalidHeapAttributeNumber; + int width; + + if (attr->attlen > 0) + width = attr->attlen; + else + width = get_attavgwidth(tableoid, attr->attnum); + + total_width += width; + if (bms_is_member(attidx, *attrs_used)) + tuple_width += width; + } + + ccache = cs_get_ccache(RelationGetRelid(rel), *attrs_used, false); + if (!ccache) + { + if ((double)tuple_width / (double)total_width > 0.3) + return false; + hit_ratio = 0.05; + } + else + { + hit_ratio = 0.95; + cs_put_ccache(ccache); + } + + get_tablespace_page_costs(baserel->reltablespace, + NULL, + &tablespace_page_cost); + /* Disk costs */ + run_cost += (1.0 - hit_ratio) * tablespace_page_cost * baserel->pages; + + /* CPU costs */ + get_restriction_qual_cost(root, baserel, + cpath->path.param_info, + &qpqual_cost); + + startup_cost += qpqual_cost.startup; + cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple; + run_cost += cpu_per_tuple * baserel->tuples; + + cpath->path.startup_cost = startup_cost; + cpath->path.total_cost = startup_cost + run_cost; + + return true; +} + +/* + * cs_relation_has_synchronizer + * + * A table that can have columner-cache also needs to have trigger for + * synchronization, to ensure the on-memory cache keeps the latest contents + * of the heap. It returns TRUE, if supplied relation has triggers that + * invokes cache_scan_synchronizer on appropriate context. Elsewhere, FALSE + * shall be returned. + */ +static bool +cs_relation_has_synchronizer(Relation rel) +{ + int i, numtriggers; + bool has_on_insert_synchronizer = false; + bool has_on_update_synchronizer = false; + bool has_on_delete_synchronizer = false; + bool has_on_truncate_synchronizer = false; + + if (!rel->trigdesc) + return false; + + numtriggers = rel->trigdesc->numtriggers; + for (i=0; i < numtriggers; i++) + { + Trigger *trig = rel->trigdesc->triggers + i; + HeapTuple tup; + + if (!trig->tgenabled) + continue; + + tup = SearchSysCache1(PROCOID, ObjectIdGetDatum(trig->tgfoid)); + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for function %u", trig->tgfoid); + + if (((Form_pg_proc) GETSTRUCT(tup))->prolang == ClanguageId) + { + Datum value; + bool isnull; + char *prosrc; + char *probin; + + value = SysCacheGetAttr(PROCOID, tup, + Anum_pg_proc_prosrc, &isnull); + if (isnull) + elog(ERROR, "null prosrc for C function %u", trig->tgoid); + prosrc = TextDatumGetCString(value); + + value = SysCacheGetAttr(PROCOID, tup, + Anum_pg_proc_probin, &isnull); + if (isnull) + elog(ERROR, "null probin for C function %u", trig->tgoid); + probin = TextDatumGetCString(value); + + if (strcmp(prosrc, "cache_scan_synchronizer") == 0 && + strcmp(probin, "$libdir/cache_scan") == 0) + { + int16 tgtype = trig->tgtype; + + if (TRIGGER_TYPE_MATCHES(tgtype, + TRIGGER_TYPE_ROW, + TRIGGER_TYPE_AFTER, + TRIGGER_TYPE_INSERT)) + has_on_insert_synchronizer = true; + if (TRIGGER_TYPE_MATCHES(tgtype, + TRIGGER_TYPE_ROW, + TRIGGER_TYPE_AFTER, + TRIGGER_TYPE_UPDATE)) + has_on_update_synchronizer = true; + if (TRIGGER_TYPE_MATCHES(tgtype, + TRIGGER_TYPE_ROW, + TRIGGER_TYPE_AFTER, + TRIGGER_TYPE_DELETE)) + has_on_delete_synchronizer = true; + if (TRIGGER_TYPE_MATCHES(tgtype, + TRIGGER_TYPE_STATEMENT, + TRIGGER_TYPE_AFTER, + TRIGGER_TYPE_TRUNCATE)) + has_on_truncate_synchronizer = true; + } + pfree(prosrc); + pfree(probin); + } + ReleaseSysCache(tup); + } + + if (has_on_insert_synchronizer && + has_on_update_synchronizer && + 
has_on_delete_synchronizer && + has_on_truncate_synchronizer) + return true; + return false; +} + + +static void +cs_add_scan_path(PlannerInfo *root, + RelOptInfo *baserel, + RangeTblEntry *rte) +{ + Relation rel; + + /* call the secondary hook if exist */ + if (add_scan_path_next) + (*add_scan_path_next)(root, baserel, rte); + + /* Is this feature available now? */ + if (cache_scan_disabled) + return; + + /* Only regular tables can be cached */ + if (baserel->reloptkind != RELOPT_BASEREL || + rte->rtekind != RTE_RELATION) + return; + + /* Core code should already acquire an appropriate lock */ + rel = heap_open(rte->relid, NoLock); + + if (cs_relation_has_synchronizer(rel)) + { + CustomPath *cpath = makeNode(CustomPath); + Relids required_outer; + Bitmapset *attrs_used = NULL; + + /* + * We don't support pushing join clauses into the quals of a ctidscan, + * but it could still have required parameterization due to LATERAL + * refs in its tlist. + */ + required_outer = baserel->lateral_relids; + + cpath->path.pathtype = T_CustomScan; + cpath->path.parent = baserel; + cpath->path.param_info = get_baserel_parampathinfo(root, baserel, + required_outer); + if (cs_estimate_costs(root, baserel, rel, cpath, &attrs_used)) + { + cpath->custom_name = pstrdup("cache scan"); + cpath->custom_flags = 0; + cpath->custom_private + = list_make1(makeString(bms_to_string(attrs_used))); + + add_path(baserel, &cpath->path); + } + } + heap_close(rel, NoLock); +} + +static void +cs_init_custom_scan_plan(PlannerInfo *root, + CustomScan *cscan_plan, + CustomPath *cscan_path, + List *tlist, + List *scan_clauses) +{ + List *quals = NIL; + ListCell *lc; + + /* should be a base relation */ + Assert(cscan_path->path.parent->relid > 0); + Assert(cscan_path->path.parent->rtekind == RTE_RELATION); + + /* extract the supplied RestrictInfo */ + foreach (lc, scan_clauses) + { + RestrictInfo *rinfo = lfirst(lc); + quals = lappend(quals, rinfo->clause); + } + + /* do nothing something special pushing-down */ + cscan_plan->scan.plan.targetlist = tlist; + cscan_plan->scan.plan.qual = quals; + cscan_plan->custom_private = cscan_path->custom_private; +} + +typedef struct +{ + ccache_head *ccache; + ItemPointerData curr_ctid; + bool normal_seqscan; + bool with_construction; +} cs_state; + +static void +cs_begin_custom_scan(CustomScanState *node, int eflags) +{ + CustomScan *cscan = (CustomScan *)node->ss.ps.plan; + Relation rel = node->ss.ss_currentRelation; + EState *estate = node->ss.ps.state; + HeapScanDesc scandesc = NULL; + cs_state *csstate; + Bitmapset *attrs_used; + ccache_head *ccache; + + csstate = palloc0(sizeof(cs_state)); + + attrs_used = bms_from_string(strVal(linitial(cscan->custom_private))); + + ccache = cs_get_ccache(RelationGetRelid(rel), attrs_used, true); + if (ccache) + { + LWLockAcquire(ccache->lock, LW_SHARED); + if (ccache->status != CCACHE_STATUS_CONSTRUCTED) + { + LWLockRelease(ccache->lock); + LWLockAcquire(ccache->lock, LW_EXCLUSIVE); + if (ccache->status == CCACHE_STATUS_INITIALIZED) + { + ccache->status = CCACHE_STATUS_IN_PROGRESS; + csstate->with_construction = true; + scandesc = heap_beginscan(rel, SnapshotAny, 0, NULL); + } + else if (ccache->status == CCACHE_STATUS_IN_PROGRESS) + { + csstate->normal_seqscan = true; + scandesc = heap_beginscan(rel, estate->es_snapshot, 0, NULL); + } + } + LWLockRelease(ccache->lock); + csstate->ccache = ccache; + + /* seek to the first position */ + if (estate->es_direction == ForwardScanDirection) + { + ItemPointerSetBlockNumber(&csstate->curr_ctid, 0); + 
ItemPointerSetOffsetNumber(&csstate->curr_ctid, 0); + } + else + { + ItemPointerSetBlockNumber(&csstate->curr_ctid, MaxBlockNumber); + ItemPointerSetOffsetNumber(&csstate->curr_ctid, MaxOffsetNumber); + } + } + else + { + scandesc = heap_beginscan(rel, estate->es_snapshot, 0, NULL); + csstate->normal_seqscan = true; + } + node->ss.ss_currentScanDesc = scandesc; + + node->custom_state = csstate; +} + +/* + * cache_scan_needs_next + * + * We may fetch a tuple to be invisible because columner cache stores + * all the living tuples, including ones updated / deleted by concurrent + * sessions. So, it is a job of the caller to check MVCC visibility. + * It decides whether we need to move the next tuple due to the visibility + * condition, or not. If given tuple was NULL, it is obviously a time to + * break searching because it means no more tuples on the cache. + */ +static bool +cache_scan_needs_next(HeapTuple tuple, Snapshot snapshot, Buffer buffer) +{ + bool visibility; + + /* end of the scan */ + if (!HeapTupleIsValid(tuple)) + return false; + + if (buffer != InvalidBuffer) + LockBuffer(buffer, BUFFER_LOCK_SHARE); + + visibility = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer); + + if (buffer != InvalidBuffer) + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + return !visibility ? true : false; +} + +static TupleTableSlot * +cache_scan_next(CustomScanState *node) +{ + cs_state *csstate = node->custom_state; + Relation rel = node->ss.ss_currentRelation; + HeapScanDesc scan = node->ss.ss_currentScanDesc; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + EState *estate = node->ss.ps.state; + Snapshot snapshot = estate->es_snapshot; + HeapTuple tuple; + Buffer buffer; + + /* in case of fallback path, we don't need to something special. */ + if (csstate->normal_seqscan) + { + tuple = heap_getnext(scan, estate->es_direction); + if (HeapTupleIsValid(tuple)) + ExecStoreTuple(tuple, slot, scan->rs_cbuf, false); + else + ExecClearTuple(slot); + return slot; + } + Assert(csstate->ccache != NULL); + + /* elsewhere, we either run or construct the columner cache */ + do { + ccache_head *ccache = csstate->ccache; + + /* + * "with_construction" means the columner cache is under construction, + * so we need to fetch a tuple from heap of the target relation and + * insert it into the cache. Note that we use SnapshotAny to fetch + * all the tuples both of visible and invisible ones, so it is our + * responsibility to check tuple visibility according to snapshot or + * the current estate. + * It is same even when we fetch tuples from the cache, without + * referencing heap buffer. + */ + if (csstate->with_construction) + { + tuple = heap_getnext(scan, estate->es_direction); + + LWLockAcquire(ccache->lock, LW_EXCLUSIVE); + if (HeapTupleIsValid(tuple)) + { + if (ccache_insert_tuple(ccache, rel, tuple)) + LWLockRelease(ccache->lock); + else + { + /* + * If ccache_insert_tuple got failed, it usually means + * lack of shared memory and unable to continue + * construction of the columner cacher. + * So, we put is twice to reset its reference counter + * to zero and release shared memory blocks. + */ + LWLockRelease(ccache->lock); + cs_put_ccache(ccache); + cs_put_ccache(ccache); + csstate->ccache = NULL; + } + } + else + { + /* + * If we reached end of the relation, it means the columner- + * cache become constructed. 
+ */ + ccache->status = CCACHE_STATUS_CONSTRUCTED; + LWLockRelease(ccache->lock); + } + buffer = scan->rs_cbuf; + } + else + { + LWLockAcquire(ccache->lock, LW_SHARED); + tuple = ccache_find_tuple(ccache->root_chunk, + &csstate->curr_ctid, + estate->es_direction); + if (HeapTupleIsValid(tuple)) + { + ItemPointerCopy(&tuple->t_self, &csstate->curr_ctid); + tuple = heap_copytuple(tuple); + } + LWLockRelease(ccache->lock); + buffer = InvalidBuffer; + } + } while (cache_scan_needs_next(tuple, snapshot, buffer)); + + if (HeapTupleIsValid(tuple)) + ExecStoreTuple(tuple, slot, buffer, buffer == InvalidBuffer); + else + ExecClearTuple(slot); + + return slot; +} + +static bool +cache_scan_recheck(CustomScanState *node, TupleTableSlot *slot) +{ + return true; +} + +static TupleTableSlot * +cs_exec_custom_scan(CustomScanState *node) +{ + return ExecScan((ScanState *) node, + (ExecScanAccessMtd) cache_scan_next, + (ExecScanRecheckMtd) cache_scan_recheck); +} + +static void +cs_end_custom_scan(CustomScanState *node) +{ + cs_state *csstate = node->custom_state; + + if (csstate->ccache) + { + ccache_head *ccache = csstate->ccache; + bool needs_remove = false; + + LWLockAcquire(ccache->lock, LW_EXCLUSIVE); + if (ccache->status == CCACHE_STATUS_IN_PROGRESS) + needs_remove = true; + LWLockRelease(ccache->lock); + cs_put_ccache(ccache); + if (needs_remove) + cs_put_ccache(ccache); + } + if (node->ss.ss_currentScanDesc) + heap_endscan(node->ss.ss_currentScanDesc); +} + +static void +cs_rescan_custom_scan(CustomScanState *node) +{ + elog(ERROR, "not implemented yet"); +} + +/* + * cache_scan_synchronizer + * + * trigger function to synchronize the columner-cache with heap contents. + */ +Datum +cache_scan_synchronizer(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + Relation rel = trigdata->tg_relation; + HeapTuple tuple = trigdata->tg_trigtuple; + HeapTuple newtup = trigdata->tg_newtuple; + HeapTuple result = NULL; + const char *tg_name = trigdata->tg_trigger->tgname; + ccache_head *ccache; + + if (!CALLED_AS_TRIGGER(fcinfo)) + elog(ERROR, "%s: not fired by trigger manager", tg_name); + + ccache = cs_get_ccache(RelationGetRelid(rel), NULL, false); + if (!ccache) + return PointerGetDatum(newtup); + LWLockAcquire(ccache->lock, LW_EXCLUSIVE); + + PG_TRY(); + { + TriggerEvent tg_event = trigdata->tg_event; + + if (TRIGGER_FIRED_AFTER(tg_event) && + TRIGGER_FIRED_FOR_ROW(tg_event) && + TRIGGER_FIRED_BY_INSERT(tg_event)) + { + ccache_insert_tuple(ccache, rel, tuple); + result = tuple; + } + else if (TRIGGER_FIRED_AFTER(tg_event) && + TRIGGER_FIRED_FOR_ROW(tg_event) && + TRIGGER_FIRED_BY_UPDATE(tg_event)) + { + ccache_insert_tuple(ccache, rel, newtup); + ccache_delete_tuple(ccache, tuple); + result = newtup; + } + else if (TRIGGER_FIRED_AFTER(tg_event) && + TRIGGER_FIRED_FOR_ROW(tg_event) && + TRIGGER_FIRED_BY_DELETE(tg_event)) + { + ccache_delete_tuple(ccache, tuple); + result = tuple; + } + else if (TRIGGER_FIRED_AFTER(tg_event) && + TRIGGER_FIRED_FOR_STATEMENT(tg_event) && + TRIGGER_FIRED_BY_TRUNCATE(tg_event)) + { + if (ccache->status != CCACHE_STATUS_IN_PROGRESS) + cs_put_ccache(ccache); + } + else + elog(ERROR, "%s: fired by unexpected context (%08x)", + tg_name, tg_event); + } + PG_CATCH(); + { + LWLockRelease(ccache->lock); + cs_put_ccache(ccache); + PG_RE_THROW(); + } + PG_END_TRY(); + LWLockRelease(ccache->lock); + cs_put_ccache(ccache); + + PG_RETURN_POINTER(result); +} +PG_FUNCTION_INFO_V1(cache_scan_synchronizer); + +/* + * ccache_on_object_access + * + * It dropps an 
existing columnar cache if the cached table was altered or + * dropped. + */ +static void +ccache_on_object_access(ObjectAccessType access, + Oid classId, + Oid objectId, + int subId, + void *arg) +{ + ccache_head *ccache; + + /* ALTER TABLE and DROP TABLE need cache invalidation */ + if (access != OAT_DROP && access != OAT_POST_ALTER) + return; + if (classId != RelationRelationId) + return; + + ccache = cs_get_ccache(objectId, NULL, false); + if (!ccache) + return; + + LWLockAcquire(ccache->lock, LW_EXCLUSIVE); + if (ccache->status != CCACHE_STATUS_IN_PROGRESS) + cs_put_ccache(ccache); + LWLockRelease(ccache->lock); + cs_put_ccache(ccache); +} + +/* + * ccache_on_page_prune + * + * Callback invoked when a particular heap block gets pruned or vacuumed. + * At that point the dead space occupied by dead tuples is reclaimed, and + * tuple locations may be moved. + * This routine reclaims the space consumed by dead tuples on the columnar + * cache as well, following the layout changes on the heap. + */ +static void +ccache_on_page_prune(Relation relation, + Buffer buffer, + int ndeleted, + TransactionId OldestXmin, + TransactionId latestRemovedXid) +{ + ccache_head *ccache; + + /* call the secondary hook */ + if (heap_page_prune_next) + (*heap_page_prune_next)(relation, buffer, ndeleted, + OldestXmin, latestRemovedXid); + + ccache = cs_get_ccache(RelationGetRelid(relation), NULL, false); + if (ccache) + { + LWLockAcquire(ccache->lock, LW_EXCLUSIVE); + + ccache_vacuum_page(ccache, buffer); + + LWLockRelease(ccache->lock); + + cs_put_ccache(ccache); + } +} + +void +_PG_init(void) +{ + CustomProvider provider; + + if (IsUnderPostmaster) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cache_scan must be loaded via shared_preload_libraries"))); + + DefineCustomBoolVariable("cache_scan.disabled", + "turn on/off cache_scan feature on run-time", + NULL, + &cache_scan_disabled, + false, + PGC_USERSET, + GUC_NOT_IN_SAMPLE, + NULL, NULL, NULL); + + /* initialization of cache subsystem */ + ccache_init(); + + /* callbacks for cache invalidation */ + object_access_next = object_access_hook; + object_access_hook = ccache_on_object_access; + + heap_page_prune_next = heap_page_prune_hook; + heap_page_prune_hook = ccache_on_page_prune; + + /* registration of custom scan provider */ + add_scan_path_next = add_scan_path_hook; + add_scan_path_hook = cs_add_scan_path; + + memset(&provider, 0, sizeof(provider)); + strncpy(provider.name, "cache scan", sizeof(provider.name)); + provider.InitCustomScanPlan = cs_init_custom_scan_plan; + provider.BeginCustomScan = cs_begin_custom_scan; + provider.ExecCustomScan = cs_exec_custom_scan; + provider.EndCustomScan = cs_end_custom_scan; + provider.ReScanCustomScan = cs_rescan_custom_scan; + + register_custom_provider(&provider); +} diff --git a/doc/src/sgml/cache-scan.sgml b/doc/src/sgml/cache-scan.sgml new file mode 100644 index 0000000..c4cc165 --- /dev/null +++ b/doc/src/sgml/cache-scan.sgml @@ -0,0 +1,224 @@ + + + + cache-scan + + + cache-scan + + + + Overview + + The cache_scan module provides an alternative way to scan + a relation using an on-memory columnar cache instead of the usual heap scan, + in case a previous scan already holds the contents of the table in the + cache. + Unlike the buffer cache, it holds the contents of a limited number of columns + rather than whole records, so it can keep a larger number of records + in the same amount of RAM.
This characteristic is particularly useful for running + analytic queries on tables with many columns and records. + + + Once this module is loaded, it registers itself as a custom-scan provider. + This allows it to offer an additional scan path on regular relations using the + on-memory columnar cache, instead of a regular heap scan. + It also serves as a proof-of-concept implementation of + the custom-scan API, which allows extensions to extend the core executor. + + + + + Installation + + This module has to be loaded via the shared_preload_libraries + parameter so that it can acquire + a certain amount of shared memory at startup time. + In addition, the relation to be cached needs special triggers, called + synchronizers, implemented with the cache_scan_synchronizer + function, which keep the cache contents in sync with the latest + heap on INSERT, UPDATE, DELETE or + TRUNCATE. + + + You can set up this extension with the following steps. + + + + + Adjust the shared_preload_libraries parameter to + load the cache_scan binary at startup time, then restart + the postmaster. + + + + + Run CREATE EXTENSION to create the synchronizer + function of cache_scan. + +CREATE EXTENSION cache_scan; + + + + + + Create the synchronizer triggers on the target relation. + +CREATE TRIGGER t1_cache_row_sync + AFTER INSERT OR UPDATE OR DELETE ON t1 FOR ROW + EXECUTE PROCEDURE cache_scan_synchronizer(); +CREATE TRIGGER t1_cache_stmt_sync + AFTER TRUNCATE ON t1 FOR STATEMENT + EXECUTE PROCEDURE cache_scan_synchronizer(); + + + + + + + + How does it work + + This module behaves in the usual fashion of a custom-scan provider. + It offers an alternative way to scan a relation if the relation has the synchronizer + triggers and the total width of the referenced columns is less than 30% of the average + record width. + The query optimizer then picks the cheapest path. If the path chosen + is a custom-scan path managed by cache_scan, the scan runs on the + target relation using the columnar cache. + On the first run, it constructs the relation's cache along + with a regular sequential scan. From the second run onward, it can run on + the columnar cache without referencing the heap. + + + You can check whether the query plan uses cache_scan with + the EXPLAIN command, as follows: + +postgres=# EXPLAIN (costs off) SELECT a,b FROM t1 WHERE b < pi(); + QUERY PLAN +---------------------------------------------------- + Custom Scan (cache scan) on t1 + Filter: (b < 3.14159265358979::double precision) +(2 rows) +
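+ + The module also defines a cache_scan.disabled parameter + (set up in _PG_init(), though not listed in the GUC section below) + that is described as a run-time on/off switch. Assuming it behaves as that + description suggests, the fallback behaviour can be checked by disabling the + feature in the current session and re-running the same query, which should + then be planned as a plain sequential scan: + +SET cache_scan.disabled = on; +EXPLAIN (costs off) SELECT a,b FROM t1 WHERE b < pi(); +SET cache_scan.disabled = off; + +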
+ + A columnar cache, associated with a particular relation, has one or more chunks + that serve as nodes or leaves of a T-tree structure. + The cache_scan_debuginfo() function dumps the properties of + all the active chunks, as follows. + +postgres=# SELECT * FROM cache_scan_debuginfo(); + tableoid | status | chunk | upper | l_depth | l_chunk | r_depth | r_chunk | ntuples | usage | min_ctid | max_ctid +----------+-------------+----------------+----------------+---------+----------------+---------+----------------+---------+---------+-----------+----------- + 16400 | constructed | 0x7f2b8ad84740 | 0x7f2b8af84740 | 0 | (nil) | 0 | (nil) | 29126 | 233088 | (0,1) | (677,15) + 16400 | constructed | 0x7f2b8af84740 | (nil) | 1 | 0x7f2b8ad84740 | 2 | 0x7f2b8b384740 | 29126 | 233088 | (677,16) | (1354,30) + 16400 | constructed | 0x7f2b8b184740 | 0x7f2b8b384740 | 0 | (nil) | 0 | (nil) | 29126 | 233088 | (1354,31) | (2032,2) + 16400 | constructed | 0x7f2b8b384740 | 0x7f2b8af84740 | 1 | 0x7f2b8b184740 | 1 | 0x7f2b8b584740 | 29126 | 233088 | (2032,3) | (2709,33) + 16400 | constructed | 0x7f2b8b584740 | 0x7f2b8b384740 | 0 | (nil) | 0 | (nil) | 3478 | 1874560 | (2709,34) | (2790,28) +(5 rows) + + + All the cached tuples are indexed in ctid order, and each chunk + holds an array of partial tuples together with the minimum and maximum ctid + it covers. Its left node links to chunks whose tuples have smaller ctids, and + its right node links to chunks whose tuples have larger ones. + This makes it possible to find tuples quickly when they need to be invalidated + because of heap updates by DDL, DML or vacuuming. + + + The columnar cache is not owned by any particular session, so it is retained + until it is dropped or the postmaster restarts. +
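+ + Because the cache lives in shared memory, it can be useful to see how much of + it each table consumes, for example when sizing the + cache_scan.num_blocks parameter described below. One possible + way is a simple aggregation over the debug output shown above (the + usage column is assumed here to be the space consumed within + each chunk): + +SELECT tableoid::regclass AS relname, + count(*) AS chunks, + sum(ntuples) AS tuples, + sum(usage) AS total_usage + FROM cache_scan_debuginfo() + GROUP BY tableoid; + +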
+ + + + GUC Parameters + + + cache_scan.block_size (integer) + + cache_scan.block_size configuration parameter + + + + This parameter controls the size of each block on the shared memory segment + for the columnar cache. Changing it requires a postmaster restart. + + + The cache_scan module acquires cache_scan.num_blocks + x cache_scan.block_size bytes of shared memory at + startup time, then allocates them to columnar caches on demand. + Too large a block size reduces the flexibility of memory assignment, and + too small a block size spends too much management area per block. + So, we recommend keeping the default value; that is, 2MB per block. + + + + + + cache_scan.num_blocks (integer) + + cache_scan.num_blocks configuration parameter + + + + This parameter controls the number of blocks on the shared memory segment + for the columnar cache. Changing it requires a postmaster restart. + + + The cache_scan module acquires cache_scan.num_blocks + x cache_scan.block_size bytes of shared memory at + startup time, then allocates them to columnar caches on demand. + Too small a number of blocks reduces the flexibility of memory assignment + and may cause undesired cache dropping. + So, we recommend setting a number of blocks large enough to keep the contents + of the target relations in memory. + Its default is 64; with the default block size that is only + 64 x 2MB = 128MB, probably too small for most real use cases. + + + + + + cache_scan.hash_size (integer) + + cache_scan.hash_size configuration parameter + + + + This parameter controls the number of slots of the internal hash table that + links every columnar cache, distributed by table oid. + Its default is 128; there is usually no need to adjust it. + + + + + + cache_scan.max_cached_attnum (integer) + + cache_scan.max_cached_attnum configuration parameter + + + + This parameter controls the maximum attribute number that can be cached in + the columnar cache. Because of the internal data representation, the bitmap + set used to track the cached attributes has to be fixed-length. + Thus, the largest attribute number needs to be fixed in advance. + Its default is 128, which is usually sufficient since most + tables have fewer than 100 columns. + + + + + + + Author + + KaiGai Kohei kaigai@kaigai.gr.jp + + + diff --git a/doc/src/sgml/contrib.sgml b/doc/src/sgml/contrib.sgml index 2002f60..3d8fd05 100644 --- a/doc/src/sgml/contrib.sgml +++ b/doc/src/sgml/contrib.sgml @@ -107,6 +107,7 @@ CREATE EXTENSION module_name FROM unpackaged; &auto-explain; &btree-gin; &btree-gist; + &cache-scan; &chkpass; &citext; &ctidscan; diff --git a/doc/src/sgml/custom-scan.sgml b/doc/src/sgml/custom-scan.sgml index f53902d..218a5fd 100644 --- a/doc/src/sgml/custom-scan.sgml +++ b/doc/src/sgml/custom-scan.sgml @@ -55,6 +55,20 @@ + + + + + The custom scan in this module enables a scan that refers to the on-memory + columnar cache instead of the heap, if the cache has already been + constructed for the target relation. + Unlike the buffer cache, it holds only the limited set of columns that have + been referenced before, not all the columns in the table definition. + Thus, it can cache a much larger number of records in memory than the + buffer cache. + + + diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml index aa2be4b..10c7666 100644 --- a/doc/src/sgml/filelist.sgml +++ b/doc/src/sgml/filelist.sgml @@ -103,6 +103,7 @@ + + diff --git a/src/backend/access/heap/pruneheap.c b/src/backend/access/heap/pruneheap.c index 27cbac8..1fb5f4a 100644 --- a/src/backend/access/heap/pruneheap.c +++ b/src/backend/access/heap/pruneheap.c @@ -42,6 +42,9 @@ typedef struct bool marked[MaxHeapTuplesPerPage + 1]; } PruneState; +/* Callback for each page pruning */ +heap_page_prune_hook_type heap_page_prune_hook = NULL; + /* Local functions */ static int heap_prune_chain(Relation relation, Buffer buffer, OffsetNumber rootoffnum, @@ -294,6 +297,16 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin, * and update FSM with the remaining space. */ + /* + * This callback allows extensions to synchronize their own status with + * the heap image on disk when this buffer page is pruned or vacuumed. + */ + if (heap_page_prune_hook) + (*heap_page_prune_hook)(relation, + buffer, + ndeleted, + OldestXmin, + prstate.latestRemovedXid); return ndeleted; } diff --git a/src/backend/utils/time/tqual.c b/src/backend/utils/time/tqual.c index f626755..023f78e 100644 --- a/src/backend/utils/time/tqual.c +++ b/src/backend/utils/time/tqual.c @@ -103,11 +103,18 @@ static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot); * * The caller should pass xid as the XID of the transaction to check, or * InvalidTransactionId if no check is needed. + * + * If the supplied HeapTuple is not associated with a particular buffer, + * this routine just returns without doing anything. That can happen when + * an extension caches tuples in its own way. */ static inline void SetHintBits(HeapTupleHeader tuple, Buffer buffer, uint16 infomask, TransactionId xid) { + if (BufferIsInvalid(buffer)) + return; + if (TransactionIdIsValid(xid)) { /* NB: xid must be known committed here!
*/ diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index bfdadc3..9775aad 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -164,6 +164,13 @@ extern void heap_restrpos(HeapScanDesc scan); extern void heap_sync(Relation relation); /* in heap/pruneheap.c */ +typedef void (*heap_page_prune_hook_type)(Relation relation, + Buffer buffer, + int ndeleted, + TransactionId OldestXmin, + TransactionId latestRemovedXid); +extern heap_page_prune_hook_type heap_page_prune_hook; + extern void heap_page_prune_opt(Relation relation, Buffer buffer, TransactionId OldestXmin); extern int heap_page_prune(Relation relation, Buffer buffer,