/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import org.apache.kafka.server.util.Json;
import org.apache.kafka.server.util.json.DecodeJson;
import org.apache.kafka.server.util.json.JsonObject;
import org.apache.kafka.server.util.json.JsonValue;
import org.apache.kafka.tools.AdminCommandFailedException;
import org.apache.kafka.tools.AdminOperationException;

/**
 * Command-line tool that deletes records of the given topic partitions before a
 * specified offset.
 *
 * <p>The partitions and offsets are read from a JSON file of the form
 * <pre>
 * {"partitions": [{"topic": "foo", "partition": 1, "offset": 1}], "version": 1}
 * </pre>
 * and submitted to the cluster through {@link Admin#deleteRecords}.
 */
public class DeleteRecordsCommand {
    /** Format version assumed when the offset JSON carries no {@code "version"} field. */
    private static final int EARLIEST_VERSION = 1;
    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
    private static final DecodeJson.DecodeLong LONG = new DecodeJson.DecodeLong();
    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();

    /**
     * Process entry point: runs the command and exits with the status code
     * returned by {@link #mainNoExit(String...)}.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) throws Exception {
        Exit.exit(mainNoExit(args));
    }

    /**
     * Runs the command without terminating the JVM.
     *
     * @param args command-line arguments
     * @return {@code 0} on success; {@code 1} if any exception was thrown, in which
     *         case the message and stack trace are written to {@code System.err}
     */
    static int mainNoExit(String... args) {
        try {
            execute(args, System.out);
            return 0;
        } catch (Exception e) {
            System.err.println(e.getMessage());
            System.err.println(Utils.stackTrace(e));
            return 1;
        }
    }

    /**
     * Parses the offset JSON document, keeping every offset listed for a partition
     * (duplicates are collected rather than collapsed) so the caller can detect them.
     *
     * @param jsonData the JSON document text
     * @return all offsets found per topic partition, in document order
     * @throws JsonProcessingException if a field cannot be decoded to its expected type
     * @throws AdminOperationException if the input is not valid JSON, the
     *         {@code "partitions"} field is missing, or the version is unsupported
     */
    static Map<TopicPartition, List<Long>> parseOffsetJsonStringWithoutDedup(String jsonData) throws JsonProcessingException {
        JsonValue js = Json.parseFull(jsonData)
            .orElseThrow(() -> new AdminOperationException("The input string is not a valid JSON"));

        // A missing "version" field falls back to the earliest known format.
        int version = EARLIEST_VERSION;
        Optional<JsonValue> versionField = js.asJsonObject().get("version");
        if (versionField.isPresent()) {
            version = versionField.get().to(INT);
        }
        return parseJsonData(version, js);
    }

    /**
     * Decodes the {@code "partitions"} array of a version-1 offset document.
     *
     * @param version the document format version; only {@code 1} is supported
     * @param js      the parsed JSON document
     * @return all offsets found per topic partition
     * @throws JsonMappingException    if an entry field is missing or has the wrong type
     * @throws AdminOperationException if {@code "partitions"} is missing or the version
     *                                 is not supported
     */
    private static Map<TopicPartition, List<Long>> parseJsonData(int version, JsonValue js) throws JsonMappingException {
        if (version != 1) {
            throw new AdminOperationException("Not supported version field value " + version);
        }

        JsonValue partitions = js.asJsonObject().get("partitions")
            .orElseThrow(() -> new AdminOperationException("Missing partitions field"));

        Map<TopicPartition, List<Long>> res = new HashMap<>();
        Iterator<JsonValue> iterator = partitions.asJsonArray().iterator();
        while (iterator.hasNext()) {
            JsonObject partitionJs = iterator.next().asJsonObject();
            String topic = partitionJs.apply("topic").to(STRING);
            int partition = partitionJs.apply("partition").to(INT);
            long offset = partitionJs.apply("offset").to(LONG);
            res.computeIfAbsent(new TopicPartition(topic, partition), k -> new ArrayList<>()).add(offset);
        }
        return res;
    }

    /**
     * Parses the command line, creates an admin client, reads the offset JSON file
     * and performs the deletion. The admin client is closed when this method returns.
     *
     * @param args command-line arguments
     * @param out  stream that receives progress and per-partition results
     * @throws IOException if the offset file or command-config file cannot be read,
     *                     or the offset JSON cannot be processed
     */
    public static void execute(String[] args, PrintStream out) throws IOException {
        DeleteRecordsCommandOptions opts = new DeleteRecordsCommandOptions(args);
        try (Admin adminClient = createAdminClient(opts)) {
            String offsetJsonString = Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt));
            execute(adminClient, offsetJsonString, out);
        }
    }

    /**
     * Submits a delete-records request for every partition in the offset JSON and
     * prints the resulting low watermark (or error) for each partition.
     *
     * @param adminClient      client used to issue the request
     * @param offsetJsonString the offset JSON document text
     * @param out              stream that receives progress and per-partition results
     * @throws JsonProcessingException     if the JSON cannot be decoded
     * @throws AdminCommandFailedException if a topic partition appears more than once
     */
    static void execute(Admin adminClient, String offsetJsonString, PrintStream out) throws JsonProcessingException {
        Map<TopicPartition, List<Long>> offsetSeq = parseOffsetJsonStringWithoutDedup(offsetJsonString);

        // Sorted so the error message is deterministic regardless of hash iteration order.
        List<String> duplicatePartitions = offsetSeq.entrySet().stream()
            .filter(e -> e.getValue().size() > 1)
            .map(e -> e.getKey().toString())
            .sorted()
            .collect(Collectors.toList());
        if (!duplicatePartitions.isEmpty()) {
            throw new AdminCommandFailedException(
                String.format("Offset json file contains duplicate topic partitions: %s", String.join(",", duplicatePartitions)));
        }

        // Every list holds exactly one offset at this point (duplicates were rejected above).
        Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
        for (Map.Entry<TopicPartition, List<Long>> entry : offsetSeq.entrySet()) {
            recordsToDelete.put(entry.getKey(), RecordsToDelete.beforeOffset(entry.getValue().get(0)));
        }

        out.println("Executing records delete operation");
        DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDelete);
        out.println("Records delete operation completed:");

        deleteRecordsResult.lowWatermarks().forEach((tp, partitionResult) -> {
            try {
                out.printf("partition: %s\tlow_watermark: %s%n", tp, partitionResult.get().lowWatermark());
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers up the stack can observe the interruption.
                Thread.currentThread().interrupt();
                out.printf("partition: %s\terror: %s%n", tp, e.getMessage());
            } catch (ExecutionException e) {
                out.printf("partition: %s\terror: %s%n", tp, e.getMessage());
            }
        });
    }

    /**
     * Builds an admin client from the optional {@code --command-config} properties
     * file, with {@code bootstrap.servers} set from {@code --bootstrap-server}
     * (overriding any value in the file).
     *
     * @param opts parsed command-line options
     * @return a new admin client; the caller is responsible for closing it
     * @throws IOException if the command-config file cannot be loaded
     */
    private static Admin createAdminClient(DeleteRecordsCommandOptions opts) throws IOException {
        Properties props = opts.options.has(opts.commandConfigOpt)
            ? Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
            : new Properties();
        props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
        return Admin.create(props);
    }

    /**
     * Command-line options for the tool: {@code --bootstrap-server} and
     * {@code --offset-json-file} are required, {@code --command-config} is optional.
     */
    private static class DeleteRecordsCommandOptions
    extends CommandDefaultOptions {
        private final OptionSpec<String> bootstrapServerOpt;
        private final OptionSpec<String> offsetJsonFileOpt;
        private final OptionSpec<String> commandConfigOpt;

        /**
         * Declares the options, parses {@code args}, handles help/version requests
         * and verifies that the required options are present.
         *
         * @param args command-line arguments
         */
        public DeleteRecordsCommandOptions(String[] args) {
            super(args);
            this.bootstrapServerOpt = this.parser.accepts("bootstrap-server", "REQUIRED: The server to connect to.")
                .withRequiredArg()
                .describedAs("server(s) to use for bootstrapping")
                .ofType(String.class);
            this.offsetJsonFileOpt = this.parser.accepts("offset-json-file", "REQUIRED: The JSON file with offset per partition. The format to use is:\n{\"partitions\":\n  [{\"topic\": \"foo\", \"partition\": 1, \"offset\": 1}],\n \"version\":1\n}")
                .withRequiredArg()
                .describedAs("Offset json file path")
                .ofType(String.class);
            this.commandConfigOpt = this.parser.accepts("command-config", "A property file containing configs to be passed to Admin Client.")
                .withRequiredArg()
                .describedAs("command config property file path")
                .ofType(String.class);
            this.options = this.parser.parse(args);
            CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to delete records of the given partitions down to the specified offset.");
            CommandLineUtils.checkRequiredArgs(this.parser, this.options, this.bootstrapServerOpt, this.offsetJsonFileOpt);
        }
    }
}

