Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pip install psycopg==3.1.6
# Ref: https://www.cockroachlabs.com/docs/stable/changefeed-for.html
# (current implementation) Ref: https://www.cockroachlabs.com/docs/v22.1/changefeed-for
# Ref: https://www.psycopg.org/psycopg3/docs/api/crdb.html
import contextlib, json, logging, psycopg, psycopg.conninfo, psycopg.crdb, sys, time
from typing import Any, Dict, Iterator, List, Optional, Tuple
from common.Settings import get_setting
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Statement executed once per connection to activate change feeds.
SQL_ACTIVATE_CHANGE_FEED = 'SET CLUSTER SETTING kv.rangefeed.enabled = true'

# Template of the streaming query; the two placeholders are filled with the
# database name and the table name, in that order.
SQL_START_CHANGE_FEED = (
    'EXPERIMENTAL CHANGEFEED FOR {:s}.{:s} '
    'WITH format=json, no_initial_scan, updated'
)
class ChangeFeedClient:
    """Client that streams row-level changes of a table through a CockroachDB change feed.

    Typical usage: call `initialize()` once and, if it returns True, iterate
    over `get_changes(table_name)`.
    """

    def __init__(self) -> None:
        # Database connection; remains None until initialize() connects.
        self._connection : Optional[psycopg.crdb.CrdbConnection] = None
        # Connection parameters parsed from the CRDB_URI setting.
        self._conn_info_dict : Dict = dict()
        # Result of the is_crdb() check performed on the open connection.
        self._is_crdb : bool = False

    def initialize(self) -> bool:
        """Connect to the database named by the CRDB_URI setting and activate change feeds.

        Returns:
            False when the setting is None or the connection string cannot be
            parsed; otherwise the result of the is_crdb() check on the
            connection.
        """
        crdb_uri = get_setting('CRDB_URI')
        if crdb_uri is None:
            LOGGER.error('Connection string not found in EnvVar CRDB_URI')
            return False

        try:
            # conninfo_to_dict() is given the URI with a postgres:// scheme.
            crdb_uri = crdb_uri.replace('cockroachdb://', 'postgres://')
            self._conn_info_dict = psycopg.conninfo.conninfo_to_dict(crdb_uri)
        except psycopg.ProgrammingError:
            LOGGER.exception('Invalid connection string: {:s}'.format(str(crdb_uri)))
            return False

        self._connection = psycopg.crdb.connect(**self._conn_info_dict)
        self._is_crdb = psycopg.crdb.CrdbConnection.is_crdb(self._connection)
        LOGGER.debug('is_crdb = {:s}'.format(str(self._is_crdb)))

        # disable multi-statement transactions
        self._connection.autocommit = True

        # activate change feeds
        self._connection.execute(SQL_ACTIVATE_CHANGE_FEED)

        return self._is_crdb

    def get_changes(self, table_name : str) -> Iterator[Tuple[float, str, List[Any], bool, Dict]]:
        """Stream the changes applied to `table_name`.

        Args:
            table_name: name of the table to watch, inside the database given
                in the connection string.

        Yields:
            Tuples `(timestamp, table_name, primary_key, is_delete, after)`;
            see `_parse_change()` for the meaning of each field.

        Raises:
            Exception: when iteration starts before a successful `initialize()`.
        """
        db_name = self._conn_info_dict.get('dbname')
        # Also check the connection: the parameters may have been parsed even
        # though connecting never succeeded.
        if db_name is None or self._connection is None:
            raise Exception('ChangeFeed has not been initialized!')

        str_sql_query = SQL_START_CHANGE_FEED.format(db_name, table_name)
        # Both the cursor and the stream are released when the generator is
        # exhausted or closed by the consumer.
        with self._connection.cursor() as cur:
            with contextlib.closing(cur.stream(str_sql_query)) as feed:
                for change in feed:
                    LOGGER.info(change)
                    yield self._parse_change(change)

    @staticmethod
    def _parse_change(change : Any) -> Tuple[float, str, List[Any], bool, Dict]:
        """Decode one raw change feed row into the tuple yielded by `get_changes()`.

        Args:
            change: indexable row whose items are the table name, the
                JSON-encoded primary key and the JSON-encoded change payload.

        Returns:
            `(timestamp, table_name, primary_key, is_delete, after)` where
            `is_delete` is True only when the payload carries an explicit
            null `after` field.
        """
        table_name = change[0]
        primary_key = json.loads(change[1])
        data = json.loads(change[2])

        updated = data.get('updated')
        if updated is None:
            # No timestamp in the payload: fall back to the local clock.
            timestamp = time.time()
        else:
            # float() accepts both numeric values and decimal strings.
            # Presumably nanoseconds since the epoch -- TODO confirm.
            timestamp = float(updated) / 1.e9

        after = data.get('after')
        is_delete = ('after' in data) and (after is None)
        return timestamp, table_name, primary_key, is_delete, after
def main():
    """Demo entry point: log every change received for the 'context' table.

    Returns:
        0 once the change stream ends.

    Raises:
        Exception: when the change feed client cannot be initialized.
    """
    logging.basicConfig(level=logging.INFO)

    # The client class defined in this module is ChangeFeedClient; the name
    # `ChangeFeed` does not exist and raised NameError.
    cf = ChangeFeedClient()
    ready = cf.initialize()
    if not ready:
        raise Exception('Unable to initialize ChangeFeed')

    for change in cf.get_changes('context'):
        LOGGER.info(change)

    return 0
if __name__ == '__main__':
    # Run the demo and use main()'s return value as the process exit status.
    exit_status = main()
    sys.exit(exit_status)