# Kafka Producer

### Description <a href="#description" id="description"></a>

The Kafka Producer transform publishes messages in near real time to a Kafka cluster, where they become available to all subscribed consumers.

A Kafka Producer transform publishes a stream of records to one Kafka topic.

| Hop Engine | <sup>✓</sup> |
| ---------- | ------------ |
| Spark      | ?            |
| Flink      | ?            |
| Dataflow   | ?            |

### Options

| Option            | Description                                                                                                                                                                                                        |
| ----------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| Transform name    | The name of this transform.                                                                                                                                                                                        |
| Bootstrap servers | A comma-separated list of bootstrap servers in the Kafka cluster.                                                                                                                                                  |
| Client ID         | The unique Client identifier, used to identify and set up a durable connection path to the server to make requests and to distinguish between different clients.                                                   |
| Topic             | The category to which records are published.                                                                                                                                                                       |
| Key Field         | In Kafka, all messages can be keyed, allowing messages to be distributed to partitions based on their keys in the default routing scheme. If no key is present, messages are distributed across partitions by the client (round-robin or sticky batching, depending on the client version). |
| Message Field     | The input field that contains the message (record value) to publish to the topic.                                                                                                                                  |
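As the Key Field description notes, keyed messages are routed to partitions deterministically: Kafka's default partitioner hashes the key bytes (with murmur2) and takes the result modulo the partition count. The sketch below illustrates the idea only; it uses MD5 from Python's standard library as a simplified stand-in, not the actual murmur2 algorithm:

```python
import hashlib
from typing import Optional

def route_to_partition(key: Optional[bytes], num_partitions: int) -> int:
    """Simplified illustration of key-based partition routing.

    Kafka's real default partitioner uses murmur2; MD5 is used here
    purely to show that the same key always maps to the same partition.
    """
    if key is None:
        # Unkeyed records are assigned by the client itself
        # (round-robin or sticky batching), not by hashing.
        raise ValueError("unkeyed records are assigned by the client")
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# The same key always lands on the same partition:
p1 = route_to_partition(b"customer-42", 6)
p2 = route_to_partition(b"customer-42", 6)
assert p1 == p2
```

This is why choosing a good key matters: all messages sharing a key are delivered, in order, to the same partition.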

#### Options

Use this tab to configure additional properties for the Kafka producer connection. A few of the most common properties are included by default for your convenience, and you can enter any other Kafka producer property. For further information on these property names, see the Apache Kafka documentation site: <https://kafka.apache.org/documentation/>.

The options that are included by default are:

| Option                  | Value  |
| ----------------------- | ------ |
| auto.offset.reset       | latest |
| ssl.key.password        |        |
| ssl.keystore.location   |        |
| ssl.keystore.password   |        |
| ssl.truststore.location |        |
| ssl.truststore.password |        |
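Any additional producer property from the Apache Kafka documentation can be entered alongside these defaults. Conceptually, the defaults and the values you enter are merged into a single client configuration, with your values taking precedence. A rough sketch under that assumption (the keystore path and `compression.type` entry are hypothetical examples):

```python
# Default properties shown in the table above.
default_properties = {
    "auto.offset.reset": "latest",
    "ssl.key.password": "",
    "ssl.keystore.location": "",
    "ssl.keystore.password": "",
    "ssl.truststore.location": "",
    "ssl.truststore.password": "",
}

# Hypothetical user-entered overrides and extras.
user_properties = {
    "ssl.keystore.location": "/etc/hop/certs/client.keystore.jks",
    "compression.type": "gzip",  # any valid Kafka producer property
}

# Later entries win, so user-entered values override the defaults.
effective = {**default_properties, **user_properties}
```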

#### Avro and Schema registry

Here are some options you need to send Avro record values to a Kafka server. The schema of the Avro values is not sent to Kafka but to a schema registry, so you need to have one available. The options below make this work on a Confluent Cloud Kafka instance. Various parts of the software stack need authentication, hence the bit of redundancy. We recommend that you put these options in variables in your environment configuration file.

| Option                               | Example                                                                                                                        |
| ------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------ |
| schema.registry.url                  | <https://abcd-12345x.europe-west3.gcp.confluent.cloud>                                                                         |
| value.converter.schema.registry.url  | <https://abcd-12345x.europe-west3.gcp.confluent.cloud>                                                                         |
| auto.register.schemas                | true                                                                                                                           |
| security.protocol                    | SASL\_SSL                                                                                                                      |
| sasl.jaas.config                     | org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER\_API\_KEY" password="CLUSTER\_API\_SECRET"; |
| username                             | CLUSTER\_API\_KEY                                                                                                              |
| password                             | CLUSTER\_API\_SECRET                                                                                                           |
| sasl.mechanism                       | PLAIN                                                                                                                          |
| client.dns.lookup                    | use\_all\_dns\_ips                                                                                                             |
| acks                                 | all                                                                                                                            |
| basic.auth.credentials.source        | USER\_INFO                                                                                                                     |
| basic.auth.user.info                 | CLUSTER\_API\_KEY:CLUSTER\_API\_SECRET                                                                                         |
| schema.registry.basic.auth.user.info | SCHEMA\_REGISTRY\_API\_KEY:SCHEMA\_REGISTRY\_API\_SECRET                                                                       |
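Because the same API key/secret pairs appear in several of these properties (`sasl.jaas.config`, `basic.auth.user.info`, and so on), defining them once as environment variables and referencing those variables in the transform avoids the duplication. A hedged sketch of how the resulting properties could be assembled; every URL, key, and secret below is a placeholder:

```python
# Placeholder credentials; in practice these come from variables in
# your environment configuration file.
CLUSTER_API_KEY = "my-cluster-key"
CLUSTER_API_SECRET = "my-cluster-secret"
SCHEMA_REGISTRY_API_KEY = "my-sr-key"
SCHEMA_REGISTRY_API_SECRET = "my-sr-secret"

avro_properties = {
    "schema.registry.url": "https://abcd-12345x.europe-west3.gcp.confluent.cloud",
    "auto.register.schemas": "true",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    # The same key/secret pair is needed in several places,
    # hence the variables.
    "sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{CLUSTER_API_KEY}" password="{CLUSTER_API_SECRET}";'
    ),
    "basic.auth.credentials.source": "USER_INFO",
    "basic.auth.user.info": f"{CLUSTER_API_KEY}:{CLUSTER_API_SECRET}",
    "schema.registry.basic.auth.user.info": (
        f"{SCHEMA_REGISTRY_API_KEY}:{SCHEMA_REGISTRY_API_SECRET}"
    ),
}
```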

Last updated 2025-09-04 18:22:32 +0200

