Valkey PHP - Streams

| Command | Description | Supported | Tested | Class/Trait | Method |
| --- | --- | --- | --- | --- | --- |
| xAck | Acknowledge one or more pending messages. | | | Streams | xAck |
| xAdd | Add a message to a stream. | | | Streams | xAdd |
| xClaim | Acquire ownership of a pending message. | | | Streams | xClaim |
| xDel | Remove a message from a stream. | | | Streams | xDel |
| xGroup | Manage consumer groups. | | | Streams | xGroup |
| xInfo | Get information about a stream. | | | Streams | xInfo |
| xLen | Get the length of a stream. | | | Streams | xLen |
| xPending | Inspect pending messages in a stream. | | | Streams | xPending |
| xRange | Query a range of messages from a stream. | | | Streams | xRange |
| xRead | Read message(s) from a stream. | | | Streams | xRead |
| xReadGroup | Read stream messages with a group and consumer. | | | Streams | xReadGroup |
| xRevRange | Query one or more messages from end to start. | | | Streams | xRevRange |
| xTrim | Trim a stream's size. | | | Streams | xTrim |
  • XACK Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
  • XADD Appends a new message to a stream. Creates the key if it doesn't exist.
  • XAUTOCLAIM Changes, or acquires, ownership of messages in a consumer group, as if the messages were delivered to a consumer group member.
  • XCLAIM Changes, or acquires, ownership of a message in a consumer group, as if the message was delivered to a consumer group member.
  • XDEL Returns the number of messages after removing them from a stream.
  • XGROUP A container for consumer groups commands.
  • XGROUP CREATE Creates a consumer group.
  • XGROUP CREATECONSUMER Creates a consumer in a consumer group.
  • XGROUP DELCONSUMER Deletes a consumer from a consumer group.
  • XGROUP DESTROY Destroys a consumer group.
  • XGROUP HELP Returns helpful text about the different subcommands.
  • XGROUP SETID Sets the last-delivered ID of a consumer group.
  • XINFO A container for stream introspection commands.
  • XINFO CONSUMERS Returns a list of the consumers in a consumer group.
  • XINFO GROUPS Returns a list of the consumer groups of a stream.
  • XINFO HELP Returns helpful text about the different subcommands.
  • XINFO STREAM Returns information about a stream.
  • XLEN Returns the number of messages in a stream.
  • XPENDING Returns the information and entries from a stream consumer group's pending entries list.
  • XRANGE Returns the messages from a stream within a range of IDs.
  • XREAD Returns messages from multiple streams with IDs greater than the ones requested. Blocks until a message is available otherwise.
  • XREADGROUP Returns new or historical messages from a stream for a consumer in a group. Blocks until a message is available otherwise.
  • XREVRANGE Returns the messages from a stream within a range of IDs in reverse order.
  • XSETID An internal command for replicating stream values.
  • XTRIM Deletes messages from the beginning of a stream.
  • xAck - Acknowledge one or more pending messages
  • xAdd - Add a message to a stream
  • xClaim - Acquire ownership of a pending message
  • xDel - Remove a message from a stream
  • xGroup - Manage consumer groups
  • xInfo - Get information about a stream
  • xLen - Get the length of a stream
  • xPending - Inspect pending messages in a stream
  • xRange - Query a range of messages from a stream
  • xRead - Read message(s) from a stream
  • xReadGroup - Read stream messages with a group and consumer
  • xRevRange - Query one or more messages from end to start
  • xTrim - Trim a stream's size

Usage

$obj_redis = new Valkey();
$obj_redis->connect('127.0.0.1', 6379);

$obj_redis->xAdd('mystream', "*", ['field' => 'value']);
$obj_redis->xAdd('mystream', "*", ['field' => 'value'], 1000); // set max length of stream to 1000
$obj_redis->xAdd('mystream', "*", ['field' => 'value'], 1000, true); // set max length of stream to ~1000
$obj_redis->xAck('mystream', 'group1', ['1530063064286-0', '1530063064286-1']);

/* Get everything in this stream */
$obj_redis->xRange('mystream', '-', '+');

/* Only the first two messages */
$obj_redis->xRange('mystream', '-', '+', 2);

$obj_redis->xInfo('STREAM', 'mystream', 'FULL', 10);

$obj_redis->xRead(['stream1' => '1535222584555-0', 'stream2' => '1535222584555-0']);

$obj_redis->xDel('mystream', ['1530115304877-0', '1530115305731-0']);

xAck


Prototype
$obj_redis->xAck($stream, $group, $arr_messages);

Description: Acknowledge one or more messages on behalf of a consumer group.

Return value

long: The number of messages Valkey reports as acknowledged.

Example
$obj_redis->xAck('stream', 'group1', ['1530063064286-0', '1530063064286-1']);

xAdd


Prototype
$obj_redis->xAdd($str_key, $str_id, $arr_message[, $i_maxlen, $boo_approximate]);

Description: Add a message to a stream.

Return value

String: The ID of the added message.

Example
$obj_redis->xAdd('mystream', "*", ['field' => 'value']);
$obj_redis->xAdd('mystream', "*", ['field' => 'value'], 1000); // set max length of stream to 1000
$obj_redis->xAdd('mystream', "*", ['field' => 'value'], 1000, true); // set max length of stream to ~1000
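
The IDs used throughout this page have the form `<milliseconds>-<sequence>`. As a minimal sketch, the ID returned by xAdd can be split into those two parts. `parseStreamId` is a hypothetical helper written for illustration and is not part of the Valkey class. It assumes 64-bit PHP so the millisecond value fits in an int.

```php
<?php
/**
 * Split a complete stream message ID such as '1530063064286-0' into its parts.
 * Hypothetical helper for illustration; not part of the Valkey class.
 *
 * @return array{ms: int, seq: int}
 */
function parseStreamId(string $id): array
{
    // A complete ID is two unsigned integers joined by a single hyphen.
    if (!preg_match('/^(\d+)-(\d+)$/D', $id, $m)) {
        throw new InvalidArgumentException("Not a complete stream ID: $id");
    }

    return ['ms' => (int) $m[1], 'seq' => (int) $m[2]];
}

$parts = parseStreamId('1530063064286-0');
// $parts['ms'] holds the millisecond part, $parts['seq'] the sequence number.
```

With an explicit "*" passed as the ID, the server generates the ID itself, so parsing the returned string is one way to recover when a message was added.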

xClaim


Prototype
$obj_redis->xClaim($str_key, $str_group, $str_consumer, $min_idle_time, $arr_ids, [$arr_options]);

Description: Claim ownership of one or more pending messages.

Options Array

$options = [
    /* Note: 'TIME' and 'IDLE' are mutually exclusive */
    'IDLE' => $value, /* Set the idle time to $value ms */
    'TIME' => $value, /* Set the idle time to now - $value, where $value is a Unix time in ms */
    'RETRYCOUNT' => $value, /* Update message retrycount to $value */
    'FORCE', /* Claim the message(s) even if they're not pending anywhere */
    'JUSTID', /* Instruct Valkey to only return IDs */
];
Return value

Array: Either an array of message IDs along with corresponding data, or just an array of IDs (if the 'JUSTID' option was passed).

Example
$ids = ['1530113681011-0', '1530113681011-1', '1530113681011-2'];

/* Without any options */
$obj_redis->xClaim(
    'mystream', 'group1', 'myconsumer1', 0, $ids
);

/* With options */
$obj_redis->xClaim(
    'mystream', 'group1', 'myconsumer2', 0, $ids,
    [
        'TIME' => time() * 1000, /* Unix time in ms; 'IDLE' takes a relative duration in ms instead */
        'RETRYCOUNT' => 5,
        'FORCE',
        'JUSTID'
    ]
);
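
Because the options array mixes keyed values with bare flags, and 'IDLE' and 'TIME' cannot be combined, it can help to build it in one place. The following is a sketch only: `buildClaimOptions` and its parameter names are hypothetical and not part of the Valkey class. It produces an array in the same shape as the options array documented above.

```php
<?php
/**
 * Build an options array for xClaim in the documented shape.
 * Hypothetical helper for illustration; not part of the Valkey class.
 */
function buildClaimOptions(
    ?int $idleMs = null,
    ?int $timeMs = null,
    ?int $retryCount = null,
    bool $force = false,
    bool $justId = false
): array {
    // 'IDLE' and 'TIME' are mutually exclusive, so reject the combination early.
    if ($idleMs !== null && $timeMs !== null) {
        throw new InvalidArgumentException("'IDLE' and 'TIME' are mutually exclusive");
    }

    $options = [];
    if ($idleMs !== null) {
        $options['IDLE'] = $idleMs;
    }
    if ($timeMs !== null) {
        $options['TIME'] = $timeMs;
    }
    if ($retryCount !== null) {
        $options['RETRYCOUNT'] = $retryCount;
    }
    // Flags are appended as plain values, without a key.
    if ($force) {
        $options[] = 'FORCE';
    }
    if ($justId) {
        $options[] = 'JUSTID';
    }

    return $options;
}

// Usage sketch (requires a connected $obj_redis, so it is left commented out):
// $obj_redis->xClaim('mystream', 'group1', 'myconsumer2', 0, $ids,
//     buildClaimOptions(null, null, 5, true, true));
```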

xDel


Prototype
$obj_redis->xDel($str_key, $arr_ids);

Description: Delete one or more messages from a stream.

Return value

long: The number of messages removed

Example
$obj_redis->xDel('mystream', ['1530115304877-0', '1530115305731-0']);

xGroup


Prototype
$obj_redis->xGroup('HELP');
$obj_redis->xGroup('CREATE', $str_key, $str_group, $str_msg_id, [$boo_mkstream]);
$obj_redis->xGroup('SETID', $str_key, $str_group, $str_msg_id);
$obj_redis->xGroup('DESTROY', $str_key, $str_group);
$obj_redis->xGroup('DELCONSUMER', $str_key, $str_group, $str_consumer_name);

Description: Create, destroy, or manage consumer groups.

Return value

Mixed: This command returns different types depending on the specific XGROUP command executed.

Example
$obj_redis->xGroup('CREATE', 'mystream', 'mygroup', '0');
$obj_redis->xGroup('CREATE', 'mystream', 'mygroup2', '0', true); /* Create stream if non-existent. */
$obj_redis->xGroup('DESTROY', 'mystream', 'mygroup');

xInfo


Prototype
$obj_redis->xInfo('CONSUMERS', $str_stream, $str_group);
$obj_redis->xInfo('GROUPS', $str_stream);
$obj_redis->xInfo('STREAM', $str_stream [, 'FULL' [, $i_count]]);
$obj_redis->xInfo('HELP');

Description: Get information about a stream or consumer groups.

Return value

Mixed: This command returns different types depending on which subcommand is used.

Example
$obj_redis->xInfo('STREAM', 'mystream');
$obj_redis->xInfo('STREAM', 'mystream', 'FULL', 10);

xLen


Prototype
$obj_redis->xLen($str_stream);

Description: Get the length of a given stream.

Return value

long: The number of messages in the stream.

Example
$obj_redis->xLen('mystream');

xPending


Prototype
$obj_redis->xPending($str_stream, $str_group [, $str_start, $str_end, $i_count, $str_consumer]);

Description: Get information about pending messages in a given stream.

Return value

Array: Information about the pending messages, in various forms depending on the specific invocation of XPENDING.

Examples
$obj_redis->xPending('mystream', 'mygroup');
$obj_redis->xPending('mystream', 'mygroup', '-', '+', 1, 'consumer-1');

xRange


Prototype
$obj_redis->xRange($str_stream, $str_start, $str_end [, $i_count]);

Description: Get a range of messages from a given stream.

Return value

Array: The messages in the stream within the requested range.

Example
/* Get everything in this stream */
$obj_redis->xRange('mystream', '-', '+');

/* Only the first two messages */
$obj_redis->xRange('mystream', '-', '+', 2);

xRead


Prototype
$obj_redis->xRead($arr_streams [, $i_count, $i_block]);

Description: Read data from one or more streams, returning only messages whose IDs are greater than the IDs sent in the command.

Return value

Array: The messages in the stream newer than the IDs passed to Valkey (if any).

Example
$obj_redis->xRead(['stream1' => '1535222584555-0', 'stream2' => '1535222584555-0']);

/* --- Possible output  ---
Array
(
    [stream1] => Array
        (
            [1535222584555-1] => Array
                (
                    [key:1] => val:1
                )

        )

    [stream2] => Array
        (
            [1535222584555-1] => Array
                (
                    [key:1] => val:1
                )

        )

)
*/

// Receive only new messages ($ = last ID), blocking indefinitely until one new message arrives
$obj_redis->xRead(['stream1' => '$'], 1, 0);
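
When polling repeatedly, each xRead call should pass the last ID already seen for every stream. The sketch below assumes the reply has the shape shown in the possible output above (stream name => message ID => fields) and that messages within a stream arrive in ascending ID order. `advanceCursors` is a hypothetical helper, not part of the Valkey class.

```php
<?php
/**
 * Return updated per-stream cursors after an xRead reply.
 * Hypothetical helper for illustration; not part of the Valkey class.
 *
 * @param array<string, string> $cursors Last seen ID per stream.
 * @param array<string, array>  $reply   Reply in the shape shown above.
 * @return array<string, string>
 */
function advanceCursors(array $cursors, array $reply): array
{
    foreach ($reply as $stream => $messages) {
        if (is_array($messages) && $messages !== []) {
            // Assumes ascending order, so the last key is the newest ID.
            $cursors[$stream] = (string) array_key_last($messages);
        }
    }

    return $cursors;
}

$cursors = ['stream1' => '1535222584555-0', 'stream2' => '1535222584555-0'];
$reply   = ['stream1' => ['1535222584555-1' => ['key:1' => 'val:1']]];
$cursors = advanceCursors($cursors, $reply);
// 'stream1' now points at '1535222584555-1'; 'stream2' is unchanged.

// Polling sketch (requires a connected $obj_redis, so it is left commented out).
// What xRead returns on a timeout is not specified on this page, hence the guard.
// $reply = $obj_redis->xRead($cursors, 10, 1000);
// if (is_array($reply)) { $cursors = advanceCursors($cursors, $reply); }
```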

xReadGroup


Prototype
$obj_redis->xReadGroup($str_group, $str_consumer, $arr_streams [, $i_count, $i_block]);

Description: This method is similar to xRead except that it supports reading messages for a specific consumer group.

Return value

Array: The messages delivered to this consumer group (if any).

Examples
/* Re-read messages already delivered to 'consumer1' in 'mygroup' but not yet acknowledged (ID 0 = pending history) */
$obj_redis->xReadGroup('mygroup', 'consumer1', ['s1' => 0, 's2' => 0]);

/* Consume messages for 'mygroup', 'consumer1' that have not yet been delivered to any consumer in the group */
$obj_redis->xReadGroup('mygroup', 'consumer1', ['s1' => '>', 's2' => '>']);

/* Read up to one message per stream as 'consumer2', waiting up to one second for a new message to arrive.
   Blocking only applies with the '>' ID; reads of pending history return immediately. */
$obj_redis->xReadGroup('mygroup', 'consumer2', ['s1' => '>', 's2' => '>'], 1, 1000);
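
Messages read through a group stay pending until they are acknowledged with xAck, which takes one stream and a list of IDs. The sketch below groups the IDs of a reply by stream so they can be acknowledged after processing. It assumes that xReadGroup replies have the same shape as the xRead output shown earlier (stream name => message ID => fields); this page does not confirm that. `idsByStream` is a hypothetical helper, not part of the Valkey class.

```php
<?php
/**
 * Collect the message IDs of a reply, grouped by stream name.
 * Hypothetical helper for illustration; not part of the Valkey class.
 *
 * @param array<string, array> $reply Assumed shape: stream => [id => fields].
 * @return array<string, string[]>
 */
function idsByStream(array $reply): array
{
    $ids = [];
    foreach ($reply as $stream => $messages) {
        if (is_array($messages) && $messages !== []) {
            // Message IDs are the array keys; normalise them to strings.
            $ids[$stream] = array_map('strval', array_keys($messages));
        }
    }

    return $ids;
}

$reply = [
    's1' => ['1530063064286-0' => ['field' => 'value'], '1530063064286-1' => ['field' => 'value']],
    's2' => [],
];
$pending = idsByStream($reply);
// $pending contains only 's1', with both of its message IDs.

// Acknowledgement sketch (requires a connected $obj_redis, so it is left commented out):
// foreach ($pending as $stream => $ids) {
//     $obj_redis->xAck($stream, 'mygroup', $ids);
// }
```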

xRevRange


Prototype
$obj_redis->xRevRange($str_stream, $str_end, $str_start [, $i_count]);

Description: Identical to xRange, except that the results come back in reverse order. Note that the range arguments are also reversed: pass the end ID first, then the start ID.

Return value

Array: The messages in the range specified.

Example
$obj_redis->xRevRange('mystream', '+', '-');

xTrim


Prototype
$obj_redis->xTrim($str_stream, $i_max_len [, $boo_approximate]);

Description: Trim the stream length to a given maximum. If the "approximate" flag is passed, Valkey treats the size as a hint and trims only whole nodes of the underlying tree, which is more efficient.

Return value

long: The number of messages trimmed from the stream.

Example
/* Trim to exactly 100 messages */
$obj_redis->xTrim('mystream', 100);

/* Let Valkey approximate the trimming */
$obj_redis->xTrim('mystream', 100, true);