2022年2月22日火曜日

AQの伝搬を使って異なるデータベースでメッセージを処理する

 AQのメッセージを異なるデータベースに伝播させて、そのデータベースでメッセージを処理させる設定を行なってみます。

以下の記事の作業の組み合わせになります。最終的に作成するAPEXアプリケーションは、以下の記事と同じ動作になります(バックグランドの処理が同じサーバーで実行されているか、別サーバーで実行されているかの違い)。

AQを使ってバックグランドでジョブを実行する
AQを使ってバックグランドでジョブを実行する - 通知の代わりにジョブを使う
Autonomous Databaseを繋ぐデータベース・リンクを作成する

本記事で新たに行う作業は、プロシージャDBMS_AQADM.SCHEDULE_PROPAGATIONを呼び出してAQのメッセージの伝搬をさせる設定です。それ以外は概ね説明済みなので、実施するコマンドの記載に留め、解説は少なめにします。

Always FreeのAutonomous Transaction Processingを使って作業を行います。ユーザー・インターフェースとなるAPEXアプリケーションの実装はAPEXDEVというインスタンスで行い、エンキューされたメッセージの処理はAPEXSRVというインスタンスで行います。

インスタンスAPEXDEVにはAPEXワークスペース(およびワークスペース・スキーマ)としてAPEXDEV、インスタンスAPEXSRVにはAPEXワークスペース(およびワークスペース・スキーマ)としてAPEXSRVが作成済みとします。



データベース・リンクの作成



それぞれのデータベースで、相手方のデータベースに接続するデータベース・リンクを作成します。データベースからウォレット・ファイルをダウンロードし、ウォレット・ファイルに含まれているファイルcwallet.ssoをオブジェクト・ストレージにアップロードし、事前定義済リクエストのURLを取得しておきます。

データベースAPEXDEVデータベース・アクションに管理ユーザーADMINで接続し、以下を実行します。今後の作業に必要な権限をまとめて、ユーザーAPEXDEVに与えています。

create directory DBLINK_WALLET_DIR as 'wallet';
begin
dbms_cloud.get_object(
object_uri => 'APEXSRVのcwallet.ssoの事前定義済リクエストのURL'
,directory_name => 'DBLINK_WALLET_DIR'
);
end;
/
grant read on directory DBLINK_WALLET_DIR to apexdev;
grant execute on dbms_cloud_admin to apexdev;
grant execute on dbms_cloud to apexdev;
grant create database link to apexdev;
grant execute on dbms_aq to apexdev;
grant execute on dbms_aqadm to apexdev;


APEXのワークスペースにユーザーAPEXDEVでサインインし、SQLワークショップSQLコマンドで以下を実行します。hostname、service_nameなどはAPEXSRVのウォレット・ファイルに含まれるtnsnames.oraより取得します。

begin
dbms_cloud.create_credential(
credential_name => 'CRED_APEXSRV'
, username => 'APEXSRV'
, password => 'APEXSRVのパスワード'
);
dbms_cloud_admin.create_database_link(
db_link_name => 'DBLINK_APEXSRV'
,hostname => 'adb.us-ashburn-1.oraclecloud.com'
,port => 1522
,service_name => 'adbuniqueid_apexsrv_low.adb.oraclecloud.com'
,ssl_server_cert_dn => 'CN=adwc.uscom-east-1.oraclecloud.com, OU=Oracle BMCS US, O=Oracle Corporation, L=Redwood City, ST=California, C=US'
,credential_name => 'CRED_APEXSRV'
,directory_name => 'DBLINK_WALLET_DIR'
);
end;


以上でデータベース・リンクが作成されたので、以下のSELECT文を実行して動作を確認します。Xが返されれば、動作確認完了です。

select * from dual@dblink_apexsrv;

同様の作業をデータベースAPEXSRVで行います。

データベース・アクションで実行するスクリプトは以下になります。

create directory DBLINK_WALLET_DIR as 'wallet';
begin
dbms_cloud.get_object(
object_uri => 'APEXDEVのcwallet.ssoの事前定義済リクエストのURL'
,directory_name => 'DBLINK_WALLET_DIR'
);
end;
/
grant read on directory DBLINK_WALLET_DIR to apexsrv;
grant execute on dbms_cloud_admin to apexsrv;
grant execute on dbms_cloud to apexsrv;
grant create database link to apexsrv;
grant execute on dbms_aq to apexsrv;
grant execute on dbms_aqadm to apexsrv;

APEXのSQLコマンドで実行するスクリプトは以下になります。

begin
dbms_cloud.create_credential(
credential_name => 'CRED_APEXDEV'
, username => 'APEXDEV'
, password => 'APEXDEVのパスワード'
);
dbms_cloud_admin.create_database_link(
db_link_name => 'DBLINK_APEXDEV'
,hostname => 'adb.us-ashburn-1.oraclecloud.com'
,port => 1522
,service_name => 'adbuniqueid_apexdev_low.adb.oraclecloud.com'
,ssl_server_cert_dn => 'CN=adwc.uscom-east-1.oraclecloud.com, OU=Oracle BMCS US, O=Oracle Corporation, L=Redwood City, ST=California, C=US'
,credential_name => 'CRED_APEXDEV'
,directory_name => 'DBLINK_WALLET_DIR'
);
end;

データベースAPEXDEVにはデータベース・リンクDBLINK_APEXSRV、データベースAPEXSRVにはデータベース・リンクDBLINK_APEXDEVが作成されています。


キューの作成と開始



APEXのSQLコマンドで以下のコマンドを実行し、タイプJOB_MESSAGE_Tを作成します。データベースAPEXDEVAPEXSRVの双方で実行します。

create or replace type job_message_t as object(
body varchar2(4000)
);
/


キュー表JOB_Q_TABとキューJOB_IN_QJOB_OUT_Qを作成し、両方のキューを開始します。データベースAPEXDEVAPEXSRVの双方で実行します。

begin
dbms_aqadm.create_queue_table(
queue_table => 'job_q_tab'
,queue_payload_type => 'job_message_t'
,multiple_consumers => TRUE
);
dbms_aqadm.create_queue(
queue_name => 'job_in_q'
,queue_table => 'job_q_tab'
);
dbms_aqadm.create_queue(
queue_name => 'job_out_q'
,queue_table => 'job_q_tab'
);
dbms_aqadm.start_queue(
queue_name => 'job_in_q'
);
dbms_aqadm.start_queue(
queue_name => 'job_out_q'
);
end;


作成したキューは、以下のSELECT文で確認できます。

select * from user_queues;


サブスクライバの作成



データベースAPEXDEVのキューJOB_IN_Qに投入されたメッセージは、データベースAPEXSRVに伝播され、そこでデキューされて処理されます。そのためキューJOB_IN_Qのサブスクライバが、リモート・データベースのAPEXSRVとなるよう定義します。

キューJOB_OUT_QはデータベースAPEXSRVから伝搬されたメッセージが投入されます。データベースAPEXDEVから見るとローカルのキューからメッセージを取り出すことになるため、サブスクライバはAPEXDEVとします。

declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent('APEXSRV', 'APEXSRV.JOB_IN_Q@DBLINK_APEXSRV', NULL);
dbms_aqadm.add_subscriber(
queue_name => 'apexdev.job_in_q'
, subscriber => subscriber
);
subscriber := sys.aq$_agent('APEXDEV', NULL, NULL);
dbms_aqadm.add_subscriber(
queue_name => 'apexdev.job_out_q',
subscriber => subscriber);
END;
/

APEXのSQLコマンドより実行します。


データベースAPEXSRVではキューJOB_OUT_Qに投入したメッセージが、データベースAPEXDEVに伝搬されます。キューJOB_IN_Qはローカル・キューの扱いです。

APEXSRVでのサブスクライバの作成は以下になります。

declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent('APEXSRV', NULL, NULL);
dbms_aqadm.add_subscriber(
queue_name => 'apexsrv.job_in_q'
, subscriber => subscriber
);
subscriber := sys.aq$_agent('APEXDEV', 'APEXDEV.JOB_OUT_Q@DBLINK_APEXDEV', NULL);
dbms_aqadm.add_subscriber(
queue_name => 'apexsrv.job_out_q',
subscriber => subscriber);
END;
/

作成したサブスクライバの確認は、以下のSELECT文を実行して行います。

select * from user_queue_subscribers;


伝播のスケジュール



データベースAPEXDEVからAPEXSRVへ、キューJOB_IN_Qにエンキューされたメッセージを伝搬するジョブをスケジュールします。プロシージャDBMS_AQADM.SCHEDULE_PROPAGATIONを呼び出します。

begin
dbms_aqadm.schedule_propagation(
queue_name => 'APEXDEV.JOB_IN_Q'
, destination => 'DBLINK_APEXSRV'
, duration => NULL
, latency => 0
);
end;

start_timeを指定していないため、伝播は即時に開始されます。durationはNULLなので無期限になり、プロシージャDBMS_AQADM.UNSCHEDULE_PROPAGATIONを呼び出すまで伝搬は継続します。latencyに0を指定することにより、キューJOB_IN_Qへのエンキューを待機し、エンキューされるとすぐに伝搬するようにしています。


スケジュールされた伝搬のジョブの状況は、データベース・アクションにてビューDBA_SCHEDULER_JOBSを検索することで確認できます。

select job_name, enabled, state from dba_scheduler_jobs where program_name = 'AQ$_PROPAGATION_PROGRAM';


同様にデータベースAPEXSRV上で、キューJOB_OUT_Qにエンキューされたメッセージを伝搬するジョブをスケジュールします。

begin
dbms_aqadm.schedule_propagation(
queue_name => 'APEXSRV.JOB_OUT_Q'
, destination => 'DBLINK_APEXDEV'
, duration => NULL
, latency => 0
);
end;



伝播の動作確認



ここで一旦それぞれのキューにメッセージをエンキューして、動作を確認してみます。

データベースAPEXDEVで、キューJOB_IN_Qにメッセージを投入します。メッセージ・プロパティのcorrelationとして1234を設定しています。

declare
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message apexdev.job_message_t;
l_job_message json_object_t;
begin
l_job_message := new json_object_t;
l_job_message.put('sender', 'test');
l_job_message.put('text', 'test');
message := apexdev.job_message_t(l_job_message.to_string());
message_properties.correlation := to_char(1234);
dbms_aq.enqueue(
queue_name => 'apexdev.job_in_q'
,enqueue_options => enqueue_options
,message_properties => message_properties
,payload => message
,msgid => message_handle
);
end;


データベースAPEXSRVにて、キュー表JOB_Q_TABの内容を確認します。

select * from aq$job_q_tab;

CORR_IDが1234のメッセージが、キューJOB_IN_Qに見つかります。


同じSQLをエンキューを行ったデータベースAPEXDEVで実行してみます。キューJOB_IN_Qからメッセージは削除されています。


キューJOB_IN_Qのメッセージが伝搬されていることを確認しました。データベースAPEXSRVでデキューを実行します。

declare
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message apexsrv.job_message_t;
l_job_message json_object_t;
begin
dequeue_options.consumer_name := 'APEXSRV';
dequeue_options.wait := 1;
dbms_aq.dequeue(
queue_name => 'apexsrv.job_in_q',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
l_job_message := json_object_t.parse(message.body);
dbms_output.put_line('correlation = ' || message_properties.correlation);
dbms_output.put_line('sender = ' || l_job_message.get_string('sender'));
dbms_output.put_line('text = ' || l_job_message.get_string('text'));
dbms_output.put_line('response = ' || l_job_message.get_string('response'));
end;


同様にして、キューJOB_OUT_Qのメッセージの伝搬を確認します。エンキューはデータベースAPEXSRVで実行します。

declare
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message apexsrv.job_message_t;
l_job_message json_object_t;
begin
l_job_message := new json_object_t;
l_job_message.put('sender', 'test');
l_job_message.put('text', 'test');
message := apexsrv.job_message_t(l_job_message.to_string());
message_properties.correlation := to_char(1234);
dbms_aq.enqueue(
queue_name => 'apexsrv.job_out_q'
,enqueue_options => enqueue_options
,message_properties => message_properties
,payload => message
,msgid => message_handle
);
end;

デキューはデータベースAPEXDEVで実行します。

declare
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message apexdev.job_message_t;
l_job_message json_object_t;
begin
dequeue_options.consumer_name := 'APEXDEV';
dequeue_options.wait := 1;
dbms_aq.dequeue(
queue_name => 'apexdev.job_out_q',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
l_job_message := json_object_t.parse(message.body);
dbms_output.put_line('correlation = ' || message_properties.correlation);
dbms_output.put_line('sender = ' || l_job_message.get_string('sender'));
dbms_output.put_line('text = ' || l_job_message.get_string('text'));
dbms_output.put_line('response = ' || l_job_message.get_string('response'));
end;

キューJOB_IN_Qと同様の動作を確認できれば、伝搬の設定は完了です。


サーバー側のバックグランド・ジョブの作成



サーバーであるデータベースAPEXSRVにて、キューJOB_IN_Qよりメッセージをデキューして処理を行い、結果をキューJOB_OUT_Qにエンキューするプロシージャを作成します。

プロシージャ名はEXEC_JOBです。受け取ったJSON形式のメッセージより、属性sendertextを取り出し、それを連結して応答となるJSONメッセージに属性responseとして書き込みます。そのメッセージをキューJOB_OUT_Qにエンキューします。

create or replace procedure exec_job
as
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
message_handle RAW(16);
message apexsrv.job_message_t;
l_job_message json_object_t;
l_id number;
--
enqueue_options dbms_aq.enqueue_options_t;
recipients DBMS_AQ.aq$_recipient_list_t;
l_response varchar2(200);
dequeue_timeout exception;
pragma exception_init(dequeue_timeout, -25228);
begin
dequeue_options.consumer_name := 'APEXSRV';
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
dequeue_options.wait := 60;
begin
DBMS_AQ.DEQUEUE(
queue_name => 'apexsrv.job_in_q',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
exception
when dequeue_timeout then
return;
end;
-- 取り出したメッセージを使って処理を行う。
l_job_message := json_object_t.parse(message.body);
l_id := to_number(message_properties.correlation);
l_response := 'remote-'
|| l_job_message.get_string('sender') || '-'
|| l_job_message.get_string('text');
/*
* 処理結果をJOB_OUT_Qに投入する。
*/
message_properties.correlation := to_char(l_id);
enqueue_options.sequence_deviation := dbms_aq.top;
l_job_message.put('response',l_response);
message := apexsrv.job_message_t(l_job_message.to_string());
dbms_aq.enqueue(
queue_name => 'apexsrv.job_out_q'
,enqueue_options => enqueue_options
,message_properties => message_properties
,payload => message
,msgid => message_handle
);
commit;
end;

APEXのSQLコマンドで実行します。


動作を確認します。

データベースAPEXDEVにて、キューJOB_IN_Qにメッセージをエンキューします。スクリプトは以前に実行しているので、履歴から参照できます。


データベースAPEXSRVにてプロシージャEXEC_JOBを実行します。

begin
exec_job;
end;


データベースAPEXDEVにて、キューJOB_OUT_Qよりメッセージをデキューします。デキューするスクリプトも以前に実行済みなので、履歴より見つけて実行できます。


データベースAPEXSRVではプロシージャEXEC_JOBが実行され、処理結果がJOB_OUT_Qにエンキューされています。キューJOB_OUT_QのメッセージはデータベースAPEXDEVのJOB_OUT_Qに伝搬されるため、データベースAPEXDEVでEXEC_JOBがエンキューしたメッセージを、データベースAPEXDEVにてデキューすることができます。

メッセージの属性responseの結果が確認できれば、プロシージャEXEC_JOBの動作確認は完了です。

プロシージャEXEC_JOBをDBMS_SCHEDULERのジョブとして定期的に動作させます。以下のスクリプトを実行します。

begin
dbms_scheduler.create_program(
program_name => 'MEX_SAMPLE_PROC'
, program_type => 'STORED_PROCEDURE'
, program_action => 'APEXSRV.EXEC_JOB'
, number_of_arguments => 0
, enabled => true
, comments => 'Run exec_job.'
);
dbms_scheduler.create_job(
job_name => 'MEX_WORKER_JOB01'
, program_name => 'MEX_SAMPLE_PROC'
, job_style => 'LIGHTWEIGHT'
, start_date => systimestamp
, repeat_interval => 'FREQ=SECONDLY;INTERVAL=10'
, enabled => true
);
end;



スケジュールされたジョブは、以下のSQLによって状況を確認できます。

select * from user_scheduler_jobs;

データベースAPEXSRV上で受け付けたメッセージは、スケジューラーのジョブによって自動的に処理されるようになりました。現在の設定はメッセージの到着まで60秒待機して、メッセージが到着が無ければ一旦プロシージャEXEC_JOBから抜けています。メッセージを受け付けて処理した後にもEXEC_JOBは終了します。その後10秒待機して、プロシージャEXEC_JOBが再実行されます。メッセージを待機する秒数と、プロシージャEXEC_JOBの再実行までの秒数は、メッセージの処理にかかる時間に応じて調整する必要が出てくるでしょう。


APEXアプリケーションの変更



以前の記事に従ってAPEXアプリケーションを作るのも大変なので、APEXアプリケーションは以下をインポートして作成します。

ページ番号のプロセスジョブの投入ソースPL/SQLコードを変更します。


元のコードはAQの通知またはスケジューラーのジョブによって表MEX_MESSAGESの列RESPONSEが更新されていました。今回はリモートのデータベースで処理が行われ、そのデータベース(APEXSRV)には表MEX_MESSAGESは無いので列RESPONSEは更新できません。

そのためキューJOB_OUT_Qに投入されたメッセージに含まれているJSONドキュメントの属性responseを取り出し、表MEX_MESSAGESの列RESPONSEを更新します。

変更したコード全体は以下になります。変更点は最後の方に以下の一行を追加しただけです。

update mex_messages set response = l_response where id = l_id;


declare
l_id number;
/*
* キューJOB_IN_Qに、ジョブの実行リクエストを送信する。
*/
procedure request_job
is
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
-- メッセージ本文はJSONで記述する。
message apexdev.job_message_t;
l_job_request json_object_t;
begin
-- メッセージは送信時にコミットする。
-- そうしないとページ・プロセスがすべて終了するまでエンキューされない。
enqueue_options.visibility := dbms_aq.immediate;
-- 通常はFIFO、dbms_aq.topを指定するとLIFO。
enqueue_options.sequence_deviation := dbms_aq.top;
-- バックグラウンド・ジョブのパラメータとなる値をJSONで記述する。
-- 21cであれば、JSON型を直接扱うことができる。
l_job_request := new json_object_t;
l_job_request.put('sender', :P3_SENDER);
l_job_request.put('text', :P3_TEXT);
message := apexdev.job_message_t(
l_job_request.to_string()
);
-- キューに投入するメッセージを、表の主キーIDに関連づける。
l_id := :P3_ID;
message_properties.correlation := l_id;
-- ジョブの実行要求を送信する。
dbms_aq.enqueue(
queue_name => 'apexdev.job_in_q'
,enqueue_options => enqueue_options
,message_properties => message_properties
,payload => message
,msgid => message_handle
);
end;
/*
* キューJOB_OUT_Qから、ジョブの完了リクエストを受信する。
*/
procedure confirm_job_completion
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message apexdev.job_message_t;
l_job_response json_object_t;
l_response varchar2(200);
-- デキューのタイムアウトの例外を定義する。
dequeue_timeout exception;
pragma exception_init(dequeue_timeout, -25228);
begin
-- ジョブの完了を最長5秒待つ。
dequeue_options.navigation := dbms_aq.first_message;
dequeue_options.consumer_name := 'APEXDEV';
dequeue_options.correlation := l_id;
dequeue_options.wait := 5;
l_response := '';
begin
:AI_SUCCESS_MESSAGE := 'JOB_SUCCESS_MESSAGE';
dbms_aq.dequeue(
queue_name => 'apexdev.job_out_q',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
l_job_response := json_object_t.parse(message.body);
l_response := l_job_response.get_string('response');
exception
when dequeue_timeout then
:AI_SUCCESS_MESSAGE := 'JOB_TIMEOUT_MESSAGE';
end;
:P3_RESPONSE := l_response;
/*
* バックグランド処理で表MEX_MESSAGESはアップデートされないので、
* メッセージを受け取ったプロセスが列RESPONSEをアップデートする。
*/
update mex_messages set response = l_response where id = l_id;
end;
begin
request_job;
confirm_job_completion;
end;

列RESPONSEの更新処理は先行するプロセスプロセス・フォームMex Messagesと同じトランザクションで行われるため、中間のコミット操作が不要になりました。そのため、削除します。


APEXアプリケーションの変更は以上で完了です。


APEXDEVでのメッセージの取りこぼしの対応



APEXアプリケーションがメッセージの処理結果を待つのは5秒までになっています。それを超えると結果を待たずにユーザーに応答を返します。サーバーAPEXSRVで処理が終了すると、メッセージはJOB_OUT_Qに投入されますが、それはキューに残ります。

キューJOB_OUT_Qに残っているメッセージを一括で処理するプロシージャとしてUPDATE_RESPONSEを作成します。JOB_OUT_Qに残っているメッセージを取り出し、表MEX_MESSAGESの列RESPONSEを更新します。JOB_OUT_Qのメッセージが無くなると、プロシージャを終了します。

create or replace procedure update_response
as
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message apexdev.job_message_t;
l_job_message json_object_t;
l_response varchar2(200);
l_id number;
dequeue_timeout exception;
pragma exception_init(dequeue_timeout, -25228);
begin
dequeue_options.consumer_name := 'APEXDEV';
dequeue_options.wait := dbms_aq.NO_WAIT;
while true
loop
begin
dbms_aq.dequeue(
queue_name => 'apexdev.job_out_q',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
exception
when dequeue_timeout then
commit;
return;
end;
l_job_message := json_object_t.parse(message.body);
l_id := to_number(message_properties.correlation);
l_response := l_job_message.get_string('response');
update mex_messages set response = l_response where id = l_id;
end loop;
end;

SQLワークショップSQLコマンドで実行します。


プロシージャUPDATE_RESPONSEを、定期的にジョブとして実行するようにします。

begin
dbms_scheduler.create_program(
program_name => 'MEX_UPDATE_RESPONSE'
, program_type => 'STORED_PROCEDURE'
, program_action => 'APEXDEV.UPDATE_RESPONSE'
, number_of_arguments => 0
, enabled => true
, comments => 'Process JOB_OUT_Q'
);
dbms_scheduler.create_job(
job_name => 'MEX_BATCH_PROCESS_OUTQ'
, program_name => 'MEX_UPDATE_RESPONSE'
, job_style => 'LIGHTWEIGHT'
, start_date => systimestamp
, repeat_interval => 'FREQ=SECONDLY;INTERVAL=60'
, enabled => true
);
end;
/

SQLコマンドより実行します。


以上で、対話的なAPEXアプリケーションが取りこぼした応答メッセージを、定期的にバッチで処理するようになりました。

以上で本記事は終了です。

Oracle  APEXのアプリケーション作成の参考になれば幸いです。