Creating a Kafka data set

Updated on March 11, 2021

You can create a Kafka data set in Pega Platform, and then associate it with a topic in a Kafka cluster. You can select an existing topic in the Kafka cluster or define a new topic when creating the data set.

Configure Kafka data sets to read and write data from and to Kafka topics, and use this data as a source of events, such as customer calls or messages. Your application can use these events as input for rules that process data in real time and trigger actions.

For example, when a customer who has a checking account with UPlus Bank accesses the bank's ATM, this event can initiate an associated action, such as displaying an offer for a new credit card on the ATM's screen. For more information, see Triggering a real-time event with the Event Stream service.

You can connect to an Apache Kafka cluster version or later.

Before you begin:

Ensure that a Kafka configuration instance for connecting to your Kafka server or cluster of servers is available in your system. For more information, see Creating a Kafka configuration instance.

If you want to use Schema Registry with your Kafka data set, download the Schema Registry component provided by Pega, and install and configure the component by following the instructions that are available in the Pega GitHub repository.

Note: This Schema Registry component is supported by Pega Platform 8.2.x and later.
  1. In Dev Studio, click Create > Data Model > Data Set.
  2. Provide the data set label and identifier.
  3. From the Type list, select Kafka.
  4. Provide the ruleset, Applies to class, and ruleset version of the data set.
  5. Click Create and open.
  6. In the Connection section, perform one of the following actions:
    • Select a Kafka configuration instance in the Data-Admin-Kafka class.
    • Create a Kafka configuration instance (for example, when no instances are present) by clicking the Open icon.

      For more information, see Creating a Kafka configuration instance.

  7. Verify that Pega Platform can connect to the Kafka cluster by clicking Test connectivity.
  8. In the Topic section, perform one of the following actions:
    • Select Create new, and then enter the topic name to define a new topic in the Kafka cluster.
    • Select Select from list, and then connect to an existing topic in the Kafka cluster.
    Note: By default, the name of the topic is the same as the name of the data set. If you enter a new topic name, that topic is created in the Kafka cluster only if the ability to automatically create topics is enabled on that Kafka cluster.
  9. Optional: In the Partition Key(s) section, define the data set partitioning by performing the following actions:
    1. Click Add key.
    2. In the Key field, press the Down Arrow key to select a property to be used by the Kafka data set as a partitioning key.
      Note: By default, the available properties to be used as keys correspond to the properties of the Applies To class of the Kafka data set.
    By configuring partitioning, you ensure that related records are sent to the same partition. If no partition keys are set, the Kafka data set randomly assigns records to partitions.
  10. Optional: Configure a JSON data transform to map only the properties that you need. For example, if a long JSON message has many attributes, you can skip some of them. You can also map property names that contain special characters (for example, the $ sign) to the corresponding Pega properties. For more information, see Data transform actions for JSON.
    Important: The ability to configure a JSON data transform is available only in Pega Platform version 8.4.3 and later.
    1. In the header of Dev Studio, click Create > Data Model > Data Transform.
    2. In the Label field, enter the purpose for the new record.
    3. In the Additional configuration options, select JSON.
    4. Select the context, and then fill in the Apply to field.
    5. In the Add to ruleset field, select the ruleset from the list.
    6. Click Create and open.
    7. Optional: Fill in the details of your JSON Data Transform.
    8. Click Save.
    9. In your Kafka data set creation ruleform, select Custom as the record format.
    10. In the Serialization implementation field, enter: com.pega.dsm.kafka.api.serde.DataTransformSerde.
    11. Click Additional Configuration > Add key-value pair.
    12. In the Key field, enter:
    13. In the Value field, enter the name of the data transform that you created previously.
  11. Optional: To use a record format other than JSON, in the Record format section, select Custom, and then configure the record settings:

    If you use Schema Registry with your Kafka data set, configure these settings according to the instructions that are provided with the Schema Registry component in the Pega GitHub repository.

    For information about writing and configuring custom Kafka serialization, see Kafka custom serializer/deserializer implementation.

    1. In the Serialization implementation field, enter a fully qualified Java class name for your PegaSerde implementation.
      For example: com.pega.dsm.kafka.CsvPegaSerde
    2. Optional: Expand the Additional configuration section and define additional configuration options for the implementation class by clicking Add key value pair and entering properties in the Key and Value fields.
  12. Click Save.
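
The effect of the partition keys in step 9 can be illustrated with a minimal sketch. It assumes that a record's partition is derived by hashing its key and taking the result modulo the number of partitions, which is how keyed records are typically routed in Kafka. The class name, the `partitionFor` helper, the customer IDs, and the partition count are all hypothetical, and `Arrays.hashCode` is only a stand-in for the murmur2 hash that Kafka's default partitioner uses. This is not the Pega Platform implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionKeySketch {

    // Hypothetical helper: maps a key to a partition index by hashing the key bytes.
    // Assumption: Arrays.hashCode stands in for the murmur2 hash used by Kafka's
    // default partitioner, so the actual partition numbers will differ.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // floorMod keeps the result in [0, numPartitions) even for negative hashes.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6; // assumed partition count for the topic
        String[] customerIds = {"CUST-1001", "CUST-2002", "CUST-1001"};
        for (String id : customerIds) {
            // Records that share a key always resolve to the same partition.
            System.out.println(id + " -> partition " + partitionFor(id, numPartitions));
        }
    }
}
```

Because the mapping is deterministic, the first and third records in the sketch resolve to the same partition, so a consumer of that partition sees events for one customer in the order in which they were written.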