netMHCpan #1 환자 시퀀스 생성 #
#2025-07-23
path
data/
├── clusters.tsv
├── meta.csv
└── codon
├── reference_codon.csv
└── *.codon.csv (*: patient id)
#
1. Load package #
import pandas as pd
import numpy as np
import os
import sys
import re
sys.path.append('/data/home/ysh980101/2409/bin')
from mhc_epitope import *
#
2. Load data #
import pandas as pd
import os
def make_sequence_df():
# 참조 시퀀스 파일 불러오기 및 컬럼 이름 변경
ref_sequence = pd.read_csv("data/codon/reference_codon.csv", index_col=0)
ref_sequence.rename(columns={'sequence': 'reference'}, inplace=True)
# 코돈 파일 목록 불러오기
file_list = os.listdir("data/codon")
patient_list = [f.split('.')[0] for f in file_list if f.endswith('.codon.csv')]
# 메타데이터 파일 불러오기
meta = pd.read_csv("data/meta.csv")
meta_list = meta['sampleID'].tolist()
# 공통 샘플 목록 생성
common_list = list(set(meta_list) & set(patient_list))
common_list.sort()
# 각 환자 파일 불러오고 병합하기
for pid in common_list:
df = pd.read_csv(f"data/codon/{pid}.codon.csv", index_col=0)
df.rename(columns={'sequence': pid}, inplace=True)
ref_sequence = pd.merge(ref_sequence, df, on='gene', how='outer')
return ref_sequence
sequence_df = make_sequence_df()
sequence_df
#
3. Make allprot.fasta #
# 데이터 로드
sequence_df = make_sequence_df()
cluster_df = pd.read_csv('data/clusters.tsv', sep='\t')
# 특정 클러스터에 해당하는 행 찾기
cur_cluster = "c315"
cluster_row = cluster_df[cluster_df['cluster'] == cur_cluster]
cur_pos1 = cluster_row['pos1'].values[0]
cur_pos2 = cluster_row['pos2'].values[0]
# cur_pos1보다 처음으로 큰 start 값을 가진 행의 이전 행 찾기
cur_idx_temp = sequence_df[sequence_df['start'] > cur_pos1].index[0]
cur_start_temp = sequence_df.at[cur_idx_temp, 'start']
cur_end_temp = sequence_df.at[cur_idx_temp, 'end']
cur_idx = cur_idx_temp - 1
cur_start = sequence_df.at[cur_idx, 'start']
cur_end = sequence_df.at[cur_idx, 'end']
# cur_pos2가 cur_end 보다 클 경우, 조정
flag = 0
if cur_pos2 > cur_end:
cur_pos2_temp = cur_pos2
cur_pos2 = cur_end
cur_pos1_temp = cur_start_temp
flag = 1
# 인덱스 계산
cur_left_idx_temp = cur_pos1_temp - cur_start_temp
cur_right_idx_temp = cur_pos2_temp - cur_start_temp
# 기본 인덱스 계산
cur_left_idx = cur_pos1 - cur_start
cur_right_idx = cur_pos2 - cur_start
# fasta 파일 저장 경로 설정
output_dir = f"data/{cur_cluster}/"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 시퀀스 처리 및 fasta 파일 생성
all_fasta_content = ""
for column in sequence_df.columns[3:]: # 첫 세 컬럼을 건너뛰고 4번째 컬럼부터 처리
cur_pid = column
if flag == 1:
# 두 시퀀스 부분을 처리
cur_sequence = sequence_df.at[cur_idx, cur_pid]
cur_sequence = ''.join(c * 3 for c in cur_sequence)
updated_sequence1 = cur_sequence[cur_left_idx:cur_right_idx+1]
cur_sequence_temp = sequence_df.at[cur_idx_temp, cur_pid]
cur_sequence_temp = ''.join(c * 3 for c in cur_sequence_temp)
updated_sequence2 = cur_sequence_temp[cur_left_idx_temp:cur_right_idx_temp+1]
# boundary 계산
boundary_length = cur_start_temp - cur_end - 1
boundary = '' if boundary_length == 0 else '-' * boundary_length
final_sequence = f"{updated_sequence1}{boundary}{updated_sequence2}"
else:
# 단일 시퀀스 부분만 처리
cur_sequence = sequence_df.at[cur_idx, cur_pid]
cur_sequence = ''.join(c * 3 for c in cur_sequence)
final_sequence = cur_sequence[cur_left_idx:cur_right_idx+1]
# 시작과 끝 문자열 처리
if len(final_sequence) >= 3:
# 시작 부분 처리
if final_sequence[:3] != final_sequence[0] * 3:
final_sequence = final_sequence[1:] if final_sequence[:2] == final_sequence[0] * 2 else final_sequence[2:]
# 끝 부분 처리
if final_sequence[-3:] != final_sequence[-1] * 3:
final_sequence = final_sequence[:-1] if final_sequence[-2:] == final_sequence[-1] * 2 else final_sequence[:-2]
# 결과 시퀀스 줄여서 저장
compressed_sequence = ''.join(final_sequence[i] for i in range(0, len(final_sequence), 3))
fasta_content = f">{cur_pid}|{cur_cluster}\n{compressed_sequence}\n"
all_fasta_content += fasta_content
# 한 파일에 모든 결과 저장
with open(f"{output_dir}allprot.fasta", "w") as fasta_file:
fasta_file.write(all_fasta_content)
result
data/
├── clusters.tsv
├── meta.csv
├── codon
│ ├── reference_codon.csv
│ └── *.codon.csv (*: patient id)
└── c315
│ └── allprot.fasta
└── c442
└── allprot.fasta