Skip to content

gemdat.trajectory

This module contains class for dealing with trajectories from molecular dynamics simulations.

Trajectory(*, metadata=None, **kwargs)

Bases: Trajectory

Trajectory of sites from a molecular dynamics simulation.

Parameters:

Source code in src/gemdat/trajectory.py
66
67
68
69
70
71
72
73
74
75
76
77
78
def __init__(self, *, metadata: dict | None = None, **kwargs):
    """Initialize class.

    Parameters
    ----------
    metadata : dict | None, optional
        Optional dictionary with metadata
    **kwargs
        These are passed to [pymatgen.core.trajectory.Trajectory][]
    """
    super().__init__(**kwargs)
    self.metadata = metadata if metadata else {}
    self.kinisi_cache_data: dict[Any, Any] = {}

cumulative_displacements property

Return cumulative displacement vectors from base positions.

This differs from displacements() in that it ignores the periodic boundary conditions. Instead, it cumulatively tracks the lattice translation vector (jimage).

displacements property

Return trajectory displacements as fractional coordinates from base position.

Returns:

  • displacements ( ndarray ) –

    Output array with displacements

positions property

Return trajectory positions as fractional coordinates.

Returns:

  • positions ( ndarray ) –

    Output array with positions

sampling_frequency property

Return number of time steps per second.

time_step_ps property

Return time step in picoseconds.

total_time property

Return total time for trajectory.

apply_drift_correction(*, fixed_species=None, floating_species=None)

Apply drift correction to trajectory.

For details see drift().

If no species are specified, use all species to calculate drift.

Only one of fixed_species and floating_species should be specified.

Parameters:

  • fixed_species (None | str | Collection[str], default: None ) –

    These species are assumed fixed, and are used to calculate drift (e.g. framework species).

  • floating_species (None | str | Collection[str], default: None ) –

    These species are assumed floating, and is used to determine the fixed species.

Returns:

  • trajectory ( Trajectory ) –

    Ouput trajectory with positions corrected for drift

Source code in src/gemdat/trajectory.py
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
def apply_drift_correction(
    self,
    *,
    fixed_species: None | str | Collection[str] = None,
    floating_species: None | str | Collection[str] = None,
) -> Trajectory:
    """Apply drift correction to trajectory.

    For details see [drift()][gemdat.trajectory.Trajectory.drift].

    If no species are specified, use all species to calculate drift.

    Only one of `fixed_species` and `floating_species` should be specified.

    Parameters
    ----------
    fixed_species : None | str | Collection[str]
        These species are assumed fixed, and are used to calculate
        drift (e.g. framework species).
    floating_species : None | str | Collection[str]
        These species are assumed floating, and is used to determine
        the fixed species.

    Returns
    -------
    trajectory : Trajectory
        Ouput trajectory with positions corrected for drift
    """
    drift = self.drift(fixed_species=fixed_species, floating_species=floating_species)

    return self.__class__(
        species=self.species,
        coords=self.displacements - drift,
        lattice=self.get_lattice(),
        metadata=self.metadata,
        coords_are_displacement=True,
        base_positions=self.base_positions,
        time_step=self.time_step,
    )

center_of_mass()

Return trajectory with center of mass for positions.

Source code in src/gemdat/trajectory.py
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
def center_of_mass(self) -> Trajectory:
    """Return trajectory with center of mass for positions."""
    weights = []
    for s in self.species:
        assert isinstance(s, (Species, Element)), f'got {type(s)=}'
        weights.append(s.atomic_mass)

    positions_no_pbc = self.base_positions + self.cumulative_displacements

    center_of_mass = np.average(positions_no_pbc, axis=1, weights=weights).reshape(-1, 1, 3)

    return self.__class__(
        species=['X'],
        coords=center_of_mass,
        lattice=self.get_lattice(),
        metadata=self.metadata,
        time_step=self.time_step,
    )

distances_from_base_position()

Return total distances from base positions.

Ignores periodic boundary conditions.

Source code in src/gemdat/trajectory.py
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
def distances_from_base_position(self) -> np.ndarray:
    """Return total distances from base positions.

    Ignores periodic boundary conditions.
    """
    lattice = self.get_lattice()

    all_distances = []

    for diff_vectors in self.cumulative_displacements:
        distances = _lengths(diff_vectors, lattice=lattice)
        all_distances.append(distances)

    all_distances = np.array(all_distances).T

    return all_distances

drift(*, fixed_species=None, floating_species=None)

Compute drift by averaging the displacement from the base positions per frame.

If no species are specified, use all species to calculate drift.

Only one of fixed_species and floating_species should be specified.

Parameters:

  • fixed_species (None | str | Collection[str], default: None ) –

    These species are assumed fixed, and are used to calculate drift (e.g. framework species).

  • floating_species (None | str | Collection[str], default: None ) –

    These species are assumed floating, and is used to determine the fixed species.

Returns:

  • drift ( array ) –

    Output array with average drift per frame.

Source code in src/gemdat/trajectory.py
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
def drift(
    self,
    *,
    fixed_species: None | str | Collection[str] = None,
    floating_species: None | str | Collection[str] = None,
) -> np.ndarray:
    """Compute drift by averaging the displacement from the base positions
    per frame.

    If no species are specified, use all species to calculate drift.

    Only one of `fixed_species` and `floating_species` should be specified.

    Parameters
    ----------
    fixed_species : None | str | Collection[str]
        These species are assumed fixed, and are used to calculate drift
        (e.g. framework species).
    floating_species : None | str | Collection[str]
        These species are assumed floating, and is used to determine the
        fixed species.

    Returns
    -------
    drift : np.array
        Output array with average drift per frame.
    """
    if fixed_species:
        displacements = self.filter(species=fixed_species).displacements
    elif floating_species:
        species = set()
        for sp in self.species:
            assert isinstance(sp, Species), f'got {type(sp)=}'
            if sp.symbol not in floating_species:
                species.add(sp)

        displacements = self.filter(species=species).displacements  # type: ignore
    else:
        displacements = self.displacements

    return np.mean(displacements, axis=1)[:, None, :]

filter(species)

Return trajectory with coordinates for given species only.

Parameters:

Returns:

  • trajectory ( Trajectory ) –

    Output trajectory with coordinates for selected species only

Source code in src/gemdat/trajectory.py
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
def filter(self, species: str | Collection[str]) -> Trajectory:
    """Return trajectory with coordinates for given species only.

    Parameters
    ----------
    species : str | Collection[str]
        Species to select, i.e. 'Li'

    Returns
    -------
    trajectory : Trajectory
        Output trajectory with coordinates for selected species only
    """
    if isinstance(species, str):
        species = [species]

    idx = []
    for sp in self.species:
        assert isinstance(sp, (Species, Element))
        idx.append(sp.symbol in species)

    new_coords = self.positions[:, idx]
    new_species = list(compress(self.species, idx))

    return self.__class__(
        species=new_species,
        coords=new_coords,
        lattice=self.lattice,
        metadata=self.metadata,
        time_step=self.time_step,
        constant_lattice=self.constant_lattice,
    )

from_ase_trajectory(trajectory, *, stride=1, constant_lattice=True, temperature=None, time_step_ps=None, cache=None) classmethod

Create a trajectory from an ASE .traj file or ASE Trajectory.

Parameters:

  • trajectory (str | Path | Trajectory) –

    Input ASE trajectory or path to an .traj file.

  • stride (int, default: 1 ) –

    Only read every stride-th frame.

  • constant_lattice (bool | None, default: True ) –

    Whether the lattice is constant. If None, it is inferred from the cells.

  • temperature (float | None, default: None ) –

    Temperature of simulation in K (stored in metadata if given).

  • time_step_ps (float | None, default: None ) –

    Time step of the simulation in ps.

  • cache (str | Path | None, default: None ) –

    If specified (or auto-derived for filename input), cache the parsed Trajectory using pickle.

Returns:

  • trajectory ( Trajectory ) –

    A GEMDAT trajectory instance.

Source code in src/gemdat/trajectory.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
@classmethod
def from_ase_trajectory(
    cls,
    trajectory: str | Path | AseTrajectory,
    *,
    stride: int = 1,
    constant_lattice: bool = True,
    temperature: float | None = None,
    time_step_ps: float | None = None,
    cache: str | Path | None = None,
) -> Trajectory:
    """Create a trajectory from an ASE ``.traj`` file or ASE Trajectory.

    Parameters
    ----------
    trajectory : str | Path | ase.io.trajectory.Trajectory
        Input ASE trajectory or path to an ``.traj`` file.
    stride : int, optional
        Only read every ``stride``-th frame.
    constant_lattice : bool | None, optional
        Whether the lattice is constant. If None, it is inferred from the cells.
    temperature : float | None
        Temperature of simulation in K (stored in `metadata` if given).
    time_step_ps : float | None
        Time step of the simulation in ps.
    cache : str | Path | None
        If specified (or auto-derived for filename input), cache the parsed
        Trajectory using pickle.

    Returns
    -------
    trajectory : gemdat.Trajectory
        A GEMDAT trajectory instance.
    """
    from ase.io.trajectory import Trajectory as AseTrajectory

    if stride < 1:
        raise ValueError(f'{stride=} must be >= 1')

    close_when_done = False

    if isinstance(trajectory, (str, Path)):
        traj_path = Path(trajectory)
        if not cache:
            kwargs = {
                'trajectory': str(traj_path),
                'stride': stride,
                'constant_lattice': constant_lattice,
                'temperature': temperature,
                'time_step_ps': time_step_ps,
            }
            serialized = json.dumps(kwargs, sort_keys=True).encode()
            hashid = hashlib.sha1(serialized).hexdigest()[:8]
            cache = traj_path.with_suffix(f'.traj.{hashid}.cache')

        if cache and Path(cache).exists():
            try:
                return cls.from_cache(cache)
            except Exception as e:
                print(e)
                print(f'Error reading from cache, reading {str(traj_path)!r}')

        ase_traj = AseTrajectory(str(traj_path), 'r')
        close_when_done = True
    else:
        if cache is not None:
            raise ValueError('`cache` is only supported when reading from a filename/path')
        ase_traj = trajectory

    try:
        n_total = len(ase_traj)
        n_frames = math.ceil(n_total / stride)
        atoms0 = ase_traj[0]
        symbols = atoms0.get_chemical_symbols()
        species = [Species(sym) for sym in symbols]
        n_atoms = len(symbols)

        frac = np.empty((n_frames, n_atoms, 3), dtype=float)
        cells = np.empty((n_frames, 3, 3), dtype=float)

        j = 0
        for i in range(0, n_total, stride):
            atoms = ase_traj[i]
            if len(atoms) != n_atoms:
                raise ValueError(
                    'ASE trajectory contains frames with a different number of atoms'
                )

            if atoms.get_chemical_symbols() != symbols:
                raise ValueError(
                    'ASE trajectory contains frames with different chemical symbols'
                )

            frac[j] = atoms.get_scaled_positions(wrap=False)
            cells[j] = np.asarray(atoms.cell.array, dtype=float)
            j += 1

        lattice = cells[0] if constant_lattice else cells

        md: dict = {}
        if temperature is not None:
            md['temperature'] = temperature

        obj = cls(
            species=species,
            coords=frac,
            lattice=lattice,
            constant_lattice=constant_lattice,
            time_step=(time_step_ps * 1e-12) if time_step_ps is not None else None,
            metadata=md,
        )
        obj.to_positions()

        if cache:
            obj.to_cache(cache)

        return obj

    finally:
        if close_when_done and hasattr(ase_traj, 'close'):
            ase_traj.close()

from_cache(cache) classmethod

Load data from cache using pickle.

Parameters:

  • cache (Path) –

    Name of cache file

Source code in src/gemdat/trajectory.py
195
196
197
198
199
200
201
202
203
204
205
206
@classmethod
def from_cache(cls, cache: str | Path) -> Trajectory:
    """Load data from cache using pickle.

    Parameters
    ----------
    cache : Path
        Name of cache file
    """
    with open(cache, 'rb') as f:
        obj = pickle.load(f)
    return obj

from_gromacs(*, topology_file, coords_file, constant_lattice=True, temperature, extract_edr=False, edr_file=None, cache=None) classmethod

Load data from GROMACS.

Parameters:

  • topology_file (Path | str) –

    GROMACS topology data file

  • coords_file (Path | str) –

    GROMACS trajectory data file

  • constant_lattice (bool, default: True ) –

    Whether the lattice changes during the simulation, such as in an NPT MD simulation.

  • temperature (float) –

    Temperature of simulation in K

  • edr_file (Optional[str | Path], default: None ) –

    If specified, extract time-series energy data from GROMACS EDR file

  • cache (Optional[Path], default: None ) –

    Path to cache data for vasprun.xml

Returns:

  • trajectory ( Trajectory ) –

    Output trajectory

Source code in src/gemdat/trajectory.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
@classmethod
def from_gromacs(
    cls,
    *,
    topology_file: Path | str,
    coords_file: Path | str,
    constant_lattice: bool = True,
    temperature: float,
    extract_edr: bool = False,
    edr_file: Optional[str | Path] = None,
    cache: Optional[str | Path] = None,
) -> Trajectory:
    """Load data from GROMACS.

    Parameters
    ----------
    topology_file : Path | str
        GROMACS topology data file
    coords_file : Path | str
        GROMACS trajectory data file
    constant_lattice : bool
        Whether the lattice changes during the simulation,
        such as in an NPT MD simulation.
    temperature : float
        Temperature of simulation in K
    edr_file: Optional[Path], optional
        If specified, extract time-series energy data from GROMACS EDR file
    cache : Optional[Path], optional
        Path to cache data for vasprun.xml

    Returns
    -------
    trajectory : Trajectory
        Output trajectory
    """
    import MDAnalysis as mda
    from pymatgen.core import Lattice

    topology_file = str(topology_file)
    coords_file = str(coords_file)

    if not cache:
        kwargs = {
            'topology_file': topology_file,
            'coords_file': coords_file,
            'edr_file': edr_file,
            'temperature': temperature,
            'constant_lattice': constant_lattice,
        }
        serialized = json.dumps(kwargs, sort_keys=True).encode()
        hashid = hashlib.sha1(serialized).hexdigest()[:8]
        cache = Path(coords_file).with_suffix(f'.{hashid}.cache')

    if Path(cache).exists():
        try:
            return cls.from_cache(cache)
        except Exception as e:
            print(e)
            print(f'Error reading from cache, reading {coords_file!r}')

    utraj = mda.Universe(topology_file, coords_file)
    coords = utraj.trajectory.timeseries()

    if not constant_lattice:
        lattices = [Lattice.from_parameters(*ts.dimensions) for ts in utraj.trajectory]
        for ts, lat in enumerate(lattices):
            coords[ts, :, :] = lat.get_fractional_coords(coords[ts, :, :])
    else:
        lattice = Lattice.from_parameters(*utraj.trajectory[0].dimensions)
        coords = lattice.get_fractional_coords(coords)

    species = [Element(SP_NAME.match(sp).group().capitalize()) for sp in utraj.atoms.names]  # type: ignore

    site_properties = {
        'residue': [sp.residue for sp in utraj.atoms],
        'residue_name': [sp.resname for sp in utraj.atoms],
        'residue_id': [sp.resid for sp in utraj.atoms],
    }

    metadata = {
        'temperature': temperature,
    }

    if edr_file:
        edr_file = str(edr_file)
        aux = mda.auxiliary.EDR.EDRReader(edr_file)
        metadata['aux'] = aux.get_data(aux.terms)

    obj = cls(
        species=species,
        coords=coords,
        lattice=lattice,
        time_step=utraj.trajectory.dt * 1e-12,  # ps -> s
        constant_lattice=constant_lattice,
        metadata=metadata,
        site_properties=site_properties,
    )
    obj.to_positions()

    if cache:
        obj.to_cache(cache)

    return obj

from_lammps(*, coords_file, data_file, temperature, time_step, coords_format='xyz', atom_style='atomic', type_mapping=None, cache=None, constant_lattice=True) classmethod

Load data from LAMMPS.

Parameters:

  • coords_file (Path | str) –

    LAMMPS coords file with trajectory data

  • data_file (Path | str) –

    LAMMPS data file with the lattice

  • temperature (float) –

    Temperature of simulation in K

  • time_step (float) –

    Time step of the simulation in ps

  • coords_format (str, default: 'xyz' ) –

    Format of the coords file

  • atom_style (str, default: 'atomic' ) –

    Atom style for box file

  • type_mapping (Optional[dict[str, str]], default: None ) –

    If specified, map numbers to element names. This is for LAMMPS data that do not contain element labels for the different species. See: https://github.com/GEMDAT-repos/GEMDAT/issues/353

  • cache (Optional[Path], default: None ) –

    Path to cache data for vasprun.xml

  • constant_lattice (bool, default: True ) –

    Whether the lattice changes during the simulation, such as in an NPT MD simulation.

Returns:

  • trajectory ( Trajectory ) –

    Output trajectory

Source code in src/gemdat/trajectory.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
@classmethod
def from_lammps(
    cls,
    *,
    coords_file: Path | str,
    data_file: Path | str,
    temperature: float,
    time_step: float,
    coords_format: str = 'xyz',
    atom_style: str = 'atomic',
    type_mapping: Optional[dict[str, str]] = None,
    cache: Optional[str | Path] = None,
    constant_lattice: bool = True,
) -> Trajectory:
    """Load data from LAMMPS.

    Parameters
    ----------
    coords_file : Path | str
        LAMMPS coords file with trajectory data
    data_file : Path | str
        LAMMPS data file with the lattice
    temperature : float
        Temperature of simulation in K
    time_step : float
        Time step of the simulation in ps
    coords_format: str
        Format of the coords file
    atom_style : str
        Atom style for box file
    type_mapping: dict[str, str], optional
        If specified, map numbers to element names. This is for LAMMPS
        data that do not contain element labels for the different species.
        See: https://github.com/GEMDAT-repos/GEMDAT/issues/353
    cache : Optional[Path], optional
        Path to cache data for vasprun.xml
    constant_lattice : bool
        Whether the lattice changes during the simulation,
        such as in an NPT MD simulation.

    Returns
    -------
    trajectory : Trajectory
        Output trajectory
    """
    from MDAnalysis import Universe
    from pymatgen.io.lammps.data import LammpsData

    coords_file = str(coords_file)
    data_file = str(data_file)

    if not cache:
        kwargs = {
            'coords_file': coords_file,
            'data_file': data_file,
            'temperature': temperature,
            'time_step': time_step,
            'constant_lattice': constant_lattice,
        }
        serialized = json.dumps(kwargs, sort_keys=True).encode()
        hashid = hashlib.sha1(serialized).hexdigest()[:8]
        cache = Path(coords_file).with_suffix(f'.{coords_format}.{hashid}.cache')

    if Path(cache).exists():
        try:
            return cls.from_cache(cache)
        except Exception as e:
            print(e)
            print(f'Error reading from cache, reading {coords_file!r}')

    if not constant_lattice:
        raise NotImplementedError('Lammps reader does not support NPT simulations')

    try:
        lammps_data = LammpsData.from_file(filename=data_file, atom_style=atom_style)
    except pd.errors.ParserError as exc:
        msg = (
            f"Could not parse LAMMPS data file '{data_file}'."
            '\nSuggestion: Export the data file directly from LAMMPS'
            ' using the `write_data` command.'
        )
        raise IOError(msg) from exc

    lammps_data = LammpsData.from_file(filename=data_file, atom_style=atom_style)
    lattice = lammps_data.structure.lattice

    utraj = Universe(coords_file, format=coords_format)
    coords = utraj.trajectory.timeseries()
    coords = lattice.get_fractional_coords(coords)

    if type_mapping:
        species = [Element(type_mapping.get(_type)) for _type in utraj.atoms.types]  # type: ignore
    else:
        species = [Element(sp) for sp in utraj.atoms.elements]

    obj = cls(
        species=species,
        coords=coords,
        lattice=lattice,
        time_step=time_step * 1e-12,  # ps -> s
        constant_lattice=constant_lattice,
        metadata={'temperature': temperature},
    )
    obj.to_positions()

    if cache:
        obj.to_cache(cache)

    return obj

from_vasprun(xml_file, cache=None, constant_lattice=True, **kwargs) classmethod

Load data from a vasprun.xml.

Parameters:

  • xml_file (Path) –

    Path to vasprun.xml

  • cache (Optional[Path], default: None ) –

    Path to cache data for vasprun.xml

  • constant_lattice (bool, default: True ) –

    Whether the lattice changes during the simulation, such as in an NPT MD simulation.

  • **kwargs (dict, default: {} ) –

    Optional arguments passed to pymatgen.io.vasp.outputs.Vasprun

Returns:

  • trajectory ( Trajectory ) –

    Output trajectory

Source code in src/gemdat/trajectory.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
@classmethod
def from_vasprun(
    cls,
    xml_file: str | Path,
    cache: Optional[str | Path] = None,
    constant_lattice: bool = True,
    **kwargs,
) -> Trajectory:
    """Load data from a `vasprun.xml`.

    Parameters
    ----------
    xml_file : Path
        Path to vasprun.xml
    cache : Optional[Path], optional
        Path to cache data for vasprun.xml
    constant_lattice : bool
        Whether the lattice changes during the simulation,
        such as in an NPT MD simulation.
    **kwargs : dict
        Optional arguments passed to [pymatgen.io.vasp.outputs.Vasprun][]

    Returns
    -------
    trajectory : Trajectory
        Output trajectory
    """
    kwargs.setdefault('parse_dos', False)
    kwargs.setdefault('parse_eigen', False)
    kwargs.setdefault('parse_projected_eigen', False)
    kwargs.setdefault('parse_potcar_file', False)

    if not cache:
        hash_kwargs = {**kwargs, 'constant_lattice': constant_lattice}
        serialized = json.dumps(hash_kwargs, sort_keys=True).encode()
        hashid = hashlib.sha1(serialized).hexdigest()[:8]
        cache = Path(xml_file).with_suffix(f'.xml.{hashid}.cache')

    if Path(cache).exists():
        try:
            return cls.from_cache(cache)
        except Exception as e:
            print(e)
            print(f'Error reading from cache, reading {xml_file!r}')
    try:
        run = vasp.Vasprun(xml_file, **kwargs)
    except ET.ParseError as e:
        raise Exception(
            f'Error parsing {xml_file!r}, to parse incomplete data '
            'adding `exception_on_bad_xml=False` might help'
        ) from e

    metadata = {'temperature': run.parameters['TEBEG']}

    obj = cls.from_structures(
        run.structures,
        constant_lattice=constant_lattice,
        time_step=run.parameters['POTIM'] * 1e-15,
        metadata=metadata,
    )
    obj.to_positions()

    if cache:
        obj.to_cache(cache)

    return obj

get_lattice(idx=None)

Get lattice.

Parameters:

  • idx (int | None, default: None ) –

    Optionally, get lattice at specified index if the lattice is not constant

Returns:

  • lattice ( Lattice ) –

    Pymatgen Lattice object

Source code in src/gemdat/trajectory.py
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
def get_lattice(self, idx: int | None = None) -> Lattice:
    """Get lattice.

    Parameters
    ----------
    idx : int | None, optional
        Optionally, get lattice at specified index if the lattice is not constant

    Returns
    -------
    lattice : pymatgen.core.lattice.Lattice
        Pymatgen Lattice object
    """
    if self.constant_lattice:
        return Lattice(self.lattice)  # type: ignore

    latt = self.lattice[idx]  # type: ignore
    return Lattice(latt)

mean_squared_displacement()

Computes the mean squared displacement using fast Fourier transform.

The algorithm is described in [https://doi.org/10.1051/sfn/201112010]. See also [https://stackoverflow.com/questions/34222272/ computing-mean-square-displacement-using-python-and-fft].

Source code in src/gemdat/trajectory.py
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
def mean_squared_displacement(self) -> np.ndarray:
    """Computes the mean squared displacement using fast Fourier transform.

    The algorithm is described in [https://doi.org/10.1051/sfn/201112010].
    See also [https://stackoverflow.com/questions/34222272/
    computing-mean-square-displacement-using-python-and-fft].
    """
    r = self.cumulative_displacements
    lattice = self.get_lattice()
    r = lattice.get_cartesian_coords(r)

    pos = np.transpose(r, (1, 0, 2))
    n_times = pos.shape[1]

    # Autocorrelation term using FFT [https://doi.org/10.1051/sfn/201112010]:
    # - perform FFT, square it, and then perform inverse FFT
    fft_result = np.fft.ifft(np.abs(np.fft.fft(pos, n=2 * n_times, axis=-2)) ** 2, axis=-2)
    # - keep only the first n_times elements
    fft_result = fft_result[:, :n_times, :].real
    # - sum over the coordinates and divide by the corresponding time window
    S2 = np.sum(fft_result, axis=-1) / (n_times - np.arange(n_times)[None, :])

    # Compute the sum of squared displacements
    D = np.square(pos).sum(axis=-1)
    D = np.append(D, np.zeros((pos.shape[0], 1)), axis=-1)

    # Compute the first term of the MSD:
    # - compute 2* the sum of squared positions
    double_sum_D = 2 * np.sum(D, axis=-1)[:, None]
    # - compute the cumulative sum of the sum of squares of the positions
    cumsum_D = np.cumsum(
        np.insert(D[:, 0:-1], 0, 0, axis=-1) + np.flip(D, axis=-1), axis=-1
    )
    # - compute the first term in the MSD calculation
    S1 = (double_sum_D - cumsum_D)[:, :-1] / (n_times - np.arange(n_times)[None, :])

    msd = S1 - 2 * S2
    return msd

metrics()

See gemdat.TrajectoryMetrics for more info.

Source code in src/gemdat/trajectory.py
1112
1113
1114
1115
1116
def metrics(self) -> TrajectoryMetrics:
    """See [gemdat.TrajectoryMetrics][] for more info."""
    from .metrics import TrajectoryMetrics

    return TrajectoryMetrics(trajectory=self)

plot_displacement_histogram(*, module, **kwargs)

See gemdat.plots.displacement_histogram for more info.

Source code in src/gemdat/trajectory.py
1146
1147
1148
1149
@plot_backend
def plot_displacement_histogram(self, *, module, **kwargs):
    """See [gemdat.plots.displacement_histogram][] for more info."""
    return module.displacement_histogram(trajectory=self, **kwargs)

plot_displacement_per_atom(*, module, **kwargs)

See gemdat.plots.displacement_per_atom for more info.

Source code in src/gemdat/trajectory.py
1126
1127
1128
1129
@plot_backend
def plot_displacement_per_atom(self, *, module, **kwargs):
    """See [gemdat.plots.displacement_per_atom][] for more info."""
    return module.displacement_per_atom(trajectory=self, **kwargs)

plot_displacement_per_element(*, module, **kwargs)

See gemdat.plots.displacement_per_element for more info.

Source code in src/gemdat/trajectory.py
1131
1132
1133
1134
@plot_backend
def plot_displacement_per_element(self, *, module, **kwargs):
    """See [gemdat.plots.displacement_per_element][] for more info."""
    return module.displacement_per_element(trajectory=self, **kwargs)

plot_frequency_vs_occurence(*, module, **kwargs)

See gemdat.plots.frequency_vs_occurence for more info.

Source code in src/gemdat/trajectory.py
1151
1152
1153
1154
@plot_backend
def plot_frequency_vs_occurence(self, *, module, **kwargs):
    """See [gemdat.plots.frequency_vs_occurence][] for more info."""
    return module.frequency_vs_occurence(trajectory=self, **kwargs)

plot_msd_kinisi(*, module, **kwargs)

See gemdat.plots.msd_kinisi for more info.

Source code in src/gemdat/trajectory.py
1141
1142
1143
1144
@plot_backend
def plot_msd_kinisi(self, *, module, **kwargs):
    """See [gemdat.plots.msd_kinisi][] for more info."""
    return module.msd_kinisi(trajectory=self, **kwargs)

plot_msd_per_element(*, module, **kwargs)

See gemdat.plots.msd_per_element for more info.

Source code in src/gemdat/trajectory.py
1136
1137
1138
1139
@plot_backend
def plot_msd_per_element(self, *, module, **kwargs):
    """See [gemdat.plots.msd_per_element][] for more info."""
    return module.msd_per_element(trajectory=self, **kwargs)

plot_vibrational_amplitudes(*, module, **kwargs)

See gemdat.plots.vibrational_amplitudes for more info.

Source code in src/gemdat/trajectory.py
1156
1157
1158
1159
@plot_backend
def plot_vibrational_amplitudes(self, *, module, **kwargs):
    """See [gemdat.plots.vibrational_amplitudes][] for more info."""
    return module.vibrational_amplitudes(trajectory=self, **kwargs)

radial_distribution_between_species(*, module, **kwargs)

See gemdat.rdf.radial_distribution_between_species for more info.

Source code in src/gemdat/trajectory.py
1118
1119
1120
1121
1122
1123
1124
@plot_backend
def radial_distribution_between_species(self, *, module, **kwargs) -> RDFData:
    """See [gemdat.rdf.radial_distribution_between_species][] for more
    info."""
    from gemdat import rdf

    return rdf.radial_distribution_between_species(trajectory=self, **kwargs)

split(n_parts=10, equal_parts=False)

Split the trajectory in n similar parts.

Parameters:

  • n_parts (int, default: 10 ) –

    n_parts

  • equal_parts (bool = False, default: False ) –

    Return equal parts, convenient for some routines

Returns:

Source code in src/gemdat/trajectory.py
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
def split(self, n_parts: int = 10, equal_parts: bool = False) -> list[Trajectory]:
    """Split the trajectory in n similar parts.

    Parameters
    ----------
    n_parts : int
        n_parts
    equal_parts : bool = False
        Return equal parts, convenient for some routines

    Returns
    -------
    List[Trajectory]
    """
    interval = np.linspace(0, len(self) - 1, n_parts + 1, dtype=int)
    subtrajectories = [self[start:stop] for start, stop in pairwise(interval)]

    if equal_parts:
        minsize = len(self)

        # Get the smallest size
        for start, stop in pairwise(interval):
            size = stop - start
            minsize = min(minsize, size)

        # Trim all subtrajectories
        subtrajectories = [trajectory[0:minsize] for trajectory in subtrajectories]

    return subtrajectories

to_ase_trajectory(*, filename='md.traj', stride=1)

Write trajectory to an ASE .traj file.

The resulting file can be opened by ASE and most ASE-compatible tools.

Parameters:

  • filename (str | Path, default: 'md.traj' ) –

    Output ASE trajectory filename.

  • stride (int, default: 1 ) –

    Only write every stride-th frame.

Returns:

  • ase_traj ( Trajectory ) –
  • ASE trajectory opened in read mode. –
Source code in src/gemdat/trajectory.py
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
def to_ase_trajectory(
    self,
    *,
    filename: str | Path = 'md.traj',
    stride: int = 1,
) -> AseTrajectory:
    """Write trajectory to an ASE ``.traj`` file.

    The resulting file can be opened by ASE and most ASE-compatible tools.

    Parameters
    ----------
    filename : str | Path, optional
        Output ASE trajectory filename.
    stride : int, optional
        Only write every ``stride``-th frame.

    Returns
    -------
    ase_traj : ase.io.trajectory.Trajectory
    ASE trajectory opened in read mode.
    """
    if stride < 1:
        raise ValueError(f'{stride=} must be >= 1')

    from ase import Atoms
    from ase.io.trajectory import Trajectory as AseTrajectory

    def _as_cell(lattice) -> np.ndarray:
        """Return a (3, 3) cell matrix in Γ… from a pymatgen Lattice or
        array-like."""
        if hasattr(lattice, 'matrix'):
            return np.asarray(lattice.matrix, dtype=float)
        return np.asarray(lattice, dtype=float)

    filename = Path(filename)
    frac = np.asarray(self.positions[::stride], dtype=float)
    symbols = [getattr(s, 'symbol', str(s)) for s in self.species]

    constant_lattice = bool(getattr(self, 'constant_lattice', True))
    lattice = (
        self.get_lattice()
        if hasattr(self, 'get_lattice')
        else getattr(self, 'lattice', None)
    )

    with AseTrajectory(str(filename), mode='w') as out:
        if constant_lattice:
            cell = _as_cell(lattice)
            atoms = Atoms(symbols=symbols, cell=cell, pbc=True)
            for f in frac:
                atoms.set_scaled_positions(f)
                out.write(atoms)
        else:
            lattices = np.asarray(lattice, dtype=float)
            lattices = lattices[::stride]
            atoms = Atoms(symbols=symbols, pbc=True)
            for f, lat in zip(frac, lattices):
                atoms.set_cell(_as_cell(lat), scale_atoms=False)
                atoms.set_scaled_positions(f)
                out.write(atoms)

    return AseTrajectory(str(filename), mode='r')

to_cache(cache)

Dump data to cache using pickle.

Parameters:

  • cache (Path) –

    Name of cache file

Source code in src/gemdat/trajectory.py
208
209
210
211
212
213
214
215
216
217
def to_cache(self, cache: str | Path):
    """Dump data to cache using pickle.

    Parameters
    ----------
    cache : Path
        Name of cache file
    """
    with open(cache, 'wb') as f:
        pickle.dump(self, f)

to_kinisi_diffusion_analyzer(specie, *, step_skip=1, dt=None, dimension='xyz', distance_unit='angstrom', specie_indices=None, masses=None, progress=True, save_cache=True, return_cache=True)

Construct a kinisi DiffusionAnalyzer from this GEMDAT trajectory.

This method parses the GEMDAT trajectory with :class:kinisi.pymatgen.PymatgenParser. It then computes the mean-squared displacement (MSD) using :func:kinisi.displacement.calculate_msd and attaches it to the returned :class:kinisi.analyze.DiffusionAnalyzer.

The algorithm of kinisi DiffusionAnalyzer is described in [https://doi.org/10.1021/acs.jctc.4c01249]. See also [https://github.com/kinisi-dev/kinisi.git].

Parameters:

  • specie (str) –

    Specie to calculate diffusivity for, e.g. "Li".

  • step_skip (int, default: 1 ) –

    Number of MD integrator time steps between stored frames.

  • dt ('sc.Variable | None', default: None ) –

    Time intervals to calculate displacements over. Optional; if None, kinisi defaults to a regular grid from the smallest interval (time_step * step_skip) to the full trajectory length.

  • dimension (str, default: 'xyz' ) –

    Subset of "xyz" indicating displacement axes of interest.

  • distance_unit (str, default: 'angstrom' ) –

    Unit of distance in the input structures, as a string understood by scipp.Unit(...) (default: "angstrom").

  • specie_indices ('sc.Variable | None', default: None ) –

    Indices of the specie to calculate the diffusivity for. Optional; if None, kinisi selects indices based on specie.

  • masses ('sc.Variable | None', default: None ) –

    Masses for centre-of-mass handling. Optional.

  • progress (bool, default: True ) –

    Show progress bars during parsing and MSD evaluation.

  • save_cache (bool, default: True ) –

    Cache the populated analyzer on this trajectory instance.

  • return_cache (bool, default: True ) –

    Return cached data.

Returns:

  • DiffusionAnalyzer –

    A DiffusionAnalyzer with MSD already computed and attached (so .msd and .dt are available).

Source code in src/gemdat/trajectory.py
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
def to_kinisi_diffusion_analyzer(
    self,
    specie: str,
    *,
    step_skip: int = 1,
    dt: 'sc.Variable | None' = None,
    dimension: str = 'xyz',
    distance_unit: str = 'angstrom',
    specie_indices: 'sc.Variable | None' = None,
    masses: 'sc.Variable | None' = None,
    progress: bool = True,
    save_cache: bool = True,
    return_cache: bool = True,
) -> 'DiffusionAnalyzer':
    """Construct a kinisi ``DiffusionAnalyzer`` from this GEMDAT
    trajectory.

    This method parses the GEMDAT trajectory with :class:`kinisi.pymatgen.PymatgenParser`.
    It then computes the mean-squared displacement (MSD) using
    :func:`kinisi.displacement.calculate_msd` and attaches it to the returned
    :class:`kinisi.analyze.DiffusionAnalyzer`.

    The algorithm of kinisi ``DiffusionAnalyzer`` is described in
    [https://doi.org/10.1021/acs.jctc.4c01249].
    See also [https://github.com/kinisi-dev/kinisi.git].

    Parameters
    ----------
    specie
        Specie to calculate diffusivity for, e.g. ``"Li"``.
    step_skip
        Number of MD integrator time steps between stored frames.
    dt
        Time intervals to calculate displacements over. Optional; if ``None``,
        kinisi defaults to a regular grid from the smallest interval
        (``time_step * step_skip``) to the full trajectory length.
    dimension
        Subset of ``"xyz"`` indicating displacement axes of interest.
    distance_unit
        Unit of distance in the input structures, as a string understood by
        ``scipp.Unit(...)`` (default: ``"angstrom"``).
    specie_indices
        Indices of the specie to calculate the diffusivity for. Optional; if ``None``,
        kinisi selects indices based on ``specie``.
    masses
        Masses for centre-of-mass handling. Optional.
    progress
        Show progress bars during parsing and MSD evaluation.
    save_cache
        Cache the populated analyzer on this trajectory instance.
    return_cache
        Return cached data.

    Returns
    -------
    kinisi.analyze.DiffusionAnalyzer
        A DiffusionAnalyzer with MSD already computed and attached
        (so .msd and .dt are available).
    """
    if step_skip < 1:
        raise ValueError('step_skip must be >= 1')

    import scipp as sc
    from kinisi.analyze import DiffusionAnalyzer
    from kinisi.displacement import calculate_msd
    from kinisi.pymatgen import PymatgenParser

    cache_data = getattr(self, 'kinisi_cache_data', None)
    if not isinstance(cache_data, dict):
        cache_data = {}
        self.kinisi_cache_data = cache_data

    cache_key = (
        specie,
        int(step_skip),
        None if dt is None else id(dt),
        dimension,
        distance_unit,
        None if specie_indices is None else id(specie_indices),
        None if masses is None else id(masses),
    )

    if return_cache and cache_data.get('cache_key') == cache_key:
        diffusion_analyzer = cache_data.get('diffusion_analyzer')
        if diffusion_analyzer is not None:
            return diffusion_analyzer

    time_step = sc.scalar(self.time_step_ps, unit=sc.Unit('ps'))
    step_skip_sc = sc.scalar(int(step_skip), unit=sc.Unit('dimensionless'))

    parser = PymatgenParser(
        structures=self,
        specie=specie,
        time_step=time_step,
        step_skip=step_skip_sc,
        dt=dt,
        dimension=dimension,
        distance_unit=sc.Unit(distance_unit),
        specie_indices=specie_indices,
        masses=masses,
        progress=progress,
    )

    diff = DiffusionAnalyzer(parser)
    diff._dg = calculate_msd(parser, progress=progress)

    print(
        'This analysis uses the `kinisi` package. Please additionally cite kinisi.'
        ' See https://github.com/kinisi-dev/kinisi.git for citation guidance.',
        file=sys.stderr,
    )

    if save_cache:
        cache_data['diffusion_analyzer'] = diff
        cache_data['cache_key'] = cache_key

        input_params = {
            'specie': specie,
            'step_skip': int(step_skip),
            'dt': dt,
            'dimension': dimension,
            'distance_unit': distance_unit,
            'specie_indices': specie_indices,
            'masses': masses,
        }

        cache_data.update(input_params)
        self.kinisi_cache_data = cache_data

    return diff

to_positions()

Pymatgen does not mod coords back to original unit cell.

See GEMDAT#103

Source code in src/gemdat/trajectory.py
121
122
123
124
125
126
127
def to_positions(self):
    """Pymatgen does not mod coords back to original unit cell.

    See [GEMDAT#103](https://github.com/GEMDAT-repos/GEMDAT/issues/103)
    """
    super().to_positions()
    self.coords = np.mod(self.coords, 1)

to_volume(resolution=0.2)

Calculate density volume from a trajectory.

All coordinates are binned into voxels. The value of each voxel represents the number of coodinates that are associated with it.

For more info, see gemdat.Volume.

Parameters:

  • resolution (float, default: 0.2 ) –

    Minimum resolution for the voxels in Angstrom

Returns:

  • vol ( Volume ) –

    Output volume

Source code in src/gemdat/trajectory.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def to_volume(self, resolution: float = 0.2) -> Volume:
    """Calculate density volume from a trajectory.

    All coordinates are binned into voxels. The value of each
    voxel represents the number of coodinates that are associated
    with it.

    For more info, see [gemdat.Volume][].

    Parameters
    ----------
    resolution : float, optional
        Minimum resolution for the voxels in Angstrom

    Returns
    -------
    vol : Volume
        Output volume
    """
    from gemdat.volume import trajectory_to_volume

    return trajectory_to_volume(self, resolution=resolution)

transitions_between_sites(sites, floating_specie, site_radius=None, site_inner_fraction=1.0, remove_part_occup_from_structure=False, fraction_of_overlap=0.0)

Compute transitions between given sites for floating specie.

Parameters:

  • sites (Structure) –

    Input structure with known sites

  • floating_specie (str) –

    Name of the floating specie to calculate transitions for

  • site_radius (float | dict[str, float] | None, default: None ) –

    A custom site radius in Γ…ngstrom to determine if an atom is at a site. A dict keyed by the site label can be used to have a site per atom type, e.g. `site_radius = {'Li1': 1.0, 'Li2': 1.2}.

  • site_inner_fraction (float | dict[str, float], default: 1.0 ) –

    A fraction of the site radius which is determined to be the inner site which is used in jump calculations

  • remove_part_occup_from_structure (bool, default: False ) –

    A flag to remove partial occupancies from structure

  • fraction_of_overlap (float, default: 0.0 ) –

    Fraction of allowed overlap between sites

Returns:

Source code in src/gemdat/trajectory.py
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
def transitions_between_sites(
    self,
    sites: Structure,
    floating_specie: str,
    site_radius: float | dict[str, float] | None = None,
    site_inner_fraction: float | dict[str, float] = 1.0,
    remove_part_occup_from_structure: bool = False,
    fraction_of_overlap: float = 0.0,
) -> Transitions:
    """Compute transitions between given sites for floating specie.

    Parameters
    ----------
    sites : pymatgen.core.structure.Structure
        Input structure with known sites
    floating_specie : str
        Name of the floating specie to calculate transitions for
    site_radius: Optional[float, dict[str, float]]
        A custom site radius in Γ…ngstrom to determine
        if an atom is at a site. A dict keyed by the site label can
        be used to have a site per atom type, e.g.
        `site_radius = {'Li1': 1.0, 'Li2': 1.2}.
    site_inner_fraction: Optional[float, dict[str, float]]
        A fraction of the site radius which is determined to be the `inner site`
        which is used in jump calculations
    remove_part_occup_from_structure: bool
        A flag to remove partial occupancies from structure
    fraction_of_overlap: float
        Fraction of allowed overlap between sites

    Returns
    -------
    transitions: Transitions
    """
    from gemdat.transitions import Transitions

    return Transitions.from_trajectory(
        trajectory=self,
        sites=sites,
        floating_specie=floating_specie,
        site_radius=site_radius,
        site_inner_fraction=site_inner_fraction,
        remove_part_occup_from_structure=remove_part_occup_from_structure,
        fraction_of_overlap=fraction_of_overlap,
    )