import {
  GetDataSetResponse,
  RegisterDataSetRequest,
  DataContractVersion,
  TargetColumn,
} from 'aws-sdk/clients/tethyscontractservicelambda';
import { orderValues } from '../steps/4-Ingestion';
import {
  hasNoSpaces,
  isValidTableName,
  isValidDatabaseName,
  isValidDatabasePrefix,
  isValidAccount,
  isNotFalsy,
  isValidS3BucketName,
  isValidDdbTableArn,
} from 'src/commons/validationUtils';
import { fileFormats, sourceTypes } from '../../common/constants';
import type { PropertiesPresent, RecursivePartial } from './types';

export const isSourceDDB = ({ DataSource }: GetDataSetResponse) => DataSource.SourceType === sourceTypes.DynamoDB;

export const isSourceS3 = ({ DataSource }: GetDataSetResponse) => DataSource.SourceType === sourceTypes.S3;

export const isParquet = ({ DataFormat = '' }: DataContractVersion) => DataFormat === fileFormats.parquet;

export const isDelimited = ({ DataFormat = '' }: DataContractVersion) => DataFormat === fileFormats.delimited;

const getSchema = (request) =>
  isSourceS3(request)
    ? JSON.parse(request.DataContract.DataProperties.SchemaDefinition).fields
    : request.DataContract.DataProperties.TargetSchema;

export const getSchemaColumnNames = (request) => {
  const schema = getSchema(request);
  const columns = schema.map((f) => f.name || f.Name || f.SourceName);
  return columns;
};

// TODO: send precise error message, in addition to boolean to simplify implementation on step 3.
const validateSchema = (request: RegisterDataSetRequest): boolean => {
  try {
    const schema = getSchema(request);
    if (!schema) return false;

    // Validate if columns have a name. Important when defining PKs, and dedup keys.
    return isSourceS3(request)
      ? schema.some((f) => !!f.name)
      : schema.some((f: TargetColumn) => !!f.Name && !!f.SourceName);
  } catch {
    return false;
  }
};

// Validate if column name is in schema field, and that the ordering value is valid.
const validateDedupKeys = (request: RegisterDataSetRequest) => {
  try {
    const columns = getSchemaColumnNames(request);
    return request.DataContract.DataProperties.DeDupKeyColumns.every(
      ({ ColumnName, SortOrder }) => columns.includes(ColumnName) && orderValues.includes(SortOrder),
    );
  } catch (e) {}
};

//Dataset name is considered as TableName
export function getTableNameErrors(request: RegisterDataSetRequest): string {
  if (!request.TableName) return 'Dataset name is required.';
  if (!hasNoSpaces(request.TableName)) return 'Dataset name should not contain any spaces.';
  if (!isValidTableName(request.TableName)) return 'Dataset name format is invalid.';
  return '';
}

export function getDatabaseNameErrors(request: RegisterDataSetRequest): string {
  if (!request.DatabaseName) return 'Database name is required.';
  if (!hasNoSpaces(request.DatabaseName)) return 'Database name should not contain any spaces.';
  if (!isValidDatabaseName(request.DatabaseName)) return 'Database name format is invalid.';
  if (!isValidDatabasePrefix(request.DatabaseName)) return 'Database name should start with "aws_tethys".';
  return '';
}

export function getCatalogIdErrors(request: RegisterDataSetRequest): string {
  if (!request.CatalogId) return 'Catalog ID is required.';
  if (!isValidAccount(request.CatalogId)) return 'Catalog ID should be a 12-digit number.';
  return '';
}

export function getDataSourceErrors(request: RegisterDataSetRequest, isCreate: boolean): string {
  if (!isCreate) return '';
  if (!Object.values(sourceTypes).includes(request.DataSource.SourceType)) return 'Data source is required.';
  return '';
}

export function getDataFormatErrors(request: RegisterDataSetRequest, isCreate: boolean): string {
  if (!isSourceS3(request) || !isCreate) return '';
  if (!Object.values(fileFormats).includes(request.DataContract.DataFormat)) return 'File format is required.';
  return '';
}

export function getFieldDelimiterErrors(request: RegisterDataSetRequest): string {
  if (!isSourceS3(request)) return '';
  if (isParquet(request.DataContract)) return '';
  if (!isNotFalsy(request.DataContract.FileProperties.FieldDelimiter)) return 'Field delimiter is required.';
  return '';
}

export function getS3BucketErrors(request: RegisterDataSetRequest): string {
  if (!isSourceS3(request)) return '';
  if (!isValidS3BucketName(request.DataSource.S3SourceProperties?.Bucket)) return 'S3 Bucket name is invalid.';
  return '';
}

export function getDDBTableNameErrors(request: RegisterDataSetRequest): string {
  if (!isSourceDDB(request)) return '';
  if (!request.DataSource?.DynamoDBSourceProperties?.DynamoTableArn) return 'DynamoDB table ARN is required';
  if (!isValidDdbTableArn(request.DataSource.DynamoDBSourceProperties.DynamoTableArn))
    return 'DynamoDB table ARN is invalid';
  return '';
}

export const validateStepThree = (request: RegisterDataSetRequest) => ({
  SchemaDefinition: validateSchema(request),
});

export const validateStepFour = (request: RegisterDataSetRequest) => ({
  PrimaryKeyColumns: !!request.DataContract.DataProperties.PrimaryKeyColumns.length,
  DeDupKeyColumns: validateDedupKeys(request),
  IAMRole: !isSourceS3(request) || isNotFalsy(request.DataContract.ServiceLevelAgreement.IAMRole), // Deprecate isValidRole util to enable MVP regions.
  PublishType: !isSourceS3(request) || isNotFalsy(request.DataContract.ServiceLevelAgreement.PublishType),
});

// Iterates through each property to determine if all are valid.
const isStepValid = (step: any) => {
  const isStepValid = !Object.values(step).some((v) => !v); // If a single value is false, the step is not valid.
  return isStepValid;
};

// Conditional mapping to validate the current step.
export const validateStep = (step: number, request: RegisterDataSetRequest, isCreate: boolean): boolean => {
  if (step === 1) {
    return !getTableNameErrors(request) && !getDatabaseNameErrors(request) && !getCatalogIdErrors(request);
  }

  if (step === 2) {
    return (
      !getDataSourceErrors(request, isCreate) &&
      !getDataFormatErrors(request, isCreate) &&
      !getFieldDelimiterErrors(request) &&
      !getS3BucketErrors(request) &&
      !getDDBTableNameErrors(request)
    );
  }

  if (step === 3) {
    const stepThreeAttribs = validateStepThree(request);
    return isStepValid(stepThreeAttribs);
  }

  if (step === 4) {
    const stepFourAttribs = validateStepFour(request);
    return isStepValid(stepFourAttribs);
  }

  return false; // Return false when the step is not valid.
};

export const checkIfParquet = ({ DataFormat }: DataContractVersion) => DataFormat === 'PARQUET';

/*
Helper for testing AWS Schema Registry IAM role:
arn:aws:iam::773135466957:role/GlueAndLakeFormationAdmin
*/

/**
 * Returns false if any element of `array` isn't the same type as `template`.
 * Recursively checks object properties if the array contains objects
 * or array elements if the array contains arrays.
 *
 * An empty array is a valid match for any given template type/value.
 */
export function doElementsMatch<T extends Record<string, any>>(
  array: Array<RecursivePartial<T>>,
  template: PropertiesPresent<T>,
): boolean {
  for (const element of array) {
    if (typeof element !== typeof template) return false;
    if (typeof element === 'object') {
      if (Array.isArray(element) && Array.isArray(template)) {
        if (!doElementsMatch(element, template[0])) return false;
      } else if (Array.isArray(element) !== Array.isArray(template)) {
        // If one is an array and the other isn't, they don't match
        return false;
      } else {
        // null is also an object :(
        if (element === null && template === null) continue;
        if ((element === null) !== (template === null)) return false;
        if (!doPropertiesMatch(element, template)) return false;
      }
    }
  }

  return true;
}

/**
 * For all properties that exist within `object`, returns whether the type of all properties
 * matches the type of all properties in `template`.
 * Recursively checks object properties if the property is an object
 * or array elements if the property is an array.
 *
 * An empty object is a valid match for any given template type/value.
 *
 * ### Limitations
 * Recursive property comparisons don't work on non-record objects (e.g. class instances)
 * like `Date` or `Buffer`. `RecursivePartial<T>` and `PropertiesPresent<T>` will *try*
 * to apply to all functions, and `doPropertiesMatch` will check that the expected type of
 * 'function' matches, but it won't compare function signatures for equality.
 *
 * ### Type Theory
 * The parameter `object` is typed as `RecursivePartial<T>` to help TS catch edge cases,
 * but the runtime type of `object` is only expected to satisfy the type constraint if
 * this function returns true. Note that this function doesn't guarantee that `object`
 * is exactly of type `T`, just for type `RecursivePartial<T>`.
 *
 * For any valid Record of type `T`, `PropertiesPresent<T>` is a subtype of of `T`
 * and `RecursivePartial<T>` is a supertype of `T`. Both `PropertiesPresent<T>` and
 * `RecursivePartial<T>` are covariant over `T`, so if, for all sub-properties of
 * the supertype, there exists a linear transformation to an equivalent sub-property of
 * the subtype (transformation here is a type equivalency check), the supertyping is valid.
 */
export function doPropertiesMatch<T extends Record<string, any>>(
  object: RecursivePartial<T> | any,
  template: PropertiesPresent<T>,
): object is RecursivePartial<T> {
  if (Array.isArray(object) || Array.isArray(template)) {
    // Arrays are technically a subclass of Record<string, any> but aren't supported by this function
    return false;
  }

  for (const key of Object.keys(object)) {
    const objElem = object[key];
    const templateElem = template[key];
    if (typeof objElem !== typeof templateElem) {
      return false;
    }

    // if it's an object, recursively check object properties
    if (typeof objElem === 'object') {
      // if it's an array, check that all array elements match
      if (Array.isArray(objElem) && Array.isArray(templateElem)) {
        const arrayTemplate = templateElem[0];
        if (!doElementsMatch(objElem, arrayTemplate)) {
          return false;
        }
      } else if (Array.isArray(objElem) !== Array.isArray(templateElem)) {
        return false;
      } else if (objElem === null) {
        // null is also an object :(
        if (templateElem === null) continue;
        return false;
      } else {
        if (!doPropertiesMatch(objElem, templateElem)) return false;
      }
    }
  }

  return true;
}

/**
 * Recursively deletes undefined properties from `obj`.
 */
export function cleanUndefinedProperties(obj: Record<string, any>) {
  for (const key in obj) {
    if (obj[key] === undefined) {
      delete obj[key];
    } else if (typeof obj[key] === 'object' && obj[key] !== null && !Array.isArray(obj[key])) {
      cleanUndefinedProperties(obj[key]);
    }
  }
}
