564 lines
21 KiB
PHP

<?php
/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/
namespace Symfony\Component\Messenger\Bridge\Doctrine\Transport;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\Driver\Exception as DriverException;
use Doctrine\DBAL\Driver\Result as DriverResult;
use Doctrine\DBAL\Exception as DBALException;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\LockMode;
use Doctrine\DBAL\Platforms\MySQLPlatform;
use Doctrine\DBAL\Platforms\OraclePlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Comparator;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\SchemaDiff;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Types;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Contracts\Service\ResetInterface;
/**
* @internal since Symfony 5.1
*
* @author Vincent Touzet <vincent.touzet@gmail.com>
* @author Kévin Dunglas <dunglas@gmail.com>
*/
class Connection implements ResetInterface
{
protected const TABLE_OPTION_NAME = '_symfony_messenger_table_name';
protected const DEFAULT_OPTIONS = [
'table_name' => 'messenger_messages',
'queue_name' => 'default',
'redeliver_timeout' => 3600,
'auto_setup' => true,
];
/**
* Configuration of the connection.
*
* Available options:
*
* * table_name: name of the table
* * connection: name of the Doctrine's entity manager
* * queue_name: name of the queue
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
*/
protected $configuration = [];
protected $driverConnection;
protected $queueEmptiedAt;
private $schemaSynchronizer;
private $autoSetup;
public function __construct(array $configuration, DBALConnection $driverConnection, ?SchemaSynchronizer $schemaSynchronizer = null)
{
$this->configuration = array_replace_recursive(static::DEFAULT_OPTIONS, $configuration);
$this->driverConnection = $driverConnection;
$this->schemaSynchronizer = $schemaSynchronizer;
$this->autoSetup = $this->configuration['auto_setup'];
}
public function reset()
{
$this->queueEmptiedAt = null;
}
public function getConfiguration(): array
{
return $this->configuration;
}
public static function buildConfiguration(string $dsn, array $options = []): array
{
if (false === $params = parse_url($dsn)) {
throw new InvalidArgumentException('The given Doctrine Messenger DSN is invalid.');
}
$query = [];
if (isset($params['query'])) {
parse_str($params['query'], $query);
}
$configuration = ['connection' => $params['host']];
$configuration += $query + $options + static::DEFAULT_OPTIONS;
$configuration['auto_setup'] = filter_var($configuration['auto_setup'], \FILTER_VALIDATE_BOOLEAN);
// check for extra keys in options
$optionsExtraKeys = array_diff(array_keys($options), array_keys(static::DEFAULT_OPTIONS));
if (0 < \count($optionsExtraKeys)) {
throw new InvalidArgumentException(sprintf('Unknown option found: [%s]. Allowed options are [%s].', implode(', ', $optionsExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
}
// check for extra keys in options
$queryExtraKeys = array_diff(array_keys($query), array_keys(static::DEFAULT_OPTIONS));
if (0 < \count($queryExtraKeys)) {
throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', array_keys(static::DEFAULT_OPTIONS))));
}
return $configuration;
}
/**
* @param int $delay The delay in milliseconds
*
* @return string The inserted id
*
* @throws DBALException
*/
public function send(string $body, array $headers, int $delay = 0): string
{
$now = new \DateTime();
$availableAt = (clone $now)->modify(sprintf('%+d seconds', $delay / 1000));
$queryBuilder = $this->driverConnection->createQueryBuilder()
->insert($this->configuration['table_name'])
->values([
'body' => '?',
'headers' => '?',
'queue_name' => '?',
'created_at' => '?',
'available_at' => '?',
]);
$this->executeStatement($queryBuilder->getSQL(), [
$body,
json_encode($headers),
$this->configuration['queue_name'],
$now,
$availableAt,
], [
Types::STRING,
Types::STRING,
Types::STRING,
Types::DATETIME_MUTABLE,
Types::DATETIME_MUTABLE,
]);
return $this->driverConnection->lastInsertId();
}
public function get(): ?array
{
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
try {
$this->driverConnection->delete($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59']);
} catch (DriverException $e) {
// Ignore the exception
} catch (TableNotFoundException $e) {
if ($this->autoSetup) {
$this->setup();
}
}
}
get:
$this->driverConnection->beginTransaction();
try {
$query = $this->createAvailableMessagesQueryBuilder()
->orderBy('available_at', 'ASC')
->setMaxResults(1);
if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
$query->select('m.id');
}
// Append pessimistic write lock to FROM clause if db platform supports it
$sql = $query->getSQL();
// Wrap the rownum query in a sub-query to allow writelocks without ORA-02014 error
if ($this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
$query = $this->createQueryBuilder('w')
->where('w.id IN ('.str_replace('SELECT a.* FROM', 'SELECT a.id FROM', $sql).')')
->setParameters($query->getParameters(), $query->getParameterTypes());
if (method_exists(QueryBuilder::class, 'forUpdate')) {
$query->forUpdate();
}
$sql = $query->getSQL();
} elseif (method_exists(QueryBuilder::class, 'forUpdate')) {
$query->forUpdate();
try {
$sql = $query->getSQL();
} catch (DBALException $e) {
}
} elseif (preg_match('/FROM (.+) WHERE/', (string) $sql, $matches)) {
$fromClause = $matches[1];
$sql = str_replace(
sprintf('FROM %s WHERE', $fromClause),
sprintf('FROM %s WHERE', $this->driverConnection->getDatabasePlatform()->appendLockHint($fromClause, LockMode::PESSIMISTIC_WRITE)),
$sql
);
}
// use SELECT ... FOR UPDATE to lock table
if (!method_exists(QueryBuilder::class, 'forUpdate')) {
$sql .= ' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL();
}
$stmt = $this->executeQuery(
$sql,
$query->getParameters(),
$query->getParameterTypes()
);
$doctrineEnvelope = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAssociative() : $stmt->fetch();
if (false === $doctrineEnvelope) {
$this->driverConnection->commit();
$this->queueEmptiedAt = microtime(true) * 1000;
return null;
}
// Postgres can "group" notifications having the same channel and payload
// We need to be sure to empty the queue before blocking again
$this->queueEmptiedAt = null;
$doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);
$queryBuilder = $this->driverConnection->createQueryBuilder()
->update($this->configuration['table_name'])
->set('delivered_at', '?')
->where('id = ?');
$now = new \DateTime();
$this->executeStatement($queryBuilder->getSQL(), [
$now,
$doctrineEnvelope['id'],
], [
Types::DATETIME_MUTABLE,
]);
$this->driverConnection->commit();
return $doctrineEnvelope;
} catch (\Throwable $e) {
$this->driverConnection->rollBack();
if ($this->autoSetup && $e instanceof TableNotFoundException) {
$this->setup();
goto get;
}
throw $e;
}
}
public function ack(string $id): bool
{
try {
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
}
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
public function reject(string $id): bool
{
try {
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31 23:59:59'], ['id' => $id]) > 0;
}
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
} catch (DBALException $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
}
public function setup(): void
{
$configuration = $this->driverConnection->getConfiguration();
$assetFilter = $configuration->getSchemaAssetsFilter();
$configuration->setSchemaAssetsFilter(static function () { return true; });
$this->updateSchema();
$configuration->setSchemaAssetsFilter($assetFilter);
$this->autoSetup = false;
}
public function getMessageCount(): int
{
$queryBuilder = $this->createAvailableMessagesQueryBuilder()
->select('COUNT(m.id) AS message_count')
->setMaxResults(1);
$stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());
return $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchOne() : $stmt->fetchColumn();
}
public function findAll(?int $limit = null): array
{
$queryBuilder = $this->createAvailableMessagesQueryBuilder();
if (null !== $limit) {
$queryBuilder->setMaxResults($limit);
}
$stmt = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());
$data = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAllAssociative() : $stmt->fetchAll();
return array_map(function ($doctrineEnvelope) {
return $this->decodeEnvelopeHeaders($doctrineEnvelope);
}, $data);
}
public function find($id): ?array
{
$queryBuilder = $this->createQueryBuilder()
->where('m.id = ? and m.queue_name = ?');
$stmt = $this->executeQuery($queryBuilder->getSQL(), [$id, $this->configuration['queue_name']]);
$data = $stmt instanceof Result || $stmt instanceof DriverResult ? $stmt->fetchAssociative() : $stmt->fetch();
return false === $data ? null : $this->decodeEnvelopeHeaders($data);
}
/**
* @internal
*/
public function configureSchema(Schema $schema, DBALConnection $forConnection): void
{
// only update the schema for this connection
if ($forConnection !== $this->driverConnection) {
return;
}
if ($schema->hasTable($this->configuration['table_name'])) {
return;
}
$this->addTableToSchema($schema);
}
/**
* @internal
*/
public function getExtraSetupSqlForTable(Table $createdTable): array
{
return [];
}
private function createAvailableMessagesQueryBuilder(): QueryBuilder
{
$now = new \DateTime();
$redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
return $this->createQueryBuilder()
->where('m.queue_name = ?')
->andWhere('m.delivered_at is null OR m.delivered_at < ?')
->andWhere('m.available_at <= ?')
->setParameters([
$this->configuration['queue_name'],
$redeliverLimit,
$now,
], [
Types::STRING,
Types::DATETIME_MUTABLE,
Types::DATETIME_MUTABLE,
]);
}
private function createQueryBuilder(string $alias = 'm'): QueryBuilder
{
$queryBuilder = $this->driverConnection->createQueryBuilder()
->from($this->configuration['table_name'], $alias);
$alias .= '.';
if (!$this->driverConnection->getDatabasePlatform() instanceof OraclePlatform) {
return $queryBuilder->select($alias.'*');
}
// Oracle databases use UPPER CASE on tables and column identifiers.
// Column alias is added to force the result to be lowercase even when the actual field is all caps.
return $queryBuilder->select(str_replace(', ', ', '.$alias,
$alias.'id AS "id", body AS "body", headers AS "headers", queue_name AS "queue_name", '.
'created_at AS "created_at", available_at AS "available_at", '.
'delivered_at AS "delivered_at"'
));
}
private function executeQuery(string $sql, array $parameters = [], array $types = [])
{
try {
$stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
} catch (TableNotFoundException $e) {
if ($this->driverConnection->isTransactionActive()) {
throw $e;
}
// create table
if ($this->autoSetup) {
$this->setup();
}
$stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
}
return $stmt;
}
protected function executeStatement(string $sql, array $parameters = [], array $types = [])
{
try {
if (method_exists($this->driverConnection, 'executeStatement')) {
$stmt = $this->driverConnection->executeStatement($sql, $parameters, $types);
} else {
$stmt = $this->driverConnection->executeUpdate($sql, $parameters, $types);
}
} catch (TableNotFoundException $e) {
if ($this->driverConnection->isTransactionActive()) {
throw $e;
}
// create table
if ($this->autoSetup) {
$this->setup();
}
if (method_exists($this->driverConnection, 'executeStatement')) {
$stmt = $this->driverConnection->executeStatement($sql, $parameters, $types);
} else {
$stmt = $this->driverConnection->executeUpdate($sql, $parameters, $types);
}
}
return $stmt;
}
private function getSchema(): Schema
{
$schema = new Schema([], [], $this->createSchemaManager()->createSchemaConfig());
$this->addTableToSchema($schema);
return $schema;
}
private function addTableToSchema(Schema $schema): void
{
$table = $schema->createTable($this->configuration['table_name']);
// add an internal option to mark that we created this & the non-namespaced table name
$table->addOption(self::TABLE_OPTION_NAME, $this->configuration['table_name']);
$table->addColumn('id', Types::BIGINT)
->setAutoincrement(true)
->setNotnull(true);
$table->addColumn('body', Types::TEXT)
->setNotnull(true);
$table->addColumn('headers', Types::TEXT)
->setNotnull(true);
$table->addColumn('queue_name', Types::STRING)
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
->setNotnull(true);
$table->addColumn('created_at', Types::DATETIME_MUTABLE)
->setNotnull(true);
$table->addColumn('available_at', Types::DATETIME_MUTABLE)
->setNotnull(true);
$table->addColumn('delivered_at', Types::DATETIME_MUTABLE)
->setNotnull(false);
$table->setPrimaryKey(['id']);
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
$table->addIndex(['delivered_at']);
}
private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
{
$doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);
return $doctrineEnvelope;
}
private function updateSchema(): void
{
if (null !== $this->schemaSynchronizer) {
$this->schemaSynchronizer->updateSchema($this->getSchema(), true);
return;
}
$schemaManager = $this->createSchemaManager();
$comparator = $this->createComparator($schemaManager);
$schemaDiff = $this->compareSchemas($comparator, method_exists($schemaManager, 'introspectSchema') ? $schemaManager->introspectSchema() : $schemaManager->createSchema(), $this->getSchema());
$platform = $this->driverConnection->getDatabasePlatform();
$exec = method_exists($this->driverConnection, 'executeStatement') ? 'executeStatement' : 'exec';
if (!method_exists(SchemaDiff::class, 'getCreatedSchemas')) {
foreach ($schemaDiff->toSaveSql($platform) as $sql) {
$this->driverConnection->$exec($sql);
}
return;
}
if ($platform->supportsSchemas()) {
foreach ($schemaDiff->getCreatedSchemas() as $schema) {
$this->driverConnection->$exec($platform->getCreateSchemaSQL($schema));
}
}
if ($platform->supportsSequences()) {
foreach ($schemaDiff->getAlteredSequences() as $sequence) {
$this->driverConnection->$exec($platform->getAlterSequenceSQL($sequence));
}
foreach ($schemaDiff->getCreatedSequences() as $sequence) {
$this->driverConnection->$exec($platform->getCreateSequenceSQL($sequence));
}
}
foreach ($platform->getCreateTablesSQL($schemaDiff->getCreatedTables()) as $sql) {
$this->driverConnection->$exec($sql);
}
foreach ($schemaDiff->getAlteredTables() as $tableDiff) {
foreach ($platform->getAlterTableSQL($tableDiff) as $sql) {
$this->driverConnection->$exec($sql);
}
}
}
private function createSchemaManager(): AbstractSchemaManager
{
return method_exists($this->driverConnection, 'createSchemaManager')
? $this->driverConnection->createSchemaManager()
: $this->driverConnection->getSchemaManager();
}
private function createComparator(AbstractSchemaManager $schemaManager): Comparator
{
return method_exists($schemaManager, 'createComparator')
? $schemaManager->createComparator()
: new Comparator();
}
private function compareSchemas(Comparator $comparator, Schema $from, Schema $to): SchemaDiff
{
return method_exists($comparator, 'compareSchemas') || method_exists($comparator, 'doCompareSchemas')
? $comparator->compareSchemas($from, $to)
: $comparator->compare($from, $to);
}
}
if (!class_exists(\Symfony\Component\Messenger\Transport\Doctrine\Connection::class, false)) {
class_alias(Connection::class, \Symfony\Component\Messenger\Transport\Doctrine\Connection::class);
}