Skip to content
Snippets Groups Projects
ChangeFeedClient.py 3.56 KiB
Newer Older
Lluis Gifre Renom's avatar
Lluis Gifre Renom committed
# Copyright 2021-2023 H2020 TeraFlow (https://www.teraflow-h2020.eu/)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pip install psycopg==3.1.6
# Ref: https://www.cockroachlabs.com/docs/stable/changefeed-for.html
# (current implementation) Ref: https://www.cockroachlabs.com/docs/v22.1/changefeed-for
# Ref: https://www.psycopg.org/psycopg3/docs/api/crdb.html

import contextlib, json, logging, psycopg, psycopg.conninfo, psycopg.crdb, sys, time
from typing import Any, Dict, Iterator, List, Optional, Tuple
from common.Settings import get_setting

LOGGER = logging.getLogger(__name__)

SQL_ACTIVATE_CHANGE_FEED = 'SET CLUSTER SETTING kv.rangefeed.enabled = true'
SQL_START_CHANGE_FEED = 'EXPERIMENTAL CHANGEFEED FOR {:s}.{:s} WITH format=json, no_initial_scan, updated'

class ChangeFeedClient:
    def __init__(self) -> None:
        self._connection : Optional[psycopg.crdb.CrdbConnection] = None
        self._conn_info_dict : Dict = dict()
        self._is_crdb : bool = False

    def initialize(self) -> bool:
        crdb_uri = get_setting('CRDB_URI')
        if crdb_uri is None:
            LOGGER.error('Connection string not found in EnvVar CRDB_URI')
            return False

        try:
            crdb_uri = crdb_uri.replace('cockroachdb://', 'postgres://')
            self._conn_info_dict = psycopg.conninfo.conninfo_to_dict(crdb_uri)
        except psycopg.ProgrammingError:
            LOGGER.exception('Invalid connection string: {:s}'.format(str(crdb_uri)))
            return False

        self._connection = psycopg.crdb.connect(**self._conn_info_dict)
        self._is_crdb = psycopg.crdb.CrdbConnection.is_crdb(self._connection)
        LOGGER.debug('is_crdb = {:s}'.format(str(self._is_crdb)))

        # disable multi-statement transactions
        self._connection.autocommit = True

        # activate change feeds
        self._connection.execute(SQL_ACTIVATE_CHANGE_FEED)

        return self._is_crdb

    def get_changes(self, table_name : str) -> Iterator[Tuple[float, str, List[Any], bool, Dict]]:
        db_name = self._conn_info_dict.get('dbname')
        if db_name is None: raise Exception('ChangeFeed has not been initialized!')
        cur = self._connection.cursor()
        str_sql_query = SQL_START_CHANGE_FEED.format(db_name, table_name)
        with contextlib.closing(cur.stream(str_sql_query)) as feed:
            for change in feed:
                LOGGER.info(change)
                table_name, primary_key, data = change[0], json.loads(change[1]), json.loads(change[2])
                timestamp = data.get('updated') / 1.e9
                if timestamp is None: timestamp = time.time()
                after = data.get('after')
                is_delete = ('after' in data) and (after is None)
                yield timestamp, table_name, primary_key, is_delete, after

def main():
    logging.basicConfig(level=logging.INFO)

    cf = ChangeFeed()
    ready = cf.initialize()
    if not ready: raise Exception('Unable to initialize ChangeFeed')
    for change in cf.get_changes('context'):
        LOGGER.info(change)

    return 0

if __name__ == '__main__':
    sys.exit(main())