Herkese selam,
Bir süredir sizler ile mesleki bilgi beceri paylaşımı yapamıyordum. Bu süre zarfında ofisimiz çook uzak bi teknokente taşındı falan filan.. Ancak zaman bulabildim.. Ve bugün sizler ile büyük sistemlerde önemli bir yer taşıyan asenkron mesaj sıralandırma işlemlerinden en yaygın olan rabbitMq dan genel hatları ile bahsedeceğim ve basit bir mesaj örneği ile detaylandıracağım..
ZeroMq, Red Hat Enterprise MRG, OpenAMQ, Apache QPid, ActiveMQ gibi alternatifleri olsa da RabbitMQ ölçeklenebilirliği, kararlılığı ve basitliği ile en yaygın kullanılan Mesaj Kuyruğudur. Aslında mesaj kuyruğu sondaki MQ dan geliyor ve genelde sohbet uygulamalarında yaygın olarak kullanılmakta ama bunun dışında yaptığınız işleri de sıraya sokabilmektesiniz.
Daha iyi anlayabilmeniz için olayı biraz detaylandırmak ve örneklendirmek istiyorum..
Neden RabbitMq?
Belki bazıları hala asenkron işlem yapmak yerine direk üye olduğunda, x ürünü aldığında vb. durumlarda direk 3-5-10 sn sürecek işlemi yaptırıp kullanıcıyı ekranda öylece işlem bitene kadar bekletiyor olabilir.. Bu zaten işin yanlış kısmı..
Birazcık daha profesyonel düşünenler bu işlemleri cron job ile yapmakta.. Tabi bunun için periodik olarak mysql e bağlanıp, select ile baktırıp sonra koşullara uyan mesajlar varsa bunları işleyip sonra da silme veya update etme işlemi ile süreci tamamlama yoluna gidiyorsunuz. Tabi bu data arttıkça mysql üzerindeki yükünüzü daha da arttıracaktır..
İşte RabbitMq tam olarak da bu adımda sizi bu yükten kurtarmak için hayatımıza giriyor..
Şimdi bir örnek verelim isterseniz;
Bir kullanıcınız üye oldu sisteminize diyelim.. Üye olduğu zaman kişi daha önceden sitede ürün bakmış ise kişinin baktığı ürünlerden zevklerini tahmin edip kişiye zevklerine uygun ürünleri mail attırmak istiyorsunuz diyelim.. Burada kişi üye olduğunda kişiyi bekletmek yerine bu işlemi rabbitmq ile çözebiliriz..
Kurulum
Ubuntu / Debian için:
apt-get install rabbitmq-server
Windows ve diğer sistemler için rabbitmq kurulumuna bakmak istiyorsanız tıklayınız
Kurulumu yaptığımıza göre basitçe kodlara girmeden önce size kısaca kullanacağımız bazı terimlerden bahsetmek istiyorum:
Publisher: Kuyruğa mesaj gönderen uygulama daha doğrusu scriptimiz
Consumer: Kuyruğu dinleyen uygulama
Routing key: Mesajımızı yönlendireceğimiz anahtarımız
Exchange: Mesajı ilgili “routing key”e göre ilgili queue’ya yönlendiren bölüm
Queue: Mesajların son olarak düştüğü kuyruk
Exchange type: Gelen mesajın, “routing key”e göre hangi queue’ya “nasıl” gönderileceğini belirtir. Bunu birazdan aşağıda maddeler halinde size farklarını anlatacağım..
VirtualHost: VirtualHost’lar, genelde yetki yönetimi için kullanılır, Exchange ve Queue’lar virtualhost’lar içinde tanımlanır
Exchange demişken kısaca exchange tiplerinden de bahsetmek istiyorum:
- Direct Exchange
“Direct Exchange” tipinde “routing key” (yönlendirme anahtarı) belirlenir ve bu anahtar bilgisi kuyruğa yazılır. “consumer” tarafından da bu anahtarlara göre işlem yapılır. Birazdan bununla ilgili basit bir örnek yapacağız..
- Fanout Exchange
Mesajlar “exchange” de yer alan bütün kuyruklara gönderilir. Sadece yönlendirme anahtarı olanlar gözardı edilir..
- Topic Exchange
“topic exchange” de verilen yönlendirme anahtarlarına göre farklı kuyruklara yazma işlemleri yapılabilir ve bu yazılan mesajar “consumer” lar tarafından direk yönlendirme anahtarına göre veya (*) ile erişilebilir.
Ayrıca (#) ile “exchange” de bulanan bütün kuyruklara ait mesajları yakalamak mümkün.Burada “php publisher.php mesaj anahtar” şeklinde istekte bulunabilirsiniz.Örnek için kadın ve erkek kullanıcıların beğendiği ve beğenmediği ürünleri dinleyen consumer ları oluşturalım.
php consumer.php erkek.liked -> Erkeklerin beğendikleri
php consumer.php erkek.unliked -> Erkeklerin beğenmedikleri
php consumer.php kadin.liked -> Kadınların beğendikleri
php consumer.php kadin.unliked -> Kadınların beğenmediği ürünler
php consumer.php erkek.* -> Erkeklerin beğendiği ve beğenmediği herşey
php consumer.php kadin.* -> Kadınların beğendiği ve beğenmediği herşey
php consumer.php *.liked -> Bütün beğenilenler
php consumer.php *.unliked -> Bütün beğenilmeyenler
php consumer.php # -> Herşeyİlk ikisinden sonra biraz karışık gelmiş olabilir, bununla ilgili aşağıda tekrar örnek vereceğim size.. - Headers Exchange
“topic exchange” in benzeridir. Yönlendirme anahtarları yerine mesajlar “header” içerir ve kuyruk eşleştirmesi gönderilen headerlara göre yapılır.
Şimdi basitçe Direct Exchange tipinde mesaj gönderelim isterseniz ve diğer taraftan mesajlarımızı okuyalım..
Ben bu işlemlerde Alvaro Videla’nın geliştirdiği php-amqplib ı kullanacağım. Composer kullanmayı bildiğinizi var sayıyorum. Bilmeyenler varsa Composer sitesinden öğrenebilirler nasıl kurulur nasıl yapılır..
Composer a ilgili kütüphaneyi ekleyelim:
{ "require": { "videlalvaro/php-amqplib": "v2.1.0" } }
Kurulumu yaptıktan sonra önce gönderim yapacağımız dosyayı oluşturalım:
<?php //kütüphanemizi projemize dahil edelim require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; //AMQP a bağlantı kuralım $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); //kanalımızı tanımlayalım $channel->queue_declare('test', false, false, false, false); date_default_timezone_set('Europe/Istanbul'); $zaman = date("d-m-Y H:i:s"); //mesajımızı tanımlayalım ve gönderelim $msg = new AMQPMessage($zaman.' da gönderilen test mesajı!'); $channel->basic_publish($msg, '', 'test'); echo " [x] $zaman da test mesajı gönderildi!\n"; //kanalımızı ve bağlantımızı kapatalım $channel->close(); $connection->close();
Şimdi de Okuyacağımız dosyayı yazalım:
<?php //autoload ile amqplib kütüphanemizi ekleyelim ve kullanalım require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; //Bağlanalım $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); //test adında sıra tanımlayalım $channel->queue_declare('test', false, false, false, false); echo ' [*] Mesaj bekleniyor. iptal için CTRL+C', "\n"; $callback = function($msg) { echo " [x] Yeni mesaj geldi: ", $msg->body, "\n"; }; //kanalımıza callback fonksiyonumuzu tanımlayalım $channel->basic_consume('test', '', false, true, false, false, $callback); //kanalımızı dinleyelim while(count($channel->callbacks)) { $channel->wait(); }
Evet artık hazırsınız ve sırası ile send.php ve receive.php scriptlerini çalıştırabilirsiniz..
Gördüğünüz gibi her çalıştırmamda mesajımızı gönderdik ve bir başka terminal penceresinde receive dosyamızı dinleyebilirsiniz.. çıktısı soldaki gibi olacaktır..
Basit anlamda size mesaj transferini gösterdim bu örnekte. Şimdi de isterseniz topic exchange ile yukarıda belirttiğimiz gibi bir örnek yapalım.
Burada host ,port vb. bilgilerimi bi config dosyası açıp içinde define ettim ve autoload da çağırdım..
publisher.php dosyamız:
<?php //kütüphanemizi projemize dahil edelim require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; //AMQP a bağlantı kuralım.. $connection = new AMQPConnection(HOST,PORT,USER,PASS); $channel = $connection->channel(); //exchange imizi tanımlayalım $channel->exchange_declare('trends', '<strong>topic</strong>', false, false, false); //terminalde ilk gelen key imiz routing key olacak.. eğer gelmezse bişey anonymous.info anahtarı ile çalışsın $routing_key = $argv[1]; if(empty($routing_key)) $routing_key = "*.*"; //gönderilen data terminal üzerindeki 2. argümanımız.. Boşsa Merhaba Dünya olacak $data = implode(' ', array_slice($argv, 2)); if(empty($data)) $data = "Onur Canalp Her sıra ve her anahtarda..!"; //mesajımızı tanımlayalım ve gönderelim $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'trends', $routing_key); echo " [x] Gönderildi..\nAnahtar: ".$routing_key."\nData: ".$data."\n_______________\n\n"; //kanalımızı ve bağlantımızı kapatalım $channel->close(); $connection->close();
Şimdi de burada dinleyen consomer dosyamız:
<?php //kütüphanemizi projemize dahil edelim require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPConnection; use PhpAmqpLib\Message\AMQPMessage; //AMQP a bağlantı kuralım.. $connection = new AMQPConnection(HOST,PORT,USER,PASS); $channel = $connection->channel(); //kanalımızı tanımlayalım $channel->exchange_declare('trends', 'topic', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $binding_keys = array_slice($argv, 1); if( empty($binding_keys )) { file_put_contents('php://stderr', "KULLANIM: [queue].[binding_key]\n"); exit(1); } foreach($binding_keys as $binding_key) { $channel->queue_bind($queue_name, 'trends', $binding_key); } echo ' [*] Bekleniyor.. Kapatmak için CTRL+C', "\n"; $callback = function($msg){ echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
Gördüğünüz gibi sol tarafta erkeğin beğendiklerini gönderdim ve diğer tarafta # ile tüm gelen talepleri okuyorduk. erkek.like a gelen datayı aldık. Bundan sonra ne yapmak istiyorsak bize kalmış….
Anlaşılır olmuştur umarım, Herkese iyi çalışmalar dilerim..
Kaynakça:
http://www.rabbitmq.com/tutorials/tutorial-one-php.html
https://www.rabbitmq.com/tutorials/tutorial-five-php.html
https://lostechies.com/derekgreer/2012/03/28/rabbitmq-for-windows-exchange-types/
Hocam bu konuda bir sorum olacak. Asenkron çalışmayı daha ileri seviyelerde yapabilirmiyiz. Ben araştırdım ama sağlıklı bir sonuca ulaşamadım. Yani şöyle ben 4 tane servisten içerik çekecem diyelim xml ile. fakat her servisin sağladığı xml yapısı ve boyutu farklı olduğu için sonuçlar ekrana çok geç geliyor. burada yapmak istediğim 4 xml yapısını aynı anda çekip erken bağlantı sağlayanları ekrana bastırmak bu süreç zarfında diğer yapılandan gelenleride devamına senkron bir şekilde yaptırmak. bu sistemi php de yapabilirmiyiz tam olarak daha önce ihtiyacım olmadığı için bilmiyorum. yeterli dökümanda bulamadım. Bilginiz veya fikriniz varsa paylaşmanızdan memnun olurum.
Tam ne yapmak istediğini anlayamadım açıkcası, yani anladım da kafamda bi örnek ile ilişkilendiremedim..
Kabaca anladığım kadarı ile yardımcı olmaya çalışmak gerekirse; User bilgilerine veya seçimine göre bi yere post edip xml okuyorsan bunun için rabbit kullanmak pek mantıklı olmaz gibi görünüyor. Zaten aksi halde bi yerden okuduğun datalar ile işlem yapacaksan atıyorum 4 farklı siteden ürünleri okutuyorsun diyelim, o zamanda bunları periyodik olarak okuyup kendinde tutman ve kullanıcıya hızlı şekilde sonuç döndürmek daha mantıklı olabilir..
Kendinde tutarken da data ve ehemmiyetine göre mysql yerine mongodb felan tercih edebilirsin 😉
Senin ilacin HTML Workers. Angular Workers’ı da kullana bilirsin.
İyi çalışmalar.
Merhabalar, rabbitmq’da publisher mesajın consumer’a iletilip iletilmediğini kontrol ediyor mu bununla ilgili kendisine geri bildirim oluyor mu?
Callback metodu ile çözebilirsiniz.