Stream node configurations for a Queue Processor rule

A Queue Processor rule requires at least one stream node to run. Without a stream node, messages cannot be queued to or retrieved from the Kafka server.

Stream nodes

A Pega node works as a stream node when you start it with a stream node type. The following JVM argument shows a stream node type:
-DNodeType=Stream
A universal node type also works as a stream node type. The following JVM argument shows a universal node type:
-DNodeType=Universal
If you do not define a stream node in a cluster, the items are queued to the database, and then these items are processed when a stream node is available.

Configure stream nodes by using Dynamic System Settings. For more information, see Dynamic System Settings.

Replication factor

If you configure only one node to be a stream node, the messages on this node are not processed when the node is down. For durability of messages, replicate the messages to another node or nodes. The default replication factor is 2, so the messages are replicated to two nodes.

The following Dynamic System Settings are for the replication factor:
  • Owning ruleset – Pega-Engine
  • pyPurpose – prconfig/dsm/services/stream/pyReplicationFactor/default
  • pySetting – 2
Note: The replication factor must be equal to or higher than the number of stream nodes.

Insync replicas

The Insync replicas value represents the number of replicas that can be created by the replication factor. The default value is 1. If the number of replicas is lower than the replication factor, messages are replicated to the number of nodes that is equal to the number of insync replicas.

The following Dynamic System Setting are for the insync replicas:
  • Owning ruleset – Pega-Engine
  • pyPurpose – prconfig/dsm/services/stream/server_properties/min.insync.replicas/default
  • pySetting – 1

Retention period

Kafka deletes messages from the file system after a retention period. The default retention period is 7 days. If you want to store your messages for a longer period, change the retention period. The retention period is expressed in hours. For example, 7 days is 168 hours.

The following Dynamic System Settings are for the retention period:
  • Owning ruleset – Pega-Engine
  • pyPurpose – prconfig/dsm/services/stream/server_properties/log.retention.hours/default
  • pySetting – 168

Number of partitions

Partition is a unit in Kafka that a thread can access independently. To ensure that numerous background processes can run simultaneously, provide a sufficient number of partitions. The default number of partitions is 20.

The following Dynamic System Settings are for the number of partitions:
  • Owning ruleset – Pega-Engine
  • pyPurpose – pyPurpose – prconfig/dsm/services/stream/pyTopicPartitionsCount/default
  • pySetting – 20

Message size

Change the message size if queuing pages with the pzInsKey value is not possible, and the size of your message exceeds 5 MB. The message size is expressed in bytes. Before you change the message size, change the following settings:
  • <env name="dnode/kafka/producer/max.message.bytes" value="5000000"/>

    This setting must be equal to the dsm/services/stream/server_properties/message.max.bytes setting.

  • <env name="dsm/services/stream/server_properties/replica.fetch.max.bytes" value="5001000"/>

    This setting must be higher than the dsm/services/stream/server_properties/message.max.bytes setting.

  • <env name="dsm/services/stream/server_properties/replica.fetch.response.max.bytes" value="5001000"/>

    This setting must be higher than the dsm/services/stream/server_properties/message.max.bytes setting.

The following Dynamic System Settings are for the message size:
  • Owning ruleset – Pega-Engine
  • pyPurpose – pyPurpose – prconfig/dsm/services/stream/server_properties/message.max.bytes/default
  • pySetting – 5000000