2023-05-12 15:06:01 +02:00
< ? php
/*
 * This file is part of the Symfony package.
 *
 * (c) Fabien Potencier <fabien@symfony.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
namespace Symfony\Component\Messenger\Bridge\Redis\Transport ;
use Symfony\Component\Messenger\Exception\InvalidArgumentException ;
use Symfony\Component\Messenger\Exception\LogicException ;
use Symfony\Component\Messenger\Exception\TransportException ;
/**
 * A Redis connection.
 *
 * @author Alexander Schranz <alexander@sulu.io>
 * @author Antoine Bluchet <soyuka@gmail.com>
 * @author Robin Chalas <robin.chalas@gmail.com>
 *
 * @internal
 *
 * @final
 */
class Connection
{
/**
 * Default value of every option known to the transport.
 *
 * The keys also act as the list of accepted option names: validateOptions()
 * compares the user-supplied keys against array_keys() of this constant.
 */
private const DEFAULT_OPTIONS = [
'stream' => 'messages' ,
'group' => 'symfony' ,
'consumer' => 'consumer' ,
'auto_setup' => true ,
'delete_after_ack' => false ,
'delete_after_reject' => true ,
'stream_max_entries' => 0 , // any value higher than 0 defines an approximate maximum number of stream entries
'dbindex' => 0 ,
'tls' => false ,
'redeliver_timeout' => 3600 , // Timeout before redeliver messages still in pending state (seconds)
'claim_interval' => 60000 , // Interval by which pending/abandoned messages should be checked
'lazy' => false ,
'auth' => null ,
'serializer' => \Redis :: SERIALIZER_PHP ,
];
// Redis client, or a lazy proxy around it when the "lazy" option is enabled (assigned in the constructor).
private $connection ;
// Name of the stream that messages are added to and read from.
private $stream ;
// Stream name suffixed with "__queue"; the key that rawCommand() runs ZADD/ZCOUNT/ZPOPMIN against for delayed messages.
private $queue ;
// Consumer group name passed to xreadgroup/xack/xpending/xclaim.
private $group ;
// Name of this consumer inside the group.
private $consumer ;
// While true, get() and add() call setup() first; setup() switches it off once it has run.
private $autoSetup ;
// Maximum stream length passed to xadd(); a falsy value means no limit is passed.
private $maxEntries ;
// "redeliver_timeout" option multiplied by 1000 in the constructor (option is documented in seconds).
private $redeliverTimeout ;
// microtime(true) value after which claimOldPendingMessages() may run again.
private $nextClaim = 0.0 ;
// "claim_interval" option divided by 1000 in the constructor, so it can be added to microtime(true).
private $claimInterval ;
// Whether ack() also deletes the message from the stream.
private $deleteAfterAck ;
// Whether reject() also deletes the message from the stream.
private $deleteAfterReject ;
// When true, get() reads with id "0" (this consumer's pending messages) instead of ">" (new messages).
private $couldHavePendingMessages = true ;
/**
 * Creates the Redis client (or a lazy proxy around it) and stores the transport options.
 *
 * Missing or null entries of $configuration fall back to DEFAULT_OPTIONS.
 *
 * @param array                     $configuration         Transport options: stream, group, consumer, auto_setup, stream_max_entries,
 *                                                         delete_after_ack, delete_after_reject, dbindex, lazy, redeliver_timeout, claim_interval
 * @param array                     $connectionCredentials "host", "port" and "auth"; an array "host" selects the RedisCluster code path
 * @param array                     $redisOptions          Only the "serializer" entry is read here
 * @param \Redis|\RedisCluster|null $redis                 Existing client to use instead of creating a new one
 *
 * @throws LogicException           When the installed redis extension is older than 4.3.0
 * @throws InvalidArgumentException When "stream", "group" or "consumer" is an empty string, or when a
 *                                  non-lazy \Redis connection fails to authenticate or select its database
 */
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], $redis = null)
{
    if (version_compare(phpversion('redis'), '4.3.0', '<')) {
        throw new LogicException('The redis transport requires php-redis 4.3.0 or higher.');
    }

    $host = $connectionCredentials['host'] ?? '127.0.0.1';
    $port = $connectionCredentials['port'] ?? 6379;
    $serializer = $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP;
    $dbIndex = $configuration['dbindex'] ?? self::DEFAULT_OPTIONS['dbindex'];

    // An empty auth string is treated the same as no auth at all.
    $auth = $connectionCredentials['auth'] ?? null;
    if ('' === $auth) {
        $auth = null;
    }

    $lazy = $configuration['lazy'] ?? self::DEFAULT_OPTIONS['lazy'];

    if (\is_array($host) || $redis instanceof \RedisCluster) {
        $hosts = \is_string($host) ? [$host.':'.$port] : $host; // Always ensure we have an array
        $initializer = static function ($redis) use ($hosts, $auth, $serializer) {
            return self::initializeRedisCluster($redis, $hosts, $auth, $serializer);
        };
        // In lazy mode the initializer is handed to the proxy instead of being run here.
        $redis = $lazy ? new RedisClusterProxy($redis, $initializer) : $initializer($redis);
    } else {
        $redis = $redis ?? new \Redis();
        $initializer = static function ($redis) use ($host, $port, $auth, $serializer, $dbIndex) {
            return self::initializeRedis($redis, $host, $port, $auth, $serializer, $dbIndex);
        };
        $redis = $lazy ? new RedisProxy($redis, $initializer) : $initializer($redis);
    }

    $this->connection = $redis;

    foreach (['stream', 'group', 'consumer'] as $key) {
        if (isset($configuration[$key]) && '' === $configuration[$key]) {
            throw new InvalidArgumentException(sprintf('"%s" should be configured, got an empty string.', $key));
        }
    }

    $this->stream = $configuration['stream'] ?? self::DEFAULT_OPTIONS['stream'];
    $this->group = $configuration['group'] ?? self::DEFAULT_OPTIONS['group'];
    $this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];
    $this->queue = $this->stream.'__queue';
    $this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
    $this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
    $this->deleteAfterAck = $configuration['delete_after_ack'] ?? self::DEFAULT_OPTIONS['delete_after_ack'];
    $this->deleteAfterReject = $configuration['delete_after_reject'] ?? self::DEFAULT_OPTIONS['delete_after_reject'];
    // The option is expressed in seconds; it is stored multiplied by 1000.
    $this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
    // The option is divided by 1000 so that it can be added to microtime(true).
    $this->claimInterval = ($configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval']) / 1000;
}
/**
 * Connects and configures a \Redis client, unless it is already connected.
 *
 * @param string|string[]|null $auth Credentials passed to \Redis::auth(); null skips authentication
 *
 * @return \Redis The client that was passed in
 *
 * @throws InvalidArgumentException When authentication or database selection fails
 */
private static function initializeRedis(\Redis $redis, string $host, int $port, $auth, int $serializer, int $dbIndex): \Redis
{
    // An already connected client is returned untouched: no serializer, auth or dbindex is applied to it.
    if ($redis->isConnected()) {
        return $redis;
    }

    $redis->connect($host, $port);
    $redis->setOption(\Redis::OPT_SERIALIZER, $serializer);

    if (null !== $auth && !$redis->auth($auth)) {
        throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
    }

    // select() is only called for a non-zero database index.
    if ($dbIndex && !$redis->select($dbIndex)) {
        throw new InvalidArgumentException('Redis connection failed: '.$redis->getLastError());
    }

    return $redis;
}
/**
 * Returns a RedisCluster client with the serializer option applied.
 *
 * A new client is created from $hosts and $auth only when none is given.
 *
 * @param string|string[]|null $auth
 */
private static function initializeRedisCluster(?\RedisCluster $redis, array $hosts, $auth, int $serializer): \RedisCluster
{
    $cluster = $redis ?? new \RedisCluster(null, $hosts, 0.0, 0.0, false, $auth);
    $cluster->setOption(\Redis::OPT_SERIALIZER, $serializer);

    return $cluster;
}
/**
 * Creates a connection from a DSN, or from several comma-separated DSNs (RedisCluster).
 *
 * Options found in a DSN query string are merged over $redisOptions. Stream, group and
 * consumer may also be given as the 1st, 2nd and 3rd path segments of the DSN, in which
 * case they take precedence over the options.
 *
 * @param \Redis|\RedisCluster|null $redis Existing client forwarded to the constructor
 *
 * @throws InvalidArgumentException When a DSN cannot be parsed, or when one of several DSNs has no host
 */
public static function fromDsn(string $dsn, array $redisOptions = [], $redis = null): self
{
    if (false === strpos($dsn, ',')) {
        $params = self::parseDsn($dsn, $redisOptions);
    } else {
        $dsns = explode(',', $dsn);
        $parsedUrls = array_map(function ($dsn) use (&$redisOptions) {
            return self::parseDsn($dsn, $redisOptions);
        }, $dsns);

        // Merge all the URLs, the last one overrides the previous ones
        $params = array_merge(...$parsedUrls);

        // Regroup all the hosts in an array interpretable by RedisCluster
        $params['host'] = array_map(function ($parsedUrl) {
            if (!isset($parsedUrl['host'])) {
                throw new InvalidArgumentException('Missing host in DSN, it must be defined when using Redis Cluster.');
            }

            return $parsedUrl['host'].':'.($parsedUrl['port'] ?? 6379);
        }, $parsedUrls, $dsns);
    }

    self::validateOptions($redisOptions);

    // Each typed option below is normalized with filter_var() and removed from $redisOptions;
    // null means "not provided" and lets the constructor apply its default.
    $autoSetup = null;
    if (\array_key_exists('auto_setup', $redisOptions)) {
        $autoSetup = filter_var($redisOptions['auto_setup'], \FILTER_VALIDATE_BOOLEAN);
        unset($redisOptions['auto_setup']);
    }

    $maxEntries = null;
    if (\array_key_exists('stream_max_entries', $redisOptions)) {
        $maxEntries = filter_var($redisOptions['stream_max_entries'], \FILTER_VALIDATE_INT);
        unset($redisOptions['stream_max_entries']);
    }

    $deleteAfterAck = null;
    if (\array_key_exists('delete_after_ack', $redisOptions)) {
        $deleteAfterAck = filter_var($redisOptions['delete_after_ack'], \FILTER_VALIDATE_BOOLEAN);
        unset($redisOptions['delete_after_ack']);
    } else {
        trigger_deprecation('symfony/redis-messenger', '5.4', 'Not setting the "delete_after_ack" boolean option explicitly is deprecated, its default value will change to true in 6.0.');
    }

    $deleteAfterReject = null;
    if (\array_key_exists('delete_after_reject', $redisOptions)) {
        $deleteAfterReject = filter_var($redisOptions['delete_after_reject'], \FILTER_VALIDATE_BOOLEAN);
        unset($redisOptions['delete_after_reject']);
    }

    $dbIndex = null;
    if (\array_key_exists('dbindex', $redisOptions)) {
        $dbIndex = filter_var($redisOptions['dbindex'], \FILTER_VALIDATE_INT);
        unset($redisOptions['dbindex']);
    }

    // TLS is derived from the "rediss" scheme; the deprecated "tls" option overrides it when present.
    $tls = 'rediss' === $params['scheme'];
    if (\array_key_exists('tls', $redisOptions)) {
        trigger_deprecation('symfony/redis-messenger', '5.3', 'Providing "tls" parameter is deprecated, use "rediss://" DSN scheme instead');
        $tls = filter_var($redisOptions['tls'], \FILTER_VALIDATE_BOOLEAN);
        unset($redisOptions['tls']);
    }

    $redeliverTimeout = null;
    if (\array_key_exists('redeliver_timeout', $redisOptions)) {
        $redeliverTimeout = filter_var($redisOptions['redeliver_timeout'], \FILTER_VALIDATE_INT);
        unset($redisOptions['redeliver_timeout']);
    }

    $claimInterval = null;
    if (\array_key_exists('claim_interval', $redisOptions)) {
        $claimInterval = filter_var($redisOptions['claim_interval'], \FILTER_VALIDATE_INT);
        unset($redisOptions['claim_interval']);
    }

    $configuration = [
        'stream' => $redisOptions['stream'] ?? null,
        'group' => $redisOptions['group'] ?? null,
        'consumer' => $redisOptions['consumer'] ?? null,
        'lazy' => $redisOptions['lazy'] ?? self::DEFAULT_OPTIONS['lazy'],
        'auto_setup' => $autoSetup,
        'stream_max_entries' => $maxEntries,
        'delete_after_ack' => $deleteAfterAck,
        'delete_after_reject' => $deleteAfterReject,
        'dbindex' => $dbIndex,
        'redeliver_timeout' => $redeliverTimeout,
        'claim_interval' => $claimInterval,
    ];

    if (isset($params['host'])) {
        $user = isset($params['user']) && '' !== $params['user'] ? rawurldecode($params['user']) : null;
        $pass = isset($params['pass']) && '' !== $params['pass'] ? rawurldecode($params['pass']) : null;

        $connectionCredentials = [
            'host' => $params['host'],
            'port' => $params['port'] ?? 6379,
            // See: https://github.com/phpredis/phpredis/#auth
            // An explicit "auth" option wins; otherwise [user, pass] when both are set, else whichever one is set.
            'auth' => $redisOptions['auth'] ?? (null !== $pass && null !== $user ? [$user, $pass] : ($pass ?? $user)),
        ];

        // Path segments are /stream/group/consumer; index 0 is the empty string before the leading slash.
        $pathParts = explode('/', rtrim($params['path'] ?? '', '/'));

        $configuration['stream'] = $pathParts[1] ?? $configuration['stream'];
        $configuration['group'] = $pathParts[2] ?? $configuration['group'];
        $configuration['consumer'] = $pathParts[3] ?? $configuration['consumer'];

        if ($tls) {
            $connectionCredentials['host'] = 'tls://'.$connectionCredentials['host'];
        }
    } else {
        // Without a host, the DSN path itself is used as the host, with port 0.
        $connectionCredentials = [
            'host' => $params['path'],
            'port' => 0,
        ];
    }

    return new self($configuration, $connectionCredentials, $redisOptions, $redis);
}
/**
 * Parses a single DSN and merges its query-string options into $redisOptions.
 *
 * @param array $redisOptions Updated in place; options from the DSN override existing keys
 *
 * @return array The components returned by parse_url()
 *
 * @throws InvalidArgumentException When parse_url() cannot parse the DSN
 */
private static function parseDsn(string $dsn, array &$redisOptions): array
{
    $url = $dsn;
    $scheme = 0 === strpos($dsn, 'rediss:') ? 'rediss' : 'redis';

    // A host-less DSN ("redis:///some/path") is parsed with the "file:" scheme instead;
    // presumably because parse_url() only accepts a triple slash for that scheme.
    if (preg_match('#^'.$scheme.':///([^:@])+$#', $dsn)) {
        $url = str_replace($scheme.':', 'file:', $dsn);
    }

    if (false === $params = parse_url($url)) {
        throw new InvalidArgumentException('The given Redis DSN is invalid.');
    }

    if (isset($params['query'])) {
        parse_str($params['query'], $dsnOptions);
        $redisOptions = array_merge($redisOptions, $dsnOptions);
    }

    return $params;
}
/**
 * Triggers a deprecation listing every option name that is not a key of DEFAULT_OPTIONS.
 */
private static function validateOptions(array $options): void
{
    $unknownOptions = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));

    if (0 === \count($unknownOptions)) {
        return;
    }

    trigger_deprecation('symfony/messenger', '5.1', 'Invalid option(s) "%s" passed to the Redis Messenger transport. Passing invalid options is deprecated.', implode('", "', $unknownOptions));
}
/**
 * Claims, for this consumer, pending messages whose idle time reached the redeliver timeout.
 *
 * Returns early, without rescheduling the next claim, as soon as a pending message
 * already owned by this consumer is found.
 *
 * @throws TransportException When xpending or xclaim raises a \RedisException
 */
private function claimOldPendingMessages()
{
    try {
        // This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
        // https://github.com/antirez/redis/issues/6256
        $pendingEntries = $this->connection->xpending($this->stream, $this->group, '-', '+', 1);
    } catch (\RedisException $exception) {
        throw new TransportException($exception->getMessage(), 0, $exception);
    }

    $staleIds = [];

    // Each entry is read as [0 => message id, 1 => owning consumer, 2 => idle time].
    foreach ($pendingEntries as $entry) {
        if ($this->consumer === $entry[1]) {
            $this->couldHavePendingMessages = true;

            return;
        }

        if ($entry[2] < $this->redeliverTimeout) {
            continue;
        }

        $staleIds[] = $entry[0];
    }

    if ([] !== $staleIds) {
        try {
            $this->connection->xclaim(
                $this->stream,
                $this->group,
                $this->consumer,
                $this->redeliverTimeout,
                $staleIds,
                ['JUSTID']
            );

            // The claimed messages are now pending for this consumer.
            $this->couldHavePendingMessages = true;
        } catch (\RedisException $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }
    }

    $this->nextClaim = microtime(true) + $this->claimInterval;
}
/**
 * Moves due delayed messages into the stream, then reads one message for this consumer.
 *
 * @return array|null ['id' => message id, 'data' => message fields], or null when nothing is available
 *
 * @throws TransportException When a Redis command fails
 */
public function get(): ?array
{
    if ($this->autoSetup) {
        $this->setup();
    }

    // Build the current time as a digit string: the seconds part of microtime()
    // followed by the first three decimals of its fractional part.
    $now = microtime();
    $now = substr($now, 11).substr($now, 2, 3);

    $queuedMessageCount = $this->rawCommand('ZCOUNT', 0, $now) ?? 0;

    while ($queuedMessageCount--) {
        if (!$message = $this->rawCommand('ZPOPMIN', 1)) {
            break;
        }

        [$queuedMessage, $expiry] = $message;

        if (\strlen($expiry) === \strlen($now) ? $expiry > $now : \strlen($expiry) < \strlen($now)) {
            // if a future-placed message is popped because of a race condition with
            // another running consumer, the message is readded to the queue
            if (!$this->rawCommand('ZADD', 'NX', $expiry, $queuedMessage)) {
                throw new TransportException('Could not add a message to the redis stream.');
            }

            break;
        }

        // json_decode() yields a non-array when the entry is not a JSON object; guard against it so
        // array_key_exists() is never called on null (a TypeError on PHP 8) and fall back to the raw entry.
        $decodedQueuedMessage = json_decode($queuedMessage, true);
        if (\is_array($decodedQueuedMessage)) {
            $queuedBody = \array_key_exists('body', $decodedQueuedMessage) ? $decodedQueuedMessage['body'] : $queuedMessage;
            $queuedHeaders = $decodedQueuedMessage['headers'] ?? [];
        } else {
            $queuedBody = $queuedMessage;
            $queuedHeaders = [];
        }

        $this->add($queuedBody, $queuedHeaders, 0);
    }

    if (!$this->couldHavePendingMessages && $this->nextClaim <= microtime(true)) {
        $this->claimOldPendingMessages();
    }

    $messageId = '>'; // will receive new messages

    if ($this->couldHavePendingMessages) {
        $messageId = '0'; // will receive consumers pending messages
    }

    try {
        $messages = $this->connection->xreadgroup(
            $this->group,
            $this->consumer,
            [$this->stream => $messageId],
            1
        );
    } catch (\RedisException $e) {
        throw new TransportException($e->getMessage(), 0, $e);
    }

    if (false === $messages) {
        if ($error = $this->connection->getLastError() ?: null) {
            $this->connection->clearLastError();
        }

        throw new TransportException($error ?? 'Could not read messages from the redis stream.');
    }

    if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
        $this->couldHavePendingMessages = false;

        // No pending messages so get a new one
        return $this->get();
    }

    // At most one message was requested; return the first one found.
    foreach ($messages[$this->stream] ?? [] as $key => $message) {
        return [
            'id' => $key,
            'data' => $message,
        ];
    }

    return null;
}
/**
 * Acknowledges a message and, when delete_after_ack is enabled, deletes it from the stream.
 *
 * @throws TransportException When a Redis command raises an exception or reports a falsy result
 */
public function ack(string $id): void
{
    try {
        $acknowledged = $this->connection->xack($this->stream, $this->group, [$id]);
        if ($this->deleteAfterAck) {
            // When deletion is enabled, the result of xdel replaces the result of xack.
            $acknowledged = $this->connection->xdel($this->stream, [$id]);
        }
    } catch (\RedisException $e) {
        throw new TransportException($e->getMessage(), 0, $e);
    }

    if (!$acknowledged) {
        if ($error = $this->connection->getLastError() ?: null) {
            $this->connection->clearLastError();
        }

        throw new TransportException($error ?? sprintf('Could not acknowledge redis message "%s".', $id));
    }
}
/**
 * Acknowledges a rejected message and, when delete_after_reject is enabled, deletes it from the stream.
 *
 * @throws TransportException When a Redis command raises an exception or reports a falsy result
 */
public function reject(string $id): void
{
    try {
        $deleted = $this->connection->xack($this->stream, $this->group, [$id]);
        if ($this->deleteAfterReject) {
            // Both xack and xdel must report success.
            $deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
        }
    } catch (\RedisException $e) {
        throw new TransportException($e->getMessage(), 0, $e);
    }

    if (!$deleted) {
        if ($error = $this->connection->getLastError() ?: null) {
            $this->connection->clearLastError();
        }

        throw new TransportException($error ?? sprintf('Could not delete message "%s" from the redis stream.', $id));
    }
}
/**
 * Adds a message to the stream, or to the delayed-messages queue when a positive delay is given.
 *
 * @param string $body      Message body
 * @param array  $headers   Message headers
 * @param int    $delayInMs Delay in milliseconds; values <= 0 add the message to the stream directly
 *
 * @throws TransportException When the message cannot be JSON-encoded, the delay overflows,
 *                            or Redis fails to store the message
 */
public function add(string $body, array $headers, int $delayInMs = 0): void
{
    if ($this->autoSetup) {
        $this->setup();
    }

    try {
        if ($delayInMs > 0) { // the delay is <= 0 for queued messages
            $message = json_encode([
                'body' => $body,
                'headers' => $headers,
                // Entry need to be unique in the sorted set else it would only be added once to the delayed messages queue
                'uniqid' => uniqid('', true),
            ]);

            if (false === $message) {
                throw new TransportException(json_last_error_msg());
            }

            // $now[0] holds the fractional part of microtime(), $now[1] the seconds.
            $now = explode(' ', microtime(), 2);
            // Add the delay to the current milliseconds, left-padded to at least 3 digits.
            $now[0] = str_pad($delayInMs + substr($now[0], 2, 3), 3, '0', \STR_PAD_LEFT);
            if (3 < \strlen($now[0])) {
                // Carry everything above the last 3 digits over to the seconds.
                $now[1] += substr($now[0], 0, -3);
                $now[0] = substr($now[0], -3);

                // The addition yields a float when the result no longer fits in an integer.
                if (\is_float($now[1])) {
                    throw new TransportException("Message delay is too big: {$delayInMs}ms.");
                }
            }

            // Score of the sorted-set entry: seconds followed by the 3-digit milliseconds.
            $added = $this->rawCommand('ZADD', 'NX', $now[1].$now[0], $message);
        } else {
            $message = json_encode([
                'body' => $body,
                'headers' => $headers,
            ]);

            if (false === $message) {
                throw new TransportException(json_last_error_msg());
            }

            if ($this->maxEntries) {
                $added = $this->connection->xadd($this->stream, '*', ['message' => $message], $this->maxEntries, true);
            } else {
                $added = $this->connection->xadd($this->stream, '*', ['message' => $message]);
            }
        }
    } catch (\RedisException $e) {
        if ($error = $this->connection->getLastError() ?: null) {
            $this->connection->clearLastError();
        }

        throw new TransportException($error ?? $e->getMessage(), 0, $e);
    }

    if (!$added) {
        if ($error = $this->connection->getLastError() ?: null) {
            $this->connection->clearLastError();
        }

        throw new TransportException($error ?? 'Could not add a message to the redis stream.');
    }
}
/**
 * Creates the consumer group (and the stream if needed) and disables further automatic setup.
 *
 * @throws TransportException When creating the group raises a \RedisException
 * @throws LogicException     When delete_after_ack or delete_after_reject is enabled while
 *                            more than one consumer group exists on the stream
 */
public function setup(): void
{
    try {
        $this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true);
    } catch (\RedisException $e) {
        throw new TransportException($e->getMessage(), 0, $e);
    }

    // group might already exist, ignore
    if ($this->connection->getLastError()) {
        $this->connection->clearLastError();
    }

    if ($this->deleteAfterAck || $this->deleteAfterReject) {
        $groups = $this->connection->xinfo('GROUPS', $this->stream);
        if (
            // support for Redis extension version 5+
            (\is_array($groups) && 1 < \count($groups))
            // support for Redis extension version 4.x; like the array branch, only
            // more than one group is an error (a single "name" entry is the expected case)
            || (\is_string($groups) && 1 < substr_count($groups, '"name"'))
        ) {
            throw new LogicException(sprintf('More than one group exists for stream "%s", delete_after_ack and delete_after_reject cannot be enabled as it risks deleting messages before all groups could consume them.', $this->stream));
        }
    }

    $this->autoSetup = false;
}
/**
 * Returns the current microtime(true) value converted to whole milliseconds.
 */
private function getCurrentTimeInMilliseconds(): int
{
    $seconds = microtime(true);

    return (int) ($seconds * 1000);
}
/**
 * Removes the stream and the delayed-messages queue keys.
 *
 * unlink() is tried first; once it has returned false or thrown, the outcome is
 * remembered in a static variable and del() is used from then on.
 */
public function cleanup(): void
{
    static $unlinkWorks = true;

    if ($unlinkWorks) {
        try {
            $unlinkWorks = false !== $this->connection->unlink($this->stream, $this->queue);
        } catch (\Throwable $exception) {
            $unlinkWorks = false;
        }
    }

    if ($unlinkWorks) {
        return;
    }

    $this->connection->del($this->stream, $this->queue);
}
/**
 * Runs a raw Redis command against the delayed-messages queue key.
 *
 * The queue key is always inserted as the first argument of the command, before $arguments.
 *
 * @param string $command      Redis command name, e.g. "ZADD"
 * @param mixed  ...$arguments Arguments that follow the queue key
 *
 * @return mixed The command result; never false
 *
 * @throws TransportException When the command raises a \RedisException or returns false
 */
private function rawCommand(string $command, ...$arguments)
{
    try {
        if ($this->connection instanceof \RedisCluster || $this->connection instanceof RedisClusterProxy) {
            // The cluster client takes the queue key as an extra leading argument,
            // presumably to select the node the command is sent to.
            $result = $this->connection->rawCommand($this->queue, $command, $this->queue, ...$arguments);
        } else {
            $result = $this->connection->rawCommand($command, $this->queue, ...$arguments);
        }
    } catch (\RedisException $e) {
        throw new TransportException($e->getMessage(), 0, $e);
    }

    if (false === $result) {
        if ($error = $this->connection->getLastError() ?: null) {
            $this->connection->clearLastError();
        }

        throw new TransportException($error ?? sprintf('Could not run "%s" on Redis queue.', $command));
    }

    return $result;
}
}
// Expose this class under the Transport\RedisExt class name as well, unless a class
// with that name is already declared (autoloading is not triggered by the check).
if (false === class_exists(\Symfony\Component\Messenger\Transport\RedisExt\Connection::class, false)) {
    class_alias(Connection::class, \Symfony\Component\Messenger\Transport\RedisExt\Connection::class);
}