程序员人生 网站导航

php 连接zookeeper实例

栏目:框架设计时间:2015-02-07 08:48:58

1、安装成功zookeeper后,在zookeeper 的bin目录下有启动相应的启动脚本

启动Server

                      ./zkServer.sh start

启动client:(*注:cli需要安装java)

                     zkCli.sh

2、PHP实例:

<?php
class ZookeeperDemo extends Zookeeper {
 
  public function watcher( $i, $type, $key ) {
    echo "Insider Watcher " ;
 
    // Watcher gets consumed so we need to set a new one
    $this->get( '/test', array ($this, 'watcher' ) );
  }
}
$zoo = new ZookeeperDemo( '127.0.0.1:2181' );
$zoo->get( '/test', array ($zoo, 'watcher' ) );
while ( true ) {
  echo '.' ;
  sleep(2);
}

leader与worker任务的分配:
class Worker extends Zookeeper {
 
  const CONTAINER = '/cluster' ;
 
  protected $acl = array(
                    array (
                      'perms' => Zookeeper:: PERM_ALL,
                      'scheme' => 'world' ,
                      'id' => 'anyone' ) );
 
  private $isLeader = false;
 
  private $znode ;
 
  public function __construct( $host = '', $watcher_cb = null , $recv_timeout = 10000 ) {
    parent:: __construct( $host, $watcher_cb, $recv_timeout );
  }
 
  public function register() {
    if( ! $this->exists( self ::CONTAINER ) ) {
      $this->create( self ::CONTAINER , null, $this-> acl );
    }
 
    $this->znode = $this->create( self ::CONTAINER . '/w-' ,
                                  null ,
                                  $this->acl,
                                  Zookeeper::EPHEMERAL | Zookeeper::SEQUENCE );
 
    $this-> znode = str_replace( self ::CONTAINER .'/' , '' , $this-> znode );
 
    printf( "I'm registred as: %s ", $this-> znode );
 
    $watching = $this->watchPrevious();
 
    if( $watching == $this-> znode ) {
      printf( "Nobody here, I'm the leader " );
      $this->setLeader( true );
    }
    else {
      printf( "I'm watching %s " , $watching );
    }
  }
 
  public function watchPrevious() {
    $workers = $this->getChildren( self ::CONTAINER );
    sort( $workers );
    $size = sizeof( $workers );
    for( $i = 0 ; $i < $size ; $i++ ) {
      if( $this-> znode == $workers[ $i ] ) {
        if ( $i > 0 ) {
          $this->get( self ::CONTAINER . '/' . $workers[ $i - 1 ], array ( $this, 'watchNode' ) );
          return $workers[ $i - 1 ];
        }
 
        return $workers[ $i ];
      }
    }
 
    throw new Exception(  sprintf( "Something went very wrong! I can't find myself: %s/%s",
                          self ::CONTAINER ,
                          $this-> znode ) );
  }
 
  public function watchNode( $i, $type, $name ) {
    $watching = $this->watchPrevious();
    if( $watching == $this-> znode ) {
      printf( "I'm the new leader! " );
      $this->setLeader( true );
    }
    else {
      printf( "Now I'm watching %s " , $watching );
    }
  }
 
  public function isLeader() {
    return $this-> isLeader ;
  }
 
  public function setLeader($flag) {
    $this-> isLeader = $flag;
  }
 
  public function run() {
    $this->register();
 
    while( true ) {
      if( $this->isLeader() ) {
        $this->doLeaderJob();
    }
    else {
      $this->doWorkerJob();
    }
 
      sleep( 2 );
    }
  }
 
  public function doLeaderJob() {
    echo "Leading " ;
  }
 
  public function doWorkerJob() {
    echo "Working " ;
  }
}
$worker = new Worker( '127.0.0.1:2181' );
$worker->run();
可以启动3个php进程,查看脚本的运行。
进程1:
[root@localhost zookeeper]# php -f worker.php
I'm registred as: w-0000000010
Nobody here, I'm the leader
Leading
进程2:
[daniel.luo@localhost zookeeper]$ php -f worker.php
I'm registred as: w-0000000011
I'm watching w-0000000010
Working
进程3:
[daniel.luo@localhost zookeeper]$ php -f worker.php
I'm registred as: w-0000000012
I'm watching w-0000000011
Working

ctrl + c 关闭leader进程后,会发现进程2与3会选举出新的leader





------分隔线----------------------------
------分隔线----------------------------

最新技术推荐