package kalix.javasdk.testkit.impl;

import akka.NotUsed;
import akka.NotUsed$;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props$;
import akka.stream.BoundedSourceQueue;
import akka.stream.Materializer$;
import akka.stream.scaladsl.Sink$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.testkit.TestProbe$;
import java.util.concurrent.ConcurrentHashMap;
import kalix.eventing.DirectSource;
import kalix.eventing.DirectSource$;
import kalix.eventing.EventSource;
import kalix.eventing.EventSource$Source$Direct$;
import kalix.eventing.EventSource$Source$Empty$;
import kalix.eventing.EventSource$Source$EventSourcedEntity$;
import kalix.eventing.EventSource$Source$Topic$;
import kalix.eventing.EventSource$Source$ValueEntity$;
import kalix.javasdk.impl.MessageCodec;
import kalix.javasdk.testkit.EventingTestKit;
import kalix.javasdk.testkit.impl.EventingTestKitImpl;
import kalix.testkit.protocol.eventing_test_backend.EmitSingleCommand;
import kalix.testkit.protocol.eventing_test_backend.EmitSingleResult;
import kalix.testkit.protocol.eventing_test_backend.EmitSingleResult$;
import kalix.testkit.protocol.eventing_test_backend.EventStreamOutCommand;
import kalix.testkit.protocol.eventing_test_backend.EventStreamOutResult;
import kalix.testkit.protocol.eventing_test_backend.EventingTestKitService;
import kalix.testkit.protocol.eventing_test_backend.RunSourceCommand;
import kalix.testkit.protocol.eventing_test_backend.RunSourceCommand$;
import kalix.testkit.protocol.eventing_test_backend.RunSourceCommand$Command$Ack$;
import kalix.testkit.protocol.eventing_test_backend.RunSourceCommand$Command$Create$;
import kalix.testkit.protocol.eventing_test_backend.RunSourceCreate;
import kalix.testkit.protocol.eventing_test_backend.RunSourceCreate$;
import kalix.testkit.protocol.eventing_test_backend.SourceElem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: EventingTestKitImpl.scala */
/* loaded from: input_file:kalix/javasdk/testkit/impl/EventingTestServiceImpl.class */
public final class EventingTestServiceImpl implements EventingTestKit {
    private final ActorSystem system;
    private final String host;
    private int port;
    private final MessageCodec codec;
    public final ActorSystem kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys;
    public final ExecutionContextExecutor kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec;
    public final Logger kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log = LoggerFactory.getLogger(EventingTestServiceImpl.class);
    private final ConcurrentHashMap<String, OutgoingMessagesImpl> topicDestinations = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, VeIncomingMessagesImpl> veSubscriptions = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, IncomingMessagesImpl> esSubscriptions = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, IncomingMessagesImpl> streamSubscriptions = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, IncomingMessagesImpl> topicSubscriptions = new ConcurrentHashMap<>();

    /* compiled from: EventingTestKitImpl.scala */
    /* loaded from: input_file:kalix/javasdk/testkit/impl/EventingTestServiceImpl$ServiceImpl.class */
    public final class ServiceImpl implements EventingTestKitService {
        private final /* synthetic */ EventingTestServiceImpl $outer;

        public ServiceImpl(EventingTestServiceImpl eventingTestServiceImpl) {
            if (eventingTestServiceImpl == null) {
                throw new NullPointerException();
            }
            this.$outer = eventingTestServiceImpl;
        }

        @Override // kalix.testkit.protocol.eventing_test_backend.EventingTestKitService
        public Future<EmitSingleResult> emitSingle(EmitSingleCommand emitSingleCommand) {
            this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log.debug("Receiving message from test broker: [{}]", emitSingleCommand);
            emitSingleCommand.destination().foreach(eventDestination -> {
                ActorRef ref = this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getTopicOutgoingMessagesImpl(eventDestination.getTopic()).destinationProbe().ref();
                ref.$bang(emitSingleCommand, ref.$bang$default$2(emitSingleCommand));
            });
            if (emitSingleCommand.destination().isEmpty()) {
                this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log.warn("Received a message without destination, ignoring. {}", emitSingleCommand);
            }
            return Future$.MODULE$.successful(EmitSingleResult$.MODULE$.apply(EmitSingleResult$.MODULE$.$lessinit$greater$default$1()));
        }

        @Override // kalix.testkit.protocol.eventing_test_backend.EventingTestKitService
        public Source<SourceElem, NotUsed> runSource(Source<RunSourceCommand, NotUsed> source) {
            this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log.debug("Reading topic from test broker - runSource request started: {}", source);
            Promise apply = Promise$.MODULE$.apply();
            source.watchTermination((notUsed, future) -> {
                future.onComplete(r5 -> {
                    if (r5 instanceof Success) {
                        this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log.debug("runSource in completed");
                    } else {
                        if (!(r5 instanceof Failure)) {
                            throw new MatchError(r5);
                        }
                        this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log.error("runSource in failed", ((Failure) r5).exception());
                    }
                }, this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec);
            }).runWith(Sink$.MODULE$.fold(None$.MODULE$, (option, runSourceCommand) -> {
                RunSourceCreate _1;
                DirectSource _12;
                Tuple2 apply2 = Tuple2$.MODULE$.apply(option, runSourceCommand);
                if (apply2 != null) {
                    Some some = (Option) apply2._1();
                    RunSourceCommand runSourceCommand = (RunSourceCommand) apply2._2();
                    if (None$.MODULE$.equals(some) && runSourceCommand != null) {
                        RunSourceCommand unapply = RunSourceCommand$.MODULE$.unapply(runSourceCommand);
                        RunSourceCommand.Command _13 = unapply._1();
                        unapply._2();
                        if ((_13 instanceof RunSourceCommand.Command.Create) && (_1 = RunSourceCommand$Command$Create$.MODULE$.unapply((RunSourceCommand.Command.Create) _13)._1()) != null) {
                            RunSourceCreate unapply2 = RunSourceCreate$.MODULE$.unapply(_1);
                            String _14 = unapply2._1();
                            Some _2 = unapply2._2();
                            unapply2._3();
                            if (_2 instanceof Some) {
                                EventSource eventSource = (EventSource) _2.value();
                                this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log.debug("runSource request got initial create command for service name [{}], source: [{}]", _14, eventSource);
                                Tuple2 preMaterialize = Source$.MODULE$.queue(10).preMaterialize(Materializer$.MODULE$.matFromSystem(this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys));
                                if (preMaterialize == null) {
                                    throw new MatchError(preMaterialize);
                                }
                                Tuple2 apply3 = Tuple2$.MODULE$.apply((BoundedSourceQueue) preMaterialize._1(), (Source) preMaterialize._2());
                                EventingTestKitImpl.RunningSourceProbe apply4 = EventingTestKitImpl$RunningSourceProbe$.MODULE$.apply(_14, eventSource, (BoundedSourceQueue) apply3._1(), (Source) apply3._2());
                                EventSource.Source source2 = eventSource.source();
                                if (EventSource$Source$Empty$.MODULE$.equals(source2)) {
                                    throw new IllegalStateException("not recognized empty eventing source");
                                }
                                if (source2 instanceof EventSource.Source.Topic) {
                                    this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getTopicIncomingMessagesImpl(EventSource$Source$Topic$.MODULE$.unapply((EventSource.Source.Topic) source2)._1()).addSourceProbe(apply4);
                                } else if (source2 instanceof EventSource.Source.EventSourcedEntity) {
                                    this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getEventSourcedSubscriptionImpl(EventSource$Source$EventSourcedEntity$.MODULE$.unapply((EventSource.Source.EventSourcedEntity) source2)._1()).addSourceProbe(apply4);
                                } else if (source2 instanceof EventSource.Source.ValueEntity) {
                                    this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getValueEntityIncomingMessagesImpl(EventSource$Source$ValueEntity$.MODULE$.unapply((EventSource.Source.ValueEntity) source2)._1()).addSourceProbe(apply4);
                                } else {
                                    if (!(source2 instanceof EventSource.Source.Direct) || (_12 = EventSource$Source$Direct$.MODULE$.unapply((EventSource.Source.Direct) source2)._1()) == null) {
                                        throw new MatchError(source2);
                                    }
                                    DirectSource unapply3 = DirectSource$.MODULE$.unapply(_12);
                                    String _15 = unapply3._1();
                                    String _22 = unapply3._2();
                                    unapply3._3();
                                    this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getStreamIncomingMessagesImpl(_15, _22).addSourceProbe(apply4);
                                }
                                apply.success(apply4);
                                return Some$.MODULE$.apply(apply4);
                            }
                        }
                    }
                    if (some instanceof Some) {
                        Some some2 = some;
                        if (runSourceCommand != null) {
                            RunSourceCommand unapply4 = RunSourceCommand$.MODULE$.unapply(runSourceCommand);
                            RunSourceCommand.Command _16 = unapply4._1();
                            unapply4._2();
                            if (_16 instanceof RunSourceCommand.Command.Ack) {
                                this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log.debug("runSource request got ack [{}]", RunSourceCommand$Command$Ack$.MODULE$.unapply((RunSourceCommand.Command.Ack) _16)._1());
                                return some2;
                            }
                        }
                    }
                }
                throw new MatchError("Unexpected fold input: " + apply2);
            }), Materializer$.MODULE$.matFromSystem(this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys));
            return Source$.MODULE$.futureSource(apply.future().map(EventingTestServiceImpl::kalix$javasdk$testkit$impl$EventingTestServiceImpl$ServiceImpl$$_$runSource$$anonfun$3, this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec)).watchTermination((future2, future3) -> {
                future3.onComplete(r5 -> {
                    if (r5 instanceof Success) {
                        this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log.debug("runSource out completed");
                    } else {
                        if (!(r5 instanceof Failure)) {
                            throw new MatchError(r5);
                        }
                        this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$log.error("runSource out failed", ((Failure) r5).exception());
                    }
                }, this.$outer.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec);
            }).mapMaterializedValue(EventingTestServiceImpl::kalix$javasdk$testkit$impl$EventingTestServiceImpl$ServiceImpl$$_$runSource$$anonfun$5);
        }

        @Override // kalix.testkit.protocol.eventing_test_backend.EventingTestKitService
        public Source<EventStreamOutResult, NotUsed> eventStreamOut(Source<EventStreamOutCommand, NotUsed> source) {
            throw new UnsupportedOperationException("Feature not supported in the testkit yet");
        }

        public final /* synthetic */ EventingTestServiceImpl kalix$javasdk$testkit$impl$EventingTestServiceImpl$ServiceImpl$$$outer() {
            return this.$outer;
        }
    }

    public EventingTestServiceImpl(ActorSystem actorSystem, String str, int i, MessageCodec messageCodec) {
        this.system = actorSystem;
        this.host = str;
        this.port = i;
        this.codec = messageCodec;
        this.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys = actorSystem;
        this.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$ec = this.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys.dispatcher();
    }

    public String host() {
        return this.host;
    }

    public int port() {
        return this.port;
    }

    public void port_$eq(int i) {
        this.port = i;
    }

    @Override // kalix.javasdk.testkit.EventingTestKit
    public EventingTestKit.IncomingMessages getTopicIncomingMessages(String str) {
        return kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getTopicIncomingMessagesImpl(str);
    }

    public IncomingMessagesImpl kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getTopicIncomingMessagesImpl(String str) {
        return this.topicSubscriptions.computeIfAbsent(str, str2 -> {
            return new IncomingMessagesImpl(this.system.actorOf(Props$.MODULE$.apply(ClassTag$.MODULE$.apply(SourcesHolder.class)), "topic-holder-" + str), this.codec);
        });
    }

    @Override // kalix.javasdk.testkit.EventingTestKit
    public EventingTestKit.OutgoingMessages getTopicOutgoingMessages(String str) {
        return kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getTopicOutgoingMessagesImpl(str);
    }

    public OutgoingMessagesImpl kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getTopicOutgoingMessagesImpl(String str) {
        return this.topicDestinations.computeIfAbsent(str, str2 -> {
            return new OutgoingMessagesImpl(TestProbe$.MODULE$.apply(this.kalix$javasdk$testkit$impl$EventingTestServiceImpl$$sys), this.codec);
        });
    }

    @Override // kalix.javasdk.testkit.EventingTestKit
    public EventingTestKit.IncomingMessages getValueEntityIncomingMessages(String str) {
        return kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getValueEntityIncomingMessagesImpl(str);
    }

    public VeIncomingMessagesImpl kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getValueEntityIncomingMessagesImpl(String str) {
        return this.veSubscriptions.computeIfAbsent(str, str2 -> {
            return new VeIncomingMessagesImpl(this.system.actorOf(Props$.MODULE$.apply(ClassTag$.MODULE$.apply(SourcesHolder.class)), "ve-holder-" + str), this.codec);
        });
    }

    @Override // kalix.javasdk.testkit.EventingTestKit
    public EventingTestKit.IncomingMessages getEventSourcedEntityIncomingMessages(String str) {
        return kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getEventSourcedSubscriptionImpl(str);
    }

    public IncomingMessagesImpl kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getEventSourcedSubscriptionImpl(String str) {
        return this.esSubscriptions.computeIfAbsent(str, str2 -> {
            return new IncomingMessagesImpl(this.system.actorOf(Props$.MODULE$.apply(ClassTag$.MODULE$.apply(SourcesHolder.class)), "es-holder-" + str), this.codec);
        });
    }

    @Override // kalix.javasdk.testkit.EventingTestKit
    public EventingTestKit.IncomingMessages getStreamIncomingMessages(String str, String str2) {
        return kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getStreamIncomingMessagesImpl(str, str2);
    }

    public IncomingMessagesImpl kalix$javasdk$testkit$impl$EventingTestServiceImpl$$getStreamIncomingMessagesImpl(String str, String str2) {
        return this.streamSubscriptions.computeIfAbsent(str + "/" + str2, str3 -> {
            return new IncomingMessagesImpl(this.system.actorOf(Props$.MODULE$.apply(ClassTag$.MODULE$.apply(SourcesHolder.class)), "stream-holder-" + str + "-" + str2), this.codec);
        });
    }

    public static final /* synthetic */ Source kalix$javasdk$testkit$impl$EventingTestServiceImpl$ServiceImpl$$_$runSource$$anonfun$3(EventingTestKitImpl.RunningSourceProbe runningSourceProbe) {
        return runningSourceProbe.outSource();
    }

    public static final /* synthetic */ NotUsed$ kalix$javasdk$testkit$impl$EventingTestServiceImpl$ServiceImpl$$_$runSource$$anonfun$5(BoxedUnit boxedUnit) {
        return NotUsed$.MODULE$;
    }
}
