Kafka operations

Updated on July 11, 2022

Manage a Kafka cluster connected to Pega Platform™ to adjust the stream service to your needs.

Scaling a Kafka cluster

Manage the size and data of your Kafka cluster by performing the following procedures:

  1. Reassigning partitions
  2. Adding a new node
  3. Starting a temporarily unavailable node
  4. Decommissioning a node
  5. Replacing a node
  6. Changing the replication factor
  7. Changing the number of partitions
  8. Deleting a data set
  9. Truncating a data set

Reassigning a partition

Assign partitions to Kafka nodes to grow and shrink Kafka clusters, to change the number of partitions, and to change the replication factor. During the partition reassignment process, the cluster becomes unavailable for reads and writes.

  1. Generate a reassignment map structure (a ring-like partition assignment).
  2. Submit the map structure to ZooKeeper.

Kafka detects the change and performs rebalancing. Partitions are distributed evenly in the cluster according to the round-robin partition assignment algorithm.
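
Pega Platform generates and submits the reassignment map for you. Purely as an illustration of the underlying Kafka mechanism, the following sketch submits a comparable reassignment with the Kafka AdminClient (Kafka 2.4 or later); the broker address, topic name, and target broker IDs are assumed values, not settings used by Pega Platform.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

public class ReassignPartitionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; replace with a broker from your cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Move partition 0 of the (assumed) topic "stream-topic" so that
            // its replicas live on brokers 1 and 2.
            Map<TopicPartition, Optional<NewPartitionReassignment>> plan = Map.of(
                new TopicPartition("stream-topic", 0),
                Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2))));
            // Submit the reassignment map; Kafka rebalances in the background.
            admin.alterPartitionReassignments(plan).all().get();
        }
    }
}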

Adding a new node

  1. Ensure that all nodes are available.
  2. Add a new Kafka node.
    Note: If not all nodes are available, the JOINING_FAILED message appears when you try to add a new stream service node.

When you add a new node, reassignment starts and the partitions are redistributed across all available nodes. To rebalance data among brokers, some data is moved from the existing nodes to the new node.
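
Within Pega Platform, node availability is shown on the Stream service landing page. As a Kafka-level illustration only, the following sketch lists the brokers that are currently registered in the cluster, which is one way to check that all nodes are up before adding a new one; the broker address is an assumed value.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

import java.util.Collection;
import java.util.Properties;

public class ListBrokersExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            // describeCluster() returns the brokers that are currently part of the cluster.
            Collection<Node> brokers = admin.describeCluster().nodes().get();
            for (Node broker : brokers) {
                System.out.printf("Broker %d at %s:%d%n", broker.id(), broker.host(), broker.port());
            }
        }
    }
}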

Starting a temporarily unavailable node

When a node has been temporarily unavailable, Kafka synchronizes the existing partitions automatically, if replicas exist. The synchronization occurs in the background, and the cluster is available throughout that process.
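
To confirm that the background synchronization has finished, you can compare the in-sync replica (ISR) list of each partition with its full replica list. The following sketch does that with the Kafka AdminClient; the broker address and topic name are assumed values.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Properties;

public class IsrCheckExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription description = admin.describeTopics(Collections.singleton("stream-topic")) // assumed topic
                    .all().get().get("stream-topic");
            for (TopicPartitionInfo partition : description.partitions()) {
                // A partition is fully synchronized when its ISR list matches its replica list.
                boolean inSync = partition.isr().size() == partition.replicas().size();
                System.out.printf("Partition %d in sync: %b%n", partition.partition(), inSync);
            }
        }
    }
}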

Decommissioning a Stream service node

Decommissioning a Stream service node triggers Kafka partition reassignment. Decommissioning is not a stable procedure and can fail for many reasons, such as:

  • Network issues
  • A broker failure during the reassignment
  • Existing offline partitions
Important: Do not decommission nodes in a production environment. Decommission nodes only in non-production environments.

To decommission a node, complete one of the following processes, depending on the node status:

Decommissioning a normal or stopped node

If the node is running normally or is stopped, perform the following actions:

  1. Reassign the partitions.
  2. Shut down the Kafka cluster.
  3. To prevent issues when you add the node back in the future, delete data folders.
    Note: This operation is allowed only if the decommission does not lead to data loss.

Decommissioning an unavailable node

If the node is unavailable, complete decommissioning by performing one of the following actions:
  • If you can bring up the node, wait until the cluster status returns to normal, and then run partition reassignment.
  • If you cannot bring the node up, warn the users about the data loss (not implemented), and then run partition reassignment.

    If enough replicas exist, you do not need to warn the users about data loss.

Decommissioning the last node

If you want to decommission the last node, perform the following actions:
  1. Warn the users about the data loss (not implemented).
  2. Remove all metadata topics (see the sketch after this list).
  3. Remove the Kafka cluster data.
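
Pega Platform performs this cleanup itself, so treat the following only as a Kafka-level illustration of what removing the remaining topics involves. The sketch lists all topics and deletes them with the AdminClient; the broker address is an assumed value.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;
import java.util.Set;

public class DeleteAllTopicsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            // List every remaining topic and delete them all.
            Set<String> topics = admin.listTopics().names().get();
            if (!topics.isEmpty()) {
                admin.deleteTopics(topics).all().get();
            }
        }
    }
}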

Replacing a node

Replace a node by completing the process that applies to your scenario:

  • If you have an old volume attached, the node starts as a known node that has been temporarily unavailable.
  • If you do not have an attached volume but enough replicas exist, add the node as a new node.
  • If you do not have an attached volume and not enough replicas exist, add the node as a new node.

    Note: If you have two nodes, one of which is in the UNAVAILABLE status, the node that you add gets the JOINING_FAILED status. To resolve that issue, start the unreachable node to recover the data, or decommission the unavailable node.

Replication factor changes

You can configure a preferable replication factor (PRF) in the server settings dialog. The default PRF is 2.

If the number of nodes is less than PRF, the replication factor is set to the number of nodes, and a warning appears on the Stream service landing page.

If the number of nodes is greater than or equal to PRF, the replication factor is set to the PRF value.

For example, if you set the PRF value to 3, the following settings are configured:

  • If the number of nodes is 1, the replication factor is set to 1 and a warning appears.
  • If the number of nodes is 2, the replication factor is set to 2 and a warning appears.
  • If the number of nodes is 3, the replication factor is set to 3.
  • If the number of nodes is 4, the replication factor is set to 3.
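
In other words, the effective replication factor is the smaller of the node count and the PRF. A minimal sketch of that rule:

public class EffectiveReplicationFactor {
    // Returns the replication factor that the stream service applies:
    // the preferable replication factor (PRF), capped at the number of nodes.
    static int effectiveReplicationFactor(int nodeCount, int preferableReplicationFactor) {
        return Math.min(nodeCount, preferableReplicationFactor);
    }

    public static void main(String[] args) {
        // With a PRF of 3, clusters of 1, 2, 3, and 4 nodes get factors 1, 2, 3, and 3.
        for (int nodes = 1; nodes <= 4; nodes++) {
            System.out.printf("%d node(s) -> replication factor %d%n",
                    nodes, effectiveReplicationFactor(nodes, 3));
        }
    }
}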

Changing the number of partitions

The number of partitions is a global setting and you cannot change it once you create a topic or start a node. The default number of partitions is:

  • 6 starting in Pega Platform version 8.7
  • 20 in Pega Platform versions 8.6 and earlier

To change the number of partitions, in the prconfig.xml file, enter the following string:

<env name="dsm/services/stream/pyTopicPartitionsCount" value="22" />

Data set deletion

Data set deletion is supported.

Note: Kafka topic deletion is not an instant operation. Kafka renames the data and schedules disk cleanup.

Data set truncation

Kafka does not support topic truncation. The stream data set truncation implementation deletes the topic and then recreates it.
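
Pega Platform performs the delete-and-recreate sequence internally. The following sketch shows the same idea at the Kafka level with the AdminClient; the broker address, topic name, partition count, and replication factor are assumed values.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class TruncateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            String topic = "stream-topic"; // assumed topic name
            // Step 1: delete the topic. Kafka marks the data for deletion and
            // cleans up the files on disk asynchronously.
            admin.deleteTopics(Collections.singleton(topic)).all().get();
            // Step 2: recreate the topic with the desired partition count and
            // replication factor (assumed values). In practice, wait for the
            // deletion to finish before recreating the topic.
            admin.createTopics(Collections.singleton(new NewTopic(topic, 6, (short) 2))).all().get();
        }
    }
}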

Multi-datacenter setup

The stream service supports a multi-datacenter setup at the rack level. If datacenter machines are grouped into racks, supply that rack information in the prconfig.xml file, for example:

<env name="dsm/services/stream/server_properties/broker.rack" value="Rack1" />

When partition replicas are spread across as many different racks as possible, a partition can continue to function even if a rack goes down.
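
To confirm that each broker picked up its broker.rack setting, you can read the rack back from the cluster metadata. The following sketch does that with the Kafka AdminClient; the broker address is an assumed value.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

import java.util.Properties;

public class BrokerRackExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed address
        try (AdminClient admin = AdminClient.create(props)) {
            for (Node broker : admin.describeCluster().nodes().get()) {
                // rack() returns null when no rack is configured for the broker.
                System.out.printf("Broker %d rack: %s%n", broker.id(), broker.rack());
            }
        }
    }
}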

Recovering the stream service data

Recover stream service data by copying the files from one cluster to another.

  1. Stop all stream service nodes on Cluster1.

    Note: To avoid data loss, do not decommission the stream service.
  2. Ensure that Cluster2 does not have any enabled services and shut down Pega Platform™.

  3. Ensure that the pr_data_stream_nodes table in Cluster2 is empty.

  4. Import the contents of the pr_data_stream_nodes table from Cluster1 to Cluster2, as shown in the sketch after these steps.

  5. Move the kafka-data folder from every stream service node of Cluster1 to Cluster2. Place the folder in either the default location or any other location with the proper configuration in the prconfig.xml file.

    For more information, see <PLACEHOLDER>.

  6. Start stream services on Cluster2.
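
One way to perform step 4 is a direct table copy over JDBC, as in the following sketch; the JDBC URLs, driver, and credentials are assumed values, and a database export/import tool works equally well.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Statement;
import java.util.Collections;
import java.util.StringJoiner;

public class CopyStreamNodesTable {
    public static void main(String[] args) throws Exception {
        // Assumed JDBC URLs and credentials; replace with your own values.
        try (Connection source = DriverManager.getConnection("jdbc:postgresql://cluster1-db/pega", "user", "password");
             Connection target = DriverManager.getConnection("jdbc:postgresql://cluster2-db/pega", "user", "password");
             Statement select = source.createStatement();
             ResultSet rows = select.executeQuery("SELECT * FROM pr_data_stream_nodes")) {

            ResultSetMetaData metadata = rows.getMetaData();
            int columnCount = metadata.getColumnCount();

            // Build "INSERT INTO pr_data_stream_nodes (col1, col2, ...) VALUES (?, ?, ...)"
            // from the source table's own column names.
            StringJoiner columns = new StringJoiner(", ");
            for (int i = 1; i <= columnCount; i++) {
                columns.add(metadata.getColumnName(i));
            }
            String placeholders = String.join(", ", Collections.nCopies(columnCount, "?"));
            String insertSql = "INSERT INTO pr_data_stream_nodes (" + columns + ") VALUES (" + placeholders + ")";

            try (PreparedStatement insert = target.prepareStatement(insertSql)) {
                while (rows.next()) {
                    for (int i = 1; i <= columnCount; i++) {
                        insert.setObject(i, rows.getObject(i));
                    }
                    insert.addBatch();
                }
                insert.executeBatch();
            }
        }
    }
}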

 

To view the main outline for this article, see Kafka as a streaming service.
