DestinationForm() — supabase Function Reference
Architecture documentation for the DestinationForm() function in index.tsx from the supabase codebase.
Entity Profile
Dependency Diagram
graph TD
  fffb5bea_3c71_7ec5_e4c1_4bf313a4a834["DestinationForm()"]
  2fe82f23_0649_59a7_6fad_27f69700fe7e["buildDestinationConfigForValidation()"]
  fffb5bea_3c71_7ec5_e4c1_4bf313a4a834 -->|calls| 2fe82f23_0649_59a7_6fad_27f69700fe7e
  style fffb5bea_3c71_7ec5_e4c1_4bf313a4a834 fill:#6366f1,stroke:#818cf8,color:#fff
Relationship Graph
Source Code
apps/studio/components/interfaces/Database/Replication/DestinationPanel/DestinationForm/index.tsx lines 70–688
/**
 * Form body and footer for creating a new replication destination or editing an existing one.
 *
 * @param selectedType - Destination type being configured; compared against 'BigQuery' and
 *   'Analytics Bucket' to pick the required-field rules, the rendered fields and the submitted
 *   destination config.
 * @param visible - Whether the enclosing panel is open; when it is, the form and validation
 *   state are reset and publications are refetched.
 * @param existingDestination - When provided the form runs in edit mode against this
 *   destination and its pipeline; otherwise a new destination and pipeline are created.
 * @param onClose - Invoked by the Cancel button and after a submit has been dispatched.
 */
export const DestinationForm = ({
selectedType,
visible,
existingDestination,
onClose,
}: DestinationFormProps) => {
const { ref: projectRef } = useParams()
const { setRequestStatus } = usePipelineRequestStatus()
// Feature flags gating each destination type
const etlEnableBigQuery = useFlag('etlEnableBigQuery')
const etlEnableIceberg = useFlag('etlEnableIceberg')
// Used to enable the API keys query declared further down in this component
const { can: canReadAPIKeys } = useAsyncCheckPermissions(PermissionAction.SECRETS_READ, '*')
// While true, the edit-mode sync effect does not reset the form to the loaded values
const [isFormInteracting, setIsFormInteracting] = useState(false)
// Visibility of the disclaimer dialog shown before a new destination is created
const [showDisclaimerDialog, setShowDisclaimerDialog] = useState(false)
const [publicationPanelVisible, setPublicationPanelVisible] = useState(false)
const [newBucketSheetVisible, setNewBucketSheetVisible] = useState(false)
// Submitted values parked while the disclaimer dialog is open; null when nothing is pending
const [pendingFormValues, setPendingFormValues] = useState<z.infer<typeof FormSchema> | null>(
null
)
// Validation state: whether a validation run has happened, and the failures each endpoint returned
const [hasRunValidation, setHasRunValidation] = useState(false)
const [destinationValidationFailures, setDestinationValidationFailures] = useState<
ValidationFailure[]
>([])
const [pipelineValidationFailures, setPipelineValidationFailures] = useState<ValidationFailure[]>(
[]
)
// Wrapper around the validation results; scrolled into view when validation reports failures
const validationSectionRef = useRef<HTMLDivElement>(null)
// Edit mode is implied by the presence of an existing destination
const editMode = !!existingDestination
// Destination types offered to the user, each gated by its feature flag (BigQuery listed first)
const availableDestinations = useMemo(
  () =>
    [
      { name: 'BigQuery', enabled: etlEnableBigQuery },
      { name: 'Analytics Bucket', enabled: etlEnableIceberg },
    ]
      .filter((candidate) => candidate.enabled)
      .map(({ name }) => ({ value: name, label: name })),
  [etlEnableBigQuery, etlEnableIceberg]
)
// True when every destination type is flagged off
const hasNoAvailableDestinations = !availableDestinations.length
// The replication source for this project is the one whose name equals the project ref
const { data: sourcesData } = useReplicationSourcesQuery({ projectRef })
const sourceId = sourcesData?.sources.find((s) => s.name === projectRef)?.id
// Publications available on the source; defaults to an empty list until loaded
const {
data: publications = [],
isSuccess: isSuccessPublications,
refetch: refetchPublications,
} = useReplicationPublicationsQuery({ projectRef, sourceId })
// Existing destination and pipeline, looked up by the ids on existingDestination (edit mode)
const { data: destinationData } = useReplicationDestinationByIdQuery({
projectRef,
destinationId: existingDestination?.destinationId,
})
const { data: pipelineData } = useReplicationPipelineByIdQuery({
projectRef,
pipelineId: existingDestination?.pipelineId,
})
// API keys are requested with reveal: true, and only when the permission check allows it
const { data: apiKeys } = useAPIKeysQuery(
{ projectRef, reveal: true },
{ enabled: canReadAPIKeys }
)
// The service key's api_key is the fallback catalog token; empty string when unavailable
const { serviceKey } = getKeys(apiKeys)
const catalogToken = serviceKey?.api_key ?? ''
const { data: projectSettings } = useProjectSettingsV2Query({ projectRef })
// Create / update mutations reset the form to defaultValues on success. `form` and
// `defaultValues` are declared later in the component and are only read when the callback runs.
const { mutateAsync: createDestinationPipeline, isPending: creatingDestinationPipeline } =
useCreateDestinationPipelineMutation({
onSuccess: () => form.reset(defaultValues),
})
const { mutateAsync: updateDestinationPipeline, isPending: updatingDestinationPipeline } =
useUpdateDestinationPipelineMutation({
onSuccess: () => form.reset(defaultValues),
})
const { mutateAsync: startPipeline, isPending: startingPipeline } = useStartPipelineMutation()
const { restartPipeline } = useRestartPipelineHelper()
// Mutations used when the Analytics Bucket flow has to create an S3 access key or a namespace
const { mutateAsync: createS3AccessKey, isPending: isCreatingS3AccessKey } =
useS3AccessKeyCreateMutation()
const { mutateAsync: createNamespace, isPending: isCreatingNamespace } =
useIcebergNamespaceCreateMutation()
// Validation mutations; the two are run together before a new destination is created
const { mutateAsync: validateDestination, isPending: isValidatingDestination } =
useValidateDestinationMutation()
const { mutateAsync: validatePipeline, isPending: isValidatingPipeline } =
useValidatePipelineMutation()
// True while either validation request is in flight
const isValidating = isValidatingDestination || isValidatingPipeline
// Initial form values: taken from the loaded destination / pipeline when present, blank otherwise
const defaultValues = useMemo(() => {
  const storedConfig = destinationData?.config
  const hasBigQueryConfig = storedConfig && 'big_query' in storedConfig
  const hasIcebergConfig = storedConfig && 'iceberg' in storedConfig

  // Fields shared by every destination type
  const commonFields = {
    name: destinationData?.name ?? '',
    publicationName: pipelineData?.config.publication_name ?? '',
    maxFillMs: pipelineData?.config?.batch?.max_fill_ms ?? undefined,
    maxSize: pipelineData?.config?.batch?.max_size ?? undefined,
    maxTableSyncWorkers: pipelineData?.config?.max_table_sync_workers ?? undefined,
  }

  // BigQuery-only fields (maxStalenessMins stays undefined unless a BigQuery config is loaded)
  const bigQueryFields = {
    projectId: hasBigQueryConfig ? storedConfig.big_query.project_id : '',
    datasetId: hasBigQueryConfig ? storedConfig.big_query.dataset_id : '',
    serviceAccountKey: hasBigQueryConfig ? storedConfig.big_query.service_account_key : '',
    maxStalenessMins: hasBigQueryConfig ? storedConfig.big_query.max_staleness_mins : undefined,
  }

  // Analytics Bucket-only fields; the project's region takes precedence over the stored one,
  // and the service key's token is used when no Iceberg config has been loaded
  const analyticsBucketFields = {
    warehouseName: hasIcebergConfig ? storedConfig.iceberg.supabase.warehouse_name : '',
    namespace: hasIcebergConfig ? storedConfig.iceberg.supabase.namespace : '',
    newNamespaceName: '',
    catalogToken: hasIcebergConfig ? storedConfig.iceberg.supabase.catalog_token : catalogToken,
    s3AccessKeyId: hasIcebergConfig ? storedConfig.iceberg.supabase.s3_access_key_id : '',
    s3SecretAccessKey: hasIcebergConfig
      ? storedConfig.iceberg.supabase.s3_secret_access_key
      : '',
    s3Region:
      projectSettings?.region ??
      (hasIcebergConfig ? storedConfig.iceberg.supabase.s3_region : ''),
  }

  return { ...commonFields, ...bigQueryFields, ...analyticsBucketFields }
}, [destinationData, pipelineData, catalogToken, projectSettings])
// Form state. On top of FormSchema, a superRefine pass marks as required only the fields that
// belong to the currently selected destination type, so the other type's fields may stay blank.
// The sentinel select values are compared through CREATE_NEW_NAMESPACE / CREATE_NEW_KEY (the
// constants the submit path also uses) so the schema and the submit logic cannot drift apart.
const form = useForm<z.infer<typeof FormSchema>>({
  mode: 'onChange',
  reValidateMode: 'onChange',
  resolver: zodResolver(
    FormSchema.superRefine((data, ctx) => {
      // Attaches a custom issue to a single field
      const addRequiredFieldError = (path: string, message: string) => {
        ctx.addIssue({
          code: z.ZodIssueCode.custom,
          message,
          path: [path],
        })
      }

      if (selectedType === 'BigQuery') {
        if (!data.projectId?.length) addRequiredFieldError('projectId', 'Project ID is required')
        if (!data.datasetId?.length) addRequiredFieldError('datasetId', 'Dataset ID is required')
        if (!data.serviceAccountKey?.length)
          addRequiredFieldError('serviceAccountKey', 'Service Account Key is required')
      } else if (selectedType === 'Analytics Bucket') {
        if (!data.warehouseName?.length)
          addRequiredFieldError('warehouseName', 'Bucket is required')

        // Either an existing namespace is picked, or "create new" is picked and a name is typed
        const isCreatingNew = data.namespace === CREATE_NEW_NAMESPACE
        const hasValidNamespace = isCreatingNew
          ? !!data.newNamespaceName?.length
          : !!data.namespace?.length
        if (!hasValidNamespace) {
          addRequiredFieldError(
            isCreatingNew ? 'newNamespaceName' : 'namespace',
            isCreatingNew ? 'Namespace name is required' : 'Namespace is required'
          )
        }

        if (!data.s3Region?.length) addRequiredFieldError('s3Region', 'S3 Region is required')
        if (!data.s3AccessKeyId?.length)
          addRequiredFieldError('s3AccessKeyId', 'S3 Access Key ID is required')
        // No secret is needed up front when a new access key will be generated on submit
        if (data.s3AccessKeyId !== CREATE_NEW_KEY && !data.s3SecretAccessKey?.length) {
          addRequiredFieldError('s3SecretAccessKey', 'S3 Secret Access Key is required')
        }
      }
    })
  ),
  defaultValues,
})
// Live form values needed outside of the field components
const { publicationName, warehouseName } = form.watch()
const publicationNames = useMemo(() => publications?.map((pub) => pub.name) ?? [], [publications])
// Only flagged once publications have loaded and a publication name is actually selected
const isSelectedPublicationMissing = !(
  !isSuccessPublications ||
  !publicationName ||
  publicationNames.includes(publicationName)
)
const allValidationFailures = destinationValidationFailures.concat(pipelineValidationFailures)
// Despite the name, this is true only when at least one failure is critical
const hasValidationFailures = allValidationFailures.some(
  ({ failure_type }) => failure_type === 'critical'
)
// Any in-flight mutation (including validation) counts as saving
const isSaving = [
  creatingDestinationPipeline,
  updatingDestinationPipeline,
  startingPipeline,
  isCreatingS3AccessKey,
  isCreatingNamespace,
  isValidating,
].some(Boolean)
const isSubmitDisabled =
  isSaving || isSelectedPublicationMissing || (hasNoAvailableDestinations && !editMode)
// Submit button label: creation always starts the pipeline, while edits restart an enabled
// destination and start a disabled one
const getSubmitButtonText = () => {
  if (!editMode) return 'Create and start'
  return existingDestination?.enabled ? 'Apply and restart' : 'Apply and start'
}
// Returns the namespace to submit. When the "create new" sentinel is selected, the namespace is
// created in the chosen warehouse first and its name is returned; otherwise the selected value
// is returned untouched. Throws if "create new" is selected without a name.
const resolveNamespace = async (data: z.infer<typeof FormSchema>) => {
  const { namespace, newNamespaceName, warehouseName: warehouse } = data
  if (namespace !== CREATE_NEW_NAMESPACE) return namespace

  if (!newNamespaceName) throw new Error('New namespace name is required')
  await createNamespace({ projectRef, warehouse: warehouse!, namespace: newNamespaceName })
  return newNamespaceName
}
// Validates the destination and pipeline settings against both validation endpoints.
// Resolves to true when there are no critical failures (warnings alone still pass), and to
// false when prerequisites are missing, a request fails outright, or a critical failure exists.
const validateConfiguration = async (data: z.infer<typeof FormSchema>) => {
  if (!projectRef || !sourceId) return false

  setHasRunValidation(true)

  // Both requests run in parallel and are both awaited even if one rejects, so validation
  // behaves like a single operation
  const [destinationOutcome, pipelineOutcome] = await Promise.allSettled([
    validateDestination({
      projectRef,
      destinationConfig: buildDestinationConfigForValidation({ projectRef, selectedType, data }),
    }),
    validatePipeline({
      projectRef,
      sourceId,
      publicationName: data.publicationName,
      maxFillMs: data.maxFillMs,
      maxSize: data.maxSize,
      maxTableSyncWorkers: data.maxTableSyncWorkers,
    }),
  ])

  // A rejected request means validation could not be performed at all: report it generically
  if (destinationOutcome.status === 'rejected' || pipelineOutcome.status === 'rejected') {
    toast.error('Failed to validate configuration. Please try again.')
    setHasRunValidation(false)
    return false
  }

  // Both requests succeeded: publish the failures each endpoint reported
  const destinationFailures = destinationOutcome.value.validation_failures
  const pipelineFailures = pipelineOutcome.value.validation_failures
  setDestinationValidationFailures(destinationFailures)
  setPipelineValidationFailures(pipelineFailures)

  const reportedFailures = [...destinationFailures, ...pipelineFailures]

  // Bring the validation section into view whenever there is something to show
  if (reportedFailures.length > 0) {
    setTimeout(() => {
      validationSectionRef.current?.scrollIntoView({ behavior: 'smooth', block: 'start' })
    }, 100)
  }

  return !reportedFailures.some((failure) => failure.failure_type === 'critical')
}
// The create and edit flows send the same destination / pipeline payloads for a given
// destination type, so the payload construction lives in the two builders below and
// submitPipeline only decides between "update + (re)start" and "create + start".

// Builds the destination config for the selected destination type from the submitted values.
// For Analytics Bucket this may first create an S3 access key (when the "create new" key
// sentinel is selected) and a namespace (via resolveNamespace). Resolves to undefined when the
// selected type is neither 'BigQuery' nor 'Analytics Bucket'. Rejects if either creation fails.
const buildDestinationConfigForSubmit = async (
  ref: string,
  data: z.infer<typeof FormSchema>
): Promise<DestinationConfig | undefined> => {
  if (selectedType === 'BigQuery') {
    const bigQueryConfig: BigQueryDestinationConfig = {
      projectId: data.projectId ?? '',
      datasetId: data.datasetId ?? '',
      serviceAccountKey: data.serviceAccountKey ?? '',
    }
    // Truthiness check kept from the original flows: an unset value (and 0) is left out
    if (data.maxStalenessMins) {
      bigQueryConfig.maxStalenessMins = data.maxStalenessMins
    }
    return { bigQuery: bigQueryConfig }
  }

  if (selectedType === 'Analytics Bucket') {
    let s3Keys = { accessKey: data.s3AccessKeyId, secretKey: data.s3SecretAccessKey }
    if (data.s3AccessKeyId === CREATE_NEW_KEY) {
      const newKeys = await createS3AccessKey({
        projectRef: ref,
        description: `Autogenerated key for replication to ${snakeCase(warehouseName)}`,
      })
      s3Keys = { accessKey: newKeys.access_key, secretKey: newKeys.secret_key }
    }

    // Resolve namespace (create if needed)
    const finalNamespace = await resolveNamespace(data)

    const icebergConfig: IcebergDestinationConfig = {
      projectRef: ref,
      warehouseName: data.warehouseName ?? '',
      namespace: finalNamespace,
      catalogToken: data.catalogToken ?? '',
      s3AccessKeyId: s3Keys.accessKey ?? '',
      s3SecretAccessKey: s3Keys.secretKey ?? '',
      s3Region: data.s3Region ?? '',
    }
    return { iceberg: icebergConfig }
  }

  return undefined
}

// Builds the pipeline config from the submitted values. The `batch` key is only included when
// at least one batch setting was provided, and then carries only the provided settings.
const buildPipelineConfigForSubmit = (data: z.infer<typeof FormSchema>) => {
  const batchConfig: BatchConfig | undefined =
    data.maxFillMs !== undefined || data.maxSize !== undefined
      ? {
          ...(data.maxFillMs !== undefined ? { maxFillMs: data.maxFillMs } : {}),
          ...(data.maxSize !== undefined ? { maxSize: data.maxSize } : {}),
        }
      : undefined

  return {
    publicationName: data.publicationName,
    maxTableSyncWorkers: data.maxTableSyncWorkers,
    ...(batchConfig !== undefined ? { batch: batchConfig } : {}),
  }
}

// Persists the destination and kicks off its pipeline.
// - Edit mode: updates the destination + pipeline, then restarts (if enabled) or starts it.
// - Create mode: creates the destination + pipeline, then starts it.
// The start / restart call is dispatched without being awaited and the panel is closed right
// away. Any failure before that point is logged and surfaced as a toast.
const submitPipeline = async (data: z.infer<typeof FormSchema>) => {
  if (!projectRef) return console.error('Project ref is required')
  if (!sourceId) return console.error('Source id is required')
  if (isSelectedPublicationMissing) {
    return toast.error('Please select another publication before continuing')
  }

  try {
    if (editMode && existingDestination) {
      if (!existingDestination.pipelineId) return console.error('Pipeline id is required')

      const destinationConfig = await buildDestinationConfigForSubmit(projectRef, data)
      if (!destinationConfig) throw new Error('Destination configuration is missing')

      await updateDestinationPipeline({
        destinationId: existingDestination.destinationId,
        pipelineId: existingDestination.pipelineId,
        projectRef,
        destinationName: data.name,
        destinationConfig,
        pipelineConfig: buildPipelineConfigForSubmit(data),
        sourceId,
      })

      // Set request status only right before starting, then fire and close
      const snapshot =
        existingDestination.statusName ?? (existingDestination.enabled ? 'started' : 'stopped')
      if (existingDestination.enabled) {
        setRequestStatus(
          existingDestination.pipelineId,
          PipelineStatusRequestStatus.RestartRequested,
          snapshot
        )
        toast.success('Settings applied. Restarting the pipeline...')
        restartPipeline({ projectRef, pipelineId: existingDestination.pipelineId })
      } else {
        setRequestStatus(
          existingDestination.pipelineId,
          PipelineStatusRequestStatus.StartRequested,
          snapshot
        )
        toast.success('Settings applied. Starting the pipeline...')
        startPipeline({ projectRef, pipelineId: existingDestination.pipelineId })
      }
      onClose()
    } else {
      const destinationConfig = await buildDestinationConfigForSubmit(projectRef, data)
      if (!destinationConfig) throw new Error('Destination configuration is missing')

      const { pipeline_id: pipelineId } = await createDestinationPipeline({
        projectRef,
        destinationName: data.name,
        destinationConfig,
        sourceId,
        pipelineConfig: buildPipelineConfigForSubmit(data),
      })

      // Set request status only right before starting, then fire and close
      setRequestStatus(pipelineId, PipelineStatusRequestStatus.StartRequested, undefined)
      toast.success('Destination created. Starting the pipeline...')
      startPipeline({ projectRef, pipelineId })
      onClose()
    }
  } catch (error) {
    const action = editMode ? 'apply and run' : 'create and start'
    // Keep the underlying cause available for debugging instead of discarding it
    console.error(`Failed to ${action} destination`, error)
    toast.error(`Failed to ${action} destination`)
  }
}
// Form submit handler. Edits are submitted straight away. New destinations are validated first
// (on the first attempt, or again while critical failures are outstanding) and then handed to
// the disclaimer dialog, which performs the actual submit on confirmation.
const onSubmit = async (data: z.infer<typeof FormSchema>) => {
  if (editMode) {
    await submitPipeline(data)
    return
  }

  const needsValidation = !hasRunValidation || isValidating || hasValidationFailures
  // Critical failures are shown inline by the validation section, so just stop here
  if (needsValidation && !(await validateConfiguration(data))) return

  // Validation passed, or only produced warnings: park the values and ask for confirmation
  setPendingFormValues(data)
  setShowDisclaimerDialog(true)
}
// Mirrors the dialog's open state; closing the dialog also discards the parked form values
const handleDisclaimerDialogChange = (open: boolean) => {
  setShowDisclaimerDialog(open)
  if (open) return
  setPendingFormValues(null)
}
// Runs when the disclaimer is accepted: clears the parked values, closes the dialog and
// submits the values that were parked. Does nothing when no values are pending.
const handleDisclaimerConfirm = async () => {
  const confirmedValues = pendingFormValues
  if (!confirmedValues) return

  setPendingFormValues(null)
  setShowDisclaimerDialog(false)
  await submitPipeline(confirmedValues)
}
// In edit mode, once both the destination and the pipeline have loaded, sync the form with the
// loaded values - unless isFormInteracting is set, in which case the form is left alone
useEffect(() => {
if (editMode && destinationData && pipelineData && !isFormInteracting) {
form.reset(defaultValues)
}
}, [destinationData, pipelineData, editMode, defaultValues, form, isFormInteracting])
// Ensure the form always reflects the freshest data whenever the panel opens
// (this also clears the interaction flag and any previous validation results)
useEffect(() => {
if (visible) {
form.reset(defaultValues)
setIsFormInteracting(false)
setHasRunValidation(false)
setDestinationValidationFailures([])
setPipelineValidationFailures([])
}
}, [visible, defaultValues, form])
// Refetch publications while the panel is visible and the project ref and source id are known
useEffect(() => {
if (visible && projectRef && sourceId) {
refetchPublications()
}
}, [visible, projectRef, sourceId, refetchPublications])
// Layout: a scrollable section holding the form (or an empty state), a footer with a progress
// indicator plus Cancel / submit buttons, and the auxiliary panels and dialog the form can open.
// The submit button sits outside the <form> element and is linked to it through `formId`.
return (
<>
<SheetSection className="flex-grow overflow-auto px-0 py-0">
{hasNoAvailableDestinations && !editMode ? (
// Nothing can be created when every destination type is flagged off
<NoDestinationsAvailable />
) : (
<Form_Shadcn_ {...form}>
<form id={formId} onSubmit={form.handleSubmit(onSubmit)}>
<div className="p-5 flex flex-col gap-y-6">
<p className="text-sm font-medium text-foreground">Destination details</p>
<div className="space-y-4">
<DestinationNameInput form={form} />
<PublicationSelection
form={form}
sourceId={sourceId}
visible={visible}
onSelectNewPublication={() => setPublicationPanelVisible(true)}
/>
</div>
</div>
<DialogSectionSeparator />
{selectedType === 'BigQuery' && etlEnableBigQuery ? (
// Type-specific fields render only when the matching feature flag is on
<BigQueryFields form={form} />
) : selectedType === 'Analytics Bucket' && etlEnableIceberg ? (
<AnalyticsBucketFields
form={form}
setIsFormInteracting={setIsFormInteracting}
onSelectNewBucket={() => setNewBucketSheetVisible(true)}
/>
) : null}
<DialogSectionSeparator />
<AdvancedSettings type={selectedType} form={form} />
{!editMode && hasRunValidation && !isValidating && (
// Validation results are shown for new destinations only, once a run has finished
<>
<DialogSectionSeparator />
<div ref={validationSectionRef}>
<ValidationFailuresSection
destinationFailures={destinationValidationFailures}
pipelineFailures={pipelineValidationFailures}
/>
</div>
</>
)}
</form>
</Form_Shadcn_>
)}
</SheetSection>
<SheetFooter className="!justify-between">
<AnimatePresence mode="wait">
{isValidating || isSaving ? (
// Progress indicator; the empty <div /> in the other branch stands in when idle
<motion.div
className="flex items-center gap-x-2"
initial={{ opacity: 0, y: 5 }}
animate={{ opacity: 1, y: 0 }}
exit={{ opacity: 0, y: 5 }}
transition={{ duration: 0.2, ease: 'easeOut' }}
>
<Loader2 className="animate-spin" size={14} />
<p className="text-foreground-light text-sm">
{isValidating
? 'Validating destination configuration...'
: `${editMode ? 'Updating' : 'Creating'} destination...`}
</p>
</motion.div>
) : (
<div />
)}
</AnimatePresence>
<div className="flex items-center gap-x-2">
<Button disabled={isSaving} type="default" onClick={onClose}>
Cancel
</Button>
<Button disabled={isSubmitDisabled} loading={isSaving} form={formId} htmlType="submit">
{getSubmitButtonText()}
</Button>
</div>
</SheetFooter>
<NewPublicationPanel
sourceId={sourceId}
visible={publicationPanelVisible}
onClose={() => setPublicationPanelVisible(false)}
/>
<CreateAnalyticsBucketSheet
open={newBucketSheetVisible}
onOpenChange={setNewBucketSheetVisible}
/>
<ReplicationDisclaimerDialog
open={showDisclaimerDialog}
onOpenChange={handleDisclaimerDialogChange}
isLoading={isSaving}
onConfirm={handleDisclaimerConfirm}
/>
</>
)
}
Domain
Subdomains
Source
Frequently Asked Questions
What does DestinationForm() do?
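DestinationForm() is a React component in the supabase codebase that renders the form for creating or editing a replication destination (BigQuery or Analytics Bucket) together with its pipeline. For new destinations it validates the configuration and asks for confirmation through a disclaimer dialog before creating the destination; on submit it creates or updates the destination and pipeline, then starts or restarts the pipeline.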
What does DestinationForm() call?
In the dependency graph, DestinationForm() calls one function: buildDestinationConfigForValidation().
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free