Kafka i OpenEdge cz. II

W poprzednim artykule pokazałem jak zainstalować i skonfigurować lokalne środowisko Kafki a teraz pokażę jak zintegrować je z OpenEdgem.
Niektórzy z Was, którzy będą potrzebować zintegrować się z istniejącą architekturą Kafki, będą zainteresowani od tego właśnie miejsca.

Najlepiej w tym celu sprawdzić dokumentację OE dla danej wersji np. online. Ważne, że trzeba posiadać wersję OE 12.5 lub wyższą.

Pierwszym krokiem jest pobranie biblioteki Apache Kafka C/C++ dla Windows lub Unixa (link Download package po prawej stronie). Minimalna wymagana wersja to 1.7, ale zaleca się pobranie ostatniej wersji.

Gdy plik mamy już pobrany zmieniamy jego rozszerzenie z nupkg na zip i rozpakowujemy. W przypadku Windows będą nas interesowały pliki z podkatalogu runtimes\win-x64\native\, które umieszczamy w katalogu roboczym naszej aplikacji ABL.

Dodajemy do PROPATH następujace biblioteki:

  • $DLC/tty/netlib/OpenEdge.Net.pl
  • $DLC/tty/messaging/OpenEdge.Messaging.pl

Teraz możemy wziąć się za napisanie procedury w ABL, a raczej za pobranie jej z dokumentacji i małą kastomizację.

/* producer.p */

using OpenEdge.Messaging.*.
using OpenEdge.Messaging.Kafka.*.

block-level on error undo, throw.

var RecordBuilder recordBuilder.
var KafkaProducerBuilder pb.
var IProducer producer.
var IProducerRecord record.
var ISendResponse response.

pb = cast(ProducerBuilder:Create("progress-kafka"), KafkaProducerBuilder).

pb:SetBootstrapServers("172.22.13.101:9092").
pb:SetClientId("test client").
pb:SetBodySerializer(new StringSerializer()).
pb:SetKeySerializer(new StringSerializer()).

pb:SetProducerOption("message.timeout.ms", "10000").

producer = pb:Build().

recordBuilder = producer:RecordBuilder.
recordBuilder:SetTopicName("mytopic").   

Find first customer.
recordBuilder:SetBody(name).

record = recordBuilder:Build().

response = producer:Send(record).

producer:Flush(1000).
    
repeat while not response:Completed:
    pause .1 no-message.
end.

if response:Success then do:
   message "Send successful" view-as alert-box.
end.
else do:
    undo, throw new Progress.Lang.AppError("Failed to send the record: " +
        response:ErrorMessage, 0).
end.
 

catch err as Progress.Lang.Error :
    message err:GetMessage(1) view-as alert-box.
end catch.

finally:
    delete object producer no-error.
end.

Po pierwsze wstawiam własny adres IP w metodzie SetBootstrapServers i nazwę topicu mytopic.
Potem ustawiam SetBodySerializer() (metoda ustawia proces serializacji dla treści wiadomości) na StringSerializer. Serializator zajmuje się konwersją danych na bajty w celu przesłania do topicu. OpenEdge udostępnia również MemptrSerializer i JsonSerializer, jak również tworzenie własnych niestandardowych serializatorów.
I wreszcie znajduję pierwszy rekord customer i ustawiam jego nazwę (name) jako wartość do przesłania.
Po uruchomieniu procedury sprawdzam co pojawiło się w topicu Kafki. Na czerwono zaznaczyłem wartość przesłanej wartości (Lift Tours).

Teraz pora na odczyt wartości z topicu Kafki w procedurze ABL.

/* consumer.p */

block-level on error undo, throw.
 
using OpenEdge.Messaging.ConsumerBuilder from propath.
using OpenEdge.Messaging.IConsumer from propath.
using OpenEdge.Messaging.IConsumerRecord from propath.
using Progress.Json.ObjectModel.JsonConstruct from propath.
using Progress.Json.ObjectModel.JsonObject from propath.
 
var ConsumerBuilder cb.
var IConsumer consumer.
var IConsumerRecord record.
//var Progress.Lang.Object messageBody.
var char messageBody.
 
cb = ConsumerBuilder:Create("progress-kafka").
 
// Kafka requires at least one bootstrap server host and port.
cb:SetConsumerOption("bootstrap.servers", "172.22.13.101:9092").

// Explicitly disable auto commit so it can be controlled within the application.
cb:SetConsumerOption("enable.auto.commit", "false").

// Identify the consumer group. The consumer group allows multiple clients to
//  coordinate the consumption of multiple topics and partitions
//cb:SetConsumerOption("group.id", "my.consumer.group").
cb:SetConsumerOption("group.id", "test-consumer-group").

// Specify whether the consumer group should automatically be deleted when
//  the consumer is garbage collected.
cb:SetConsumerOption("auto.delete.group", "true").

// Configure the consumer's deserializer in order to convert values from
//  the network messages to string objects.
cb:SetConsumerOption("value.deserializer", "OpenEdge.Messaging.StringDeserializer").

// Set the consumer starting position to the most recent message
cb:SetConsumerOption("auto.offset.reset", "latest").
     
// identify one or more topics to consume
cb:AddSubscription("mytopic").
     
// build the consumer
consumer = cb:Build().
          
// loop forever receiving and processing records.
repeat while true:
    // request a record, waiting up to 1 second for some records to be available
    record = consumer:Poll(1000).
         
    if valid-object(record) then do:
           
        // acknowledge the message so the client can resume where it leaves off
        // the next time it is started
        consumer:CommitOffset(record).

        MESSAGE record:TopicName VIEW-AS ALERT-BOX.   

       messageBody = record:Body:ToString().      
       message messageBody view-as alert-box.
      
    end.
         
end.
     
catch err as Progress.Lang.Error :
    message err:GetMessage(1) view-as alert-box.
end catch.

finally:
    delete object consumer no-error.
end.

Uruchamiamy procedurę producer.p i consumer.p. Widać, że pierwsza wyświetla komunikat o poprawnym wysłaniu, a druga prawidłową wartość pobraną z kolejki.

Na koniec jeszcze jedna informacja związana z ustawianiem wartości parametrów. Otóż można to zrobić na dwa sposoby: poprzez użycie metod akceptujących wartości silnie typizowane (tzw. strongly-typed) lub używając par nazwa-wartość.
Obie poniższe linie kodu są równoważne:

pb:SetProducerOption("bootstrap.servers", "172.22.13.101:9092").
pb:SetBootstrapServers("172.22.13.101:9092").

Kafka i OpenEdge cz. I

O integracji OpenEdge z Apache Kafka chciałem napisać od dawna. Najpierw musiałem pokonać pewne kłopoty techniczne, tak  aby pokazać pełna ścieżkę od instalacji tejże Kafki do napisania programów w ABL do wysyłania i pobierania danych, ale po kolei. Co to jest Kafka.

Na głównej stronie Apache Kafki możemy przeczytać w tłumaczeniu:

Apache Kafka to rozproszona platforma do strumieniowego przesyłania komunikatów typu open-source, z której korzystają tysiące firm w celu zapewnienia wydajnego przetwarzania danych, analizy przesyłania strumieniowego, integracji danych i aplikacji o znaczeniu krytycznym.

Komunikaty w Kafce pogrupowane są w tzw. topiki (tematy, ang. topic). Nadawca (producer), jak i odbiorca (consumer) powiązani są z jednym lub wieloma topikami.
Rzućmy okiem na uproszczony schemat systemu Kafki.

Kafka zapewnia przesyłanie danych w czasie zbliżonym do rzeczywistego ze źródeł, takich jak bazy danych, aplikacje, czujniki, urządzenia mobilne, usługi w chmurze itd.

Nadawcy i odbiorcy mogą być procesami napisanymi w różnych językach programowania, na różnych systemach operacyjnych; musi istnieć tylko mechanizm aby wpięli się do systemu. Dla klientów ABL również jest taki mechanizm ale o tym napiszę później.

Klientom Progressa ta architektura może przypominać produkty z grupy Sonic, jednakże Kafka jest technologią nieporównywalnie popularniejszą, stosowaną przez wiele wielkich i rozpoznawalnych firm jak Netflix, Twitter, Spotify, Cisco, LinkedIn i dziesiątki innych. Zresztą różnic jest więcej. Kafka jest platformą o wysokiej wydajności i skalowalności, przesyłane komunikaty nie są automatycznie kasowane z kolejki po odczytaniu (mogą być przechowywane bez ograniczenia czasowego) itd…

Komunikaty (nazywane także zdarzeniami lub rekordami) z danego topiku dopisywane są na końcu tzw. partycji. Partycja to uporządkowany rejestr komunikatów, mówiąc niskopoziomowo, jest to plik na dysku brokera, do którego zapisywane są komunikaty. Aby konsument był w stanie odebrać określony komunikat / sekwencję komunikatów – musi znać pozycję ostatnio przeczytanego komunikatu.

OpenEdge udostępnia nowe OpenEdge.Messaging API do wysyłania wiadomości do klastra Kafki poprzez Kafka producer i odbierania wiadomości z tego klastra przez Kafka consumer.

Tyle wstępnych informacji. Chcę Wam pokazać cały proces od zainstalowania Kafki na lokalnej maszynie aby każdy mógł przetestować ten system.

Kafka jest przeznaczona do uruchamiania w systemie Linux i Mac i nie jest przeznaczona do natywnego uruchamiania w systemie Windows. Dlatego w przypadku tej platformy zaleca się stosować technologię Docker lub WSL2.

WSL2 (Windows Subsystem for Linux 2) zapewnia środowisko Linux dla komputera z systemem Windows 10+, które nie wymaga maszyny wirtualnej.

Moja konfiguracja wygląda następująco: Windows 10, zainstalowany WSL2 oraz Windows Terminal, w którym mam dostęp do Linuxa Ubuntu.

Instaluję Apache Kafka korzystając z oryginalnej strony. Polega to (ach ten Linux!) jedynie na pobraniu i rozpakowaniu plików. Najpierw niezbędne jest zainstalowanie infrastruktury do utrzymywania i koordynowania brokerów. Może to być KRaft lub ZooKeeper. Ja wybieram to drugie rozwiązanie.

W Windows Terminal otwieram kilka zakładek dla Ubuntu.

W pierwszej uruchamiam ZooKeepera:

sudo bin/zookeeper-server-start.sh config/zookeeper.properties

W drugiej Kafkę:

sudo bin/kafka-server-start.sh config/server.properties

Jak widać cała operacja jest bardzo prosta. Mogę dalej podążać za instrukcją: dodać topik, okno consumera, nadawać i odbierać komunikaty. Ta testowa konfiguracja odwołuje się do parametru bootstrap-server, który zawiera listę par „hostname:port”, które adresują jednego lub więcej brokerów. W tym prostym przypadku dodanie topicu wygląda następująco:

kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Zastopujmy teraz wszystkie procesy Kafki poprzez zamknięcie zakładek.

Ponieważ będę potrzebował mieć dostęp z zewnętrznej aplikacji ABL, więc zamiast wartości localhost muszę wstawić wartość ip “maszyny” z Linuxem. Jeśli ip wynosi np. 172.22.13.101, to tę wartość wstawiam w następujących miejscach.
plik: config/server.properties
listeners=PLAINTEXT://172.22.13.101:9092
jak poniżej:

plik: config/producer.properties
bootstrap-servers=172.22.13.101:9092

plik: config/consumer.properties
bootstrap-servers=172.22.13.101:9092

OK, uruchamiam ponownie procesy ZooKeepera i Kafki. W trzeciej zakładce dodaję topic mytopic,

bin/kafka-topics.sh --create --topic mytopic --bootstrap-server 172.22.13.101:9092

a następnie uruchamiam proces producera:

bin/kafka-console-producer.sh --topic mytopic --bootstrap-server 172.22.13.101:9092

a w oknie czwartym consumera:

bin/kafka-console-consumer.sh --topic mytopic --bootstrap-server 172.22.13.101:9092

Wysyłam przykładowe wiadomości w oknie producera.

Pojawiają się one natychmiast w oknie consumera.

OK, mamy zatem skonfigurowany prosty system Kafki i w następnym wpisie pokażę jak przyłączyć się do niego z poziomu aplikacji ABL.