Kafka Supplier and Consumer need to provide separate Kafka configuration properties #557

Open
corneil opened this issue Oct 21, 2024 · 8 comments

Comments

@corneil
Contributor

corneil commented Oct 21, 2024

In what version(s) of Spring Functions Catalog are you seeing this issue?
5.0.0
Describe the bug
When kafka-supplier is used as kafka-source-kafka, the properties provided to configure access to the 'external' Kafka cluster are also applied to the 'internal' Kafka cluster that the stream apps' inputs and outputs are bound to.

To Reproduce

Deploy a simple SCDF with a single-container Kafka that doesn't require authentication, in the same namespace as SCDF.
Deploy another Kafka using the Bitnami Helm chart.
Configure a stream kafka | log with the properties:

app.kafka.spring.kafka.bootstrap-servers=my-release-kafka-controller-0.my-release-kafka-controller-headless.default.svc.cluster.local:9092, my-release-kafka-controller-1.my-release-kafka-controller-headless.default.svc.cluster.local:9092,my-release-kafka-controller-2.my-release-kafka-controller-headless.default.svc.cluster.local:9092
app.kafka.spring.kafka.properties.security.protocol=SASL_PLAINTEXT
app.kafka.spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
app.kafka.spring.kafka.properties.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user1\" password=\"XXXXXXXX\";"
app.kafka.spring.kafka.client-id=scdf
app.kafka.spring.kafka.group-id=abc
app.kafka.kafka.supplier.topics=ABC
deployer.*.kubernetes.image-pull-policy=IfNotPresent

Expected behavior

A message created on topic ABC should be written to log.
The actual behaviour is that the topic .kafka cannot be written to, and the following error is reported: Unexpected handshake request with client mechanism SCRAM-SHA-256, enabled mechanisms are []

Required change

The properties that configure the Kafka instance should be prefixed with kafka.supplier or kafka.consumer, as follows:

app.kafka.kafka.supplier.spring.kafka.bootstrap-servers=my-release-kafka-controller-0.my-release-kafka-controller-headless.default.svc.cluster.local:9092, my-release-kafka-controller-1.my-release-kafka-controller-headless.default.svc.cluster.local:9092,my-release-kafka-controller-2.my-release-kafka-controller-headless.default.svc.cluster.local:9092
app.kafka.kafka.supplier.spring.kafka.properties.security.protocol=SASL_PLAINTEXT
app.kafka.kafka.supplier.spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
app.kafka.kafka.supplier.spring.kafka.properties.sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user1\" password=\"XXXXXXXX\";"
app.kafka.kafka.supplier.spring.kafka.client-id=scdf
app.kafka.kafka.supplier.spring.kafka.group-id=abc

Alternatively, the user supplies only the topic, and the same Kafka instance that is configured for all stream applications is used.

@artembilan
Member

Isn't that supposed to be configured at the binder level via the spring.cloud.stream.kafka.binder properties: https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/config-options.html ?

@corneil
Contributor Author

corneil commented Oct 21, 2024

The binder has no authentication properties of its own, but it tries to use the security properties that were set for the source application.

@corneil
Contributor Author

corneil commented Oct 21, 2024

So there could be a problem in the binder code: it picks up spring.kafka.properties when it should only be looking at spring.cloud.stream.kafka.binder.consumerProperties and spring.cloud.stream.kafka.binder.configuration.

@artembilan
Member

Right. So it feels like the fix has to be done in the Kafka Binder itself.

@corneil corneil transferred this issue from spring-cloud/spring-functions-catalog Oct 21, 2024
@artembilan
Member

Let's add @sobychacko, since I'm not fully up to speed on how the Kafka Binder works.

@sobychacko
Collaborator

I will have a look at this today.

@sobychacko
Collaborator

@corneil The issue is that the Kafka supplier on one side consumes from the first Kafka instance, while the output binding publishes to the other instance. However, you only provide the Spring Boot based properties for the whole application, which are used by the auto-configuration on the supplier side. We think you need to set the following properties on the output binding to override the values used by the supplier.

spring.cloud.stream.kafka.binder.configuration.bootstrap-servers=<value>
spring.cloud.stream.kafka.binder.configuration.security.protocol=
spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=
spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=

@corneil
Contributor Author

corneil commented Oct 22, 2024

> @corneil The issue is that the Kafka supplier on one side consumes from the first Kafka instance, while the output binding publishes to the other instance. However, you only provide the Spring Boot based properties for the whole application, which are used by the auto-configuration on the supplier side. We think you need to set the following properties on the output binding to override the values used by the supplier.
>
> spring.cloud.stream.kafka.binder.configuration.bootstrap-servers=<value>
> spring.cloud.stream.kafka.binder.configuration.security.protocol=
> spring.cloud.stream.kafka.binder.configuration.sasl.mechanism=
> spring.cloud.stream.kafka.binder.configuration.sasl.jaas.config=

Adding these properties worked.
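
For anyone hitting the same problem, here is a rough sketch of what the binder-side overrides could look like as SCDF deployment properties for the kafka source app. The broker address kafka:9092 and the PLAINTEXT protocol are assumptions for an unauthenticated single-container broker and do not come from this thread; the property keys are the ones suggested above, with the app.kafka. prefix used in the original report.

```properties
# Keep the app.kafka.spring.kafka.* properties from the report unchanged;
# they configure the supplier's connection to the external, SASL-protected cluster.

# Binder side: the internal cluster that the stream's bindings use.
# "kafka:9092" and PLAINTEXT are assumed values, not taken from this issue;
# substitute the address and protocol of your own internal broker.
app.kafka.spring.cloud.stream.kafka.binder.configuration.bootstrap-servers=kafka:9092
app.kafka.spring.cloud.stream.kafka.binder.configuration.security.protocol=PLAINTEXT
```

The sasl.mechanism and sasl.jaas.config overrides from the suggestion are left out here on the assumption that a PLAINTEXT connection does not use them; if the internal broker does require SASL, they would need real values as well.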

I believe I need to change the configuration of the binders to use this mechanism as the default, to avoid issues for other customer stream apps that may rely on the default Spring Kafka configuration.
I managed to create a configuration for Kafka where the binder configuration is in a separate secret that is added to the Skipper configuration and becomes the default on all deployments.

I need to create a similar configuration for RabbitMQ.


3 participants