/* Original details: */
/* MogileFS.class.php - Class for accessing the Mogile File System
 * Copyright (C) 2007 Interactive Path, Inc.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
/* File Authors:
 *   Erik Osterman
 *
 * Thanks to the MogileFS mailing list and the creator of the MediaWiki
 * MogileFS client.
 */

/**
 * Client for a MogileFS installation.
 *
 * Tracker commands are sent as single text lines over a socket and the
 * one-line reply is parsed into an array; file contents are transferred to
 * and from the storage URLs returned by the tracker using cURL (or fopen()
 * for getPassthru()).
 *
 * Error convention used throughout the class: invalid arguments terminate the
 * script with die(); runtime failures make the method return null and leave a
 * message retrievable through getError().
 */
class MogileFS
{
    // Tracker command names (kept as instance properties for backward compatibility).
    public $PING = 'TEST';
    public $DELETE = 'DELETE';
    public $GET_DOMAINS = 'GET_DOMAINS';
    public $GET_PATHS = 'GET_PATHS';
    public $RENAME = 'RENAME';
    public $LIST_KEYS = 'LIST_KEYS';
    public $CREATE_OPEN = 'CREATE_OPEN';
    public $CREATE_CLOSE = 'CREATE_CLOSE';

    public $SUCCESS = 'OK';       // Tracker success code
    public $ERROR = 'ERR';        // Tracker error code
    public $DEFAULT_PORT = 7001;  // Tracker port

    public $_domain;
    public $_class;
    public $_trackers;
    public $_socket;
    public $_requestTimeout;
    // NOTE(review): $_putTimeout and $_getTimeout are stored and validated but
    // are not applied to any transfer; both get() and _setResource() use
    // $_requestTimeout. Confirm the intended behaviour before wiring them in.
    public $_putTimeout;
    public $_getTimeout;
    public $_debug;
    public $_errmsg = null;

    // Buffer and read offset used by _curlreadfunc() while set() uploads a string.
    public $_curlreadfunc_data = null;
    public $_curlreadfunc_datapos = null;

    /**
     * Create a client bound to one domain and a list of trackers.
     *
     * @param string       $domain   MogileFS domain sent with every tracker request.
     * @param string|array $trackers One tracker URL or an array of them
     *                               (e.g. 'tcp://127.0.0.1:7001').
     */
    public function __construct($domain, $trackers)
    {
        $this->setDomain($domain);
        $this->setHosts($trackers);
        // The trailing numbers are carried over from the original source,
        // presumably earlier default values.
        $this->setRequestTimeout(2); // 10
        $this->setPutTimeout(2);     // 4
        $this->setGetTimeout(2);     // 10
        $this->setDebug(0);
    }

    /**
     * PHP 4-style constructor name, kept so existing explicit calls such as
     * parent::MogileFS(...) keep working. PHP 8 no longer runs a method named
     * after the class on `new`, which is why __construct() does the real work.
     */
    public function MogileFS($domain, $trackers)
    {
        $this->__construct($domain, $trackers);
    }

    /** @return string|null Message of the last failed operation, or null. */
    public function getError()
    {
        return $this->_errmsg;
    }

    /** @return int Current debug level. */
    public function getDebug()
    {
        return $this->_debug;
    }

    /**
     * Set the debug level; any value above 0 turns on cURL verbose output.
     *
     * @return mixed The level that was stored.
     */
    public function setDebug($level)
    {
        return $this->_debug = $level;
    }

    /** @return int|float Timeout in seconds for tracker connections and cURL transfers. */
    public function getRequestTimeout()
    {
        return $this->_requestTimeout;
    }

    /**
     * Set the request timeout; terminates the script unless $timeout > 0.
     *
     * @return mixed The timeout that was stored.
     */
    public function setRequestTimeout($timeout)
    {
        if ($timeout > 0) {
            return $this->_requestTimeout = $timeout;
        }
        die(get_class($this) . "::setRequestTimeout expects a positive integer");
    }

    /** @return int|float The stored put timeout. */
    public function getPutTimeout()
    {
        return $this->_putTimeout;
    }

    /**
     * Set the put timeout; terminates the script unless $timeout > 0.
     *
     * @return mixed The timeout that was stored.
     */
    public function setPutTimeout($timeout)
    {
        if ($timeout > 0) {
            return $this->_putTimeout = $timeout;
        }
        die(get_class($this) . "::setPutTimeout expects a positive integer");
    }

    /** @return int|float The stored get timeout. */
    public function getGetTimeout()
    {
        return $this->_getTimeout;
    }

    /**
     * Set the get timeout; terminates the script unless $timeout > 0.
     *
     * @return mixed The timeout that was stored.
     */
    public function setGetTimeout($timeout)
    {
        if ($timeout > 0) {
            return $this->_getTimeout = $timeout;
        }
        die(get_class($this) . "::setGetTimeout expects a positive integer");
    }

    /** @return array The configured tracker URLs. */
    public function getHosts()
    {
        return $this->_trackers;
    }

    /**
     * Set the tracker list; a scalar is wrapped into a one-element array.
     * Terminates the script for any other argument type.
     */
    public function setHosts($trackers)
    {
        if (is_scalar($trackers)) {
            $this->_trackers = array($trackers);
        } elseif (is_array($trackers)) {
            $this->_trackers = $trackers;
        } else {
            die(get_class($this) . "::setHosts unrecognized host argument");
        }
    }

    /** @return string The domain sent with every tracker request. */
    public function getDomain()
    {
        return $this->_domain;
    }

    /**
     * Set the domain; terminates the script unless $domain is a scalar.
     *
     * @return mixed The domain that was stored.
     */
    public function setDomain($domain)
    {
        if (is_scalar($domain)) {
            return $this->_domain = $domain;
        }
        die(get_class($this) . "::setDomain unrecognized domain argument");
    }

    /**
     * Connect to a mogilefsd: reuse the open socket when it is still usable,
     * otherwise walk the tracker list and keep the first connection that opens.
     *
     * @return resource|null The tracker socket, or null (see getError()) when
     *                       no tracker could be reached.
     */
    public function getConnection()
    {
        if ($this->_socket && is_resource($this->_socket) && !feof($this->_socket)) {
            return $this->_socket;
        }

        // Defined up front so the failure message is well-formed even when the
        // tracker list is empty or no entry could be parsed.
        $errno = null;
        $errstr = null;

        foreach ($this->_trackers as $host) {
            $parts = parse_url($host);
            if (!is_array($parts)) {
                continue; // parse_url() returns false for seriously malformed URLs
            }
            if (!isset($parts['host'])) {
                // A bare "hostname" without a scheme is reported by parse_url()
                // as a path only; treat that path as the host name.
                if (!isset($parts['path']) || $parts['path'] === '') {
                    continue;
                }
                $parts['host'] = $parts['path'];
            }
            if (!isset($parts['port'])) {
                $parts['port'] = $this->DEFAULT_PORT;
            }

            $errno = null;
            $errstr = null;
            $this->_socket = fsockopen($parts['host'], $parts['port'], $errno, $errstr, $this->_requestTimeout);
            if ($this->_socket) {
                break;
            }
        }

        if (!is_resource($this->_socket) || feof($this->_socket)) {
            $this->_errmsg = (get_class($this) . "::getConnection failed to obtain connection. errno=$errno errmsg=$errstr");
            return null;
        }

        return $this->_socket;
    }

    /**
     * Send a request to mogilefsd and parse the result.
     *
     * The request line is "<cmd>&key=value&...\n" with URL-encoded keys and
     * values; the configured domain is always added as the 'domain' argument.
     *
     * @param string $cmd  Tracker command name.
     * @param array  $args Command arguments (null values are sent as empty strings).
     * @return array|null Parsed reply fields on success; null on failure, in
     *                    which case getError() holds the reason.
     */
    public function _doRequest($cmd, $args = array())
    {
        $this->_errmsg = null;
        $args['domain'] = $this->_domain;

        $params = '';
        foreach ($args as $key => $value) {
            // Explicit string casts: optional arguments arrive as null, and
            // passing null to urlencode() is deprecated as of PHP 8.1.
            $params .= '&' . urlencode((string) $key) . '=' . urlencode((string) $value);
        }

        $socket = $this->getConnection();
        if ($socket === null) {
            return null;
        }

        if (fwrite($socket, $cmd . $params . "\n") === false) {
            $this->_errmsg = (get_class($this) . "::_doRequest write failed");
            return null;
        }

        $raw = fgets($socket);
        if ($raw === false) {
            $this->_errmsg = (get_class($this) . "::_doRequest read failed");
            return null;
        }

        // Trim before splitting so a success reply without any payload
        // ("OK\r\n") is still recognised as a success.
        $words = explode(' ', trim($raw));
        if ($words[0] != $this->SUCCESS) {
            $this->_errmsg = trim(urldecode($raw));
            return null;
        }

        $parsed = array();
        parse_str(isset($words[1]) ? $words[1] : '', $parsed);
        return $parsed;
    }

    /**
     * Return a list of domains.
     *
     * @return array|null List of array('name' => ..., 'classes' => array(className => mindevcount)),
     *                    or null when the tracker request failed.
     */
    public function getDomains()
    {
        $result = $this->_doRequest($this->GET_DOMAINS);
        if ($result === null) {
            return null;
        }

        $domains = array();
        $domainCount = isset($result['domains']) ? (int) $result['domains'] : 0;
        for ($i = 1; $i <= $domainCount; $i++) {
            $dom = 'domain' . $i;
            $classes = array();
            $classCount = isset($result[$dom . 'classes']) ? (int) $result[$dom . 'classes'] : 0;
            for ($j = 1; $j <= $classCount; $j++) {
                $classes[$result[$dom . 'class' . $j . 'name']] = $result[$dom . 'class' . $j . 'mindevcount'];
            }
            $domains[] = array('name' => $result[$dom], 'classes' => $classes);
        }

        return $domains;
    }

    /**
     * Check whether the tracker reports at least one path for a key.
     *
     * @param string $key File key; null terminates the script.
     * @return bool
     */
    public function exists($key)
    {
        if ($key === null) {
            die(get_class($this) . "::exists key cannot be null");
        }

        // TODO: make this saner
        $result = $this->_doRequest($this->GET_PATHS, array('key' => $key));
        return ($result !== null && isset($result['paths']) && $result['paths'] > 0);
    }

    /**
     * Send the tracker's test command.
     *
     * @return bool True when the tracker answered with a success reply.
     */
    public function ping()
    {
        $result = $this->_doRequest($this->PING);
        return ($result !== null);
    }

    /**
     * Get an array of paths for a key.
     *
     * @param string $key File key; null terminates the script.
     * @return array|null The tracker reply without its 'paths' entry, or null
     *                    on failure.
     */
    public function getPaths($key)
    {
        if ($key === null) {
            die(get_class($this) . "::getPaths key cannot be null");
        }

        $result = $this->_doRequest($this->GET_PATHS, array('key' => $key));
        if ($result === null) {
            return null;
        }

        unset($result['paths']);
        return $result;
    }

    /**
     * Delete a file from the system.
     *
     * @param string $key File key; null terminates the script.
     * @return true|null True on success, null on failure.
     */
    public function delete($key)
    {
        if ($key === null) {
            die(get_class($this) . "::delete key cannot be null");
        }

        $result = $this->_doRequest($this->DELETE, array('key' => $key));
        if ($result === null) {
            return null;
        }
        return true;
    }

    /**
     * Rename a file.
     *
     * @param string $from Existing key; null terminates the script.
     * @param string $to   New key; null terminates the script.
     * @return true|null True on success, null on failure.
     */
    public function rename($from, $to)
    {
        if ($from === null) {
            die(get_class($this) . "::rename from key cannot be null");
        } elseif ($to === null) {
            die(get_class($this) . "::rename to key cannot be null");
        }

        $result = $this->_doRequest($this->RENAME, array('from_key' => $from, 'to_key' => $to));
        if ($result === null) {
            return null;
        }
        return true;
    }

    /**
     * Find keys starting with a prefix.
     *
     * @param string|null $prefix  Sent as the 'prefix' argument.
     * @param string|null $lastKey Sent as the 'after' argument.
     * @param int|null    $limit   Sent as the 'limit' argument.
     * @return array|null The parsed tracker reply; an empty array when the
     *                    tracker answered "ERR none_match"; null on any other
     *                    failure.
     */
    public function listKeys($prefix = null, $lastKey = null, $limit = null)
    {
        $result = $this->_doRequest(
            $this->LIST_KEYS,
            array('prefix' => $prefix, 'after' => $lastKey, 'limit' => $limit)
        );

        if ($result === null && $msg = $this->_errmsg) {
            if (strpos($msg, 'ERR none_match') !== false) {
                return array();
            }
        }

        return $result;
    }

    /**
     * Get a file from mogstored and return it as a string.
     *
     * Each path reported by the tracker is tried in turn until one download
     * succeeds.
     *
     * @param string $key File key; null terminates the script.
     * @return string|null File contents, or null (see getError()) on failure.
     */
    public function get($key)
    {
        if ($key === null) {
            die(get_class($this) . "::get key cannot be null");
        }

        $paths = $this->getPaths($key);
        if ($paths === null) {
            return null;
        }

        foreach ($paths as $path) {
            $ch = curl_init();
            curl_setopt($ch, CURLOPT_VERBOSE, ($this->_debug > 0 ? 1 : 0));
            curl_setopt($ch, CURLOPT_TIMEOUT, $this->_requestTimeout);
            curl_setopt($ch, CURLOPT_URL, $path);
            curl_setopt($ch, CURLOPT_FAILONERROR, true);
            curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
            $response = curl_exec($ch);
            // Release the handle before deciding what to do next, so a failed
            // source does not leak it.
            curl_close($ch);

            if ($response !== false) {
                return $response;
            }
            // Otherwise try the next source.
        }

        $this->_errmsg = (get_class($this) . "::get unable to retrieve {$key}");
        return null;
    }

    /**
     * Get a file from mogstored and send it directly to stdout by way of
     * fpassthru().
     *
     * Each path is tried in turn; the first one that can be opened is streamed.
     * Terminates the script when streaming fails or no path could be opened.
     *
     * @param string $key File key; null terminates the script.
     * @return true True once a path has been streamed.
     */
    public function getPassthru($key)
    {
        if ($key === null) {
            die(get_class($this) . "::getPassthru key cannot be null");
        }

        $paths = $this->getPaths($key);
        if ($paths === null) {
            $paths = array(); // falls through to the "unable to retrieve" exit below
        }

        foreach ($paths as $path) {
            $fh = fopen($path, 'r');
            if (!$fh) {
                continue; // this source is unavailable; try the next one
            }

            if (fpassthru($fh) === false) {
                die(get_class($this) . "::getPassthru failed");
            }
            fclose($fh);
            return true;
        }

        die(get_class($this) . "::getPassthru unable to retrieve {$key}");
    }

    /**
     * Save a file to MogileFS.
     *
     * Asks the tracker for an upload location (CREATE_OPEN), PUTs the data to
     * it with cURL, then confirms the upload to the tracker (CREATE_CLOSE).
     * A file handle passed in $fh is always closed before returning.
     *
     * @param string        $key           File key; null terminates the script.
     * @param string        $class         Sent as the 'class' argument.
     * @param resource|null $fh            Open file handle to upload, or null.
     * @param callable|null $curl_readfunc cURL read callback used when $fh is null.
     * @param int           $length        Number of bytes that will be uploaded.
     * @return true|null True on success, null (see getError()) on failure.
     */
    public function _setResource($key, $class, $fh, $curl_readfunc, $length)
    {
        if ($key === null) {
            die(get_class($this) . "::_setResource key cannot be null");
        }

        $location = $this->_doRequest($this->CREATE_OPEN, array('key' => $key, 'class' => $class));
        if ($location === null) {
            // Do not leak the caller's file handle on the early exit.
            if (is_resource($fh)) {
                fclose($fh);
            }
            return null;
        }

        if (!isset($location['path'])) {
            if (is_resource($fh)) {
                fclose($fh);
            }
            $this->_errmsg = (get_class($this) . "::_setResource tracker reply did not contain a path");
            return null;
        }
        $uri = $location['path'];

        $ch = curl_init();
        curl_setopt($ch, CURLOPT_VERBOSE, ($this->_debug > 0 ? 1 : 0));
        if ($fh !== null) {
            curl_setopt($ch, CURLOPT_INFILE, $fh);
        } elseif ($curl_readfunc !== null) {
            curl_setopt($ch, CURLOPT_READFUNCTION, $curl_readfunc);
        } else {
            die(get_class($this) . "::_setResource was not provided a data source");
        }
        curl_setopt($ch, CURLOPT_INFILESIZE, $length);
        curl_setopt($ch, CURLOPT_TIMEOUT, $this->_requestTimeout);
        // CURLOPT_PUT is an on/off switch. The original passed the put timeout
        // here, which only worked because the setter guarantees a value > 0.
        curl_setopt($ch, CURLOPT_PUT, true);
        curl_setopt($ch, CURLOPT_URL, $uri);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        // Send an empty Expect header instead of cURL's default one.
        curl_setopt($ch, CURLOPT_HTTPHEADER, array('Expect: '));
        $response = curl_exec($ch);

        if (is_resource($fh)) {
            fclose($fh);
        }

        if ($response === false) {
            $error = curl_error($ch);
            curl_close($ch);
            $this->_errmsg = (get_class($this) . "::set $error");
            return null;
        }
        curl_close($ch);

        $result = $this->_doRequest($this->CREATE_CLOSE, array(
            'key' => $key,
            'devid' => $location['devid'],
            'fid' => $location['fid'],
            'path' => urldecode($uri),
            'class' => $class
        ));
        if ($result === null) {
            return null;
        }
        return true;
    }

    /**
     * Store a string under a key.
     *
     * @param string $key   File key; null terminates the script.
     * @param string $class Sent as the 'class' argument.
     * @param string $value Data to store.
     * @return true|null True on success, null (see getError()) on failure.
     */
    public function set($key, $class, $value)
    {
        if ($key === null) {
            die(get_class($this) . "::set key cannot be null");
        }

        $data = (string) $value;
        $this->_curlreadfunc_data = $data;
        $this->_curlreadfunc_datapos = 0;

        $result = $this->_setResource($key, $class, null, array($this, '_curlreadfunc'), strlen($data));

        // The upload is over; do not keep a second copy of the payload alive.
        $this->_curlreadfunc_data = null;
        $this->_curlreadfunc_datapos = null;

        return $result;
    }

    /**
     * cURL read callback: hand out the next chunk of the buffer prepared by set().
     *
     * The first two parameters are taken by value because cURL does not pass
     * them by reference.
     *
     * @param mixed $ch     cURL handle (unused).
     * @param mixed $fd     Stream given to cURL via CURLOPT_INFILE (unused).
     * @param int   $length Maximum number of bytes to return.
     * @return string The next chunk; an empty string once the buffer is exhausted.
     */
    public function _curlreadfunc($ch, $fd, $length)
    {
        // The cast covers PHP versions where substr() returns false at the end
        // of the string.
        $data = (string) substr((string) $this->_curlreadfunc_data, (int) $this->_curlreadfunc_datapos, $length);
        $this->_curlreadfunc_datapos += strlen($data);
        return $data;
    }

    /**
     * Store the contents of a local file under a key.
     *
     * @param string $key      File key; null terminates the script.
     * @param string $class    Sent as the 'class' argument.
     * @param string $filename Path of the file to upload; the script is
     *                         terminated when it cannot be opened.
     * @return true|null True on success, null (see getError()) on failure.
     */
    public function setFile($key, $class, $filename)
    {
        if ($key === null) {
            die(get_class($this) . "::setFile key cannot be null");
        }

        $fh = fopen($filename, 'rb');
        if ($fh === false) {
            die(get_class($this) . "::setFile failed to open path {$filename}");
        }

        return $this->_setResource($key, $class, $fh, null, filesize($filename));
    }
}

/*
// Usage Example ('socialverse' is the domain, 'assets' the storage class):
$mfs = new MogileFS('socialverse', 'tcp://127.0.0.1');
//$mfs->setDebug(10);
$start = microtime(true);
$mfs->set('test123', 'assets', microtime(true));
printf("EXISTS: %d\n", $mfs->exists('test123'));
print "GET: [" . $mfs->get('test123') . "]\n";
$mfs->delete('test123');
$stop = microtime(true);
printf("%.4f\n", $stop - $start);
*/