The Event Backlog

The event backlog is a container for managing events. It supports registering (inserting), retrieving, and unregistering (deleting) events by state.


The following code fragment demonstrates:

  • Creating a backlog

  • Adding events to the backlog

[1]:
from logprep.ng.event.set_event_backlog import SetEventBacklog
from logprep.ng.event.event_state import EventStateType
from logprep.ng.event.log_event import LogEvent

backlog = SetEventBacklog()

events = [
    LogEvent(
        data={"id": 0, "message": "Test Event 0"},
        original=b"",
        state=EventStateType.RECEIVING,
    ),
    LogEvent(
        data={"id": 1, "message": "Test Event 1"},
        original=b"",
        state=EventStateType.PROCESSING,
    ),
    LogEvent(
        data={"id": 2, "message": "Test Event 2"},
        original=b"",
        state=EventStateType.ACKED,
    ),
]

print(f"\nLen: {len(backlog.backlog)}")
backlog.register(events)

print("\n📦 Events in Backlog:")

for event in backlog.backlog:
    print(f"> {event.data=}, state: {event.state}")

print(f"\nLen: {len(backlog.backlog)}")

Len: 0

📦 Events in Backlog:
> event.data={'id': 0, 'message': 'Test Event 0'}, state: receiving
> event.data={'id': 1, 'message': 'Test Event 1'}, state: processing
> event.data={'id': 2, 'message': 'Test Event 2'}, state: acked

Len: 3

The following code fragment demonstrates:

  • Getting events from the backlog by state

[2]:
processing_events = backlog.get(EventStateType.PROCESSING)

print(f"Len: {len(backlog.backlog)}")

print("\n📥 Events in PROCESSING state:")

for event in processing_events:
    print(f"> {event.data=}, state: {event.state}")

print(f"\nLen: {len(backlog.backlog)}")

Len: 3

📥 Events in PROCESSING state:
> event.data={'id': 1, 'message': 'Test Event 1'}, state: processing

Len: 3
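
The two identical `Len: 3` lines show that `get` only reads from the backlog and leaves it unchanged. A minimal standalone sketch of that behavior follows. `ToyState`, `ToyEvent`, and `toy_get` are hypothetical stand-ins invented for illustration; this is not logprep's implementation, only a filter over a plain Python set.

```python
from dataclasses import dataclass
from enum import Enum


class ToyState(Enum):
    # Hypothetical stand-in for EventStateType
    RECEIVING = "receiving"
    PROCESSING = "processing"
    ACKED = "acked"


@dataclass(frozen=True)
class ToyEvent:
    # Hypothetical stand-in for LogEvent; frozen so instances are hashable
    id: int
    state: ToyState


def toy_get(backlog: set, state: ToyState) -> set:
    """Return the events in the given state without modifying the backlog."""
    return {event for event in backlog if event.state is state}


toy_backlog = {
    ToyEvent(0, ToyState.RECEIVING),
    ToyEvent(1, ToyState.PROCESSING),
    ToyEvent(2, ToyState.ACKED),
}

processing = toy_get(toy_backlog, ToyState.PROCESSING)

# The filtered result holds one event; the backlog itself still holds three.
print(len(processing), len(toy_backlog))
```

Returning a new set rather than a view keeps the caller from accidentally mutating the container while iterating over the result.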

The following code fragment demonstrates:

  • Removing events from the backlog

[3]:
print(f"Len: {len(backlog.backlog)}")

acked_events = backlog.unregister(EventStateType.ACKED)
print("\n🧹 Unregistered events with state ACKED:")

for event in acked_events:
    print(f"> {event.data=}, state: {event.state}")


print("\n📦 Remaining events in backlog:")

for state in EventStateType:
    still_in_backlog = backlog.get(state)
    for event in still_in_backlog:
        print(f"> {event.data=}, state: {event.state}")

print(f"\nLen: {len(backlog.backlog)}")

Len: 3

🧹 Unregistered events with state ACKED:
> event.data={'id': 2, 'message': 'Test Event 2'}, state: acked

📦 Remaining events in backlog:
> event.data={'id': 0, 'message': 'Test Event 0'}, state: receiving
> event.data={'id': 1, 'message': 'Test Event 1'}, state: processing

Len: 2

The following code fragment demonstrates:

  • Failing removal of events from the backlog: `unregister` raises a `ValueError` for every state except FAILED and ACKED

[4]:
fail_unregister_states = [
    EventStateType.RECEIVING,
    EventStateType.RECEIVED,
    EventStateType.PROCESSING,
    EventStateType.PROCESSED,
    EventStateType.STORED_IN_OUTPUT,
    EventStateType.STORED_IN_ERROR,
    EventStateType.DELIVERED,
]

cnt_failing = 0

for state in fail_unregister_states:
    try:
        backlog.unregister(state)
    except ValueError:
        cnt_failing += 1
        print(f"> Failed: unregister({state})")

print(f"Expected Failing: {len(fail_unregister_states)}\nFailed: {cnt_failing}")

success_unregister_states = [
    EventStateType.FAILED,
    EventStateType.ACKED,
]

print("\nShould work without exceptions")
for state in success_unregister_states:
    backlog.unregister(state)
    print(f"> Ok: unregister({state})")

> Failed: unregister(receiving)
> Failed: unregister(received)
> Failed: unregister(processing)
> Failed: unregister(processed)
> Failed: unregister(stored_in_output)
> Failed: unregister(stored_in_error)
> Failed: unregister(delivered)
Expected Failing: 7
Failed: 7

Should work without exceptions
> Ok: unregister(failed)
> Ok: unregister(acked)
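
The output above suggests that `unregister` checks the requested state before removing anything. A minimal sketch of such a guard is shown below. `ToyState`, `ToyBacklog`, and `REMOVABLE_STATES` are hypothetical names made up for this illustration, events are modeled as plain `(id, state)` tuples, and nothing here is taken from logprep's source.

```python
from enum import Enum


class ToyState(Enum):
    # Hypothetical stand-in for EventStateType (reduced to three states)
    PROCESSING = "processing"
    FAILED = "failed"
    ACKED = "acked"


# Assumption for this sketch: only these states may be unregistered
REMOVABLE_STATES = {ToyState.FAILED, ToyState.ACKED}


class ToyBacklog:
    """Hypothetical set-backed container; not logprep's SetEventBacklog."""

    def __init__(self) -> None:
        self.backlog = set()

    def register(self, events) -> None:
        # Events are modeled as (id, state) tuples to keep the sketch short
        self.backlog.update(events)

    def unregister(self, state: ToyState) -> set:
        # Reject states that are not allowed to leave the backlog
        if state not in REMOVABLE_STATES:
            raise ValueError(f"cannot unregister events in state {state.value!r}")
        removed = {event for event in self.backlog if event[1] is state}
        self.backlog -= removed
        return removed


toy = ToyBacklog()
toy.register([(1, ToyState.PROCESSING), (2, ToyState.ACKED)])

try:
    toy.unregister(ToyState.PROCESSING)
except ValueError as error:
    print(f"rejected: {error}")

removed = toy.unregister(ToyState.ACKED)
print(len(removed), len(toy.backlog))
```

As in the notebook, unregistering an allowed state that has no matching events does not raise; in this sketch it simply returns an empty set.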