# Beam Kafka Produce


### Description <a href="#description" id="description"></a>

The Beam Kafka Produce transform [publishes](https://kafka.apache.org/25/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html) records to a Kafka cluster using the Beam execution engine.

| Engine     | Supported |
| ---------- | --------- |
| Hop engine | ✗         |
| Spark      | ✓         |
| Flink      | ✓         |
| Dataflow   | ✓         |

### Limitations

The main limitation of the Kafka Producer is that it currently only supports String keys, and String or Avro Record values.

### Options

| Option                      | Description                                                                             |
| --------------------------- | --------------------------------------------------------------------------------------- |
| Transform name              | Name of the transform; this name must be unique within a pipeline.                      |
| Bootstrap servers           | A comma-separated list of hosts which are Kafka brokers in a "bootstrap" Kafka cluster. |
| The topics                  | The topics to publish to.                                                               |
| The field to use as key     | The record key.                                                                         |
| The field to use as message | The record message.                                                                     |
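As an illustration, a minimal configuration of these options might look as follows; the broker hosts, topic, and field names below are hypothetical:

```properties
# Hypothetical values for the Beam Kafka Produce options
Bootstrap servers = broker1.example.com:9092,broker2.example.com:9092
The topics        = customer-events
Key field         = customerId
Message field     = payload
```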

#### Avro and Schema registry

Sending Avro Record values to a Kafka server requires some additional options. The schemas of Avro values are not sent to Kafka itself but to a schema registry, so you need to have one available. The table below lists the options needed to make this work on a Confluent Cloud Kafka instance. Various parts of the software stack need authentication, hence the bit of redundancy. We recommend that you put these options in variables in your environment configuration file.

| Option                               | Example                                                                                                                        |
| ------------------------------------ | ------------------------------------------------------------------------------------------------------------------------------ |
| schema.registry.url                  | <https://abcd-12345x.europe-west3.gcp.confluent.cloud>                                                                         |
| value.converter.schema.registry.url  | <https://abcd-12345x.europe-west3.gcp.confluent.cloud>                                                                         |
| auto.register.schemas                | true                                                                                                                           |
| security.protocol                    | SASL\_SSL                                                                                                                      |
| sasl.jaas.config                     | org.apache.kafka.common.security.plain.PlainLoginModule required username="CLUSTER\_API\_KEY" password="CLUSTER\_API\_SECRET"; |
| username                             | CLUSTER\_API\_KEY                                                                                                              |
| password                             | CLUSTER\_API\_SECRET                                                                                                           |
| sasl.mechanism                       | PLAIN                                                                                                                          |
| client.dns.lookup                    | use\_all\_dns\_ips                                                                                                             |
| acks                                 | all                                                                                                                            |
| basic.auth.credentials.source        | USER\_INFO                                                                                                                     |
| basic.auth.user.info                 | CLUSTER\_API\_KEY:CLUSTER\_API\_SECRET                                                                                         |
| schema.registry.basic.auth.user.info | SCHEMA\_REGISTRY\_API\_KEY:SCHEMA\_REGISTRY\_API\_SECRET                                                                       |
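Because several of these options contain credentials, storing them as variables keeps secrets out of the pipeline itself. A sketch of what the option values could look like when referencing variables (assuming Hop's `${VARIABLE}` substitution syntax; the variable names are hypothetical):

```properties
# Hypothetical variable-based option values; ${...} denotes variable substitution
schema.registry.url=${SCHEMA_REGISTRY_URL}
value.converter.schema.registry.url=${SCHEMA_REGISTRY_URL}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${CLUSTER_API_KEY}" password="${CLUSTER_API_SECRET}";
basic.auth.user.info=${CLUSTER_API_KEY}:${CLUSTER_API_SECRET}
schema.registry.basic.auth.user.info=${SCHEMA_REGISTRY_API_KEY}:${SCHEMA_REGISTRY_API_SECRET}
```

The actual key and secret values then live only in the environment configuration file.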

