Commit c275f3f8 authored by Waleed Akbar's avatar Waleed Akbar
Browse files

Temporary Kafka changes.

- add external port in yaml file
- update listener settings
parent 606eac56
Loading
Loading
Loading
Loading
+12 −10
Original line number Diff line number Diff line
@@ -35,7 +35,13 @@ spec:
    port: 9093
    protocol: TCP
    targetPort: 9093
  - name: external
    port: 9094
    protocol: TCP
    targetPort: 9094
---


apiVersion: apps/v1
kind: StatefulSet
metadata:
@@ -67,23 +73,19 @@ spec:
          containerPort: 9092
        - name: control-plane
          containerPort: 9093
        - name: external
          containerPort: 9094
        env:
          #- name: KAFKA_BROKER_ID
          #  value: "1"
          #- name: KAFKA_ZOOKEEPER_CONNECT
          #  value: zookeeper-service.<KAFKA_NAMESPACE>.svc.cluster.local:2181
          #- name: KAFKA_LISTENERS
          #  value: PLAINTEXT://:9092
          #- name: KAFKA_ADVERTISED_LISTENERS
          #  value: PLAINTEXT://kafka-service.<KAFKA_NAMESPACE>.svc.cluster.local:9092
          - name: KAFKA_CFG_NODE_ID
            value: "1"
          - name: KAFKA_CFG_PROCESS_ROLES
            value: "controller,broker"
          - name: KAFKA_CFG_LISTENERS
            value: "PLAINTEXT://:9092,CONTROLLER://:9093"
            value: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
          - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
            value: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
            value: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT"
          - name: KAFKA_CFG_ADVERTISED_LISTENERS
            value: "PLAINTEXT://kafka-public.kafka.svc.cluster.local:9092,EXTERNAL://localhost:9094"
          - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
            value: "CONTROLLER"
          - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
+11 −8
Original line number Diff line number Diff line
@@ -42,8 +42,12 @@ class KafkaConfig(Enum):
    def get_kafka_address() -> str:
        kafka_server_address  = get_setting('KFK_SERVER_ADDRESS', default=None)
        if kafka_server_address is None:
            try:
                KFK_NAMESPACE = get_setting('KFK_NAMESPACE')
                KFK_PORT      = get_setting('KFK_SERVER_PORT')
            except Exception:
                KFK_NAMESPACE = 'kafka'
                KFK_PORT      = '9092'
            kafka_server_address = KFK_SERVER_ADDRESS_TEMPLATE.format(KFK_NAMESPACE, KFK_PORT)
        return kafka_server_address
        
@@ -59,10 +63,10 @@ class KafkaTopic(Enum):
    # TODO: Later to be populated from ENV variable.
    TELEMETRY_REQUEST    = 'topic_telemetry_request' 
    TELEMETRY_RESPONSE   = 'topic_telemetry_response'
    RAW                  = 'topic_raw' 
    LABELED              = 'topic_labeled'
    VALUE                = 'topic_value'
    ALARMS               = 'topic_alarms'
    RAW                  = 'topic_raw'                  # TODO: Update name to telemetry_raw
    LABELED              = 'topic_labeled'              # TODO: Update name to telemetry_labeled
    VALUE                = 'topic_value'                # TODO: Update name to telemetry_value
    ALARMS               = 'topic_alarms'               # TODO: Update name to telemetry_alarms
    ANALYTICS_REQUEST    = 'topic_analytics_request'
    ANALYTICS_RESPONSE   = 'topic_analytics_response'
    VNTMANAGER_REQUEST   = 'topic_vntmanager_request' 
@@ -144,7 +148,6 @@ class KafkaTopic(Enum):
            LOGGER.debug('All topics created and available.')
            return True

# TODO: create all topics after the deployments (Telemetry and Analytics)

if __name__ == '__main__':
    import os