/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.slf4j.Logger;

public class StreamsRebalanceListener
implements ConsumerRebalanceListener {
    private final Time time;
    private final TaskManager taskManager;
    private final StreamThread streamThread;
    private final Logger log;
    private final AtomicInteger assignmentErrorCode;

    StreamsRebalanceListener(Time time, TaskManager taskManager, StreamThread streamThread, Logger log, AtomicInteger assignmentErrorCode) {
        this.time = time;
        this.taskManager = taskManager;
        this.streamThread = streamThread;
        this.log = log;
        this.assignmentErrorCode = assignmentErrorCode;
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        if (this.assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
            this.log.error("Received error code {}. {}", (Object)AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.codeName(), (Object)AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.description());
            this.taskManager.handleRebalanceComplete();
            throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
        }
        if (this.assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
            this.log.info("Received version probing code {}", (Object)AssignorError.VERSION_PROBING);
        } else {
            if (this.assignmentErrorCode.get() == AssignorError.ASSIGNMENT_ERROR.code()) {
                this.log.error("Received error code {}", (Object)AssignorError.ASSIGNMENT_ERROR);
                this.taskManager.handleRebalanceComplete();
                throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance");
            }
            if (this.assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
                this.log.error("A Kafka Streams client in this Kafka Streams application is requesting to shutdown the application");
                this.taskManager.handleRebalanceComplete();
                this.streamThread.shutdownToError();
                return;
            }
            if (this.assignmentErrorCode.get() != AssignorError.NONE.code()) {
                this.log.error("Received unknown error code {}", (Object)this.assignmentErrorCode.get());
                throw new TaskAssignmentException("Hit an unrecognized exception during rebalance");
            }
        }
        this.streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.streamThread.setPartitionAssignedTime(this.time.milliseconds());
        this.taskManager.handleRebalanceComplete();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        this.log.debug("Current state {}: revoked partitions {} because of consumer rebalance.\n\tcurrently assigned active tasks: {}\n\tcurrently assigned standby tasks: {}\n", new Object[]{this.streamThread.state(), partitions, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds()});
        if (!(this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED) == null && this.streamThread.state() != StreamThread.State.PENDING_SHUTDOWN || partitions.isEmpty())) {
            long start = this.time.milliseconds();
            try {
                this.taskManager.handleRevocation(partitions);
            }
            finally {
                this.log.info("partition revocation took {} ms.", (Object)(this.time.milliseconds() - start));
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        this.log.info("at state {}: partitions {} lost due to missed rebalance.\n\tlost active tasks: {}\n\tlost assigned standby tasks: {}\n", new Object[]{this.streamThread.state(), partitions, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds()});
        long start = this.time.milliseconds();
        try {
            this.taskManager.handleLostAll();
        }
        finally {
            this.log.info("partitions lost took {} ms.", (Object)(this.time.milliseconds() - start));
        }
    }
}

