// Here is the code for the missing classes, as requested.
// NOTE: several compilation units were pasted together below; each `package`
// declaration starts a separate source file.

package com.coupang.cgfs.retry.domain.adjustments.service;

import com.coupang.apigateway.services.fp_bep.FpbepApiV1ParentidAdapter;
import com.coupang.cgfs.retry.domain.adjustments.enums.AdjustmentEventState;
import com.coupang.cgfs.retry.domain.adjustments.model.ProcessingEvent;
import com.coupang.cgfs.retry.domain.adjustments.model.event.AdjustmentProcessingEvent;
import com.coupang.cgfs.retry.domain.adjustments.model.request.AdjustmentRequest;
import com.coupang.cgfs.retry.domain.adjustments.repository.ProcessingEventRepository;
import com.coupang.cgfs.retry.domain.service.AdjustmentEventPublisher;
import com.coupang.fp.enums.AdjustmentEventType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.*;

/**
 * Creates {@link ProcessingEvent} rows for an adjustment (UNDO first, then REPROCESS)
 * and announces them through the {@link AdjustmentEventPublisher}.
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class EventGenerationServiceImpl implements EventGenerationService {

    /** Repository (port) used to persist processing events. */
    private final ProcessingEventRepository processingEventRepository;

    /** Pylon adapter for BEP. Injected but not called yet; see {@link #getFinanceEvents(List)}. */
    private final FpbepApiV1ParentidAdapter fpbepApiV1ParentidAdapter;

    /** Publisher used to announce newly stored processing events. */
    private final AdjustmentEventPublisher adjustmentEventPublisher;

    /**
     * Stores one PENDING UNDO {@link ProcessingEvent} per finance event of every event ID in
     * the request, and publishes one {@link AdjustmentProcessingEvent} per event ID after
     * that event's rows have been handed to the repository.
     *
     * <p>A request whose event ID list is {@code null} or empty produces nothing.
     *
     * @param adjustmentRequest the adjustment request whose event IDs should be undone
     */
    @Override
    @Transactional
    public void generateUndoEvents(AdjustmentRequest adjustmentRequest) {
        List<String> eventIds = adjustmentRequest.getEventIds();
        if (eventIds == null || eventIds.isEmpty()) {
            log.warn("Adjustment request {} has no event IDs; no UNDO events generated",
                    adjustmentRequest.getId());
            return;
        }

        // Finance event IDs per event ID (currently a mock, see getFinanceEvents).
        Map<String, List<String>> financeEventsMap = getFinanceEvents(eventIds);

        for (String eventId : eventIds) {
            List<String> financeEventIds =
                    financeEventsMap.getOrDefault(eventId, Collections.emptyList());

            List<ProcessingEvent> undoEvents = new ArrayList<>(financeEventIds.size());
            for (String financeEventId : financeEventIds) {
                undoEvents.add(ProcessingEvent.builder()
                        .adjustmentRequestId(adjustmentRequest.getId())
                        .adjustmentEventType(AdjustmentEventType.UNDO)
                        .eventId(eventId)
                        .financeEventId(financeEventId)
                        .state(AdjustmentEventState.PENDING)
                        .build());
            }
            processingEventRepository.saveAll(undoEvents);

            // NOTE(review): this publish runs while the surrounding transaction is still
            // open, so the message can be sent before the rows are committed (or even if
            // the transaction later rolls back). Consider publishing after commit.
            adjustmentEventPublisher.publish(
                    new AdjustmentProcessingEvent(eventId, adjustmentRequest.getId()));
        }
    }

    /**
     * Stores one PENDING REPROCESS {@link ProcessingEvent} per completed UNDO event, linked
     * to its UNDO event through {@code parentProcessingEventId}, then publishes one
     * {@link AdjustmentProcessingEvent} per distinct (eventId, adjustmentRequestId) pair
     * found in the input.
     *
     * <p>A {@code null} or empty list produces nothing (previously an empty list failed
     * with {@link IndexOutOfBoundsException}). When every element shares the same event ID
     * and request ID, exactly one message is published, as before.
     *
     * @param completedUndoEvents UNDO events that have completed
     */
    @Override
    @Transactional
    public void generateReprocessEvent(List<ProcessingEvent> completedUndoEvents) {
        if (completedUndoEvents == null || completedUndoEvents.isEmpty()) {
            log.warn("No completed UNDO events supplied; no REPROCESS events generated");
            return;
        }

        List<ProcessingEvent> reprocessEvents = new ArrayList<>(completedUndoEvents.size());
        // Insertion-ordered so messages go out in the order the pairs were first seen.
        Set<Map.Entry<String, Long>> publishTargets = new LinkedHashSet<>();

        for (ProcessingEvent completedUndoEvent : completedUndoEvents) {
            reprocessEvents.add(ProcessingEvent.builder()
                    .adjustmentRequestId(completedUndoEvent.getAdjustmentRequestId())
                    // Link back to the UNDO event by its (Long) ID.
                    .parentProcessingEventId(completedUndoEvent.getId())
                    .eventId(completedUndoEvent.getEventId())
                    .financeEventId(completedUndoEvent.getFinanceEventId())
                    .adjustmentEventType(AdjustmentEventType.REPROCESS)
                    .state(AdjustmentEventState.PENDING)
                    .build());

            // SimpleImmutableEntry tolerates null components, unlike Map.entry.
            publishTargets.add(new AbstractMap.SimpleImmutableEntry<>(
                    completedUndoEvent.getEventId(),
                    completedUndoEvent.getAdjustmentRequestId()));
        }
        processingEventRepository.saveAll(reprocessEvents);

        // NOTE(review): as in generateUndoEvents, this publishes before the transaction
        // commits.
        for (Map.Entry<String, Long> target : publishTargets) {
            adjustmentEventPublisher.publish(
                    new AdjustmentProcessingEvent(target.getKey(), target.getValue()));
        }
    }

    /**
     * Returns the finance event IDs for each given event ID.
     *
     * <p>TODO: implement the actual call to BEP using the Pylon adapter. For now this
     * returns a mock response of two synthetic IDs per event:
     * {@code "FE" + eventId + "_1"} and {@code "FE" + eventId + "_2"}.
     *
     * @param eventIds event IDs to look up; {@code null} is treated as empty
     * @return a mutable map from event ID to its finance event IDs
     */
    public Map<String, List<String>> getFinanceEvents(List<String> eventIds) {
        Map<String, List<String>> financeEventsMap = new HashMap<>();
        if (eventIds == null) {
            return financeEventsMap;
        }
        for (String eventId : eventIds) {
            financeEventsMap.put(eventId, List.of("FE" + eventId + "_1", "FE" + eventId + "_2"));
        }
        return financeEventsMap;
    }
}

package com.coupang.cgfs.retry.domain.adjustments.model;

import com.coupang.cgfs.retry.domain.adjustments.enums.AdjustmentEventState;
import com.coupang.fp.enums.AdjustmentEventType;
import lombok.*;

import java.time.LocalDateTime;

/**
 * One unit of adjustment work for a single finance event: an UNDO or REPROCESS step
 * together with its current state.
 */
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ProcessingEvent {
    private Long id;
    /** Foreign key to the AdjustmentRequest table id. */
    private Long adjustmentRequestId;
    private AdjustmentEventType adjustmentEventType;
    private String eventId;
    private String financeEventId;
    private AdjustmentEventState state;
    /** ID of the processing event this one derives from (set for REPROCESS events). */
    private Long parentProcessingEventId;
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;

    /**
     * Replaces the current state.
     *
     * @param state the new state
     */
    public void updateState(AdjustmentEventState state) {
        this.state = state;
    }
}

package com.coupang.cgfs.retry.domain.adjustments.model.request;

import com.coupang.apigateway.services.fp_bep.model.EventResponse.BusinessEnum;
import com.coupang.apigateway.services.fp_bep.model.EventResponse.RegionCodeEnum;
import com.coupang.cgfs.retry.domain.adjustments.enums.AdjustmentEventState;
import com.coupang.cgfs.retry.domain.adjustments.enums.AdjustmentRequestedIdentityType;
import com.coupang.cgfs.retry.domain.adjustments.enums.AdjustmentSearchType;
import com.coupang.fp.enums.AdjustmentRequestType;
import lombok.*;

import java.time.LocalDateTime;
import java.util.List;

/** An adjustment request covering one or more event IDs. */
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AdjustmentRequest {
    private Long id;
    private String clientIdempotencyKey;
    /** IDs of the original events this request adjusts. */
    private List<String> eventIds;
    private BusinessEnum business;
    private RegionCodeEnum regionCode;
    private AdjustmentRequestType adjustmentRequestType;
    private AdjustmentEventState adjustmentStatus;
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
    private AdjustmentRequestedIdentityType requestedIdentityType;
    private String requestedIdentity;

    /**
     * Replaces the request's adjustment status.
     *
     * @param newState the new status
     */
    public void updateState(AdjustmentEventState newState) {
        this.adjustmentStatus = newState;
    }
}

package com.coupang.cgfs.retry.domain.adjustments.model.event;

import com.coupang.apigateway.services.fp_bep.model.EventResponse.BusinessEnum;
import com.coupang.apigateway.services.fp_bep.model.EventResponse.RegionCodeEnum;
import com.coupang.fp.enums.AdjustmentEventType;
import lombok.*;

/** Message payload sent towards BEP for an adjustment. */
@Getter
@Setter
@Builder
@ToString // the publisher logs this object; without it the log shows only Class@hash
@NoArgsConstructor
@AllArgsConstructor
public class AdjustmentBepEvent {
    /** ID of the adjustment request this message belongs to. */
    private Long adjustmentRequestId;
    /** Original event ID. */
    private String eventId;
    /** UNDO or REPROCESS. */
    private AdjustmentEventType type;
    private BusinessEnum business;
    private RegionCodeEnum regionCode;
}

package com.coupang.cgfs.retry.domain.adjustments.model.event;

import lombok.*;

/** Feedback message reporting the outcome for one finance event of an adjustment. */
@Getter
@Setter
@Builder
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class AdjustmentFcsFeedbackEvent {
    private String eventId;
    private String financeEventId;
    private Long adjustmentRequestId;
    /** "COMPLETED" or "FAILED". */
    private String status;
}

package com.coupang.cgfs.retry.domain.adjustments.model.event;

import lombok.*;

/** Internal message identifying an event ID of an adjustment request to be processed. */
@Getter
@Setter
@Builder
@ToString // the publisher logs this object; without it the log shows only Class@hash
@AllArgsConstructor
@NoArgsConstructor
public class AdjustmentProcessingEvent {
    private String eventId;
    private Long adjustmentRequestId;
}

package com.coupang.cgfs.retry.adapter.counect3.service;

import com.coupang.cgfs.retry.adapter.counect3.RetryProducerTopic;
import com.coupang.cgfs.retry.domain.adjustments.model.event.AdjustmentBepEvent;
import com.coupang.cgfs.retry.domain.adjustments.model.event.AdjustmentProcessingEvent;
import com.coupang.cgfs.retry.domain.service.AdjustmentEventPublisher;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * {@link AdjustmentEventPublisher} that sends adjustment messages through
 * {@link CounectProducerService}, using the adjustment request ID as the message key.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class CounectAdjustmentEventPublisher implements AdjustmentEventPublisher {

    private final CounectProducerService counectProducerService;

    /**
     * Sends the event to {@code TOPIC_CGFS_RETRY_MESSAGE_V1}.
     *
     * @param event the message to send; it and its adjustment request ID must be non-null
     * @throws NullPointerException if {@code event} or its adjustment request ID is null
     */
    @Override
    public void publish(AdjustmentBepEvent event) {
        Objects.requireNonNull(event, "event must not be null");
        String messageKey = Objects.requireNonNull(
                event.getAdjustmentRequestId(),
                "event.adjustmentRequestId must not be null").toString();
        // NOTE(review): the meaning of the trailing null argument is not visible here.
        counectProducerService.send(
                RetryProducerTopic.TOPIC_CGFS_RETRY_MESSAGE_V1, messageKey, event, null);
        log.info("Sent message to BEP (RetryUpstream): {}", event);
    }

    /**
     * Sends the event to {@code TOPIC_CGFS_RETRY_PROCESSING_EVENTS_V1}.
     *
     * @param event the message to send; it and its adjustment request ID must be non-null
     * @throws NullPointerException if {@code event} or its adjustment request ID is null
     */
    @Override
    public void publish(AdjustmentProcessingEvent event) {
        Objects.requireNonNull(event, "event must not be null");
        String messageKey = Objects.requireNonNull(
                event.getAdjustmentRequestId(),
                "event.adjustmentRequestId must not be null").toString();
        // NOTE(review): the meaning of the trailing null argument is not visible here.
        counectProducerService.send(
                RetryProducerTopic.TOPIC_CGFS_RETRY_PROCESSING_EVENTS_V1, messageKey, event, null);
        log.info("Sent message to internal queue: {}", event);
    }
}