RabbitMQ ile asenkron işlemler

Standard

Herkese selam,

Bir süredir sizler ile mesleki bilgi beceri paylaşımı yapamıyordum. Bu süre zarfında ofisimiz çook uzak bi teknokente taşındı falan filan.. Ancak zaman bulabildim.. Ve bugün sizler ile büyük sistemlerde önemli bir yer taşıyan asenkron mesaj sıralandırma işlemlerinden en yaygın olan rabbitMq dan genel hatları ile bahsedeceğim ve basit bir mesaj örneği ile detaylandıracağım..

ZeroMq, Red Hat Enterprise MRG, OpenAMQ, Apache QPid, ActiveMQ gibi alternatifleri olsa da RabbitMQ ölçeklenebilirliği, kararlılığı ve basitliği ile en yaygın kullanılan Mesaj Kuyruğudur. Aslında mesaj kuyruğu sondaki MQ dan geliyor ve genelde sohbet uygulamalarında yaygın olarak kullanılmakta ama bunun dışında yaptığınız işleri de sıraya sokabilmektesiniz.

Daha iyi anlayabilmeniz için olayı biraz detaylandırmak ve örneklendirmek istiyorum..

Neden RabbitMq?

Belki bazıları hala asenkron işlem yapmak yerine direk üye olduğunda, x ürünü aldığında vb. durumlarda direk 3-5-10 sn sürecek işlemi yaptırıp kullanıcıyı ekranda öylece işlem bitene kadar bekletiyor olabilir.. Bu zaten işin yanlış kısmı..

Birazcık daha profesyonel düşünenler bu işlemleri cron job ile yapmakta.. Tabi bunun için periodik olarak mysql e bağlanıp, select ile baktırıp sonra koşullara uyan mesajlar varsa bunları işleyip sonra da silme veya update etme işlemi ile süreci tamamlama yoluna gidiyorsunuz. Tabi bu data arttıkça mysql üzerindeki yükünüzü daha da arttıracaktır..

İşte RabbitMq tam olarak da bu adımda sizi bu yükten kurtarmak için hayatımıza giriyor..

Şimdi bir örnek verelim isterseniz;

Bir kullanıcınız üye oldu sisteminize diyelim.. Üye olduğu zaman kişi daha önceden sitede ürün bakmış ise kişinin baktığı ürünlerden zevklerini tahmin edip kişiye zevklerine uygun ürünleri mail attırmak istiyorsunuz diyelim.. Burada kişi üye olduğunda kişiyi bekletmek yerine bu işlemi rabbitmq ile çözebiliriz..

Kurulum

Ubuntu / Debian için:

apt-get install rabbitmq-server

rabbitmq kurulum

Windows ve diğer sistemler için rabbitmq kurulumuna bakmak istiyorsanız tıklayınız

Kurulumu yaptığımıza göre basitçe kodlara girmeden önce size kısaca kullanacağımız bazı terimlerden bahsetmek istiyorum:

Publisher: Kuyruğa mesaj gönderen uygulama daha doğrusu scriptimiz
Consumer: Kuyruğu dinleyen uygulama
Routing key: Mesajımızı yönlendireceğimiz anahtarımız
Exchange: Mesajı ilgili “routing key”e göre ilgili queue’ya yönlendiren bölüm
Queue: Mesajların son olarak düştüğü kuyruk
Exchange type: Gelen mesajın, “routing key”e göre hangi queue’ya “nasıl” gönderileceğini belirtir. Bunu birazdan aşağıda maddeler halinde size farklarını anlatacağım..
VirtualHost: VirtualHost’lar, genelde yetki yönetimi için kullanılır, Exchange ve Queue’lar virtualhost’lar içinde tanımlanır

Exchange demişken kısaca exchange tiplerinden de bahsetmek istiyorum:

  1. Direct Exchange
    “Direct Exchange” tipinde “routing key” (yönlendirme anahtarı) belirlenir ve bu anahtar bilgisi kuyruğa yazılır. “consumer” tarafından da bu anahtarlara göre işlem yapılır. Birazdan bununla ilgili basit bir örnek yapacağız..
    direkt_exchange_rabbitmq
  2. Fanout Exchange
    Mesajlar “exchange” de yer alan bütün kuyruklara gönderilir. Sadece yönlendirme anahtarı olanlar gözardı edilir..
    Fanout Exchange rabbitmq
  3. Topic Exchange
    “topic exchange” de verilen yönlendirme anahtarlarına göre farklı kuyruklara yazma işlemleri rabbit exchange topicyapılabilir ve bu yazılan mesajar “consumer” lar tarafından direk yönlendirme anahtarına göre veya (*) ile erişilebilir.
    Ayrıca (#) ile “exchange” de bulanan bütün kuyruklara ait mesajları yakalamak mümkün.Burada “php publisher.php mesaj anahtar” şeklinde istekte bulunabilirsiniz.Örnek için kadın ve erkek kullanıcıların beğendiği ve beğenmediği ürünleri dinleyen consumer ları oluşturalım.
    php consumer.php erkek.liked -> Erkeklerin beğendikleri
    php consumer.php erkek.unliked -> Erkeklerin beğenmedikleri
    php consumer.php kadin.liked -> Kadınların beğendikleri
    php consumer.php kadin.unliked -> Kadınların beğenmediği ürünler
    php consumer.php erkek.* -> Erkeklerin beğendiği ve beğenmediği herşey
    php consumer.php kadin.* -> Kadınların beğendiği ve beğenmediği herşey
    php consumer.php *.liked -> Bütün beğenilenler
    php consumer.php *.unliked -> Bütün beğenilmeyenler
    php consumer.php # -> Herşeyİlk ikisinden sonra biraz karışık gelmiş olabilir, bununla ilgili aşağıda tekrar örnek vereceğim size..
  4. Headers Exchange
    “topic exchange” in benzeridir. Yönlendirme anahtarları yerine mesajlar “header” içerir ve kuyruk eşleştirmesi gönderilen headerlara göre yapılır.
    exchange headers rabbitmq

Şimdi basitçe Direct Exchange tipinde mesaj gönderelim isterseniz ve diğer taraftan mesajlarımızı okuyalım..

rabbitmq logic - 1

Ben bu işlemlerde Alvaro Videla’nın geliştirdiği php-amqplib ı kullanacağım. Composer kullanmayı bildiğinizi var sayıyorum. Bilmeyenler varsa Composer sitesinden öğrenebilirler nasıl kurulur nasıl yapılır..

Composer a ilgili kütüphaneyi ekleyelim:

{
    "require": {
        "videlalvaro/php-amqplib": "v2.1.0"
    }
}

Kurulumu yaptıktan sonra önce gönderim yapacağımız dosyayı oluşturalım:

<?php
//kütüphanemizi projemize dahil edelim
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

//AMQP a bağlantı kuralım
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

//kanalımızı tanımlayalım
$channel->queue_declare('test', false, false, false, false);

date_default_timezone_set('Europe/Istanbul');
$zaman = date("d-m-Y H:i:s");

//mesajımızı tanımlayalım ve gönderelim
$msg = new AMQPMessage($zaman.' da gönderilen test mesajı!');
$channel->basic_publish($msg, '', 'test');

echo " [x] $zaman da test mesajı gönderildi!\n";

//kanalımızı ve bağlantımızı kapatalım
$channel->close();
$connection->close();

Şimdi de Okuyacağımız dosyayı yazalım:

<?php
//autoload ile amqplib kütüphanemizi ekleyelim ve kullanalım
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;

//Bağlanalım
$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

//test adında sıra tanımlayalım
$channel->queue_declare('test', false, false, false, false);

echo ' [*] Mesaj bekleniyor. iptal için CTRL+C', "\n";

$callback = function($msg) {
 echo " [x] Yeni mesaj geldi: ", $msg->body, "\n";
};

//kanalımıza callback fonksiyonumuzu tanımlayalım
$channel->basic_consume('test', '', false, true, false, false, $callback);

//kanalımızı dinleyelim
while(count($channel->callbacks)) {
 $channel->wait();
}

Evet artık hazırsınız ve sırası ile send.php ve receive.php scriptlerini çalıştırabilirsiniz..

rabbitmq receive

Gördüğünüz gibi her çalıştırmamda mesajımızı gönderdik ve bir başka terminal penceresinde receive dosyamızı dinleyebilirsiniz.. çıktısı soldaki gibi olacaktır..

 

Basit anlamda size mesaj transferini gösterdim bu örnekte. Şimdi de isterseniz topic exchange ile yukarıda belirttiğimiz gibi bir örnek yapalım.

Burada host ,port vb. bilgilerimi bi config dosyası açıp içinde define ettim ve autoload da çağırdım..

publisher.php dosyamız:

&lt;?php
//kütüphanemizi projemize dahil edelim
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

//AMQP a bağlantı kuralım..
$connection = new AMQPConnection(HOST,PORT,USER,PASS);
$channel = $connection-&gt;channel();

//exchange imizi tanımlayalım
$channel-&gt;exchange_declare('trends', '<strong>topic</strong>', false, false, false);

//terminalde ilk gelen  key imiz routing key olacak.. eğer gelmezse bişey anonymous.info anahtarı ile çalışsın
$routing_key = $argv[1];
if(empty($routing_key)) $routing_key = "*.*";

//gönderilen data terminal üzerindeki 2. argümanımız.. Boşsa Merhaba Dünya olacak
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Onur Canalp Her sıra ve her anahtarda..!";

//mesajımızı tanımlayalım ve gönderelim
$msg = new AMQPMessage($data);

$channel-&gt;basic_publish($msg, 'trends', $routing_key);

echo " [x] Gönderildi..\nAnahtar: ".$routing_key."\nData: ".$data."\n_______________\n\n";
//kanalımızı ve bağlantımızı kapatalım
$channel-&gt;close();
$connection-&gt;close();

Şimdi de burada dinleyen consomer dosyamız:

<?php
//kütüphanemizi projemize dahil edelim
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

//AMQP a bağlantı kuralım.. 
$connection = new AMQPConnection(HOST,PORT,USER,PASS);
$channel = $connection->channel();
//kanalımızı tanımlayalım
$channel->exchange_declare('trends', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
    file_put_contents('php://stderr', "KULLANIM: [queue].[binding_key]\n");
    exit(1);
}

foreach($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'trends', $binding_key);
}

echo ' [*] Bekleniyor.. Kapatmak için CTRL+C', "\n";

$callback = function($msg){
    echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

publisher-topic-rabbitmq consomer-topic-rabbitmq

 

 

 

Gördüğünüz gibi sol tarafta erkeğin beğendiklerini gönderdim ve diğer tarafta # ile tüm gelen talepleri okuyorduk. erkek.like a gelen datayı aldık. Bundan sonra ne yapmak istiyorsak bize kalmış….

Anlaşılır olmuştur umarım, Herkese iyi çalışmalar dilerim..

Kaynakça:
http://www.rabbitmq.com/tutorials/tutorial-one-php.html
https://www.rabbitmq.com/tutorials/tutorial-five-php.html
https://lostechies.com/derekgreer/2012/03/28/rabbitmq-for-windows-exchange-types/

6 thoughts on “RabbitMQ ile asenkron işlemler

  1. Hüseyin Uçak

    Hocam bu konuda bir sorum olacak. Asenkron çalışmayı daha ileri seviyelerde yapabilirmiyiz. Ben araştırdım ama sağlıklı bir sonuca ulaşamadım. Yani şöyle ben 4 tane servisten içerik çekecem diyelim xml ile. fakat her servisin sağladığı xml yapısı ve boyutu farklı olduğu için sonuçlar ekrana çok geç geliyor. burada yapmak istediğim 4 xml yapısını aynı anda çekip erken bağlantı sağlayanları ekrana bastırmak bu süreç zarfında diğer yapılandan gelenleride devamına senkron bir şekilde yaptırmak. bu sistemi php de yapabilirmiyiz tam olarak daha önce ihtiyacım olmadığı için bilmiyorum. yeterli dökümanda bulamadım. Bilginiz veya fikriniz varsa paylaşmanızdan memnun olurum.

    • Onur Canalp

      Tam ne yapmak istediğini anlayamadım açıkcası, yani anladım da kafamda bi örnek ile ilişkilendiremedim..
      Kabaca anladığım kadarı ile yardımcı olmaya çalışmak gerekirse; User bilgilerine veya seçimine göre bi yere post edip xml okuyorsan bunun için rabbit kullanmak pek mantıklı olmaz gibi görünüyor. Zaten aksi halde bi yerden okuduğun datalar ile işlem yapacaksan atıyorum 4 farklı siteden ürünleri okutuyorsun diyelim, o zamanda bunları periyodik olarak okuyup kendinde tutman ve kullanıcıya hızlı şekilde sonuç döndürmek daha mantıklı olabilir..

  2. Ekrem

    Merhabalar, rabbitmq’da publisher mesajın consumer’a iletilip iletilmediğini kontrol ediyor mu bununla ilgili kendisine geri bildirim oluyor mu?

Bir yanıt yazın

E-posta adresiniz yayınlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir