Parallel processing in ABAP runs tasks concurrently across several work processes. Asynchronous RFC calls (aRFC) with STARTING NEW TASK can speed up data-intensive operations considerably, provided the work can be split into independent units.
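Basic concept

| Statement | Description |
|---|---|
| STARTING NEW TASK | Starts the function module as an asynchronous RFC |
| CALLING ... ON END OF TASK | Registers a callback method that runs when the task ends (PERFORMING ... ON END OF TASK for subroutines) |
| WAIT UNTIL | Suspends the caller until a condition is met, so that callbacks can run |
| RECEIVE RESULTS | Fetches the return values inside the callback |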
Syntax

    " Asynchronous call
    CALL FUNCTION 'FUNC_NAME'
      STARTING NEW TASK task_name
      DESTINATION destination
      CALLING callback_method ON END OF TASK
      EXPORTING
        param = value
      TABLES
        itab  = lt_data.

    " Wait for the results
    WAIT UNTIL lv_counter >= lv_expected.

    " Fetch the results (inside the callback)
    RECEIVE RESULTS FROM FUNCTION 'FUNC_NAME'
      IMPORTING
        result = lv_result
      TABLES
        itab   = lt_result.

Examples
1. Simple asynchronous call

    REPORT zparallel_simple.

    DATA: gv_result TYPE string,
          gv_done   TYPE abap_bool.

    START-OF-SELECTION.
      " Start the asynchronous task; a FORM callback is registered with PERFORMING
      CALL FUNCTION 'Z_LONG_RUNNING_TASK'
        STARTING NEW TASK 'TASK1'
        DESTINATION 'NONE'
        PERFORMING on_task_complete ON END OF TASK
        EXPORTING
          iv_input = 'Test'.

      " Wait for the result
      WAIT UNTIL gv_done = abap_true UP TO 60 SECONDS.

      IF gv_done = abap_true.
        WRITE: / 'Result:', gv_result.
      ELSE.
        WRITE: / 'Timeout!'.
      ENDIF.

    FORM on_task_complete USING p_task TYPE clike.
      RECEIVE RESULTS FROM FUNCTION 'Z_LONG_RUNNING_TASK'
        IMPORTING
          ev_result = gv_result.
      gv_done = abap_true.
    ENDFORM.

2. Parallel processing with a class

    CLASS zcl_parallel_processor DEFINITION.
      PUBLIC SECTION.
        TYPES: BEGIN OF ty_task,
                 id     TYPE i,
                 status TYPE string,
                 result TYPE string,
               END OF ty_task,
               ty_tasks TYPE STANDARD TABLE OF ty_task WITH KEY id.

        METHODS run_parallel
          IMPORTING it_items          TYPE string_table
          RETURNING VALUE(rt_results) TYPE ty_tasks.

        " Callback registered with CALLING ... ON END OF TASK
        METHODS on_task_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mt_tasks     TYPE ty_tasks,
              mv_completed TYPE i,
              mv_total     TYPE i.
    ENDCLASS.

    CLASS zcl_parallel_processor IMPLEMENTATION.
      METHOD run_parallel.
        mv_total = lines( it_items ).
        mv_completed = 0.
        CLEAR mt_tasks.

        DATA(lv_task_id) = 0.

        LOOP AT it_items INTO DATA(lv_item).
          lv_task_id = lv_task_id + 1.
          DATA(lv_task_name) = |TASK{ lv_task_id }|.

          APPEND VALUE #( id = lv_task_id status = 'RUNNING' ) TO mt_tasks.

          " Asynchronous call
          CALL FUNCTION 'Z_PROCESS_ITEM'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_task_complete ON END OF TASK
            EXPORTING
              iv_item    = lv_item
              iv_task_id = lv_task_id.
        ENDLOOP.

        " Wait until all tasks have finished
        WAIT UNTIL mv_completed >= mv_total UP TO 300 SECONDS.

        rt_results = mt_tasks.
      ENDMETHOD.

      METHOD on_task_complete.
        DATA: lv_result  TYPE string,
              lv_task_id TYPE i.

        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
          IMPORTING
            ev_result  = lv_result
            ev_task_id = lv_task_id.

        " Store the result
        MODIFY mt_tasks FROM VALUE #( id = lv_task_id status = 'COMPLETED' result = lv_result )
          TRANSPORTING status result
          WHERE id = lv_task_id.

        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

3. Batch processing with chunks

    CLASS zcl_batch_parallel DEFINITION.
      PUBLIC SECTION.
        CONSTANTS c_max_tasks TYPE i VALUE 10.

        " ty_data_tab and ty_result_tab are assumed to be defined elsewhere
        METHODS process_in_batches
          IMPORTING it_data           TYPE ty_data_tab
          RETURNING VALUE(rt_results) TYPE ty_result_tab.

        METHODS on_batch_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_active_tasks TYPE i,   " tasks started but not yet finished
              mv_completed    TYPE i,
              mt_results      TYPE ty_result_tab.
    ENDCLASS.

    CLASS zcl_batch_parallel IMPLEMENTATION.
      METHOD process_in_batches.
        DATA: lt_chunk      TYPE ty_data_tab,
              lv_chunk_size TYPE i,
              lv_task_no    TYPE i.

        " Chunk size: aim for at most c_max_tasks chunks
        lv_chunk_size = COND #( WHEN lines( it_data ) <= c_max_tasks THEN 1
                                ELSE lines( it_data ) DIV c_max_tasks + 1 ).

        mv_active_tasks = 0.
        mv_completed = 0.
        CLEAR mt_results.

        " Split the data into chunks and process them in parallel
        DATA(lv_start) = 1.
        WHILE lv_start <= lines( it_data ).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |BATCH{ lv_task_no }|.

          " Build the chunk
          CLEAR lt_chunk.
          LOOP AT it_data INTO DATA(ls_data) FROM lv_start.
            APPEND ls_data TO lt_chunk.
            IF lines( lt_chunk ) >= lv_chunk_size.
              EXIT.
            ENDIF.
          ENDLOOP.

          " Start the task
          CALL FUNCTION 'Z_PROCESS_BATCH'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_batch_complete ON END OF TASK
            EXPORTING
              iv_batch_id = lv_task_no
            TABLES
              it_data     = lt_chunk.

          mv_active_tasks = mv_active_tasks + 1.
          lv_start = lv_start + lv_chunk_size.

          " Limit the number of tasks running at the same time
          IF mv_active_tasks >= c_max_tasks.
            WAIT UNTIL mv_active_tasks < c_max_tasks UP TO 60 SECONDS.
          ENDIF.
        ENDWHILE.

        " Wait for all remaining tasks
        WAIT UNTIL mv_active_tasks = 0 UP TO 300 SECONDS.

        rt_results = mt_results.
      ENDMETHOD.

      METHOD on_batch_complete.
        DATA lt_batch_results TYPE ty_result_tab.

        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_BATCH'
          TABLES
            et_results = lt_batch_results.

        APPEND LINES OF lt_batch_results TO mt_results.
        mv_completed = mv_completed + 1.
        mv_active_tasks = mv_active_tasks - 1.
      ENDMETHOD.
    ENDCLASS.

4. Using a server group

    DATA: lv_group    TYPE rzlli_apts,
          lv_free_wps TYPE i.

    lv_group = 'parallel_generators'.

    " Initialize the server group and determine its free work processes
    CALL FUNCTION 'SPBT_INITIALIZE'
      EXPORTING
        group_name                     = 'parallel_generators'
      IMPORTING
        free_pbt_wps                   = lv_free_wps
      EXCEPTIONS
        invalid_group_name             = 1
        internal_error                 = 2
        pbt_env_already_initialized    = 3
        currently_no_resources_avail   = 4
        no_pbt_resources_found         = 5
        cant_init_different_pbt_groups = 6
        OTHERS                         = 7.

    IF sy-subrc <> 0 AND sy-subrc <> 3.
      " 3 only means the environment was initialized earlier; anything else is a real problem
      WRITE: / 'SPBT_INITIALIZE failed, sy-subrc =', sy-subrc.
    ENDIF.

    " Call with the server group
    CALL FUNCTION 'Z_HEAVY_CALCULATION'
      STARTING NEW TASK 'CALC1'
      DESTINATION IN GROUP lv_group
      CALLING on_calculation_done ON END OF TASK
      EXPORTING
        iv_param = lv_param.

5. Resource management

    CLASS zcl_resource_manager DEFINITION.
      PUBLIC SECTION.
        METHODS get_available_processes
          RETURNING VALUE(rv_count) TYPE i.

        METHODS process_with_throttling
          IMPORTING it_items TYPE string_table.

        METHODS on_item_done
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_max_parallel TYPE i VALUE 5,
              mv_running      TYPE i,   " tasks started so far
              mv_completed    TYPE i.
    ENDCLASS.

    CLASS zcl_resource_manager IMPLEMENTATION.
      METHOD get_available_processes.
        " Determine the free work processes
        CALL FUNCTION 'SPBT_INITIALIZE'
          EXPORTING
            group_name   = 'parallel_generators'
          IMPORTING
            free_pbt_wps = rv_count
          EXCEPTIONS
            OTHERS       = 1.

        IF sy-subrc <> 0.
          rv_count = 3. " Fallback
        ENDIF.

        " Keep the value between 1 and mv_max_parallel
        IF rv_count > mv_max_parallel.
          rv_count = mv_max_parallel.
        ELSEIF rv_count < 1.
          rv_count = 1.
        ENDIF.
      ENDMETHOD.

      METHOD process_with_throttling.
        DATA(lv_max) = get_available_processes( ).
        mv_running = 0.
        mv_completed = 0.

        DATA(lv_task_no) = 0.

        LOOP AT it_items INTO DATA(lv_item).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |PROC{ lv_task_no }|.

          " Wait while the maximum number of tasks is running
          WHILE mv_running - mv_completed >= lv_max.
            WAIT UNTIL mv_completed > ( mv_running - lv_max ) UP TO 5 SECONDS.
          ENDWHILE.

          " Start the task
          CALL FUNCTION 'Z_PROCESS_ITEM'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_item_done ON END OF TASK
            EXPORTING
              iv_item = lv_item.

          mv_running = mv_running + 1.
        ENDLOOP.

        " Wait for all tasks
        WAIT UNTIL mv_completed >= mv_running UP TO 600 SECONDS.
      ENDMETHOD.

      METHOD on_item_done.
        DATA lv_result TYPE string.

        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
          IMPORTING
            ev_result             = lv_result
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.

        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

6. Error handling

    CLASS zcl_parallel_with_errors DEFINITION.
      PUBLIC SECTION.
        TYPES: BEGIN OF ty_result,
                 task_id TYPE i,
                 success TYPE abap_bool,
                 message TYPE string,
                 data    TYPE string,
               END OF ty_result,
               ty_results TYPE STANDARD TABLE OF ty_result WITH KEY task_id.

        METHODS process_with_error_handling
          IMPORTING it_items          TYPE string_table
          RETURNING VALUE(rt_results) TYPE ty_results.

        " Callback; a method rather than a FORM so that it can access mt_results
        METHODS on_risky_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mt_results   TYPE ty_results,
              mv_completed TYPE i,
              mv_total     TYPE i.
    ENDCLASS.

    CLASS zcl_parallel_with_errors IMPLEMENTATION.
      METHOD process_with_error_handling.
        DATA lv_message TYPE c LENGTH 255.

        mv_total = lines( it_items ).
        mv_completed = 0.
        CLEAR mt_results.

        DATA(lv_task_id) = 0.

        LOOP AT it_items INTO DATA(lv_item).
          lv_task_id = lv_task_id + 1.
          DATA(lv_task_name) = |RISK{ lv_task_id }|.
          CLEAR lv_message.

          " Start errors of an aRFC are classic exceptions, so they are
          " handled with EXCEPTIONS and sy-subrc instead of TRY ... CATCH
          CALL FUNCTION 'Z_RISKY_OPERATION'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_risky_complete ON END OF TASK
            EXPORTING
              iv_item               = lv_item
              iv_task_id            = lv_task_id
            EXCEPTIONS
              communication_failure = 1 MESSAGE lv_message
              system_failure        = 2 MESSAGE lv_message.

          IF sy-subrc <> 0.
            " Record the start error
            APPEND VALUE #( task_id = lv_task_id
                            success = abap_false
                            message = lv_message ) TO mt_results.
            mv_completed = mv_completed + 1.
          ENDIF.
        ENDLOOP.

        " Wait for about 5 minutes at most (60 x 5 seconds)
        DO 60 TIMES.
          WAIT UNTIL mv_completed >= mv_total UP TO 5 SECONDS.
          IF sy-subrc <> 8. " 8 = this WAIT timed out; otherwise nothing is left to wait for
            EXIT.
          ENDIF.
        ENDDO.

        " Mark every task that never reported back as timed out
        DO mv_total TIMES.
          IF NOT line_exists( mt_results[ task_id = sy-index ] ).
            APPEND VALUE #( task_id = sy-index
                            success = abap_false
                            message = 'Timeout' ) TO mt_results.
          ENDIF.
        ENDDO.

        rt_results = mt_results.
      ENDMETHOD.

      " Callback with error handling
      METHOD on_risky_complete.
        DATA: lv_result  TYPE string,
              lv_task_id TYPE i,
              lv_message TYPE c LENGTH 255.

        RECEIVE RESULTS FROM FUNCTION 'Z_RISKY_OPERATION'
          IMPORTING
            ev_result             = lv_result
            ev_task_id            = lv_task_id
          EXCEPTIONS
            communication_failure = 1 MESSAGE lv_message
            system_failure        = 2 MESSAGE lv_message
            OTHERS                = 3.
        DATA(lv_subrc) = sy-subrc.

        IF lv_subrc = 0.
          APPEND VALUE #( task_id = lv_task_id
                          success = abap_true
                          data    = lv_result ) TO mt_results.
        ELSE.
          " No parameters arrive on failure: take the ID from the task name 'RISK<n>'
          lv_task_id = substring( val = p_task off = 4 ).
          APPEND VALUE #( task_id = lv_task_id
                          success = abap_false
                          message = COND #( WHEN lv_message IS NOT INITIAL
                                            THEN lv_message
                                            ELSE |Error { lv_subrc }| ) ) TO mt_results.
        ENDIF.

        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

7. RFC function module for parallelization

    FUNCTION z_parallel_worker.
    *"----------------------------------------------------------------------
    *"*"Local Interface:
    *"  IMPORTING
    *"     VALUE(IV_TASK_ID) TYPE  I
    *"     VALUE(IT_DATA) TYPE  TY_DATA_TAB
    *"  EXPORTING
    *"     VALUE(ET_RESULTS) TYPE  TY_RESULT_TAB
    *"     VALUE(EV_TASK_ID) TYPE  I
    *"----------------------------------------------------------------------

      " Return the task ID so that the callback can match the result to its task
      ev_task_id = iv_task_id.

      LOOP AT it_data INTO DATA(ls_data).
        " Processing
        DATA(ls_result) = VALUE ty_result( id     = ls_data-id
                                           status = 'PROCESSED'
                                           value  = ls_data-value * 2 ).
        APPEND ls_result TO et_results.
      ENDLOOP.

    ENDFUNCTION.

8. CL_ABAP_PARALLEL for modern parallelization

    " The instances are serialized and sent to the parallel tasks, so the class
    " should be a global class. Type and parameter names below follow
    " CL_ABAP_PARALLEL (method RUN_INST); verify them in your release.
    CLASS zcl_modern_parallel DEFINITION.
      PUBLIC SECTION.
        INTERFACES if_abap_parallel.

        CLASS-METHODS run_parallel
          IMPORTING it_items          TYPE string_table
          RETURNING VALUE(rt_results) TYPE string_table.

      PRIVATE SECTION.
        DATA: mv_item   TYPE string,
              mv_result TYPE string.
    ENDCLASS.

    CLASS zcl_modern_parallel IMPLEMENTATION.
      METHOD if_abap_parallel~do.
        " This method runs in parallel; the result is kept in the instance
        mv_result = |Processed: { mv_item }|.
      ENDMETHOD.

      METHOD run_parallel.
        DATA: lt_in  TYPE cl_abap_parallel=>t_in_inst_tab,
              lt_out TYPE cl_abap_parallel=>t_out_inst_tab.

        " Create one instance per item
        LOOP AT it_items INTO DATA(lv_item).
          DATA(lo_instance) = NEW zcl_modern_parallel( ).
          lo_instance->mv_item = lv_item.
          APPEND lo_instance TO lt_in.
        ENDLOOP.

        " Run in parallel; the instances come back with their state
        NEW cl_abap_parallel( )->run_inst(
          EXPORTING p_in_tab  = lt_in
          IMPORTING p_out_tab = lt_out ).

        " Collect the results
        LOOP AT lt_out INTO DATA(ls_out).
          IF ls_out-inst IS BOUND.
            APPEND CAST zcl_modern_parallel( ls_out-inst )->mv_result TO rt_results.
          ENDIF.
          " Otherwise ls_out-message describes why the task failed
        ENDLOOP.
      ENDMETHOD.
    ENDCLASS.

9. Progress indicator

    CLASS zcl_parallel_progress DEFINITION.
      PUBLIC SECTION.
        METHODS process_with_progress
          IMPORTING it_items TYPE string_table.

        METHODS on_progress_update
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_total     TYPE i,
              mv_completed TYPE i,
              mv_errors    TYPE i.

        METHODS update_progress.
    ENDCLASS.

    CLASS zcl_parallel_progress IMPLEMENTATION.
      METHOD process_with_progress.
        mv_total = lines( it_items ).
        mv_completed = 0.
        mv_errors = 0.

        DATA(lv_task_no) = 0.

        " Initialize the progress indicator
        CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
          EXPORTING
            percentage = 0
            text       = |Starting { mv_total } parallel tasks...|.

        " Start the tasks
        LOOP AT it_items INTO DATA(lv_item).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |PROG{ lv_task_no }|.

          CALL FUNCTION 'Z_PROCESS_ITEM'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_progress_update ON END OF TASK
            EXPORTING
              iv_item = lv_item.
        ENDLOOP.

        " Wait and refresh the progress once per second
        WHILE mv_completed < mv_total.
          WAIT UNTIL mv_completed >= mv_total UP TO 1 SECONDS.
          IF sy-subrc = 4. " no open tasks left, so waiting longer cannot help
            EXIT.
          ENDIF.
          update_progress( ).
        ENDWHILE.

        " Done
        CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
          EXPORTING
            percentage = 100
            text       = |Done: { mv_completed - mv_errors } OK, { mv_errors } errors|.
      ENDMETHOD.

      METHOD on_progress_update.
        DATA lv_result TYPE string.

        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
          IMPORTING
            ev_result             = lv_result
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.

        IF sy-subrc <> 0.
          mv_errors = mv_errors + 1.
        ENDIF.
        mv_completed = mv_completed + 1.
      ENDMETHOD.

      METHOD update_progress.
        DATA(lv_percent) = mv_completed * 100 / mv_total.

        CALL FUNCTION 'SAPGUI_PROGRESS_INDICATOR'
          EXPORTING
            percentage = lv_percent
            text       = |{ mv_completed }/{ mv_total } processed ({ mv_errors } errors)|.
      ENDMETHOD.
    ENDCLASS.

10. Map-reduce pattern

    CLASS zcl_map_reduce DEFINITION.
      PUBLIC SECTION.
        TYPES: BEGIN OF ty_key_value,
                 key   TYPE string,
                 value TYPE i,
               END OF ty_key_value,
               ty_key_values TYPE STANDARD TABLE OF ty_key_value WITH KEY key.

        METHODS map_reduce
          IMPORTING it_data          TYPE string_table
          RETURNING VALUE(rt_result) TYPE ty_key_values.

        METHODS on_map_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mt_map_results TYPE ty_key_values,
              mv_map_done    TYPE i,
              mv_map_total   TYPE i.
    ENDCLASS.

    CLASS zcl_map_reduce IMPLEMENTATION.
      METHOD map_reduce.
        " MAP phase: parallel
        mv_map_total = lines( it_data ).
        mv_map_done = 0.
        CLEAR mt_map_results.

        DATA(lv_chunk_no) = 0.
        LOOP AT it_data INTO DATA(lv_data).
          lv_chunk_no = lv_chunk_no + 1.
          DATA(lv_task_name) = |MAP{ lv_chunk_no }|.

          CALL FUNCTION 'Z_MAP_FUNCTION'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_map_complete ON END OF TASK
            EXPORTING
              iv_data = lv_data.
        ENDLOOP.

        " Wait for all map tasks
        WAIT UNTIL mv_map_done >= mv_map_total UP TO 300 SECONDS.

        " REDUCE phase: local aggregation per key
        DATA lt_grouped TYPE SORTED TABLE OF ty_key_value WITH UNIQUE KEY key.

        LOOP AT mt_map_results INTO DATA(ls_result).
          READ TABLE lt_grouped ASSIGNING FIELD-SYMBOL(<fs_group>)
            WITH KEY key = ls_result-key.
          IF sy-subrc = 0.
            <fs_group>-value = <fs_group>-value + ls_result-value.
          ELSE.
            INSERT ls_result INTO TABLE lt_grouped.
          ENDIF.
        ENDLOOP.

        rt_result = lt_grouped.
      ENDMETHOD.

      METHOD on_map_complete.
        DATA lt_pairs TYPE ty_key_values.

        " Assumption: Z_MAP_FUNCTION returns its key/value pairs in a parameter et_pairs
        RECEIVE RESULTS FROM FUNCTION 'Z_MAP_FUNCTION'
          IMPORTING
            et_pairs              = lt_pairs
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.

        IF sy-subrc = 0.
          APPEND LINES OF lt_pairs TO mt_map_results.
        ENDIF.
        mv_map_done = mv_map_done + 1.
      ENDMETHOD.
    ENDCLASS.

11. Transactional parallel processing
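
    CLASS zcl_transactional_parallel DEFINITION.
      PUBLIC SECTION.
        " ty_item_tab is assumed to be defined elsewhere
        METHODS process_and_commit
          IMPORTING it_items          TYPE ty_item_tab
          RETURNING VALUE(rv_success) TYPE abap_bool.

        METHODS on_trans_complete
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        DATA: mv_all_success TYPE abap_bool VALUE abap_true,
              mv_completed   TYPE i,
              mv_total       TYPE i.
    ENDCLASS.

    CLASS zcl_transactional_parallel IMPLEMENTATION.
      METHOD process_and_commit.
        mv_total = lines( it_items ).
        mv_completed = 0.
        mv_all_success = abap_true.

        DATA(lv_task_no) = 0.

        " Parallel processing (without commit)
        LOOP AT it_items INTO DATA(ls_item).
          lv_task_no = lv_task_no + 1.
          DATA(lv_task_name) = |TRANS{ lv_task_no }|.

          CALL FUNCTION 'Z_PROCESS_NO_COMMIT'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_trans_complete ON END OF TASK
            EXPORTING
              is_item = ls_item.
        ENDLOOP.

        " Wait for all tasks; a timeout counts as a failure
        WAIT UNTIL mv_completed >= mv_total UP TO 300 SECONDS.
        IF mv_completed < mv_total.
          mv_all_success = abap_false.
        ENDIF.

        " Commit or rollback.
        " Note: every aRFC task runs in its own session with its own LUW, so this
        " only affects changes registered in the calling session. Database changes
        " made inside the tasks are not committed or rolled back from here.
        IF mv_all_success = abap_true.
          CALL FUNCTION 'BAPI_TRANSACTION_COMMIT'
            EXPORTING
              wait = abap_true.
          rv_success = abap_true.
        ELSE.
          CALL FUNCTION 'BAPI_TRANSACTION_ROLLBACK'.
          rv_success = abap_false.
        ENDIF.
      ENDMETHOD.

      METHOD on_trans_complete.
        RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_NO_COMMIT'
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.

        IF sy-subrc <> 0.
          mv_all_success = abap_false.
        ENDIF.
        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

12. Practical example: mass data export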

    CLASS zcl_mass_export DEFINITION.
      PUBLIC SECTION.
        METHODS export_customers_parallel
          IMPORTING iv_country     TYPE land1
          RETURNING VALUE(rv_file) TYPE string.

        METHODS on_chunk_loaded
          IMPORTING p_task TYPE clike.

      PRIVATE SECTION.
        CONSTANTS c_chunk_size TYPE i VALUE 1000.

        " ty_customer_tab and ty_kunnr_tab are assumed to be defined elsewhere
        DATA: mt_all_data  TYPE ty_customer_tab,
              mv_completed TYPE i,
              mv_total     TYPE i.
    ENDCLASS.

    CLASS zcl_mass_export IMPLEMENTATION.
      METHOD export_customers_parallel.
        " Load the customer numbers
        SELECT kunnr FROM kna1
          WHERE land1 = @iv_country
          INTO TABLE @DATA(lt_customers).

        " Number of chunks, rounded up
        mv_total = ( lines( lt_customers ) + c_chunk_size - 1 ) DIV c_chunk_size.
        mv_completed = 0.
        CLEAR mt_all_data.

        " Load the chunks in parallel
        DATA: lv_from     TYPE i VALUE 1,
              lv_chunk_no TYPE i VALUE 0.

        WHILE lv_from <= lines( lt_customers ).
          lv_chunk_no = lv_chunk_no + 1.
          DATA(lv_task_name) = |EXP{ lv_chunk_no }|.

          " Calculate the chunk range
          DATA(lv_to) = lv_from + c_chunk_size - 1.
          IF lv_to > lines( lt_customers ).
            lv_to = lines( lt_customers ).
          ENDIF.

          " Customer numbers for this chunk
          DATA(lt_chunk_keys) = VALUE ty_kunnr_tab(
            FOR i = lv_from WHILE i <= lv_to ( lt_customers[ i ]-kunnr ) ).

          " Load in parallel
          CALL FUNCTION 'Z_LOAD_CUSTOMER_DATA'
            STARTING NEW TASK lv_task_name
            DESTINATION 'NONE'
            CALLING on_chunk_loaded ON END OF TASK
            EXPORTING
              it_customers = lt_chunk_keys.

          lv_from = lv_to + 1.
        ENDWHILE.

        " Wait
        WAIT UNTIL mv_completed >= mv_total UP TO 600 SECONDS.

        " Write the file
        rv_file = |/tmp/customers_{ iv_country }_{ sy-datum }.csv|.

        OPEN DATASET rv_file FOR OUTPUT IN TEXT MODE ENCODING UTF-8.
        IF sy-subrc = 0.
          TRANSFER 'KUNNR;NAME1;ORT01;LAND1' TO rv_file.
          LOOP AT mt_all_data INTO DATA(ls_data).
            DATA(lv_line) = |{ ls_data-kunnr };{ ls_data-name1 };{ ls_data-ort01 };{ ls_data-land1 }|.
            TRANSFER lv_line TO rv_file.
          ENDLOOP.
          CLOSE DATASET rv_file.
        ENDIF.
      ENDMETHOD.

      METHOD on_chunk_loaded.
        DATA lt_data TYPE ty_customer_tab.

        " Assumption: Z_LOAD_CUSTOMER_DATA returns the rows in a parameter et_data
        RECEIVE RESULTS FROM FUNCTION 'Z_LOAD_CUSTOMER_DATA'
          IMPORTING
            et_data               = lt_data
          EXCEPTIONS
            communication_failure = 1
            system_failure        = 2
            OTHERS                = 3.

        IF sy-subrc = 0.
          APPEND LINES OF lt_data TO mt_all_data.
        ENDIF.
        mv_completed = mv_completed + 1.
      ENDMETHOD.
    ENDCLASS.

Destination options

| Destination | Description |
|---|---|
| 'NONE' | Local system, same application server as the caller |
| IN GROUP group | Application servers of the named server group |
| IN GROUP DEFAULT | Default group (all available application servers) |
| RFC destination | External system |
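
With DESTINATION IN GROUP, starting a task can fail with the classic exception resource_failure when the group currently has no free work process. The following report is a minimal sketch of a retry loop for that case. The function module Z_PROCESS_ITEM and its parameters iv_item and ev_result are placeholder names taken from the examples above, and the limit of ten attempts is an arbitrary assumption.

```abap
REPORT zparallel_group_retry.

" Sketch only: Z_PROCESS_ITEM and its parameters are placeholder names.
DATA: gv_result  TYPE string,
      gv_done    TYPE abap_bool,
      gv_started TYPE abap_bool,
      gv_message TYPE c LENGTH 255.

START-OF-SELECTION.
  DO 10 TIMES.                       " give up after ten attempts (arbitrary limit)
    CALL FUNCTION 'Z_PROCESS_ITEM'
      STARTING NEW TASK 'ITEM1'
      DESTINATION IN GROUP DEFAULT
      PERFORMING on_item_done ON END OF TASK
      EXPORTING
        iv_item               = 'Test'
      EXCEPTIONS
        communication_failure = 1 MESSAGE gv_message
        system_failure        = 2 MESSAGE gv_message
        resource_failure      = 3.

    CASE sy-subrc.
      WHEN 0.
        gv_started = abap_true.
        EXIT.
      WHEN 3.
        " No free work process in the group: pause, then retry
        WAIT UP TO 1 SECONDS.
      WHEN OTHERS.
        " Connection or system error: gv_message holds the reason
        WRITE: / 'Start failed:', gv_message.
        EXIT.
    ENDCASE.
  ENDDO.

  IF gv_started = abap_true.
    WAIT UNTIL gv_done = abap_true UP TO 60 SECONDS.
    WRITE: / 'Result:', gv_result.
  ENDIF.

FORM on_item_done USING p_task TYPE clike.
  RECEIVE RESULTS FROM FUNCTION 'Z_PROCESS_ITEM'
    IMPORTING
      ev_result             = gv_result
    EXCEPTIONS
      communication_failure = 1
      system_failure        = 2.
  gv_done = abap_true.
ENDFORM.
```

Retrying only on resource_failure keeps genuine connection or system errors visible instead of looping on them.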
WAIT variants

    " Wait for a condition
    WAIT UNTIL gv_done = abap_true.

    " With a timeout
    WAIT UNTIL gv_done = abap_true UP TO 60 SECONDS.

    " Wait until any task has reported back
    WAIT FOR ASYNCHRONOUS TASKS UNTIL gv_count > 0.

    " Several conditions
    WAIT UNTIL gv_task1_done = abap_true AND gv_task2_done = abap_true UP TO 120 SECONDS.

Important notes / best practices
- Match the number of parallel tasks to the available work processes.
- Use SPBT_INITIALIZE to initialize the server group and read its free work processes.
- Handle errors in RECEIVE RESULTS with EXCEPTIONS.
- Always define a timeout (UP TO ... SECONDS).
- Do not hold database locks across tasks, to avoid deadlocks.
- The callback form or method must exist at runtime.
- Tune the data volume per task (neither too large nor too small).
- Function modules called via RFC must be remote-enabled.
- Issue COMMIT WORK only after all tasks have finished.
- Combine this with an HTTP client for parallel API calls.
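
A timeout only helps if the program then checks how the wait ended. The fragment below is a minimal sketch of that check, placed after the loop that starts the tasks. The counters gv_completed and gv_total are assumed names: the callback is expected to increment gv_completed, and gv_total holds the number of tasks started.

```abap
" Sketch only: gv_completed is assumed to be incremented by the callback,
" gv_total holds the number of tasks that were started.
DATA: gv_completed TYPE i,
      gv_total     TYPE i.

" ... start the tasks here ...

WAIT FOR ASYNCHRONOUS TASKS UNTIL gv_completed >= gv_total UP TO 60 SECONDS.

CASE sy-subrc.
  WHEN 0.
    " The condition is met
    WRITE: / 'All tasks have reported back.'.
  WHEN 4.
    " Nothing is left to wait for, yet the condition is not met
    WRITE: / 'No callback is registered any more, but results are missing.'.
  WHEN 8.
    " The time limit was reached while callbacks were still outstanding
    WRITE: / 'Timeout:', gv_completed, 'of', gv_total, 'tasks finished.'.
ENDCASE.
```

Treating return codes 4 and 8 separately distinguishes tasks that were lost from tasks that are merely slow.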