OiO.lk Community platform!

Oio.lk is an excellent forum for developers, providing a wide range of resources, discussions, and support for those in the developer community. Join oio.lk today to connect with like-minded professionals, share insights, and stay updated on the latest trends and technologies in the development field.
  You need to log in or register to access the solved answers to this problem.
  • You have reached the maximum number of guest views allowed
  • Please register below to remove this limitation

How to connect my pod on AKS to Azure Event Hub with its kafka interface?

  • Thread starter Thread starter smj16
  • Start date Start date
S

smj16

Guest
How can I connect a Kafka consumer written in Python with Event Hub on an AKS pod? I've already tried using Workload Identity with a Service Connector (previously I've tried with a connection string without success), but I'm still unable to connect. I did make sure that the created identity has the necessary rights on Event Hub.

The consumer was tested locally and works fine, here is the code

Code:
from azure.identity import DefaultAzureCredential, WorkloadIdentityCredential, ManagedIdentityCredential
from azure.storage.blob import BlobServiceClient
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from functools import partial
from pprint import pformat
import os


def stats_cb(stats_json_str):
    stats_json = json.loads(stats_json_str)
    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))


def oauth_cb(cred, namespace_fqdn, config):
    # confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)

    # cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
    # namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
    # config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param
    access_token = cred.get_token('https://<eventhubs-namespace>.servicebus.windows.net/.default')
    return access_token.token, access_token.expires_on


def print_usage_and_exit(program_name):
    sys.stderr.write(
        'Usage: %s [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..\n' % program_name)
    options = '''
 Options:
  -T <intvl>   Enable client statistics at specified interval (ms)
'''
    sys.stderr.write(options)
    sys.exit(1)


if __name__ == '__main__':
    optlist, argv = getopt.getopt("<resource group", 'T:')
    if len(argv) < 3:
        print_usage_and_exit(sys.argv[0]) 

      # Azure credential     # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview

    cred = DefaultAzureCredential(managed_identity_client_id="<id>")


    # Consumer configuration
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': '<eventhubs-namespace>.servicebus.windows.net:9093',
        'group.id': '$Default',
        'session.timeout.ms': 6000,
        'auto.offset.reset': 'earliest',

        # Required OAuth2 configuration properties
        'security.protocol': 'SASL_SSL',
        'sasl.mechanism': 'OAUTHBEARER',
        # the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
        'oauth_cb': partial(oauth_cb, cred, '<eventhubs-namespace>.servicebus.windows.net:9093'),
    }
    #print(str(conf))
    # Check to see if -T option exists
    for opt in optlist:
        if opt[0] != '-T':
            continue
        try:
            intval = int(opt[1])
        except ValueError:
            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
            sys.exit(1)

        if intval <= 0:
            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
            sys.exit(1)

        conf['stats_cb'] = stats_cb
        conf['statistics.interval.ms'] = int(opt[1])

    # Create logger for consumer (logs will be emitted when poll() is called)
    logger = logging.getLogger('consumer')
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

    # Create Consumer instance
    # Hint: try debug='fetch' to generate some log messages
    c = Consumer(conf, logger=logger)
    #print(str(c.list_topics().topics))
    def print_assignment(consumer, partitions):
        print('Assignment:', partitions)

    # Subscribe to topics
    c.subscribe(["test"], on_assign=print_assignment)

    # Read messages from Kafka, print to stdout
    try:
        while True:
            msg = c.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print(msg.value())

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets.
        c.close()

Here is the deployment on K8s

Code:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer
  namespace: default
  labels:
    app: consumer
    azure.workload.identity/use: "true"
spec:
  replicas: 1
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      serviceAccountName: sc-account-<name>
      containers:       
      - name: consumer         
        image: <image>
        command: ["/venv/bin/python","/consumer.py"]
        envFrom:
          - secretRef:
              name: sc-eventhub<secret>
<p>How can I connect a Kafka consumer written in Python with Event Hub on an AKS pod? I've already tried using Workload Identity with a Service Connector (previously I've tried with a connection string without success), but I'm still unable to connect. I did make sure that the created identity has the necessary rights on Event Hub.</p>
<p>The consumer was tested locally and works fine, here is the code</p>
<pre><code>from azure.identity import DefaultAzureCredential, WorkloadIdentityCredential, ManagedIdentityCredential
from azure.storage.blob import BlobServiceClient
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from functools import partial
from pprint import pformat
import os


def stats_cb(stats_json_str):
stats_json = json.loads(stats_json_str)
print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))


def oauth_cb(cred, namespace_fqdn, config):
# confluent_kafka requires an oauth callback function to return (str, float) with the values of (<access token>, <expiration date in seconds from epoch>)

# cred: an Azure identity library credential object. Ex: an instance of DefaultAzureCredential, ManagedIdentityCredential, etc
# namespace_fqdn: the FQDN for the target Event Hubs namespace. Ex: 'mynamespace.servicebus.windows.net'
# config: confluent_kafka passes the value of 'sasl.oauthbearer.config' as the config param
access_token = cred.get_token('https://<eventhubs-namespace>.servicebus.windows.net/.default')
return access_token.token, access_token.expires_on


def print_usage_and_exit(program_name):
sys.stderr.write(
'Usage: %s [options..] <eventhubs-namespace> <group> <topic1> <topic2> ..\n' % program_name)
options = '''
Options:
-T <intvl> Enable client statistics at specified interval (ms)
'''
sys.stderr.write(options)
sys.exit(1)


if __name__ == '__main__':
optlist, argv = getopt.getopt("<resource group", 'T:')
if len(argv) < 3:
print_usage_and_exit(sys.argv[0])

# Azure credential # See https://docs.microsoft.com/en-us/azure/developer/python/sdk/authentication-overview

cred = DefaultAzureCredential(managed_identity_client_id="<id>")


# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {
'bootstrap.servers': '<eventhubs-namespace>.servicebus.windows.net:9093',
'group.id': '$Default',
'session.timeout.ms': 6000,
'auto.offset.reset': 'earliest',

# Required OAuth2 configuration properties
'security.protocol': 'SASL_SSL',
'sasl.mechanism': 'OAUTHBEARER',
# the resulting oauth_cb must accept a single `config` parameter, so we use partial to bind the namespace/identity to our function
'oauth_cb': partial(oauth_cb, cred, '<eventhubs-namespace>.servicebus.windows.net:9093'),
}
#print(str(conf))
# Check to see if -T option exists
for opt in optlist:
if opt[0] != '-T':
continue
try:
intval = int(opt[1])
except ValueError:
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
sys.exit(1)

if intval <= 0:
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
sys.exit(1)

conf['stats_cb'] = stats_cb
conf['statistics.interval.ms'] = int(opt[1])

# Create logger for consumer (logs will be emitted when poll() is called)
logger = logging.getLogger('consumer')
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
logger.addHandler(handler)

# Create Consumer instance
# Hint: try debug='fetch' to generate some log messages
c = Consumer(conf, logger=logger)
#print(str(c.list_topics().topics))
def print_assignment(consumer, partitions):
print('Assignment:', partitions)

# Subscribe to topics
c.subscribe(["test"], on_assign=print_assignment)

# Read messages from Kafka, print to stdout
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
# Proper message
sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
print(msg.value())

except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')

finally:
# Close down consumer to commit final offsets.
c.close()
</code></pre>
<p>Here is the deployment on K8s</p>
<pre><code>apiVersion: apps/v1
kind: Deployment
metadata:
name: consumer
namespace: default
labels:
app: consumer
azure.workload.identity/use: "true"
spec:
replicas: 1
selector:
matchLabels:
app: consumer
template:
metadata:
labels:
app: consumer
spec:
serviceAccountName: sc-account-<name>
containers:
- name: consumer
image: <image>
command: ["/venv/bin/python","/consumer.py"]
envFrom:
- secretRef:
name: sc-eventhub<secret>
</code></pre>
 
Top