475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768 | def register(self, app: typer.Typer) -> None: # noqa: C901
"""Register auth commands with the main app."""
auth = typer.Typer(
help="Authentication commands. Note: 'qsh auth login' and 'qsh login' are equivalent."
)
@auth.command("login")
def auth_login( # noqa: C901
provider_arg: str | None = typer.Argument(None, help="Auth provider to use"),
) -> None:
"""Perform interactive or programmatic login.
Usage:
qsh auth login <provider> # positional provider
qsh auth login # login to all providers
"""
# Use positional argument if provided, otherwise None
provider = provider_arg
ctx = self.ctx_provider()
if provider:
self._login_single_provider_interactive(ctx, provider)
else:
self._login_all_context_providers(ctx)
@auth.command("logout")
def auth_logout( # noqa: C901
provider_arg: str | None = typer.Argument(None, help="Auth provider to use"),
) -> None:
"""Clear cached credentials.
Usage:
qsh auth logout <provider> # positional provider
qsh auth logout # logout from all providers
"""
# Use positional argument if provided, otherwise None
provider = provider_arg
ctx = self.ctx_provider()
if provider:
self.logger.debug(f"logout: provider={provider}")
auth_provider = self._get_or_create_provider_strict(ctx, provider)
# Check if provider supports logout
if not self._supports_login_logout(auth_provider):
raise ConfigurationError(
f"Provider '{provider}' does not support logout operations",
exit_code=ExitCode.USAGE_ERROR,
)
try:
auth_provider.logout()
except KeyboardInterrupt:
raise
except QlamCoreError:
# Preserve typed exit codes from the domain layer.
raise
except (OSError, RuntimeError, ValueError) as exc:
raise AuthenticationError(
f"Logout failed for provider '{provider}': {exc}"
) from exc
typer.echo(f"Logout successful with provider: {provider}")
else:
self.logger.debug("logout: auto-detecting providers for current context")
# Auto-detect and logout from all authenticated providers in current context
all_authenticated = self._get_authenticated_providers(ctx)
providers_to_logout = self._get_login_capable_providers(all_authenticated, ctx)
if not all_authenticated:
typer.echo("No authenticated providers found in current context.")
return
if not providers_to_logout:
typer.echo("No authenticated providers support logout operations.")
return
success_count = 0
failed_providers = []
for provider_name in providers_to_logout:
try:
self.logger.debug(f"Attempting logout from provider: {provider_name}")
# Get the provider
auth_provider = self._get_or_create_provider(ctx, provider_name)
if not auth_provider:
self.logger.warning(f"Unknown provider type: {provider_name}")
failed_providers.append(provider_name)
continue
# Attempt logout
auth_provider.logout()
typer.echo(f"✅ Logout successful from provider: {provider_name}")
success_count += 1
except KeyboardInterrupt:
raise
except QlamCoreError as exc:
# Multi-provider logout is best-effort: log at DEBUG for --verbose and continue.
self.logger.debug(
"Logout failed for provider %s: %s",
provider_name,
exc,
exc_info=exc,
)
typer.echo(f"❌ Logout failed for provider {provider_name}: {exc}")
failed_providers.append(provider_name)
except (OSError, RuntimeError, ValueError) as exc:
self.logger.debug(
"Logout failed for provider %s: %s",
provider_name,
exc,
exc_info=exc,
)
typer.echo(f"❌ Logout failed for provider {provider_name}: {exc}")
failed_providers.append(provider_name)
# Summary
if success_count > 0:
typer.echo(f"\n🎉 Successfully logged out from {success_count} provider(s)")
if failed_providers:
typer.echo(f"⚠️ Failed to logout from: {', '.join(failed_providers)}")
if success_count == 0:
raise AuthenticationError(
f"Logout failed for all attempted providers: {', '.join(failed_providers)}"
)
else:
typer.echo("All authenticated providers logged out successfully!")
@auth.command("refresh")
def auth_refresh(
provider_arg: str | None = typer.Argument(None, help="Auth provider to refresh"),
force: bool = typer.Option(
False,
"--force",
help="Refresh even when credentials appear valid",
),
) -> None:
"""Refresh cached credentials non-interactively."""
provider = provider_arg
ctx = self.ctx_provider()
if provider:
self._refresh_single_provider(ctx, provider, force=force)
else:
self._refresh_all_context_providers(ctx, force=force)
@auth.command("list")
def list_providers() -> None:
"""List all authentication providers in the current context."""
ctx = self.ctx_provider()
self.logger.debug("auth list")
# Get auth providers from current context
current_context = ctx.config.current_context
if not current_context.auth_providers:
typer.echo(
f"No authentication providers configured in context '{current_context.name}'."
)
return
from qlam_core.auth.registry import auth_registry
# Prepare data for display
provider_data = []
for auth_provider_config in current_context.auth_providers:
provider_name = auth_provider_config.name
provider_type = auth_provider_config.provider
# Check if provider is registered and authenticated
provider = self._get_or_create_provider(ctx, provider_name)
if provider:
try:
is_authenticated = provider.is_authenticated()
status = "✅ Authenticated" if is_authenticated else "❌ Not authenticated"
except _STATUS_CHECK_ERRORS as e:
self.logger.debug(
f"Failed to check authentication status for {provider_name}: {e}"
)
status = "❓ Unknown"
else:
status = "❓ Error"
# Get description and extra info from registry capabilities
capabilities = auth_registry.get_capabilities(provider_type)
description = capabilities.get("description", f"{provider_type} authentication")
# Get extra info from provider class
provider_class = auth_registry._get_provider_class(provider_type)
extra_info = {}
if provider_class:
with contextlib.suppress(
AttributeError, # Config missing expected attributes
KeyError, # Missing key in config dict
TypeError, # Invalid config type
):
extra_info = provider_class.extra_info_from_config(auth_provider_config)
provider_info = {
"name": provider_name,
"type": provider_type,
"status": status,
"description": description,
}
provider_info.update(extra_info)
provider_data.append(provider_info)
render(provider_data, ctx.effective_output_format)
@auth.command("show")
def show_credentials(
provider_name: str = typer.Argument(..., help="Provider name"),
output: str | None = typer.Option(None, "-o", "--output", help=OUTPUT_FORMAT_HELP),
) -> None:
"""Show credentials for a specific authentication provider."""
ctx = self.ctx_provider()
self.logger.debug(f"auth show: provider={provider_name}")
# Get or create provider using the helper method
provider = self._get_or_create_provider(ctx, provider_name)
# If still not found, error
if not provider:
raise ConfigurationError(
f"Unknown auth provider: {provider_name}",
exit_code=ExitCode.USAGE_ERROR,
)
# Get provider config
provider_config = self._find_auth_provider_config(ctx, provider_name)
if not provider_config:
raise ConfigurationError(
f"Provider '{provider_name}' not found in context configuration.",
exit_code=ExitCode.USAGE_ERROR,
)
# Build credential data
try:
credential_data = self._build_credential_data(
provider_name, provider, provider_config
)
render(credential_data, output or ctx.effective_output_format)
except KeyboardInterrupt:
raise
except QlamCoreError:
raise
except (OSError, RuntimeError, ValueError) as exc:
raise AuthenticationError(
f"Failed to show credentials for provider '{provider_name}': {exc}"
) from exc
@auth.command("get")
def get_credentials(
provider_name: str = typer.Argument(..., help="Provider name"),
output: str | None = typer.Option(None, "-o", "--output", help=OUTPUT_FORMAT_HELP),
) -> None:
"""Get credentials for a specific authentication provider (alias for show)."""
show_credentials(provider_name, output)
# Add dynamic provider-specific commands from current context
self._add_dynamic_provider_commands(auth, self.ctx_provider, self.logger)
app.add_typer(auth, name="auth")
# Add top-level login command that delegates to auth login
@app.command("login")
def top_level_login(
provider_arg: str | None = typer.Argument(None, help="Auth provider to use"),
) -> None:
"""Perform interactive or programmatic login (alias for auth login).
Usage:
qsh login <provider> # positional provider
qsh login # login to all providers
"""
# Delegate to the auth login function
auth_login(provider_arg)
# Add top-level logout command that delegates to auth logout
@app.command("logout")
def top_level_logout(
provider_arg: str | None = typer.Argument(None, help="Auth provider to use"),
) -> None:
"""Clear cached credentials (alias for auth logout).
Usage:
qsh logout <provider> # positional provider
qsh logout # logout from all providers
"""
# Delegate to the auth logout function
auth_logout(provider_arg)
|