package com.fusion.slim.im.core.protocol;

import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import android.os.Message;
import ch.qos.logback.core.CoreConstants;
import com.fasterxml.jackson.databind.JsonNode;
import com.fusion.slim.common.helpers.ApiTransformer;
import com.fusion.slim.common.helpers.ThreadHelper;
import com.fusion.slim.common.models.Valuable;
import com.fusion.slim.common.models.im.CommandMethod;
import com.fusion.slim.common.models.im.CommandType;
import com.fusion.slim.common.speech.LoggingEvents;
import com.fusion.slim.im.core.di.ImCore;
import com.fusion.slim.im.core.protocol.Host;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.fabric.sdk.android.services.settings.SettingsJsonConstants;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.inject.Inject;
import javax.inject.Named;
import org.slf4j.Logger;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;

/* loaded from: classes.dex */
public class IMChannel implements Channel<JsonNode> {
    /** Handler message code for "enqueue and send a request"; same value as the literal used by {@link RequestHandler}. */
    private static final int CMD_SEND_REQUEST = 100;
    /** Capacity of {@link #requestQueue}: at most this many requests may be awaiting a response. */
    private static final int REQUEST_MAX_COUNT = 1;
    // Body sent by sendAck().
    // NOTE(review): the (int) casts and the LoggingEvents.VoiceIme key names look like decompiler
    // artifacts; verify against the original sources before touching this line.
    private static final ImmutableMap ackRequestBody = ImmutableMap.of(LoggingEvents.VoiceIme.EXTRA_TEXT_MODIFIED_TYPE, (int) CommandType.Notify.value(), LoggingEvents.VoiceIme.EXTRA_START_METHOD, (int) CommandMethod.Acknowledgement.value(), "result", 0);
    /** Alias returned by {@link #getName()}; also used to name the request thread. */
    private final String aliasName;
    /** Application id this channel is matched against in incoming messages. */
    private final long appId;
    /** Data source id this channel is matched against in incoming messages. */
    private final long dataSourceId;
    /** Lazily created by {@link #getEvent()}. */
    private Observable<JsonNode> eventObservable;
    /** Host that supplies the incoming message stream and sends outgoing data. */
    private final Host host;

    @Inject
    @Named(ImCore.LOGGER_CHANNEL)
    protected Logger logger;
    /** Host messages filtered and mapped for this channel (see createMessageObservable). */
    private final Observable<JsonNode> messageObservable;
    /** Handler running on {@link #requestThread} that serializes outgoing requests. */
    private final RequestHandler requestHandler;
    private final HandlerThread requestThread;
    /** Single-thread scheduler on which incoming messages and events are observed. */
    private final Scheduler workerScheduler;
    /** Channel lifecycle state; starts as {@link Status#Init}. */
    private final AtomicReference<Status> status = new AtomicReference<>(Status.Init);
    /** Publishes each response paired with the uuid of the request it was matched to. */
    private final PublishSubject<Response> messageSentSubject = PublishSubject.create();
    private final CompositeSubscription compositeSubscription = new CompositeSubscription();
    /** Requests that were sent and are still waiting for a response; bounded by {@link #REQUEST_MAX_COUNT}. */
    private final BlockingQueue<Request> requestQueue = new LinkedBlockingDeque<>(REQUEST_MAX_COUNT);
    /** Random identifier passed to the host when sending data on this channel. */
    private final String channelId = UUID.randomUUID().toString();

    /**
     * An outgoing request: an immutable body plus a random uuid that is later copied
     * onto the {@link Response} matched to it.
     */
    public static class Request {
        final ImmutableMap body;
        final UUID uuid = UUID.randomUUID();

        Request(ImmutableMap immutableMap) {
            this.body = immutableMap;
        }

        /** Renders the uuid and body for log output. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Request{uuid=");
            text.append(this.uuid);
            text.append(", body=");
            text.append(this.body);
            text.append(CoreConstants.CURLY_RIGHT);
            return text.toString();
        }
    }

    /**
     * Handler that forwards "send request" messages to its channel. The channel is held
     * through a {@link WeakReference}, so a queued message does not keep it reachable.
     */
    public static class RequestHandler extends Handler {
        private final WeakReference<IMChannel> channelWrapper;

        RequestHandler(Looper looper, IMChannel iMChannel) {
            super(looper);
            this.channelWrapper = new WeakReference<>(iMChannel);
        }

        /**
         * Passes the {@link Request} carried in {@code message.obj} to
         * {@code enqueueRequestAndRun}; any other message code is ignored, as is every
         * message once the channel has been collected.
         */
        @Override // android.os.Handler
        public void handleMessage(Message message) {
            // 100 is the value of IMChannel.CMD_SEND_REQUEST.
            if (message.what != 100) {
                return;
            }
            IMChannel channel = this.channelWrapper.get();
            if (channel != null) {
                channel.enqueueRequestAndRun((Request) message.obj);
            }
        }
    }

    /**
     * Subscriber that relays the channel's message stream to {@code deliverMessage} and
     * {@code deliverError}. The channel is referenced weakly; once it is gone every
     * callback is a no-op. (The decompiler reports this class was originally package-private.)
     */
    public static class RequestSubscriber extends Subscriber<JsonNode> {
        private final WeakReference<IMChannel> channelWrapper;

        public RequestSubscriber(IMChannel iMChannel) {
            this.channelWrapper = new WeakReference<>(iMChannel);
        }

        /** Completion of the upstream is deliberately ignored. */
        @Override // rx.Observer
        public void onCompleted() {
        }

        /** Forwards an upstream error to the channel if it is still reachable. */
        @Override // rx.Observer
        public void onError(Throwable th) {
            IMChannel channel = this.channelWrapper.get();
            if (channel == null) {
                return;
            }
            channel.deliverError(th);
        }

        /** Forwards an incoming message to the channel if it is still reachable. */
        @Override // rx.Observer
        public void onNext(JsonNode jsonNode) {
            IMChannel channel = this.channelWrapper.get();
            if (channel == null) {
                return;
            }
            channel.deliverMessage(jsonNode);
        }
    }

    /**
     * A received message body tagged with the uuid of the {@link Request} it was matched to.
     */
    public static class Response {
        /** Default emitted by {@code request(...)} when the subject completes without a matching response. */
        static final Response EMPTY = new Response(UUID.randomUUID(), ApiTransformer.EMPTY_RESPONSE);
        final JsonNode body;
        final UUID uuid;

        Response(UUID uuid, JsonNode jsonNode) {
            this.uuid = uuid;
            this.body = jsonNode;
        }

        /** Renders the uuid and body for log output. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Response{uuid=");
            text.append(this.uuid);
            text.append(", body=");
            text.append(this.body);
            text.append(CoreConstants.CURLY_RIGHT);
            return text.toString();
        }
    }

    /** Lifecycle states held in the channel's {@code status} field. */
    public enum Status {
        /** Initial value of the {@code status} field. */
        Init,
        /** Set by {@code open()}. */
        Opened,
        /** Set by {@code closeInternal()}, which is invoked from {@code close()} and {@code dispose()}. */
        Closed
    }

    /**
     * Creates a channel on {@code host}, starts its request thread and subscribes to the
     * host's message stream.
     *
     * @param host host whose messages are filtered for this channel and through which data is sent
     * @param j    application id, stored as {@code appId}
     * @param j2   data source id, stored as {@code dataSourceId}
     * @param str  alias returned by {@code getName()}; also used to name the request and worker threads
     */
    public IMChannel(Host host, long j, long j2, String str) {
        // aliasName must be assigned before any getName() call below.
        this.aliasName = str;
        this.host = host;
        this.appId = j;
        this.dataSourceId = j2;
        this.messageObservable = createMessageObservable(host);
        // Populates the @Inject-annotated logger field; nothing above uses the logger.
        ImCore.Initializer.get().inject(this);
        // The handler thread must be started before its looper is requested.
        this.requestThread = new HandlerThread(getName());
        this.requestThread.start();
        this.requestHandler = new RequestHandler(this.requestThread.getLooper(), this);
        // Dedicated single-thread executor for incoming messages.
        // NOTE(review): the executor is not retained, so dispose() has no handle to shut it down — confirm.
        this.workerScheduler = Schedulers.from(Executors.newSingleThreadExecutor(new ThreadHelper.NamedThreadFactory(getName(), "worker")));
        // Subscribe last, once every field the subscriber callbacks rely on is initialized.
        this.compositeSubscription.add(this.messageObservable.observeOn(this.workerScheduler).subscribe((Subscriber<? super JsonNode>) new RequestSubscriber(this)));
    }

    /**
     * Moves the channel to {@link Status#Closed} and, on the first transition only, drops
     * all pending handler messages and interrupts the request thread.
     */
    private void closeInternal() {
        // getAndSet performs the "was it already closed?" check and the state change as one
        // atomic step, so concurrent close()/dispose() calls cannot both run the teardown.
        if (this.status.getAndSet(Status.Closed) == Status.Closed) {
            return;
        }
        this.logger.info("close channel: {}", getName());
        if (this.requestHandler != null) {
            this.requestHandler.removeCallbacksAndMessages(null);
        }
        // Wakes the request thread if it is blocked offering into the request queue.
        this.requestThread.interrupt();
    }

    /**
     * Builds the message stream for this channel from the host's shared message stream.
     * Messages are filtered by {@code IMChannel$$Lambda$2} (bound to this instance;
     * presumably wrapping {@code lambda$createMessageObservable$146} — confirm) and then
     * mapped by {@code IMChannel$$Lambda$3}, whose body is not visible in this file.
     *
     * @param host host providing the raw message stream
     * @return the filtered and mapped stream
     */
    private Observable<JsonNode> createMessageObservable(Host host) {
        Observable<JsonNode> ownMessages = host.getMessage().share().filter(IMChannel$$Lambda$2.lambdaFactory$(this));
        // This method declares no type variable R, so the mapper is typed against the
        // JsonNode result this method actually returns.
        Func1<? super JsonNode, ? extends JsonNode> mapper = IMChannel$$Lambda$3.instance;
        return ownMessages.map(mapper);
    }

    /**
     * Logs an error from the message stream and forwards it to {@code messageSentSubject}.
     *
     * NOTE(review): onError is a terminal event for the subject, so later request()
     * subscribers presumably receive this error straight away — confirm that is intended.
     *
     * @param th the error reported by the upstream
     */
    public void deliverError(Throwable th) {
        this.logger.error("error: {}, ch: {}", th, this);
        this.messageSentSubject.onError(th);
    }

    /**
     * Handles one incoming message for this channel.
     * <ul>
     *   <li>A message without a command type field is logged and ignored.</li>
     *   <li>A message whose type equals {@code CommandType.Notify.value()} is acknowledged via {@link #sendAck()}.</li>
     *   <li>Any other message is treated as the response to the oldest pending request: the
     *       request is taken from the queue (waiting up to one minute) and the message is
     *       published on {@code messageSentSubject} under that request's uuid.</li>
     * </ul>
     *
     * @param jsonNode the incoming message
     */
    public void deliverMessage(JsonNode jsonNode) {
        JsonNode typeNode = jsonNode.get(LoggingEvents.VoiceIme.EXTRA_TEXT_MODIFIED_TYPE);
        this.logger.info("<-- cmd: {}, ch: {}", typeNode, this);
        if (typeNode == null) {
            // Dereferencing the missing field would throw inside onNext, which Rx turns into
            // a terminal onError for the whole message subscription.
            this.logger.warn("message without command type ignored, ch: {}", this);
            return;
        }
        if (typeNode.asText().equals(CommandType.Notify.value())) {
            sendAck();
            return;
        }
        try {
            Request pending = this.requestQueue.poll(1L, TimeUnit.MINUTES);
            if (pending == null) {
                this.logger.warn("no request matched, ignore this request: {}!", typeNode);
            } else {
                this.logger.info("dequeue and publish request: {}, ch: {}", typeNode, this);
                this.messageSentSubject.onNext(new Response(pending.uuid, jsonNode));
            }
        } catch (InterruptedException e) {
            this.logger.error("occurred an InterruptedException on channel '{}': {} ", this, e);
            // Restore the flag so the code that owns this thread can still observe the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reserves a slot in the pending-request queue for {@code request} (waiting up to five
     * minutes) and then sends its body. If the wait times out the request is dropped with a
     * warning. If the wait is interrupted, any queued requests are discarded and the whole
     * attempt is repeated.
     *
     * @param request the request to enqueue and send
     */
    public void enqueueRequestAndRun(Request request) {
        while (true) {
            try {
                boolean accepted = this.requestQueue.offer(request, 5L, TimeUnit.MINUTES);
                if (!accepted) {
                    this.logger.warn("request: {} is canceled due to timeout!", request.body.get(LoggingEvents.VoiceIme.EXTRA_TEXT_MODIFIED_TYPE));
                    return;
                }
                ImmutableMap body = request.body;
                sendData(body);
                this.logger.info("enqueue and run request: {}, ch: {}", body.get(LoggingEvents.VoiceIme.EXTRA_TEXT_MODIFIED_TYPE), this);
                return;
            } catch (InterruptedException e) {
                this.logger.error("occurred an InterruptedException on channel '{}', request:{}", this, request);
                if (!this.requestQueue.isEmpty()) {
                    this.logger.error("requestQueue is not empty, need clear it, current size is {}", Integer.valueOf(this.requestQueue.size()));
                    this.requestQueue.clear();
                }
                // Fall through to the next loop iteration to retry the same request.
            }
        }
    }

    /**
     * Predicate deciding whether a host message belongs to this channel: it must carry
     * non-null "ch" and "data" fields, and the app and "ds" ids inside "ch" must equal
     * this channel's {@code appId} and {@code dataSourceId}.
     */
    public /* synthetic */ Boolean lambda$createMessageObservable$146(JsonNode jsonNode) {
        if (jsonNode.hasNonNull("ch") && jsonNode.hasNonNull("data")) {
            JsonNode channelNode = jsonNode.get("ch");
            return this.appId == channelNode.get(SettingsJsonConstants.APP_KEY).asLong()
                    && this.dataSourceId == channelNode.get("ds").asLong();
        }
        return false;
    }

    /**
     * Predicate selecting messages whose type text is "notify" and whose method text equals
     * {@code CommandMethod.Request.value()}. The method field is read only when the type matches.
     */
    public static /* synthetic */ Boolean lambda$getEvent$145(JsonNode jsonNode) {
        String type = jsonNode.get(LoggingEvents.VoiceIme.EXTRA_TEXT_MODIFIED_TYPE).asText();
        if (!type.equals("notify")) {
            return false;
        }
        String method = jsonNode.get(LoggingEvents.VoiceIme.EXTRA_START_METHOD).asText();
        return method.equals(CommandMethod.Request.value());
    }

    /** Predicate that is true when {@code response} carries the same uuid as {@code request}. */
    public static /* synthetic */ Boolean lambda$request$148(Request request, Response response) {
        return Boolean.valueOf(response.uuid.equals(request.uuid));
    }

    /** Logs that a value was emitted for the request named {@code str}; {@code obj} is not used. */
    public /* synthetic */ void lambda$request$150(String str, Object obj) {
        this.logger.info("<- '{}' request on {} next!", str, this);
    }

    /** Logs a warning, including the throwable, when the request named {@code str} fails. */
    public /* synthetic */ void lambda$request$151(String str, Throwable th) {
        this.logger.warn("error when DO '{}' request on {}: ", str, this, th);
    }

    /** Logs that the request named {@code str} completed. */
    public /* synthetic */ void lambda$request$152(String str) {
        this.logger.info("<-> '{}' request on {} completed!", str, this);
    }

    /**
     * Sends {@code immutableMap} to the host on this channel's id when the channel is alive;
     * otherwise only logs a warning and sends nothing.
     *
     * NOTE(review): subscribe() is called without an error handler — confirm how a failed
     * send is expected to surface.
     *
     * @param immutableMap message body to send
     */
    private void sendData(ImmutableMap immutableMap) {
        if (isAlive()) {
            this.logger.info("--> cmd: {}, ch: {}", immutableMap.get(LoggingEvents.VoiceIme.EXTRA_TEXT_MODIFIED_TYPE), this);
            this.host.sendMessageToChannel(immutableMap, this.channelId).subscribe();
            return;
        }
        this.logger.warn("channel is not alive");
    }

    /**
     * Wraps {@code immutableMap} in a new {@link Request} and posts it to the request
     * handler, which enqueues and sends it on the request thread.
     *
     * @param immutableMap request body
     * @return the created request, whose uuid identifies the matching response
     */
    private Request sendRequestAsync(ImmutableMap immutableMap) {
        Request created = new Request(immutableMap);
        Message envelope = this.requestHandler.obtainMessage(CMD_SEND_REQUEST, created);
        envelope.sendToTarget();
        return created;
    }

    /** Closes the channel by delegating to {@code closeInternal()}. */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public void close() {
        closeInternal();
    }

    /**
     * Closes the channel and releases what it holds: completes {@code messageSentSubject},
     * clears the message subscriptions and hands the request thread to
     * {@code ThreadHelper.closeThread} (presumably stops it — its body is not visible here).
     */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public void dispose() {
        closeInternal();
        // Complete the subject first so request() subscribers terminate (firstOrDefault falls back to Response.EMPTY).
        this.messageSentSubject.onCompleted();
        this.compositeSubscription.clear();
        ThreadHelper.closeThread(this.requestThread);
    }

    /** Returns the application id given at construction. */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public long getAppId() {
        return this.appId;
    }

    /** Returns this channel's id, a random UUID string generated when the instance was created. */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public String getChannelId() {
        return this.channelId;
    }

    /** Returns the data source id given at construction. */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public long getDataSourceId() {
        return this.dataSourceId;
    }

    /**
     * Returns the shared event stream of this channel, creating it on first use: the
     * channel's messages filtered by {@code IMChannel$$Lambda$1} (presumably wrapping
     * {@code lambda$getEvent$145} — confirm) and observed on the worker scheduler.
     * The lazy initialization is not synchronized.
     */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public Observable<JsonNode> getEvent() {
        if (this.eventObservable != null) {
            return this.eventObservable;
        }
        Func1<? super JsonNode, Boolean> eventFilter = IMChannel$$Lambda$1.instance;
        Observable<JsonNode> events = this.messageObservable.filter(eventFilter);
        this.eventObservable = events.share().observeOn(this.workerScheduler);
        return this.eventObservable;
    }

    /** Returns the alias given at construction. */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public String getName() {
        return this.aliasName;
    }

    /**
     * Reports whether the host's status is {@code Host.Status.Opened}. The channel's own
     * {@code status} field is not consulted.
     */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public boolean isAlive() {
        return this.host.getStatus() == Host.Status.Opened;
    }

    /** Sets the channel status to {@code Status.Opened}, regardless of its previous value. */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public void open() {
        this.status.set(Status.Opened);
    }

    /**
     * Convenience overload: uses {@code valuable.value()} as the command and delegates to
     * {@code request(String, Map, Func1)}.
     *
     * @param valuable supplies the command value
     * @param map      extra body entries, may be null
     * @param func1    maps the response to the result type
     */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public <R> Observable<R> request(Valuable valuable, Map<String, Object> map, Func1<? super JsonNode, ? extends R> func1) {
        return request(valuable.value(), map, func1);
    }

    /**
     * Convenience overload without extra body entries: uses {@code valuable.value()} as the
     * command and delegates to {@code request(String, Map, Func1)} with a null map.
     *
     * @param valuable supplies the command value
     * @param func1    maps the response to the result type
     */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public <R> Observable<R> request(Valuable valuable, Func1<? super JsonNode, ? extends R> func1) {
        return request(valuable.value(), (Map<String, Object>) null, func1);
    }

    /**
     * Sends a request command and returns an Observable of its mapped response.
     * When the channel is not alive, a warning is logged and an empty Observable is
     * returned before the arguments are validated. Otherwise the body is built from the
     * command, the request method and the optional extra entries, and is posted to the
     * request thread immediately.
     *
     * NOTE(review): the request is dispatched before the caller subscribes to the returned
     * Observable, and {@code messageSentSubject} is a PublishSubject, so a response published
     * before subscription would presumably be missed — confirm.
     *
     * @param str   command value; must not be null when the channel is alive
     * @param map   extra body entries, may be null
     * @param func1 maps the response to the result type; must not be null when the channel is alive
     * @return the first response matched to this request (or the transformed
     *         {@code Response.EMPTY} default), mapped by {@code func1}
     */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public <R> Observable<R> request(String str, Map<String, Object> map, Func1<? super JsonNode, ? extends R> func1) {
        if (!isAlive()) {
            this.logger.warn("channel is not alive!");
            return Observable.empty();
        }
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(func1);

        ImmutableMap.Builder bodyBuilder = new ImmutableMap.Builder();
        bodyBuilder.put(LoggingEvents.VoiceIme.EXTRA_TEXT_MODIFIED_TYPE, str);
        bodyBuilder.put(LoggingEvents.VoiceIme.EXTRA_START_METHOD, CommandMethod.Request.value());
        if (map != null) {
            bodyBuilder.putAll(map);
        }

        // The request is dispatched here; the predicate keeps only the response carrying its uuid.
        Observable<Response> matched = this.messageSentSubject.firstOrDefault(Response.EMPTY, IMChannel$$Lambda$4.lambdaFactory$(sendRequestAsync(bodyBuilder.build())));
        Func1<? super Response, ? extends R> unwrap = IMChannel$$Lambda$5.instance;
        return matched.map(unwrap)
                .compose(ApiTransformer.resultTransformer)
                .map(func1)
                .doOnNext(IMChannel$$Lambda$6.lambdaFactory$(this, str))
                .doOnError(IMChannel$$Lambda$7.lambdaFactory$(this, str))
                .doOnCompleted(IMChannel$$Lambda$8.lambdaFactory$(this, str));
    }

    /** Sends the fixed acknowledgement body ({@code ackRequestBody}) through {@code sendData}. */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public void sendAck() {
        sendData(ackRequestBody);
    }

    /**
     * Intentionally does nothing: {@code dataSourceId} is a final field fixed at
     * construction, so the argument is ignored.
     */
    @Override // com.fusion.slim.im.core.protocol.Channel
    public void setDataSourceId(long j) {
    }

    /** Renders the app id, data source id and alias; this is the form used in log lines. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("{app=");
        text.append(this.appId);
        text.append(", ds=");
        text.append(this.dataSourceId);
        text.append(", alias='");
        text.append(getName());
        text.append("'");
        text.append(CoreConstants.CURLY_RIGHT);
        return text.toString();
    }
}
