* @copyright 2011 Vasil Rangelov
 * @license http://www.gnu.org/copyleft/lesser.html LGPL License 2.1
 * @version 1.0.0b2
 * @link http://pear2.php.net/PEAR2_Net_Transmitter
 */
/**
 * The namespace declaration.
 */
namespace PEAR2\Net\Transmitter;

/**
 * Used for managing persistent connections.
 */
use PEAR2\Cache\SHM;

/**
 * Used for matching arbitrary exceptions in
 * {@link TcpClient::createException()} and releasing locks properly.
 */
use Exception as E;

/**
 * A socket transmitter.
 *
 * This is a convenience wrapper for socket functionality. Used to ensure data
 * integrity.
 *
 * @category Net
 * @package PEAR2_Net_Transmitter
 * @author Vasil Rangelov
 * @license http://www.gnu.org/copyleft/lesser.html LGPL License 2.1
 * @link http://pear2.php.net/PEAR2_Net_Transmitter
 */
class TcpClient extends NetworkStream
{
    /**
     * The error code of the last error on the socket.
     *
     * @var int
     */
    protected $errorNo = 0;

    /**
     * The error message of the last error on the socket.
     *
     * @var string|null
     */
    protected $errorStr = null;

    /**
     * Persistent connection handler.
     *
     * Remains NULL for non-persistent connections.
     *
     * @var SHM
     */
    protected $shmHandler = null;

    /**
     * An array with all connections from this PHP request (as keys)
     * and their lock state (as a value).
     *
     * @var array
     */
    protected static $lockState = array();

    /**
     * Mappings from a protocol name to an URI scheme.
     *
     * @var array
     */
    protected static $cryptoScheme = array(
        parent::CRYPTO_OFF => 'tcp',
        parent::CRYPTO_SSL2 => 'sslv2',
        parent::CRYPTO_SSL3 => 'sslv3',
        parent::CRYPTO_SSL => 'ssl',
        parent::CRYPTO_TLS => 'tls'
    );

    /**
     * The URI of this connection.
     *
     * @var string
     */
    protected $uri;

    /**
     * Creates a new connection with the specified options.
     *
     * @param string   $host    Hostname (IP or domain) of the server. An IPv6
     *     address may be given with or without the enclosing square brackets.
     * @param int      $port    The port on the server.
     * @param bool     $persist Whether or not the connection should be a
     *     persistent one.
     * @param float    $timeout The timeout for the connection. Any value
     *     loosely equal to NULL (including 0) selects the
     *     "default_socket_timeout" INI setting.
     * @param string   $key     A string that uniquely identifies the
     *     connection. Ignored for non-persistent connections.
     * @param string   $crypto  Encryption setting. Must be one of the
     *     NetworkStream::CRYPTO_* constants. By default, encryption is
     *     disabled. If the setting has an associated scheme for it, it will be
     *     used, and if not, the setting will be adjusted right after the
     *     connection is established.
     * @param resource $context A context for the socket.
     *
     * @throws SocketException If the context is invalid (code 6), the socket
     *     can't be initialized (code 7) or the connection fails (code 8).
     */
    public function __construct(
        $host,
        $port,
        $persist = false,
        $timeout = null,
        $key = '',
        $crypto = parent::CRYPTO_OFF,
        $context = null
    ) {
        $this->streamType = '_CLIENT';

        // A colon means an IPv6 literal, which must be bracketed inside an
        // URI. Hosts that already arrive bracketed are left alone, so that
        // "[::1]" does not turn into the invalid "[[::1]]".
        if (strpos($host, ':') !== false && '[' !== $host[0]) {
            $host = "[{$host}]";
        }

        // Loose comparison: 0 falls back to the INI default as well.
        $timeout
            = null == $timeout ? ini_get('default_socket_timeout') : $timeout;

        if (null === $context) {
            $context = stream_context_get_default();
        } elseif ((!is_resource($context))
            || ('stream-context' !== get_resource_type($context))
        ) {
            throw $this->createException('Invalid context supplied.', 6);
        }

        $hasCryptoScheme = array_key_exists($crypto, static::$cryptoScheme);
        $scheme = $hasCryptoScheme ? static::$cryptoScheme[$crypto] : 'tcp';

        $flags = STREAM_CLIENT_CONNECT;
        if ($persist) {
            $flags |= STREAM_CLIENT_PERSISTENT;
            // The key becomes the path of the URI, so it has to be encoded.
            $key = rawurlencode($key);
            $this->uri = "{$scheme}://{$host}:{$port}/{$key}";
        } else {
            $this->uri = "{$scheme}://{$host}:{$port}";
        }

        // Errors raised while connecting are routed to handleError() for the
        // duration of the attempt; the previous handler is restored on both
        // the success and the failure path.
        set_error_handler(array($this, 'handleError'));
        try {
            parent::__construct(
                stream_socket_client(
                    $this->uri,
                    $this->errorNo,
                    $this->errorStr,
                    $timeout,
                    $flags,
                    $context
                )
            );
            restore_error_handler();
        } catch (E $e) {
            restore_error_handler();
            // An untouched error number means the failure happened before
            // the actual connection attempt.
            if (0 === $this->errorNo) {
                throw $this->createException(
                    'Failed to initialize socket.',
                    7,
                    $e
                );
            }
            throw $this->createException(
                'Failed to connect with socket.',
                8,
                $e
            );
        }

        if ($hasCryptoScheme) {
            // The scheme already took care of the encryption.
            $this->crypto = $crypto;
        } elseif (parent::CRYPTO_OFF !== $crypto) {
            $this->setCrypto($crypto);
        }

        // Encrypted connections are switched to non-blocking mode.
        if (parent::CRYPTO_OFF !== $crypto) {
            $this->setIsBlocking(false);
        }

        if ($persist) {
            $this->shmHandler = SHM::factory(
                __CLASS__ . ' ' . $this->uri . ' '
            );
            self::$lockState[$this->uri] = self::DIRECTION_NONE;
        }
    }

    /**
     * Creates a new exception.
     *
     * Creates a new exception. Used by the rest of the functions in this class.
     * The last socket error number and message are attached to it.
     *
     * @param string                   $message  The exception message.
     * @param int                      $code     The exception code.
     * @param E|null                   $previous Previous exception thrown,
     *     or NULL if there is none.
     * @param int|string|resource|null $fragment The fragment up until the
     *     point of failure.
     *     On failure with sending, this is the number of bytes sent
     *     successfully before the failure.
     *     On failure when receiving, this is a string/stream holding
     *     the contents received successfully before the failure.
     *
     * @return SocketException The exception to then be thrown.
     */
    protected function createException(
        $message,
        $code = 0,
        E $previous = null,
        $fragment = null
    ) {
        return new SocketException(
            $message,
            $code,
            $previous,
            $fragment,
            $this->errorNo,
            $this->errorStr
        );
    }

    /**
     * Locks transmission.
     *
     * Locks transmission in one or more directions. Useful when dealing with
     * persistent connections. Note that every send/receive call implicitly
     * calls this function and then restores it to the previous state. You only
     * need to call this function if you need to do an uninterrupted sequence of
     * such calls.
     *
     * @param int  $direction The direction(s) to have locked. Acceptable values
     *     are the DIRECTION_* constants. If a lock for a direction can't be
     *     obtained immediately, the function will block until one is acquired.
     *     Note that if you specify {@link static::DIRECTION_ALL},
     *     the sending lock will be obtained before the receiving one,
     *     and if obtaining the receiving lock afterwards fails,
     *     the sending lock will be released too.
     * @param bool $replace   Whether to replace all locks with the specified
     *     ones. Setting this to FALSE will make the function only obtain the
     *     locks which are not already obtained.
     *
     * @return int|false The previous state or FALSE if the connection is not
     *     persistent or arguments are invalid.
     *
     * @throws LockException If a requested lock can't be obtained or released.
     */
    public function lock($direction = self::DIRECTION_ALL, $replace = false)
    {
        if ($this->persist && is_int($direction)) {
            $old = self::$lockState[$this->uri];

            if ($direction & self::DIRECTION_SEND) {
                // Only touch the shared lock if this request doesn't hold it.
                if (($old & self::DIRECTION_SEND)
                    || $this->shmHandler->lock(self::DIRECTION_SEND)
                ) {
                    self::$lockState[$this->uri] |= self::DIRECTION_SEND;
                } else {
                    throw new LockException('Unable to obtain sending lock.');
                }
            } elseif ($replace) {
                if (!($old & self::DIRECTION_SEND)
                    || $this->shmHandler->unlock(self::DIRECTION_SEND)
                ) {
                    self::$lockState[$this->uri] &= ~self::DIRECTION_SEND;
                } else {
                    throw new LockException('Unable to release sending lock.');
                }
            }

            try {
                if ($direction & self::DIRECTION_RECEIVE) {
                    if (($old & self::DIRECTION_RECEIVE)
                        || $this->shmHandler->lock(self::DIRECTION_RECEIVE)
                    ) {
                        self::$lockState[$this->uri]
                            |= self::DIRECTION_RECEIVE;
                    } else {
                        throw new LockException(
                            'Unable to obtain receiving lock.'
                        );
                    }
                } elseif ($replace) {
                    if (!($old & self::DIRECTION_RECEIVE)
                        || $this->shmHandler->unlock(self::DIRECTION_RECEIVE)
                    ) {
                        self::$lockState[$this->uri]
                            &= ~self::DIRECTION_RECEIVE;
                    } else {
                        throw new LockException(
                            'Unable to release receiving lock.'
                        );
                    }
                }
            } catch (LockException $e) {
                // Roll back a sending lock obtained by this very call. The
                // recorded state must follow the real lock: a stale SEND bit
                // would make later calls skip the actual acquisition and
                // transmit without holding the lock.
                if ($direction & self::DIRECTION_SEND
                    && !($old & self::DIRECTION_SEND)
                    && $this->shmHandler->unlock(self::DIRECTION_SEND)
                ) {
                    self::$lockState[$this->uri] &= ~self::DIRECTION_SEND;
                }
                throw $e;
            }

            return $old;
        }

        return false;
    }

    /**
     * Sends a string or stream to the server.
     *
     * Sends a string or stream to the server. If a seekable stream is
     * provided, it will be seeked back to the same position it was passed as,
     * regardless of the $offset parameter.
     *
     * @param string|resource $contents The string or stream to send.
     * @param int             $offset   The offset from which to start sending.
     *     If a stream is provided, and this is set to NULL, sending will start
     *     from the current stream position.
     * @param int             $length   The maximum length to send. If omitted,
     *     the string/stream will be sent to its end.
     *
     * @return int The number of bytes sent.
     *
     * @throws E Anything thrown while locking or sending. The lock state from
     *     before the call is restored before a sending failure is rethrown.
     */
    public function send($contents, $offset = null, $length = null)
    {
        if (false === ($previousState = $this->lock(self::DIRECTION_SEND))
            && $this->persist
        ) {
            throw $this->createException(
                'Unable to obtain sending lock',
                10
            );
        }

        try {
            $result = parent::send($contents, $offset, $length);
        } catch (E $e) {
            // Put the locks back the way the caller had them, then rethrow.
            $this->lock($previousState, true);
            throw $e;
        }

        $this->lock($previousState, true);
        return $result;
    }

    /**
     * Receives data from the server.
     *
     * Receives data from the server as a string.
     *
     * @param int    $length The number of bytes to receive.
     * @param string $what   Descriptive string about what is being received
     *     (used in exception messages).
     *
     * @return string The received content.
     *
     * @throws E Anything thrown while locking or receiving. The lock state
     *     from before the call is restored before a receiving failure is
     *     rethrown.
     */
    public function receive($length, $what = 'data')
    {
        if (false === ($previousState = $this->lock(self::DIRECTION_RECEIVE))
            && $this->persist
        ) {
            throw $this->createException(
                'Unable to obtain receiving lock',
                9
            );
        }

        try {
            $result = parent::receive($length, $what);
        } catch (E $e) {
            // Put the locks back the way the caller had them, then rethrow.
            $this->lock($previousState, true);
            throw $e;
        }

        $this->lock($previousState, true);
        return $result;
    }

    /**
     * Receives data from the server.
     *
     * Receives data from the server as a stream.
     *
     * @param int              $length  The number of bytes to receive.
     * @param FilterCollection $filters A collection of filters to apply to the
     *     stream while receiving. Note that the filters will not be present on
     *     the stream after receiving is done.
     * @param string           $what    Descriptive string about what is being
     *     received (used in exception messages).
     *
     * @return resource The received content.
     *
     * @throws E Anything thrown while locking or receiving. The lock state
     *     from before the call is restored before a receiving failure is
     *     rethrown.
     */
    public function receiveStream(
        $length,
        FilterCollection $filters = null,
        $what = 'stream data'
    ) {
        if (false === ($previousState = $this->lock(self::DIRECTION_RECEIVE))
            && $this->persist
        ) {
            throw $this->createException(
                'Unable to obtain receiving lock',
                9
            );
        }

        try {
            $result = parent::receiveStream($length, $filters, $what);
        } catch (E $e) {
            // Put the locks back the way the caller had them, then rethrow.
            $this->lock($previousState, true);
            throw $e;
        }

        $this->lock($previousState, true);
        return $result;
    }
}