diff --git a/manifests/contextservice.yaml b/manifests/contextservice.yaml index 8655b275b43c05f6eace544416c88123bb4725f1..96735bf5f89f682f31131c123ee9884a1becbfdb 100644 --- a/manifests/contextservice.yaml +++ b/manifests/contextservice.yaml @@ -23,6 +23,8 @@ spec: #replicas: 1 template: metadata: + annotations: + config.linkerd.io/skip-outbound-ports: "4222" labels: app: contextservice spec: @@ -52,11 +54,11 @@ spec: command: ["/bin/grpc_health_probe", "-addr=:1010"] resources: requests: - cpu: 75m - memory: 64Mi - limits: - cpu: 100m + cpu: 250m memory: 128Mi + limits: + cpu: 1000m + memory: 1024Mi --- apiVersion: v1 kind: Service diff --git a/src/common/message_broker/backend/nats/NatsBackendThread.py b/src/common/message_broker/backend/nats/NatsBackendThread.py index e59e4d6835ef662e4b0ed9f92d79a45c22954a6f..0bedd2b242f7eeaa1585d0eb41c5a0bd9efe07e5 100644 --- a/src/common/message_broker/backend/nats/NatsBackendThread.py +++ b/src/common/message_broker/backend/nats/NatsBackendThread.py @@ -12,10 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import asyncio, nats, nats.errors, queue, threading +import asyncio, logging, nats, nats.errors, queue, threading from typing import List from common.message_broker.Message import Message +LOGGER = logging.getLogger(__name__) + class NatsBackendThread(threading.Thread): def __init__(self, nats_uri : str) -> None: self._nats_uri = nats_uri @@ -32,7 +34,9 @@ class NatsBackendThread(threading.Thread): self._tasks_terminated.set() async def _run_publisher(self) -> None: + LOGGER.info('[_run_publisher] NATS URI: {:s}'.format(str(self._nats_uri))) client = await nats.connect(servers=[self._nats_uri]) + LOGGER.info('[_run_publisher] Connected!') while not self._terminate.is_set(): try: message : Message = await self._publish_queue.get() @@ -47,8 +51,11 @@ class NatsBackendThread(threading.Thread): async def _run_subscriber( self, topic_name : str, timeout : float, out_queue : queue.Queue[Message], unsubscribe : threading.Event ) -> None: + LOGGER.info('[_run_subscriber] NATS URI: {:s}'.format(str(self._nats_uri))) client = await nats.connect(servers=[self._nats_uri]) + LOGGER.info('[_run_subscriber] Connected!') subscription = await client.subscribe(topic_name) + LOGGER.info('[_run_subscriber] Subscribed!') while not self._terminate.is_set() and not unsubscribe.is_set(): try: message = await subscription.next_msg(timeout)