2022年2月16日水曜日

AQを使ってバックグランドでジョブを実行する

 Oracle APEXで作成したアプリケーションは、対話的に操作されます。そのため、処理が長時間に及ぶ場合は、DBMS_SCHEDULERを使ったジョブとしてバックグランド実行するといった対策が必要な場合があります(こちらの記事で、DBMS_SCHEDULERのジョブを使っています)。

処理をバックグラウンドで実行するために、Oracle Database Advanced Queueingを使ってみます。作成するアプリケーションが行う操作はとても単純で、送信した文字列を連結するだけです。開始したジョブの完了を5秒間だけ待ち、それまでにジョブが完了しない場合はユーザーに応答を返します。ただし、要求したジョブはバックグラウンドで継続して処理されます。ジョブが完了していれば、画面をリフレッシュすると結果が表示されます。


アプリケーションの開発は、Always FreeのAutonomous Transaction Processingのインスタンスを使って行います。


表とアプリケーションの作成


受信した文字列と処理結果となる連結した文字列を保存する表MEX_MESSAGESを作成します。バックグラウンドで実行する処理は、列senderと列textの情報を受け取り、連結した結果を列responseに更新します。

SQLワークショップユーティリティよりクイックSQLを開きます。クイックSQLのモデルとして以下を記述します。

# prefix: mex
# semantics: default
messages
    sender vc80
    text vc80
    response vc80

SQLの生成SQLスクリプトを保存と続けて、レビューおよび実行を行います。

実行をクリックします(この時点ではまだ表が作成されていないため、アプリケーションの作成はできません)。

確認画面が表示されるので、即時実行をクリックします。


表MEX_MESSAGESが作成されます。ここで、アプリケーションの作成をクリックします。


開いたダイアログには、表MEX_MESSAGESのレポートとフォームが作成されると記載されています。今回の作業では、これらのデフォルトで作成されるレポートとフォームのページに変更を加えます。

アプリケーションの作成をクリックします。


アプリケーション作成ウィザードが起動します。名前AQを使ったジョブ実行とし、それ以外はデフォルトから変更せず、アプリケーションの作成をクリックします。


アプリケーションが作成されます。



キューの作成と開始



Oracle Database Advanced Queueingのキューの作成し、キューイングを開始します。

最初にワークスペース・スキーマに、Advanced Queueingの操作を行うパッケージの実行権限を与えます。データベース・アクションにAutonomous Transaction Processingの管理者ユーザーであるADMINで接続し、SQLの画面から以下のコマンドを実行します。

grant execute on dbms_aqadm to ワークスペース・スキーマ名;
grant execute on dbms_aq to ワークスペース・スキーマ名;

今回の作業では、ワークスペース・スキーマとしてAPEXDEVを作成して使用しています。そのため、以下のSQLを実行しています。

grant execute on dbms_aqadm to apexdev;
grant execute on dbms_aq to apexdev;


APEXの開発画面に戻り、SQLワークショップSQLスクリプトを呼び出して、以下のスクリプトを実行します。

/*
* タイプの作成
* 単純にJSONをキューに投入する。
* 21cからJSON型が使用できる。
*/
create or replace type job_message_t as object(
body varchar2(4000)
);
/
/*
* キュー表 JOB_Q_TAB、キューJOB_IN_Q、JOB_OUT_Qの作成と開始。
*/
begin
-- 通知を使わない場合は、multiple_consumersはFALSEとする。
dbms_aqadm.create_queue_table(
queue_table => 'job_q_tab'
,queue_payload_type => 'job_message_t'
,multiple_consumers => TRUE
);
dbms_aqadm.create_queue(
queue_name => 'job_in_q'
,queue_table => 'job_q_tab'
);
dbms_aqadm.create_queue(
queue_name => 'job_out_q'
,queue_table => 'job_q_tab'
);
dbms_aqadm.start_queue(
queue_name => 'job_in_q'
);
dbms_aqadm.start_queue(
queue_name => 'job_out_q'
);
end;
/
/*
* サブスクライバの登録
* APEXDEVとなっているところは、ワークスペース・スキーマに置き換える。
* 通知を使わない場合はシングル・コンシューマ・キューなので実行は不要です。
*/
declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent('APEXDEV', NULL, NULL);
DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'apexdev.job_in_q',
subscriber => subscriber);
subscriber := sys.aq$_agent('APEXDEV', NULL, NULL);
DBMS_AQADM.ADD_SUBSCRIBER(
queue_name => 'apexdev.job_out_q',
subscriber => subscriber);
END;
/

最初にキューに保存するメッセージのタイプとしてjob_message_tを定義します。メッセージの書式としてJSONを想定しているため、文字型の属性bodyのみを定義に含んでいます。Oracle Database 21cではネィティブのJSON型が追加されたため、VARCHAR2の代わりにJSONを型指定に使えます。

DBMS_ADADMパッケージに含まれるプロシージャCREATE_QUEUE_TABLEを呼び出し、キュー表JOB_Q_TABを作成します。AQの通知を使うため、キュー表の作成時にmultiple_consumersTRUEとします。キュー表を作成した後CREATE_QUEUEを呼び出し、作成したキュー表JOB_Q_TABにキューJOB_IN_QおよびJOB_OUT_Qを作成します。

キューを使った処理は以下の順番で行います。
  1. APEXアプリケーションはジョブの実行リクエストとして、メッセージをJOB_IN_Qにエンキューします。
  2. JOB_IN_Qへメッセージがエンキューされると、登録しているPL/SQLプロシージャに通知されます。
  3. 通知として呼び出されたPL/SQLプロシージャ内で、JOB_IN_Qからメッセージをデキューし処理します。
  4. 処理が完了したら、JOB_OUT_Qにメッセージをエンキューします。
  5. APEXアプリケーションは、JOB_OUT_Qへのメッセージを待機しています。
  6. 待機中(今回は5秒)にメッセージがエンキューされれば、それをデキューしてAPEXアプリケーションのフォアグラウンドの処理を継続します。
  7. 待機中にメッセージが受け取れなければ、待機を打ち切ってフォアグラウンドの処理を継続します。
  8. バックグラウンドでは処理が継続しています。画面をリフレッシュして処理結果を確認するよう案内する、処理結果が必要ない作業を行うことを薦める、または、休憩を取ることを薦める、といった案内を画面に表示すると良いかもしれません。
キューを作成した後、START_QUEUEを呼び出し、メッセージのエンキュー/デキューを実行できるようにします。

プロシージャADD_SUBSCRIBERを呼び出して、作成したキューにサブスクライバを登録します。キューJOB_IN_Q、JOB_OUT_Qともにワークスペース・スキーマ名をサブスクライバにしています。キュー自体はマルチ・コンシューマーとして作成していますが、これはAQの通知の機能を使うためで、サブスクライバは1つ登録されていれば十分です。今回の例ではワークスペース・スキーマ名をサブスクライバの名前にしていますが、必ずしもデータベース・スキーマである必要はありません。

SQLワークショップSQLコマンドより、それぞれのコマンドを実行するか、SQLスクリプトとして作成し1度で実行します。


スクリプトの実行結果を確認します。

キュー表の確認には、ビューUSER_QUEUE_TABLESを使います。

select * from user_queue_tables;


キューの確認には、ビューUSER_QUEUESを使います。

select * from user_queues;


サブスクライバの確認には、ビューUSER_QUEUE_SUBSCRIBERSを使います。

select * from user_queue_subscribers;


以上で、Oracle Database Advanced Queueingのキューの作成と開始が完了しました。


AQの通知の設定



キューJOB_IN_Qにメッセージがエンキューされたときに通知を行う設定と、その通知を受け取るPL/SQLプロシージャの作成を行います。

最初に通知を受け取るPL/SQLプロシージャEXEC_JOBを作成します。プロシージャの記述は以下になります。

create or replace procedure exec_job(
context raw,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload raw, payloadl number)
as
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
message_handle RAW(16);
message apexdev.job_message_t;
l_job_request json_object_t;
l_job_response json_object_t;
l_id number;
--
enqueue_options dbms_aq.enqueue_options_t;
recipients DBMS_AQ.aq$_recipient_list_t;
l_sender varchar2(80);
l_text varchar2(80);
l_response varchar2(200);
begin
if descr is not null then
-- 引数descrに値があれば、通知として呼び出されている。
dequeue_options.msgid := descr.msg_id;
dequeue_options.consumer_name := descr.consumer_name;
end if;
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
/*
* 通知として呼び出される場合は、待機は発生しないはず。
* 通知ではなくスケジューラー・ジョブから呼び出されるときは、
* wait + ジョブの実行間隔がジョブの繰り返し間隔になる。
*/
dequeue_options.wait := 60;
DBMS_AQ.DEQUEUE(
queue_name => 'apexdev.job_in_q',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
-- 取り出したメッセージを使って処理を行う。
l_job_request := json_object_t.parse(message.body);
l_id := message_properties.correlation;
l_sender := l_job_request.get_string('sender');
l_response := l_sender || '-' || l_job_request.get_string('text');
message_properties.correlation := l_id;
update mex_messages set response = l_response where id = l_id;
-- シミュレートとしてタイムアウトを発生させるため、1-10秒の間で処理を待機させる。
dbms_session.sleep(mod(l_id,10));
-- 処理終了のメッセージをキューに投入する。
enqueue_options.sequence_deviation := dbms_aq.top;
-- メッセージは設定するが、通知としての使用のみで内容は参照しない。
l_job_response := new json_object_t;
l_job_response.put('response',l_response);
message := apexdev.job_message_t(l_job_response.to_string());
dbms_aq.enqueue(
queue_name => 'apexdev.job_out_q'
,enqueue_options => enqueue_options
,message_properties => message_properties
,payload => message
,msgid => message_handle
);
commit;
end;
view raw exec_job.sql hosted with ❤ by GitHub

先ほどのキューの作成と同様に、SQLワークショップSQLコマンドに貼り付けて実行するか、SQLスクリプトとして実行します。


通知の対象となったメッセージのIDや宛先が引数descrに渡されています。descr.msg_idおよびdescr.consumer_nameを指定して、通知されているメッセージを特定しデキューします。

APEXアプリケーション側ではメッセージをエンキューする際に、correlationに表MEX_MESSAGESの主キーであるIDの値を設定しています。通知として呼び出されているプロシージャでは、メッセージの属性senderと属性textの情報を取り出し連結します。連結した結果を表MEX_MESSGAESの列responseに、correlationの値を主キーとし対象行を特定して更新します。

これだけだと一瞬で処理が完了します。ジョブの完了を待機しているAPEXアプリケーションでタイムアウトを発生させるため、0から9秒の間で処理をsleep(dbms_session.sleepの呼び出し)させています。APEXアプリケーション側では5秒間待機するので、結果として半分の処理はタイムアウトします。

処理が完了したら、キューJOB_OUT_Qにメッセージをエンキューします。このときエンキューするメッセージのcorrelationとしてdescr.msg_idつまり表MEX_MESSAGESの主キーIDを渡しています。APEXアプリケーション側では、correlationが同一のメッセージをリクエストしたジョブの完了通知として認識します。

作成したPL/SQLプロシージャexec_jobを、キューJOB_IN_Qにメッセージがエンキューされたときに呼び出す設定をします。

データベース・アクションのSQLの画面より、以下のスクリプトを実行します。DBMS_AQ.REGISTERを呼び出します。

/*
* AQの通知コールバックとなるPL/SQLプロシージャを登録する。
*
* 管理者アカウント(ADBの場合ADMIN)で実行する。
* 確認は
* select * from DBA_SUBSCR_REGISTRATIONS;
*/
declare
reginfo1 sys.aq$_reg_info;
reginfolist sys.aq$_reg_info_list;
begin
-- AQの通知として呼び出されるプロシージャexec_jobの定義。
reginfo1 := sys.aq$_reg_info('APEXDEV.JOB_IN_Q:APEXDEV',
DBMS_AQ.NAMESPACE_AQ, 'plsql://apexdev.exec_job',
HEXTORAW('FF'));
-- 通知として登録する。
reginfolist := sys.aq$_reg_info_list(reginfo1);
sys.dbms_aq.register(reginfolist, 1);
-- 登録を解除する場合はregisterをunregisterに置き換える。
-- sys.dbms_aq.unregister(reginfolist, 1);
end;
/

sys.aq$_reg_infoとして、APEXDEV.JOB_IN_Q:APEXDEVおよびplsql://apexdev.exec_jobを指定しています。これはスキーマAPEXDEVに作成されているキューJOB_IN_Qに宛先APEXDEVのメッセージがエンキューされたら(APEXDEV.JOB_IN_Q:APEXDEV)、スキーマAPEXDEVに作成されているプロシージャexec_jobを呼び出す(plsql://apexdev.exec_job)、という定義になります。


登録された通知は、ビューDBA_SUBSCR_REGISTRATIONSより確認できます。

select * from dba_subscr_registrations;


以上でAQの通知の設定は完了しました。


ジョブの投入



APEXアプリケーションにて、表MEX_MESSAGESの更新時にメッセージをキューJOB_IN_Qに投入するプロセスを作成します。

ページ・デザイナにて表MEX_MESSAGESのデータを操作するフォームのページ(ページ番号)を開きます。左ペインでプロセス・ビューを開きます。


プロセス上でコンテキスト・メニューを開き、プロセスの作成を実行します。


作成されたプロセスを、プロセス・フォームMex Messageダイアログを閉じるの間の配置します。

識別名前ジョブの投入とします。タイプとしてコードの実行を選択します。ソースPL/SQLコードとして、以下を記述します。

declare
l_id number;
/*
* キューJOB_IN_Qに、ジョブの実行リクエストを送信する。
*/
procedure request_job
is
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
-- メッセージ本文はJSONで記述する。
message apexdev.job_message_t;
l_job_request json_object_t;
begin
-- メッセージは送信時にコミットする。
-- そうしないとページ・プロセスがすべて終了するまでエンキューされない。
enqueue_options.visibility := dbms_aq.immediate;
-- 通常はFIFO、dbms_aq.topを指定するとLIFO。
enqueue_options.sequence_deviation := dbms_aq.top;
-- バックグラウンド・ジョブのパラメータとなる値をJSONで記述する。
-- 21cであれば、JSON型を直接扱うことができる。
l_job_request := new json_object_t;
l_job_request.put('sender', :P3_SENDER);
l_job_request.put('text', :P3_TEXT);
message := apexdev.job_message_t(
l_job_request.to_string()
);
-- キューに投入するメッセージを、表の主キーIDに関連づける。
l_id := :P3_ID;
message_properties.correlation := l_id;
-- ジョブの実行要求を送信する。
dbms_aq.enqueue(
queue_name => 'apexdev.job_in_q'
,enqueue_options => enqueue_options
,message_properties => message_properties
,payload => message
,msgid => message_handle
);
end;
/*
* キューJOB_OUT_Qから、ジョブの完了リクエストを受信する。
*/
procedure confirm_job_completion
is
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message apexdev.job_message_t;
l_job_response json_object_t;
l_response varchar2(200);
-- デキューのタイムアウトの例外を定義する。
dequeue_timeout exception;
pragma exception_init(dequeue_timeout, -25228);
begin
-- ジョブの完了を最長5秒待つ。
dequeue_options.navigation := dbms_aq.first_message;
/*
* 通知を使わずキューがシングル・コンシューマの場合は
* consumer_nameは指定しない。
*/
dequeue_options.consumer_name := 'APEXDEV';
dequeue_options.correlation := l_id;
/*
* ジョブに時間がかかることが分かっている場合は、画面からは
* ジョブを投入するだけで、完了を待機する必要はない。
* つまりJOB_OUT_Qを使った処理自体が不要になる。
*
* dbms_aq.NO_WAITを指定すると、エンキューはされていない
* ため、dequeue_timeoutが発生する。
*/
dequeue_options.wait := 5; -- 5秒だけ待つ。
l_response := '';
begin
:AI_SUCCESS_MESSAGE := 'JOB_SUCCESS_MESSAGE';
dbms_aq.dequeue(
queue_name => 'apexdev.job_out_q',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
l_job_response := json_object_t.parse(message.body);
l_response := l_job_response.get_string('response');
exception
when dequeue_timeout then
:AI_SUCCESS_MESSAGE := 'JOB_TIMEOUT_MESSAGE';
end;
:P3_RESPONSE := l_response;
end;
begin
request_job;
confirm_job_completion;
end;

コードが実行されるのは、作成変更の適用のボタンをクリックしたときです。サーバー側の条件として、タイプリクエストは値に含まれるを選択し、としてCREATE,SAVEを指定します。


記述しているコードでは、ページ・アイテムP3_SENDERP3_TEXTをJSONオブジェクトの属性としてメッセージを作成し、キューJOB_IN_QにプロシージャDBMS_AQ.ENQUEUEを呼び出してエンキューしています。その際にメッセージのcorrelationとして、P3_IDつまり主キーの値を設定しています。エンキューした時点でバックグラウンドでの処理が開始されるように、enqueue_options.visibilityにはimmediateを設定しています。この設定によりエンキュー処理は自律トランザクションとしてコミットされるので、ページ・プロセス全体の完了を待たずに、登録済みのPL/SQLプロシージャに通知されます。

メッセージをエンキューした後、ジョブの完了を確認するためdequeue_options.correlationに主キーの値を指定して、プロシージャDBMS_AQ.DEQUEUEを呼び出します。5秒だけ待機し、その間にジョブの完了となるメッセージを受け取れば、ジョブが完了したとユーザーに通知し、そうでない場合は処理がタイムアウトしたと通知します。

メッセージの内容をから属性reponseを取り出し、ページ・アイテムP3_RESPONSEに設定していますが、今回の処理ではこれらの情報は使用していません。不要なコードですが、デキューしたメッセージのペイロードを操作するコーディングを行う場合も考えて、サンプルとして残しています。

もう一つ同様にプロセスを作成します。作成したプロセスをプロセス・フォームMex Messageジョブの投入の間に配置します。

識別名前commitタイプとしてコードを実行を選択します。ソースPL/SQLコードとして一行

commit;

と記述します。サーバー側の条件は、タイプリクエストは値に含まれるCREATE,SAVEとします。


プロセス・フォームMex Messageの処理がコミットされるのは、登録されているページ・プロセス全体が完了したとき(ダイアログを閉じるの終了時)です。そのままだと、AQの通知を受け取ったPL/SQLプロシージャからは、表MEX_MESSAGESへの行の挿入や更新は未コミットであるため参照できません。そのため、プロセス・フォームMex Messageジョブの投入の間で一旦ページ処理のトランザクションをコミットします。

以上でジョブの投入を行う処理が作成されました。


表示メッセージの調整



ダイアログが閉じられた時に、ジョブの完了を受け取っている場合は画面上にメッセージ「ジョブは完了しました。」、タイムアウトが発生した場合は「ジョブはタイムアウトしました。」と表示します。

表示するメッセージはプロセスジョブの投入内で決定します。決定したメッセージを保持するアプリケーション・アイテムAI_SUCCESS_MESSAGEを作成します。

共有コンポーネントアプリケーション・アイテムを開きます。


作成をクリックします。


名前AI_SUCCESS_MESSAGEとし、アプリケーション・アイテムの作成をクリックします。ブラウザから値を設定することはないため、セキュリティセッション・ステート保護は一番厳しい制限付き - ブラウザから設定不可にしておきます。


アプリケーション・アイテムAI_SUCCESS_MESSAGEが作成されました。


プロセスジョブの投入で、アプリケーション・アイテムAI_SUCCESS_MESSAGEに設定している文字列は表示するメッセージそのものではなく、キー値となるJOB_SUCCESS_MESSAGEまたはJOB_TIMEOUT_MESSAGEです。

これらのキー値に対応する、実際に表示されるメッセージを作成します。

共有コンポーネントテキスト・メッセージを開きます。


テキスト・メッセージの作成をクリックします。


名前JOB_SUCCESS_MESSAGEとします。言語日本語(ja)JavaScriptで使用ONにします。テキストとしてジョブは成功しました。を記述します。

もうひとつテキスト・メッセージを作成するので、作成後さらに作成をクリックします。


続いて、名前JOB_TIMEOUT_MESSAGEテキストジョブはタイムアウトしました。として、テキスト・メッセージの作成をクリックします。言語日本語(ja)JavaScriptでの使用ONです。


テキスト・メッセージJOB_SUCCESS_MESSAGEJOB_TIMEOUT_MESSAGEが作成されました。


ダイアログがクローズしたときに、呼び出し元のレポートのページにプロセスジョブの投入で選択したメッセージを返すようにします。選択結果はアプリケーション・アイテムAI_SUCCESS_MESSAGEに設定されています。

ページ・デザイナにてページ番号を開きます。左ペインにてプロセス・ビューを開き、プロセスダイアログを閉じるを選択します。

設定戻すアイテムとして、アプリケーション・アイテムAI_SUCCESS_MESSAGEを設定します。


変更を保存します。

ページ・デザイナにて対話モード・レポートのページ(ページ番号)を開き、ダイアログが閉じた時に戻された値を元に、メッセージを表示させます。

ページ・アイテムP2_MESSAGEを作成し、ダイアログが閉じられたときに戻される値を保持します。

リージョンMex Messages上でコンテキスト・メニューを開き、ページ・アイテムの作成を実行します。

作成したページ・アイテムの識別名前P2_MESSAGEとします。タイプには非表示を選択します。


左ペインで動的アクション・ビューを開き、ダイアログのクローズのタイミングで実行される動的アクションレポートの編集 - ダイアログのクローズのTRUEアクションを作成します。

Trueの上でコンテキスト・メニューを開き、TRUEアクションの作成を実行します。


作成されたアクションはリフレッシュの下に配置します。

識別アクションとして値の設定を選択します。設定タイプの設定Dialog Return Itemを選択し、戻りアイテムとしてAI_SUCCESS_MESSAGEを選択します。影響を受ける要素選択タイプアイテムアイテムP2_MESSAGEを選択します。

アプリケーション・アイテムAI_SUCCESS_MESSAGEの値はセッション・ステートとしてデータベースに保存されています。ダイアログがクローズする際にデータベースから値が読み出され、影響を受ける要素P2_MESSAGEに設定されます。このP2_MESSAGEはブラウザ上で保持されている値になります。続くアクションとしてJavaScriptのコードを記述しますが、そのコード内からはデータベースにアクセスすることなくP2_MESSAGEの値(つまりAI_SUCCESS_MESSAGEの値)を参照できます。


同様にして再度、TRUEアクションを作成します。作成したアクションはアクション値の設定の下に配置します。

識別アクションJavaScriptコードの実行とします。設定コードには以下を記述します。

let msgkey = $v("P2_MESSAGE");
apex.message.clearErrors();
if ( msgkey ) {
let message = apex.lang.getMessage(msgkey);
if ( msgkey == "JOB_SUCCESS_MESSAGE" ) {
  apex.message.showPageSuccess( message );
}
if ( msgkey == "JOB_TIMEOUT_MESSAGE") {
apex.message.showErrors([
{
type: "error",
location: [ "page", "inline" ],
message: message,
unsafe: false
}])
}
}

ページ・アイテムP2_MESSAGEよりメッセージのキー値を読み取ります。apex.lang.getMessageを呼び出してキーに対応したテキスト・メッセージを取り出し、完了であればapex.message.showPageSuccess、そうでなければapex.message.showErrorsを呼び出して、インラインでメッセージを表示します。


以上でAPEXアプリケーションが完成しました。

アプリケーションを実行すると、先頭のGIF動画の動作を確認することができます。

今回作成したAPEXアプリケーションのエクスポートを以下に置きました。
https://github.com/ujnak/apexapps/blob/master/exports/jobqueuesample.sql

表MEX_MESSAGESのDDLはサポートするオブジェクトとして含んでいます。キュー表やキューの定義はDBMS_AQADMの権限が与えられていないとエラーになることもあり、それらはサポートするオブジェクトには含まれていません。

今回の実装では、APEXアプリケーションが待機している間にジョブが完了しないと、ジョブの完了を通知するメッセージがキューJOB_OUT_Qに、デキューされることなく残ります。

メッセージが残っていることで問題は発生しませんが、デキューされることがないメッセージをパージするスクリプトを書きました。DBMS_AQADM.PURGE_QUEUE_TABLEを呼び出しています。

declare
l_queue_table varchar2(80);
l_opt dbms_aqadm.aq$_purge_options_t;
l_cond varchar2(4000);
l_ids varchar2(4000);
begin
l_queue_table := 'apexdev.job_q_tab';
l_opt.block := false;
l_cond := q'~QUEUE = 'JOB_OUT_Q' and CORR_ID in (%PURGE_IDS%)~';
/*
* レスポンスは更新済みで、JOB_OUT_Qに残っているメッセージはタイムアウトしたもの。
* すべて削除できる。
*/
select listagg(chr(39) || corr_id || chr(39), ',') into l_ids
from aq$job_q_tab
where queue = 'JOB_OUT_Q'
and to_number(corr_id) in
(
select id
from mex_messages where response is not null
);
if l_ids is not null then
l_cond := replace(l_cond,'%PURGE_IDS%',l_ids);
-- dbms_output.put_line(l_cond);
DBMS_AQADM.PURGE_QUEUE_TABLE(
queue_table => l_queue_table
,purge_condition => l_cond
,purge_options => l_opt
);
end if;
end;

また、作成したキューやキュー表を削除するスクリプトは以下になります。

/*
* サブスクライバの削除
* キューを削除すると無くなるので必ずしも実行は必要ない。
*/
declare
subscriber sys.aq$_agent;
begin
subscriber := sys.aq$_agent('APEXDEV', NULL, NULL);
DBMS_AQADM.remove_SUBSCRIBER(
queue_name => 'apexdev.job_in_q',
subscriber => subscriber);
subscriber := sys.aq$_agent('APEXDEV', NULL, NULL);
DBMS_AQADM.remove_SUBSCRIBER(
queue_name => 'apexdev.job_out_q',
subscriber => subscriber);
end;
/
/*
* キューやキュー表の削除
*/
begin
dbms_aqadm.stop_queue(
queue_name => 'job_in_q'
);
dbms_aqadm.stop_queue(
queue_name => 'job_out_q'
);
dbms_aqadm.drop_queue(
queue_name => 'job_in_q'
);
dbms_aqadm.drop_queue(
queue_name => 'job_out_q'
);
dbms_aqadm.drop_queue_table(
queue_table => 'job_q_tab'
);
end;
/

今回はバックグランドの処理をAQの通知を使って実現しています。AQの通知は便利ですが、処理の並行度を制御することができないようです。ジョブの実行は同時4つまで、といった制限を掛けたい場合は、並行度に応じた回数だけDBMS_SCHEDULER.CREATE_JOBを呼び出し、その中でメッセージのデキューから始まる処理をするといった実装がより適切でしょう。

以上になります。

Oracle APEXのアプリケーション作成の参考になれば幸いです。